Skip to content

Commit d9df9e8

Browse files
Filter manifest files based on partition summaries (#938)
Filter manifest files based on partition summaries when constructing parquet infos. This can avoid a lot of overhead reading from slow storage e.g. s3 in the case where there are many manifest files but only a few that will match the filter.
1 parent 77f31e1 commit d9df9e8

1 file changed

Lines changed: 21 additions & 10 deletions

File tree

bodo/io/iceberg/read_metadata.py

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,11 @@
3434
from pyiceberg.catalog import Catalog
3535
from pyiceberg.expressions import BooleanExpression
3636
from pyiceberg.io import FileIO
37-
from pyiceberg.table import FileScanTask, Table
37+
from pyiceberg.table import DataScan, FileScanTask, ManifestFile, Table
3838

3939

4040
def _construct_parquet_infos(
41-
table: Table, tasks: pt.Iterable[FileScanTask]
41+
table: Table, table_scan: DataScan
4242
) -> tuple[list[IcebergParquetInfo], int]:
4343
"""
4444
Construct IcebergParquetInfo objects for each file
@@ -55,15 +55,28 @@ def _construct_parquet_infos(
5555
ManifestEntry,
5656
ManifestEntryStatus,
5757
)
58+
from pyiceberg.typedef import KeyDefaultDict
5859

5960
file_path_to_schema_id = {}
61+
tasks: pt.Iterable[FileScanTask] = table_scan.plan_files()
6062

6163
s = time.monotonic_ns()
6264
# Construct a mapping from file path to schema ID
6365
snap = table.current_snapshot()
6466
assert snap is not None
6567

66-
for manifest_file in snap.manifests(table.io):
68+
# Filter manifest files based on partition summaries, similar to:
69+
# https://github.com/apache/iceberg-python/blob/59dc8d13ad4e1500fff12946f1bfaddb5484f90e/pyiceberg/table/__init__.py#L1942
70+
manifest_evaluators: dict[int, pt.Callable[[ManifestFile], bool]] = KeyDefaultDict(
71+
table_scan._build_manifest_evaluator
72+
)
73+
manifests = [
74+
manifest_file
75+
for manifest_file in snap.manifests(table.io)
76+
if manifest_evaluators[manifest_file.partition_spec_id](manifest_file)
77+
]
78+
79+
for manifest_file in manifests:
6780
# Similar to PyIceberg's fetch_manifest_entry here:
6881
# https://github.com/apache/iceberg-python/blob/38ebb19a39407f52fe439289af8be81268932b0b/pyiceberg/manifest.py#L696
6982
input_file = table.io.new_input(manifest_file.manifest_path)
@@ -172,14 +185,12 @@ def get_iceberg_file_list_parallel(
172185
ev_iceberg_fl.add_attribute("g_filters", filters)
173186
try:
174187
table = catalog.load_table(table_id)
175-
pq_infos, get_file_to_schema_us = _construct_parquet_infos(
176-
table,
177-
table.scan(
178-
filters,
179-
snapshot_id=snapshot_id if snapshot_id > -1 else None,
180-
limit=limit if limit > -1 else None,
181-
).plan_files(),
188+
table_scan = table.scan(
189+
filters,
190+
snapshot_id=snapshot_id if snapshot_id > -1 else None,
191+
limit=limit if limit > -1 else None,
182192
)
193+
pq_infos, get_file_to_schema_us = _construct_parquet_infos(table, table_scan)
183194

184195
if tracing.is_tracing(): # pragma: no cover
185196
ICEBERG_TRACING_NUM_FILES_TO_LOG = int(

0 commit comments

Comments
 (0)