Skip to content

Commit ac8377f

Browse files
Merge pull request #154 from blockchain-etl/improve-support-for-selected-re-exports
Improve support for selected re-exports
2 parents f35945f + 700f4c1 commit ac8377f

File tree

3 files changed

+12
-1
lines changed

3 files changed

+12
-1
lines changed

airflow/dags/polygonetl_airflow/build_export_dag.py

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -39,6 +39,7 @@ def build_export_dag(
3939
provider_uris_archival,
4040
output_bucket,
4141
export_start_date,
42+
export_end_date=None,
4243
notification_emails=None,
4344
export_schedule_interval='0 0 * * *',
4445
export_max_workers=10,
@@ -52,6 +53,7 @@ def build_export_dag(
5253
default_dag_args = {
5354
"depends_on_past": False,
5455
"start_date": export_start_date,
56+
"end_date": export_end_date,
5557
"email_on_failure": True,
5658
"email_on_retry": False,
5759
"retries": export_retries,

airflow/dags/polygonetl_airflow/build_load_dag.py

Lines changed: 4 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -33,6 +33,8 @@ def build_load_dag(
3333
chain='polygon',
3434
notification_emails=None,
3535
load_start_date=datetime(2018, 7, 1),
36+
load_end_date=None,
37+
load_catchup=False,
3638
load_schedule_interval='0 0 * * *',
3739
load_all_partitions=True
3840
):
@@ -76,6 +78,7 @@ def read_file(filepath):
7678
default_dag_args = {
7779
'depends_on_past': False,
7880
'start_date': load_start_date,
81+
'end_date': load_end_date,
7982
'email_on_failure': True,
8083
'email_on_retry': False,
8184
'retries': 5,
@@ -89,7 +92,7 @@ def read_file(filepath):
8992
# Define a DAG (directed acyclic graph) of tasks.
9093
dag = models.DAG(
9194
dag_id,
92-
catchup=False,
95+
catchup=load_catchup,
9396
schedule_interval=load_schedule_interval,
9497
default_args=default_dag_args)
9598

airflow/dags/polygonetl_airflow/variables.py

Lines changed: 6 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -54,8 +54,14 @@ def read_load_dag_vars(var_prefix, **kwargs):
5454
# 'success_notification_emails': read_var('success_notification_emails', None, False, **kwargs),
5555
'load_schedule_interval': read_var('load_schedule_interval', var_prefix, True, **kwargs),
5656
'load_all_partitions': parse_bool(read_var('load_all_partitions', var_prefix, False, **kwargs), default=None),
57+
'load_catchup': parse_bool(read_var('load_catchup', var_prefix, False, **kwargs), default=False),
5758
}
5859

60+
load_start_date = read_var('load_start_date', var_prefix, False, **kwargs)
61+
if load_start_date is not None:
62+
load_start_date = datetime.strptime(load_start_date, '%Y-%m-%d')
63+
vars['load_start_date'] = load_start_date
64+
5965
load_end_date = read_var('load_end_date', var_prefix, False, **kwargs)
6066
if load_end_date is not None:
6167
load_end_date = datetime.strptime(load_end_date, '%Y-%m-%d')

0 commit comments

Comments
 (0)