From d350e7d665d9651adbb9080d91527ea8997554fe Mon Sep 17 00:00:00 2001 From: Andrey Zvonov Date: Sun, 23 Nov 2025 22:44:55 +0100 Subject: [PATCH 1/2] follow-up 1 --- .../DataLakes/Iceberg/IcebergIterator.cpp | 16 +++++++++------- .../DataLakes/Iceberg/IcebergIterator.h | 1 - .../DataLakes/Iceberg/ManifestFile.h | 5 +++++ 3 files changed, 14 insertions(+), 8 deletions(-) diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergIterator.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergIterator.cpp index 27e76d1996b6..6faf5f3f9e24 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergIterator.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergIterator.cpp @@ -141,7 +141,7 @@ std::optional SingleThreadIcebergKeysIterator::next() auto files = files_generator(current_manifest_file_content); while (internal_data_index < files.size()) { - const auto & manifest_file_entry = files[internal_data_index++]; + auto & manifest_file_entry = files[internal_data_index++]; if ((manifest_file_entry.schema_id != previous_entry_schema) && (use_partition_pruning)) { previous_entry_schema = manifest_file_entry.schema_id; @@ -164,7 +164,13 @@ std::optional SingleThreadIcebergKeysIterator::next() switch (pruning_status) { case PruningReturnStatus::NOT_PRUNED: + { + auto [storage_to_use, resolved_key] = resolveObjectStorageForPath( + persistent_components.table_location, manifest_file_entry.file_path, object_storage, *secondary_storages, local_context); + manifest_file_entry.storage_to_use = storage_to_use; + manifest_file_entry.resolved_key = resolved_key; return manifest_file_entry; + } case PruningReturnStatus::MIN_MAX_INDEX_PRUNED: { ++min_max_index_pruned_files; break; @@ -242,7 +248,6 @@ IcebergIterator::IcebergIterator( std::shared_ptr secondary_storages_) : filter_dag(filter_dag_ ? std::make_unique(filter_dag_->clone()) : nullptr) , object_storage(std::move(object_storage_)) - , local_context(local_context_) , data_files_iterator( object_storage, local_context_, @@ -336,11 +341,8 @@ ObjectInfoPtr IcebergIterator::next(size_t) Iceberg::ManifestFileEntry manifest_file_entry; if (blocking_queue.pop(manifest_file_entry)) { - // Resolve the data file path to get the correct storage and key - auto [storage_to_use, resolved_key] = resolveObjectStorageForPath( - persistent_components.table_location, manifest_file_entry.file_path, object_storage, *secondary_storages, local_context); - - IcebergDataObjectInfoPtr object_info = std::make_shared(manifest_file_entry, storage_to_use, resolved_key); + IcebergDataObjectInfoPtr object_info = std::make_shared( + manifest_file_entry, manifest_file_entry.storage_to_use, manifest_file_entry.resolved_key); for (const auto & position_delete : defineDeletesSpan(manifest_file_entry, position_deletes_files, false)) object_info->addPositionDeleteObject(position_delete); diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergIterator.h b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergIterator.h index 1a2e0b0c139f..eaa20f3a07fa 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergIterator.h +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergIterator.h @@ -103,7 +103,6 @@ class IcebergIterator : public IObjectIterator private: std::unique_ptr filter_dag; ObjectStoragePtr object_storage; - ContextPtr local_context; Iceberg::SingleThreadIcebergKeysIterator data_files_iterator; Iceberg::SingleThreadIcebergKeysIterator deletes_iterator; ConcurrentBoundedQueue blocking_queue; diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.h b/src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.h index 7967a1b8ba84..7b26ba9845f3 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.h +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.h @@ -26,6 +26,7 @@ struct ColumnInfo #include #include #include +#include #include @@ -84,6 +85,10 @@ struct ManifestFileEntry String file_format; std::optional reference_data_file_path; // For position delete files only. std::optional> equality_ids; + + // Resolved storage and key (set by SingleThreadIcebergKeysIterator) + ObjectStoragePtr storage_to_use; + String resolved_key; }; /** From 8550f3281b8e0ae4ff2abd1d68bb8c354b5d5ce9 Mon Sep 17 00:00:00 2001 From: Andrey Zvonov Date: Mon, 24 Nov 2025 00:43:04 +0100 Subject: [PATCH 2/2] fix taskdistributor --- .../StorageObjectStorageStableTaskDistributor.cpp | 8 ++++---- src/Storages/ObjectStorage/Utils.cpp | 3 +-- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp index 8492c6cc40ea..d84eb6dbea28 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp @@ -111,7 +111,7 @@ ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getPreQueuedFile(size_t auto next_file = files.back(); files.pop_back(); - auto file_path = send_over_whole_archive ? next_file->getPathOrPathToArchiveIfArchive() : next_file->getPath(); + auto file_path = send_over_whole_archive ? next_file->getPathOrPathToArchiveIfArchive() : next_file->getAbsolutePath().value_or(next_file->getPath()); auto it = unprocessed_files.find(file_path); if (it == unprocessed_files.end()) continue; @@ -221,7 +221,7 @@ ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getAnyUnprocessedFile(s auto next_file = it->second.first; unprocessed_files.erase(it); - auto file_path = send_over_whole_archive ? next_file->getPathOrPathToArchiveIfArchive() : next_file->getPath(); + auto file_path = send_over_whole_archive ? next_file->getPathOrPathToArchiveIfArchive() : next_file->getAbsolutePath().value_or(next_file->getPath()); LOG_TRACE( log, "Iterator exhausted. Assigning unprocessed file {} to replica {}", @@ -282,8 +282,8 @@ void StorageObjectStorageStableTaskDistributor::rescheduleTasksFromReplica(size_ replica_to_files_to_be_processed.erase(number_of_current_replica); for (const auto & file : processed_file_list_ptr->second) { - auto file_replica_idx = getReplicaForFile(file->getPath()); - unprocessed_files.emplace(file->getPath(), std::make_pair(file, file_replica_idx)); + auto file_replica_idx = getReplicaForFile(file->getAbsolutePath().value_or(file->getPath())); + unprocessed_files.emplace(file->getAbsolutePath().value_or(file->getPath()), std::make_pair(file, file_replica_idx)); connection_to_files[file_replica_idx].push_back(file); } } diff --git a/src/Storages/ObjectStorage/Utils.cpp b/src/Storages/ObjectStorage/Utils.cpp index a89f1e3bfe64..8ae4ca36654e 100644 --- a/src/Storages/ObjectStorage/Utils.cpp +++ b/src/Storages/ObjectStorage/Utils.cpp @@ -110,8 +110,7 @@ std::pair getOrCreateStorageAndKey( configure_fn(*cfg, config_prefix); - auto & factory = ObjectStorageFactory::instance(); - ObjectStoragePtr storage = factory.create(cache_key, *cfg, config_prefix, context, /*skip_access_check*/ true); + ObjectStoragePtr storage = ObjectStorageFactory::instance().create(cache_key, *cfg, config_prefix, context, /*skip_access_check*/ true); { std::lock_guard lock(secondary_storages.mutex);