diff --git a/src/Databases/DataLake/GlueCatalog.cpp b/src/Databases/DataLake/GlueCatalog.cpp index 9ee8c67ef8e8..23962e53dee0 100644 --- a/src/Databases/DataLake/GlueCatalog.cpp +++ b/src/Databases/DataLake/GlueCatalog.cpp @@ -594,7 +594,7 @@ String GlueCatalog::resolveMetadataPathFromTableLocation(const String & table_lo return ""; // List all files in metadata directory - DB::RelativePathsWithMetadata files; + DB::PathsWithMetadata files; object_storage->listObjects(metadata_dir_path, files, 0); // Filter for .metadata.json files and find the most recent one diff --git a/src/Disks/ObjectStorages/AzureBlobStorage/AzureObjectStorage.cpp b/src/Disks/ObjectStorages/AzureBlobStorage/AzureObjectStorage.cpp index e5e139b04c5c..68f38d58bd54 100644 --- a/src/Disks/ObjectStorages/AzureBlobStorage/AzureObjectStorage.cpp +++ b/src/Disks/ObjectStorages/AzureBlobStorage/AzureObjectStorage.cpp @@ -73,7 +73,7 @@ class AzureIteratorAsync final : public IObjectStorageIteratorAsync } private: - bool getBatchAndCheckNext(RelativePathsWithMetadata & batch) override + bool getBatchAndCheckNext(PathsWithMetadata & batch) override { ProfileEvents::increment(ProfileEvents::AzureListObjects); if (client->IsClientForDisk()) @@ -87,7 +87,7 @@ class AzureIteratorAsync final : public IObjectStorageIteratorAsync for (const auto & blob : blobs_list) { - batch.emplace_back(std::make_shared( + batch.emplace_back(std::make_shared( blob.Name, ObjectMetadata{ static_cast(blob.BlobSize), @@ -169,7 +169,7 @@ ObjectStorageIteratorPtr AzureObjectStorage::iterate(const std::string & path_pr return std::make_shared(path_prefix, client_ptr, max_keys ? 
max_keys : settings_ptr->list_object_keys_size); } -void AzureObjectStorage::listObjects(const std::string & path, RelativePathsWithMetadata & children, size_t max_keys) const +void AzureObjectStorage::listObjects(const std::string & path, PathsWithMetadata & children, size_t max_keys) const { auto client_ptr = client.get(); @@ -195,7 +195,7 @@ void AzureObjectStorage::listObjects(const std::string & path, RelativePathsWith for (const auto & blob : blobs_list) { - children.emplace_back(std::make_shared( + children.emplace_back(std::make_shared( blob.Name, ObjectMetadata{ static_cast(blob.BlobSize), diff --git a/src/Disks/ObjectStorages/AzureBlobStorage/AzureObjectStorage.h b/src/Disks/ObjectStorages/AzureBlobStorage/AzureObjectStorage.h index a958907771cc..1e4ed18b4605 100644 --- a/src/Disks/ObjectStorages/AzureBlobStorage/AzureObjectStorage.h +++ b/src/Disks/ObjectStorages/AzureBlobStorage/AzureObjectStorage.h @@ -37,7 +37,7 @@ class AzureObjectStorage : public IObjectStorage bool supportsListObjectsCache() override { return true; } - void listObjects(const std::string & path, RelativePathsWithMetadata & children, size_t max_keys) const override; + void listObjects(const std::string & path, PathsWithMetadata & children, size_t max_keys) const override; /// Sanitizer build may crash with max_keys=1; this looks like a false positive. 
ObjectStorageIteratorPtr iterate(const std::string & path_prefix, size_t max_keys) const override; diff --git a/src/Disks/ObjectStorages/Cached/CachedObjectStorage.cpp b/src/Disks/ObjectStorages/Cached/CachedObjectStorage.cpp index 8675555f2668..8d483a5581dd 100644 --- a/src/Disks/ObjectStorages/Cached/CachedObjectStorage.cpp +++ b/src/Disks/ObjectStorages/Cached/CachedObjectStorage.cpp @@ -193,7 +193,7 @@ void CachedObjectStorage::copyObject( // NOLINT object_storage->copyObject(object_from, object_to, read_settings, write_settings, object_to_attributes); } -void CachedObjectStorage::listObjects(const std::string & path, RelativePathsWithMetadata & children, size_t max_keys) const +void CachedObjectStorage::listObjects(const std::string & path, PathsWithMetadata & children, size_t max_keys) const { object_storage->listObjects(path, children, max_keys); } diff --git a/src/Disks/ObjectStorages/Cached/CachedObjectStorage.h b/src/Disks/ObjectStorages/Cached/CachedObjectStorage.h index 68cce9f2ccb6..34c2b7f2054f 100644 --- a/src/Disks/ObjectStorages/Cached/CachedObjectStorage.h +++ b/src/Disks/ObjectStorages/Cached/CachedObjectStorage.h @@ -64,7 +64,7 @@ class CachedObjectStorage final : public IObjectStorage IObjectStorage & object_storage_to, std::optional object_to_attributes = {}) override; - void listObjects(const std::string & path, RelativePathsWithMetadata & children, size_t max_keys) const override; + void listObjects(const std::string & path, PathsWithMetadata & children, size_t max_keys) const override; ObjectMetadata getObjectMetadata(const std::string & path) const override; diff --git a/src/Disks/ObjectStorages/HDFS/HDFSObjectStorage.cpp b/src/Disks/ObjectStorages/HDFS/HDFSObjectStorage.cpp index 5dea86c49027..443a8b4c6920 100644 --- a/src/Disks/ObjectStorages/HDFS/HDFSObjectStorage.cpp +++ b/src/Disks/ObjectStorages/HDFS/HDFSObjectStorage.cpp @@ -167,7 +167,7 @@ ObjectMetadata HDFSObjectStorage::getObjectMetadata(const std::string & path) co return 
metadata; } -void HDFSObjectStorage::listObjects(const std::string & path, RelativePathsWithMetadata & children, size_t max_keys) const +void HDFSObjectStorage::listObjects(const std::string & path, PathsWithMetadata & children, size_t max_keys) const { initializeHDFSFS(); LOG_TEST(log, "Trying to list files for {}", path); @@ -203,7 +203,7 @@ void HDFSObjectStorage::listObjects(const std::string & path, RelativePathsWithM } else { - children.emplace_back(std::make_shared( + children.emplace_back(std::make_shared( String(file_path), ObjectMetadata{ static_cast(ls.file_info[i].mSize), diff --git a/src/Disks/ObjectStorages/HDFS/HDFSObjectStorage.h b/src/Disks/ObjectStorages/HDFS/HDFSObjectStorage.h index 733407e236ef..c3fe9ce0cf6c 100644 --- a/src/Disks/ObjectStorages/HDFS/HDFSObjectStorage.h +++ b/src/Disks/ObjectStorages/HDFS/HDFSObjectStorage.h @@ -92,7 +92,7 @@ class HDFSObjectStorage : public IObjectStorage, public HDFSErrorWrapper const WriteSettings & write_settings, std::optional object_to_attributes = {}) override; - void listObjects(const std::string & path, RelativePathsWithMetadata & children, size_t max_keys) const override; + void listObjects(const std::string & path, PathsWithMetadata & children, size_t max_keys) const override; String getObjectsNamespace() const override { return ""; } diff --git a/src/Disks/ObjectStorages/IObjectStorage.cpp b/src/Disks/ObjectStorages/IObjectStorage.cpp index fbd022a6e69d..9b0137af7f28 100644 --- a/src/Disks/ObjectStorages/IObjectStorage.cpp +++ b/src/Disks/ObjectStorages/IObjectStorage.cpp @@ -30,12 +30,12 @@ const MetadataStorageMetrics & IObjectStorage::getMetadataStorageMetrics() const bool IObjectStorage::existsOrHasAnyChild(const std::string & path) const { - RelativePathsWithMetadata files; + PathsWithMetadata files; listObjects(path, files, 1); return !files.empty(); } -void IObjectStorage::listObjects(const std::string &, RelativePathsWithMetadata &, size_t) const +void IObjectStorage::listObjects(const 
std::string &, PathsWithMetadata &, size_t) const { throw Exception(ErrorCodes::NOT_IMPLEMENTED, "listObjects() is not supported"); } @@ -43,7 +43,7 @@ void IObjectStorage::listObjects(const std::string &, RelativePathsWithMetadata ObjectStorageIteratorPtr IObjectStorage::iterate(const std::string & path_prefix, size_t max_keys) const { - RelativePathsWithMetadata files; + PathsWithMetadata files; listObjects(path_prefix, files, max_keys); return std::make_shared(std::move(files)); @@ -102,21 +102,14 @@ WriteSettings IObjectStorage::patchSettings(const WriteSettings & write_settings return write_settings; } -RelativePathWithMetadata::RelativePathWithMetadata(const DataFileInfo & info, std::optional metadata_) - : metadata(std::move(metadata_)) -{ - relative_path = info.file_path; - file_meta_info = info.file_meta_info; -} - -std::string RelativePathWithMetadata::getPathOrPathToArchiveIfArchive() const +std::string PathWithMetadata::getPathOrPathToArchiveIfArchive() const { if (isArchive()) return getPathToArchive(); return getPath(); } -RelativePathWithMetadata::CommandInTaskResponse::CommandInTaskResponse(const std::string & task) +PathWithMetadata::CommandInTaskResponse::CommandInTaskResponse(const std::string & task) { Poco::JSON::Parser parser; try @@ -136,7 +129,7 @@ RelativePathWithMetadata::CommandInTaskResponse::CommandInTaskResponse(const std } } -std::string RelativePathWithMetadata::CommandInTaskResponse::to_string() const +std::string PathWithMetadata::CommandInTaskResponse::to_string() const { Poco::JSON::Object json; if (retry_after_us.has_value()) @@ -149,16 +142,18 @@ std::string RelativePathWithMetadata::CommandInTaskResponse::to_string() const } -void RelativePathWithMetadata::loadMetadata(ObjectStoragePtr object_storage, bool ignore_non_existent_file) +void PathWithMetadata::loadMetadata(ObjectStoragePtr object_storage, bool ignore_non_existent_file) { if (!metadata) { const auto & path = isArchive() ? 
getPathToArchive() : getPath(); + auto storage_to_use = object_storage_to_use ? object_storage_to_use : object_storage; + if (ignore_non_existent_file) - metadata = object_storage->tryGetObjectMetadata(path); + metadata = storage_to_use->tryGetObjectMetadata(path); else - metadata = object_storage->getObjectMetadata(path); + metadata = storage_to_use->getObjectMetadata(path); } } diff --git a/src/Disks/ObjectStorages/IObjectStorage.h b/src/Disks/ObjectStorages/IObjectStorage.h index 440f5482e783..26017f46f57b 100644 --- a/src/Disks/ObjectStorages/IObjectStorage.h +++ b/src/Disks/ObjectStorages/IObjectStorage.h @@ -108,13 +108,12 @@ struct ObjectMetadata }; -struct DataFileInfo; class DataFileMetaInfo; using DataFileMetaInfoPtr = std::shared_ptr; struct DataLakeObjectMetadata; -struct RelativePathWithMetadata +struct PathWithMetadata { class CommandInTaskResponse { @@ -143,28 +142,35 @@ struct RelativePathWithMetadata std::optional file_meta_info; /// Retry request after short pause CommandInTaskResponse command; + std::optional absolute_path; + ObjectStoragePtr object_storage_to_use = nullptr; - RelativePathWithMetadata() = default; + PathWithMetadata() = default; - explicit RelativePathWithMetadata(String command_or_path, std::optional metadata_ = std::nullopt) + explicit PathWithMetadata( + const String & command_or_path, + std::optional metadata_ = std::nullopt, + std::optional absolute_path_ = std::nullopt, + ObjectStoragePtr object_storage_to_use_ = nullptr) : relative_path(std::move(command_or_path)) , metadata(std::move(metadata_)) , command(relative_path) + , absolute_path((absolute_path_.has_value() && !absolute_path_.value().empty()) ? 
absolute_path_ : std::nullopt) + , object_storage_to_use(object_storage_to_use_) { if (command.is_parsed()) relative_path = ""; } - explicit RelativePathWithMetadata(const DataFileInfo & info, std::optional metadata_ = std::nullopt); + PathWithMetadata(const PathWithMetadata & other) = default; - RelativePathWithMetadata(const RelativePathWithMetadata & other) = default; - - virtual ~RelativePathWithMetadata() = default; + virtual ~PathWithMetadata() = default; virtual std::string getFileName() const { return std::filesystem::path(relative_path).filename(); } virtual std::string getFileNameWithoutExtension() const { return std::filesystem::path(relative_path).stem(); } virtual std::string getPath() const { return relative_path; } + virtual std::optional getAbsolutePath() const { return absolute_path; } virtual bool isArchive() const { return false; } virtual std::string getPathToArchive() const { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not an archive"); } virtual size_t fileSizeInArchive() const { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not an archive"); } @@ -176,6 +182,8 @@ struct RelativePathWithMetadata const CommandInTaskResponse & getCommand() const { return command; } void loadMetadata(ObjectStoragePtr object_storage, bool ignore_non_existent_file = true); + + ObjectStoragePtr getObjectStorage() const { return object_storage_to_use; } }; struct ObjectKeyWithMetadata @@ -191,8 +199,8 @@ struct ObjectKeyWithMetadata {} }; -using RelativePathWithMetadataPtr = std::shared_ptr; -using RelativePathsWithMetadata = std::vector; +using PathWithMetadataPtr = std::shared_ptr; +using PathsWithMetadata = std::vector; using ObjectKeysWithMetadata = std::vector; class IObjectStorageIterator; @@ -233,7 +241,7 @@ class IObjectStorage virtual bool existsOrHasAnyChild(const std::string & path) const; /// List objects recursively by certain prefix. 
- virtual void listObjects(const std::string & path, RelativePathsWithMetadata & children, size_t max_keys) const; + virtual void listObjects(const std::string & path, PathsWithMetadata & children, size_t max_keys) const; /// List objects recursively by certain prefix. Use it instead of listObjects, if you want to list objects lazily. virtual ObjectStorageIteratorPtr iterate(const std::string & path_prefix, size_t max_keys) const; diff --git a/src/Disks/ObjectStorages/Local/LocalObjectStorage.cpp b/src/Disks/ObjectStorages/Local/LocalObjectStorage.cpp index 2c444954d538..40e83286e416 100644 --- a/src/Disks/ObjectStorages/Local/LocalObjectStorage.cpp +++ b/src/Disks/ObjectStorages/Local/LocalObjectStorage.cpp @@ -151,7 +151,7 @@ ObjectMetadata LocalObjectStorage::getObjectMetadata(const std::string & path) c return object_metadata; } -void LocalObjectStorage::listObjects(const std::string & path, RelativePathsWithMetadata & children, size_t/* max_keys */) const +void LocalObjectStorage::listObjects(const std::string & path, PathsWithMetadata & children, size_t/* max_keys */) const { if (!fs::exists(path) || !fs::is_directory(path)) return; @@ -164,7 +164,7 @@ void LocalObjectStorage::listObjects(const std::string & path, RelativePathsWith continue; } - children.emplace_back(std::make_shared(entry.path(), getObjectMetadata(entry.path()))); + children.emplace_back(std::make_shared(entry.path(), getObjectMetadata(entry.path()))); } } diff --git a/src/Disks/ObjectStorages/Local/LocalObjectStorage.h b/src/Disks/ObjectStorages/Local/LocalObjectStorage.h index a8a9fe321894..3b2a8b31d562 100644 --- a/src/Disks/ObjectStorages/Local/LocalObjectStorage.h +++ b/src/Disks/ObjectStorages/Local/LocalObjectStorage.h @@ -62,7 +62,7 @@ class LocalObjectStorage : public IObjectStorage ObjectMetadata getObjectMetadata(const std::string & path) const override; - void listObjects(const std::string & path, RelativePathsWithMetadata & children, size_t max_keys) const override; + void 
listObjects(const std::string & path, PathsWithMetadata & children, size_t max_keys) const override; bool existsOrHasAnyChild(const std::string & path) const override; diff --git a/src/Disks/ObjectStorages/MetadataStorageFromPlainObjectStorage.cpp b/src/Disks/ObjectStorages/MetadataStorageFromPlainObjectStorage.cpp index 336c2bca91ca..2609d0a123a8 100644 --- a/src/Disks/ObjectStorages/MetadataStorageFromPlainObjectStorage.cpp +++ b/src/Disks/ObjectStorages/MetadataStorageFromPlainObjectStorage.cpp @@ -124,7 +124,7 @@ std::vector MetadataStorageFromPlainObjectStorage::listDirectory(co { auto key_prefix = object_storage->generateObjectKeyForPath(path, std::nullopt /* key_prefix */).serialize(); - RelativePathsWithMetadata files; + PathsWithMetadata files; std::string absolute_key = key_prefix; if (!absolute_key.ends_with('/')) absolute_key += '/'; diff --git a/src/Disks/ObjectStorages/ObjectStorageIterator.cpp b/src/Disks/ObjectStorages/ObjectStorageIterator.cpp index 3d939ce92302..bec76452b2d5 100644 --- a/src/Disks/ObjectStorages/ObjectStorageIterator.cpp +++ b/src/Disks/ObjectStorages/ObjectStorageIterator.cpp @@ -9,7 +9,7 @@ namespace ErrorCodes extern const int LOGICAL_ERROR; } -RelativePathWithMetadataPtr ObjectStorageIteratorFromList::current() +PathWithMetadataPtr ObjectStorageIteratorFromList::current() { if (!isValid()) throw Exception(ErrorCodes::LOGICAL_ERROR, "Trying to access invalid iterator"); diff --git a/src/Disks/ObjectStorages/ObjectStorageIterator.h b/src/Disks/ObjectStorages/ObjectStorageIterator.h index d814514ddcc9..b62992f6a719 100644 --- a/src/Disks/ObjectStorages/ObjectStorageIterator.h +++ b/src/Disks/ObjectStorages/ObjectStorageIterator.h @@ -16,10 +16,10 @@ class IObjectStorageIterator virtual bool isValid() = 0; /// Return the current element. 
- virtual RelativePathWithMetadataPtr current() = 0; + virtual PathWithMetadataPtr current() = 0; /// This will initiate prefetching the next batch in background, so it can be obtained faster when needed. - virtual std::optional getCurrentBatchAndScheduleNext() = 0; + virtual std::optional getCurrentBatchAndScheduleNext() = 0; /// Returns the number of elements in the batches that were fetched so far. virtual size_t getAccumulatedSize() const = 0; @@ -36,7 +36,7 @@ class IObjectStorageIterator /// Return the current batch of elements. /// It is unspecified how batches are formed. /// But this method can be used for more efficient processing. - virtual RelativePathsWithMetadata currentBatch() = 0; + virtual PathsWithMetadata currentBatch() = 0; }; using ObjectStorageIteratorPtr = std::shared_ptr; @@ -45,7 +45,7 @@ class ObjectStorageIteratorFromList : public IObjectStorageIterator { public: /// Everything is represented by just a single batch. - explicit ObjectStorageIteratorFromList(RelativePathsWithMetadata && batch_) + explicit ObjectStorageIteratorFromList(PathsWithMetadata && batch_) : batch(std::move(batch_)) , batch_iterator(batch.begin()) {} @@ -59,11 +59,11 @@ class ObjectStorageIteratorFromList : public IObjectStorageIterator bool isValid() override { return batch_iterator != batch.end(); } - RelativePathWithMetadataPtr current() override; + PathWithMetadataPtr current() override; - RelativePathsWithMetadata currentBatch() override { return batch; } + PathsWithMetadata currentBatch() override { return batch; } - std::optional getCurrentBatchAndScheduleNext() override + std::optional getCurrentBatchAndScheduleNext() override { if (batch.empty()) return {}; @@ -76,8 +76,8 @@ class ObjectStorageIteratorFromList : public IObjectStorageIterator size_t getAccumulatedSize() const override { return batch.size(); } private: - RelativePathsWithMetadata batch; - RelativePathsWithMetadata::iterator batch_iterator; + PathsWithMetadata batch; + 
PathsWithMetadata::iterator batch_iterator; }; } diff --git a/src/Disks/ObjectStorages/ObjectStorageIteratorAsync.cpp b/src/Disks/ObjectStorages/ObjectStorageIteratorAsync.cpp index 2d2e8cd2c1a5..c488e8596aaa 100644 --- a/src/Disks/ObjectStorages/ObjectStorageIteratorAsync.cpp +++ b/src/Disks/ObjectStorages/ObjectStorageIteratorAsync.cpp @@ -112,7 +112,7 @@ bool IObjectStorageIteratorAsync::isValid() return !is_finished; } -RelativePathWithMetadataPtr IObjectStorageIteratorAsync::current() +PathWithMetadataPtr IObjectStorageIteratorAsync::current() { std::lock_guard lock(mutex); @@ -123,7 +123,7 @@ RelativePathWithMetadataPtr IObjectStorageIteratorAsync::current() } -RelativePathsWithMetadata IObjectStorageIteratorAsync::currentBatch() +PathsWithMetadata IObjectStorageIteratorAsync::currentBatch() { std::lock_guard lock(mutex); @@ -133,7 +133,7 @@ RelativePathsWithMetadata IObjectStorageIteratorAsync::currentBatch() return current_batch; } -std::optional IObjectStorageIteratorAsync::getCurrentBatchAndScheduleNext() +std::optional IObjectStorageIteratorAsync::getCurrentBatchAndScheduleNext() { std::lock_guard lock(mutex); diff --git a/src/Disks/ObjectStorages/ObjectStorageIteratorAsync.h b/src/Disks/ObjectStorages/ObjectStorageIteratorAsync.h index 013714151245..b8a0d6e0249a 100644 --- a/src/Disks/ObjectStorages/ObjectStorageIteratorAsync.h +++ b/src/Disks/ObjectStorages/ObjectStorageIteratorAsync.h @@ -23,24 +23,24 @@ class IObjectStorageIteratorAsync : public IObjectStorageIterator bool isValid() override; - RelativePathWithMetadataPtr current() override; - RelativePathsWithMetadata currentBatch() override; + PathWithMetadataPtr current() override; + PathsWithMetadata currentBatch() override; void next() override; void nextBatch() override; size_t getAccumulatedSize() const override; - std::optional getCurrentBatchAndScheduleNext() override; + std::optional getCurrentBatchAndScheduleNext() override; void deactivate(); protected: /// This method fetches the next 
batch, and returns true if there are more batches after it. - virtual bool getBatchAndCheckNext(RelativePathsWithMetadata & batch) = 0; + virtual bool getBatchAndCheckNext(PathsWithMetadata & batch) = 0; struct BatchAndHasNext { - RelativePathsWithMetadata batch; + PathsWithMetadata batch; bool has_next; }; @@ -55,8 +55,8 @@ class IObjectStorageIteratorAsync : public IObjectStorageIterator ThreadPool list_objects_pool; ThreadPoolCallbackRunnerUnsafe list_objects_scheduler; std::future outcome_future; - RelativePathsWithMetadata current_batch; - RelativePathsWithMetadata::iterator current_batch_iterator; + PathsWithMetadata current_batch; + PathsWithMetadata::iterator current_batch_iterator; std::atomic accumulated_size = 0; }; diff --git a/src/Disks/ObjectStorages/S3/S3ObjectStorage.cpp b/src/Disks/ObjectStorages/S3/S3ObjectStorage.cpp index 5ae8da17fa93..f1cb2561674c 100644 --- a/src/Disks/ObjectStorages/S3/S3ObjectStorage.cpp +++ b/src/Disks/ObjectStorages/S3/S3ObjectStorage.cpp @@ -134,7 +134,7 @@ class S3IteratorAsync final : public IObjectStorageIteratorAsync } private: - bool getBatchAndCheckNext(RelativePathsWithMetadata & batch) override + bool getBatchAndCheckNext(PathsWithMetadata & batch) override { ProfileEvents::increment(ProfileEvents::S3ListObjects); ProfileEvents::increment(ProfileEvents::DiskS3ListObjects); @@ -155,7 +155,7 @@ class S3IteratorAsync final : public IObjectStorageIteratorAsync for (const auto & object : objects) { ObjectMetadata metadata{static_cast(object.GetSize()), Poco::Timestamp::fromEpochTime(object.GetLastModified().Seconds()), object.GetETag(), {}}; - batch.emplace_back(std::make_shared(object.GetKey(), std::move(metadata))); + batch.emplace_back(std::make_shared(object.GetKey(), std::move(metadata))); } /// It returns false when all objects were returned @@ -253,7 +253,7 @@ ObjectStorageIteratorPtr S3ObjectStorage::iterate(const std::string & path_prefi return std::make_shared(uri.bucket, path_prefix, client.get(), max_keys); 
} -void S3ObjectStorage::listObjects(const std::string & path, RelativePathsWithMetadata & children, size_t max_keys) const +void S3ObjectStorage::listObjects(const std::string & path, PathsWithMetadata & children, size_t max_keys) const { auto settings_ptr = s3_settings.get(); @@ -285,7 +285,7 @@ void S3ObjectStorage::listObjects(const std::string & path, RelativePathsWithMet break; for (const auto & object : objects) - children.emplace_back(std::make_shared( + children.emplace_back(std::make_shared( object.GetKey(), ObjectMetadata{ static_cast(object.GetSize()), diff --git a/src/Disks/ObjectStorages/S3/S3ObjectStorage.h b/src/Disks/ObjectStorages/S3/S3ObjectStorage.h index a7efb0809984..fbeb32916280 100644 --- a/src/Disks/ObjectStorages/S3/S3ObjectStorage.h +++ b/src/Disks/ObjectStorages/S3/S3ObjectStorage.h @@ -79,7 +79,7 @@ class S3ObjectStorage : public IObjectStorage size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, const WriteSettings & write_settings = {}) override; - void listObjects(const std::string & path, RelativePathsWithMetadata & children, size_t max_keys) const override; + void listObjects(const std::string & path, PathsWithMetadata & children, size_t max_keys) const override; ObjectStorageIteratorPtr iterate(const std::string & path_prefix, size_t max_keys) const override; diff --git a/src/Interpreters/ClusterFunctionReadTask.cpp b/src/Interpreters/ClusterFunctionReadTask.cpp index 3843af6d0f41..adef5b74b98d 100644 --- a/src/Interpreters/ClusterFunctionReadTask.cpp +++ b/src/Interpreters/ClusterFunctionReadTask.cpp @@ -35,6 +35,7 @@ ClusterFunctionReadTaskResponse::ClusterFunctionReadTaskResponse(ObjectInfoPtr o const bool send_over_whole_archive = !context->getSettingsRef()[Setting::cluster_function_process_archive_on_multiple_nodes]; path = send_over_whole_archive ? 
object->getPathOrPathToArchiveIfArchive() : object->getPath(); + absolute_path = object->getAbsolutePath(); } ClusterFunctionReadTaskResponse::ClusterFunctionReadTaskResponse(const std::string & path_) @@ -50,6 +51,9 @@ ObjectInfoPtr ClusterFunctionReadTaskResponse::getObjectInfo() const auto object = std::make_shared(path); object->data_lake_metadata = data_lake_metadata; object->file_meta_info = file_meta_info; + if (absolute_path.has_value() && !absolute_path.value().empty()) + object->absolute_path = absolute_path; + return object; } diff --git a/src/Interpreters/ClusterFunctionReadTask.h b/src/Interpreters/ClusterFunctionReadTask.h index 5c3b5912e5c7..b5e2123cbc4f 100644 --- a/src/Interpreters/ClusterFunctionReadTask.h +++ b/src/Interpreters/ClusterFunctionReadTask.h @@ -18,6 +18,8 @@ struct ClusterFunctionReadTaskResponse /// Data path (object path, in case of object storage). String path; + /// Absolute path (including storage type prefix). + std::optional absolute_path; /// Object metadata path, in case of data lake object. 
DataLakeObjectMetadata data_lake_metadata; /// File's columns info diff --git a/src/Storages/ObjectStorage/DataLakes/Common.cpp b/src/Storages/ObjectStorage/DataLakes/Common.cpp index 65041a470e4c..c292915f2e16 100644 --- a/src/Storages/ObjectStorage/DataLakes/Common.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Common.cpp @@ -14,7 +14,7 @@ std::vector listFiles( const String & prefix, const String & suffix) { auto key = std::filesystem::path(configuration.getPathForRead().path) / prefix; - RelativePathsWithMetadata files_with_metadata; + PathsWithMetadata files_with_metadata; object_storage.listObjects(key, files_with_metadata, 0); Strings res; for (const auto & file_with_metadata : files_with_metadata) diff --git a/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h b/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h index b2a0384e981f..eb509c474afe 100644 --- a/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h +++ b/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h @@ -70,6 +70,7 @@ struct DataFileInfo { std::string file_path; std::optional file_meta_info; + std::optional absolute_uri; explicit DataFileInfo(const std::string & file_path_) : file_path(file_path_) {} diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/Compaction.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/Compaction.cpp index 8f59de506f89..c2e783dc73e9 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/Compaction.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/Compaction.cpp @@ -69,6 +69,7 @@ struct Plan IcebergHistory history; std::unordered_map manifest_file_to_first_snapshot; std::unordered_map> manifest_list_to_manifest_files; + std::unordered_map> manifest_file_to_snapshots; std::unordered_map>> snapshot_id_to_data_files; std::unordered_map> path_to_data_file; FileNamesGenerator generator; @@ -111,6 +112,7 @@ Plan getPlan( IcebergHistory snapshots_info, const PersistentTableComponents & persistent_table_components, ObjectStoragePtr 
object_storage, + SecondaryStorages & secondary_storages, StorageObjectStorageConfigurationPtr configuration, ContextPtr context, CompressionMethod compression_method) @@ -146,29 +148,36 @@ Plan getPlan( std::unordered_map> manifest_files; for (const auto & snapshot : snapshots_info) { + auto [manifest_list_storage, key_in_storage] = resolveObjectStorageForPath(persistent_table_components.table_location, snapshot.manifest_list_path, object_storage, secondary_storages, context); + auto manifest_list - = getManifestList(object_storage, configuration, persistent_table_components, context, snapshot.manifest_list_path, log); + = getManifestList(manifest_list_storage, configuration, persistent_table_components, context, key_in_storage, snapshot.manifest_list_path, log); + for (const auto & manifest_file : manifest_list) { - plan.manifest_list_to_manifest_files[snapshot.manifest_list_path].push_back(manifest_file.manifest_file_path); - if (!plan.manifest_file_to_first_snapshot.contains(manifest_file.manifest_file_path)) - plan.manifest_file_to_first_snapshot[manifest_file.manifest_file_path] = snapshot.snapshot_id; + plan.manifest_list_to_manifest_files[snapshot.manifest_list_absolute_path].push_back(manifest_file.manifest_file_absolute_path); + if (!plan.manifest_file_to_first_snapshot.contains(manifest_file.manifest_file_absolute_path)) + { + plan.manifest_file_to_first_snapshot[manifest_file.manifest_file_absolute_path] = snapshot.snapshot_id; + } auto manifest_file_content = getManifestFile( object_storage, configuration, persistent_table_components, context, log, - manifest_file.manifest_file_path, + manifest_file.manifest_file_absolute_path, manifest_file.added_sequence_number, - manifest_file.added_snapshot_id); + manifest_file.added_snapshot_id, + secondary_storages); - if (!manifest_files.contains(manifest_file.manifest_file_path)) + if (!manifest_files.contains(manifest_file.manifest_file_absolute_path)) { - manifest_files[manifest_file.manifest_file_path] = 
std::make_shared(current_schema); - manifest_files[manifest_file.manifest_file_path]->path = manifest_file.manifest_file_path; + manifest_files[manifest_file.manifest_file_absolute_path] = std::make_shared(current_schema); + manifest_files[manifest_file.manifest_file_absolute_path]->path = manifest_file.manifest_file_absolute_path; } - manifest_files[manifest_file.manifest_file_path]->manifest_lists_path.push_back(snapshot.manifest_list_path); + manifest_files[manifest_file.manifest_file_absolute_path]->manifest_lists_path.push_back(snapshot.manifest_list_path); + plan.manifest_file_to_snapshots[manifest_file.manifest_file_absolute_path].insert(snapshot.snapshot_id); auto data_files = manifest_file_content->getFilesWithoutDeleted(FileContentType::DATA); auto positional_delete_files = manifest_file_content->getFilesWithoutDeleted(FileContentType::POSITION_DELETE); for (const auto & pos_delete_file : positional_delete_files) @@ -180,19 +189,23 @@ Plan getPlan( if (plan.partitions.size() <= partition_index) plan.partitions.push_back({}); - IcebergDataObjectInfoPtr data_object_info = std::make_shared(data_file); + auto [resolved_storage, resolved_key] = resolveObjectStorageForPath( + persistent_table_components.table_location, data_file.file_path, object_storage, secondary_storages, context); + + IcebergDataObjectInfoPtr data_object_info = std::make_shared(data_file, resolved_storage, resolved_key); std::shared_ptr data_file_ptr; - if (!plan.path_to_data_file.contains(manifest_file.manifest_file_path)) + std::string path_identifier = resolved_storage->getDescription() + ":" + resolved_storage->getObjectsNamespace() + "|" + resolved_key; + if (!plan.path_to_data_file.contains(path_identifier)) { data_file_ptr = std::make_shared(DataFilePlan{ .data_object_info = data_object_info, - .manifest_list = manifest_files[manifest_file.manifest_file_path], + .manifest_list = manifest_files[manifest_file.manifest_file_absolute_path], .patched_path = 
plan.generator.generateDataFileName()}); - plan.path_to_data_file[manifest_file.manifest_file_path] = data_file_ptr; + plan.path_to_data_file[path_identifier] = data_file_ptr; } else { - data_file_ptr = plan.path_to_data_file[manifest_file.manifest_file_path]; + data_file_ptr = plan.path_to_data_file[path_identifier]; } plan.partitions[partition_index].push_back(data_file_ptr); plan.snapshot_id_to_data_files[snapshot.snapshot_id].push_back(plan.partitions[partition_index].back()); @@ -224,15 +237,20 @@ void writeDataFiles( ObjectStoragePtr object_storage, const std::optional & format_settings, ContextPtr context, - StorageObjectStorageConfigurationPtr configuration) + StorageObjectStorageConfigurationPtr configuration, + const String & table_location, + SecondaryStorages & secondary_storages) { for (auto & [_, data_file] : initial_plan.path_to_data_file) { auto delete_file_transform = std::make_shared( - sample_block, data_file->data_object_info, object_storage, format_settings, context); + sample_block, data_file->data_object_info, object_storage, format_settings, context, table_location, secondary_storages); + ObjectStoragePtr storage_to_use = data_file->data_object_info->getObjectStorage(); + if (!storage_to_use) + storage_to_use = object_storage; StorageObjectStorage::ObjectInfo object_info(data_file->data_object_info->getPath()); - auto read_buffer = createReadBuffer(object_info, object_storage, context, getLogger("IcebergCompaction")); + auto read_buffer = createReadBuffer(object_info, storage_to_use, context, getLogger("IcebergCompaction")); const Settings & settings = context->getSettingsRef(); auto parser_shared_resources = std::make_shared( @@ -390,6 +408,9 @@ void writeMetadataFiles( { manifest_entry->patched_path = plan.generator.generateManifestEntryName(); manifest_file_renamings[manifest_entry->path] = manifest_entry->patched_path.path_in_metadata; + + std::vector unique_data_filenames(data_filenames.begin(), data_filenames.end()); + auto 
buffer_manifest_entry = object_storage->writeObject( StoredObject(manifest_entry->patched_path.path_in_storage), WriteMode::Rewrite, @@ -407,7 +428,7 @@ void writeMetadataFiles( partition_columns, plan.partition_encoder.getPartitionValue(grouped_by_manifest_files_partitions[manifest_entry]), ChunkPartitioner(fields_from_partition_spec, current_schema, context, sample_block_).getResultTypes(), - std::vector(data_filenames.begin(), data_filenames.end()), + unique_data_filenames, manifest_entry->statistics, sample_block_, snapshot, @@ -436,16 +457,25 @@ void writeMetadataFiles( if (plan.history[i].added_files == 0) continue; - auto initial_manifest_list_name = plan.history[i].manifest_list_path; + auto initial_manifest_list_name = plan.history[i].manifest_list_absolute_path; auto initial_manifest_entries = plan.manifest_list_to_manifest_files[initial_manifest_list_name]; - auto renamed_manifest_list = manifest_list_renamings[initial_manifest_list_name]; + auto renamed_manifest_list = manifest_list_renamings[plan.history[i].manifest_list_path]; std::vector renamed_manifest_entries; + std::unordered_set seen_manifest_entries; // Deduplicate manifest entries Int32 total_manifest_file_sizes = 0; for (const auto & initial_manifest_entry : initial_manifest_entries) { auto renamed_manifest_entry = manifest_file_renamings[initial_manifest_entry]; if (!renamed_manifest_entry.empty()) { + auto it = plan.manifest_file_to_snapshots.find(initial_manifest_entry); + if (it != plan.manifest_file_to_snapshots.end() && !it->second.contains(plan.history[i].snapshot_id)) + continue; + + if (seen_manifest_entries.contains(renamed_manifest_entry)) + continue; + + seen_manifest_entries.insert(renamed_manifest_entry); renamed_manifest_entries.push_back(renamed_manifest_entry); total_manifest_file_sizes += manifest_file_sizes[renamed_manifest_entry]; } @@ -508,6 +538,7 @@ void compactIcebergTable( IcebergHistory snapshots_info, const PersistentTableComponents & persistent_table_components, 
ObjectStoragePtr object_storage_, + SecondaryStorages & secondary_storages_, StorageObjectStorageConfigurationPtr configuration_, const std::optional & format_settings_, SharedHeader sample_block_, @@ -515,11 +546,11 @@ void compactIcebergTable( CompressionMethod compression_method_) { auto plan - = getPlan(std::move(snapshots_info), persistent_table_components, object_storage_, configuration_, context_, compression_method_); + = getPlan(std::move(snapshots_info), persistent_table_components, object_storage_, secondary_storages_, configuration_, context_, compression_method_); if (plan.need_optimize) { auto old_files = getOldFiles(object_storage_, configuration_); - writeDataFiles(plan, sample_block_, object_storage_, format_settings_, context_, configuration_); + writeDataFiles(plan, sample_block_, object_storage_, format_settings_, context_, configuration_, persistent_table_components.table_location, secondary_storages_); writeMetadataFiles(plan, object_storage_, configuration_, context_, sample_block_); clearOldFiles(object_storage_, old_files); } diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/Compaction.h b/src/Storages/ObjectStorage/DataLakes/Iceberg/Compaction.h index c2c80ccbf6a9..16d3622c939e 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/Compaction.h +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/Compaction.h @@ -5,6 +5,7 @@ #include #include #include +#include namespace DB::Iceberg @@ -15,6 +16,7 @@ void compactIcebergTable( IcebergHistory snapshots_info, const PersistentTableComponents & persistent_table_components, DB::ObjectStoragePtr object_storage_, + SecondaryStorages & secondary_storages_, DB::StorageObjectStorageConfigurationPtr configuration_, const std::optional & format_settings_, DB::SharedHeader sample_block_, diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergDataObjectInfo.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergDataObjectInfo.cpp index 7462a3b59bff..057a95cfbf7a 100644 --- 
a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergDataObjectInfo.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergDataObjectInfo.cpp @@ -1,3 +1,4 @@ +#include #include "config.h" #if USE_AVRO @@ -36,18 +37,35 @@ namespace Setting extern const SettingsBool use_roaring_bitmap_iceberg_positional_deletes; }; + IcebergDataObjectInfo::IcebergDataObjectInfo(Iceberg::ManifestFileEntry data_manifest_file_entry_) - : RelativePathWithMetadata(data_manifest_file_entry_.file_path) + : PathWithMetadata(data_manifest_file_entry_.file_path, std::nullopt, + data_manifest_file_entry_.file_path_key.empty() ? std::nullopt : std::make_optional(data_manifest_file_entry_.file_path_key)) , data_object_file_path_key(data_manifest_file_entry_.file_path_key) , underlying_format_read_schema_id(data_manifest_file_entry_.schema_id) , sequence_number(data_manifest_file_entry_.added_sequence_number) { - auto toupper = [](String & str) + if (!position_deletes_objects.empty() && Poco::toUpperInPlace(data_manifest_file_entry_.file_format) != "PARQUET") { - std::transform(str.begin(), str.end(), str.begin(), ::toupper); - return str; - }; - if (!position_deletes_objects.empty() && toupper(data_manifest_file_entry_.file_format) != "PARQUET") + throw Exception( + ErrorCodes::NOT_IMPLEMENTED, + "Position deletes are only supported for data files of Parquet format in Iceberg, but got {}", + data_manifest_file_entry_.file_format); + } +} + +IcebergDataObjectInfo::IcebergDataObjectInfo( + Iceberg::ManifestFileEntry data_manifest_file_entry_, + ObjectStoragePtr resolved_storage, + const String & resolved_key) + : PathWithMetadata(resolved_key, std::nullopt, + data_manifest_file_entry_.file_path.empty() ? 
std::nullopt : std::make_optional(data_manifest_file_entry_.file_path), + resolved_storage) + , data_object_file_path_key(data_manifest_file_entry_.file_path_key) + , underlying_format_read_schema_id(data_manifest_file_entry_.schema_id) + , sequence_number(data_manifest_file_entry_.added_sequence_number) +{ + if (!position_deletes_objects.empty() && Poco::toUpperInPlace(data_manifest_file_entry_.file_format) != "PARQUET") { throw Exception( ErrorCodes::NOT_IMPLEMENTED, @@ -60,13 +78,15 @@ std::shared_ptr IcebergDataObjectInfo::getPositionDeleteTransf ObjectStoragePtr object_storage, const SharedHeader & header, const std::optional & format_settings, - ContextPtr context_) + ContextPtr context_, + const String & table_location, + SecondaryStorages & secondary_storages) { IcebergDataObjectInfoPtr self = shared_from_this(); if (!context_->getSettingsRef()[Setting::use_roaring_bitmap_iceberg_positional_deletes].value) - return std::make_shared(header, self, object_storage, format_settings, context_); + return std::make_shared(header, self, object_storage, format_settings, context_, table_location, secondary_storages); else - return std::make_shared(header, self, object_storage, format_settings, context_); + return std::make_shared(header, self, object_storage, format_settings, context_, table_location, secondary_storages); } } diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergDataObjectInfo.h b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergDataObjectInfo.h index 40cbd2252928..e5e8e1b6cc09 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergDataObjectInfo.h +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergDataObjectInfo.h @@ -8,12 +8,13 @@ #include #include +#include #include namespace DB { -struct IcebergDataObjectInfo : public RelativePathWithMetadata, std::enable_shared_from_this +struct IcebergDataObjectInfo : public PathWithMetadata, std::enable_shared_from_this { using IcebergDataObjectInfoPtr = std::shared_ptr; @@ -21,12 
+22,20 @@ struct IcebergDataObjectInfo : public RelativePathWithMetadata, std::enable_shar /// It is used to filter position deletes objects by data file path. /// It is also used to create a filter for the data object in the position delete transform. explicit IcebergDataObjectInfo(Iceberg::ManifestFileEntry data_manifest_file_entry_); + + /// Sometimes data files are located outside the table location and even in a different storage. + explicit IcebergDataObjectInfo( + Iceberg::ManifestFileEntry data_manifest_file_entry_, + ObjectStoragePtr resolved_storage, + const String & resolved_key); std::shared_ptr getPositionDeleteTransformer( ObjectStoragePtr object_storage, const SharedHeader & header, const std::optional & format_settings, - ContextPtr context_); + ContextPtr context_, + const String & table_location, + SecondaryStorages & secondary_storages); void addPositionDeleteObject(Iceberg::ManifestFileEntry position_delete_object) { diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergIterator.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergIterator.cpp index dddb95d9d568..27e76d1996b6 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergIterator.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergIterator.cpp @@ -24,6 +24,7 @@ #include #include #include +#include #include #include @@ -42,6 +43,7 @@ #include #include #include +#include #include @@ -130,9 +132,10 @@ std::optional SingleThreadIcebergKeysIterator::next() persistent_components, local_context, log, - data_snapshot->manifest_list_entries[manifest_file_index].manifest_file_path, + data_snapshot->manifest_list_entries[manifest_file_index].manifest_file_absolute_path, data_snapshot->manifest_list_entries[manifest_file_index].added_sequence_number, - data_snapshot->manifest_list_entries[manifest_file_index].added_snapshot_id); + data_snapshot->manifest_list_entries[manifest_file_index].added_snapshot_id, + *secondary_storages); internal_data_index = 0; } auto 
files = files_generator(current_manifest_file_content); @@ -199,7 +202,8 @@ SingleThreadIcebergKeysIterator::SingleThreadIcebergKeysIterator( const ActionsDAG * filter_dag_, Iceberg::IcebergTableStateSnapshotPtr table_snapshot_, Iceberg::IcebergDataSnapshotPtr data_snapshot_, - PersistentTableComponents persistent_components_) + PersistentTableComponents persistent_components_, + std::shared_ptr secondary_storages_) : object_storage(object_storage_) , filter_dag(filter_dag_ ? std::make_shared(filter_dag_->clone()) : nullptr) , local_context(local_context_) @@ -221,6 +225,7 @@ SingleThreadIcebergKeysIterator::SingleThreadIcebergKeysIterator( , persistent_components(persistent_components_) , files_generator(files_generator_) , log(getLogger("IcebergIterator")) + , secondary_storages(secondary_storages_) , manifest_file_content_type(manifest_file_content_type_) { } @@ -233,9 +238,11 @@ IcebergIterator::IcebergIterator( IDataLakeMetadata::FileProgressCallback callback_, Iceberg::IcebergTableStateSnapshotPtr table_snapshot_, Iceberg::IcebergDataSnapshotPtr data_snapshot_, - PersistentTableComponents persistent_components_) + PersistentTableComponents persistent_components_, + std::shared_ptr secondary_storages_) : filter_dag(filter_dag_ ? 
std::make_unique(filter_dag_->clone()) : nullptr) , object_storage(std::move(object_storage_)) + , local_context(local_context_) , data_files_iterator( object_storage, local_context_, @@ -245,7 +252,8 @@ IcebergIterator::IcebergIterator( filter_dag.get(), table_snapshot_, data_snapshot_, - persistent_components_) + persistent_components_, + secondary_storages_) , deletes_iterator( object_storage, local_context_, @@ -261,7 +269,8 @@ IcebergIterator::IcebergIterator( filter_dag.get(), table_snapshot_, data_snapshot_, - persistent_components_) + persistent_components_, + secondary_storages_) , blocking_queue(100) , producer_task(local_context_->getSchedulePool().createTask( "IcebergMetaReaderThread", @@ -301,6 +310,7 @@ IcebergIterator::IcebergIterator( , compression_method(configuration_.lock()->getCompressionMethod()) , persistent_components(persistent_components_) , table_schema_id(table_snapshot_->schema_id) + , secondary_storages(secondary_storages_) { auto delete_file = deletes_iterator.next(); while (delete_file.has_value()) @@ -326,15 +336,18 @@ ObjectInfoPtr IcebergIterator::next(size_t) Iceberg::ManifestFileEntry manifest_file_entry; if (blocking_queue.pop(manifest_file_entry)) { - IcebergDataObjectInfoPtr object_info = std::make_shared(manifest_file_entry); + // Resolve the data file path to get the correct storage and key + auto [storage_to_use, resolved_key] = resolveObjectStorageForPath( + persistent_components.table_location, manifest_file_entry.file_path, object_storage, *secondary_storages, local_context); + + IcebergDataObjectInfoPtr object_info = std::make_shared(manifest_file_entry, storage_to_use, resolved_key); + for (const auto & position_delete : defineDeletesSpan(manifest_file_entry, position_deletes_files, false)) - { object_info->addPositionDeleteObject(position_delete); - } + for (const auto & equality_delete : defineDeletesSpan(manifest_file_entry, equality_deletes_files, true)) - { object_info->addEqualityDeleteObject(equality_delete); - 
} + object_info->setFileMetaInfo(std::make_shared( *persistent_components.schema_processor, table_schema_id, /// current schema id to use current column names diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergIterator.h b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergIterator.h index 03fed84ba6e0..1a2e0b0c139f 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergIterator.h +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergIterator.h @@ -25,6 +25,7 @@ #include #include #include +#include namespace DB { @@ -45,7 +46,8 @@ class SingleThreadIcebergKeysIterator const ActionsDAG * filter_dag_, IcebergTableStateSnapshotPtr table_snapshot_, IcebergDataSnapshotPtr data_snapshot_, - PersistentTableComponents persistent_components); + PersistentTableComponents persistent_components, + std::shared_ptr secondary_storages_); std::optional next(); @@ -62,7 +64,7 @@ class SingleThreadIcebergKeysIterator PersistentTableComponents persistent_components; FilesGenerator files_generator; LoggerPtr log; - + std::shared_ptr secondary_storages; // By Iceberg design it is difficult to avoid storing position deletes in memory. 
size_t manifest_file_index = 0; @@ -90,7 +92,8 @@ class IcebergIterator : public IObjectIterator IDataLakeMetadata::FileProgressCallback callback_, Iceberg::IcebergTableStateSnapshotPtr table_snapshot_, Iceberg::IcebergDataSnapshotPtr data_snapshot_, - Iceberg::PersistentTableComponents persistent_components_); + Iceberg::PersistentTableComponents persistent_components_, + std::shared_ptr secondary_storages_); ObjectInfoPtr next(size_t) override; @@ -100,6 +103,7 @@ class IcebergIterator : public IObjectIterator private: std::unique_ptr filter_dag; ObjectStoragePtr object_storage; + ContextPtr local_context; Iceberg::SingleThreadIcebergKeysIterator data_files_iterator; Iceberg::SingleThreadIcebergKeysIterator deletes_iterator; ConcurrentBoundedQueue blocking_queue; @@ -113,6 +117,7 @@ class IcebergIterator : public IObjectIterator std::mutex exception_mutex; Iceberg::PersistentTableComponents persistent_components; Int32 table_schema_id; + std::shared_ptr secondary_storages; // Sometimes data or manifests can be located on another storage }; } diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp index ade62ff01d61..138fc732537c 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp @@ -133,6 +133,7 @@ IcebergMetadata::IcebergMetadata( IcebergMetadataFilesCachePtr cache_ptr, CompressionMethod metadata_compression_method_) : object_storage(std::move(object_storage_)) + , secondary_storages(std::make_shared()) , configuration(std::move(configuration_)) , persistent_components(PersistentTableComponents{ .schema_processor = std::make_shared(context_), @@ -500,17 +501,18 @@ void IcebergMetadata::updateSnapshot(ContextPtr local_context, Poco::JSON::Objec } auto [partition_key, sorting_key] = extractIcebergKeys(metadata_object); + + String manifest_list_path = 
snapshot->getValue(f_manifest_list); + auto [storage_to_use, key_in_storage] = resolveObjectStorageForPath(persistent_components.table_location, manifest_list_path, object_storage, *secondary_storages, local_context); + relevant_snapshot = std::make_shared( getManifestList( - object_storage, + storage_to_use, configuration_ptr, persistent_components, - local_context, - getProperFilePathFromMetadataInfo( - snapshot->getValue(f_manifest_list), - configuration_ptr->getPathForRead().path, - persistent_components.table_location, - configuration_ptr->getNamespace()), + local_context, + key_in_storage, + makeAbsolutePath(persistent_components.table_location, manifest_list_path), log), relevant_snapshot_id, total_rows, @@ -548,6 +550,7 @@ bool IcebergMetadata::optimize(const StorageMetadataPtr & metadata_snapshot, Con snapshots_info, persistent_components, object_storage, + *secondary_storages, configuration_ptr, format_settings, sample_block, @@ -634,7 +637,7 @@ std::shared_ptr IcebergMetadata::getInitialSchemaByPath(Conte : nullptr; } -std::shared_ptr IcebergMetadata::getSchemaTransformer(ContextPtr context_, ObjectInfoPtr object_info) const +std::shared_ptr IcebergMetadata::getSchemaTransformer(ContextPtr local_context, ObjectInfoPtr object_info) const { IcebergDataObjectInfo * iceberg_object_info = dynamic_cast(object_info.get()); SharedLockGuard lock(mutex); @@ -642,7 +645,7 @@ std::shared_ptr IcebergMetadata::getSchemaTransformer(ContextP return nullptr; return (iceberg_object_info->underlying_format_read_schema_id != relevant_snapshot_schema_id) ? 
persistent_components.schema_processor->getSchemaTransformationDagByIds( - context_, + local_context, iceberg_object_info->underlying_format_read_schema_id, relevant_snapshot_schema_id) : nullptr; @@ -925,9 +928,10 @@ std::optional IcebergMetadata::totalRows(ContextPtr local_context) const persistent_components, local_context, log, - manifest_list_entry.manifest_file_path, + manifest_list_entry.manifest_file_absolute_path, manifest_list_entry.added_sequence_number, - manifest_list_entry.added_snapshot_id); + manifest_list_entry.added_snapshot_id, + *secondary_storages); auto data_count = manifest_file_ptr->getRowsCountInAllFilesExcludingDeleted(FileContentType::DATA); auto position_deletes_count = manifest_file_ptr->getRowsCountInAllFilesExcludingDeleted(FileContentType::POSITION_DELETE); if (!data_count.has_value() || !position_deletes_count.has_value()) @@ -965,9 +969,10 @@ std::optional IcebergMetadata::totalBytes(ContextPtr local_context) cons persistent_components, local_context, log, - manifest_list_entry.manifest_file_path, + manifest_list_entry.manifest_file_absolute_path, manifest_list_entry.added_sequence_number, - manifest_list_entry.added_snapshot_id); + manifest_list_entry.added_snapshot_id, + *secondary_storages); auto count = manifest_file_ptr->getBytesCountInAllDataFilesExcludingDeleted(); if (!count.has_value()) return {}; @@ -1009,7 +1014,8 @@ ObjectIterator IcebergMetadata::iterate( callback, table_snapshot, relevant_snapshot, - persistent_components); + persistent_components, + secondary_storages); } NamesAndTypesList IcebergMetadata::getTableSchema() const @@ -1047,7 +1053,7 @@ void IcebergMetadata::addDeleteTransformers( LOG_DEBUG(log, "Constructing filter transform for position delete, there are {} delete objects", iceberg_object_info->position_deletes_objects.size()); builder.addSimpleTransform( [&](const SharedHeader & header) - { return iceberg_object_info->getPositionDeleteTransformer(object_storage, header, format_settings, 
local_context); }); + { return iceberg_object_info->getPositionDeleteTransformer(object_storage, header, format_settings, local_context, persistent_components.table_location, *secondary_storages); }); } const auto & delete_files = iceberg_object_info->equality_deletes_objects; if (!delete_files.empty()) @@ -1058,9 +1064,13 @@ void IcebergMetadata::addDeleteTransformers( { /// get header of delete file Block delete_file_header; - ObjectInfo delete_file_object(delete_file.file_path); + + auto [delete_storage_to_use, resolved_delete_key] = resolveObjectStorageForPath( + persistent_components.table_location, delete_file.file_path, object_storage, *secondary_storages, local_context); + + PathWithMetadata delete_file_object(resolved_delete_key, std::nullopt, delete_file.file_path, delete_storage_to_use); { - auto schema_read_buffer = createReadBuffer(delete_file_object, object_storage, local_context, log); + auto schema_read_buffer = createReadBuffer(delete_file_object, delete_storage_to_use, local_context, log); auto schema_reader = FormatFactory::instance().getSchemaReader(delete_file.file_format, *schema_read_buffer, local_context); auto columns_with_names = schema_reader->readSchema(); ColumnsWithTypeAndName initial_header_data; @@ -1083,7 +1093,7 @@ void IcebergMetadata::addDeleteTransformers( } /// Then we read the content of the delete file. 
auto mutable_columns_for_set = block_for_set.cloneEmptyColumns(); - std::unique_ptr data_read_buffer = createReadBuffer(delete_file_object, object_storage, local_context, log); + std::unique_ptr data_read_buffer = createReadBuffer(delete_file_object, delete_storage_to_use, local_context, log); CompressionMethod compression_method = chooseCompressionMethod(delete_file.file_path, "auto"); auto delete_format = FormatFactory::instance().getInput( delete_file.file_format, diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h index e868814b23f8..ad1b60e86820 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h @@ -28,6 +28,7 @@ #include #include #include +#include namespace DB { @@ -129,11 +130,19 @@ class IcebergMetadata : public IDataLakeMetadata std::optional sortingKey(ContextPtr) const override; protected: + ObjectIterator createIcebergKeysIterator( + Strings && data_files_, + ObjectStoragePtr, + IDataLakeMetadata::FileProgressCallback callback_, + ContextPtr local_context); + ObjectIterator iterate(const ActionsDAG * filter_dag, FileProgressCallback callback, size_t list_batch_size, ContextPtr local_context) const override; private: const ObjectStoragePtr object_storage; + mutable std::shared_ptr secondary_storages; // Sometimes data or manifests can be located on another storage + const StorageObjectStorageConfigurationWeakPtr configuration; DB::Iceberg::PersistentTableComponents persistent_components; diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadataFilesCache.h b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadataFilesCache.h index d60fa5fdd870..a5f136648f32 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadataFilesCache.h +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadataFilesCache.h @@ -31,7 +31,7 @@ namespace DB /// 
And we can get `ManifestFileContent` from cache by ManifestFileEntry. struct ManifestFileCacheKey { - String manifest_file_path; + String manifest_file_absolute_path; Int64 added_sequence_number; Int64 added_snapshot_id; Iceberg::ManifestFileContentType content_type; @@ -73,7 +73,7 @@ struct IcebergMetadataFilesCacheCell : private boost::noncopyable size_t total_size = 0; for (const auto & entry: manifest_file_cache_keys) { - total_size += sizeof(ManifestFileCacheKey) + entry.manifest_file_path.capacity(); + total_size += sizeof(ManifestFileCacheKey) + entry.manifest_file_absolute_path.capacity(); } return total_size; } diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.cpp index 32fe10bca0d3..d2f6f9f59d9f 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.cpp @@ -8,9 +8,9 @@ #include #include -#include #include #include +#include #include #include @@ -150,7 +150,6 @@ ManifestFileContent::ManifestFileContent( Int64 inherited_sequence_number, Int64 inherited_snapshot_id, const String & table_location, - const String & common_namespace, DB::ContextPtr context, const String & path_to_manifest_file_) : path_to_manifest_file(path_to_manifest_file_) @@ -244,7 +243,6 @@ ManifestFileContent::ManifestFileContent( content_type = FileContentType(manifest_file_deserializer.getValueFromRowByName(i, c_data_file_content, TypeIndex::Int32).safeGet()); const auto status = ManifestEntryStatus(manifest_file_deserializer.getValueFromRowByName(i, f_status, TypeIndex::Int32).safeGet()); - if (status == ManifestEntryStatus::DELETED) continue; @@ -287,13 +285,9 @@ ManifestFileContent::ManifestFileContent( } const auto schema_id = schema_id_opt.has_value() ? 
schema_id_opt.value() : manifest_schema_id; - const auto file_path_key - = manifest_file_deserializer.getValueFromRowByName(i, c_data_file_file_path, TypeIndex::String).safeGet(); - const auto file_path = getProperFilePathFromMetadataInfo( - manifest_file_deserializer.getValueFromRowByName(i, c_data_file_file_path, TypeIndex::String).safeGet(), - common_path, - table_location, - common_namespace); + const auto file_path_key_field = manifest_file_deserializer.getValueFromRowByName(i, c_data_file_file_path, TypeIndex::String); + const auto file_path_key = file_path_key_field.safeGet(); + const auto file_path = makeAbsolutePath(table_location, file_path_key); /// NOTE: This is weird, because in manifest file partition looks like this: /// { diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.h b/src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.h index 97098811c015..7967a1b8ba84 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.h +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.h @@ -124,7 +124,6 @@ class ManifestFileContent : public boost::noncopyable Int64 inherited_sequence_number, Int64 inherited_snapshot_id, const std::string & table_location, - const std::string & common_namespace, DB::ContextPtr context, const String & path_to_manifest_file_); diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/Mutations.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/Mutations.cpp index cfdfaf2eb511..0bc97042b01e 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/Mutations.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/Mutations.cpp @@ -168,15 +168,19 @@ std::optional writeDataFiles( Field cur_value; col_data_filename.column->get(i, cur_value); + String original_path = cur_value.safeGet(); String path_without_namespace; - if (cur_value.safeGet().starts_with(configuration->getNamespace())) - path_without_namespace = cur_value.safeGet().substr(configuration->getNamespace().size()); - if 
(!path_without_namespace.starts_with(configuration->getPathForRead().path)) + if (original_path.starts_with(configuration->getNamespace())) + path_without_namespace = original_path.substr(configuration->getNamespace().size()); + else + path_without_namespace = original_path; + + if (!path_without_namespace.empty() && !path_without_namespace.starts_with(configuration->getPathForRead().path)) { if (path_without_namespace.starts_with('/')) path_without_namespace = path_without_namespace.substr(1); - else + else if (!path_without_namespace.empty()) path_without_namespace = "/" + path_without_namespace; } col_data_filename_without_namespaces->insert(path_without_namespace); diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/PositionDeleteTransform.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/PositionDeleteTransform.cpp index 2f670db195f3..d97b786d1ff0 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/PositionDeleteTransform.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/PositionDeleteTransform.cpp @@ -67,18 +67,22 @@ void IcebergPositionDeleteTransform::initializeDeleteSources() { /// Skip position deletes that do not match the data file path. 
if (position_deletes_object.reference_data_file_path.has_value() - && position_deletes_object.reference_data_file_path != iceberg_data_path) + && position_deletes_object.reference_data_file_path.value() != iceberg_data_path) continue; - auto object_path = position_deletes_object.file_path; - auto object_metadata = object_storage->getObjectMetadata(object_path); - auto object_info = std::make_shared(object_path, object_metadata); + /// Resolve the position delete file path to get the correct storage and key + /// This handles cases where delete files are outside the table location + auto [delete_storage_to_use, resolved_delete_key] = resolveObjectStorageForPath( + table_location, position_deletes_object.file_path, object_storage, secondary_storages, context); + + auto object_metadata = delete_storage_to_use->getObjectMetadata(resolved_delete_key); + PathWithMetadata delete_file_object(resolved_delete_key, object_metadata, position_deletes_object.file_path, delete_storage_to_use); String format = position_deletes_object.file_format; Block initial_header; { - std::unique_ptr read_buf_schema = createReadBuffer(*object_info, object_storage, context, log); + std::unique_ptr read_buf_schema = createReadBuffer(delete_file_object, delete_storage_to_use, context, log); auto schema_reader = FormatFactory::instance().getSchemaReader(format, *read_buf_schema, context); auto columns_with_names = schema_reader->readSchema(); ColumnsWithTypeAndName initial_header_data; @@ -89,9 +93,9 @@ void IcebergPositionDeleteTransform::initializeDeleteSources() initial_header = Block(initial_header_data); } - CompressionMethod compression_method = chooseCompressionMethod(object_path, "auto"); + CompressionMethod compression_method = chooseCompressionMethod(resolved_delete_key, "auto"); - delete_read_buffers.push_back(createReadBuffer(*object_info, object_storage, context, log)); + delete_read_buffers.push_back(createReadBuffer(delete_file_object, delete_storage_to_use, context, log)); auto 
syntax_result = TreeRewriter(context).analyze(where_ast, initial_header.getNamesAndTypesList()); ExpressionAnalyzer analyzer(where_ast, syntax_result, context); @@ -188,10 +192,8 @@ void IcebergBitmapPositionDeleteTransform::initialize() while (auto delete_chunk = delete_source->read()) { int position_index = getColumnIndex(delete_source, IcebergPositionDeleteTransform::positions_column_name); - int filename_index = getColumnIndex(delete_source, IcebergPositionDeleteTransform::data_file_path_column_name); auto position_column = delete_chunk.getColumns()[position_index]; - auto filename_column = delete_chunk.getColumns()[filename_index]; for (size_t i = 0; i < delete_chunk.getNumRows(); ++i) { diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/PositionDeleteTransform.h b/src/Storages/ObjectStorage/DataLakes/Iceberg/PositionDeleteTransform.h index 96c5719e04fd..e03fe400b542 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/PositionDeleteTransform.h +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/PositionDeleteTransform.h @@ -27,13 +27,17 @@ class IcebergPositionDeleteTransform : public ISimpleTransform IcebergDataObjectInfoPtr iceberg_object_info_, ObjectStoragePtr object_storage_, const std::optional & format_settings_, - ContextPtr context_) + ContextPtr context_, + const String & table_location_, + SecondaryStorages & secondary_storages_) : ISimpleTransform(header_, header_, false) , header(header_) , iceberg_object_info(iceberg_object_info_) , object_storage(object_storage_) , format_settings(format_settings_) , context(context_) + , table_location(table_location_) + , secondary_storages(secondary_storages_) { initializeDeleteSources(); } @@ -52,6 +56,8 @@ class IcebergPositionDeleteTransform : public ISimpleTransform const ObjectStoragePtr object_storage; const std::optional format_settings; ContextPtr context; + const String table_location; + SecondaryStorages & secondary_storages; /// We need to keep the read buffers alive since the 
delete_sources depends on them. std::vector> delete_read_buffers; @@ -66,8 +72,10 @@ class IcebergBitmapPositionDeleteTransform : public IcebergPositionDeleteTransfo IcebergDataObjectInfoPtr iceberg_object_info_, ObjectStoragePtr object_storage_, const std::optional & format_settings_, - ContextPtr context_) - : IcebergPositionDeleteTransform(header_, iceberg_object_info_, object_storage_, format_settings_, context_) + ContextPtr context_, + const String & table_location_, + SecondaryStorages & secondary_storages_) + : IcebergPositionDeleteTransform(header_, iceberg_object_info_, object_storage_, format_settings_, context_, table_location_, secondary_storages_) { initialize(); } @@ -91,8 +99,10 @@ class IcebergStreamingPositionDeleteTransform : public IcebergPositionDeleteTran IcebergDataObjectInfoPtr iceberg_object_info_, ObjectStoragePtr object_storage_, const std::optional & format_settings_, - ContextPtr context_) - : IcebergPositionDeleteTransform(header_, iceberg_object_info_, object_storage_, format_settings_, context_) + ContextPtr context_, + const String & table_location_, + SecondaryStorages & secondary_storages_) + : IcebergPositionDeleteTransform(header_, iceberg_object_info_, object_storage_, format_settings_, context_, table_location_, secondary_storages_) { initialize(); } diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/Snapshot.h b/src/Storages/ObjectStorage/DataLakes/Iceberg/Snapshot.h index 53eb4ec9d390..f88838168639 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/Snapshot.h +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/Snapshot.h @@ -46,6 +46,7 @@ struct IcebergHistoryRecord Int64 parent_id; bool is_current_ancestor; String manifest_list_path; + String manifest_list_absolute_path; Int32 added_files = 0; Int32 added_records = 0; diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/StatelessMetadataFileGetter.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/StatelessMetadataFileGetter.cpp index 
eb7123b0d8e1..11bf98dbe953 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/StatelessMetadataFileGetter.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/StatelessMetadataFileGetter.cpp @@ -73,9 +73,10 @@ Iceberg::ManifestFilePtr getManifestFile( const PersistentTableComponents & persistent_table_components, ContextPtr local_context, LoggerPtr log, - const String & filename, + const String & absolute_path, Int64 inherited_sequence_number, - Int64 inherited_snapshot_id) + Int64 inherited_snapshot_id, + SecondaryStorages & secondary_storages) { auto log_level = local_context->getSettingsRef()[Setting::iceberg_metadata_log_level].value; @@ -84,34 +85,36 @@ Iceberg::ManifestFilePtr getManifestFile( auto create_fn = [&, use_iceberg_metadata_cache]() { - RelativePathWithMetadata manifest_object_info(filename); + auto [storage_to_use, resolved_key_in_storage] = resolveObjectStorageForPath( + persistent_table_components.table_location, absolute_path, object_storage, secondary_storages, local_context); + + PathWithMetadata manifest_object_info(resolved_key_in_storage, std::nullopt, absolute_path, storage_to_use); auto read_settings = local_context->getReadSettings(); /// Do not utilize filesystem cache if more precise cache enabled if (use_iceberg_metadata_cache) read_settings.enable_filesystem_cache = false; - auto buffer = createReadBuffer(manifest_object_info, object_storage, local_context, log, read_settings); - Iceberg::AvroForIcebergDeserializer manifest_file_deserializer(std::move(buffer), filename, getFormatSettings(local_context)); + auto buffer = createReadBuffer(manifest_object_info, storage_to_use, local_context, log, read_settings); + Iceberg::AvroForIcebergDeserializer manifest_file_deserializer(std::move(buffer), resolved_key_in_storage, getFormatSettings(local_context)); return std::make_shared( manifest_file_deserializer, - filename, + resolved_key_in_storage, persistent_table_components.format_version, configuration->getPathForRead().path, 
*persistent_table_components.schema_processor, inherited_sequence_number, inherited_snapshot_id, persistent_table_components.table_location, - configuration->getNamespace(), local_context, - filename); + absolute_path); }; if (use_iceberg_metadata_cache) { auto manifest_file = persistent_table_components.metadata_cache->getOrSetManifestFile( - IcebergMetadataFilesCache::getKey(configuration, filename), create_fn); + IcebergMetadataFilesCache::getKey(configuration, absolute_path), create_fn); return manifest_file; } return create_fn(); @@ -122,7 +125,8 @@ ManifestFileCacheKeys getManifestList( StorageObjectStorageConfigurationWeakPtr configuration, const PersistentTableComponents & persistent_table_components, ContextPtr local_context, - const String & filename, + const String & key_in_storage, + const String & absolute_path, LoggerPtr log) { auto configuration_ptr = configuration.lock(); @@ -136,7 +140,7 @@ ManifestFileCacheKeys getManifestList( auto create_fn = [&, use_iceberg_metadata_cache]() { - StorageObjectStorage::ObjectInfo object_info(filename); + PathWithMetadata object_info(key_in_storage, std::nullopt, absolute_path, object_storage); auto read_settings = local_context->getReadSettings(); /// Do not utilize filesystem cache if more precise cache enabled @@ -144,7 +148,7 @@ ManifestFileCacheKeys getManifestList( read_settings.enable_filesystem_cache = false; auto manifest_list_buf = createReadBuffer(object_info, object_storage, local_context, log, read_settings); - AvroForIcebergDeserializer manifest_list_deserializer(std::move(manifest_list_buf), filename, getFormatSettings(local_context)); + AvroForIcebergDeserializer manifest_list_deserializer(std::move(manifest_list_buf), key_in_storage, getFormatSettings(local_context)); ManifestFileCacheKeys manifest_file_cache_keys; @@ -154,18 +158,14 @@ ManifestFileCacheKeys getManifestList( dump_metadata, DB::IcebergMetadataLogLevel::ManifestListMetadata, configuration_ptr->getRawPath().path, - filename, + 
key_in_storage, std::nullopt); for (size_t i = 0; i < manifest_list_deserializer.rows(); ++i) { const std::string file_path = manifest_list_deserializer.getValueFromRowByName(i, f_manifest_path, TypeIndex::String).safeGet(); - const auto manifest_file_name = getProperFilePathFromMetadataInfo( - file_path, - configuration_ptr->getPathForRead().path, - persistent_table_components.table_location, - configuration_ptr->getNamespace()); + const auto manifest_absolute_path = makeAbsolutePath(persistent_table_components.table_location, file_path); Int64 added_sequence_number = 0; auto added_snapshot_id = manifest_list_deserializer.getValueFromRowByName(i, f_added_snapshot_id); if (added_snapshot_id.isNull()) @@ -184,7 +184,7 @@ ManifestFileCacheKeys getManifestList( manifest_list_deserializer.getValueFromRowByName(i, f_content, TypeIndex::Int32).safeGet()); } manifest_file_cache_keys.emplace_back( - manifest_file_name, added_sequence_number, added_snapshot_id.safeGet(), content_type); + manifest_absolute_path, added_sequence_number, added_snapshot_id.safeGet(), content_type); auto dump_row_metadata = [&]()->String { return manifest_list_deserializer.getContent(i); }; insertRowToLogTable( @@ -192,7 +192,7 @@ ManifestFileCacheKeys getManifestList( dump_row_metadata, DB::IcebergMetadataLogLevel::ManifestListEntry, configuration_ptr->getRawPath().path, - filename, + absolute_path, i); } /// We only return the list of {file name, seq number} for cache. 
@@ -204,7 +204,7 @@ ManifestFileCacheKeys getManifestList( ManifestFileCacheKeys manifest_file_cache_keys; if (use_iceberg_metadata_cache) manifest_file_cache_keys = persistent_table_components.metadata_cache->getOrSetManifestFileCacheKeys( - IcebergMetadataFilesCache::getKey(configuration_ptr, filename), create_fn); + IcebergMetadataFilesCache::getKey(configuration_ptr, absolute_path), create_fn); else manifest_file_cache_keys = create_fn(); return manifest_file_cache_keys; diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/StatelessMetadataFileGetter.h b/src/Storages/ObjectStorage/DataLakes/Iceberg/StatelessMetadataFileGetter.h index 432409678312..26f17cbd2231 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/StatelessMetadataFileGetter.h +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/StatelessMetadataFileGetter.h @@ -19,6 +19,7 @@ #include #include +#include namespace DB::Iceberg { @@ -29,9 +30,10 @@ Iceberg::ManifestFilePtr getManifestFile( const PersistentTableComponents & persistent_table_components, ContextPtr local_context, LoggerPtr log, - const String & filename, + const String & absolute_path, Int64 inherited_sequence_number, - Int64 inherited_snapshot_id); + Int64 inherited_snapshot_id, + SecondaryStorages & secondary_storages); ManifestFileCacheKeys getManifestList( @@ -39,7 +41,8 @@ ManifestFileCacheKeys getManifestList( StorageObjectStorageConfigurationWeakPtr configuration, const PersistentTableComponents & persistent_table_components, ContextPtr local_context, - const String & filename, + const String & key_in_storage, + const String & absolute_path, LoggerPtr log); std::pair parseTableSchemaV1Method(const Poco::JSON::Object::Ptr & metadata_object); diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.cpp index 644197a140fb..5b8a657fc997 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.cpp +++ 
b/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.cpp @@ -78,7 +78,6 @@ namespace DB::Setting namespace DB::Iceberg { - using namespace DB; void writeMessageToFile( @@ -167,95 +166,6 @@ std::optional parseTransformAndArgument(const String & tra return std::nullopt; } -// This function is used to get the file path inside the directory which corresponds to iceberg table from the full blob path which is written in manifest and metadata files. -// For example, if the full blob path is s3://bucket/table_name/data/00000-1-1234567890.avro, the function will return table_name/data/00000-1-1234567890.avro -// Common path should end with "" or "/". -std::string getProperFilePathFromMetadataInfo( - std::string_view data_path, - std::string_view common_path, - std::string_view table_location, - std::string_view common_namespace) -{ - auto trim_backward_slash = [](std::string_view str) -> std::string_view - { - if (str.ends_with('/')) - { - return str.substr(0, str.size() - 1); - } - return str; - }; - auto trim_forward_slash = [](std::string_view str) -> std::string_view - { - if (str.starts_with('/')) - { - return str.substr(1); - } - return str; - }; - common_path = trim_backward_slash(common_path); - table_location = trim_backward_slash(table_location); - - if (data_path.starts_with(table_location) && table_location.ends_with(common_path)) - { - return std::filesystem::path{common_path} / trim_forward_slash(data_path.substr(table_location.size())); - } - - - auto pos = data_path.find(common_path); - /// Valid situation when data and metadata files are stored in different directories. 
- if (pos == std::string::npos) - { - /// connection://bucket - auto prefix = table_location.substr(0, table_location.size() - common_path.size()); - return std::string{data_path.substr(prefix.size())}; - } - - size_t good_pos = std::string::npos; - while (pos != std::string::npos) - { - auto potential_position = pos + common_path.size(); - if ((std::string_view(data_path.data() + potential_position, 6) == "/data/") - || (std::string_view(data_path.data() + potential_position, 10) == "/metadata/")) - { - good_pos = pos; - break; - } - size_t new_pos = data_path.find(common_path, pos + 1); - if (new_pos == std::string::npos) - { - break; - } - pos = new_pos; - } - - - if (good_pos != std::string::npos) - { - return std::string{data_path.substr(good_pos)}; - } - else if (pos != std::string::npos) - { - return std::string{data_path.substr(pos)}; - } - else - { - /// Data files can have different path - pos = data_path.find("://"); - if (pos == std::string::npos) - throw ::DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Unexpected data path: '{}'", data_path); - pos = data_path.find('/', pos + 3); - if (pos == std::string::npos) - throw ::DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Unexpected data path: '{}'", data_path); - if (data_path.substr(pos + 1).starts_with(common_namespace)) - { - auto new_pos = data_path.find('/', pos + 1); - if (new_pos - pos == common_namespace.length() + 1) /// bucket in the path - pos = new_pos; - } - return std::string(data_path.substr(pos)); - } -} - enum class MostRecentMetadataFileSelectionWay { BY_LAST_UPDATED_MS_FIELD, diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.h b/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.h index c77bfc6d5c76..a1b559b05810 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.h +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.h @@ -1,7 +1,10 @@ #pragma once + +#include #include "config.h" +#include #include #include @@ -31,12 +34,6 @@ void writeMessageToFile( 
std::function cleanup, DB::CompressionMethod compression_method = DB::CompressionMethod::None); -std::string getProperFilePathFromMetadataInfo( - std::string_view data_path, - std::string_view common_path, - std::string_view table_location, - std::string_view common_namespace); - struct TransformAndArgument { String transform_name; diff --git a/src/Storages/ObjectStorage/IObjectIterator.h b/src/Storages/ObjectStorage/IObjectIterator.h index 76358ea44dfc..126abd181910 100644 --- a/src/Storages/ObjectStorage/IObjectIterator.h +++ b/src/Storages/ObjectStorage/IObjectIterator.h @@ -5,8 +5,8 @@ namespace DB { -using ObjectInfo = RelativePathWithMetadata; -using ObjectInfoPtr = std::shared_ptr; +using ObjectInfo = PathWithMetadata; +using ObjectInfoPtr = std::shared_ptr; class ExpressionActions; struct IObjectIterator diff --git a/src/Storages/ObjectStorage/StorageObjectStorage.h b/src/Storages/ObjectStorage/StorageObjectStorage.h index 21d2224d6b0e..9b1fe6ea9aee 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorage.h +++ b/src/Storages/ObjectStorage/StorageObjectStorage.h @@ -37,7 +37,10 @@ struct IPartitionStrategy; class StorageObjectStorage : public IStorage { public: - using ObjectInfo = RelativePathWithMetadata; + class Configuration; + using ConfigurationPtr = std::shared_ptr; + using ConfigurationObserverPtr = std::weak_ptr; + using ObjectInfo = PathWithMetadata; using ObjectInfoPtr = std::shared_ptr; using ObjectInfos = std::vector; diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp index 0394170dbc4d..9cf1532fcd18 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp @@ -500,7 +500,9 @@ class TaskDistributor : public TaskIterator { auto task = task_distributor.getNextTask(number_of_current_replica); if (task) + { return std::make_shared(std::move(task), context); + } return 
std::make_shared(); } diff --git a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp index a07866e68cc6..9d9191daf871 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp @@ -39,7 +39,7 @@ #endif #include - +#include namespace fs = std::filesystem; namespace ProfileEvents @@ -346,10 +346,14 @@ Chunk StorageObjectStorageSource::generate() path); } + /// For _path column, use absolute_path if available (e.g., file:///home/...) + /// Otherwise, fall back to the storage path identifier + std::string path_for_virtual_column = object_info->getAbsolutePath().value_or(path); + VirtualColumnUtils::addRequestedFileLikeStorageVirtualsToChunk( chunk, read_from_format_info.requested_virtual_columns, - {.path = path, + {.path = path_for_virtual_column, .size = object_info->isArchive() ? object_info->fileSizeInArchive() : object_info->metadata->size_bytes, .filename = &filename, .last_modified = object_info->metadata->last_modified, @@ -514,6 +518,10 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade object_info->loadMetadata(object_storage, query_settings.ignore_non_existent_file); } while (not_a_path || (query_settings.skip_empty_files && object_info->metadata->size_bytes == 0)); + + ObjectStoragePtr storage_to_use = object_info->getObjectStorage(); + if (!storage_to_use) + storage_to_use = object_storage; QueryPipelineBuilder builder; std::shared_ptr source; @@ -701,7 +709,7 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade else { compression_method = chooseCompressionMethod(object_info->getFileName(), configuration->getCompressionMethod()); - read_buf = createReadBuffer(*object_info, object_storage, context_, log); + read_buf = createReadBuffer(*object_info, storage_to_use, context_, log); } Block initial_header = read_from_format_info.format_header; @@ -824,6 
+832,10 @@ std::unique_ptr createReadBuffer( const auto & settings = context_->getSettingsRef(); const auto & effective_read_settings = read_settings.has_value() ? read_settings.value() : context_->getReadSettings(); + ObjectStoragePtr storage_to_use = object_info.getObjectStorage(); + if (!storage_to_use) + storage_to_use = object_storage; + bool use_distributed_cache = false; #if ENABLE_DISTRIBUTED_CACHE ObjectStorageConnectionInfoPtr connection_info; @@ -831,7 +843,7 @@ std::unique_ptr createReadBuffer( && DistributedCache::Registry::instance().isReady( effective_read_settings.distributed_cache_settings.read_only_from_current_az)) { - connection_info = object_storage->getConnectionInfo(); + connection_info = storage_to_use->getConnectionInfo(); if (connection_info) use_distributed_cache = true; } @@ -844,15 +856,15 @@ std::unique_ptr createReadBuffer( filesystem_cache_name = settings[Setting::filesystem_cache_name].value; use_filesystem_cache = effective_read_settings.enable_filesystem_cache && !filesystem_cache_name.empty() - && (object_storage->getType() == ObjectStorageType::Azure - || object_storage->getType() == ObjectStorageType::S3); + && (storage_to_use->getType() == ObjectStorageType::Azure + || storage_to_use->getType() == ObjectStorageType::S3); } /// We need object metadata for two cases: /// 1. object size suggests whether we need to use prefetch /// 2. 
object etag suggests a cache key in case we use filesystem cache if (!object_info.metadata) - object_info.metadata = object_storage->getObjectMetadata(object_info.getPath()); + object_info.metadata = storage_to_use->getObjectMetadata(object_info.getPath()); const auto & object_size = object_info.metadata->size_bytes; @@ -888,9 +900,9 @@ std::unique_ptr createReadBuffer( { const std::string path = object_info.getPath(); StoredObject object(path, "", object_size); - auto read_buffer_creator = [object, nested_buffer_read_settings, object_storage]() + auto read_buffer_creator = [object, nested_buffer_read_settings, storage_to_use]() { - return object_storage->readObject(object, nested_buffer_read_settings); + return storage_to_use->readObject(object, nested_buffer_read_settings); }; impl = std::make_unique( @@ -923,9 +935,9 @@ std::unique_ptr createReadBuffer( const auto cache_key = FileCacheKey::fromKey(hash.get128()); auto cache = FileCacheFactory::instance().get(filesystem_cache_name); - auto read_buffer_creator = [path = object_info.getPath(), object_size, nested_buffer_read_settings, object_storage]() + auto read_buffer_creator = [path = object_info.getPath(), object_size, nested_buffer_read_settings, object_storage, storage_to_use]() { - return object_storage->readObject(StoredObject(path, "", object_size), nested_buffer_read_settings); + return storage_to_use->readObject(StoredObject(path, "", object_size), nested_buffer_read_settings); }; impl = std::make_unique( @@ -953,7 +965,7 @@ std::unique_ptr createReadBuffer( } if (!impl) - impl = object_storage->readObject(StoredObject(object_info.getPath(), "", object_size), nested_buffer_read_settings); + impl = storage_to_use->readObject(StoredObject(object_info.getPath(), "", object_size), nested_buffer_read_settings); if (!use_async_buffer) return impl; @@ -1273,7 +1285,22 @@ StorageObjectStorageSource::ReadTaskIterator::ReadTaskIterator( { auto object = object_future.get(); if (object) + { + if 
(object->getAbsolutePath().has_value()) + { + auto [storage_to_use, key] = resolveObjectStorageForPath("", object->getAbsolutePath().value(), object_storage, secondary_storages, getContext()); + if (!key.empty()) + { + object->object_storage_to_use = storage_to_use; + object->relative_path = key; + } + } + else + { + object->object_storage_to_use = object_storage; + } buffer.push_back(object); + } } } @@ -1290,10 +1317,29 @@ StorageObjectStorage::ObjectInfoPtr StorageObjectStorageSource::ReadTaskIterator return nullptr; } - auto task = callback(); - if (!task || task->isEmpty()) + auto raw = callback(); + if (!raw || raw->isEmpty()) return nullptr; - object_info = task->getObjectInfo(); + + object_info = raw->getObjectInfo(); + + // The 'path' field from master is already the correctly resolved relative path. + // We should use it directly and NOT overwrite relative_path. + // Only resolve absolute_path if we need to determine which storage to use (for secondary storages). + object_info->object_storage_to_use = object_storage; + + if (raw->absolute_path.has_value()) + { + auto [storage_to_use, key] + = resolveObjectStorageForPath("", raw->absolute_path.value(), object_storage, secondary_storages, getContext()); + + if (!key.empty() && storage_to_use != object_storage) + { + // File is in a different storage (secondary storage), use that storage + // BUT preserve the original relative_path from master - don't overwrite it! + object_info->object_storage_to_use = storage_to_use; + } + } } else { @@ -1390,7 +1436,10 @@ StorageObjectStorageSource::ArchiveIterator::createArchiveReader(ObjectInfoPtr o return DB::createArchiveReader( /* path_to_archive */ object_info->getPath(), - /* archive_read_function */ [=, this]() { return createReadBuffer(*object_info, object_storage, getContext(), log); }, + /* archive_read_function */ [=, this]() { + ObjectStoragePtr storage = object_info->getObjectStorage() ? 
object_info->getObjectStorage() : object_storage; + return createReadBuffer(*object_info, storage, getContext(), log); + }, /* archive_size */ size); } @@ -1412,7 +1461,12 @@ ObjectInfoPtr StorageObjectStorageSource::ArchiveIterator::next(size_t processor } if (!archive_object->metadata) - archive_object->metadata = object_storage->getObjectMetadata(archive_object->getPath()); + { + ObjectStoragePtr storage_to_use = archive_object->getObjectStorage(); + if (!storage_to_use) + storage_to_use = object_storage; + archive_object->metadata = storage_to_use->getObjectMetadata(archive_object->getPath()); + } archive_reader = createArchiveReader(archive_object); file_enumerator = archive_reader->firstFile(); @@ -1438,7 +1492,12 @@ ObjectInfoPtr StorageObjectStorageSource::ArchiveIterator::next(size_t processor return {}; if (!archive_object->metadata) - archive_object->metadata = object_storage->getObjectMetadata(archive_object->getPath()); + { + ObjectStoragePtr storage_to_use = archive_object->getObjectStorage(); + if (!storage_to_use) + storage_to_use = object_storage; + archive_object->metadata = storage_to_use->getObjectMetadata(archive_object->getPath()); + } archive_reader = createArchiveReader(archive_object); if (!archive_reader->fileExists(path_in_archive)) diff --git a/src/Storages/ObjectStorage/StorageObjectStorageSource.h b/src/Storages/ObjectStorage/StorageObjectStorageSource.h index 898b57023856..ebcc159d8679 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageSource.h +++ b/src/Storages/ObjectStorage/StorageObjectStorageSource.h @@ -8,6 +8,7 @@ #include #include #include +#include #include #include #include @@ -179,6 +180,7 @@ class StorageObjectStorageSource::ReadTaskIterator : public IObjectIterator, pri std::atomic_size_t index = 0; bool is_archive; ObjectStoragePtr object_storage; + SecondaryStorages secondary_storages; // Sometimes data can be located on a different storage /// path_to_archive -> archive reader. 
std::unordered_map> archive_readers; std::mutex archive_readers_mutex; diff --git a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp index a617d0292820..8492c6cc40ea 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp @@ -165,7 +165,7 @@ ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getMatchingFileFromIter } else { - file_path = object_info->getPath(); + file_path = object_info->getAbsolutePath().value_or(object_info->getPath()); } size_t file_replica_idx = getReplicaForFile(file_path); @@ -245,7 +245,7 @@ ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getAnyUnprocessedFile(s /// All unprocessed files owned by alive replicas with recenlty activity /// Need to retry after (oldest_activity - activity_limit) microseconds - RelativePathWithMetadata::CommandInTaskResponse response; + PathWithMetadata::CommandInTaskResponse response; response.set_retry_after_us(oldest_activity - activity_limit); return std::make_shared(response.to_string()); } diff --git a/src/Storages/ObjectStorage/Utils.cpp b/src/Storages/ObjectStorage/Utils.cpp index beeac43eab12..a89f1e3bfe64 100644 --- a/src/Storages/ObjectStorage/Utils.cpp +++ b/src/Storages/ObjectStorage/Utils.cpp @@ -9,6 +9,18 @@ #include #include #include +#include +#include +#include +#include +#include +#if USE_AWS_S3 +#include +#endif +#if USE_HDFS +#include +#endif + namespace DB { @@ -19,6 +31,174 @@ namespace ErrorCodes extern const int LOGICAL_ERROR; } +namespace +{ + +std::string normalizeScheme(const std::string & scheme) +{ + auto scheme_lowercase = Poco::toLower(scheme); + + if (scheme_lowercase == "s3a" || scheme_lowercase == "s3n") + scheme_lowercase = "s3"; + else if (scheme_lowercase == "wasb" || scheme_lowercase == "wasbs" || scheme_lowercase == "abfss") + scheme_lowercase = "abfs"; + 
+ return scheme_lowercase; +} + +std::string factoryTypeForScheme(const std::string & normalized_scheme) +{ + if (normalized_scheme == "s3") return "s3"; + if (normalized_scheme == "abfs") return "azure"; + if (normalized_scheme == "hdfs") return "hdfs"; + if (normalized_scheme == "file") return "local"; + return ""; +} + +bool isAbsolutePath(const std::string & path) +{ + if (!path.empty() && (path.front() == '/' || path.find("://") != std::string_view::npos)) + return true; + + return false; +} + +/// Normalize a path string by removing redundant components and leading slashes. +std::string normalizePathString(const std::string & path) +{ + std::filesystem::path fs_path(path); + std::filesystem::path normalized = fs_path.lexically_normal(); + + std::string normalized_result = normalized.string(); + + while (!normalized_result.empty() && normalized_result.front() == '/') + normalized_result = normalized_result.substr(1); + + return normalized_result; +} + +#if USE_AWS_S3 +/// For s3:// URIs (generic), bucket needs to match. +/// For explicit http(s):// URIs, both bucket and endpoint must match. 
+bool s3URIMatches(const S3::URI & target_uri, const std::string & base_bucket, const std::string & base_endpoint, const std::string & target_scheme_normalized) +{ + bool bucket_matches = (target_uri.bucket == base_bucket); + bool endpoint_matches = (target_uri.endpoint == base_endpoint); + bool is_generic_s3_uri = (target_scheme_normalized == "s3"); + return bucket_matches && (endpoint_matches || is_generic_s3_uri); +} +#endif + +std::pair getOrCreateStorageAndKey( + const std::string & cache_key, + const std::string & key_to_use, + const std::string & storage_type, + SecondaryStorages & secondary_storages, + const ContextPtr & context, + std::function configure_fn) +{ + { + std::lock_guard lock(secondary_storages.mutex); + if (auto it = secondary_storages.storages.find(cache_key); it != secondary_storages.storages.end()) + return {it->second, key_to_use}; + } + + Poco::AutoPtr cfg(new Poco::Util::MapConfiguration); + const std::string config_prefix = "object_storages." + cache_key; + + cfg->setString(config_prefix + ".object_storage_type", storage_type); + + configure_fn(*cfg, config_prefix); + + auto & factory = ObjectStorageFactory::instance(); + ObjectStoragePtr storage = factory.create(cache_key, *cfg, config_prefix, context, /*skip_access_check*/ true); + + { + std::lock_guard lock(secondary_storages.mutex); + auto [it, inserted] = secondary_storages.storages.emplace(cache_key, storage); + if (!inserted) + return {it->second, key_to_use}; + } + + return {storage, key_to_use}; +} + +/// Normalize a path (relative to table location or absolute path) to a key that will be looked up in the object storage. 
+std::string normalizePathToStorageRoot(const std::string & table_location, const std::string & path) +{ + if (table_location.empty()) + { + if (!path.empty() && path.front() == '/') + return path.substr(1); + return path; + } + + if (isAbsolutePath(path)) + return SchemeAuthorityKey(path).key; // Absolute path, return the key part + + SchemeAuthorityKey base{table_location}; + if (base.key.empty()) + return path; // Table location has an empty key part, return the path as is + + std::string base_key_trimmed = base.key; + while (!base_key_trimmed.empty() && base_key_trimmed.front() == '/') + base_key_trimmed = base_key_trimmed.substr(1); + while (!base_key_trimmed.empty() && base_key_trimmed.back() == '/') + base_key_trimmed.pop_back(); + + std::string rel_path = path; + while (!rel_path.empty() && rel_path.front() == '/') + rel_path = rel_path.substr(1); + + if (!base_key_trimmed.empty() && (rel_path == base_key_trimmed || rel_path.starts_with(base_key_trimmed + "/"))) + return normalizePathString(rel_path); // Path already includes table location + + std::string result = base.key; + if (!result.empty() && result.back() != '/') + result += '/'; + result += rel_path; + + return normalizePathString(result); +} + +} + +// TODO: handle https://s3.amazonaws.com/bucketname/... 
properly +SchemeAuthorityKey::SchemeAuthorityKey(const std::string & uri) +{ + if (uri.empty()) + return; + + if (auto scheme_sep = uri.find("://"); scheme_sep != std::string_view::npos) + { + scheme = Poco::toLower(uri.substr(0, scheme_sep)); + auto rest = uri.substr(scheme_sep + 3); // skip :// + + // authority is up to next '/' + auto slash = rest.find('/'); + if (slash == std::string_view::npos) + { + authority = std::string(rest); + key = "/"; // Path obviously incorrect, but it will be dealt with by caller + return; + } + authority = std::string(rest.substr(0, slash)); + key = std::string(rest.substr(++slash)); // do not keep leading '/' + return; + } + + // if part has no scheme and starts with '/' -- it is an absolute uri for local file: file:///path + if (uri.front() == '/') + { + scheme = "file"; + key = std::string(uri); + return; + } + + // Relative path + key = std::string(uri); +} + std::optional checkAndGetNewFileOnInsertIfNeeded( const IObjectStorage & object_storage, const StorageObjectStorageConfiguration & configuration, @@ -117,11 +297,210 @@ void validateSupportedColumns( } } -namespace Setting +std::string makeAbsolutePath(const std::string & table_location, const std::string & path) { -extern const SettingsUInt64 max_download_buffer_size; -extern const SettingsBool use_cache_for_count_from_files; -extern const SettingsString filesystem_cache_name; -extern const SettingsUInt64 filesystem_cache_boundary_alignment; + if (isAbsolutePath(path)) + return path; + + auto table_location_decomposed = SchemeAuthorityKey(table_location); + + std::string normalized_key = normalizePathToStorageRoot(table_location, path); + + if (!table_location_decomposed.scheme.empty()) + return table_location_decomposed.scheme + "://" + table_location_decomposed.authority + "/" + normalized_key; + + return normalized_key; } + +std::pair resolveObjectStorageForPath( + const std::string & table_location, + const std::string & path, + const DB::ObjectStoragePtr & 
base_storage, + SecondaryStorages & secondary_storages, + const DB::ContextPtr & context) +{ + if (!isAbsolutePath(path)) + return {base_storage, normalizePathToStorageRoot(table_location, path)}; // Relative path definitely goes to base storage + + SchemeAuthorityKey table_location_decomposed{table_location}; + SchemeAuthorityKey target_decomposed{path}; + + const std::string base_scheme_normalized = normalizeScheme(table_location_decomposed.scheme); + const std::string target_scheme_normalized = normalizeScheme(target_decomposed.scheme); + + // For S3 URIs, use S3::URI to properly handle all kinds of URIs, e.g. https://s3.amazonaws.com/bucket/... == s3://bucket/... + #if USE_AWS_S3 + if (target_scheme_normalized == "s3" || target_scheme_normalized == "https" || target_scheme_normalized == "http") + { + std::string normalized_path = path; + if (target_decomposed.scheme == "s3a" || target_decomposed.scheme == "s3n") + { + normalized_path = "s3://" + target_decomposed.authority + "/" + target_decomposed.key; + } + S3::URI s3_uri(normalized_path); + + std::string key_to_use = s3_uri.key; + + bool use_base_storage = false; + if (base_storage->getType() == ObjectStorageType::S3) + { + if (auto s3_storage = std::dynamic_pointer_cast(base_storage)) + { + const std::string base_bucket = s3_storage->getObjectsNamespace(); + const std::string base_endpoint = s3_storage->getDescription(); + + if (s3URIMatches(s3_uri, base_bucket, base_endpoint, target_scheme_normalized)) + use_base_storage = true; + } + } + + if (!use_base_storage && (base_scheme_normalized == "s3" || base_scheme_normalized == "https" || base_scheme_normalized == "http")) + { + std::string normalized_table_location = table_location; + if (table_location_decomposed.scheme == "s3a" || table_location_decomposed.scheme == "s3n") + { + normalized_table_location = "s3://" + table_location_decomposed.authority + "/" + table_location_decomposed.key; + } + S3::URI base_s3_uri(normalized_table_location); + + if 
(s3URIMatches(s3_uri, base_s3_uri.bucket, base_s3_uri.endpoint, target_scheme_normalized)) + use_base_storage = true; + } + + if (use_base_storage) + return {base_storage, key_to_use}; + + const std::string storage_cache_key = "s3://" + s3_uri.bucket + "@" + (s3_uri.endpoint.empty() ? "amazonaws.com" : s3_uri.endpoint); + + return getOrCreateStorageAndKey( + storage_cache_key, + key_to_use, + "s3", + secondary_storages, + context, + [&](Poco::Util::MapConfiguration & cfg, const std::string & config_prefix) + { + // Use the full endpoint or construct it from bucket + std::string endpoint = s3_uri.endpoint.empty() + ? ("https://" + s3_uri.bucket + ".s3.amazonaws.com") + : s3_uri.endpoint; + cfg.setString(config_prefix + ".endpoint", endpoint); + + // Copy credentials from base storage if it's also S3 + if (base_storage->getType() == ObjectStorageType::S3) + { + if (auto s3_storage = std::dynamic_pointer_cast(base_storage)) + { + if (auto s3_client = s3_storage->tryGetS3StorageClient()) + { + const auto credentials = s3_client->getCredentials(); + const String & access_key_id = credentials.GetAWSAccessKeyId(); + const String & secret_access_key = credentials.GetAWSSecretKey(); + const String & session_token = credentials.GetSessionToken(); + const String & region = s3_client->getRegion(); + + if (!access_key_id.empty()) + cfg.setString(config_prefix + ".access_key_id", access_key_id); + if (!secret_access_key.empty()) + cfg.setString(config_prefix + ".secret_access_key", secret_access_key); + if (!session_token.empty()) + cfg.setString(config_prefix + ".session_token", session_token); + if (!region.empty()) + cfg.setString(config_prefix + ".region", region); + } + } + } + }); + } + #endif + + #if USE_HDFS + if (target_scheme_normalized == "hdfs") + { + bool use_base_storage = false; + + // Check if base_storage matches (only if it's HDFS) + if (base_storage->getType() == ObjectStorageType::HDFS) + { + if (auto hdfs_storage = std::dynamic_pointer_cast(base_storage)) + 
{ + const std::string base_url = hdfs_storage->getDescription(); + // Extract endpoint from base URL (hdfs://namenode:port/path -> hdfs://namenode:port) + std::string base_endpoint; + if (auto pos = base_url.find('/', base_url.find("//") + 2); pos != std::string::npos) + base_endpoint = base_url.substr(0, pos); + else + base_endpoint = base_url; + + // For HDFS, compare endpoints (namenode addresses) + std::string target_endpoint = target_scheme_normalized + "://" + target_decomposed.authority; + + if (base_endpoint == target_endpoint) + use_base_storage = true; + + // Also check if table_location matches + if (!use_base_storage && base_scheme_normalized == "hdfs") + { + if (table_location_decomposed.authority == target_decomposed.authority) + use_base_storage = true; + } + } + } + + if (use_base_storage) + return {base_storage, target_decomposed.key}; + } + #endif + + /// Fallback for schemes not handled above (e.g., abfs, file) + if (base_scheme_normalized == target_scheme_normalized && table_location_decomposed.authority == target_decomposed.authority) + return {base_storage, target_decomposed.key}; + + const std::string cache_key = target_scheme_normalized + "://" + target_decomposed.authority; + + const std::string type_for_factory = factoryTypeForScheme(target_scheme_normalized); + if (type_for_factory.empty()) + throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Unsupported storage scheme '{}' in path '{}'", target_scheme_normalized, path); + + std::string key_to_use = target_decomposed.key; + if (target_scheme_normalized == "file") + key_to_use = "/" + target_decomposed.key; // file:///absolute/path/to/file -> key = /absolute/path/to/file (full POSIX path) + + /// Handle storage types that need new storage creation + return getOrCreateStorageAndKey( + cache_key, + key_to_use, + type_for_factory, + secondary_storages, + context, + [&](Poco::Util::MapConfiguration & cfg, const std::string & config_prefix) + { + if (target_scheme_normalized == "file") + { + 
std::filesystem::path fs_path(key_to_use); + std::filesystem::path parent = fs_path.parent_path(); + std::string dir_path = parent.string(); + + if (dir_path.empty() || dir_path == "/") + dir_path = "/"; + else if (dir_path.back() != '/') + dir_path += '/'; + + cfg.setString(config_prefix + ".path", dir_path); + } + else if (target_scheme_normalized == "abfs") + { + cfg.setString(config_prefix + ".endpoint", target_scheme_normalized + "://" + target_decomposed.authority); + } + else if (target_scheme_normalized == "hdfs") + { + // HDFS endpoint must end with '/' + auto endpoint = target_scheme_normalized + "://" + target_decomposed.authority; + if (!endpoint.empty() && endpoint.back() != '/') + endpoint.push_back('/'); + cfg.setString(config_prefix + ".endpoint", endpoint); + } + }); +} + } diff --git a/src/Storages/ObjectStorage/Utils.h b/src/Storages/ObjectStorage/Utils.h index da635dd5af60..2d99dbe269b7 100644 --- a/src/Storages/ObjectStorage/Utils.h +++ b/src/Storages/ObjectStorage/Utils.h @@ -1,11 +1,35 @@ #pragma once #include +#include +#include +#include + namespace DB { class IObjectStorage; +/// Thread-safe wrapper for secondary object storages map +struct SecondaryStorages +{ + mutable std::mutex mutex; + std::map storages; +}; + +// A URI splitted into components +// s3://bucket/a/b -> scheme="s3", authority="bucket", path="/a/b" +// file:///var/x -> scheme="file", authority="", path="/var/x" +// /abs/p -> scheme="", authority="", path="/abs/p" +struct SchemeAuthorityKey +{ + explicit SchemeAuthorityKey(const std::string & uri); + + std::string scheme; + std::string authority; + std::string key; +}; + std::optional checkAndGetNewFileOnInsertIfNeeded( const IObjectStorage & object_storage, const StorageObjectStorageConfiguration & configuration, @@ -31,4 +55,17 @@ std::unique_ptr createReadBuffer( const ContextPtr & context_, const LoggerPtr & log, const std::optional & read_settings = std::nullopt); + +std::string makeAbsolutePath(const std::string & 
table_location, const std::string & path); + +/// Resolve object storage and key for reading from that storage +/// If path is relative -- it must be read from base_storage +/// Otherwise, look for a suitable storage in secondary_storages +std::pair resolveObjectStorageForPath( + const std::string & table_location, + const std::string & path, + const DB::ObjectStoragePtr & base_storage, + SecondaryStorages & secondary_storages, + const DB::ContextPtr & context); + } diff --git a/tests/queries/0_stateless/data_minio/field_ids_complex_test/metadata/v1.metadata.json b/tests/queries/0_stateless/data_minio/field_ids_complex_test/metadata/v1.metadata.json index 8d367d20f041..a983881af8f0 100644 --- a/tests/queries/0_stateless/data_minio/field_ids_complex_test/metadata/v1.metadata.json +++ b/tests/queries/0_stateless/data_minio/field_ids_complex_test/metadata/v1.metadata.json @@ -1,7 +1,7 @@ { "format-version" : 2, "table-uuid" : "d4b695ca-ceeb-4537-8a2a-eee90dc6e313", - "location" : "s3a://test/field_ids_struct_test/metadata/field_ids_complex_test", + "location" : "s3a://test/field_ids_complex_test", "last-sequence-number" : 1, "last-updated-ms" : 1757661733693, "last-column-id" : 9, @@ -96,7 +96,7 @@ "total-position-deletes" : "0", "total-equality-deletes" : "0" }, - "manifest-list" : "s3a://test/field_ids_struct_test/metadata/field_ids_complex_test/metadata/snap-607752583403487091-1-140c8dff-1d83-4841-bc40-9aa85205b555.avro", + "manifest-list" : "s3a://test/field_ids_complex_test/metadata/snap-607752583403487091-1-140c8dff-1d83-4841-bc40-9aa85205b555.avro", "schema-id" : 0 } ], "statistics" : [ ], diff --git a/tests/queries/0_stateless/data_minio/field_ids_struct_test/metadata/v1.metadata.json b/tests/queries/0_stateless/data_minio/field_ids_struct_test/metadata/v1.metadata.json index 2d149abb44e7..d6c9079228ac 100644 --- a/tests/queries/0_stateless/data_minio/field_ids_struct_test/metadata/v1.metadata.json +++ 
b/tests/queries/0_stateless/data_minio/field_ids_struct_test/metadata/v1.metadata.json @@ -1,7 +1,7 @@ { "format-version" : 2, "table-uuid" : "149ecc15-7afc-4311-86b3-3a4c8d4ec08e", - "location" : "s3a://test/field_ids_struct_test/metadata/field_ids_struct_test", + "location" : "s3a://test/field_ids_struct_test", "last-sequence-number" : 1, "last-updated-ms" : 1753959190403, "last-column-id" : 6, @@ -84,7 +84,7 @@ "total-position-deletes" : "0", "total-equality-deletes" : "0" }, - "manifest-list" : "s3a://test/field_ids_struct_test/metadata/field_ids_struct_test/metadata/snap-2512638186869817292-1-ec467367-15a4-4610-8ea8-cf76797afb03.avro", + "manifest-list" : "s3a://test/field_ids_struct_test/metadata/snap-2512638186869817292-1-ec467367-15a4-4610-8ea8-cf76797afb03.avro", "schema-id" : 0 } ], "statistics" : [ ], diff --git a/tests/queries/0_stateless/data_minio/field_ids_table_test/metadata/v1.metadata.json b/tests/queries/0_stateless/data_minio/field_ids_table_test/metadata/v1.metadata.json index 32225eb618ad..1ddc3492cc82 100644 --- a/tests/queries/0_stateless/data_minio/field_ids_table_test/metadata/v1.metadata.json +++ b/tests/queries/0_stateless/data_minio/field_ids_table_test/metadata/v1.metadata.json @@ -1,7 +1,7 @@ { "format-version" : 2, "table-uuid" : "8f1f9ae2-18bb-421e-b640-ec2f85e67bce", - "location" : "s3a://test/field_ids_table_test/metadata/field_ids_table_test", + "location" : "s3a://test/field_ids_table_test", "last-sequence-number" : 1, "last-updated-ms" : 1752481476160, "last-column-id" : 1, @@ -56,7 +56,7 @@ "total-position-deletes" : "0", "total-equality-deletes" : "0" }, - "manifest-list" : "s3a://test/field_ids_table_test/metadata/field_ids_table_test/metadata/snap-2811410366534688344-1-3b002f99-b012-4041-9a97-db477fcc7115.avro", + "manifest-list" : "s3a://test/field_ids_table_test/metadata/snap-2811410366534688344-1-3b002f99-b012-4041-9a97-db477fcc7115.avro", "schema-id" : 0 } ], "statistics" : [ ],