
Airflow - AWSAthenaOperator For Dynamic List Of Queries

I have a DAG which has step read_date_information_file which reads a file and returns the list of queries (which I can access from output). I then want to loop through this and exe

Solution 1:

The issue you are facing is not directly related to Athena. It comes from using Airflow incorrectly.

You are experiencing this issue because Airflow was unable to import your DAG into the DagBag before the import timeout expired (see the source code).
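To make the failure mode concrete, here is a minimal stdlib sketch of a per-file import timeout. It assumes a POSIX system and must run in the main thread; to our understanding, Airflow's own timeout helper also relies on SIGALRM on POSIX. `import_with_timeout`, `DagImportTimeout`, and the two fake DAG files are hypothetical names for illustration, not Airflow APIs.

```python
import signal
import time


class DagImportTimeout(Exception):
    """Hypothetical stand-in for the error Airflow raises when an import times out."""


def import_with_timeout(parse_dag_file, seconds):
    """Run parse_dag_file(), aborting it with DagImportTimeout after `seconds`.

    POSIX-only (uses SIGALRM) and must be called from the main thread.
    """
    def _on_alarm(signum, frame):
        raise DagImportTimeout(f"DAG file took longer than {seconds}s to import")

    previous_handler = signal.signal(signal.SIGALRM, _on_alarm)
    signal.setitimer(signal.ITIMER_REAL, seconds)  # one-shot wall-clock timer
    try:
        return parse_dag_file()
    finally:
        signal.setitimer(signal.ITIMER_REAL, 0)  # cancel the timer if parsing finished early
        signal.signal(signal.SIGALRM, previous_handler)


def fast_dag_file():
    # Hypothetical DAG file with cheap top-level code
    return "parsed"


def slow_dag_file():
    # Hypothetical DAG file whose top-level code blocks on an expensive metadata-DB query
    time.sleep(1.0)
    return "parsed"


if __name__ == "__main__":
    print(import_with_timeout(fast_dag_file, 0.5))
    try:
        import_with_timeout(slow_dag_file, 0.1)
    except DagImportTimeout as exc:
        print("import failed:", exc)
```

The slow file never finishes: the alarm fires mid-sleep and the exception aborts the import, which is roughly what happens to a DAG whose top-level code waits on the database for too long.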

This happens because you are making an expensive call to the metadata database by trying to create tasks from XComs. Airflow parses your DAG file every 30 seconds (the default value of min_file_process_interval), and all top-level code in the file runs on every parse. This means that every 30 seconds you are opening a connection to the database. This is bad practice and you should not do it: it can easily overwhelm your database.
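A rough way to see the cost is to simulate repeated parsing. The sketch below is plain Python, not Airflow's real parser: it assumes one DAG file parsed at the 30-second default, and `FakeMetadataDB`, `fetch_queries_from_metadata_db`, and `simulate_parses` are hypothetical names. Each "parse" re-executes the file's top-level code, so each one hits the database.

```python
MIN_FILE_PROCESS_INTERVAL = 30  # seconds, the default cited above
PARSES_PER_DAY = 24 * 60 * 60 // MIN_FILE_PROCESS_INTERVAL

# Hypothetical "DAG file" whose top-level code queries the metadata DB
# to decide how many tasks to create.
DAG_FILE_SOURCE = """
queries = fetch_queries_from_metadata_db()
task_ids = [f"run_query_{i}" for i in range(len(queries))]
"""


class FakeMetadataDB:
    """Hypothetical metadata DB that counts how often it is queried."""

    def __init__(self):
        self.calls = 0

    def fetch_queries(self):
        self.calls += 1
        return ["SELECT 1", "SELECT 2"]


def simulate_parses(n_parses, db):
    """Execute the DAG file's top-level code n_parses times, as repeated parsing would."""
    for _ in range(n_parses):
        namespace = {"fetch_queries_from_metadata_db": db.fetch_queries}
        exec(DAG_FILE_SOURCE, namespace)
    return db.calls


if __name__ == "__main__":
    db = FakeMetadataDB()
    print(simulate_parses(PARSES_PER_DAY, db))  # 2880
```

That is up to 2880 database round-trips per day for a single DAG file, before a single task has run.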

If you still want to continue down this dangerous path, you need to raise DAGBAG_IMPORT_TIMEOUT from its default (the dagbag_import_timeout option in the [core] section of airflow.cfg; see the source code).
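Instead of editing airflow.cfg, the same option can be overridden through an environment variable named `AIRFLOW__{SECTION}__{KEY}`. The helpers below are a hypothetical sketch that builds such an environment; the value 120 is an arbitrary example, not a recommendation.

```python
import os


def airflow_env_var_name(section, key):
    """Airflow maps config option [section] key to AIRFLOW__SECTION__KEY."""
    return f"AIRFLOW__{section.upper()}__{key.upper()}"


def env_with_override(section, key, value, base_env=None):
    """Return a copy of base_env (default: os.environ) with one Airflow option overridden."""
    env = dict(os.environ if base_env is None else base_env)
    env[airflow_env_var_name(section, key)] = str(value)
    return env


if __name__ == "__main__":
    # 120 seconds is an arbitrary example value.
    env = env_with_override("core", "dagbag_import_timeout", 120, base_env={})
    print(env)  # {'AIRFLOW__CORE__DAGBAG_IMPORT_TIMEOUT': '120'}
    # This dict could then be passed as env= to subprocess.run when
    # launching the scheduler, e.g. subprocess.run(["airflow", "scheduler"], env=env).
```

Raising the timeout only hides the symptom. The database is still queried on every parse.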

My recommendation: Do not try to dynamically create tasks based on XComs.

As for your specific use case: you did not mention what you are trying to solve. Based on the function name (read_date_information_file), I assume you are trying to run an Athena query for a specific date, and that this date changes whenever the file is read. If so, you could integrate the date directly into your query using Jinja. That is, the event in query=event would reference the XCom directly, something like:

SELECT ...
FROM ...
WHERE something={{ ti.xcom_pull(task_ids='read_date_information_file') }}

That way you need only a single static AWSAthenaOperator, while the query it runs changes dynamically based on the XCom value pushed by the read_date_information_file task.
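The key difference is *when* the XCom is read. In Airflow, `query` is one of AWSAthenaOperator's templated fields, so the Jinja expression is rendered just before the task executes, not while the DAG file is parsed. The stdlib sketch below mimics that timing only. `FakeTaskInstance` and its `record` helper are hypothetical, `str.format`'s `{date}` stands in for the Jinja expression, and the dates are made-up example values.

```python
class FakeTaskInstance:
    """Hypothetical stand-in for Airflow's TaskInstance; XComs live in a dict."""

    def __init__(self):
        self._xcoms = {}

    def record(self, task_id, value):
        # Hypothetical helper: stores what the upstream task returned (its XCom).
        self._xcoms[task_id] = value

    def xcom_pull(self, task_ids):
        return self._xcoms.get(task_ids)


# Defined once at parse time; no database access happens here.
# str.format's {date} stands in for the Jinja expression in the query above.
QUERY_TEMPLATE = "SELECT ... FROM ... WHERE something={date}"


def render_query(template, ti):
    """Resolve the upstream XCom only when the task runs, as a templated field would be."""
    return template.format(date=ti.xcom_pull(task_ids="read_date_information_file"))


if __name__ == "__main__":
    # Two DAG runs, one static task, two different queries.
    for run_date in ["2021-01-01", "2021-01-02"]:
        ti = FakeTaskInstance()
        ti.record("read_date_information_file", run_date)
        print(render_query(QUERY_TEMPLATE, ti))
```

The DAG structure stays fixed (one task), so parsing stays cheap, and only the rendered query text varies from run to run.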
