diff --git a/airflow/dags/polygonetl_airflow/build_export_dag.py b/airflow/dags/polygonetl_airflow/build_export_dag.py
index 97b8f7fb..4067e6d2 100644
--- a/airflow/dags/polygonetl_airflow/build_export_dag.py
+++ b/airflow/dags/polygonetl_airflow/build_export_dag.py
@@ -39,6 +39,7 @@ def build_export_dag(
     provider_uris_archival,
     output_bucket,
     export_start_date,
+    export_end_date=None,
     notification_emails=None,
     export_schedule_interval='0 0 * * *',
     export_max_workers=10,
@@ -52,6 +53,7 @@ def build_export_dag(
     default_dag_args = {
         "depends_on_past": False,
         "start_date": export_start_date,
+        "end_date": export_end_date,
         "email_on_failure": True,
         "email_on_retry": False,
         "retries": export_retries,
diff --git a/airflow/dags/polygonetl_airflow/build_load_dag.py b/airflow/dags/polygonetl_airflow/build_load_dag.py
index 5644f266..d72bba46 100644
--- a/airflow/dags/polygonetl_airflow/build_load_dag.py
+++ b/airflow/dags/polygonetl_airflow/build_load_dag.py
@@ -33,6 +33,8 @@ def build_load_dag(
     chain='polygon',
     notification_emails=None,
     load_start_date=datetime(2018, 7, 1),
+    load_end_date=None,
+    load_catchup=False,
     load_schedule_interval='0 0 * * *',
     load_all_partitions=True
 ):
@@ -76,6 +78,7 @@ def read_file(filepath):
     default_dag_args = {
         'depends_on_past': False,
         'start_date': load_start_date,
+        'end_date': load_end_date,
         'email_on_failure': True,
         'email_on_retry': False,
         'retries': 5,
@@ -89,7 +92,7 @@ def read_file(filepath):
     # Define a DAG (directed acyclic graph) of tasks.
     dag = models.DAG(
         dag_id,
-        catchup=False,
+        catchup=load_catchup,
         schedule_interval=load_schedule_interval,
         default_args=default_dag_args)
 
diff --git a/airflow/dags/polygonetl_airflow/variables.py b/airflow/dags/polygonetl_airflow/variables.py
index 2f39dca7..f227bacd 100644
--- a/airflow/dags/polygonetl_airflow/variables.py
+++ b/airflow/dags/polygonetl_airflow/variables.py
@@ -54,8 +54,14 @@ def read_load_dag_vars(var_prefix, **kwargs):
         # 'success_notification_emails': read_var('success_notification_emails', None, False, **kwargs),
         'load_schedule_interval': read_var('load_schedule_interval', var_prefix, True, **kwargs),
         'load_all_partitions': parse_bool(read_var('load_all_partitions', var_prefix, False, **kwargs), default=None),
+        'load_catchup': parse_bool(read_var('load_catchup', var_prefix, False, **kwargs), default=False),
     }
 
+    load_start_date = read_var('load_start_date', var_prefix, False, **kwargs)
+    if load_start_date is not None:
+        load_start_date = datetime.strptime(load_start_date, '%Y-%m-%d')
+        vars['load_start_date'] = load_start_date
+
     load_end_date = read_var('load_end_date', var_prefix, False, **kwargs)
     if load_end_date is not None:
         load_end_date = datetime.strptime(load_end_date, '%Y-%m-%d')
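
Note for reviewers: a minimal standalone sketch of how the new variables are expected to flow through read_load_dag_vars into build_load_dag once this lands. The 'polygon_' prefix, the airflow_variables dict standing in for Airflow's Variable store, and the simplified read_var/parse_bool helpers are illustrative assumptions, not code from this patch.

    from datetime import datetime

    # Hypothetical Airflow Variable values; the 'polygon_' prefix is
    # an assumption for illustration only.
    airflow_variables = {
        'polygon_load_start_date': '2020-05-30',
        'polygon_load_end_date': '2020-06-30',
        'polygon_load_catchup': 'True',
    }

    def read_var(var_name, var_prefix):
        # Simplified stand-in for polygonetl_airflow.variables.read_var.
        return airflow_variables.get(var_prefix + var_name)

    def parse_bool(value, default=None):
        # Tolerate a missing value and common string spellings of booleans.
        if value is None:
            return default
        return str(value).lower() in ('true', 'yes', '1')

    # Mirrors what read_load_dag_vars produces after this change:
    # dates parsed from '%Y-%m-%d', catchup defaulting to False, and
    # date keys only present when the corresponding variable is set.
    vars = {
        'load_catchup': parse_bool(read_var('load_catchup', 'polygon_'), default=False),
    }
    for key in ('load_start_date', 'load_end_date'):
        raw = read_var(key, 'polygon_')
        if raw is not None:
            vars[key] = datetime.strptime(raw, '%Y-%m-%d')

    # build_load_dag(**vars) then places start_date/end_date on
    # default_dag_args and passes load_catchup through to models.DAG.
    print(vars)
    # -> {'load_catchup': True, 'load_start_date': datetime(2020, 5, 30, 0, 0),
    #     'load_end_date': datetime(2020, 6, 30, 0, 0)}

With end_date set on default_dag_args and catchup enabled, the scheduler creates exactly the runs between the two dates and stops at end_date, which is what makes bounded backfills of the export and load DAGs possible.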