How To Create Path Using Execution Date In Airflow?
I have the following Airflow dag: start_task = DummyOperator(task_id='start_task', dag=dag) gcs_export_uri_template = 'adstest/2018/08/31/*' update_bigquery = GoogleCl
Solution 1:
You can use Airflow Macros to achieve this as follows:
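# Airflow 1.x import path; the module location differs in later releases.
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator

# Each entry is a Jinja template that Airflow renders at run time.
gcs_export_uri_template = [
    # Execution date, e.g. adstest/2018/08/31/*
    "adstest/{{ macros.ds_format(ds, '%Y-%m-%d', '%Y/%m/%d') }}/*",
    # Previous execution date (one day earlier only on a daily schedule)
    "adstest/{{ macros.ds_format(prev_ds, '%Y-%m-%d', '%Y/%m/%d') }}/*",
    # Two days before the execution date
    "adstest/{{ macros.ds_format(macros.ds_add(ds, -2), '%Y-%m-%d', '%Y/%m/%d') }}/*",
]

update_bigquery = GoogleCloudStorageToBigQueryOperator(
    dag=dag,
    task_id='load_ads_to_BigQuery',
    bucket=GCS_BUCKET_ID,
    destination_project_dataset_table=table_name_template,
    source_format='CSV',
    source_objects=gcs_export_uri_template,  # templated field, rendered per run
    schema_fields=dc(),
    create_disposition='CREATE_IF_NEEDED',
    write_disposition='WRITE_APPEND',
    skip_leading_rows=1,
    google_cloud_storage_conn_id=CONNECTION_ID,
    bigquery_conn_id=CONNECTION_ID,
)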
After running the above code, you can check the rendered value of source_objects for the task instance in the Web UI (the Rendered Template view).
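As a rough check of what to expect in that view, the sketch below reproduces the date arithmetic in plain Python. The ds_add and ds_format functions here are local stand-ins written for illustration, not Airflow's own code, and the execution date 2018-08-31 is an assumption taken from the path in the question. The middle entry assumes a daily schedule, so that prev_ds is one day before ds.

```python
from datetime import datetime, timedelta


def ds_add(ds, days):
    """Local stand-in for macros.ds_add: shift a YYYY-MM-DD string by `days`."""
    shifted = datetime.strptime(ds, "%Y-%m-%d") + timedelta(days=days)
    return shifted.strftime("%Y-%m-%d")


def ds_format(ds, input_format, output_format):
    """Local stand-in for macros.ds_format: re-format a date string."""
    return datetime.strptime(ds, input_format).strftime(output_format)


ds = "2018-08-31"          # assumed execution date
prev_ds = ds_add(ds, -1)   # assumption: daily schedule

rendered = [
    "adstest/{}/*".format(ds_format(ds, "%Y-%m-%d", "%Y/%m/%d")),
    "adstest/{}/*".format(ds_format(prev_ds, "%Y-%m-%d", "%Y/%m/%d")),
    "adstest/{}/*".format(ds_format(ds_add(ds, -2), "%Y-%m-%d", "%Y/%m/%d")),
]

for path in rendered:
    print(path)
# adstest/2018/08/31/*
# adstest/2018/08/30/*
# adstest/2018/08/29/*
```

If the values shown in the Web UI differ from these for the same execution date, the schedule interval is the first thing to check, since prev_ds follows the schedule rather than the calendar.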
Regarding the edited comment:
You will need to pass the value of the loop variable i in the params parameter and use it in the template string as params.i, as follows:
for i in range(5, 0, -1):
    # The template is identical for every task; only params.i differs.
    gcs_export_uri_template = [
        "adstest/{{ macros.ds_format(macros.ds_add(ds, -params.i), '%Y-%m-%d', '%Y/%m/%d') }}/*"
    ]

    update_bigquery = GoogleCloudStorageToBigQueryOperator(
        dag=dag,
        task_id='load_ads_to_BigQuery-{}'.format(i),  # task_id must be unique per task
        bucket=GCS_BUCKET_ID,
        destination_project_dataset_table=table_name_template,
        source_format='CSV',
        source_objects=gcs_export_uri_template,
        schema_fields=dc(),
        params={'i': i},  # exposed to the template as params.i
        create_disposition='CREATE_IF_NEEDED',
        write_disposition='WRITE_APPEND',
        skip_leading_rows=1,
        google_cloud_storage_conn_id=CONNECTION_ID,
        bigquery_conn_id=CONNECTION_ID,
    )

    # Wire each task inside the loop; outside it, only the last task would be linked.
    start_task >> update_bigquery
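To see which path each of the five tasks should end up reading, the following plain-Python sketch mimics the offset that params.i applies. The helper path_for_offset is hypothetical, written only for this illustration, and the execution date 2018-08-31 is again an assumption.

```python
from datetime import datetime, timedelta


def path_for_offset(ds, i):
    """Path the template is expected to render to when params.i == i."""
    day = datetime.strptime(ds, "%Y-%m-%d") - timedelta(days=i)
    return "adstest/{}/*".format(day.strftime("%Y/%m/%d"))


ds = "2018-08-31"  # assumed execution date

# Map each task_id created by the loop to its expected source path.
expected = {
    "load_ads_to_BigQuery-{}".format(i): path_for_offset(ds, i)
    for i in range(5, 0, -1)
}

for task_id, path in expected.items():
    print(task_id, path)
# load_ads_to_BigQuery-5 adstest/2018/08/26/*
# load_ads_to_BigQuery-4 adstest/2018/08/27/*
# load_ads_to_BigQuery-3 adstest/2018/08/28/*
# load_ads_to_BigQuery-2 adstest/2018/08/29/*
# load_ads_to_BigQuery-1 adstest/2018/08/30/*
```

Note that range(5, 0, -1) stops at 1, so the execution date itself is not loaded by this loop.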