Commit 28c2e53

Merge pull request #157 from blockchain-etl/upgrade-composer-version

Upgrade composer version

2 parents: 11d33e2 + c222580; merge commit: 28c2e53

28 files changed: +89 additions, -86 deletions

airflow/README.md
Lines changed: 3 additions & 3 deletions

@@ -39,7 +39,7 @@ Airflow DAGs for exporting and loading the Polygon blockchain data to Google BigQuery
 gcloud composer environments create \
     ${ENVIRONMENT_NAME} \
     --location=us-central1 \
-    --image-version=composer-2.0.28-airflow-2.2.5 \
+    --image-version=composer-2.1.14-airflow-2.5.1 \
     --environment-size=small \
     --scheduler-cpu=2 \
     --scheduler-memory=4 \
@@ -96,13 +96,13 @@ Note that the variable names must be prefixed with `{chain}_`, e.g. `polygon_output_bucket`
 | `output_bucket` | GCS bucket where exported files with blockchain data will be stored |
 | `export_start_date` | export start date, default: `2019-04-22` |
 | `export_end_date` | export end date, used for integration testing, default: None |
-| `export_schedule_interval` | export cron schedule, default: `0 1 * * *` |
+| `export_schedule` | export cron schedule, default: `0 1 * * *` |
 | `provider_uris` | comma-separated list of provider URIs for [polygon-etl](https://polygon-etl.readthedocs.io/en/latest/commands) command |
 | `notification_emails` | comma-separated list of emails where notifications on DAG failures, retries and successes will be delivered. This variable must not be prefixed with `{chain}_` |
 | `export_max_active_runs` | max active DAG runs for export, default: `3` |
 | `export_max_workers` | max workers for [polygon-etl](https://polygon-etl.readthedocs.io/en/latest/commands) command, default: `5` |
 | `destination_dataset_project_id` | GCS project id where destination BigQuery dataset is |
-| `load_schedule_interval` | load cron schedule, default: `0 2 * * *` |
+| `load_schedule` | load cron schedule, default: `0 2 * * *` |
 | `load_end_date` | load end date, used for integration testing, default: None |

 ### Creating a Cloud Source Repository for Configuration Files
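The helper that consumes these variables is not part of this diff. As a rough illustration only, the renamed `polygon_export_schedule` variable could be looked up with Airflow's standard Variable API as in the sketch below; the helper name and fallback default here are assumptions, not code from this repo.

    # Hypothetical sketch, not from this commit: read the renamed
    # `{chain}_export_schedule` Airflow Variable with the stock API.
    from airflow.models import Variable

    def read_export_schedule(var_prefix='polygon_'):
        # Fall back to the README default when the variable is unset.
        return Variable.get(var_prefix + 'export_schedule', default_var='0 1 * * *')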

airflow/dags/polygon_export_dag.py
Lines changed: 1 addition & 1 deletion

@@ -8,7 +8,7 @@
     dag_id='polygon_export_dag',
     **read_export_dag_vars(
         var_prefix='polygon_',
-        export_schedule_interval='0 2 * * *',
+        export_schedule='0 2 * * *',
         export_start_date='2020-05-30',
         export_max_active_runs=3,
         export_max_active_tasks=12,
airflow/dags/polygon_load_dag.py
Lines changed: 1 addition & 1 deletion

@@ -14,6 +14,6 @@
     chain='polygon',
     **read_load_dag_vars(
         var_prefix='polygon_',
-        load_schedule_interval='0 7 * * *'
+        load_schedule='0 7 * * *'
     )
 )

airflow/dags/polygon_parse_dag.py
Lines changed: 1 addition & 1 deletion

@@ -17,7 +17,7 @@

 parse_dag_vars = read_parse_dag_vars(
     var_prefix=var_prefix,
-    parse_schedule_interval='30 8 * * *'
+    parse_schedule='30 8 * * *'
 )

 for folder in glob(table_definitions_folder):

airflow/dags/polygon_partition_dag.py
Lines changed: 1 addition & 1 deletion

@@ -16,6 +16,6 @@
     public_dataset_name = 'crypto_polygon',
     **read_partition_dag_vars(
         var_prefix="polygon_",
-        partition_schedule_interval="0 8 * * *",
+        partition_schedule="0 8 * * *",
     ),
 )

airflow/dags/polygonetl_airflow/build_export_dag.py
Lines changed: 18 additions & 18 deletions

@@ -7,7 +7,7 @@
 from tempfile import TemporaryDirectory

 from airflow import DAG, configuration
-from airflow.operators.dummy import DummyOperator
+from airflow.operators.empty import EmptyOperator
 from airflow.operators.python import PythonOperator

 from polygonetl.cli import (
@@ -34,21 +34,21 @@


 def build_export_dag(
-        dag_id,
-        provider_uris,
-        provider_uris_archival,
-        output_bucket,
-        export_start_date,
-        export_end_date=None,
-        notification_emails=None,
-        export_schedule_interval='0 0 * * *',
-        export_max_workers=10,
-        export_traces_max_workers=10,
-        export_batch_size=200,
-        export_max_active_runs=None,
-        export_max_active_tasks=None,
-        export_retries=5,
-        **kwargs
+    dag_id,
+    provider_uris,
+    provider_uris_archival,
+    output_bucket,
+    export_start_date,
+    export_end_date=None,
+    notification_emails=None,
+    export_schedule='0 0 * * *',
+    export_max_workers=10,
+    export_traces_max_workers=10,
+    export_batch_size=200,
+    export_max_active_runs=None,
+    export_max_active_tasks=None,
+    export_retries=5,
+    **kwargs
 ):
     default_dag_args = {
         "depends_on_past": False,
@@ -82,7 +82,7 @@ def build_export_dag(

     dag = DAG(
         dag_id,
-        schedule_interval=export_schedule_interval,
+        schedule=export_schedule,
         default_args=default_dag_args,
         max_active_runs=export_max_active_runs,
         max_active_tasks=export_max_active_tasks,
@@ -345,7 +345,7 @@ def add_export_task(
         return None

     # Operators
-    export_complete = DummyOperator(task_id="export_complete", dag=dag)
+    export_complete = EmptyOperator(task_id="export_complete", dag=dag)

     export_blocks_and_transactions_operator = add_export_task(
         export_blocks_and_transactions_toggle,

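Together these hunks track the two Airflow API renames that come with the newer image: the DAG-level `schedule_interval` argument is superseded by `schedule` (deprecated since Airflow 2.4), and `DummyOperator` is replaced by `EmptyOperator`. A minimal, self-contained sketch of the new-style usage follows; it is not code from this repo, and the DAG id and cron expression are made up.

    # Standalone sketch of the Airflow 2.5-era API adopted in this commit.
    from datetime import datetime

    from airflow import DAG
    from airflow.operators.empty import EmptyOperator

    with DAG(
        dag_id='example_export_dag',       # hypothetical id, not from this repo
        schedule='0 0 * * *',              # preferred over the deprecated schedule_interval
        start_date=datetime(2020, 5, 30),
        catchup=False,
    ) as dag:
        export_complete = EmptyOperator(task_id='export_complete')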
airflow/dags/polygonetl_airflow/build_load_dag.py
Lines changed: 2 additions & 2 deletions

@@ -35,7 +35,7 @@ def build_load_dag(
     load_start_date=datetime(2018, 7, 1),
     load_end_date=None,
     load_catchup=False,
-    load_schedule_interval='0 0 * * *',
+    load_schedule='0 0 * * *',
     load_all_partitions=True
 ):
     # The following datasets must be created in BigQuery:
@@ -93,7 +93,7 @@ def read_file(filepath):
     dag = models.DAG(
         dag_id,
         catchup=load_catchup,
-        schedule_interval=load_schedule_interval,
+        schedule=load_schedule,
         default_args=default_dag_args)

     dags_folder = os.environ.get('DAGS_FOLDER', '/home/airflow/gcs/dags')

airflow/dags/polygonetl_airflow/build_parse_dag.py
Lines changed: 2 additions & 2 deletions

@@ -33,7 +33,7 @@ def build_parse_dag(
     internal_project_id,
     notification_emails=None,
     parse_start_date=datetime(2020, 5, 30),
-    parse_schedule_interval='0 0 * * *',
+    parse_schedule='0 0 * * *',
     parse_all_partitions=None,
 ):

@@ -57,7 +57,7 @@ def build_parse_dag(
     dag = models.DAG(
         dag_id,
         catchup=False,
-        schedule_interval=parse_schedule_interval,
+        schedule=parse_schedule,
         default_args=default_dag_args)

     def create_parse_task():

airflow/dags/polygonetl_airflow/build_partition_dag.py
Lines changed: 2 additions & 2 deletions

@@ -23,7 +23,7 @@ def build_partition_dag(
     public_dataset_name,
     load_dag_id,
     partition_start_date=datetime(2015, 7, 30),
-    partition_schedule_interval='0 0 * * *',
+    partition_schedule='0 0 * * *',
     notification_emails=None,
 ):

@@ -44,7 +44,7 @@ def build_partition_dag(
     dag = models.DAG(
         dag_id,
         catchup=False,
-        schedule_interval=partition_schedule_interval,
+        schedule=partition_schedule,
         default_args=default_dag_args)

     def add_partition_tasks(task, sql_template, dependencies=None):

airflow/dags/polygonetl_airflow/build_verify_streaming_dag.py
Lines changed: 2 additions & 2 deletions

@@ -19,7 +19,7 @@ def build_verify_streaming_dag(
     chain='polygon',
     notification_emails=None,
     start_date=datetime(2018, 7, 1),
-    schedule_interval='*/10 * * * *',
+    schedule='*/10 * * * *',
     max_lag_in_minutes=15):
     dataset_name = 'crypto_{}'.format(chain)

@@ -46,7 +46,7 @@ def build_verify_streaming_dag(
     dag = DAG(
         dag_id,
         catchup=False,
-        schedule_interval=schedule_interval,
+        schedule=schedule,
         default_args=default_dag_args)

     dags_folder = os.environ.get('DAGS_FOLDER', '/home/airflow/gcs/dags')
