From e3f0ac02f7babc4ca809983f244721ca11f7c8f0 Mon Sep 17 00:00:00 2001 From: Vasily Nemkov Date: Wed, 9 Jul 2025 14:34:03 +0200 Subject: [PATCH] Merge pull request #883 from Altinity/backports/25.3/81451_iceberg_support_compressed_metadata --- .../DataLakes/Iceberg/IcebergMetadata.cpp | 68 ++++++++++++++----- .../integration/test_storage_iceberg/test.py | 44 +++++++++++- 2 files changed, 94 insertions(+), 18 deletions(-) diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp index e7f7a16858f3..e59d7199f563 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp @@ -15,6 +15,7 @@ #include #include "Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadataFilesCache.h" #include +#include #include #include @@ -104,7 +105,8 @@ Poco::JSON::Object::Ptr getMetadataJSONObject( StorageObjectStorage::ConfigurationPtr configuration_ptr, IcebergMetadataFilesCachePtr cache_ptr, const ContextPtr & local_context, - LoggerPtr log) + LoggerPtr log, + CompressionMethod compression_method) { auto create_fn = [&]() { @@ -115,7 +117,14 @@ Poco::JSON::Object::Ptr getMetadataJSONObject( if (cache_ptr) read_settings.enable_filesystem_cache = false; - auto buf = StorageObjectStorageSource::createReadBuffer(object_info, object_storage, local_context, log, read_settings); + auto source_buf = StorageObjectStorageSource::createReadBuffer(object_info, object_storage, local_context, log); + + std::unique_ptr buf; + if (compression_method != CompressionMethod::None) + buf = wrapReadBufferWithCompressionMethod(std::move(source_buf), compression_method); + else + buf = std::move(source_buf); + String json_str; readJSONObjectPossiblyInvalid(json_str, *buf); return json_str; @@ -274,7 +283,30 @@ Int32 IcebergMetadata::parseTableSchema( } } -static std::pair getMetadataFileAndVersion(const std::string & path) +struct MetadataFileWithInfo +{ + Int32 version; + String path; + CompressionMethod compression_method; +}; + +static CompressionMethod getCompressionMethodFromMetadataFile(const String & path) +{ + constexpr std::string_view metadata_suffix = ".metadata.json"; + + auto compression_method = chooseCompressionMethod(path, "auto"); + + /// NOTE you will be surprised, but some metadata files store compression not in the end of the file name, + /// but somewhere in the middle of the file name, before metadata.json suffix. + /// Maybe history of Iceberg metadata files is not so long, but it is already full of surprises. + /// Example of weird engineering decisions: 00000-85befd5a-69c7-46d4-bca6-cfbd67f0f7e6.gz.metadata.json + if (compression_method == CompressionMethod::None && path.ends_with(metadata_suffix)) + compression_method = chooseCompressionMethod(path.substr(0, path.size() - metadata_suffix.size()), "auto"); + + return compression_method; +} + +static MetadataFileWithInfo getMetadataFileAndVersion(const std::string & path) { String file_name(path.begin() + path.find_last_of('/') + 1, path.end()); String version_str; @@ -289,7 +321,10 @@ static std::pair getMetadataFileAndVersion(const std::string & pa throw Exception( ErrorCodes::BAD_ARGUMENTS, "Bad metadata file name: {}. Expected vN.metadata.json where N is a number", file_name); - return std::make_pair(std::stoi(version_str), path); + return MetadataFileWithInfo{ + .version = std::stoi(version_str), + .path = path, + .compression_method = getCompressionMethodFromMetadataFile(path)}; } enum class MostRecentMetadataFileSelectionWay @@ -300,7 +335,7 @@ enum class MostRecentMetadataFileSelectionWay struct ShortMetadataFileInfo { - UInt32 version; + Int32 version; UInt64 last_updated_ms; String path; }; @@ -312,7 +347,7 @@ struct ShortMetadataFileInfo * 1) v.metadata.json, where V - metadata version. * 2) -.metadata.json, where V - metadata version */ -static std::pair getLatestMetadataFileAndVersion( +static MetadataFileWithInfo getLatestMetadataFileAndVersion( const ObjectStoragePtr & object_storage, StorageObjectStorage::ConfigurationPtr configuration_ptr, IcebergMetadataFilesCachePtr cache_ptr, @@ -336,10 +371,10 @@ static std::pair getLatestMetadataFileAndVersion( metadata_files_with_versions.reserve(metadata_files.size()); for (const auto & path : metadata_files) { - auto [version, metadata_file_path] = getMetadataFileAndVersion(path); + auto [version, metadata_file_path, compression_method] = getMetadataFileAndVersion(path); if (need_all_metadata_files_parsing) { - auto metadata_file_object = getMetadataJSONObject(metadata_file_path, object_storage, configuration_ptr, cache_ptr, local_context, log); + auto metadata_file_object = getMetadataJSONObject(metadata_file_path, object_storage, configuration_ptr, cache_ptr, local_context, log, compression_method); if (table_uuid.has_value()) { if (metadata_file_object->has(f_table_uuid)) @@ -389,10 +424,11 @@ static std::pair getLatestMetadataFileAndVersion( [](const ShortMetadataFileInfo & a, const ShortMetadataFileInfo & b) { return a.version < b.version; }); } }(); - return {latest_metadata_file_info.version, latest_metadata_file_info.path}; + + return {latest_metadata_file_info.version, latest_metadata_file_info.path, getCompressionMethodFromMetadataFile(latest_metadata_file_info.path)}; } -static std::pair getLatestOrExplicitMetadataFileAndVersion( +static MetadataFileWithInfo getLatestOrExplicitMetadataFileAndVersion( const ObjectStoragePtr & object_storage, StorageObjectStorage::ConfigurationPtr configuration_ptr, IcebergMetadataFilesCachePtr cache_ptr, @@ -459,7 +495,7 @@ bool IcebergMetadata::update(const ContextPtr & local_context) std::lock_guard lock(mutex); - const auto [metadata_version, metadata_file_path] + const auto [metadata_version, metadata_file_path, compression_method] = getLatestOrExplicitMetadataFileAndVersion(object_storage, configuration_ptr, manifest_cache, local_context, log.get()); bool metadata_file_changed = false; @@ -469,7 +505,7 @@ bool IcebergMetadata::update(const ContextPtr & local_context) metadata_file_changed = true; } - auto metadata_object = getMetadataJSONObject(metadata_file_path, object_storage, configuration_ptr, manifest_cache, local_context, log); + auto metadata_object = getMetadataJSONObject(metadata_file_path, object_storage, configuration_ptr, manifest_cache, local_context, log, compression_method); chassert(format_version == metadata_object->getValue(f_format_version)); auto previous_snapshot_id = relevant_snapshot_id; @@ -662,9 +698,9 @@ DataLakeMetadataPtr IcebergMetadata::create( else LOG_TRACE(log, "Not using in-memory cache for iceberg metadata files, because the setting use_iceberg_metadata_files_cache is false."); - const auto [metadata_version, metadata_file_path] = getLatestOrExplicitMetadataFileAndVersion(object_storage, configuration_ptr, cache_ptr, local_context, log.get()); + const auto [metadata_version, metadata_file_path, compression_method] = getLatestOrExplicitMetadataFileAndVersion(object_storage, configuration_ptr, cache_ptr, local_context, log.get()); - Poco::JSON::Object::Ptr object = getMetadataJSONObject(metadata_file_path, object_storage, configuration_ptr, cache_ptr, local_context, log); + Poco::JSON::Object::Ptr object = getMetadataJSONObject(metadata_file_path, object_storage, configuration_ptr, cache_ptr, local_context, log, compression_method); auto format_version = object->getValue(f_format_version); return std::make_unique(object_storage, configuration_ptr, local_context, metadata_version, format_version, object, cache_ptr); @@ -734,7 +770,7 @@ IcebergMetadata::IcebergHistory IcebergMetadata::getHistory(ContextPtr local_con { auto configuration_ptr = configuration.lock(); - const auto [metadata_version, metadata_file_path] = getLatestOrExplicitMetadataFileAndVersion(object_storage, configuration_ptr, manifest_cache, local_context, log.get()); + const auto [metadata_version, metadata_file_path, compression_method] = getLatestOrExplicitMetadataFileAndVersion(object_storage, configuration_ptr, manifest_cache, local_context, log.get()); chassert([&]() { @@ -742,7 +778,7 @@ IcebergMetadata::IcebergHistory IcebergMetadata::getHistory(ContextPtr local_con return metadata_version == last_metadata_version; }()); - auto metadata_object = getMetadataJSONObject(metadata_file_path, object_storage, configuration_ptr, manifest_cache, local_context, log); + auto metadata_object = getMetadataJSONObject(metadata_file_path, object_storage, configuration_ptr, manifest_cache, local_context, log, compression_method); chassert([&]() { SharedLockGuard lock(mutex); diff --git a/tests/integration/test_storage_iceberg/test.py b/tests/integration/test_storage_iceberg/test.py index 30549ead3554..8adfdaa805ec 100644 --- a/tests/integration/test_storage_iceberg/test.py +++ b/tests/integration/test_storage_iceberg/test.py @@ -1,5 +1,6 @@ import logging import os +import subprocess import uuid import time from datetime import datetime, timezone @@ -744,7 +745,7 @@ def make_query_from_function( # Cluster Query with node1 as coordinator and storage type as arg select_cluster_with_type_arg, query_id_cluster_with_type_arg = make_query_from_function( run_on_cluster=True, - storage_type_as_arg=True, + storage_type_as_arg=True, ) # Cluster Query with node1 as coordinator and storage type in named collection @@ -2044,7 +2045,7 @@ def test_metadata_file_selection_from_version_hint(started_cluster, format_versi spark.sql( f"INSERT INTO {TABLE_NAME} select id, char(id + ascii('a')) from range(10)" ) - + # test the case where version_hint.text file contains just the version number with open(f"/iceberg_data/default/{TABLE_NAME}/metadata/version-hint.text", "w") as f: f.write('5') @@ -3402,3 +3403,42 @@ def execute_spark_query(query: str): table_select_expression = table_creation_expression instance.query(f"SELECT * FROM {table_select_expression} ORDER BY ALL") + + +@pytest.mark.parametrize("storage_type", ["local", "s3"]) +def test_compressed_metadata(started_cluster, storage_type): + instance = started_cluster.instances["node1"] + spark = started_cluster.spark_session + TABLE_NAME = "test_compressed_metadata_" + storage_type + "_" + get_uuid_str() + + table_properties = { + "write.metadata.compression": "gzip" + } + + df = spark.createDataFrame([ + (1, "Alice"), + (2, "Bob") + ], ["id", "name"]) + + # for some reason write.metadata.compression is not working :( + df.writeTo(TABLE_NAME) \ + .tableProperty("write.metadata.compression", "gzip") \ + .using("iceberg") \ + .create() + + # manual compression of metadata file before upload, still test some scenarios + subprocess.check_output(f"gzip /iceberg_data/default/{TABLE_NAME}/metadata/v1.metadata.json", shell=True) + + # Weird but compression extension is really in the middle of the file name, not in the end... + subprocess.check_output(f"mv /iceberg_data/default/{TABLE_NAME}/metadata/v1.metadata.json.gz /iceberg_data/default/{TABLE_NAME}/metadata/v1.gz.metadata.json", shell=True) + + default_upload_directory( + started_cluster, + storage_type, + f"/iceberg_data/default/{TABLE_NAME}/", + f"/iceberg_data/default/{TABLE_NAME}/", + ) + + create_iceberg_table(storage_type, instance, TABLE_NAME, started_cluster, explicit_metadata_path="") + + assert instance.query(f"SELECT * FROM {TABLE_NAME} WHERE not ignore(*)") == "1\tAlice\n2\tBob\n" \ No newline at end of file