From 513e6dc9e4c53585f2ce004c96a2d5e7f80f6335 Mon Sep 17 00:00:00 2001 From: charlielewisme Date: Wed, 21 Dec 2022 11:20:46 +0000 Subject: [PATCH 1/3] split export traces,contracts,tokens into 24 tasks --- .../polygonetl_airflow/build_export_dag.py | 149 ++++++++++++------ 1 file changed, 103 insertions(+), 46 deletions(-) diff --git a/airflow/dags/polygonetl_airflow/build_export_dag.py b/airflow/dags/polygonetl_airflow/build_export_dag.py index 08573650..7308193f 100644 --- a/airflow/dags/polygonetl_airflow/build_export_dag.py +++ b/airflow/dags/polygonetl_airflow/build_export_dag.py @@ -2,15 +2,17 @@ import os import logging -from datetime import timedelta +from datetime import datetime, time, timedelta, timezone from pathlib import Path from tempfile import TemporaryDirectory from airflow import DAG, configuration +from airflow.operators.dummy import DummyOperator from airflow.operators.python import PythonOperator from polygonetl.cli import ( get_block_range_for_date, + get_block_range_for_timestamps, extract_csv_column, export_blocks_and_transactions, export_receipts_and_logs, @@ -106,13 +108,42 @@ def copy_from_export_path(export_path, file_path): download_from_gcs(bucket=output_bucket, object=export_path + filename, filename=file_path) - def get_block_range(tempdir, date, provider_uri): - logging.info('Calling get_block_range_for_date({}, {}, ...)'.format(provider_uri, date)) - get_block_range_for_date.callback( - provider_uri=provider_uri, date=date, output=os.path.join(tempdir, "blocks_meta.txt") - ) + def get_block_range(tempdir, date, provider_uri, hour=None): + if hour is None: + block_range_filename = "blocks_meta.txt" + + logging.info( + f"Calling get_block_range_for_date({provider_uri}, {date}, ...)" + ) + get_block_range_for_date.callback( + provider_uri=provider_uri, + date=date, + output=os.path.join(tempdir, block_range_filename), + ) + else: + block_range_filename = f"blocks_meta{hour:02}.txt" + + start_datetime = datetime.combine( + date, + time(hour=hour, minute=0, second=0, tzinfo=timezone.utc), + ) + end_datetime = datetime.combine( + date, + time(hour=hour, minute=59, second=59, tzinfo=timezone.utc), + ) + + logging.info( + "Calling get_block_range_for_timestamp" + f"({provider_uri}, {start_datetime} to {end_datetime}, ...)" + ) + get_block_range_for_timestamps.callback( + provider_uri=provider_uri, + start_timestamp=start_datetime.timestamp(), + end_timestamp=end_datetime.timestamp(), + output=os.path.join(tempdir, block_range_filename), + ) - with open(os.path.join(tempdir, "blocks_meta.txt")) as block_range_file: + with open(os.path.join(tempdir, block_range_filename)) as block_range_file: block_range = block_range_file.read() start_block, end_block = block_range.split(",") @@ -176,42 +207,46 @@ def export_receipts_and_logs_command(logical_date, provider_uri, **kwargs): ) copy_to_export_path(os.path.join(tempdir, "logs.json"), export_path("logs", logical_date)) - def extract_contracts_command(logical_date, **kwargs): + def extract_contracts_command(logical_date, hour, **kwargs): with TemporaryDirectory(dir=TEMP_DIR) as tempdir: copy_from_export_path( - export_path("traces", logical_date), os.path.join(tempdir, "traces.csv") + export_path("traces", logical_date), + os.path.join(tempdir, f"traces_{hour:02}.csv"), ) logging.info('Calling extract_contracts(..., {}, {})'.format( export_batch_size, export_max_workers )) extract_contracts.callback( - traces=os.path.join(tempdir, "traces.csv"), - output=os.path.join(tempdir, "contracts.json"), + traces=os.path.join(tempdir, f"traces_{hour:02}.csv"), + output=os.path.join(tempdir, f"contracts_{hour:02}.json"), batch_size=export_batch_size, max_workers=export_max_workers, ) copy_to_export_path( - os.path.join(tempdir, "contracts.json"), export_path("contracts", logical_date) + os.path.join(tempdir, f"contracts_{hour:02}.json"), + export_path("contracts", logical_date), ) - def extract_tokens_command(logical_date, provider_uri, **kwargs): + def extract_tokens_command(logical_date, provider_uri, hour, **kwargs): with TemporaryDirectory(dir=TEMP_DIR) as tempdir: copy_from_export_path( - export_path("contracts", logical_date), os.path.join(tempdir, "contracts.json") + export_path("contracts", logical_date), + os.path.join(tempdir, f"contracts_{hour:02}.json"), ) logging.info('Calling extract_tokens(..., {}, {})'.format(export_max_workers, provider_uri)) extract_tokens.callback( - contracts=os.path.join(tempdir, "contracts.json"), - output=os.path.join(tempdir, "tokens.csv"), + contracts=os.path.join(tempdir, f"contracts_{hour:02}.json"), + output=os.path.join(tempdir, f"tokens_{hour:02}.csv"), max_workers=export_max_workers, provider_uri=provider_uri, ) copy_to_export_path( - os.path.join(tempdir, "tokens.csv"), export_path("tokens", logical_date) + os.path.join(tempdir, f"tokens_{hour:02}.csv"), + export_path("tokens", logical_date), ) def extract_token_transfers_command(logical_date, **kwargs): @@ -235,9 +270,11 @@ def extract_token_transfers_command(logical_date, **kwargs): export_path("token_transfers", logical_date), ) - def export_traces_command(logical_date, provider_uri, **kwargs): + def export_traces_command(logical_date, provider_uri, hour, **kwargs): with TemporaryDirectory(dir=TEMP_DIR) as tempdir: - start_block, end_block = get_block_range(tempdir, logical_date, provider_uri) + start_block, end_block = get_block_range( + tempdir, logical_date, provider_uri, hour + ) if start_block == 0: start_block = 1 @@ -252,34 +289,44 @@ def export_traces_command(logical_date, provider_uri, **kwargs): start_block=start_block, end_block=end_block, batch_size=export_traces_batch_size, - output=os.path.join(tempdir, "geth_traces.json"), + output=os.path.join(tempdir, f"geth_traces_{hour:02}.json"), max_workers=export_traces_max_workers, - provider_uri=provider_uri + provider_uri=provider_uri, ) copy_to_export_path( - os.path.join(tempdir, "geth_traces.json"), export_path("traces", logical_date) + os.path.join(tempdir, f"geth_traces_{hour:02}.json"), + export_path("traces", logical_date), ) else: copy_from_export_path( - export_path("traces", logical_date), os.path.join(tempdir, "geth_traces.json"), + export_path("traces", logical_date), + os.path.join(tempdir, f"geth_traces_{hour:02}.json"), ) extract_geth_traces.callback( - input=os.path.join(tempdir, "geth_traces.json"), - output=os.path.join(tempdir, 'traces.csv'), - max_workers=1 + input=os.path.join(tempdir, f"geth_traces_{hour:02}.json"), + output=os.path.join(tempdir, f"traces_{hour:02}.csv"), + max_workers=1, ) copy_to_export_path( - os.path.join(tempdir, "traces.csv"), export_path("traces", logical_date) + os.path.join(tempdir, f"traces_{hour:02}.csv"), + export_path("traces", logical_date), ) - def add_export_task(toggle, task_id, python_callable, dependencies=None): + def add_export_task( + toggle, + task_id, + python_callable, + op_kwargs=None, + dependencies=None, + ): if toggle: operator = PythonOperator( task_id=task_id, python_callable=python_callable, execution_timeout=timedelta(hours=24), + op_kwargs=op_kwargs, dag=dag, ) if dependencies is not None and len(dependencies) > 0: @@ -291,6 +338,7 @@ def add_export_task(toggle, task_id, python_callable, dependencies=None): return None # Operators + export_complete = DummyOperator(task_id="export_complete", dag=dag) export_blocks_and_transactions_operator = add_export_task( export_blocks_and_transactions_toggle, @@ -311,26 +359,35 @@ def add_export_task(toggle, task_id, python_callable, dependencies=None): extract_token_transfers_command, dependencies=[export_receipts_and_logs_operator], ) + extract_token_transfers_operator >> export_complete + + for hour in range(24): + export_traces_operator = add_export_task( + export_traces_toggle, + f"export_geth_traces_{hour:02}", + add_provider_uri_fallback_loop( + export_traces_command, + provider_uris_archival, + ), + op_kwargs={"hour": hour}, + ) - export_traces_operator = add_export_task( - export_traces_toggle, - "export_geth_traces", - add_provider_uri_fallback_loop(export_traces_command, provider_uris_archival) - ) - - extract_contracts_operator = add_export_task( - extract_contracts_toggle, - "extract_contracts", - extract_contracts_command, - dependencies=[export_traces_operator], - ) + extract_contracts_operator = add_export_task( + extract_contracts_toggle, + f"extract_contracts_{hour:02}", + extract_contracts_command, + op_kwargs={"hour": hour}, + dependencies=[export_traces_operator], + ) - extract_tokens_operator = add_export_task( - extract_tokens_toggle, - "extract_tokens", - add_provider_uri_fallback_loop(extract_tokens_command, provider_uris), - dependencies=[extract_contracts_operator], - ) + extract_tokens_operator = add_export_task( + extract_tokens_toggle, + f"extract_tokens_{hour:02}", + add_provider_uri_fallback_loop(extract_tokens_command, provider_uris), + op_kwargs={"hour": hour}, + dependencies=[extract_contracts_operator], + ) + extract_tokens_operator >> export_complete return dag From 333fd7e0e1e59380eb9d01848c9d64dc6e0af63e Mon Sep 17 00:00:00 2001 From: charlielewisme Date: Wed, 21 Dec 2022 11:21:13 +0000 Subject: [PATCH 2/3] consolidate wait operators into one --- .../dags/polygonetl_airflow/build_load_dag.py | 27 +++++++++++-------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/airflow/dags/polygonetl_airflow/build_load_dag.py b/airflow/dags/polygonetl_airflow/build_load_dag.py index 4153003e..bf407945 100644 --- a/airflow/dags/polygonetl_airflow/build_load_dag.py +++ b/airflow/dags/polygonetl_airflow/build_load_dag.py @@ -10,8 +10,8 @@ from airflow import models from airflow.providers.google.cloud.hooks.gcs import GCSHook from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator -from airflow.providers.google.cloud.sensors.gcs import GCSObjectExistenceSensor from airflow.operators.python import PythonOperator +from airflow.sensors.external_task import ExternalTaskSensor from google.cloud import bigquery from google.cloud.bigquery import TimePartitioning @@ -95,17 +95,20 @@ def read_file(filepath): dags_folder = os.environ.get('DAGS_FOLDER', '/home/airflow/gcs/dags') - def add_load_tasks(task, file_format, allow_quoted_newlines=False): - wait_sensor = GCSObjectExistenceSensor( - task_id='wait_latest_{task}'.format(task=task), - timeout=12 * 60 * 60, - poke_interval=60, - bucket=output_bucket, - object='export/{task}/block_date={datestamp}/{task}.{file_format}'.format( - task=task, datestamp='{{ds}}', file_format=file_format), - dag=dag + if not load_all_partitions: + wait_sensor = ExternalTaskSensor( + task_id="wait_export_dag", + external_dag_id=f"{chain}_export_dag", + external_task_id="export_complete", + execution_delta=timedelta(hours=1), + priority_weight=0, + mode="reschedule", + poke_interval=5 * 60, + timeout=8 * 60 * 60, + dag=dag, ) + def add_load_tasks(task, file_format, allow_quoted_newlines=False): def load_task(ds, **kwargs): client = bigquery.Client() job_config = bigquery.LoadJobConfig() @@ -155,7 +158,9 @@ def load_task(ds, **kwargs): dag=dag ) - wait_sensor >> load_operator + if not load_all_partitions: + wait_sensor >> load_operator + return load_operator def add_enrich_tasks(task, time_partitioning_field='block_timestamp', dependencies=None, always_load_all_partitions=False): From a3d136e6cfb1580219e6c59ea16b36a8344d657e Mon Sep 17 00:00:00 2001 From: charlielewisme Date: Wed, 21 Dec 2022 15:21:57 +0000 Subject: [PATCH 3/3] fix block_range_filename --- airflow/dags/polygonetl_airflow/build_export_dag.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/dags/polygonetl_airflow/build_export_dag.py b/airflow/dags/polygonetl_airflow/build_export_dag.py index 7308193f..8e7e9ae4 100644 --- a/airflow/dags/polygonetl_airflow/build_export_dag.py +++ b/airflow/dags/polygonetl_airflow/build_export_dag.py @@ -121,7 +121,7 @@ def get_block_range(tempdir, date, provider_uri, hour=None): output=os.path.join(tempdir, block_range_filename), ) else: - block_range_filename = f"blocks_meta{hour:02}.txt" + block_range_filename = f"blocks_meta_{hour:02}.txt" start_datetime = datetime.combine( date,