From c22bf24efc2b10a4ce2515f714f64b4d7e30d36f Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Thu, 30 Jan 2025 17:47:06 +0100 Subject: [PATCH 01/18] Distributed request to tables with Object Storage Engines --- src/Databases/Iceberg/DatabaseIceberg.cpp | 47 +++++--- .../Iceberg/DatabaseIcebergSettings.cpp | 1 + src/Parsers/FunctionSecretArgumentsFinder.h | 2 +- src/Storages/IStorageCluster.h | 2 + .../ObjectStorage/Azure/Configuration.cpp | 33 +++++- .../ObjectStorage/Azure/Configuration.h | 3 + .../ObjectStorage/HDFS/Configuration.cpp | 10 ++ .../ObjectStorage/HDFS/Configuration.h | 2 + .../ObjectStorage/S3/Configuration.cpp | 31 +++++- src/Storages/ObjectStorage/S3/Configuration.h | 2 + .../ObjectStorage/StorageObjectStorage.cpp | 6 +- .../ObjectStorage/StorageObjectStorage.h | 4 + .../StorageObjectStorageCluster.cpp | 105 +++++++++++++++++- .../StorageObjectStorageCluster.h | 2 + .../StorageObjectStorageSettings.cpp | 8 +- .../StorageObjectStorageSource.cpp | 2 + .../registerStorageObjectStorage.cpp | 56 +++++++--- tests/integration/test_s3_cluster/test.py | 67 +++++++++++ .../test_cluster.py | 67 +++++++++++ .../integration/test_storage_iceberg/test.py | 79 ++++++++++--- 20 files changed, 477 insertions(+), 52 deletions(-) diff --git a/src/Databases/Iceberg/DatabaseIceberg.cpp b/src/Databases/Iceberg/DatabaseIceberg.cpp index 58f4d37baac3..59a62e100786 100644 --- a/src/Databases/Iceberg/DatabaseIceberg.cpp +++ b/src/Databases/Iceberg/DatabaseIceberg.cpp @@ -12,6 +12,7 @@ #include #include #include +#include #include #include @@ -37,10 +38,12 @@ namespace DatabaseIcebergSetting extern const DatabaseIcebergSettingsString storage_endpoint; extern const DatabaseIcebergSettingsString oauth_server_uri; extern const DatabaseIcebergSettingsBool vended_credentials; + extern const DatabaseIcebergSettingsString object_storage_cluster; } namespace Setting { extern const SettingsBool allow_experimental_database_iceberg; + extern const SettingsString 
object_storage_cluster; } namespace ErrorCodes @@ -235,19 +238,37 @@ StoragePtr DatabaseIceberg::tryGetTable(const String & name, ContextPtr context_ /// no table structure in table definition AST. StorageObjectStorage::Configuration::initialize(*configuration, args, context_, /* with_table_structure */false, std::move(storage_settings)); - return std::make_shared( - configuration, - configuration->createObjectStorage(context_, /* is_readonly */ false), - context_, - StorageID(getDatabaseName(), name), - /* columns */columns, - /* constraints */ConstraintsDescription{}, - /* comment */"", - getFormatSettings(context_), - LoadingStrictnessLevel::CREATE, - /* distributed_processing */false, - /* partition_by */nullptr, - /* lazy_init */true); + auto cluster_name = settings[DatabaseIcebergSetting::object_storage_cluster].value; + if (cluster_name.empty()) + cluster_name = context_->getSettingsRef()[Setting::object_storage_cluster].value; + + if (cluster_name.empty()) + { + return std::make_shared( + configuration, + configuration->createObjectStorage(context_, /* is_readonly */ false), + context_, + StorageID(getDatabaseName(), name), + /* columns */columns, + /* constraints */ConstraintsDescription{}, + /* comment */"", + getFormatSettings(context_), + LoadingStrictnessLevel::CREATE, + /* distributed_processing */false, + /* partition_by */nullptr, + /* lazy_init */true); + } + else + { + return std::make_shared( + cluster_name, + configuration, + configuration->createObjectStorage(context_, /* is_readonly */ false), + StorageID(getDatabaseName(), name), + columns, + ConstraintsDescription{}, + context_); + } } DatabaseTablesIteratorPtr DatabaseIceberg::getTablesIterator( diff --git a/src/Databases/Iceberg/DatabaseIcebergSettings.cpp b/src/Databases/Iceberg/DatabaseIcebergSettings.cpp index 37b4909106ba..4847309a6283 100644 --- a/src/Databases/Iceberg/DatabaseIcebergSettings.cpp +++ b/src/Databases/Iceberg/DatabaseIcebergSettings.cpp @@ -23,6 +23,7 @@ namespace 
ErrorCodes DECLARE(String, warehouse, "", "Warehouse name inside the catalog", 0) \ DECLARE(String, auth_header, "", "Authorization header of format 'Authorization: '", 0) \ DECLARE(String, storage_endpoint, "", "Object storage endpoint", 0) \ + DECLARE(String, object_storage_cluster, "", "Cluster for distributed requests", 0) \ #define LIST_OF_DATABASE_ICEBERG_SETTINGS(M, ALIAS) \ DATABASE_ICEBERG_RELATED_SETTINGS(M, ALIAS) diff --git a/src/Parsers/FunctionSecretArgumentsFinder.h b/src/Parsers/FunctionSecretArgumentsFinder.h index 9277093c6085..20bb7ab8b354 100644 --- a/src/Parsers/FunctionSecretArgumentsFinder.h +++ b/src/Parsers/FunctionSecretArgumentsFinder.h @@ -102,7 +102,7 @@ class FunctionSecretArgumentsFinder } else if ((function->name() == "s3") || (function->name() == "cosn") || (function->name() == "oss") || (function->name() == "deltaLake") || (function->name() == "hudi") || (function->name() == "iceberg") || - (function->name() == "gcs")) + (function->name() == "icebergS3") || (function->name() == "gcs")) { /// s3('url', 'aws_access_key_id', 'aws_secret_access_key', ...) 
findS3FunctionSecretArguments(/* is_cluster_function= */ false); diff --git a/src/Storages/IStorageCluster.h b/src/Storages/IStorageCluster.h index d000e24562ff..d75d584ddbe5 100644 --- a/src/Storages/IStorageCluster.h +++ b/src/Storages/IStorageCluster.h @@ -42,6 +42,8 @@ class IStorageCluster : public IStorage bool supportsOptimizationToSubcolumns() const override { return false; } bool supportsTrivialCountOptimization(const StorageSnapshotPtr &, ContextPtr) const override { return true; } + const String & getClusterName() const { return cluster_name; } + protected: virtual void updateBeforeRead(const ContextPtr &) {} virtual void updateQueryToSendIfNeeded(ASTPtr & /*query*/, const StorageSnapshotPtr & /*storage_snapshot*/, const ContextPtr & /*context*/) {} diff --git a/src/Storages/ObjectStorage/Azure/Configuration.cpp b/src/Storages/ObjectStorage/Azure/Configuration.cpp index faa1554dbe8d..f0ef79016e88 100644 --- a/src/Storages/ObjectStorage/Azure/Configuration.cpp +++ b/src/Storages/ObjectStorage/Azure/Configuration.cpp @@ -154,6 +154,14 @@ void StorageAzureConfiguration::fromNamedCollection(const NamedCollection & coll compression_method = collection.getOrDefault("compression_method", collection.getOrDefault("compression", "auto")); blobs_paths = {blob_path}; + if (account_name && account_key) + { + if (saved_params.empty()) + { + saved_params.push_back(*account_name); + saved_params.push_back(*account_key); + } + } connection_params = getConnectionParams(connection_url, container_name, account_name, account_key, context); } @@ -173,7 +181,6 @@ void StorageAzureConfiguration::fromAST(ASTs & engine_args, ContextPtr context, std::unordered_map engine_args_to_idx; - String connection_url = checkAndGetLiteralArgument(engine_args[0], "connection_string/storage_account_url"); String container_name = checkAndGetLiteralArgument(engine_args[1], "container"); blob_path = checkAndGetLiteralArgument(engine_args[2], "blobpath"); @@ -279,6 +286,14 @@ void 
StorageAzureConfiguration::fromAST(ASTs & engine_args, ContextPtr context, } blobs_paths = {blob_path}; + if (account_name && account_key) + { + if (saved_params.empty()) + { + saved_params.push_back(*account_name); + saved_params.push_back(*account_key); + } + } connection_params = getConnectionParams(connection_url, container_name, account_name, account_key, context); } @@ -444,6 +459,22 @@ void StorageAzureConfiguration::addStructureAndFormatToArgsIfNeeded( } } +void StorageAzureConfiguration::setFunctionArgs(ASTs & args) const +{ + if (!args.empty()) + { /// Just check + throw Exception(ErrorCodes::LOGICAL_ERROR, "Arguments are not empty"); + } + + args.push_back(std::make_shared(connection_params.endpoint.storage_account_url)); + args.push_back(std::make_shared(connection_params.endpoint.container_name)); + args.push_back(std::make_shared(blob_path)); + for (const auto & arg : saved_params) + { + args.push_back(std::make_shared(arg)); + } +} + } #endif diff --git a/src/Storages/ObjectStorage/Azure/Configuration.h b/src/Storages/ObjectStorage/Azure/Configuration.h index 72124465c462..5ab8c3d71455 100644 --- a/src/Storages/ObjectStorage/Azure/Configuration.h +++ b/src/Storages/ObjectStorage/Azure/Configuration.h @@ -79,6 +79,8 @@ class StorageAzureConfiguration : public StorageObjectStorage::Configuration ContextPtr context, bool with_structure) override; + void setFunctionArgs(ASTs & args) const override; + protected: void fromNamedCollection(const NamedCollection & collection, ContextPtr context) override; void fromAST(ASTs & args, ContextPtr context, bool with_structure) override; @@ -86,6 +88,7 @@ class StorageAzureConfiguration : public StorageObjectStorage::Configuration std::string blob_path; std::vector blobs_paths; AzureBlobStorage::ConnectionParams connection_params; + std::vector saved_params; }; } diff --git a/src/Storages/ObjectStorage/HDFS/Configuration.cpp b/src/Storages/ObjectStorage/HDFS/Configuration.cpp index 3c4897b5062e..645f8ddd4a98 100644 
--- a/src/Storages/ObjectStorage/HDFS/Configuration.cpp +++ b/src/Storages/ObjectStorage/HDFS/Configuration.cpp @@ -235,6 +235,16 @@ void StorageHDFSConfiguration::addStructureAndFormatToArgsIfNeeded( } } +void StorageHDFSConfiguration::setFunctionArgs(ASTs & args) const +{ + if (!args.empty()) + { /// Just check + throw Exception(ErrorCodes::LOGICAL_ERROR, "Arguments are not empty"); + } + + args.push_back(std::make_shared(url + path)); +} + } #endif diff --git a/src/Storages/ObjectStorage/HDFS/Configuration.h b/src/Storages/ObjectStorage/HDFS/Configuration.h index db8ab7f9e4db..dbef04aca4aa 100644 --- a/src/Storages/ObjectStorage/HDFS/Configuration.h +++ b/src/Storages/ObjectStorage/HDFS/Configuration.h @@ -65,6 +65,8 @@ class StorageHDFSConfiguration : public StorageObjectStorage::Configuration ContextPtr context, bool with_structure) override; + void setFunctionArgs(ASTs & args) const override; + private: void fromNamedCollection(const NamedCollection &, ContextPtr context) override; void fromAST(ASTs & args, ContextPtr, bool /* with_structure */) override; diff --git a/src/Storages/ObjectStorage/S3/Configuration.cpp b/src/Storages/ObjectStorage/S3/Configuration.cpp index 629628c762fa..d905884868d8 100644 --- a/src/Storages/ObjectStorage/S3/Configuration.cpp +++ b/src/Storages/ObjectStorage/S3/Configuration.cpp @@ -363,11 +363,11 @@ void StorageS3Configuration::fromAST(ASTs & args, ContextPtr context, bool with_ if (engine_args_to_idx.contains("format")) { - format = checkAndGetLiteralArgument(args[engine_args_to_idx["format"]], "format"); + auto format_ = checkAndGetLiteralArgument(args[engine_args_to_idx["format"]], "format"); /// Set format to configuration only of it's not 'auto', /// because we can have default format set in configuration. 
- if (format != "auto") - format = format; + if (format_ != "auto") + format = format_; } if (engine_args_to_idx.contains("structure")) @@ -585,6 +585,31 @@ void StorageS3Configuration::addStructureAndFormatToArgsIfNeeded( } } +void StorageS3Configuration::setFunctionArgs(ASTs & args) const +{ + if (!args.empty()) + { /// Just check + throw Exception(ErrorCodes::LOGICAL_ERROR, "Arguments are not empty"); + } + + args.push_back(std::make_shared(url.uri_str)); + if (auth_settings[S3AuthSetting::no_sign_request]) + { + args.push_back(std::make_shared("NOSIGN")); + } + else + { + args.push_back(std::make_shared(auth_settings[S3AuthSetting::access_key_id].value)); + args.push_back(std::make_shared(auth_settings[S3AuthSetting::secret_access_key].value)); + if (!auth_settings[S3AuthSetting::session_token].value.empty()) + args.push_back(std::make_shared(auth_settings[S3AuthSetting::session_token].value)); + if (format != "auto") + args.push_back(std::make_shared(format)); + if (!compression_method.empty()) + args.push_back(std::make_shared(compression_method)); + } +} + } #endif diff --git a/src/Storages/ObjectStorage/S3/Configuration.h b/src/Storages/ObjectStorage/S3/Configuration.h index c4614a281899..4f5f2528caf8 100644 --- a/src/Storages/ObjectStorage/S3/Configuration.h +++ b/src/Storages/ObjectStorage/S3/Configuration.h @@ -94,6 +94,8 @@ class StorageS3Configuration : public StorageObjectStorage::Configuration ContextPtr context, bool with_structure) override; + void setFunctionArgs(ASTs & args) const override; + private: void fromNamedCollection(const NamedCollection & collection, ContextPtr context) override; void fromAST(ASTs & args, ContextPtr context, bool with_structure) override; diff --git a/src/Storages/ObjectStorage/StorageObjectStorage.cpp b/src/Storages/ObjectStorage/StorageObjectStorage.cpp index b0284508562f..cf809d53af36 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorage.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorage.cpp @@ -60,6 
+60,9 @@ String StorageObjectStorage::getPathSample(ContextPtr context) if (context->getSettingsRef()[Setting::use_hive_partitioning]) local_distributed_processing = false; + if (!configuration->isArchive() && !configuration->isPathWithGlobs() && !local_distributed_processing) + return configuration->getPath(); + auto file_iterator = StorageObjectStorageSource::createFileIterator( configuration, query_settings, @@ -72,9 +75,6 @@ String StorageObjectStorage::getPathSample(ContextPtr context) {} // file_progress_callback ); - if (!configuration->isArchive() && !configuration->isPathWithGlobs() && !local_distributed_processing) - return configuration->getPath(); - if (auto file = file_iterator->next(0)) return file->getPath(); return ""; diff --git a/src/Storages/ObjectStorage/StorageObjectStorage.h b/src/Storages/ObjectStorage/StorageObjectStorage.h index 1349e24320fd..09fe3e5819d3 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorage.h +++ b/src/Storages/ObjectStorage/StorageObjectStorage.h @@ -247,6 +247,10 @@ class StorageObjectStorage::Configuration virtual void update(ObjectStoragePtr object_storage, ContextPtr local_context); + virtual void setFunctionArgs(ASTs & /* args */) const + { + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method setFunctionArgs is not supported by storage {}", getEngineName()); + } protected: virtual void fromNamedCollection(const NamedCollection & collection, ContextPtr context) = 0; diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp index f59c03b737f3..604cbcb66fd8 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp @@ -4,15 +4,20 @@ #include #include #include +#include +#include +#include +#include #include #include +#include +#include #include #include #include #include - namespace DB { namespace Setting @@ -23,13 +28,19 @@ namespace Setting namespace 
ErrorCodes { extern const int LOGICAL_ERROR; + extern const int UNKNOWN_FUNCTION; } + String StorageObjectStorageCluster::getPathSample(StorageInMemoryMetadata metadata, ContextPtr context) { auto query_settings = configuration->getQuerySettings(context); /// We don't want to throw an exception if there are no files with specified path. query_settings.throw_on_zero_files_match = false; + + if (!configuration->isArchive() && !configuration->isPathWithGlobs()) + return configuration->getPath(); + auto file_iterator = StorageObjectStorageSource::createFileIterator( configuration, query_settings, @@ -44,6 +55,7 @@ String StorageObjectStorageCluster::getPathSample(StorageInMemoryMetadata metada if (auto file = file_iterator->next(0)) return file->getPath(); + return ""; } @@ -82,12 +94,103 @@ std::string StorageObjectStorageCluster::getName() const return configuration->getEngineName(); } +void StorageObjectStorageCluster::updateQueryForDistributedEngineIfNeeded(ASTPtr & query) +{ + // Change table engine on table function for distributed request + // CREATE TABLE t (...) ENGINE=IcebergS3(...) + // SELECT * FROM t + // change on + // SELECT * FROM icebergS3(...) 
+ // to execute on cluster nodes + + auto * select_query = query->as(); + if (!select_query || !select_query->tables()) + return; + + auto * tables = select_query->tables()->as(); + auto * table_expression = tables->children[0]->as()->table_expression->as(); + if (!table_expression->database_and_table_name) + return; + + auto & table_identifier_typed = table_expression->database_and_table_name->as(); + + auto table_alias = table_identifier_typed.tryGetAlias(); + + std::unordered_map engine_to_function = { + {"S3", "s3"}, + {"Azure", "azureBlobStorage"}, + {"HDFS", "hdfs"}, + {"IcebergS3", "icebergS3"}, + {"IcebergAzure", "icebergAzure"}, + {"IcebergHDFS", "icebergHDFS"}, + {"DeltaLake", "deltaLake"}, + {"Hudi", "hudi"} + }; + + auto p = engine_to_function.find(configuration->getEngineName()); + if (p == engine_to_function.end()) + { + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "Can't find table function for engine {}", + configuration->getEngineName() + ); + } + + std::string table_function_name = p->second; + + auto function_ast = std::make_shared(); + function_ast->name = table_function_name; + auto arguments = std::make_shared(); + + auto cluster_name = getClusterName(); + + if (cluster_name.empty()) + { + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "Can't be here without cluster name, no cluster name in query {}", + queryToString(query)); + } + + configuration->setFunctionArgs(arguments->children); + + function_ast->arguments = arguments; + function_ast->children.push_back(arguments); + function_ast->setAlias(table_alias); + + ASTPtr function_ast_ptr(function_ast); + + table_expression->database_and_table_name = nullptr; + table_expression->table_function = function_ast_ptr; + table_expression->children[0].swap(function_ast_ptr); + + auto settings = select_query->settings(); + if (settings) + { + auto & settings_ast = settings->as(); + settings_ast.changes.insertSetting("object_storage_cluster", cluster_name); + } + else + { + auto settings_ast_ptr = 
std::make_shared(); + settings_ast_ptr->is_standalone = false; + settings_ast_ptr->changes.setSetting("object_storage_cluster", cluster_name); + select_query->setExpression(ASTSelectQuery::Expression::SETTINGS, std::move(settings_ast_ptr)); + } + + cluster_name_in_settings = true; +} + void StorageObjectStorageCluster::updateQueryToSendIfNeeded( ASTPtr & query, const DB::StorageSnapshotPtr & storage_snapshot, const ContextPtr & context) { + updateQueryForDistributedEngineIfNeeded(query); + ASTExpressionList * expression_list = extractTableFunctionArgumentsFromSelectQuery(query); + if (!expression_list) { throw Exception( diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.h b/src/Storages/ObjectStorage/StorageObjectStorageCluster.h index 57bd92222af1..aa3a91e03fd4 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.h +++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.h @@ -37,6 +37,8 @@ class StorageObjectStorageCluster : public IStorageCluster const StorageSnapshotPtr & storage_snapshot, const ContextPtr & context) override; + void updateQueryForDistributedEngineIfNeeded(ASTPtr & query); + const String engine_name; const StorageObjectStorage::ConfigurationPtr configuration; const ObjectStoragePtr object_storage; diff --git a/src/Storages/ObjectStorage/StorageObjectStorageSettings.cpp b/src/Storages/ObjectStorage/StorageObjectStorageSettings.cpp index 383f54342036..174162fd6465 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageSettings.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageSettings.cpp @@ -17,7 +17,13 @@ namespace DB allow_dynamic_metadata_for_data_lakes, \ false, \ "If enabled, indicates that metadata is taken from iceberg specification that is pulled from cloud before each query.", \ - 0) + 0) \ + DECLARE( \ + String, \ + object_storage_cluster, \ + "", \ + "Cluster for distributed requests", \ + 0) \ #define LIST_OF_STORAGE_OBJECT_STORAGE_SETTINGS(M, ALIAS) \ 
STORAGE_OBJECT_STORAGE_RELATED_SETTINGS(M, ALIAS) \ diff --git a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp index b01566d00ca3..3ed2707b33a9 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp @@ -132,6 +132,8 @@ std::shared_ptr StorageObjectStorageSourc const bool is_archive = configuration->isArchive(); + configuration->update(object_storage, local_context); + std::unique_ptr iterator; if (configuration->isPathWithGlobs()) { diff --git a/src/Storages/ObjectStorage/registerStorageObjectStorage.cpp b/src/Storages/ObjectStorage/registerStorageObjectStorage.cpp index c56038f441a5..e5d8d5993179 100644 --- a/src/Storages/ObjectStorage/registerStorageObjectStorage.cpp +++ b/src/Storages/ObjectStorage/registerStorageObjectStorage.cpp @@ -6,6 +6,7 @@ #include #include #include +#include #include #include #include "Common/logger_useful.h" @@ -19,13 +20,23 @@ namespace ErrorCodes extern const int BAD_ARGUMENTS; } +namespace Setting +{ + extern const SettingsString object_storage_cluster; +} + +namespace StorageObjectStorageSetting +{ + extern const StorageObjectStorageSettingsString object_storage_cluster; +} + namespace { // LocalObjectStorage is only supported for Iceberg Datalake operations where Avro format is required. For regular file access, use FileStorage instead. 
#if USE_AWS_S3 || USE_AZURE_BLOB_STORAGE || USE_HDFS || USE_AVRO -std::shared_ptr +StoragePtr createStorageObjectStorage(const StorageFactory::Arguments & args, StorageObjectStorage::ConfigurationPtr configuration, ContextPtr context) { auto & engine_args = args.engine_args; @@ -37,6 +48,8 @@ createStorageObjectStorage(const StorageFactory::Arguments & args, StorageObject queue_settings->loadFromQuery(*args.storage_def); + auto cluster_name = (*queue_settings)[StorageObjectStorageSetting::object_storage_cluster].value; + StorageObjectStorage::Configuration::initialize(*configuration, args.engine_args, context, false, std::move(queue_settings)); // Use format settings from global server context + settings from @@ -61,18 +74,35 @@ createStorageObjectStorage(const StorageFactory::Arguments & args, StorageObject if (args.storage_def->partition_by) partition_by = args.storage_def->partition_by->clone(); - return std::make_shared( - configuration, - configuration->createObjectStorage(context, /* is_readonly */ false), - args.getContext(), - args.table_id, - args.columns, - args.constraints, - args.comment, - format_settings, - args.mode, - /* distributed_processing */ false, - partition_by); + if (cluster_name.empty()) + cluster_name = context->getSettingsRef()[Setting::object_storage_cluster].value; + + if (cluster_name.empty()) + { + return std::make_shared( + configuration, + configuration->createObjectStorage(context, /* is_readonly */ false), + args.getContext(), + args.table_id, + args.columns, + args.constraints, + args.comment, + format_settings, + args.mode, + /* distributed_processing */ false, + partition_by); + } + else + { + return std::make_shared( + cluster_name, + configuration, + configuration->createObjectStorage(context, /* is_readonly */ false), + args.table_id, + args.columns, + args.constraints, + args.getContext()); + } } #endif diff --git a/tests/integration/test_s3_cluster/test.py b/tests/integration/test_s3_cluster/test.py index 
06d25405a110..4d876e433414 100644 --- a/tests/integration/test_s3_cluster/test.py +++ b/tests/integration/test_s3_cluster/test.py @@ -719,3 +719,70 @@ def test_remote_no_hedged(started_cluster): ) assert TSV(pure_s3) == TSV(s3_distributed) + + +def test_distributed_s3_table_engine(started_cluster): + node = started_cluster.instances["s0_0_0"] + + resp_def = node.query( + """ + SELECT * from s3Cluster( + 'cluster_simple', + 'http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV', + 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') ORDER BY (name, value, polygon) + """ + ) + + node.query("DROP TABLE IF EXISTS single_node"); + node.query( + """ + CREATE TABLE single_node + (name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))) + ENGINE=S3('http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV') + """ + ) + query_id_engine_single_node = str(uuid.uuid4()) + resp_engine_single_node = node.query( + """ + SELECT * FROM single_node ORDER BY (name, value, polygon) + """, + query_id = query_id_engine_single_node + ) + assert resp_def == resp_engine_single_node + + node.query("DROP TABLE IF EXISTS distributed"); + node.query( + """ + CREATE TABLE distributed + (name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))) + ENGINE=S3('http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV') + SETTINGS object_storage_cluster='cluster_simple' + """ + ) + query_id_engine_distributed = str(uuid.uuid4()) + resp_engine_distributed = node.query( + """ + SELECT * FROM distributed ORDER BY (name, value, polygon) + """, + query_id = query_id_engine_distributed + ) + assert resp_def == resp_engine_distributed + + node.query("SYSTEM FLUSH LOGS ON CLUSTER 'cluster_simple'") + + hosts_engine_single_node = node.query( + f""" + SELECT uniq(hostname) + FROM clusterAllReplicas('cluster_simple', system.query_log) + WHERE type='QueryFinish' AND 
initial_query_id='{query_id_engine_single_node}' + """ + ) + assert int(hosts_engine_single_node) == 1 + hosts_engine_distributed = node.query( + f""" + SELECT uniq(hostname) + FROM clusterAllReplicas('cluster_simple', system.query_log) + WHERE type='QueryFinish' AND initial_query_id='{query_id_engine_distributed}' + """ + ) + assert int(hosts_engine_distributed) == 3 diff --git a/tests/integration/test_storage_azure_blob_storage/test_cluster.py b/tests/integration/test_storage_azure_blob_storage/test_cluster.py index 54e1ced79577..2036de1becd6 100644 --- a/tests/integration/test_storage_azure_blob_storage/test_cluster.py +++ b/tests/integration/test_storage_azure_blob_storage/test_cluster.py @@ -104,8 +104,57 @@ def test_select_all(cluster): query_id=query_id_distributed_alt_syntax, ) print(distributed_azure_alt_syntax) + azure_query( + node, + f""" + DROP TABLE IF EXISTS azure_engine_table_single_node; + CREATE TABLE azure_engine_table_single_node + (key UInt64, data String) + ENGINE=AzureBlobStorage( + '{storage_account_url}', + 'cont', + 'test_cluster_select_all.csv', + 'devstoreaccount1', + 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', + 'CSV', + 'auto' + ) + """, + ) + query_id_engine_single_node = str(uuid.uuid4()) + azure_engine_single_node = azure_query( + node, + "SELECT * FROM azure_engine_table_single_node", + query_id=query_id_engine_single_node, + ) + azure_query( + node, + f""" + DROP TABLE IF EXISTS azure_engine_table_distributed; + CREATE TABLE azure_engine_table_distributed + (key UInt64, data String) + ENGINE=AzureBlobStorage( + '{storage_account_url}', + 'cont', + 'test_cluster_select_all.csv', + 'devstoreaccount1', + 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', + 'CSV', + 'auto' + ) + SETTINGS object_storage_cluster='simple_cluster' + """, + ) + query_id_engine_distributed = str(uuid.uuid4()) + azure_engine_distributed = azure_query( + node, + "SELECT * 
FROM azure_engine_table_distributed", + query_id=query_id_engine_distributed, + ) assert TSV(pure_azure) == TSV(distributed_azure) assert TSV(pure_azure) == TSV(distributed_azure_alt_syntax) + assert TSV(pure_azure) == TSV(azure_engine_single_node) + assert TSV(pure_azure) == TSV(azure_engine_distributed) for _, node_ in cluster.instances.items(): node_.query("SYSTEM FLUSH LOGS") nodes_pure = node.query( @@ -135,6 +184,24 @@ def test_select_all(cluster): """, ) assert int(nodes_distributed_alt_syntax) == 3 + nodes_engine_single_node = node.query( + f""" + SELECT uniq(hostname) + FROM clusterAllReplicas('simple_cluster', system.query_log) + WHERE type='QueryFinish' + AND initial_query_id='{query_id_engine_single_node}' + """, + ) + assert int(nodes_engine_single_node) == 1 + nodes_engine_distributed = node.query( + f""" + SELECT uniq(hostname) + FROM clusterAllReplicas('simple_cluster', system.query_log) + WHERE type='QueryFinish' + AND initial_query_id='{query_id_engine_distributed}' + """, + ) + assert int(nodes_engine_distributed) == 3 def test_count(cluster): diff --git a/tests/integration/test_storage_iceberg/test.py b/tests/integration/test_storage_iceberg/test.py index edab338aa351..26d4e09f47c3 100644 --- a/tests/integration/test_storage_iceberg/test.py +++ b/tests/integration/test_storage_iceberg/test.py @@ -208,13 +208,17 @@ def get_creation_expression( table_function=False, allow_dynamic_metadata_for_data_lakes=False, run_on_cluster=False, + object_storage_cluster=False, **kwargs, ): - allow_dynamic_metadata_for_datalakes_suffix = ( - " SETTINGS allow_dynamic_metadata_for_data_lakes = 1" - if allow_dynamic_metadata_for_data_lakes - else "" - ) + settings_suffix = "" + if allow_dynamic_metadata_for_data_lakes or object_storage_cluster: + settings = [] + if allow_dynamic_metadata_for_data_lakes: + settings.append("allow_dynamic_metadata_for_data_lakes = 1") + if object_storage_cluster: + settings.append(f"object_storage_cluster = 
'{object_storage_cluster}'") + settings_suffix = " SETTINGS " + ", ".join(settings) if storage_type == "s3": if "bucket" in kwargs: @@ -234,7 +238,7 @@ def get_creation_expression( DROP TABLE IF EXISTS {table_name}; CREATE TABLE {table_name} ENGINE=IcebergS3(s3, filename = 'iceberg_data/default/{table_name}/', format={format}, url = 'http://minio1:9001/{bucket}/')""" - + allow_dynamic_metadata_for_datalakes_suffix + + settings_suffix ) elif storage_type == "azure": @@ -254,7 +258,7 @@ def get_creation_expression( DROP TABLE IF EXISTS {table_name}; CREATE TABLE {table_name} ENGINE=IcebergAzure(azure, container = {cluster.azure_container_name}, storage_account_url = '{cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"]}', blob_path = '/iceberg_data/default/{table_name}/', format={format})""" - + allow_dynamic_metadata_for_datalakes_suffix + + settings_suffix ) elif storage_type == "hdfs": @@ -290,7 +294,7 @@ def get_creation_expression( DROP TABLE IF EXISTS {table_name}; CREATE TABLE {table_name} ENGINE=IcebergLocal(local, path = '/iceberg_data/default/{table_name}/', format={format})""" - + allow_dynamic_metadata_for_datalakes_suffix + + settings_suffix ) else: @@ -326,10 +330,18 @@ def create_iceberg_table( table_name, cluster, format="Parquet", + object_storage_cluster=False, **kwargs, ): node.query( - get_creation_expression(storage_type, table_name, cluster, format, **kwargs) + get_creation_expression( + storage_type, + table_name, + cluster, + format, + object_storage_cluster=object_storage_cluster, + **kwargs, + ) ) @@ -662,14 +674,37 @@ def add_df(mode): .split() ) + create_iceberg_table(storage_type, instance, TABLE_NAME, started_cluster, object_storage_cluster='cluster_simple') + query_id_cluster_table_engine = str(uuid.uuid4()) + select_cluster_table_engine = ( + instance.query( + f""" + SELECT * FROM {TABLE_NAME} + """, + query_id=query_id_cluster_table_engine, + ) + .strip() + .split() + ) + + select_remote_cluster = ( + instance.query(f"SELECT * FROM 
remote('node2',{table_function_expr_cluster})") + .strip() + .split() + ) + # Simple size check assert len(select_regular) == 600 assert len(select_cluster) == 600 assert len(select_cluster_alt_syntax) == 600 + assert len(select_cluster_table_engine) == 600 + assert len(select_remote_cluster) == 600 # Actual check assert select_cluster == select_regular assert select_cluster_alt_syntax == select_regular + assert select_cluster_table_engine == select_regular + assert select_remote_cluster == select_regular # Check query_log for replica in started_cluster.instances.values(): @@ -715,13 +750,25 @@ def add_df(mode): ) assert len(cluster_secondary_queries) == 1 - select_remote_cluster = ( - instance.query(f"SELECT * FROM remote('node2',{table_function_expr_cluster})") - .strip() - .split() - ) - assert len(select_remote_cluster) == 600 - assert select_remote_cluster == select_regular + for node_name, replica in started_cluster.instances.items(): + cluster_secondary_queries = ( + replica.query( + f""" + SELECT query, type, is_initial_query, read_rows, read_bytes FROM system.query_log + WHERE + type = 'QueryStart' + AND NOT is_initial_query + AND initial_query_id='{query_id_cluster_table_engine}' + """ + ) + .strip() + .split("\n") + ) + + logging.info( + f"[{node_name}] cluster_secondary_queries: {cluster_secondary_queries}" + ) + assert len(cluster_secondary_queries) == 1 @pytest.mark.parametrize("format_version", ["1", "2"]) From 3a11374d985b907d0f9bb7d91955da972d1f40b7 Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Fri, 7 Feb 2025 10:21:36 +0100 Subject: [PATCH 02/18] Fix tests --- src/Core/SettingsChangesHistory.cpp | 1 + tests/integration/test_s3_cluster/test.py | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp index d47763370d84..b7d2bb8113c6 100644 --- a/src/Core/SettingsChangesHistory.cpp +++ b/src/Core/SettingsChangesHistory.cpp @@ -73,6 +73,7 @@ static 
std::initializer_list Date: Thu, 13 Feb 2025 15:46:43 +0100 Subject: [PATCH 03/18] Fixes after review --- src/Storages/ObjectStorage/Azure/Configuration.cpp | 2 +- src/Storages/ObjectStorage/Azure/Configuration.h | 2 +- src/Storages/ObjectStorage/HDFS/Configuration.cpp | 2 +- src/Storages/ObjectStorage/HDFS/Configuration.h | 2 +- src/Storages/ObjectStorage/S3/Configuration.cpp | 2 +- src/Storages/ObjectStorage/S3/Configuration.h | 2 +- src/Storages/ObjectStorage/StorageObjectStorage.h | 4 ++-- .../ObjectStorage/StorageObjectStorageCluster.cpp | 13 ++++++++++--- .../ObjectStorage/StorageObjectStorageCluster.h | 12 ++++++++++++ 9 files changed, 30 insertions(+), 11 deletions(-) diff --git a/src/Storages/ObjectStorage/Azure/Configuration.cpp b/src/Storages/ObjectStorage/Azure/Configuration.cpp index f0ef79016e88..ff7f6810c811 100644 --- a/src/Storages/ObjectStorage/Azure/Configuration.cpp +++ b/src/Storages/ObjectStorage/Azure/Configuration.cpp @@ -459,7 +459,7 @@ void StorageAzureConfiguration::addStructureAndFormatToArgsIfNeeded( } } -void StorageAzureConfiguration::setFunctionArgs(ASTs & args) const +void StorageAzureConfiguration::getTableFunctionArguments(ASTs & args) const { if (!args.empty()) { /// Just check diff --git a/src/Storages/ObjectStorage/Azure/Configuration.h b/src/Storages/ObjectStorage/Azure/Configuration.h index 5ab8c3d71455..a3de3a9377f4 100644 --- a/src/Storages/ObjectStorage/Azure/Configuration.h +++ b/src/Storages/ObjectStorage/Azure/Configuration.h @@ -79,7 +79,7 @@ class StorageAzureConfiguration : public StorageObjectStorage::Configuration ContextPtr context, bool with_structure) override; - void setFunctionArgs(ASTs & args) const override; + void getTableFunctionArguments(ASTs & args) const override; protected: void fromNamedCollection(const NamedCollection & collection, ContextPtr context) override; diff --git a/src/Storages/ObjectStorage/HDFS/Configuration.cpp b/src/Storages/ObjectStorage/HDFS/Configuration.cpp index 
645f8ddd4a98..1cb6d6ffb064 100644 --- a/src/Storages/ObjectStorage/HDFS/Configuration.cpp +++ b/src/Storages/ObjectStorage/HDFS/Configuration.cpp @@ -235,7 +235,7 @@ void StorageHDFSConfiguration::addStructureAndFormatToArgsIfNeeded( } } -void StorageHDFSConfiguration::setFunctionArgs(ASTs & args) const +void StorageHDFSConfiguration::getTableFunctionArguments(ASTs & args) const { if (!args.empty()) { /// Just check diff --git a/src/Storages/ObjectStorage/HDFS/Configuration.h b/src/Storages/ObjectStorage/HDFS/Configuration.h index dbef04aca4aa..75c570901270 100644 --- a/src/Storages/ObjectStorage/HDFS/Configuration.h +++ b/src/Storages/ObjectStorage/HDFS/Configuration.h @@ -65,7 +65,7 @@ class StorageHDFSConfiguration : public StorageObjectStorage::Configuration ContextPtr context, bool with_structure) override; - void setFunctionArgs(ASTs & args) const override; + void getTableFunctionArguments(ASTs & args) const override; private: void fromNamedCollection(const NamedCollection &, ContextPtr context) override; diff --git a/src/Storages/ObjectStorage/S3/Configuration.cpp b/src/Storages/ObjectStorage/S3/Configuration.cpp index d905884868d8..031e50715f14 100644 --- a/src/Storages/ObjectStorage/S3/Configuration.cpp +++ b/src/Storages/ObjectStorage/S3/Configuration.cpp @@ -585,7 +585,7 @@ void StorageS3Configuration::addStructureAndFormatToArgsIfNeeded( } } -void StorageS3Configuration::setFunctionArgs(ASTs & args) const +void StorageS3Configuration::getTableFunctionArguments(ASTs & args) const { if (!args.empty()) { /// Just check diff --git a/src/Storages/ObjectStorage/S3/Configuration.h b/src/Storages/ObjectStorage/S3/Configuration.h index 4f5f2528caf8..7a71b507fb13 100644 --- a/src/Storages/ObjectStorage/S3/Configuration.h +++ b/src/Storages/ObjectStorage/S3/Configuration.h @@ -94,7 +94,7 @@ class StorageS3Configuration : public StorageObjectStorage::Configuration ContextPtr context, bool with_structure) override; - void setFunctionArgs(ASTs & args) const override; 
+ void getTableFunctionArguments(ASTs & args) const override; private: void fromNamedCollection(const NamedCollection & collection, ContextPtr context) override; diff --git a/src/Storages/ObjectStorage/StorageObjectStorage.h b/src/Storages/ObjectStorage/StorageObjectStorage.h index 09fe3e5819d3..739805321b95 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorage.h +++ b/src/Storages/ObjectStorage/StorageObjectStorage.h @@ -247,9 +247,9 @@ class StorageObjectStorage::Configuration virtual void update(ObjectStoragePtr object_storage, ContextPtr local_context); - virtual void setFunctionArgs(ASTs & /* args */) const + virtual void getTableFunctionArguments(ASTs & /* args */) const { - throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method setFunctionArgs is not supported by storage {}", getEngineName()); + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method getTableFunctionArguments is not supported by storage {}", getEngineName()); } protected: diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp index a0a787a5e7d1..fe04be5e6fab 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp @@ -108,6 +108,13 @@ void StorageObjectStorageCluster::updateQueryForDistributedEngineIfNeeded(ASTPtr return; auto * tables = select_query->tables()->as(); + + if (tables->children.empty()) + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "Expected SELECT query from table with engine {}, got '{}'", + configuration->getEngineName(), queryToString(query)); + auto * table_expression = tables->children[0]->as()->table_expression->as(); if (!table_expression->database_and_table_name) return; @@ -116,7 +123,7 @@ void StorageObjectStorageCluster::updateQueryForDistributedEngineIfNeeded(ASTPtr auto table_alias = table_identifier_typed.tryGetAlias(); - std::unordered_map engine_to_function = { + static std::unordered_map 
engine_to_function = { {"S3", "s3"}, {"Azure", "azureBlobStorage"}, {"HDFS", "hdfs"}, @@ -153,7 +160,7 @@ void StorageObjectStorageCluster::updateQueryForDistributedEngineIfNeeded(ASTPtr queryToString(query)); } - configuration->setFunctionArgs(arguments->children); + configuration->getTableFunctionArguments(arguments->children); function_ast->arguments = arguments; function_ast->children.push_back(arguments); @@ -163,7 +170,7 @@ void StorageObjectStorageCluster::updateQueryForDistributedEngineIfNeeded(ASTPtr table_expression->database_and_table_name = nullptr; table_expression->table_function = function_ast_ptr; - table_expression->children[0].swap(function_ast_ptr); + table_expression->children[0] = function_ast_ptr; auto settings = select_query->settings(); if (settings) diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.h b/src/Storages/ObjectStorage/StorageObjectStorageCluster.h index 941566a6d12c..89e7f8e827a1 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.h +++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.h @@ -37,6 +37,18 @@ class StorageObjectStorageCluster : public IStorageCluster const StorageSnapshotPtr & storage_snapshot, const ContextPtr & context) override; + /* + In case the table was created with `object_storage_cluster` setting, + modify the AST query object so that it uses the table function implementation + by mapping the engine name to table function name and setting `object_storage_cluster`. + For table like + CREATE TABLE table ENGINE=S3(...) SETTINGS object_storage_cluster='cluster' + coverts request + SELECT * FROM table + to + SELECT * FROM s3(...) SETTINGS object_storage_cluster='cluster' + to make distributed request over cluster 'cluster'. 
+ */ void updateQueryForDistributedEngineIfNeeded(ASTPtr & query); const String engine_name; From 78261d3ed521fedc787e4377c3b187bf195cd83c Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Fri, 14 Feb 2025 12:58:15 +0100 Subject: [PATCH 04/18] More fixes after review --- src/Databases/Iceberg/DatabaseIceberg.cpp | 2 -- .../ObjectStorage/Azure/Configuration.cpp | 28 ++----------------- .../ObjectStorage/Azure/Configuration.h | 3 +- .../ObjectStorage/HDFS/Configuration.cpp | 2 -- .../ObjectStorage/S3/Configuration.cpp | 2 -- .../StorageObjectStorageCluster.cpp | 4 +++ .../registerStorageObjectStorage.cpp | 3 -- 7 files changed, 9 insertions(+), 35 deletions(-) diff --git a/src/Databases/Iceberg/DatabaseIceberg.cpp b/src/Databases/Iceberg/DatabaseIceberg.cpp index 59a62e100786..55f667684c10 100644 --- a/src/Databases/Iceberg/DatabaseIceberg.cpp +++ b/src/Databases/Iceberg/DatabaseIceberg.cpp @@ -239,8 +239,6 @@ StoragePtr DatabaseIceberg::tryGetTable(const String & name, ContextPtr context_ StorageObjectStorage::Configuration::initialize(*configuration, args, context_, /* with_table_structure */false, std::move(storage_settings)); auto cluster_name = settings[DatabaseIcebergSetting::object_storage_cluster].value; - if (cluster_name.empty()) - cluster_name = context_->getSettingsRef()[Setting::object_storage_cluster].value; if (cluster_name.empty()) { diff --git a/src/Storages/ObjectStorage/Azure/Configuration.cpp b/src/Storages/ObjectStorage/Azure/Configuration.cpp index ff7f6810c811..4774b7917c70 100644 --- a/src/Storages/ObjectStorage/Azure/Configuration.cpp +++ b/src/Storages/ObjectStorage/Azure/Configuration.cpp @@ -132,8 +132,6 @@ void StorageAzureConfiguration::fromNamedCollection(const NamedCollection & coll String connection_url; String container_name; - std::optional account_name; - std::optional account_key; if (collection.has("connection_string")) connection_url = collection.get("connection_string"); @@ -154,14 +152,6 @@ void 
StorageAzureConfiguration::fromNamedCollection(const NamedCollection & coll compression_method = collection.getOrDefault("compression_method", collection.getOrDefault("compression", "auto")); blobs_paths = {blob_path}; - if (account_name && account_key) - { - if (saved_params.empty()) - { - saved_params.push_back(*account_name); - saved_params.push_back(*account_key); - } - } connection_params = getConnectionParams(connection_url, container_name, account_name, account_key, context); } @@ -185,9 +175,6 @@ void StorageAzureConfiguration::fromAST(ASTs & engine_args, ContextPtr context, String container_name = checkAndGetLiteralArgument(engine_args[1], "container"); blob_path = checkAndGetLiteralArgument(engine_args[2], "blobpath"); - std::optional account_name; - std::optional account_key; - auto is_format_arg = [] (const std::string & s) -> bool { return s == "auto" || FormatFactory::instance().getAllFormats().contains(Poco::toLower(s)); @@ -286,14 +273,6 @@ void StorageAzureConfiguration::fromAST(ASTs & engine_args, ContextPtr context, } blobs_paths = {blob_path}; - if (account_name && account_key) - { - if (saved_params.empty()) - { - saved_params.push_back(*account_name); - saved_params.push_back(*account_key); - } - } connection_params = getConnectionParams(connection_url, container_name, account_name, account_key, context); } @@ -462,16 +441,15 @@ void StorageAzureConfiguration::addStructureAndFormatToArgsIfNeeded( void StorageAzureConfiguration::getTableFunctionArguments(ASTs & args) const { if (!args.empty()) - { /// Just check throw Exception(ErrorCodes::LOGICAL_ERROR, "Arguments are not empty"); - } args.push_back(std::make_shared(connection_params.endpoint.storage_account_url)); args.push_back(std::make_shared(connection_params.endpoint.container_name)); args.push_back(std::make_shared(blob_path)); - for (const auto & arg : saved_params) + if (account_name && account_key) { - args.push_back(std::make_shared(arg)); + 
args.push_back(std::make_shared(*account_name)); + args.push_back(std::make_shared(*account_key)); } } diff --git a/src/Storages/ObjectStorage/Azure/Configuration.h b/src/Storages/ObjectStorage/Azure/Configuration.h index a3de3a9377f4..7d93acf1701c 100644 --- a/src/Storages/ObjectStorage/Azure/Configuration.h +++ b/src/Storages/ObjectStorage/Azure/Configuration.h @@ -88,7 +88,8 @@ class StorageAzureConfiguration : public StorageObjectStorage::Configuration std::string blob_path; std::vector blobs_paths; AzureBlobStorage::ConnectionParams connection_params; - std::vector saved_params; + std::optional account_name; + std::optional account_key; }; } diff --git a/src/Storages/ObjectStorage/HDFS/Configuration.cpp b/src/Storages/ObjectStorage/HDFS/Configuration.cpp index 1cb6d6ffb064..a14f2aa3965f 100644 --- a/src/Storages/ObjectStorage/HDFS/Configuration.cpp +++ b/src/Storages/ObjectStorage/HDFS/Configuration.cpp @@ -238,9 +238,7 @@ void StorageHDFSConfiguration::addStructureAndFormatToArgsIfNeeded( void StorageHDFSConfiguration::getTableFunctionArguments(ASTs & args) const { if (!args.empty()) - { /// Just check throw Exception(ErrorCodes::LOGICAL_ERROR, "Arguments are not empty"); - } args.push_back(std::make_shared(url + path)); } diff --git a/src/Storages/ObjectStorage/S3/Configuration.cpp b/src/Storages/ObjectStorage/S3/Configuration.cpp index 031e50715f14..da1af9f61823 100644 --- a/src/Storages/ObjectStorage/S3/Configuration.cpp +++ b/src/Storages/ObjectStorage/S3/Configuration.cpp @@ -588,9 +588,7 @@ void StorageS3Configuration::addStructureAndFormatToArgsIfNeeded( void StorageS3Configuration::getTableFunctionArguments(ASTs & args) const { if (!args.empty()) - { /// Just check throw Exception(ErrorCodes::LOGICAL_ERROR, "Arguments are not empty"); - } args.push_back(std::make_shared(url.uri_str)); if (auth_settings[S3AuthSetting::no_sign_request]) diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp 
b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp index fe04be5e6fab..9c0d864cf600 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp @@ -116,6 +116,10 @@ void StorageObjectStorageCluster::updateQueryForDistributedEngineIfNeeded(ASTPtr configuration->getEngineName(), queryToString(query)); auto * table_expression = tables->children[0]->as()->table_expression->as(); + + if (!table_expression) + return; + if (!table_expression->database_and_table_name) return; diff --git a/src/Storages/ObjectStorage/registerStorageObjectStorage.cpp b/src/Storages/ObjectStorage/registerStorageObjectStorage.cpp index e5d8d5993179..ea11cd9f353b 100644 --- a/src/Storages/ObjectStorage/registerStorageObjectStorage.cpp +++ b/src/Storages/ObjectStorage/registerStorageObjectStorage.cpp @@ -74,9 +74,6 @@ createStorageObjectStorage(const StorageFactory::Arguments & args, StorageObject if (args.storage_def->partition_by) partition_by = args.storage_def->partition_by->clone(); - if (cluster_name.empty()) - cluster_name = context->getSettingsRef()[Setting::object_storage_cluster].value; - if (cluster_name.empty()) { return std::make_shared( From ac37da6f69e29e676b5494a9ee92153e1951d298 Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Fri, 14 Feb 2025 13:41:09 +0100 Subject: [PATCH 05/18] Rename getTableFunctionArguments to addPathAndAccessKeysToArgs --- src/Storages/ObjectStorage/Azure/Configuration.cpp | 2 +- src/Storages/ObjectStorage/Azure/Configuration.h | 2 +- src/Storages/ObjectStorage/HDFS/Configuration.cpp | 2 +- src/Storages/ObjectStorage/HDFS/Configuration.h | 2 +- src/Storages/ObjectStorage/S3/Configuration.cpp | 2 +- src/Storages/ObjectStorage/S3/Configuration.h | 2 +- src/Storages/ObjectStorage/StorageObjectStorage.h | 5 +++-- src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp | 2 +- 8 files changed, 10 insertions(+), 9 deletions(-) diff --git 
a/src/Storages/ObjectStorage/Azure/Configuration.cpp b/src/Storages/ObjectStorage/Azure/Configuration.cpp index 4774b7917c70..e4bcdd7e5b1a 100644 --- a/src/Storages/ObjectStorage/Azure/Configuration.cpp +++ b/src/Storages/ObjectStorage/Azure/Configuration.cpp @@ -438,7 +438,7 @@ void StorageAzureConfiguration::addStructureAndFormatToArgsIfNeeded( } } -void StorageAzureConfiguration::getTableFunctionArguments(ASTs & args) const +void StorageAzureConfiguration::addPathAndAccessKeysToArgs(ASTs & args) const { if (!args.empty()) throw Exception(ErrorCodes::LOGICAL_ERROR, "Arguments are not empty"); diff --git a/src/Storages/ObjectStorage/Azure/Configuration.h b/src/Storages/ObjectStorage/Azure/Configuration.h index 7d93acf1701c..08b5568baf95 100644 --- a/src/Storages/ObjectStorage/Azure/Configuration.h +++ b/src/Storages/ObjectStorage/Azure/Configuration.h @@ -79,7 +79,7 @@ class StorageAzureConfiguration : public StorageObjectStorage::Configuration ContextPtr context, bool with_structure) override; - void getTableFunctionArguments(ASTs & args) const override; + void addPathAndAccessKeysToArgs(ASTs & args) const override; protected: void fromNamedCollection(const NamedCollection & collection, ContextPtr context) override; diff --git a/src/Storages/ObjectStorage/HDFS/Configuration.cpp b/src/Storages/ObjectStorage/HDFS/Configuration.cpp index a14f2aa3965f..e9bba5c57a25 100644 --- a/src/Storages/ObjectStorage/HDFS/Configuration.cpp +++ b/src/Storages/ObjectStorage/HDFS/Configuration.cpp @@ -235,7 +235,7 @@ void StorageHDFSConfiguration::addStructureAndFormatToArgsIfNeeded( } } -void StorageHDFSConfiguration::getTableFunctionArguments(ASTs & args) const +void StorageHDFSConfiguration::addPathAndAccessKeysToArgs(ASTs & args) const { if (!args.empty()) throw Exception(ErrorCodes::LOGICAL_ERROR, "Arguments are not empty"); diff --git a/src/Storages/ObjectStorage/HDFS/Configuration.h b/src/Storages/ObjectStorage/HDFS/Configuration.h index 75c570901270..5d009e41d16f 100644 --- 
a/src/Storages/ObjectStorage/HDFS/Configuration.h +++ b/src/Storages/ObjectStorage/HDFS/Configuration.h @@ -65,7 +65,7 @@ class StorageHDFSConfiguration : public StorageObjectStorage::Configuration ContextPtr context, bool with_structure) override; - void getTableFunctionArguments(ASTs & args) const override; + void addPathAndAccessKeysToArgs(ASTs & args) const override; private: void fromNamedCollection(const NamedCollection &, ContextPtr context) override; diff --git a/src/Storages/ObjectStorage/S3/Configuration.cpp b/src/Storages/ObjectStorage/S3/Configuration.cpp index da1af9f61823..9d42745a3f26 100644 --- a/src/Storages/ObjectStorage/S3/Configuration.cpp +++ b/src/Storages/ObjectStorage/S3/Configuration.cpp @@ -585,7 +585,7 @@ void StorageS3Configuration::addStructureAndFormatToArgsIfNeeded( } } -void StorageS3Configuration::getTableFunctionArguments(ASTs & args) const +void StorageS3Configuration::addPathAndAccessKeysToArgs(ASTs & args) const { if (!args.empty()) throw Exception(ErrorCodes::LOGICAL_ERROR, "Arguments are not empty"); diff --git a/src/Storages/ObjectStorage/S3/Configuration.h b/src/Storages/ObjectStorage/S3/Configuration.h index 7a71b507fb13..4f0b7e1d8fe6 100644 --- a/src/Storages/ObjectStorage/S3/Configuration.h +++ b/src/Storages/ObjectStorage/S3/Configuration.h @@ -94,7 +94,7 @@ class StorageS3Configuration : public StorageObjectStorage::Configuration ContextPtr context, bool with_structure) override; - void getTableFunctionArguments(ASTs & args) const override; + void addPathAndAccessKeysToArgs(ASTs & args) const override; private: void fromNamedCollection(const NamedCollection & collection, ContextPtr context) override; diff --git a/src/Storages/ObjectStorage/StorageObjectStorage.h b/src/Storages/ObjectStorage/StorageObjectStorage.h index 739805321b95..2b14ee72f155 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorage.h +++ b/src/Storages/ObjectStorage/StorageObjectStorage.h @@ -247,9 +247,10 @@ class 
StorageObjectStorage::Configuration virtual void update(ObjectStoragePtr object_storage, ContextPtr local_context); - virtual void getTableFunctionArguments(ASTs & /* args */) const + /// Add path and access arguments in the AST arguments durign conversion from table engine to table function + virtual void addPathAndAccessKeysToArgs(ASTs & /* args */) const { - throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method getTableFunctionArguments is not supported by storage {}", getEngineName()); + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method addPathAndAccessKeysToArgs is not supported by storage {}", getEngineName()); } protected: diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp index 9c0d864cf600..b54e4ad45cd2 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp @@ -164,7 +164,7 @@ void StorageObjectStorageCluster::updateQueryForDistributedEngineIfNeeded(ASTPtr queryToString(query)); } - configuration->getTableFunctionArguments(arguments->children); + configuration->addPathAndAccessKeysToArgs(arguments->children); function_ast->arguments = arguments; function_ast->children.push_back(arguments); From db4416670ab502485fb071966b213c75e2373efa Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Fri, 14 Feb 2025 14:33:01 +0100 Subject: [PATCH 06/18] More refactoring --- .../ObjectStorage/Azure/Configuration.cpp | 17 ++++++++------- .../ObjectStorage/Azure/Configuration.h | 2 +- .../ObjectStorage/HDFS/Configuration.cpp | 9 ++++---- .../ObjectStorage/HDFS/Configuration.h | 2 +- .../ObjectStorage/S3/Configuration.cpp | 21 ++++++++++--------- src/Storages/ObjectStorage/S3/Configuration.h | 2 +- .../ObjectStorage/StorageObjectStorage.h | 6 +++--- .../StorageObjectStorageCluster.cpp | 7 ++----- 8 files changed, 32 insertions(+), 34 deletions(-) diff --git a/src/Storages/ObjectStorage/Azure/Configuration.cpp 
b/src/Storages/ObjectStorage/Azure/Configuration.cpp index e4bcdd7e5b1a..0e273158bbf5 100644 --- a/src/Storages/ObjectStorage/Azure/Configuration.cpp +++ b/src/Storages/ObjectStorage/Azure/Configuration.cpp @@ -438,19 +438,20 @@ void StorageAzureConfiguration::addStructureAndFormatToArgsIfNeeded( } } -void StorageAzureConfiguration::addPathAndAccessKeysToArgs(ASTs & args) const +ASTPtr StorageAzureConfiguration::createArgsWithAccessData() const { - if (!args.empty()) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Arguments are not empty"); + auto arguments = std::make_shared(); - args.push_back(std::make_shared(connection_params.endpoint.storage_account_url)); - args.push_back(std::make_shared(connection_params.endpoint.container_name)); - args.push_back(std::make_shared(blob_path)); + arguments->children.push_back(std::make_shared(connection_params.endpoint.storage_account_url)); + arguments->children.push_back(std::make_shared(connection_params.endpoint.container_name)); + arguments->children.push_back(std::make_shared(blob_path)); if (account_name && account_key) { - args.push_back(std::make_shared(*account_name)); - args.push_back(std::make_shared(*account_key)); + arguments->children.push_back(std::make_shared(*account_name)); + arguments->children.push_back(std::make_shared(*account_key)); } + + return arguments; } } diff --git a/src/Storages/ObjectStorage/Azure/Configuration.h b/src/Storages/ObjectStorage/Azure/Configuration.h index 08b5568baf95..c915696f2448 100644 --- a/src/Storages/ObjectStorage/Azure/Configuration.h +++ b/src/Storages/ObjectStorage/Azure/Configuration.h @@ -79,7 +79,7 @@ class StorageAzureConfiguration : public StorageObjectStorage::Configuration ContextPtr context, bool with_structure) override; - void addPathAndAccessKeysToArgs(ASTs & args) const override; + ASTPtr createArgsWithAccessData() const override; protected: void fromNamedCollection(const NamedCollection & collection, ContextPtr context) override; diff --git 
a/src/Storages/ObjectStorage/HDFS/Configuration.cpp b/src/Storages/ObjectStorage/HDFS/Configuration.cpp index e9bba5c57a25..2a81a7ff78c9 100644 --- a/src/Storages/ObjectStorage/HDFS/Configuration.cpp +++ b/src/Storages/ObjectStorage/HDFS/Configuration.cpp @@ -235,12 +235,11 @@ void StorageHDFSConfiguration::addStructureAndFormatToArgsIfNeeded( } } -void StorageHDFSConfiguration::addPathAndAccessKeysToArgs(ASTs & args) const +ASTPtr StorageHDFSConfiguration::createArgsWithAccessData() const { - if (!args.empty()) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Arguments are not empty"); - - args.push_back(std::make_shared(url + path)); + auto arguments = std::make_shared(); + arguments->children.push_back(std::make_shared(url + path)); + return arguments; } } diff --git a/src/Storages/ObjectStorage/HDFS/Configuration.h b/src/Storages/ObjectStorage/HDFS/Configuration.h index 5d009e41d16f..f38382e173ed 100644 --- a/src/Storages/ObjectStorage/HDFS/Configuration.h +++ b/src/Storages/ObjectStorage/HDFS/Configuration.h @@ -65,7 +65,7 @@ class StorageHDFSConfiguration : public StorageObjectStorage::Configuration ContextPtr context, bool with_structure) override; - void addPathAndAccessKeysToArgs(ASTs & args) const override; + ASTPtr createArgsWithAccessData() const override; private: void fromNamedCollection(const NamedCollection &, ContextPtr context) override; diff --git a/src/Storages/ObjectStorage/S3/Configuration.cpp b/src/Storages/ObjectStorage/S3/Configuration.cpp index 9d42745a3f26..d29e4bc130ac 100644 --- a/src/Storages/ObjectStorage/S3/Configuration.cpp +++ b/src/Storages/ObjectStorage/S3/Configuration.cpp @@ -585,27 +585,28 @@ void StorageS3Configuration::addStructureAndFormatToArgsIfNeeded( } } -void StorageS3Configuration::addPathAndAccessKeysToArgs(ASTs & args) const +ASTPtr StorageS3Configuration::createArgsWithAccessData() const { - if (!args.empty()) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Arguments are not empty"); + auto arguments = 
std::make_shared(); - args.push_back(std::make_shared(url.uri_str)); + arguments->children.push_back(std::make_shared(url.uri_str)); if (auth_settings[S3AuthSetting::no_sign_request]) { - args.push_back(std::make_shared("NOSIGN")); + arguments->children.push_back(std::make_shared("NOSIGN")); } else { - args.push_back(std::make_shared(auth_settings[S3AuthSetting::access_key_id].value)); - args.push_back(std::make_shared(auth_settings[S3AuthSetting::secret_access_key].value)); + arguments->children.push_back(std::make_shared(auth_settings[S3AuthSetting::access_key_id].value)); + arguments->children.push_back(std::make_shared(auth_settings[S3AuthSetting::secret_access_key].value)); if (!auth_settings[S3AuthSetting::session_token].value.empty()) - args.push_back(std::make_shared(auth_settings[S3AuthSetting::session_token].value)); + arguments->children.push_back(std::make_shared(auth_settings[S3AuthSetting::session_token].value)); if (format != "auto") - args.push_back(std::make_shared(format)); + arguments->children.push_back(std::make_shared(format)); if (!compression_method.empty()) - args.push_back(std::make_shared(compression_method)); + arguments->children.push_back(std::make_shared(compression_method)); } + + return arguments; } } diff --git a/src/Storages/ObjectStorage/S3/Configuration.h b/src/Storages/ObjectStorage/S3/Configuration.h index 4f0b7e1d8fe6..746431987601 100644 --- a/src/Storages/ObjectStorage/S3/Configuration.h +++ b/src/Storages/ObjectStorage/S3/Configuration.h @@ -94,7 +94,7 @@ class StorageS3Configuration : public StorageObjectStorage::Configuration ContextPtr context, bool with_structure) override; - void addPathAndAccessKeysToArgs(ASTs & args) const override; + ASTPtr createArgsWithAccessData() const override; private: void fromNamedCollection(const NamedCollection & collection, ContextPtr context) override; diff --git a/src/Storages/ObjectStorage/StorageObjectStorage.h b/src/Storages/ObjectStorage/StorageObjectStorage.h index 
2b14ee72f155..18f62b97ca62 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorage.h +++ b/src/Storages/ObjectStorage/StorageObjectStorage.h @@ -247,10 +247,10 @@ class StorageObjectStorage::Configuration virtual void update(ObjectStoragePtr object_storage, ContextPtr local_context); - /// Add path and access arguments in the AST arguments durign conversion from table engine to table function - virtual void addPathAndAccessKeysToArgs(ASTs & /* args */) const + /// Create arguments for table function with path and access parameters + virtual ASTPtr createArgsWithAccessData() const { - throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method addPathAndAccessKeysToArgs is not supported by storage {}", getEngineName()); + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method createArgsWithAccessData is not supported by storage {}", getEngineName()); } protected: diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp index b54e4ad45cd2..380deecb283d 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp @@ -152,7 +152,6 @@ void StorageObjectStorageCluster::updateQueryForDistributedEngineIfNeeded(ASTPtr auto function_ast = std::make_shared(); function_ast->name = table_function_name; - auto arguments = std::make_shared(); auto cluster_name = getClusterName(); @@ -164,10 +163,8 @@ void StorageObjectStorageCluster::updateQueryForDistributedEngineIfNeeded(ASTPtr queryToString(query)); } - configuration->addPathAndAccessKeysToArgs(arguments->children); - - function_ast->arguments = arguments; - function_ast->children.push_back(arguments); + function_ast->arguments = configuration->createArgsWithAccessData(); + function_ast->children.push_back(function_ast->arguments); function_ast->setAlias(table_alias); ASTPtr function_ast_ptr(function_ast); From 3fafe6f27b766b492c37fbd748ee148702ab652d Mon Sep 17 00:00:00 2001 From: 
Anton Ivashkin Date: Tue, 18 Feb 2025 11:13:56 +0100 Subject: [PATCH 07/18] Add ability to choose object storage cluster in select query --- src/Databases/Iceberg/DatabaseIceberg.cpp | 40 ++---- src/Storages/IStorageCluster.cpp | 36 +++++- src/Storages/IStorageCluster.h | 21 +++- .../StorageObjectStorageCluster.cpp | 54 +++++++- .../StorageObjectStorageCluster.h | 27 +++- .../registerStorageObjectStorage.cpp | 43 ++----- .../TableFunctionObjectStorageCluster.cpp | 8 +- .../integration/test_storage_iceberg/test.py | 116 +++++++++--------- 8 files changed, 210 insertions(+), 135 deletions(-) diff --git a/src/Databases/Iceberg/DatabaseIceberg.cpp b/src/Databases/Iceberg/DatabaseIceberg.cpp index 55f667684c10..880a27d61d9b 100644 --- a/src/Databases/Iceberg/DatabaseIceberg.cpp +++ b/src/Databases/Iceberg/DatabaseIceberg.cpp @@ -43,7 +43,6 @@ namespace DatabaseIcebergSetting namespace Setting { extern const SettingsBool allow_experimental_database_iceberg; - extern const SettingsString object_storage_cluster; } namespace ErrorCodes @@ -240,33 +239,18 @@ StoragePtr DatabaseIceberg::tryGetTable(const String & name, ContextPtr context_ auto cluster_name = settings[DatabaseIcebergSetting::object_storage_cluster].value; - if (cluster_name.empty()) - { - return std::make_shared( - configuration, - configuration->createObjectStorage(context_, /* is_readonly */ false), - context_, - StorageID(getDatabaseName(), name), - /* columns */columns, - /* constraints */ConstraintsDescription{}, - /* comment */"", - getFormatSettings(context_), - LoadingStrictnessLevel::CREATE, - /* distributed_processing */false, - /* partition_by */nullptr, - /* lazy_init */true); - } - else - { - return std::make_shared( - cluster_name, - configuration, - configuration->createObjectStorage(context_, /* is_readonly */ false), - StorageID(getDatabaseName(), name), - columns, - ConstraintsDescription{}, - context_); - } + return std::make_shared( + cluster_name, + configuration, + 
configuration->createObjectStorage(context_, /* is_readonly */ false), + context_, + StorageID(getDatabaseName(), name), + /* columns */columns, + /* constraints */ConstraintsDescription{}, + /* comment */"", + /* format_settings */ getFormatSettings(context_), + /* mode */ LoadingStrictnessLevel::CREATE, + /* partition_by */nullptr); } DatabaseTablesIteratorPtr DatabaseIceberg::getTablesIterator( diff --git a/src/Storages/IStorageCluster.cpp b/src/Storages/IStorageCluster.cpp index 28b5a84166a2..84885032dda9 100644 --- a/src/Storages/IStorageCluster.cpp +++ b/src/Storages/IStorageCluster.cpp @@ -36,6 +36,11 @@ namespace Setting extern const SettingsBool skip_unavailable_shards; } +namespace ErrorCodes +{ + extern const int NOT_IMPLEMENTED; +} + IStorageCluster::IStorageCluster( const String & cluster_name_, const StorageID & table_id_, @@ -65,6 +70,19 @@ void ReadFromCluster::createExtension(const ActionsDAG::Node * predicate) extension = storage->getTaskIteratorExtension(predicate, context); } +void IStorageCluster::readFallBackToPure( + QueryPlan & /*query_plan*/, + const Names & /*column_names*/, + const StorageSnapshotPtr & /*storage_snapshot*/, + SelectQueryInfo & /*query_info*/, + ContextPtr /*context*/, + QueryProcessingStage::Enum /*processed_stage*/, + size_t /*max_block_size*/, + size_t /*num_streams*/) +{ + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method readFallBackToPure is not supported by storage {}", getName()); +} + /// The code executes on initiator void IStorageCluster::read( QueryPlan & query_plan, @@ -73,13 +91,21 @@ void IStorageCluster::read( SelectQueryInfo & query_info, ContextPtr context, QueryProcessingStage::Enum processed_stage, - size_t /*max_block_size*/, - size_t /*num_streams*/) + size_t max_block_size, + size_t num_streams) { + auto cluster_name_ = getClusterName(context); + + if (cluster_name_.empty()) + { + readFallBackToPure(query_plan, column_names, storage_snapshot, query_info, context, processed_stage, max_block_size, 
num_streams); + return; + } + storage_snapshot->check(column_names); updateBeforeRead(context); - auto cluster = getCluster(context); + auto cluster = getClusterImpl(context, cluster_name_); /// Calculate the header. This is significant, because some columns could be thrown away in some cases like query with count(*) @@ -196,9 +222,9 @@ ContextPtr ReadFromCluster::updateSettings(const Settings & settings) return new_context; } -ClusterPtr IStorageCluster::getCluster(ContextPtr context) const +ClusterPtr IStorageCluster::getClusterImpl(ContextPtr context, const String & cluster_name_) { - return context->getCluster(cluster_name)->getClusterWithReplicasAsShards(context->getSettingsRef()); + return context->getCluster(cluster_name_)->getClusterWithReplicasAsShards(context->getSettingsRef()); } } diff --git a/src/Storages/IStorageCluster.h b/src/Storages/IStorageCluster.h index 3bcc467e3135..c6546c20957e 100644 --- a/src/Storages/IStorageCluster.h +++ b/src/Storages/IStorageCluster.h @@ -29,10 +29,10 @@ class IStorageCluster : public IStorage SelectQueryInfo & query_info, ContextPtr context, QueryProcessingStage::Enum processed_stage, - size_t /*max_block_size*/, - size_t /*num_streams*/) override; + size_t max_block_size, + size_t num_streams) override; - ClusterPtr getCluster(ContextPtr context) const; + ClusterPtr getCluster(ContextPtr context) const { return getClusterImpl(context, cluster_name); } /// Query is needed for pruning by virtual columns (_file, _path) virtual RemoteQueryExecutor::Extension getTaskIteratorExtension(const ActionsDAG::Node * predicate, const ContextPtr & context) const = 0; @@ -43,13 +43,26 @@ class IStorageCluster : public IStorage bool supportsOptimizationToSubcolumns() const override { return false; } bool supportsTrivialCountOptimization(const StorageSnapshotPtr &, ContextPtr) const override { return true; } - const String & getClusterName() const { return cluster_name; } + const String & getOriginalClusterName() const { return 
cluster_name; } + virtual String getClusterName(ContextPtr /* context */) const { return getOriginalClusterName(); } protected: virtual void updateBeforeRead(const ContextPtr &) {} virtual void updateQueryToSendIfNeeded(ASTPtr & /*query*/, const StorageSnapshotPtr & /*storage_snapshot*/, const ContextPtr & /*context*/) {} + virtual void readFallBackToPure( + QueryPlan & query_plan, + const Names & column_names, + const StorageSnapshotPtr & storage_snapshot, + SelectQueryInfo & query_info, + ContextPtr context, + QueryProcessingStage::Enum processed_stage, + size_t max_block_size, + size_t num_streams); + private: + static ClusterPtr getClusterImpl(ContextPtr context, const String & cluster_name_); + LoggerPtr log; String cluster_name; }; diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp index 380deecb283d..431a5fc67bee 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp @@ -23,6 +23,7 @@ namespace DB namespace Setting { extern const SettingsBool use_hive_partitioning; + extern const SettingsString object_storage_cluster; } namespace ErrorCodes @@ -63,15 +64,24 @@ StorageObjectStorageCluster::StorageObjectStorageCluster( const String & cluster_name_, ConfigurationPtr configuration_, ObjectStoragePtr object_storage_, + ContextPtr context_, const StorageID & table_id_, const ColumnsDescription & columns_, const ConstraintsDescription & constraints_, - ContextPtr context_) + const String & comment_, + std::optional format_settings_, + LoadingStrictnessLevel mode_, + ASTPtr partition_by_ +) : IStorageCluster( cluster_name_, table_id_, getLogger(fmt::format("{}({})", configuration_->getEngineName(), table_id_.table_name))) , configuration{configuration_} , object_storage(object_storage_) , cluster_name_in_settings(false) + , comment(comment_) + , format_settings(format_settings_) + , mode(mode_) + , 
partition_by(partition_by_) { ColumnsDescription columns{columns_}; std::string sample_path; @@ -94,7 +104,7 @@ std::string StorageObjectStorageCluster::getName() const return configuration->getEngineName(); } -void StorageObjectStorageCluster::updateQueryForDistributedEngineIfNeeded(ASTPtr & query) +void StorageObjectStorageCluster::updateQueryForDistributedEngineIfNeeded(ASTPtr & query, ContextPtr context) { // Change table engine on table function for distributed request // CREATE TABLE t (...) ENGINE=IcebergS3(...) @@ -131,6 +141,7 @@ void StorageObjectStorageCluster::updateQueryForDistributedEngineIfNeeded(ASTPtr {"S3", "s3"}, {"Azure", "azureBlobStorage"}, {"HDFS", "hdfs"}, + {"Iceberg", "icebergS3"}, {"IcebergS3", "icebergS3"}, {"IcebergAzure", "icebergAzure"}, {"IcebergHDFS", "icebergHDFS"}, @@ -153,7 +164,7 @@ void StorageObjectStorageCluster::updateQueryForDistributedEngineIfNeeded(ASTPtr auto function_ast = std::make_shared(); function_ast->name = table_function_name; - auto cluster_name = getClusterName(); + auto cluster_name = getClusterName(context); if (cluster_name.empty()) { @@ -195,7 +206,7 @@ void StorageObjectStorageCluster::updateQueryToSendIfNeeded( const DB::StorageSnapshotPtr & storage_snapshot, const ContextPtr & context) { - updateQueryForDistributedEngineIfNeeded(query); + updateQueryForDistributedEngineIfNeeded(query, context); ASTExpressionList * expression_list = extractTableFunctionArgumentsFromSelectQuery(query); @@ -247,4 +258,39 @@ RemoteQueryExecutor::Extension StorageObjectStorageCluster::getTaskIteratorExten return RemoteQueryExecutor::Extension{ .task_iterator = std::move(callback) }; } +void StorageObjectStorageCluster::readFallBackToPure( + QueryPlan & query_plan, + const Names & column_names, + const StorageSnapshotPtr & storage_snapshot, + SelectQueryInfo & query_info, + ContextPtr context, + QueryProcessingStage::Enum processed_stage, + size_t max_block_size, + size_t num_streams) +{ + if (!pure_storage) + pure_storage = 
std::make_shared( + configuration, + object_storage, + context, + getStorageID(), + getInMemoryMetadata().getColumns(), + getInMemoryMetadata().getConstraints(), + comment, + format_settings, + mode, + /* distributed_processing */false, + partition_by); + + pure_storage->read(query_plan, column_names, storage_snapshot, query_info, context, processed_stage, max_block_size, num_streams); +} + +String StorageObjectStorageCluster::getClusterName(ContextPtr context) const +{ + auto cluster_name_ = context->getSettingsRef()[Setting::object_storage_cluster].value; + if (cluster_name_.empty()) + cluster_name_ = getOriginalClusterName(); + return cluster_name_; +} + } diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.h b/src/Storages/ObjectStorage/StorageObjectStorageCluster.h index 89e7f8e827a1..6f04e4e7d443 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.h +++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.h @@ -17,10 +17,15 @@ class StorageObjectStorageCluster : public IStorageCluster const String & cluster_name_, ConfigurationPtr configuration_, ObjectStoragePtr object_storage_, + ContextPtr context_, const StorageID & table_id_, const ColumnsDescription & columns_, const ConstraintsDescription & constraints_, - ContextPtr context_); + const String & comment_, + std::optional format_settings_, + LoadingStrictnessLevel mode_, + ASTPtr partition_by_ = nullptr + ); std::string getName() const override; @@ -31,12 +36,24 @@ class StorageObjectStorageCluster : public IStorageCluster void setClusterNameInSettings(bool cluster_name_in_settings_) { cluster_name_in_settings = cluster_name_in_settings_; } + String getClusterName(ContextPtr context) const override; + private: void updateQueryToSendIfNeeded( ASTPtr & query, const StorageSnapshotPtr & storage_snapshot, const ContextPtr & context) override; + void readFallBackToPure( + QueryPlan & query_plan, + const Names & column_names, + const StorageSnapshotPtr & storage_snapshot, + 
SelectQueryInfo & query_info, + ContextPtr context, + QueryProcessingStage::Enum processed_stage, + size_t max_block_size, + size_t num_streams) override; + /* In case the table was created with `object_storage_cluster` setting, modify the AST query object so that it uses the table function implementation @@ -49,12 +66,18 @@ class StorageObjectStorageCluster : public IStorageCluster SELECT * FROM s3(...) SETTINGS object_storage_cluster='cluster' to make distributed request over cluster 'cluster'. */ - void updateQueryForDistributedEngineIfNeeded(ASTPtr & query); + void updateQueryForDistributedEngineIfNeeded(ASTPtr & query, ContextPtr context); const String engine_name; const StorageObjectStorage::ConfigurationPtr configuration; const ObjectStoragePtr object_storage; bool cluster_name_in_settings; + + std::shared_ptr pure_storage; + String comment; + std::optional format_settings; + LoadingStrictnessLevel mode; + ASTPtr partition_by; }; } diff --git a/src/Storages/ObjectStorage/registerStorageObjectStorage.cpp b/src/Storages/ObjectStorage/registerStorageObjectStorage.cpp index ea11cd9f353b..4c7c8abebe11 100644 --- a/src/Storages/ObjectStorage/registerStorageObjectStorage.cpp +++ b/src/Storages/ObjectStorage/registerStorageObjectStorage.cpp @@ -20,11 +20,6 @@ namespace ErrorCodes extern const int BAD_ARGUMENTS; } -namespace Setting -{ - extern const SettingsString object_storage_cluster; -} - namespace StorageObjectStorageSetting { extern const StorageObjectStorageSettingsString object_storage_cluster; @@ -74,32 +69,18 @@ createStorageObjectStorage(const StorageFactory::Arguments & args, StorageObject if (args.storage_def->partition_by) partition_by = args.storage_def->partition_by->clone(); - if (cluster_name.empty()) - { - return std::make_shared( - configuration, - configuration->createObjectStorage(context, /* is_readonly */ false), - args.getContext(), - args.table_id, - args.columns, - args.constraints, - args.comment, - format_settings, - args.mode, - /* 
distributed_processing */ false, - partition_by); - } - else - { - return std::make_shared( - cluster_name, - configuration, - configuration->createObjectStorage(context, /* is_readonly */ false), - args.table_id, - args.columns, - args.constraints, - args.getContext()); - } + return std::make_shared( + cluster_name, + configuration, + configuration->createObjectStorage(context, /* is_readonly */ false), + args.getContext(), + args.table_id, + args.columns, + args.constraints, + args.comment, + format_settings, + args.mode, + partition_by); } #endif diff --git a/src/TableFunctions/TableFunctionObjectStorageCluster.cpp b/src/TableFunctions/TableFunctionObjectStorageCluster.cpp index 54d5c101826f..b563039fa325 100644 --- a/src/TableFunctions/TableFunctionObjectStorageCluster.cpp +++ b/src/TableFunctions/TableFunctionObjectStorageCluster.cpp @@ -46,7 +46,7 @@ StoragePtr TableFunctionObjectStorageCluster::execute /* format_settings */ std::nullopt, /// No format_settings /* mode */ LoadingStrictnessLevel::CREATE, /* distributed_processing */ true, - /*partition_by_=*/nullptr); + /* partition_by */ nullptr); } else { @@ -54,10 +54,14 @@ StoragePtr TableFunctionObjectStorageCluster::execute ITableFunctionCluster::cluster_name, configuration, object_storage, + context, StorageID(Base::getDatabaseName(), table_name), columns, ConstraintsDescription{}, - context); + /* comment */ String{}, + /* format_settings */ std::nullopt, /// No format_settings + /* mode */ LoadingStrictnessLevel::CREATE, + /* partition_by */ nullptr); } storage->startup(); diff --git a/tests/integration/test_storage_iceberg/test.py b/tests/integration/test_storage_iceberg/test.py index 26d4e09f47c3..d6c9deb35cc8 100644 --- a/tests/integration/test_storage_iceberg/test.py +++ b/tests/integration/test_storage_iceberg/test.py @@ -587,6 +587,28 @@ def test_types(started_cluster, format_version, storage_type): ) +def count_secondary_subqueries(started_cluster, query_id, expected, comment): + for node_name, 
replica in started_cluster.instances.items(): + cluster_secondary_queries = ( + replica.query( + f""" + SELECT query, type, is_initial_query, read_rows, read_bytes FROM system.query_log + WHERE + type = 'QueryFinish' + AND NOT is_initial_query + AND initial_query_id='{query_id}' + """ + ) + .strip() + .split("\n") + ) + + logging.info( + f"[{node_name}] cluster_secondary_queries {comment}: {cluster_secondary_queries}" + ) + assert len(cluster_secondary_queries) == expected + + @pytest.mark.parametrize("format_version", ["1", "2"]) @pytest.mark.parametrize("storage_type", ["s3", "azure", "hdfs"]) def test_cluster_table_function(started_cluster, format_version, storage_type): @@ -693,82 +715,58 @@ def add_df(mode): .split() ) + instance.query(f"DROP TABLE IF EXISTS `{TABLE_NAME}` SYNC") + create_iceberg_table(storage_type, instance, TABLE_NAME, started_cluster) + query_id_pure_table_engine = str(uuid.uuid4()) + select_pure_table_engine = ( + instance.query( + f""" + SELECT * FROM {TABLE_NAME} + """, + query_id=query_id_pure_table_engine, + ) + .strip() + .split() + ) + query_id_pure_table_engine_cluster = str(uuid.uuid4()) + select_pure_table_engine_cluster = ( + instance.query( + f""" + SELECT * FROM {TABLE_NAME} + SETTINGS object_storage_cluster='cluster_simple' + """, + query_id=query_id_pure_table_engine_cluster, + ) + .strip() + .split() + ) + # Simple size check assert len(select_regular) == 600 assert len(select_cluster) == 600 assert len(select_cluster_alt_syntax) == 600 assert len(select_cluster_table_engine) == 600 assert len(select_remote_cluster) == 600 + assert len(select_pure_table_engine) == 600 + assert len(select_pure_table_engine_cluster) == 600 # Actual check assert select_cluster == select_regular assert select_cluster_alt_syntax == select_regular assert select_cluster_table_engine == select_regular assert select_remote_cluster == select_regular + assert select_pure_table_engine == select_regular + assert select_pure_table_engine_cluster == 
select_regular # Check query_log for replica in started_cluster.instances.values(): replica.query("SYSTEM FLUSH LOGS") - for node_name, replica in started_cluster.instances.items(): - cluster_secondary_queries = ( - replica.query( - f""" - SELECT query, type, is_initial_query, read_rows, read_bytes FROM system.query_log - WHERE - type = 'QueryStart' - AND NOT is_initial_query - AND initial_query_id='{query_id_cluster}' - """ - ) - .strip() - .split("\n") - ) - - logging.info( - f"[{node_name}] cluster_secondary_queries: {cluster_secondary_queries}" - ) - assert len(cluster_secondary_queries) == 1 - - for node_name, replica in started_cluster.instances.items(): - cluster_secondary_queries = ( - replica.query( - f""" - SELECT query, type, is_initial_query, read_rows, read_bytes FROM system.query_log - WHERE - type = 'QueryStart' - AND NOT is_initial_query - AND initial_query_id='{query_id_cluster_alt_syntax}' - """ - ) - .strip() - .split("\n") - ) - - logging.info( - f"[{node_name}] cluster_secondary_queries: {cluster_secondary_queries}" - ) - assert len(cluster_secondary_queries) == 1 - - for node_name, replica in started_cluster.instances.items(): - cluster_secondary_queries = ( - replica.query( - f""" - SELECT query, type, is_initial_query, read_rows, read_bytes FROM system.query_log - WHERE - type = 'QueryStart' - AND NOT is_initial_query - AND initial_query_id='{query_id_cluster_table_engine}' - """ - ) - .strip() - .split("\n") - ) - - logging.info( - f"[{node_name}] cluster_secondary_queries: {cluster_secondary_queries}" - ) - assert len(cluster_secondary_queries) == 1 + count_secondary_subqueries(started_cluster, query_id_cluster, 1, "table function") + count_secondary_subqueries(started_cluster, query_id_cluster_alt_syntax, 1, "table function alt syntax") + count_secondary_subqueries(started_cluster, query_id_cluster_table_engine, 1, "cluster table engine") + count_secondary_subqueries(started_cluster, query_id_pure_table_engine, 0, "table engine") + 
count_secondary_subqueries(started_cluster, query_id_pure_table_engine_cluster, 1, "table engine with cluster setting") @pytest.mark.parametrize("format_version", ["1", "2"]) From cfc74ec374b0e95cfb350461c92a20a79578d461 Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Tue, 18 Feb 2025 20:16:00 +0100 Subject: [PATCH 08/18] Fix write to pure engine --- src/Storages/IStorageCluster.cpp | 27 +++++++------- src/Storages/IStorageCluster.h | 34 +++++++++++++----- .../StorageObjectStorageCluster.cpp | 35 +++++++++++++------ .../StorageObjectStorageCluster.h | 9 +++++ 4 files changed, 74 insertions(+), 31 deletions(-) diff --git a/src/Storages/IStorageCluster.cpp b/src/Storages/IStorageCluster.cpp index 84885032dda9..599e33f66541 100644 --- a/src/Storages/IStorageCluster.cpp +++ b/src/Storages/IStorageCluster.cpp @@ -70,19 +70,6 @@ void ReadFromCluster::createExtension(const ActionsDAG::Node * predicate) extension = storage->getTaskIteratorExtension(predicate, context); } -void IStorageCluster::readFallBackToPure( - QueryPlan & /*query_plan*/, - const Names & /*column_names*/, - const StorageSnapshotPtr & /*storage_snapshot*/, - SelectQueryInfo & /*query_info*/, - ContextPtr /*context*/, - QueryProcessingStage::Enum /*processed_stage*/, - size_t /*max_block_size*/, - size_t /*num_streams*/) -{ - throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method readFallBackToPure is not supported by storage {}", getName()); -} - /// The code executes on initiator void IStorageCluster::read( QueryPlan & query_plan, @@ -152,6 +139,20 @@ void IStorageCluster::read( query_plan.addStep(std::move(reading)); } +SinkToStoragePtr IStorageCluster::write( + const ASTPtr & query, + const StorageMetadataPtr & metadata_snapshot, + ContextPtr context, + bool async_insert) +{ + auto cluster_name_ = getClusterName(context); + + if (cluster_name_.empty()) + return writeFallBackToPure(query, metadata_snapshot, context, async_insert); + + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method write is not 
supported by storage {}", getName()); +} + void ReadFromCluster::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) { createExtension(nullptr); diff --git a/src/Storages/IStorageCluster.h b/src/Storages/IStorageCluster.h index c6546c20957e..2992c3bc2497 100644 --- a/src/Storages/IStorageCluster.h +++ b/src/Storages/IStorageCluster.h @@ -32,6 +32,12 @@ class IStorageCluster : public IStorage size_t max_block_size, size_t num_streams) override; + SinkToStoragePtr write( + const ASTPtr & query, + const StorageMetadataPtr & metadata_snapshot, + ContextPtr context, + bool async_insert) override; + ClusterPtr getCluster(ContextPtr context) const { return getClusterImpl(context, cluster_name); } /// Query is needed for pruning by virtual columns (_file, _path) virtual RemoteQueryExecutor::Extension getTaskIteratorExtension(const ActionsDAG::Node * predicate, const ContextPtr & context) const = 0; @@ -51,14 +57,26 @@ class IStorageCluster : public IStorage virtual void updateQueryToSendIfNeeded(ASTPtr & /*query*/, const StorageSnapshotPtr & /*storage_snapshot*/, const ContextPtr & /*context*/) {} virtual void readFallBackToPure( - QueryPlan & query_plan, - const Names & column_names, - const StorageSnapshotPtr & storage_snapshot, - SelectQueryInfo & query_info, - ContextPtr context, - QueryProcessingStage::Enum processed_stage, - size_t max_block_size, - size_t num_streams); + QueryPlan & /* query_plan */, + const Names & /* column_names */, + const StorageSnapshotPtr & /* storage_snapshot */, + SelectQueryInfo & /* query_info */, + ContextPtr /* context */, + QueryProcessingStage::Enum /* processed_stage */, + size_t /* max_block_size */, + size_t /* num_streams */) + { + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method readFallBackToPure is not supported by storage {}", getName()); + } + + virtual SinkToStoragePtr writeFallBackToPure( + const ASTPtr & /*query*/, + const StorageMetadataPtr & /*metadata_snapshot*/, + ContextPtr 
/*context*/, + bool /*async_insert*/) + { + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method writeFallBackToPure is not supported by storage {}", getName()); + } private: static ClusterPtr getClusterImpl(ContextPtr context, const String & cluster_name_); diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp index 431a5fc67bee..2873d6989a08 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp @@ -258,16 +258,9 @@ RemoteQueryExecutor::Extension StorageObjectStorageCluster::getTaskIteratorExten return RemoteQueryExecutor::Extension{ .task_iterator = std::move(callback) }; } -void StorageObjectStorageCluster::readFallBackToPure( - QueryPlan & query_plan, - const Names & column_names, - const StorageSnapshotPtr & storage_snapshot, - SelectQueryInfo & query_info, - ContextPtr context, - QueryProcessingStage::Enum processed_stage, - size_t max_block_size, - size_t num_streams) +std::shared_ptr StorageObjectStorageCluster::getPureStorage(ContextPtr context) { + std::lock_guard lock(mutex); if (!pure_storage) pure_storage = std::make_shared( configuration, @@ -282,7 +275,29 @@ void StorageObjectStorageCluster::readFallBackToPure( /* distributed_processing */false, partition_by); - pure_storage->read(query_plan, column_names, storage_snapshot, query_info, context, processed_stage, max_block_size, num_streams); + return pure_storage; +} + +void StorageObjectStorageCluster::readFallBackToPure( + QueryPlan & query_plan, + const Names & column_names, + const StorageSnapshotPtr & storage_snapshot, + SelectQueryInfo & query_info, + ContextPtr context, + QueryProcessingStage::Enum processed_stage, + size_t max_block_size, + size_t num_streams) +{ + getPureStorage(context)->read(query_plan, column_names, storage_snapshot, query_info, context, processed_stage, max_block_size, num_streams); +} + +SinkToStoragePtr 
StorageObjectStorageCluster::writeFallBackToPure( + const ASTPtr & query, + const StorageMetadataPtr & metadata_snapshot, + ContextPtr context, + bool async_insert) +{ + return getPureStorage(context)->write(query, metadata_snapshot, context, async_insert); } String StorageObjectStorageCluster::getClusterName(ContextPtr context) const diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.h b/src/Storages/ObjectStorage/StorageObjectStorageCluster.h index 6f04e4e7d443..3d25c54397ca 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.h +++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.h @@ -54,6 +54,14 @@ class StorageObjectStorageCluster : public IStorageCluster size_t max_block_size, size_t num_streams) override; + SinkToStoragePtr writeFallBackToPure( + const ASTPtr & query, + const StorageMetadataPtr & metadata_snapshot, + ContextPtr context, + bool async_insert) override; + + std::shared_ptr getPureStorage(ContextPtr context); + /* In case the table was created with `object_storage_cluster` setting, modify the AST query object so that it uses the table function implementation @@ -73,6 +81,7 @@ class StorageObjectStorageCluster : public IStorageCluster const ObjectStoragePtr object_storage; bool cluster_name_in_settings; + std::mutex mutex; std::shared_ptr pure_storage; String comment; std::optional format_settings; From df462de17735a628bd2ec09503adfe3b037b0cb7 Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Wed, 19 Feb 2025 13:56:09 +0100 Subject: [PATCH 09/18] Fix virtual columns for pure storage --- src/Storages/IStorage.h | 4 ++-- .../StorageObjectStorageCluster.cpp | 19 +++++++++++++++++++ .../StorageObjectStorageCluster.h | 16 ++++++++++++++++ 3 files changed, 37 insertions(+), 2 deletions(-) diff --git a/src/Storages/IStorage.h b/src/Storages/IStorage.h index 490546d19195..ec67b9733bee 100644 --- a/src/Storages/IStorage.h +++ b/src/Storages/IStorage.h @@ -216,12 +216,12 @@ class IStorage : public 
std::enable_shared_from_this, public TypePromo /// Update storage metadata. Used in ALTER or initialization of Storage. /// Metadata object is multiversion, so this method can be called without /// any locks. - void setInMemoryMetadata(const StorageInMemoryMetadata & metadata_) + virtual void setInMemoryMetadata(const StorageInMemoryMetadata & metadata_) { metadata.set(std::make_unique(metadata_)); } - void setVirtuals(VirtualColumnsDescription virtuals_) + virtual void setVirtuals(VirtualColumnsDescription virtuals_) { virtuals.set(std::make_unique(std::move(virtuals_))); } diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp index 2873d6989a08..6b8bdebf6a53 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp @@ -262,6 +262,7 @@ std::shared_ptr StorageObjectStorageCluster::getPureStorag { std::lock_guard lock(mutex); if (!pure_storage) + { pure_storage = std::make_shared( configuration, object_storage, @@ -275,6 +276,13 @@ std::shared_ptr StorageObjectStorageCluster::getPureStorag /* distributed_processing */false, partition_by); + auto virtuals_ = getVirtualsPtr(); + if (virtuals_) + pure_storage->setVirtuals(*virtuals_); + + pure_storage->setInMemoryMetadata(getInMemoryMetadata()); + } + return pure_storage; } @@ -308,4 +316,15 @@ String StorageObjectStorageCluster::getClusterName(ContextPtr context) const return cluster_name_; } +QueryProcessingStage::Enum StorageObjectStorageCluster::getQueryProcessingStage( + ContextPtr context, QueryProcessingStage::Enum to_stage, const StorageSnapshotPtr & storage_snapshot, SelectQueryInfo & query_info) const +{ + /// Full query if fall back to pure storage. + if (getClusterName(context).empty()) + return QueryProcessingStage::Enum::FetchColumns; + + /// Distributed storage. 
+ return IStorageCluster::getQueryProcessingStage(context, to_stage, storage_snapshot, query_info); +} + } diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.h b/src/Storages/ObjectStorage/StorageObjectStorageCluster.h index 3d25c54397ca..798886c9ddc9 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.h +++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.h @@ -38,6 +38,22 @@ class StorageObjectStorageCluster : public IStorageCluster String getClusterName(ContextPtr context) const override; + void setInMemoryMetadata(const StorageInMemoryMetadata & metadata_) override + { + if (pure_storage) + pure_storage->setInMemoryMetadata(metadata_); + IStorageCluster::setInMemoryMetadata(metadata_); + } + + void setVirtuals(VirtualColumnsDescription virtuals_) override + { + if (pure_storage) + pure_storage->setVirtuals(virtuals_); + IStorageCluster::setVirtuals(virtuals_); + } + + QueryProcessingStage::Enum getQueryProcessingStage(ContextPtr, QueryProcessingStage::Enum, const StorageSnapshotPtr &, SelectQueryInfo &) const override; + private: void updateQueryToSendIfNeeded( ASTPtr & query, From 508e4bad0610432d97460bb8e8a98691eb8e892d Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Wed, 19 Feb 2025 22:01:54 +0100 Subject: [PATCH 10/18] More fixes --- .../ObjectStorage/StorageObjectStorageCluster.cpp | 14 ++++++++++++++ .../ObjectStorage/StorageObjectStorageCluster.h | 8 ++++++++ 2 files changed, 22 insertions(+) diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp index 6b8bdebf6a53..6cb0fc2cd749 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp @@ -327,4 +327,18 @@ QueryProcessingStage::Enum StorageObjectStorageCluster::getQueryProcessingStage( return IStorageCluster::getQueryProcessingStage(context, to_stage, storage_snapshot, query_info); } +void 
StorageObjectStorageCluster::truncate( + const ASTPtr & query, + const StorageMetadataPtr & metadata_snapshot, + ContextPtr local_context, + TableExclusiveLockHolder & lock_holder) +{ + return getPureStorage(local_context)->truncate(query, metadata_snapshot, local_context, lock_holder); +} + +void StorageObjectStorageCluster::addInferredEngineArgsToCreateQuery(ASTs & args, const ContextPtr & context) const +{ + configuration->addStructureAndFormatToArgsIfNeeded(args, "", configuration->format, context, /*with_structure=*/false); +} + } diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.h b/src/Storages/ObjectStorage/StorageObjectStorageCluster.h index 798886c9ddc9..cee67dd4a3ff 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.h +++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.h @@ -54,6 +54,14 @@ class StorageObjectStorageCluster : public IStorageCluster QueryProcessingStage::Enum getQueryProcessingStage(ContextPtr, QueryProcessingStage::Enum, const StorageSnapshotPtr &, SelectQueryInfo &) const override; + void truncate( + const ASTPtr & query, + const StorageMetadataPtr & metadata_snapshot, + ContextPtr local_context, + TableExclusiveLockHolder &) override; + + void addInferredEngineArgsToCreateQuery(ASTs & args, const ContextPtr & context) const override; + private: void updateQueryToSendIfNeeded( ASTPtr & query, From 9dbd20963a5a963a99ac99143fd4a1d819f3e68c Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Thu, 20 Feb 2025 14:34:38 +0100 Subject: [PATCH 11/18] Do not update configuration twice --- .../ObjectStorage/DataLakes/DataLakeConfiguration.h | 4 ++++ src/Storages/ObjectStorage/StorageObjectStorage.cpp | 7 +++++++ src/Storages/ObjectStorage/StorageObjectStorage.h | 2 ++ src/Storages/ObjectStorage/StorageObjectStorageSource.cpp | 2 +- 4 files changed, 14 insertions(+), 1 deletion(-) diff --git a/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h 
b/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h index ede70567da4c..5d3adcdfa72a 100644 --- a/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h +++ b/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h @@ -62,6 +62,8 @@ class DataLakeConfiguration : public BaseStorageConfiguration, public std::enabl BaseStorageConfiguration::setPartitionColumns(current_metadata->getPartitionColumns()); } } + + updated = true; } std::optional tryGetTableStructureFromMetadata() const override @@ -114,6 +116,8 @@ class DataLakeConfiguration : public BaseStorageConfiguration, public std::enabl private: DataLakeMetadataPtr current_metadata; + bool updated = false; + ReadFromFormatInfo prepareReadingFromFormat( ObjectStoragePtr object_storage, const Strings & requested_columns, diff --git a/src/Storages/ObjectStorage/StorageObjectStorage.cpp b/src/Storages/ObjectStorage/StorageObjectStorage.cpp index cf809d53af36..414d8bf5deb3 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorage.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorage.cpp @@ -165,6 +165,13 @@ void StorageObjectStorage::Configuration::update(ObjectStoragePtr object_storage { IObjectStorage::ApplyNewSettingsOptions options{.allow_client_change = !isStaticConfiguration()}; object_storage_ptr->applyNewSettings(context->getConfigRef(), getTypeName() + ".", context, options); + updated = true; +} + +void StorageObjectStorage::Configuration::updateIfRequired(ObjectStoragePtr object_storage_ptr, ContextPtr local_context) +{ + if (!updated) + update(object_storage_ptr, local_context); } bool StorageObjectStorage::hasExternalDynamicMetadata() const diff --git a/src/Storages/ObjectStorage/StorageObjectStorage.h b/src/Storages/ObjectStorage/StorageObjectStorage.h index 18f62b97ca62..0212b80c2014 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorage.h +++ b/src/Storages/ObjectStorage/StorageObjectStorage.h @@ -246,6 +246,7 @@ class StorageObjectStorage::Configuration String structure 
= "auto"; virtual void update(ObjectStoragePtr object_storage, ContextPtr local_context); + void updateIfRequired(ObjectStoragePtr object_storage, ContextPtr local_context); /// Create arguments for table function with path and access parameters virtual ASTPtr createArgsWithAccessData() const @@ -261,6 +262,7 @@ class StorageObjectStorage::Configuration void assertInitialized() const; bool initialized = false; + bool updated = false; DataLakePartitionColumns partition_columns; bool allow_dynamic_metadata_for_data_lakes; diff --git a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp index 3ed2707b33a9..b5f0c386dba0 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp @@ -132,7 +132,7 @@ std::shared_ptr StorageObjectStorageSourc const bool is_archive = configuration->isArchive(); - configuration->update(object_storage, local_context); + configuration->updateIfRequired(object_storage, local_context); std::unique_ptr iterator; if (configuration->isPathWithGlobs()) From 293ac8378d60c634eb866a5eb73961dd269f133c Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Thu, 20 Feb 2025 20:49:43 +0100 Subject: [PATCH 12/18] Create pure storage on create --- src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp index 6cb0fc2cd749..9287068fa409 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp @@ -97,6 +97,8 @@ StorageObjectStorageCluster::StorageObjectStorageCluster( setVirtuals(VirtualColumnUtils::getVirtualsForFileLikeStorage(metadata.columns, context_, sample_path)); setInMemoryMetadata(metadata); + + getPureStorage(context_); } std::string 
StorageObjectStorageCluster::getName() const From 5bc11eee05d2252455b90aa999ba52f5ecd8692f Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Wed, 26 Feb 2025 12:48:19 +0100 Subject: [PATCH 13/18] More simple --- .../StorageObjectStorageCluster.cpp | 60 ++++++++----------- .../StorageObjectStorageCluster.h | 14 +---- 2 files changed, 27 insertions(+), 47 deletions(-) diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp index 9287068fa409..015ab94d13d8 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp @@ -30,6 +30,7 @@ namespace ErrorCodes { extern const int LOGICAL_ERROR; extern const int UNKNOWN_FUNCTION; + extern const int NOT_IMPLEMENTED; } @@ -78,10 +79,6 @@ StorageObjectStorageCluster::StorageObjectStorageCluster( , configuration{configuration_} , object_storage(object_storage_) , cluster_name_in_settings(false) - , comment(comment_) - , format_settings(format_settings_) - , mode(mode_) - , partition_by(partition_by_) { ColumnsDescription columns{columns_}; std::string sample_path; @@ -98,7 +95,18 @@ StorageObjectStorageCluster::StorageObjectStorageCluster( setVirtuals(VirtualColumnUtils::getVirtualsForFileLikeStorage(metadata.columns, context_, sample_path)); setInMemoryMetadata(metadata); - getPureStorage(context_); + pure_storage = std::make_shared( + configuration, + object_storage, + context_, + getStorageID(), + getInMemoryMetadata().getColumns(), + getInMemoryMetadata().getConstraints(), + comment_, + format_settings_, + mode_, + /* distributed_processing */false, + partition_by_); } std::string StorageObjectStorageCluster::getName() const @@ -260,34 +268,6 @@ RemoteQueryExecutor::Extension StorageObjectStorageCluster::getTaskIteratorExten return RemoteQueryExecutor::Extension{ .task_iterator = std::move(callback) }; } -std::shared_ptr 
StorageObjectStorageCluster::getPureStorage(ContextPtr context) -{ - std::lock_guard lock(mutex); - if (!pure_storage) - { - pure_storage = std::make_shared( - configuration, - object_storage, - context, - getStorageID(), - getInMemoryMetadata().getColumns(), - getInMemoryMetadata().getConstraints(), - comment, - format_settings, - mode, - /* distributed_processing */false, - partition_by); - - auto virtuals_ = getVirtualsPtr(); - if (virtuals_) - pure_storage->setVirtuals(*virtuals_); - - pure_storage->setInMemoryMetadata(getInMemoryMetadata()); - } - - return pure_storage; -} - void StorageObjectStorageCluster::readFallBackToPure( QueryPlan & query_plan, const Names & column_names, @@ -298,7 +278,7 @@ void StorageObjectStorageCluster::readFallBackToPure( size_t max_block_size, size_t num_streams) { - getPureStorage(context)->read(query_plan, column_names, storage_snapshot, query_info, context, processed_stage, max_block_size, num_streams); + pure_storage->read(query_plan, column_names, storage_snapshot, query_info, context, processed_stage, max_block_size, num_streams); } SinkToStoragePtr StorageObjectStorageCluster::writeFallBackToPure( @@ -307,11 +287,15 @@ SinkToStoragePtr StorageObjectStorageCluster::writeFallBackToPure( ContextPtr context, bool async_insert) { - return getPureStorage(context)->write(query, metadata_snapshot, context, async_insert); + return pure_storage->write(query, metadata_snapshot, context, async_insert); } String StorageObjectStorageCluster::getClusterName(ContextPtr context) const { + /// We try to get cluster name from query settings. + /// If it is empty, we take default cluster name from table settings. + /// When it is not empty, we use this cluster to distribute requests. + /// When both are empty, we must fall back to pure implementation.
auto cluster_name_ = context->getSettingsRef()[Setting::object_storage_cluster].value; if (cluster_name_.empty()) cluster_name_ = getOriginalClusterName(); @@ -335,7 +319,11 @@ void StorageObjectStorageCluster::truncate( ContextPtr local_context, TableExclusiveLockHolder & lock_holder) { - return getPureStorage(local_context)->truncate(query, metadata_snapshot, local_context, lock_holder); + /// The full query falls back to pure storage. + if (getClusterName(local_context).empty()) + return pure_storage->truncate(query, metadata_snapshot, local_context, lock_holder); + + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Truncate is not supported by storage {}", getName()); } void StorageObjectStorageCluster::addInferredEngineArgsToCreateQuery(ASTs & args, const ContextPtr & context) const diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.h b/src/Storages/ObjectStorage/StorageObjectStorageCluster.h index cee67dd4a3ff..bd01fb4d86ad 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.h +++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.h @@ -40,15 +40,13 @@ class StorageObjectStorageCluster : public IStorageCluster void setInMemoryMetadata(const StorageInMemoryMetadata & metadata_) override { - if (pure_storage) - pure_storage->setInMemoryMetadata(metadata_); + pure_storage->setInMemoryMetadata(metadata_); IStorageCluster::setInMemoryMetadata(metadata_); } void setVirtuals(VirtualColumnsDescription virtuals_) override { - if (pure_storage) - pure_storage->setVirtuals(virtuals_); + pure_storage->setVirtuals(virtuals_); IStorageCluster::setVirtuals(virtuals_); } @@ -84,8 +82,6 @@ class StorageObjectStorageCluster : public IStorageCluster ContextPtr context, bool async_insert) override; - std::shared_ptr getPureStorage(ContextPtr context); - /* In case the table was created with `object_storage_cluster` setting, modify the AST query object so that it uses the table function implementation @@ -105,12 +101,8 @@ class
StorageObjectStorageCluster : public IStorageCluster const ObjectStoragePtr object_storage; bool cluster_name_in_settings; - std::mutex mutex; + /// non-clustered storage to fall back on pure implementation if needed std::shared_ptr pure_storage; - String comment; - std::optional format_settings; - LoadingStrictnessLevel mode; - ASTPtr partition_by; }; } From a4943e24deb72aff812250ada7b77da4ca8c2161 Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Thu, 27 Feb 2025 08:49:53 +0100 Subject: [PATCH 14/18] Rename cluster_name_ variable to cluster_name_from_settings --- src/Storages/IStorageCluster.cpp | 10 +++++----- .../ObjectStorage/StorageObjectStorageCluster.cpp | 8 ++++---- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/src/Storages/IStorageCluster.cpp b/src/Storages/IStorageCluster.cpp index 599e33f66541..6b5e9f0e49ba 100644 --- a/src/Storages/IStorageCluster.cpp +++ b/src/Storages/IStorageCluster.cpp @@ -81,9 +81,9 @@ void IStorageCluster::read( size_t max_block_size, size_t num_streams) { - auto cluster_name_ = getClusterName(context); + auto cluster_name_from_settings = getClusterName(context); - if (cluster_name_.empty()) + if (cluster_name_from_settings.empty()) { readFallBackToPure(query_plan, column_names, storage_snapshot, query_info, context, processed_stage, max_block_size, num_streams); return; @@ -92,7 +92,7 @@ void IStorageCluster::read( storage_snapshot->check(column_names); updateBeforeRead(context); - auto cluster = getClusterImpl(context, cluster_name_); + auto cluster = getClusterImpl(context, cluster_name_from_settings); /// Calculate the header.
This is significant, because some columns could be thrown away in some cases like query with count(*) @@ -145,9 +145,9 @@ SinkToStoragePtr IStorageCluster::write( ContextPtr context, bool async_insert) { - auto cluster_name_ = getClusterName(context); + auto cluster_name_from_settings = getClusterName(context); - if (cluster_name_.empty()) + if (cluster_name_from_settings.empty()) return writeFallBackToPure(query, metadata_snapshot, context, async_insert); throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method write is not supported by storage {}", getName()); diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp index 015ab94d13d8..235e8d195e0c 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp @@ -296,10 +296,10 @@ String StorageObjectStorageCluster::getClusterName(ContextPtr context) const /// If it is empty, we take default cluster name from table settings. /// When it is not empty, we use this cluster to distribute requests. /// When both are empty, we must fall back to pure implementation.
- auto cluster_name_ = context->getSettingsRef()[Setting::object_storage_cluster].value; - if (cluster_name_.empty()) - cluster_name_ = getOriginalClusterName(); - return cluster_name_; + auto cluster_name_from_settings = context->getSettingsRef()[Setting::object_storage_cluster].value; + if (cluster_name_from_settings.empty()) + cluster_name_from_settings = getOriginalClusterName(); + return cluster_name_from_settings; } QueryProcessingStage::Enum StorageObjectStorageCluster::getQueryProcessingStage( From 09321d3cdcfedf94a12982ec6aaf1d8b966cfa0e Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Thu, 27 Feb 2025 15:29:53 +0100 Subject: [PATCH 15/18] Changes after review --- src/Core/SettingsChangesHistory.cpp | 1 - .../ObjectStorage/StorageObjectStorageCluster.cpp | 8 ++++---- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp index 545bb914b8f5..2f3f9c32f7d8 100644 --- a/src/Core/SettingsChangesHistory.cpp +++ b/src/Core/SettingsChangesHistory.cpp @@ -78,7 +78,6 @@ static std::initializer_listgetSettingsRef()[Setting::object_storage_cluster].value; if (cluster_name_from_settings.empty()) cluster_name_from_settings = getOriginalClusterName(); From e5dc2500ef70412362c5c3610a359c65cd88d7bd Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Sat, 1 Mar 2025 16:18:20 +0100 Subject: [PATCH 16/18] Fix initialization order --- src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp | 7 ++++++- src/Storages/ObjectStorage/StorageObjectStorageCluster.h | 6 ++++-- tests/integration/test_storage_iceberg/test.py | 2 +- 3 files changed, 11 insertions(+), 4 deletions(-) diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp index b3e4af67d6a8..61c198336c7c 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp @@ -92,8 +92,8 @@ 
StorageObjectStorageCluster::StorageObjectStorageCluster( if (sample_path.empty() && context_->getSettingsRef()[Setting::use_hive_partitioning]) sample_path = getPathSample(metadata, context_); - setVirtuals(VirtualColumnUtils::getVirtualsForFileLikeStorage(metadata.columns, context_, sample_path)); setInMemoryMetadata(metadata); + setVirtuals(VirtualColumnUtils::getVirtualsForFileLikeStorage(metadata.columns, context_, sample_path)); pure_storage = std::make_shared( configuration, @@ -107,6 +107,11 @@ StorageObjectStorageCluster::StorageObjectStorageCluster( mode_, /* distributed_processing */false, partition_by_); + + auto virtuals_ = getVirtualsPtr(); + if (virtuals_) + pure_storage->setVirtuals(*virtuals_); + pure_storage->setInMemoryMetadata(getInMemoryMetadata()); } std::string StorageObjectStorageCluster::getName() const diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.h b/src/Storages/ObjectStorage/StorageObjectStorageCluster.h index bd01fb4d86ad..4fa6526e2557 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.h +++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.h @@ -40,13 +40,15 @@ class StorageObjectStorageCluster : public IStorageCluster void setInMemoryMetadata(const StorageInMemoryMetadata & metadata_) override { - pure_storage->setInMemoryMetadata(metadata_); + if (pure_storage) + pure_storage->setInMemoryMetadata(metadata_); IStorageCluster::setInMemoryMetadata(metadata_); } void setVirtuals(VirtualColumnsDescription virtuals_) override { - pure_storage->setVirtuals(virtuals_); + if (pure_storage) + pure_storage->setVirtuals(virtuals_); IStorageCluster::setVirtuals(virtuals_); } diff --git a/tests/integration/test_storage_iceberg/test.py b/tests/integration/test_storage_iceberg/test.py index d6c9deb35cc8..231c22149b56 100644 --- a/tests/integration/test_storage_iceberg/test.py +++ b/tests/integration/test_storage_iceberg/test.py @@ -278,7 +278,7 @@ def get_creation_expression( DROP TABLE IF EXISTS 
{table_name}; CREATE TABLE {table_name} ENGINE=IcebergHDFS(hdfs, filename = 'iceberg_data/default/{table_name}/', format={format}, url = 'hdfs://hdfs1:9000/')""" - + allow_dynamic_metadata_for_datalakes_suffix + + settings_suffix ) elif storage_type == "local": From 14da835df2d32fe0882cfba308d62a8a8abd0caa Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Tue, 4 Mar 2025 00:44:45 +0100 Subject: [PATCH 17/18] Fix after merge --- src/Storages/IStorage.h | 7 +++-- .../DataLakes/DataLakeConfiguration.h | 4 --- .../StorageObjectStorageCluster.cpp | 28 +++++++++++++++++-- .../StorageObjectStorageCluster.h | 20 +++++++------ .../integration/test_storage_iceberg/test.py | 5 ++-- 5 files changed, 44 insertions(+), 20 deletions(-) diff --git a/src/Storages/IStorage.h b/src/Storages/IStorage.h index ec67b9733bee..b2cb0b68afc2 100644 --- a/src/Storages/IStorage.h +++ b/src/Storages/IStorage.h @@ -71,6 +71,9 @@ class RestorerFromBackup; class ConditionSelectivityEstimator; +class IObjectStorage; +using ObjectStoragePtr = std::shared_ptr; + struct ColumnSize { size_t marks = 0; @@ -216,12 +219,12 @@ class IStorage : public std::enable_shared_from_this, public TypePromo /// Update storage metadata. Used in ALTER or initialization of Storage. /// Metadata object is multiversion, so this method can be called without /// any locks. 
- virtual void setInMemoryMetadata(const StorageInMemoryMetadata & metadata_) + void setInMemoryMetadata(const StorageInMemoryMetadata & metadata_) { metadata.set(std::make_unique(metadata_)); } - virtual void setVirtuals(VirtualColumnsDescription virtuals_) + void setVirtuals(VirtualColumnsDescription virtuals_) { virtuals.set(std::make_unique(std::move(virtuals_))); } diff --git a/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h b/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h index 5d3adcdfa72a..ede70567da4c 100644 --- a/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h +++ b/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h @@ -62,8 +62,6 @@ class DataLakeConfiguration : public BaseStorageConfiguration, public std::enabl BaseStorageConfiguration::setPartitionColumns(current_metadata->getPartitionColumns()); } } - - updated = true; } std::optional tryGetTableStructureFromMetadata() const override @@ -116,8 +114,6 @@ class DataLakeConfiguration : public BaseStorageConfiguration, public std::enabl private: DataLakeMetadataPtr current_metadata; - bool updated = false; - ReadFromFormatInfo prepareReadingFromFormat( ObjectStoragePtr object_storage, const Strings & requested_columns, diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp index 61c198336c7c..3a2bfe8a990b 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp @@ -152,11 +152,33 @@ void StorageObjectStorageCluster::updateQueryForDistributedEngineIfNeeded(ASTPtr auto table_alias = table_identifier_typed.tryGetAlias(); + auto storage_engine_name = configuration->getEngineName(); + if (storage_engine_name == "Iceberg") + { + switch (configuration->getType()) + { + case ObjectStorageType::S3: + storage_engine_name = "IcebergS3"; + break; + case ObjectStorageType::Azure: + storage_engine_name = 
"IcebergAzure"; + break; + case ObjectStorageType::HDFS: + storage_engine_name = "IcebergHDFS"; + break; + default: + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "Can't find table function for engine {}", + storage_engine_name + ); + } + } + static std::unordered_map engine_to_function = { {"S3", "s3"}, {"Azure", "azureBlobStorage"}, {"HDFS", "hdfs"}, - {"Iceberg", "icebergS3"}, {"IcebergS3", "icebergS3"}, {"IcebergAzure", "icebergAzure"}, {"IcebergHDFS", "icebergHDFS"}, @@ -164,13 +186,13 @@ void StorageObjectStorageCluster::updateQueryForDistributedEngineIfNeeded(ASTPtr {"Hudi", "hudi"} }; - auto p = engine_to_function.find(configuration->getEngineName()); + auto p = engine_to_function.find(storage_engine_name); if (p == engine_to_function.end()) { throw Exception( ErrorCodes::LOGICAL_ERROR, "Can't find table function for engine {}", - configuration->getEngineName() + storage_engine_name ); } diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.h b/src/Storages/ObjectStorage/StorageObjectStorageCluster.h index 4fa6526e2557..492ea02c505a 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.h +++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.h @@ -38,18 +38,22 @@ class StorageObjectStorageCluster : public IStorageCluster String getClusterName(ContextPtr context) const override; - void setInMemoryMetadata(const StorageInMemoryMetadata & metadata_) override + bool hasExternalDynamicMetadata() const override { - if (pure_storage) - pure_storage->setInMemoryMetadata(metadata_); - IStorageCluster::setInMemoryMetadata(metadata_); + return (pure_storage && pure_storage->hasExternalDynamicMetadata()) + || configuration->hasExternalDynamicMetadata(); } - void setVirtuals(VirtualColumnsDescription virtuals_) override + void updateExternalDynamicMetadata(ContextPtr context_ptr) override { - if (pure_storage) - pure_storage->setVirtuals(virtuals_); - IStorageCluster::setVirtuals(virtuals_); + if (pure_storage && 
pure_storage->hasExternalDynamicMetadata()) + pure_storage->updateExternalDynamicMetadata(context_ptr); + if (configuration->hasExternalDynamicMetadata()) + { + StorageInMemoryMetadata metadata; + metadata.setColumns(configuration->updateAndGetCurrentSchema(object_storage, context_ptr)); + IStorageCluster::setInMemoryMetadata(metadata); + } } QueryProcessingStage::Enum getQueryProcessingStage(ContextPtr, QueryProcessingStage::Enum, const StorageSnapshotPtr &, SelectQueryInfo &) const override; diff --git a/tests/integration/test_storage_iceberg/test.py b/tests/integration/test_storage_iceberg/test.py index 231c22149b56..3252ee01819e 100644 --- a/tests/integration/test_storage_iceberg/test.py +++ b/tests/integration/test_storage_iceberg/test.py @@ -592,7 +592,7 @@ def count_secondary_subqueries(started_cluster, query_id, expected, comment): cluster_secondary_queries = ( replica.query( f""" - SELECT query, type, is_initial_query, read_rows, read_bytes FROM system.query_log + SELECT count(*) FROM system.query_log WHERE type = 'QueryFinish' AND NOT is_initial_query @@ -600,13 +600,12 @@ def count_secondary_subqueries(started_cluster, query_id, expected, comment): """ ) .strip() - .split("\n") ) logging.info( f"[{node_name}] cluster_secondary_queries {comment}: {cluster_secondary_queries}" ) - assert len(cluster_secondary_queries) == expected + assert int(cluster_secondary_queries) == expected @pytest.mark.parametrize("format_version", ["1", "2"]) From 0ccb6e8188a3403309e6f29f35066137da312247 Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Wed, 5 Mar 2025 17:03:43 +0100 Subject: [PATCH 18/18] std::atomic update --- src/Storages/ObjectStorage/StorageObjectStorage.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Storages/ObjectStorage/StorageObjectStorage.h b/src/Storages/ObjectStorage/StorageObjectStorage.h index 0212b80c2014..6b5d6e1d423c 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorage.h +++ 
b/src/Storages/ObjectStorage/StorageObjectStorage.h @@ -262,7 +262,7 @@ class StorageObjectStorage::Configuration void assertInitialized() const; bool initialized = false; - bool updated = false; + std::atomic updated = false; DataLakePartitionColumns partition_columns; bool allow_dynamic_metadata_for_data_lakes;