Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
149 changes: 103 additions & 46 deletions airflow/dags/polygonetl_airflow/build_export_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,17 @@

import os
import logging
from datetime import timedelta
from datetime import datetime, time, timedelta, timezone
from pathlib import Path
from tempfile import TemporaryDirectory

from airflow import DAG, configuration
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator

from polygonetl.cli import (
get_block_range_for_date,
get_block_range_for_timestamps,
extract_csv_column,
export_blocks_and_transactions,
export_receipts_and_logs,
Expand Down Expand Up @@ -106,13 +108,42 @@ def copy_from_export_path(export_path, file_path):

download_from_gcs(bucket=output_bucket, object=export_path + filename, filename=file_path)

def get_block_range(tempdir, date, provider_uri):
logging.info('Calling get_block_range_for_date({}, {}, ...)'.format(provider_uri, date))
get_block_range_for_date.callback(
provider_uri=provider_uri, date=date, output=os.path.join(tempdir, "blocks_meta.txt")
)
def get_block_range(tempdir, date, provider_uri, hour=None):
if hour is None:
block_range_filename = "blocks_meta.txt"

logging.info(
f"Calling get_block_range_for_date({provider_uri}, {date}, ...)"
)
get_block_range_for_date.callback(
provider_uri=provider_uri,
date=date,
output=os.path.join(tempdir, block_range_filename),
)
else:
block_range_filename = f"blocks_meta_{hour:02}.txt"

start_datetime = datetime.combine(
date,
time(hour=hour, minute=0, second=0, tzinfo=timezone.utc),
)
end_datetime = datetime.combine(
date,
time(hour=hour, minute=59, second=59, tzinfo=timezone.utc),
)

logging.info(
"Calling get_block_range_for_timestamp"
f"({provider_uri}, {start_datetime} to {end_datetime}, ...)"
)
get_block_range_for_timestamps.callback(
provider_uri=provider_uri,
start_timestamp=start_datetime.timestamp(),
end_timestamp=end_datetime.timestamp(),
output=os.path.join(tempdir, block_range_filename),
)

with open(os.path.join(tempdir, "blocks_meta.txt")) as block_range_file:
with open(os.path.join(tempdir, block_range_filename)) as block_range_file:
block_range = block_range_file.read()
start_block, end_block = block_range.split(",")

Expand Down Expand Up @@ -176,42 +207,46 @@ def export_receipts_and_logs_command(logical_date, provider_uri, **kwargs):
)
copy_to_export_path(os.path.join(tempdir, "logs.json"), export_path("logs", logical_date))

def extract_contracts_command(logical_date, **kwargs):
def extract_contracts_command(logical_date, hour, **kwargs):
with TemporaryDirectory(dir=TEMP_DIR) as tempdir:
copy_from_export_path(
export_path("traces", logical_date), os.path.join(tempdir, "traces.csv")
export_path("traces", logical_date),
os.path.join(tempdir, f"traces_{hour:02}.csv"),
)

logging.info('Calling extract_contracts(..., {}, {})'.format(
export_batch_size, export_max_workers
))
extract_contracts.callback(
traces=os.path.join(tempdir, "traces.csv"),
output=os.path.join(tempdir, "contracts.json"),
traces=os.path.join(tempdir, f"traces_{hour:02}.csv"),
output=os.path.join(tempdir, f"contracts_{hour:02}.json"),
batch_size=export_batch_size,
max_workers=export_max_workers,
)

copy_to_export_path(
os.path.join(tempdir, "contracts.json"), export_path("contracts", logical_date)
os.path.join(tempdir, f"contracts_{hour:02}.json"),
export_path("contracts", logical_date),
)

def extract_tokens_command(logical_date, provider_uri, **kwargs):
def extract_tokens_command(logical_date, provider_uri, hour, **kwargs):
with TemporaryDirectory(dir=TEMP_DIR) as tempdir:
copy_from_export_path(
export_path("contracts", logical_date), os.path.join(tempdir, "contracts.json")
export_path("contracts", logical_date),
os.path.join(tempdir, f"contracts_{hour:02}.json"),
)

logging.info('Calling extract_tokens(..., {}, {})'.format(export_max_workers, provider_uri))
extract_tokens.callback(
contracts=os.path.join(tempdir, "contracts.json"),
output=os.path.join(tempdir, "tokens.csv"),
contracts=os.path.join(tempdir, f"contracts_{hour:02}.json"),
output=os.path.join(tempdir, f"tokens_{hour:02}.csv"),
max_workers=export_max_workers,
provider_uri=provider_uri,
)

copy_to_export_path(
os.path.join(tempdir, "tokens.csv"), export_path("tokens", logical_date)
os.path.join(tempdir, f"tokens_{hour:02}.csv"),
export_path("tokens", logical_date),
)

def extract_token_transfers_command(logical_date, **kwargs):
Expand All @@ -235,9 +270,11 @@ def extract_token_transfers_command(logical_date, **kwargs):
export_path("token_transfers", logical_date),
)

def export_traces_command(logical_date, provider_uri, **kwargs):
def export_traces_command(logical_date, provider_uri, hour, **kwargs):
with TemporaryDirectory(dir=TEMP_DIR) as tempdir:
start_block, end_block = get_block_range(tempdir, logical_date, provider_uri)
start_block, end_block = get_block_range(
tempdir, logical_date, provider_uri, hour
)
if start_block == 0:
start_block = 1

Expand All @@ -252,34 +289,44 @@ def export_traces_command(logical_date, provider_uri, **kwargs):
start_block=start_block,
end_block=end_block,
batch_size=export_traces_batch_size,
output=os.path.join(tempdir, "geth_traces.json"),
output=os.path.join(tempdir, f"geth_traces_{hour:02}.json"),
max_workers=export_traces_max_workers,
provider_uri=provider_uri
provider_uri=provider_uri,
)
copy_to_export_path(
os.path.join(tempdir, "geth_traces.json"), export_path("traces", logical_date)
os.path.join(tempdir, f"geth_traces_{hour:02}.json"),
export_path("traces", logical_date),
)
else:
copy_from_export_path(
export_path("traces", logical_date), os.path.join(tempdir, "geth_traces.json"),
export_path("traces", logical_date),
os.path.join(tempdir, f"geth_traces_{hour:02}.json"),
)

extract_geth_traces.callback(
input=os.path.join(tempdir, "geth_traces.json"),
output=os.path.join(tempdir, 'traces.csv'),
max_workers=1
input=os.path.join(tempdir, f"geth_traces_{hour:02}.json"),
output=os.path.join(tempdir, f"traces_{hour:02}.csv"),
max_workers=1,
)

copy_to_export_path(
os.path.join(tempdir, "traces.csv"), export_path("traces", logical_date)
os.path.join(tempdir, f"traces_{hour:02}.csv"),
export_path("traces", logical_date),
)

def add_export_task(toggle, task_id, python_callable, dependencies=None):
def add_export_task(
toggle,
task_id,
python_callable,
op_kwargs=None,
dependencies=None,
):
if toggle:
operator = PythonOperator(
task_id=task_id,
python_callable=python_callable,
execution_timeout=timedelta(hours=24),
op_kwargs=op_kwargs,
dag=dag,
)
if dependencies is not None and len(dependencies) > 0:
Expand All @@ -291,6 +338,7 @@ def add_export_task(toggle, task_id, python_callable, dependencies=None):
return None

# Operators
export_complete = DummyOperator(task_id="export_complete", dag=dag)

export_blocks_and_transactions_operator = add_export_task(
export_blocks_and_transactions_toggle,
Expand All @@ -311,26 +359,35 @@ def add_export_task(toggle, task_id, python_callable, dependencies=None):
extract_token_transfers_command,
dependencies=[export_receipts_and_logs_operator],
)
extract_token_transfers_operator >> export_complete

for hour in range(24):
export_traces_operator = add_export_task(
export_traces_toggle,
f"export_geth_traces_{hour:02}",
add_provider_uri_fallback_loop(
export_traces_command,
provider_uris_archival,
),
op_kwargs={"hour": hour},
)

export_traces_operator = add_export_task(
export_traces_toggle,
"export_geth_traces",
add_provider_uri_fallback_loop(export_traces_command, provider_uris_archival)
)

extract_contracts_operator = add_export_task(
extract_contracts_toggle,
"extract_contracts",
extract_contracts_command,
dependencies=[export_traces_operator],
)
extract_contracts_operator = add_export_task(
extract_contracts_toggle,
f"extract_contracts_{hour:02}",
extract_contracts_command,
op_kwargs={"hour": hour},
dependencies=[export_traces_operator],
)

extract_tokens_operator = add_export_task(
extract_tokens_toggle,
"extract_tokens",
add_provider_uri_fallback_loop(extract_tokens_command, provider_uris),
dependencies=[extract_contracts_operator],
)
extract_tokens_operator = add_export_task(
extract_tokens_toggle,
f"extract_tokens_{hour:02}",
add_provider_uri_fallback_loop(extract_tokens_command, provider_uris),
op_kwargs={"hour": hour},
dependencies=[extract_contracts_operator],
)
extract_tokens_operator >> export_complete

return dag

Expand Down
27 changes: 16 additions & 11 deletions airflow/dags/polygonetl_airflow/build_load_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@
from airflow import models
from airflow.providers.google.cloud.hooks.gcs import GCSHook
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.providers.google.cloud.sensors.gcs import GCSObjectExistenceSensor
from airflow.operators.python import PythonOperator
from airflow.sensors.external_task import ExternalTaskSensor
from google.cloud import bigquery
from google.cloud.bigquery import TimePartitioning

Expand Down Expand Up @@ -95,17 +95,20 @@ def read_file(filepath):

dags_folder = os.environ.get('DAGS_FOLDER', '/home/airflow/gcs/dags')

def add_load_tasks(task, file_format, allow_quoted_newlines=False):
wait_sensor = GCSObjectExistenceSensor(
task_id='wait_latest_{task}'.format(task=task),
timeout=12 * 60 * 60,
poke_interval=60,
bucket=output_bucket,
object='export/{task}/block_date={datestamp}/{task}.{file_format}'.format(
task=task, datestamp='{{ds}}', file_format=file_format),
dag=dag
if not load_all_partitions:
wait_sensor = ExternalTaskSensor(
task_id="wait_export_dag",
external_dag_id=f"{chain}_export_dag",
external_task_id="export_complete",
execution_delta=timedelta(hours=1),
priority_weight=0,
mode="reschedule",
poke_interval=5 * 60,
timeout=8 * 60 * 60,
dag=dag,
)

def add_load_tasks(task, file_format, allow_quoted_newlines=False):
def load_task(ds, **kwargs):
client = bigquery.Client()
job_config = bigquery.LoadJobConfig()
Expand Down Expand Up @@ -155,7 +158,9 @@ def load_task(ds, **kwargs):
dag=dag
)

wait_sensor >> load_operator
if not load_all_partitions:
wait_sensor >> load_operator

return load_operator

def add_enrich_tasks(task, time_partitioning_field='block_timestamp', dependencies=None, always_load_all_partitions=False):
Expand Down