diff --git a/airflow/dags/polygon_export_dag.py b/airflow/dags/polygon_export_dag.py
index 4dfa1318..179c550c 100644
--- a/airflow/dags/polygon_export_dag.py
+++ b/airflow/dags/polygon_export_dag.py
@@ -8,9 +8,10 @@
     dag_id='polygon_export_dag',
     **read_export_dag_vars(
         var_prefix='polygon_',
-        export_schedule_interval='0 5 * * *',
+        export_schedule_interval='0 2 * * *',
         export_start_date='2020-05-30',
         export_max_active_runs=3,
+        export_max_active_tasks=12,
         export_max_workers=5,
         export_traces_max_workers=10,
     )
diff --git a/airflow/dags/polygon_load_dag.py b/airflow/dags/polygon_load_dag.py
index d45fea32..37b29d74 100644
--- a/airflow/dags/polygon_load_dag.py
+++ b/airflow/dags/polygon_load_dag.py
@@ -14,6 +14,6 @@
     chain='polygon',
     **read_load_dag_vars(
         var_prefix='polygon_',
-        load_schedule_interval='0 6 * * *'
+        load_schedule_interval='0 7 * * *'
     )
 )
diff --git a/airflow/dags/polygon_parse_dag.py b/airflow/dags/polygon_parse_dag.py
index af8f358b..cc68cdee 100644
--- a/airflow/dags/polygon_parse_dag.py
+++ b/airflow/dags/polygon_parse_dag.py
@@ -28,6 +28,6 @@
     **read_parse_dag_vars(
         var_prefix=var_prefix,
         dataset=dataset,
-        parse_schedule_interval='30 7 * * *'
+        parse_schedule_interval='30 8 * * *'
     )
 )
diff --git a/airflow/dags/polygon_partition_dag.py b/airflow/dags/polygon_partition_dag.py
index 3e69f4ce..48b64d01 100644
--- a/airflow/dags/polygon_partition_dag.py
+++ b/airflow/dags/polygon_partition_dag.py
@@ -16,6 +16,6 @@
     public_dataset_name = 'crypto_polygon',
     **read_partition_dag_vars(
         var_prefix="polygon_",
-        partition_schedule_interval="0 7 * * *",
+        partition_schedule_interval="0 8 * * *",
     ),
 )
diff --git a/airflow/dags/polygonetl_airflow/build_export_dag.py b/airflow/dags/polygonetl_airflow/build_export_dag.py
index 8e7e9ae4..97b8f7fb 100644
--- a/airflow/dags/polygonetl_airflow/build_export_dag.py
+++ b/airflow/dags/polygonetl_airflow/build_export_dag.py
@@ -45,6 +45,7 @@ def build_export_dag(
     export_traces_max_workers=10,
     export_batch_size=200,
     export_max_active_runs=None,
+    export_max_active_tasks=None,
     export_retries=5,
     **kwargs
 ):
@@ -74,11 +75,15 @@ def build_export_dag(
     if export_max_active_runs is None:
         export_max_active_runs = configuration.conf.getint('core', 'max_active_runs_per_dag')
 
+    if export_max_active_tasks is None:
+        export_max_active_tasks = configuration.conf.getint('core', 'max_active_tasks_per_dag')
+
     dag = DAG(
         dag_id,
         schedule_interval=export_schedule_interval,
         default_args=default_dag_args,
-        max_active_runs=export_max_active_runs
+        max_active_runs=export_max_active_runs,
+        max_active_tasks=export_max_active_tasks,
     )
 
     from airflow.providers.google.cloud.hooks.gcs import GCSHook
diff --git a/airflow/dags/polygonetl_airflow/build_load_dag.py b/airflow/dags/polygonetl_airflow/build_load_dag.py
index bf407945..5644f266 100644
--- a/airflow/dags/polygonetl_airflow/build_load_dag.py
+++ b/airflow/dags/polygonetl_airflow/build_load_dag.py
@@ -100,7 +100,7 @@ def read_file(filepath):
         task_id="wait_export_dag",
         external_dag_id=f"{chain}_export_dag",
         external_task_id="export_complete",
-        execution_delta=timedelta(hours=1),
+        execution_delta=timedelta(hours=5),
         priority_weight=0,
         mode="reschedule",
         poke_interval=5 * 60,
diff --git a/airflow/dags/polygonetl_airflow/variables.py b/airflow/dags/polygonetl_airflow/variables.py
index 61395b6b..8fa0b319 100644
--- a/airflow/dags/polygonetl_airflow/variables.py
+++ b/airflow/dags/polygonetl_airflow/variables.py
@@ -20,6 +20,9 @@ def read_export_dag_vars(var_prefix, **kwargs):
     export_max_active_runs = read_var('export_max_active_runs', var_prefix, False, **kwargs)
     export_max_active_runs = int(export_max_active_runs) if export_max_active_runs is not None else None
 
+    export_max_active_tasks = read_var('export_max_active_tasks', var_prefix, False, **kwargs)
+    export_max_active_tasks = int(export_max_active_tasks) if export_max_active_tasks is not None else None
+
     vars = {
         'output_bucket': read_var('output_bucket', var_prefix, True, **kwargs),
         'export_start_date': export_start_date,
@@ -29,6 +32,7 @@ def read_export_dag_vars(var_prefix, **kwargs):
         'provider_uris_archival': provider_uris_archival,
         'notification_emails': read_var('notification_emails', None, False, **kwargs),
         'export_max_active_runs': export_max_active_runs,
+        'export_max_active_tasks': export_max_active_tasks,
         'export_max_workers': int(read_var('export_max_workers', var_prefix, True, **kwargs)),
         'export_traces_max_workers': int(read_var('export_traces_max_workers', var_prefix, True, **kwargs)),
     }