Commit 6ca2c6f

Merge pull request #105 from blockchain-etl/feat/split-task-export-geth-traces
Split `export_geth_traces` into 24 tasks
2 parents c1725c5 + a3d136e, commit 6ca2c6f

File tree

2 files changed: +119 -57 lines

airflow/dags/polygonetl_airflow/build_export_dag.py

Lines changed: 103 additions & 46 deletions
```diff
@@ -2,15 +2,17 @@
 
 import os
 import logging
-from datetime import timedelta
+from datetime import datetime, time, timedelta, timezone
 from pathlib import Path
 from tempfile import TemporaryDirectory
 
 from airflow import DAG, configuration
+from airflow.operators.dummy import DummyOperator
 from airflow.operators.python import PythonOperator
 
 from polygonetl.cli import (
     get_block_range_for_date,
+    get_block_range_for_timestamps,
     extract_csv_column,
     export_blocks_and_transactions,
     export_receipts_and_logs,
```

```diff
@@ -106,13 +108,42 @@ def copy_from_export_path(export_path, file_path):
 
         download_from_gcs(bucket=output_bucket, object=export_path + filename, filename=file_path)
 
-    def get_block_range(tempdir, date, provider_uri):
-        logging.info('Calling get_block_range_for_date({}, {}, ...)'.format(provider_uri, date))
-        get_block_range_for_date.callback(
-            provider_uri=provider_uri, date=date, output=os.path.join(tempdir, "blocks_meta.txt")
-        )
+    def get_block_range(tempdir, date, provider_uri, hour=None):
+        if hour is None:
+            block_range_filename = "blocks_meta.txt"
+
+            logging.info(
+                f"Calling get_block_range_for_date({provider_uri}, {date}, ...)"
+            )
+            get_block_range_for_date.callback(
+                provider_uri=provider_uri,
+                date=date,
+                output=os.path.join(tempdir, block_range_filename),
+            )
+        else:
+            block_range_filename = f"blocks_meta_{hour:02}.txt"
+
+            start_datetime = datetime.combine(
+                date,
+                time(hour=hour, minute=0, second=0, tzinfo=timezone.utc),
+            )
+            end_datetime = datetime.combine(
+                date,
+                time(hour=hour, minute=59, second=59, tzinfo=timezone.utc),
+            )
+
+            logging.info(
+                "Calling get_block_range_for_timestamp"
+                f"({provider_uri}, {start_datetime} to {end_datetime}, ...)"
+            )
+            get_block_range_for_timestamps.callback(
+                provider_uri=provider_uri,
+                start_timestamp=start_datetime.timestamp(),
+                end_timestamp=end_datetime.timestamp(),
+                output=os.path.join(tempdir, block_range_filename),
+            )
 
-        with open(os.path.join(tempdir, "blocks_meta.txt")) as block_range_file:
+        with open(os.path.join(tempdir, block_range_filename)) as block_range_file:
             block_range = block_range_file.read()
             start_block, end_block = block_range.split(",")
 
```

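For reference, the hourly windowing above reduces to a few stdlib calls. A minimal standalone sketch (the date and hour values are illustrative, not taken from the DAG):

```python
# Minimal sketch of the hourly UTC window used by get_block_range.
# The date and hour here are illustrative values.
from datetime import date, datetime, time, timezone

day = date(2023, 1, 15)
hour = 7

start = datetime.combine(day, time(hour=hour, minute=0, second=0, tzinfo=timezone.utc))
end = datetime.combine(day, time(hour=hour, minute=59, second=59, tzinfo=timezone.utc))

# Inclusive [HH:00:00, HH:59:59] bounds, so consecutive hourly windows
# do not overlap at whole-second granularity.
print(start.isoformat())                   # 2023-01-15T07:00:00+00:00
print(end.isoformat())                     # 2023-01-15T07:59:59+00:00
print(start.timestamp(), end.timestamp())  # Unix timestamps passed to the callback
print(f"blocks_meta_{hour:02}.txt")        # per-hour metadata file: blocks_meta_07.txt
```
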
```diff
@@ -176,42 +207,46 @@ def export_receipts_and_logs_command(logical_date, provider_uri, **kwargs):
             )
             copy_to_export_path(os.path.join(tempdir, "logs.json"), export_path("logs", logical_date))
 
-    def extract_contracts_command(logical_date, **kwargs):
+    def extract_contracts_command(logical_date, hour, **kwargs):
         with TemporaryDirectory(dir=TEMP_DIR) as tempdir:
             copy_from_export_path(
-                export_path("traces", logical_date), os.path.join(tempdir, "traces.csv")
+                export_path("traces", logical_date),
+                os.path.join(tempdir, f"traces_{hour:02}.csv"),
             )
 
             logging.info('Calling extract_contracts(..., {}, {})'.format(
                 export_batch_size, export_max_workers
             ))
             extract_contracts.callback(
-                traces=os.path.join(tempdir, "traces.csv"),
-                output=os.path.join(tempdir, "contracts.json"),
+                traces=os.path.join(tempdir, f"traces_{hour:02}.csv"),
+                output=os.path.join(tempdir, f"contracts_{hour:02}.json"),
                 batch_size=export_batch_size,
                 max_workers=export_max_workers,
             )
 
             copy_to_export_path(
-                os.path.join(tempdir, "contracts.json"), export_path("contracts", logical_date)
+                os.path.join(tempdir, f"contracts_{hour:02}.json"),
+                export_path("contracts", logical_date),
             )
 
-    def extract_tokens_command(logical_date, provider_uri, **kwargs):
+    def extract_tokens_command(logical_date, provider_uri, hour, **kwargs):
         with TemporaryDirectory(dir=TEMP_DIR) as tempdir:
             copy_from_export_path(
-                export_path("contracts", logical_date), os.path.join(tempdir, "contracts.json")
+                export_path("contracts", logical_date),
+                os.path.join(tempdir, f"contracts_{hour:02}.json"),
             )
 
             logging.info('Calling extract_tokens(..., {}, {})'.format(export_max_workers, provider_uri))
             extract_tokens.callback(
-                contracts=os.path.join(tempdir, "contracts.json"),
-                output=os.path.join(tempdir, "tokens.csv"),
+                contracts=os.path.join(tempdir, f"contracts_{hour:02}.json"),
+                output=os.path.join(tempdir, f"tokens_{hour:02}.csv"),
                 max_workers=export_max_workers,
                 provider_uri=provider_uri,
             )
 
             copy_to_export_path(
-                os.path.join(tempdir, "tokens.csv"), export_path("tokens", logical_date)
+                os.path.join(tempdir, f"tokens_{hour:02}.csv"),
+                export_path("tokens", logical_date),
             )
 
     def extract_token_transfers_command(logical_date, **kwargs):
```

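A side note on the `{hour:02}` format spec now used for every per-hour artifact: it zero-pads the hour to two digits, so filenames and task ids sort lexicographically in hour order. For example:

```python
# Zero-padded hour suffixes keep names at two digits, so listings sort by hour.
for hour in (0, 7, 23):
    print(f"traces_{hour:02}.csv")
# traces_00.csv
# traces_07.csv
# traces_23.csv
```
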
```diff
@@ -235,9 +270,11 @@ def extract_token_transfers_command(logical_date, **kwargs):
                 export_path("token_transfers", logical_date),
             )
 
-    def export_traces_command(logical_date, provider_uri, **kwargs):
+    def export_traces_command(logical_date, provider_uri, hour, **kwargs):
         with TemporaryDirectory(dir=TEMP_DIR) as tempdir:
-            start_block, end_block = get_block_range(tempdir, logical_date, provider_uri)
+            start_block, end_block = get_block_range(
+                tempdir, logical_date, provider_uri, hour
+            )
             if start_block == 0:
                 start_block = 1
 
```

```diff
@@ -252,34 +289,44 @@ def export_traces_command(logical_date, provider_uri, **kwargs):
                     start_block=start_block,
                     end_block=end_block,
                     batch_size=export_traces_batch_size,
-                    output=os.path.join(tempdir, "geth_traces.json"),
+                    output=os.path.join(tempdir, f"geth_traces_{hour:02}.json"),
                     max_workers=export_traces_max_workers,
-                    provider_uri=provider_uri
+                    provider_uri=provider_uri,
                 )
                 copy_to_export_path(
-                    os.path.join(tempdir, "geth_traces.json"), export_path("traces", logical_date)
+                    os.path.join(tempdir, f"geth_traces_{hour:02}.json"),
+                    export_path("traces", logical_date),
                 )
             else:
                 copy_from_export_path(
-                    export_path("traces", logical_date), os.path.join(tempdir, "geth_traces.json"),
+                    export_path("traces", logical_date),
+                    os.path.join(tempdir, f"geth_traces_{hour:02}.json"),
                 )
 
             extract_geth_traces.callback(
-                input=os.path.join(tempdir, "geth_traces.json"),
-                output=os.path.join(tempdir, 'traces.csv'),
-                max_workers=1
+                input=os.path.join(tempdir, f"geth_traces_{hour:02}.json"),
+                output=os.path.join(tempdir, f"traces_{hour:02}.csv"),
+                max_workers=1,
             )
 
             copy_to_export_path(
-                os.path.join(tempdir, "traces.csv"), export_path("traces", logical_date)
+                os.path.join(tempdir, f"traces_{hour:02}.csv"),
+                export_path("traces", logical_date),
             )
 
-    def add_export_task(toggle, task_id, python_callable, dependencies=None):
+    def add_export_task(
+        toggle,
+        task_id,
+        python_callable,
+        op_kwargs=None,
+        dependencies=None,
+    ):
         if toggle:
             operator = PythonOperator(
                 task_id=task_id,
                 python_callable=python_callable,
                 execution_timeout=timedelta(hours=24),
+                op_kwargs=op_kwargs,
                 dag=dag,
             )
             if dependencies is not None and len(dependencies) > 0:
```

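Why thread `op_kwargs` through `add_export_task`: `PythonOperator` forwards each `op_kwargs` entry to the callable as a keyword argument, alongside the usual context kwargs. A self-contained sketch of that mechanism (the DAG id, dates, and callable below are hypothetical):

```python
# Hypothetical minimal DAG showing how op_kwargs reaches the callable.
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator

def example_command(hour, **kwargs):
    # "hour" arrives via op_kwargs; logical_date, ds, etc. arrive in kwargs.
    print(f"processing hour {hour:02}")

with DAG(
    dag_id="op_kwargs_example",
    start_date=datetime(2023, 1, 1),
    schedule_interval=None,
) as dag:
    PythonOperator(
        task_id="example_task_07",
        python_callable=example_command,
        op_kwargs={"hour": 7},
        execution_timeout=timedelta(hours=24),
    )
```
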
```diff
@@ -291,6 +338,7 @@ def add_export_task(toggle, task_id, python_callable, dependencies=None):
         return None
 
     # Operators
+    export_complete = DummyOperator(task_id="export_complete", dag=dag)
 
     export_blocks_and_transactions_operator = add_export_task(
         export_blocks_and_transactions_toggle,
```

```diff
@@ -311,26 +359,35 @@ def add_export_task(toggle, task_id, python_callable, dependencies=None):
         extract_token_transfers_command,
         dependencies=[export_receipts_and_logs_operator],
     )
+    extract_token_transfers_operator >> export_complete
+
+    for hour in range(24):
+        export_traces_operator = add_export_task(
+            export_traces_toggle,
+            f"export_geth_traces_{hour:02}",
+            add_provider_uri_fallback_loop(
+                export_traces_command,
+                provider_uris_archival,
+            ),
+            op_kwargs={"hour": hour},
+        )
 
-    export_traces_operator = add_export_task(
-        export_traces_toggle,
-        "export_geth_traces",
-        add_provider_uri_fallback_loop(export_traces_command, provider_uris_archival)
-    )
-
-    extract_contracts_operator = add_export_task(
-        extract_contracts_toggle,
-        "extract_contracts",
-        extract_contracts_command,
-        dependencies=[export_traces_operator],
-    )
+        extract_contracts_operator = add_export_task(
+            extract_contracts_toggle,
+            f"extract_contracts_{hour:02}",
+            extract_contracts_command,
+            op_kwargs={"hour": hour},
+            dependencies=[export_traces_operator],
+        )
 
-    extract_tokens_operator = add_export_task(
-        extract_tokens_toggle,
-        "extract_tokens",
-        add_provider_uri_fallback_loop(extract_tokens_command, provider_uris),
-        dependencies=[extract_contracts_operator],
-    )
+        extract_tokens_operator = add_export_task(
+            extract_tokens_toggle,
+            f"extract_tokens_{hour:02}",
+            add_provider_uri_fallback_loop(extract_tokens_command, provider_uris),
+            op_kwargs={"hour": hour},
+            dependencies=[extract_contracts_operator],
+        )
+        extract_tokens_operator >> export_complete
 
     return dag
```
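Taken together, the loop builds a fan-out/fan-in topology: 24 independent hourly chains that all converge on `export_complete`, giving downstream DAGs a single task to wait on. A skeletal sketch of that shape (DAG id and dates are hypothetical; `DummyOperator` stands in for the real tasks):

```python
# Skeleton of the fan-out/fan-in topology built by the loop above.
from datetime import datetime

from airflow import DAG
from airflow.operators.dummy import DummyOperator

with DAG(
    dag_id="fan_in_example",
    start_date=datetime(2023, 1, 1),
    schedule_interval="@daily",
) as dag:
    export_complete = DummyOperator(task_id="export_complete")

    for hour in range(24):
        traces = DummyOperator(task_id=f"export_geth_traces_{hour:02}")
        contracts = DummyOperator(task_id=f"extract_contracts_{hour:02}")
        tokens = DummyOperator(task_id=f"extract_tokens_{hour:02}")
        # Each hourly chain runs independently, then joins at export_complete.
        traces >> contracts >> tokens >> export_complete
```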

airflow/dags/polygonetl_airflow/build_load_dag.py

Lines changed: 16 additions & 11 deletions
```diff
@@ -10,8 +10,8 @@
 from airflow import models
 from airflow.providers.google.cloud.hooks.gcs import GCSHook
 from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
-from airflow.providers.google.cloud.sensors.gcs import GCSObjectExistenceSensor
 from airflow.operators.python import PythonOperator
+from airflow.sensors.external_task import ExternalTaskSensor
 from google.cloud import bigquery
 from google.cloud.bigquery import TimePartitioning
 
```

```diff
@@ -95,17 +95,20 @@ def read_file(filepath):
 
     dags_folder = os.environ.get('DAGS_FOLDER', '/home/airflow/gcs/dags')
 
-    def add_load_tasks(task, file_format, allow_quoted_newlines=False):
-        wait_sensor = GCSObjectExistenceSensor(
-            task_id='wait_latest_{task}'.format(task=task),
-            timeout=12 * 60 * 60,
-            poke_interval=60,
-            bucket=output_bucket,
-            object='export/{task}/block_date={datestamp}/{task}.{file_format}'.format(
-                task=task, datestamp='{{ds}}', file_format=file_format),
-            dag=dag
+    if not load_all_partitions:
+        wait_sensor = ExternalTaskSensor(
+            task_id="wait_export_dag",
+            external_dag_id=f"{chain}_export_dag",
+            external_task_id="export_complete",
+            execution_delta=timedelta(hours=1),
+            priority_weight=0,
+            mode="reschedule",
+            poke_interval=5 * 60,
+            timeout=8 * 60 * 60,
+            dag=dag,
         )
 
+    def add_load_tasks(task, file_format, allow_quoted_newlines=False):
         def load_task(ds, **kwargs):
             client = bigquery.Client()
             job_config = bigquery.LoadJobConfig()
```

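For context on the sensor swap: the old `GCSObjectExistenceSensor` polled GCS for an exported file, while `ExternalTaskSensor` waits for a task in another DAG; here, `execution_delta` maps this DAG's logical date back to the export DAG run one hour earlier. A self-contained sketch of the pattern (DAG ids and schedule are hypothetical):

```python
# Hypothetical load DAG waiting on the export DAG's join task.
from datetime import datetime, timedelta

from airflow import DAG
from airflow.sensors.external_task import ExternalTaskSensor

with DAG(
    dag_id="example_load_dag",
    start_date=datetime(2023, 1, 1),
    schedule_interval="@daily",
) as dag:
    wait_export_dag = ExternalTaskSensor(
        task_id="wait_export_dag",
        external_dag_id="polygon_export_dag",  # assumed value of f"{chain}_export_dag"
        external_task_id="export_complete",
        # Look for the export run whose logical date is one hour earlier
        # than this DAG's logical date.
        execution_delta=timedelta(hours=1),
        mode="reschedule",      # release the worker slot between pokes
        poke_interval=5 * 60,
        timeout=8 * 60 * 60,
    )
```
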
```diff
@@ -155,7 +158,9 @@ def load_task(ds, **kwargs):
             dag=dag
         )
 
-        wait_sensor >> load_operator
+        if not load_all_partitions:
+            wait_sensor >> load_operator
+
         return load_operator
 
     def add_enrich_tasks(task, time_partitioning_field='block_timestamp', dependencies=None, always_load_all_partitions=False):
```
