src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.cpp: 9 additions & 2 deletions
@@ -273,8 +273,15 @@ ManifestFileContent::ManifestFileContent(
 for (const auto & [column_id, bounds] : value_for_bounds)
 {
     DB::NameAndTypePair name_and_type = schema_processor.getFieldCharacteristics(schema_id, column_id);
-    auto left = deserializeFieldFromBinaryRepr(bounds.first.safeGet<std::string>(), name_and_type.type, true);
-    auto right = deserializeFieldFromBinaryRepr(bounds.second.safeGet<std::string>(), name_and_type.type, false);
+
+    String left_str;
+    String right_str;
+    /// lower_bound and upper_bound may be NULL.
+    if (!bounds.first.tryGet(left_str) || !bounds.second.tryGet(right_str))
+        continue;
+
+    auto left = deserializeFieldFromBinaryRepr(left_str, name_and_type.type, true);
+    auto right = deserializeFieldFromBinaryRepr(right_str, name_and_type.type, false);
     if (!left || !right)
         continue;
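The substance of the fix: Iceberg manifests may legitimately store NULL for a column's `lower_bound`/`upper_bound` (here, data files written with a NULL `time_struct` carry no statistics for its subfields), and `safeGet<std::string>()` throws when the `Field` holds NULL rather than a string, failing the whole read. `tryGet` instead reports the mismatch through its return value, so the loop simply records no min-max entry for that column. A minimal self-contained sketch of that contract, with `std::variant` standing in for ClickHouse's `DB::Field` (the free functions below are illustrative stand-ins, not the real member API):

```cpp
#include <iostream>
#include <string>
#include <variant>

// Stand-in for DB::Field: a bound is either NULL (monostate) or a
// binary-encoded string, as in an Iceberg manifest's lower/upper bounds.
using FieldLike = std::variant<std::monostate, std::string>;

// safeGet-style access: throws when the value is NULL instead of the
// requested type -- the pre-fix failure mode.
const std::string & safeGetString(const FieldLike & f)
{
    return std::get<std::string>(f); // std::bad_variant_access on NULL
}

// tryGet-style access: reports the mismatch through the return value,
// letting the caller skip the column instead of aborting the query.
bool tryGetString(const FieldLike & f, std::string & out)
{
    if (const auto * s = std::get_if<std::string>(&f))
    {
        out = *s;
        return true;
    }
    return false;
}

int main()
{
    FieldLike null_bound{};                                   // stats absent
    FieldLike real_bound{std::string{"binary-bound-bytes"}};  // stats present

    std::string value;
    std::cout << tryGetString(null_bound, value) << '\n';     // 0: skipped, no throw
    std::cout << tryGetString(real_bound, value) << '\n';     // 1: value extracted

    try
    {
        safeGetString(null_bound); // what the old code effectively did
    }
    catch (const std::bad_variant_access &)
    {
        std::cout << "safeGet on a NULL bound throws\n";
    }
}
```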
tests/integration/test_storage_iceberg/test.py: 139 additions & 0 deletions
@@ -3016,3 +3016,142 @@ def test_explicit_metadata_file(started_cluster, storage_type):
    create_iceberg_table(storage_type, instance, TABLE_NAME, started_cluster, explicit_metadata_path="metadata/v11.metadata.json")

    assert int(instance.query(f"SELECT count() FROM {TABLE_NAME}")) == 100


@pytest.mark.parametrize("storage_type", ["s3", "azure", "local"])
def test_minmax_pruning_with_null(started_cluster, storage_type):
    instance = started_cluster.instances["node1"]
    spark = started_cluster.spark_session
    TABLE_NAME = "test_minmax_pruning_with_null" + storage_type + "_" + get_uuid_str()

    def execute_spark_query(query: str):
        spark.sql(query)
        default_upload_directory(
            started_cluster,
            storage_type,
            f"/iceberg_data/default/{TABLE_NAME}/",
            f"/iceberg_data/default/{TABLE_NAME}/",
        )
        return

    execute_spark_query(
        f"""
        CREATE TABLE {TABLE_NAME} (
            tag INT,
            date DATE,
            ts TIMESTAMP,
            time_struct struct<a : DATE, b : TIMESTAMP>,
            name VARCHAR(50),
            number BIGINT
        )
        USING iceberg
        OPTIONS('format-version'='2')
        """
    )

    # The min-max bounds of time_struct in the manifest file are null for the next four inserts.
    execute_spark_query(
        f"""
        INSERT INTO {TABLE_NAME} VALUES
        (1, DATE '2024-01-20',
        TIMESTAMP '2024-02-20 10:00:00', null, 'vasya', 5)
        """
    )

    execute_spark_query(
        f"""
        INSERT INTO {TABLE_NAME} VALUES
        (2, DATE '2024-02-20',
        TIMESTAMP '2024-03-20 15:00:00', null, 'vasilisa', 6)
        """
    )

    execute_spark_query(
        f"""
        INSERT INTO {TABLE_NAME} VALUES
        (3, DATE '2025-03-20',
        TIMESTAMP '2024-04-30 14:00:00', null, 'icebreaker', 7)
        """
    )
    execute_spark_query(
        f"""
        INSERT INTO {TABLE_NAME} VALUES
        (4, DATE '2025-04-20',
        TIMESTAMP '2024-05-30 14:00:00', null, 'iceberg', 8)
        """
    )

    execute_spark_query(
        f"""
        INSERT INTO {TABLE_NAME} VALUES
        (1, DATE '2024-01-20',
        TIMESTAMP '2024-02-20 10:00:00', named_struct('a', DATE '2024-02-20', 'b', TIMESTAMP '2024-02-20 10:00:00'), 'vasya', 5)
        """
    )

    creation_expression = get_creation_expression(
        storage_type, TABLE_NAME, started_cluster, table_function=True
    )

    def check_validity_and_get_prunned_files(select_expression):
        query_id1 = f"{TABLE_NAME}-{uuid.uuid4()}"
        query_id2 = f"{TABLE_NAME}-{uuid.uuid4()}"

        data1 = instance.query(
            select_expression,
            query_id=query_id1,
            settings={"use_iceberg_partition_pruning": 0, "input_format_parquet_bloom_filter_push_down": 0, "input_format_parquet_filter_push_down": 0},
        )
        data1 = list(
            map(
                lambda x: x.split("\t"),
                filter(lambda x: len(x) > 0, data1.strip().split("\n")),
            )
        )

        data2 = instance.query(
            select_expression,
            query_id=query_id2,
            settings={"use_iceberg_partition_pruning": 1, "input_format_parquet_bloom_filter_push_down": 0, "input_format_parquet_filter_push_down": 0},
        )
        data2 = list(
            map(
                lambda x: x.split("\t"),
                filter(lambda x: len(x) > 0, data2.strip().split("\n")),
            )
        )

        assert data1 == data2

        instance.query("SYSTEM FLUSH LOGS")

        print(
            "Unprunned: ",
            instance.query(
                f"SELECT ProfileEvents['IcebergMinMaxIndexPrunnedFiles'] FROM system.query_log WHERE query_id = '{query_id1}' AND type = 'QueryFinish'"
            ),
        )
        print(
            "Prunned: ",
            instance.query(
                f"SELECT ProfileEvents['IcebergMinMaxIndexPrunnedFiles'] FROM system.query_log WHERE query_id = '{query_id2}' AND type = 'QueryFinish'"
            ),
        )

        assert 0 == int(
            instance.query(
                f"SELECT ProfileEvents['IcebergMinMaxIndexPrunnedFiles'] FROM system.query_log WHERE query_id = '{query_id1}' AND type = 'QueryFinish'"
            )
        )
        return int(
            instance.query(
                f"SELECT ProfileEvents['IcebergMinMaxIndexPrunnedFiles'] FROM system.query_log WHERE query_id = '{query_id2}' AND type = 'QueryFinish'"
            )
        )

    assert (
        check_validity_and_get_prunned_files(
            f"SELECT * FROM {creation_expression} WHERE time_struct.a <= '2024-02-01' ORDER BY ALL"
        )
        == 1
    )
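Note on the expected value: four of the five inserted files carry a NULL `time_struct`, so their manifest entries have NULL bounds and, with the fix above, are simply skipped by the min-max index rather than breaking the query. Only the file with real statistics (min `time_struct.a` = 2024-02-20) lies entirely outside `time_struct.a <= '2024-02-01'`, so exactly one file can be pruned. The helper also guards against wrong results: it runs each query with `use_iceberg_partition_pruning` disabled and enabled, asserts both return the same rows, and reads the `IcebergMinMaxIndexPrunnedFiles` counter from `system.query_log` to confirm that only the pruning-enabled run skipped files.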