diff --git a/src/Analyzer/FunctionSecretArgumentsFinderTreeNode.h b/src/Analyzer/FunctionSecretArgumentsFinderTreeNode.h index 8bcb6e147420..e4f63192c95b 100644 --- a/src/Analyzer/FunctionSecretArgumentsFinderTreeNode.h +++ b/src/Analyzer/FunctionSecretArgumentsFinderTreeNode.h @@ -71,8 +71,14 @@ class FunctionTreeNodeImpl : public AbstractFunction { public: explicit ArgumentsTreeNode(const QueryTreeNodes * arguments_) : arguments(arguments_) {} - size_t size() const override { return arguments ? arguments->size() : 0; } - std::unique_ptr at(size_t n) const override { return std::make_unique(arguments->at(n).get()); } + size_t size() const override + { /// size without skipped indexes + return arguments ? arguments->size() - skippedSize() : 0; + } + std::unique_ptr at(size_t n) const override + { /// n is relative index, some can be skipped + return std::make_unique(arguments->at(getRealIndex(n)).get()); + } private: const QueryTreeNodes * arguments = nullptr; }; diff --git a/src/Core/Settings.cpp b/src/Core/Settings.cpp index 525ac86e8b94..acf235072bd2 100644 --- a/src/Core/Settings.cpp +++ b/src/Core/Settings.cpp @@ -6111,6 +6111,15 @@ Enable PRQL - an alternative to SQL. )", EXPERIMENTAL) \ DECLARE(Bool, enable_adaptive_memory_spill_scheduler, false, R"( Trigger processor to spill data into external storage adpatively. grace join is supported at present. +)", EXPERIMENTAL) \ + DECLARE(String, object_storage_cluster, "", R"( +Cluster to make distributed requests to object storages with alternative syntax. +)", EXPERIMENTAL) \ + DECLARE(UInt64, object_storage_max_nodes, 0, R"( +Limit for hosts used for requests in object storage cluster table functions - azureBlobStorageCluster, s3Cluster, hdfsCluster, etc. +Possible values: +- Positive integer. +- 0 — All hosts in cluster. )", EXPERIMENTAL) \ \ /** Experimental tsToGrid aggregate function. */ \ diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp index 0afb542e0cbf..0d0353ac2efe 100644 --- a/src/Core/SettingsChangesHistory.cpp +++ b/src/Core/SettingsChangesHistory.cpp @@ -66,6 +66,12 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory() /// controls new feature and it's 'true' by default, use 'false' as previous_value). /// It's used to implement `compatibility` setting (see https://github.com/ClickHouse/ClickHouse/issues/35972) /// Note: please check if the key already exists to prevent duplicate entries.
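The `ArgumentsTreeNode` change above makes `size()` report the argument count minus the skipped positions, while `at(n)` treats `n` as a relative index and maps it to the real one via `getRealIndex`. A minimal standalone sketch of that index mapping (a hypothetical free function, not the actual ClickHouse class):

```cpp
#include <cassert>
#include <cstddef>
#include <set>

/// Illustration only: a relative index n is shifted past every skipped position,
/// mirroring Arguments::getRealIndex() in this patch.
size_t getRealIndex(size_t n, const std::set<size_t> & skipped)
{
    for (auto idx : skipped)
    {
        if (n < idx)
            break;
        ++n;
    }
    return n;
}

int main()
{
    std::set<size_t> skipped{1};            /// e.g. a consumed storage_type argument at position 1
    assert(getRealIndex(0, skipped) == 0);  /// before the skipped slot: unchanged
    assert(getRealIndex(1, skipped) == 2);  /// at or after it: shifted by one
    assert(getRealIndex(2, skipped) == 3);
    return 0;
}
```

Because the finder reports positions via `getRealIndex`, the masked-secret offsets still refer to the original argument list even when an argument such as `storage_type` has been skipped.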
+ addSettingsChanges(settings_changes_history, "25.2.1.20000", + { + // Altinity Antalya modifications atop of 25.2 + {"object_storage_cluster", "", "", "New setting"}, + {"object_storage_max_nodes", 0, 0, "New setting"}, + }); addSettingsChanges(settings_changes_history, "25.4", { }); diff --git a/src/Databases/DataLake/DatabaseDataLake.cpp b/src/Databases/DataLake/DatabaseDataLake.cpp index 687d59ac3884..1b556f25edd4 100644 --- a/src/Databases/DataLake/DatabaseDataLake.cpp +++ b/src/Databases/DataLake/DatabaseDataLake.cpp @@ -15,6 +15,7 @@ #include #include #include +#include #include #include @@ -40,6 +41,7 @@ namespace DatabaseDataLakeSetting extern const DatabaseDataLakeSettingsString storage_endpoint; extern const DatabaseDataLakeSettingsString oauth_server_uri; extern const DatabaseDataLakeSettingsBool vended_credentials; + extern const DatabaseDataLakeSettingsString object_storage_cluster; extern const DatabaseDataLakeSettingsString aws_access_key_id; @@ -403,9 +405,12 @@ StoragePtr DatabaseDataLake::tryGetTableImpl(const String & name, ContextPtr con /// with_table_structure = false: because there will be /// no table structure in table definition AST. - StorageObjectStorage::Configuration::initialize(*configuration, args, context_copy, /* with_table_structure */false, storage_settings); + configuration->initialize(args, context_copy, /* with_table_structure */false, storage_settings); - return std::make_shared( + auto cluster_name = settings[DatabaseDataLakeSetting::object_storage_cluster].value; + + return std::make_shared( + cluster_name, configuration, configuration->createObjectStorage(context_copy, /* is_readonly */ false), context_copy, @@ -415,9 +420,7 @@ StoragePtr DatabaseDataLake::tryGetTableImpl(const String & name, ContextPtr con /* comment */"", getFormatSettings(context_copy), LoadingStrictnessLevel::CREATE, - /* distributed_processing */false, - /* partition_by */nullptr, - /* lazy_init */true); + /* partition_by */nullptr); } DatabaseTablesIteratorPtr DatabaseDataLake::getTablesIterator( diff --git a/src/Databases/DataLake/DatabaseDataLakeSettings.cpp b/src/Databases/DataLake/DatabaseDataLakeSettings.cpp index 4895253f5d67..412e96f865cf 100644 --- a/src/Databases/DataLake/DatabaseDataLakeSettings.cpp +++ b/src/Databases/DataLake/DatabaseDataLakeSettings.cpp @@ -26,6 +26,7 @@ namespace ErrorCodes DECLARE(String, aws_secret_access_key, "", "Key for AWS connection for Glue Catalog'", 0) \ DECLARE(String, region, "", "Region for Glue catalog", 0) \ DECLARE(String, storage_endpoint, "", "Object storage endpoint", 0) \ + DECLARE(String, object_storage_cluster, "", "Cluster for distributed requests", 0) \ #define LIST_OF_DATABASE_ICEBERG_SETTINGS(M, ALIAS) \ DATABASE_ICEBERG_RELATED_SETTINGS(M, ALIAS) diff --git a/src/Disks/DiskType.cpp b/src/Disks/DiskType.cpp index 07a7099419ba..ba767959e991 100644 --- a/src/Disks/DiskType.cpp +++ b/src/Disks/DiskType.cpp @@ -9,7 +9,7 @@ namespace ErrorCodes extern const int UNKNOWN_ELEMENT_IN_CONFIG; } -MetadataStorageType metadataTypeFromString(const String & type) +MetadataStorageType metadataTypeFromString(const std::string & type) { auto check_type = Poco::toLower(type); if (check_type == "local") @@ -53,23 +53,47 @@ std::string DataSourceDescription::toString() const case DataSourceType::RAM: return "memory"; case DataSourceType::ObjectStorage: - { - switch (object_storage_type) - { - case ObjectStorageType::S3: - return "s3"; - case ObjectStorageType::HDFS: - return "hdfs"; - case ObjectStorageType::Azure: - return 
"azure_blob_storage"; - case ObjectStorageType::Local: - return "local_blob_storage"; - case ObjectStorageType::Web: - return "web"; - case ObjectStorageType::None: - return "none"; - } - } + return DB::toString(object_storage_type); } } + +ObjectStorageType objectStorageTypeFromString(const std::string & type) +{ + auto check_type = Poco::toLower(type); + if (check_type == "s3") + return ObjectStorageType::S3; + if (check_type == "hdfs") + return ObjectStorageType::HDFS; + if (check_type == "azure_blob_storage" || check_type == "azure") + return ObjectStorageType::Azure; + if (check_type == "local_blob_storage" || check_type == "local") + return ObjectStorageType::Local; + if (check_type == "web") + return ObjectStorageType::Web; + if (check_type == "none") + return ObjectStorageType::None; + + throw Exception(ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG, + "Unknown object storage type: {}", type); +} + +std::string toString(ObjectStorageType type) +{ + switch (type) + { + case ObjectStorageType::S3: + return "s3"; + case ObjectStorageType::HDFS: + return "hdfs"; + case ObjectStorageType::Azure: + return "azure_blob_storage"; + case ObjectStorageType::Local: + return "local_blob_storage"; + case ObjectStorageType::Web: + return "web"; + case ObjectStorageType::None: + return "none"; + } +} + } diff --git a/src/Disks/DiskType.h b/src/Disks/DiskType.h index bf7ef3d30eb0..1aa3ea19cbbe 100644 --- a/src/Disks/DiskType.h +++ b/src/Disks/DiskType.h @@ -34,8 +34,10 @@ enum class MetadataStorageType : uint8_t Memory, }; -MetadataStorageType metadataTypeFromString(const String & type); -String toString(DataSourceType data_source_type); +MetadataStorageType metadataTypeFromString(const std::string & type); + +ObjectStorageType objectStorageTypeFromString(const std::string & type); +std::string toString(ObjectStorageType type); struct DataSourceDescription { diff --git a/src/Interpreters/Cluster.cpp b/src/Interpreters/Cluster.cpp index 00c67cb8a37c..5e90249d0b6e 100644 --- a/src/Interpreters/Cluster.cpp +++ b/src/Interpreters/Cluster.cpp @@ -717,9 +717,9 @@ void Cluster::initMisc() } } -std::unique_ptr Cluster::getClusterWithReplicasAsShards(const Settings & settings, size_t max_replicas_from_shard) const +std::unique_ptr Cluster::getClusterWithReplicasAsShards(const Settings & settings, size_t max_replicas_from_shard, size_t max_hosts) const { - return std::unique_ptr{ new Cluster(ReplicasAsShardsTag{}, *this, settings, max_replicas_from_shard)}; + return std::unique_ptr{ new Cluster(ReplicasAsShardsTag{}, *this, settings, max_replicas_from_shard, max_hosts)}; } std::unique_ptr Cluster::getClusterWithSingleShard(size_t index) const @@ -768,7 +768,7 @@ void shuffleReplicas(std::vector & replicas, const Settings & } -Cluster::Cluster(Cluster::ReplicasAsShardsTag, const Cluster & from, const Settings & settings, size_t max_replicas_from_shard) +Cluster::Cluster(Cluster::ReplicasAsShardsTag, const Cluster & from, const Settings & settings, size_t max_replicas_from_shard, size_t max_hosts) { if (from.addresses_with_failover.empty()) throw Exception(ErrorCodes::LOGICAL_ERROR, "Cluster is empty"); @@ -790,6 +790,7 @@ Cluster::Cluster(Cluster::ReplicasAsShardsTag, const Cluster & from, const Setti if (address.is_local) info.local_addresses.push_back(address); + addresses_with_failover.emplace_back(Addresses({address})); auto pool = ConnectionPoolFactory::instance().get( static_cast(settings[Setting::distributed_connections_pool_size]), @@ -811,9 +812,6 @@ Cluster::Cluster(Cluster::ReplicasAsShardsTag, const 
Cluster & from, const Setti info.pool = std::make_shared(ConnectionPoolPtrs{pool}, settings[Setting::load_balancing]); info.per_replica_pools = {std::move(pool)}; - addresses_with_failover.emplace_back(Addresses{address}); - - slot_to_shard.insert(std::end(slot_to_shard), info.weight, shards_info.size()); shards_info.emplace_back(std::move(info)); } }; @@ -835,10 +833,37 @@ Cluster::Cluster(Cluster::ReplicasAsShardsTag, const Cluster & from, const Setti secret = from.secret; name = from.name; + constrainShardInfoAndAddressesToMaxHosts(max_hosts); + + for (size_t i = 0; i < shards_info.size(); ++i) + slot_to_shard.insert(std::end(slot_to_shard), shards_info[i].weight, i); + initMisc(); } +void Cluster::constrainShardInfoAndAddressesToMaxHosts(size_t max_hosts) +{ + if (max_hosts == 0 || shards_info.size() <= max_hosts) + return; + + pcg64_fast gen{randomSeed()}; + std::shuffle(shards_info.begin(), shards_info.end(), gen); + shards_info.resize(max_hosts); + + AddressesWithFailover addresses_with_failover_; + + UInt32 shard_num = 0; + for (auto & shard_info : shards_info) + { + addresses_with_failover_.push_back(addresses_with_failover[shard_info.shard_num - 1]); + shard_info.shard_num = ++shard_num; + } + + addresses_with_failover.swap(addresses_with_failover_); +} + + Cluster::Cluster(Cluster::SubclusterTag, const Cluster & from, const std::vector & indices) { for (size_t index : indices) diff --git a/src/Interpreters/Cluster.h b/src/Interpreters/Cluster.h index e1868a18ad4f..6d0f5c204e5e 100644 --- a/src/Interpreters/Cluster.h +++ b/src/Interpreters/Cluster.h @@ -266,7 +266,7 @@ class Cluster std::unique_ptr getClusterWithMultipleShards(const std::vector & indices) const; /// Get a new Cluster that contains all servers (all shards with all replicas) from existing cluster as independent shards. - std::unique_ptr getClusterWithReplicasAsShards(const Settings & settings, size_t max_replicas_from_shard = 0) const; + std::unique_ptr getClusterWithReplicasAsShards(const Settings & settings, size_t max_replicas_from_shard = 0, size_t max_hosts = 0) const; /// Returns false if cluster configuration doesn't allow to use it for cross-replication. /// NOTE: true does not mean, that it's actually a cross-replication cluster. 
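The new `constrainShardInfoAndAddressesToMaxHosts` above shrinks a replicas-as-shards cluster to at most `max_hosts` entries by shuffling `shards_info`, truncating it, and renumbering the surviving shards while keeping `addresses_with_failover` in step. A reduced sketch of the same trim-and-renumber idea (the simplified `Shard` struct is an assumption, not the real `Cluster` internals):

```cpp
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <iostream>
#include <random>
#include <string>
#include <vector>

/// Illustration only: stand-in for one entry of shards_info plus its address list.
struct Shard
{
    uint32_t shard_num;  /// 1-based shard number
    std::string address;
};

/// Keep at most max_hosts randomly chosen shards and renumber them 1..N,
/// mirroring the shuffle/resize/renumber in constrainShardInfoAndAddressesToMaxHosts.
void constrainToMaxHosts(std::vector<Shard> & shards, size_t max_hosts)
{
    if (max_hosts == 0 || shards.size() <= max_hosts)
        return;

    std::mt19937_64 gen{std::random_device{}()};
    std::shuffle(shards.begin(), shards.end(), gen);
    shards.resize(max_hosts);

    uint32_t shard_num = 0;
    for (auto & shard : shards)
        shard.shard_num = ++shard_num;
}

int main()
{
    std::vector<Shard> shards{{1, "host1"}, {2, "host2"}, {3, "host3"}, {4, "host4"}};
    constrainToMaxHosts(shards, 2);  /// corresponds to object_storage_max_nodes = 2
    for (const auto & shard : shards)
        std::cout << shard.shard_num << " -> " << shard.address << '\n';
    return 0;
}
```

A value of 0 leaves the cluster untouched, matching the documented default of `object_storage_max_nodes` where all hosts participate.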
@@ -292,7 +292,7 @@ class Cluster /// For getClusterWithReplicasAsShards implementation struct ReplicasAsShardsTag {}; - Cluster(ReplicasAsShardsTag, const Cluster & from, const Settings & settings, size_t max_replicas_from_shard); + Cluster(ReplicasAsShardsTag, const Cluster & from, const Settings & settings, size_t max_replicas_from_shard, size_t max_hosts); void addShard( const Settings & settings, @@ -304,6 +304,9 @@ class Cluster ShardInfoInsertPathForInternalReplication insert_paths = {}, bool internal_replication = false); + /// Reduce size of cluster to max_hosts + void constrainShardInfoAndAddressesToMaxHosts(size_t max_hosts); + /// Inter-server secret String secret; diff --git a/src/Interpreters/InterpreterCreateQuery.cpp b/src/Interpreters/InterpreterCreateQuery.cpp index ca6f692ab76e..e7076f5fdfc9 100644 --- a/src/Interpreters/InterpreterCreateQuery.cpp +++ b/src/Interpreters/InterpreterCreateQuery.cpp @@ -1925,8 +1925,7 @@ bool InterpreterCreateQuery::doCreateTable(ASTCreateQuery & create, auto table_function_ast = create.as_table_function->ptr(); auto table_function = TableFunctionFactory::instance().get(table_function_ast, getContext()); - if (!table_function->canBeUsedToCreateTable()) - throw Exception(ErrorCodes::BAD_ARGUMENTS, "Table function '{}' cannot be used to create a table", table_function->getName()); + table_function->validateUseToCreateTable(); /// In case of CREATE AS table_function() query we should use global context /// in storage creation because there will be no query context on server startup diff --git a/src/Parsers/FunctionSecretArgumentsFinder.h b/src/Parsers/FunctionSecretArgumentsFinder.h index 152c7ff6d230..bad7c48b2b90 100644 --- a/src/Parsers/FunctionSecretArgumentsFinder.h +++ b/src/Parsers/FunctionSecretArgumentsFinder.h @@ -3,9 +3,12 @@ #include #include #include +#include +#include #include #include #include +#include namespace DB @@ -29,6 +32,21 @@ class AbstractFunction virtual ~Arguments() = default; virtual size_t size() const = 0; virtual std::unique_ptr at(size_t n) const = 0; + void skipArgument(size_t n) { skipped_indexes.insert(n); } + void unskipArguments() { skipped_indexes.clear(); } + size_t getRealIndex(size_t n) const + { + for (auto idx : skipped_indexes) + { + if (n < idx) + break; + ++n; + } + return n; + } + size_t skippedSize() const { return skipped_indexes.size(); } + private: + std::set skipped_indexes; }; virtual ~AbstractFunction() = default; @@ -75,14 +93,15 @@ class FunctionSecretArgumentsFinder { if (index >= function->arguments->size()) return; + auto real_index = function->arguments->getRealIndex(index); if (!result.count) { - result.start = index; + result.start = real_index; result.are_named = argument_is_named; } - chassert(index >= result.start); /// We always check arguments consecutively + chassert(real_index >= result.start); /// We always check arguments consecutively chassert(result.replacement.empty()); /// We shouldn't use replacement with masking other arguments - result.count = index + 1 - result.start; + result.count = real_index + 1 - result.start; if (!argument_is_named) result.are_named = false; } @@ -100,14 +119,18 @@ class FunctionSecretArgumentsFinder { findMongoDBSecretArguments(); } + else if (function->name() == "iceberg") + { + findIcebergFunctionSecretArguments(); + } else if ((function->name() == "s3") || (function->name() == "cosn") || (function->name() == "oss") || - (function->name() == "deltaLake") || (function->name() == "hudi") || (function->name() == "iceberg") || + 
(function->name() == "deltaLake") || (function->name() == "hudi") || (function->name() == "gcs") || (function->name() == "icebergS3")) { /// s3('url', 'aws_access_key_id', 'aws_secret_access_key', ...) findS3FunctionSecretArguments(/* is_cluster_function= */ false); } - else if (function->name() == "s3Cluster") + else if ((function->name() == "s3Cluster") || (function->name() == "icebergS3Cluster")) { /// s3Cluster('cluster_name', 'url', 'aws_access_key_id', 'aws_secret_access_key', ...) findS3FunctionSecretArguments(/* is_cluster_function= */ true); @@ -117,7 +140,7 @@ class FunctionSecretArgumentsFinder /// azureBlobStorage(connection_string|storage_account_url, container_name, blobpath, account_name, account_key, format, compression, structure) findAzureBlobStorageFunctionSecretArguments(/* is_cluster_function= */ false); } - else if (function->name() == "azureBlobStorageCluster") + else if ((function->name() == "azureBlobStorageCluster") || (function->name() == "icebergAzureCluster")) { /// azureBlobStorageCluster(cluster, connection_string|storage_account_url, container_name, blobpath, [account_name, account_key, format, compression, structure]) findAzureBlobStorageFunctionSecretArguments(/* is_cluster_function= */ true); @@ -236,11 +259,18 @@ class FunctionSecretArgumentsFinder findSecretNamedArgument("secret_access_key", 1); return; } + if (is_cluster_function && isNamedCollectionName(1)) + { + /// s3Cluster(cluster, named_collection, ..., secret_access_key = 'secret_access_key', ...) + findSecretNamedArgument("secret_access_key", 2); + return; + } /// We should check other arguments first because we don't need to do any replacement in case of /// s3('url', NOSIGN, 'format' [, 'compression'] [, extra_credentials(..)] [, headers(..)]) /// s3('url', 'format', 'structure' [, 'compression'] [, extra_credentials(..)] [, headers(..)]) size_t count = excludeS3OrURLNestedMaps(); + if ((url_arg_idx + 3 <= count) && (count <= url_arg_idx + 4)) { String second_arg; @@ -305,6 +335,48 @@ class FunctionSecretArgumentsFinder markSecretArgument(url_arg_idx + 4); } + std::string findIcebergStorageType() + { + std::string storage_type = "s3"; + + size_t count = function->arguments->size(); + if (!count) + return storage_type; + + auto storage_type_idx = findNamedArgument(&storage_type, "storage_type"); + if (storage_type_idx != -1) + { + storage_type = Poco::toLower(storage_type); + function->arguments->skipArgument(storage_type_idx); + } + else if (isNamedCollectionName(0)) + { + std::string collection_name; + if (function->arguments->at(0)->tryGetString(&collection_name, true)) + { + NamedCollectionPtr collection = NamedCollectionFactory::instance().tryGet(collection_name); + if (collection && collection->has("storage_type")) + { + storage_type = Poco::toLower(collection->get("storage_type")); + } + } + } + + return storage_type; + } + + void findIcebergFunctionSecretArguments() + { + auto storage_type = findIcebergStorageType(); + + if (storage_type == "s3") + findS3FunctionSecretArguments(false); + else if (storage_type == "azure") + findAzureBlobStorageFunctionSecretArguments(false); + + function->arguments->unskipArguments(); + } + bool maskAzureConnectionString(ssize_t url_arg_idx, bool argument_is_named = false, size_t start = 0) { String url_arg; @@ -328,7 +400,7 @@ class FunctionSecretArgumentsFinder if (RE2::Replace(&url_arg, account_key_pattern, "AccountKey=[HIDDEN]\\1")) { chassert(result.count == 0); /// We shouldn't use replacement with masking other arguments - result.start = 
url_arg_idx; + result.start = function->arguments->getRealIndex(url_arg_idx); result.are_named = argument_is_named; result.count = 1; result.replacement = url_arg; @@ -487,6 +559,7 @@ class FunctionSecretArgumentsFinder void findTableEngineSecretArguments() { const String & engine_name = function->name(); + if (engine_name == "ExternalDistributed") { /// ExternalDistributed('engine', 'host:port', 'database', 'table', 'user', 'password') @@ -504,10 +577,13 @@ class FunctionSecretArgumentsFinder { findMongoDBSecretArguments(); } + else if (engine_name == "Iceberg") + { + findIcebergTableEngineSecretArguments(); + } else if ((engine_name == "S3") || (engine_name == "COSN") || (engine_name == "OSS") || (engine_name == "DeltaLake") || (engine_name == "Hudi") - || (engine_name == "Iceberg") || (engine_name == "IcebergS3") - || (engine_name == "S3Queue")) + || (engine_name == "IcebergS3") || (engine_name == "S3Queue")) { /// S3('url', ['aws_access_key_id', 'aws_secret_access_key',] ...) findS3TableEngineSecretArguments(); @@ -516,7 +592,7 @@ class FunctionSecretArgumentsFinder { findURLSecretArguments(); } - else if (engine_name == "AzureBlobStorage" || engine_name == "AzureQueue") + else if (engine_name == "AzureBlobStorage" || engine_name == "AzureQueue" || engine_name == "IcebergAzure") { findAzureBlobStorageTableEngineSecretArguments(); } @@ -612,6 +688,18 @@ class FunctionSecretArgumentsFinder markSecretArgument(url_arg_idx + 4); } + void findIcebergTableEngineSecretArguments() + { + auto storage_type = findIcebergStorageType(); + + if (storage_type == "s3") + findS3TableEngineSecretArguments(); + else if (storage_type == "azure") + findAzureBlobStorageTableEngineSecretArguments(); + + function->arguments->unskipArguments(); + } + void findDatabaseEngineSecretArguments() { const String & engine_name = function->name(); diff --git a/src/Parsers/FunctionSecretArgumentsFinderAST.h b/src/Parsers/FunctionSecretArgumentsFinderAST.h index a260c0d58da6..3624d7a7e87b 100644 --- a/src/Parsers/FunctionSecretArgumentsFinderAST.h +++ b/src/Parsers/FunctionSecretArgumentsFinderAST.h @@ -54,10 +54,13 @@ class FunctionAST : public AbstractFunction { public: explicit ArgumentsAST(const ASTs * arguments_) : arguments(arguments_) {} - size_t size() const override { return arguments ? arguments->size() : 0; } + size_t size() const override + { /// size without skipped indexes + return arguments ? arguments->size() - skippedSize() : 0; + } std::unique_ptr at(size_t n) const override - { - return std::make_unique(arguments->at(n).get()); + { /// n is relative index, some can be skipped + return std::make_unique(arguments->at(getRealIndex(n)).get()); } private: const ASTs * arguments = nullptr; diff --git a/src/Server/TCPHandler.cpp b/src/Server/TCPHandler.cpp index 526acc140c9c..acf615f7704b 100644 --- a/src/Server/TCPHandler.cpp +++ b/src/Server/TCPHandler.cpp @@ -35,7 +35,6 @@ #include #include #include -#include #include #include #include diff --git a/src/Storages/IStorage.h b/src/Storages/IStorage.h index fd1548bdaa11..09441e2a95c3 100644 --- a/src/Storages/IStorage.h +++ b/src/Storages/IStorage.h @@ -65,6 +65,9 @@ class RestorerFromBackup; class ConditionSelectivityEstimator; +class IObjectStorage; +using ObjectStoragePtr = std::shared_ptr; + class ActionsDAG; /** Storage. Describes the table.
Responsible for diff --git a/src/Storages/IStorageCluster.cpp b/src/Storages/IStorageCluster.cpp index d304eeea4bf2..4cce11248ff9 100644 --- a/src/Storages/IStorageCluster.cpp +++ b/src/Storages/IStorageCluster.cpp @@ -39,6 +39,12 @@ namespace Setting extern const SettingsBool parallel_replicas_local_plan; extern const SettingsString cluster_for_parallel_replicas; extern const SettingsNonZeroUInt64 max_parallel_replicas; + extern const SettingsUInt64 object_storage_max_nodes; +} + +namespace ErrorCodes +{ + extern const int NOT_IMPLEMENTED; } IStorageCluster::IStorageCluster( @@ -123,13 +129,21 @@ void IStorageCluster::read( SelectQueryInfo & query_info, ContextPtr context, QueryProcessingStage::Enum processed_stage, - size_t /*max_block_size*/, - size_t /*num_streams*/) + size_t max_block_size, + size_t num_streams) { + auto cluster_name_from_settings = getClusterName(context); + + if (cluster_name_from_settings.empty()) + { + readFallBackToPure(query_plan, column_names, storage_snapshot, query_info, context, processed_stage, max_block_size, num_streams); + return; + } + storage_snapshot->check(column_names); updateBeforeRead(context); - auto cluster = getCluster(context); + auto cluster = getClusterImpl(context, cluster_name_from_settings, context->getSettingsRef()[Setting::object_storage_max_nodes]); /// Calculate the header. This is significant, because some columns could be thrown away in some cases like query with count(*) @@ -176,6 +190,20 @@ void IStorageCluster::read( query_plan.addStep(std::move(reading)); } +SinkToStoragePtr IStorageCluster::write( + const ASTPtr & query, + const StorageMetadataPtr & metadata_snapshot, + ContextPtr context, + bool async_insert) +{ + auto cluster_name_from_settings = getClusterName(context); + + if (cluster_name_from_settings.empty()) + return writeFallBackToPure(query, metadata_snapshot, context, async_insert); + + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method write is not supported by storage {}", getName()); +} + void ReadFromCluster::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) { createExtension(nullptr); @@ -262,9 +290,9 @@ ContextPtr ReadFromCluster::updateSettings(const Settings & settings) return new_context; } -ClusterPtr IStorageCluster::getCluster(ContextPtr context) const +ClusterPtr IStorageCluster::getClusterImpl(ContextPtr context, const String & cluster_name_, size_t max_hosts) { - return context->getCluster(cluster_name)->getClusterWithReplicasAsShards(context->getSettingsRef()); + return context->getCluster(cluster_name_)->getClusterWithReplicasAsShards(context->getSettingsRef(), /* max_replicas_from_shard */ 0, max_hosts); } } diff --git a/src/Storages/IStorageCluster.h b/src/Storages/IStorageCluster.h index 2c81539e669e..d7e97e87ae4a 100644 --- a/src/Storages/IStorageCluster.h +++ b/src/Storages/IStorageCluster.h @@ -28,10 +28,17 @@ class IStorageCluster : public IStorage SelectQueryInfo & query_info, ContextPtr context, QueryProcessingStage::Enum processed_stage, - size_t /*max_block_size*/, - size_t /*num_streams*/) override; + size_t max_block_size, + size_t num_streams) override; + + SinkToStoragePtr write( + const ASTPtr & query, + const StorageMetadataPtr & metadata_snapshot, + ContextPtr context, + bool async_insert) override; + + ClusterPtr getCluster(ContextPtr context) const { return getClusterImpl(context, cluster_name); } - ClusterPtr getCluster(ContextPtr context) const; /// Query is needed for pruning by virtual columns (_file, _path) virtual 
RemoteQueryExecutor::Extension getTaskIteratorExtension(const ActionsDAG::Node * predicate, const ContextPtr & context) const = 0; @@ -42,11 +49,38 @@ class IStorageCluster : public IStorage bool supportsOptimizationToSubcolumns() const override { return false; } bool supportsTrivialCountOptimization(const StorageSnapshotPtr &, ContextPtr) const override { return true; } + const String & getOriginalClusterName() const { return cluster_name; } + virtual String getClusterName(ContextPtr /* context */) const { return getOriginalClusterName(); } + protected: virtual void updateBeforeRead(const ContextPtr &) {} virtual void updateQueryToSendIfNeeded(ASTPtr & /*query*/, const StorageSnapshotPtr & /*storage_snapshot*/, const ContextPtr & /*context*/) {} + virtual void readFallBackToPure( + QueryPlan & /* query_plan */, + const Names & /* column_names */, + const StorageSnapshotPtr & /* storage_snapshot */, + SelectQueryInfo & /* query_info */, + ContextPtr /* context */, + QueryProcessingStage::Enum /* processed_stage */, + size_t /* max_block_size */, + size_t /* num_streams */) + { + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method readFallBackToPure is not supported by storage {}", getName()); + } + + virtual SinkToStoragePtr writeFallBackToPure( + const ASTPtr & /*query*/, + const StorageMetadataPtr & /*metadata_snapshot*/, + ContextPtr /*context*/, + bool /*async_insert*/) + { + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method writeFallBackToPure is not supported by storage {}", getName()); + } + private: + static ClusterPtr getClusterImpl(ContextPtr context, const String & cluster_name_, size_t max_hosts = 0); + LoggerPtr log; String cluster_name; }; diff --git a/src/Storages/ObjectStorage/Azure/Configuration.cpp b/src/Storages/ObjectStorage/Azure/Configuration.cpp index 35c58a7e952e..24dbc95b112e 100644 --- a/src/Storages/ObjectStorage/Azure/Configuration.cpp +++ b/src/Storages/ObjectStorage/Azure/Configuration.cpp @@ -54,6 +54,7 @@ const std::unordered_set optional_configuration_keys = { "account_key", "connection_string", "storage_account_url", + "storage_type", }; void StorageAzureConfiguration::check(ContextPtr context) const @@ -133,8 +134,6 @@ void StorageAzureConfiguration::fromNamedCollection(const NamedCollection & coll String connection_url; String container_name; - std::optional account_name; - std::optional account_key; if (collection.has("connection_string")) connection_url = collection.get("connection_string"); @@ -150,9 +149,9 @@ void StorageAzureConfiguration::fromNamedCollection(const NamedCollection & coll if (collection.has("account_key")) account_key = collection.get("account_key"); - structure = collection.getOrDefault("structure", "auto"); - format = collection.getOrDefault("format", format); - compression_method = collection.getOrDefault("compression_method", collection.getOrDefault("compression", "auto")); + setStructure(collection.getOrDefault("structure", "auto")); + setFormat(collection.getOrDefault("format", getFormat())); + setCompressionMethod(collection.getOrDefault("compression_method", collection.getOrDefault("compression", "auto"))); blobs_paths = {blob_path}; connection_params = getConnectionParams(connection_url, container_name, account_name, account_key, context); @@ -174,14 +173,10 @@ void StorageAzureConfiguration::fromAST(ASTs & engine_args, ContextPtr context, std::unordered_map engine_args_to_idx; - String connection_url = checkAndGetLiteralArgument(engine_args[0], "connection_string/storage_account_url"); String container_name = 
checkAndGetLiteralArgument(engine_args[1], "container"); blob_path = checkAndGetLiteralArgument(engine_args[2], "blobpath"); - std::optional account_name; - std::optional account_key; - auto is_format_arg = [] (const std::string & s) -> bool { return s == "auto" || FormatFactory::instance().getAllFormats().contains(Poco::toLower(s)); @@ -192,12 +187,12 @@ void StorageAzureConfiguration::fromAST(ASTs & engine_args, ContextPtr context, auto fourth_arg = checkAndGetLiteralArgument(engine_args[3], "format/account_name"); if (is_format_arg(fourth_arg)) { - format = fourth_arg; + setFormat(fourth_arg); } else { if (with_structure) - structure = fourth_arg; + setStructure(fourth_arg); else throw Exception( ErrorCodes::BAD_ARGUMENTS, @@ -209,8 +204,8 @@ void StorageAzureConfiguration::fromAST(ASTs & engine_args, ContextPtr context, auto fourth_arg = checkAndGetLiteralArgument(engine_args[3], "format/account_name"); if (is_format_arg(fourth_arg)) { - format = fourth_arg; - compression_method = checkAndGetLiteralArgument(engine_args[4], "compression"); + setFormat(fourth_arg); + setCompressionMethod(checkAndGetLiteralArgument(engine_args[4], "compression")); } else { @@ -225,9 +220,9 @@ void StorageAzureConfiguration::fromAST(ASTs & engine_args, ContextPtr context, { if (with_structure) { - format = fourth_arg; - compression_method = checkAndGetLiteralArgument(engine_args[4], "compression"); - structure = checkAndGetLiteralArgument(engine_args[5], "structure"); + setFormat(fourth_arg); + setCompressionMethod(checkAndGetLiteralArgument(engine_args[4], "compression")); + setStructure(checkAndGetLiteralArgument(engine_args[5], "structure")); } else throw Exception(ErrorCodes::BAD_ARGUMENTS, "Format and compression must be last arguments"); @@ -239,12 +234,12 @@ void StorageAzureConfiguration::fromAST(ASTs & engine_args, ContextPtr context, auto sixth_arg = checkAndGetLiteralArgument(engine_args[5], "format/structure"); if (is_format_arg(sixth_arg)) { - format = sixth_arg; + setFormat(sixth_arg); } else { if (with_structure) - structure = sixth_arg; + setStructure(sixth_arg); else throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unknown format {}", sixth_arg); } @@ -263,8 +258,8 @@ void StorageAzureConfiguration::fromAST(ASTs & engine_args, ContextPtr context, auto sixth_arg = checkAndGetLiteralArgument(engine_args[5], "format/account_name"); if (!is_format_arg(sixth_arg)) throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unknown format {}", sixth_arg); - format = sixth_arg; - compression_method = checkAndGetLiteralArgument(engine_args[6], "compression"); + setFormat(sixth_arg); + setCompressionMethod(checkAndGetLiteralArgument(engine_args[6], "compression")); } else if (with_structure && engine_args.size() == 8) { @@ -274,9 +269,9 @@ void StorageAzureConfiguration::fromAST(ASTs & engine_args, ContextPtr context, auto sixth_arg = checkAndGetLiteralArgument(engine_args[5], "format"); if (!is_format_arg(sixth_arg)) throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unknown format {}", sixth_arg); - format = sixth_arg; - compression_method = checkAndGetLiteralArgument(engine_args[6], "compression"); - structure = checkAndGetLiteralArgument(engine_args[7], "structure"); + setFormat(sixth_arg); + setCompressionMethod (checkAndGetLiteralArgument(engine_args[6], "compression")); + setStructure(checkAndGetLiteralArgument(engine_args[7], "structure")); } blobs_paths = {blob_path}; @@ -447,6 +442,22 @@ void StorageAzureConfiguration::addStructureAndFormatToArgsIfNeeded( } } +ASTPtr 
StorageAzureConfiguration::createArgsWithAccessData() const +{ + auto arguments = std::make_shared(); + + arguments->children.push_back(std::make_shared(connection_params.endpoint.storage_account_url)); + arguments->children.push_back(std::make_shared(connection_params.endpoint.container_name)); + arguments->children.push_back(std::make_shared(blob_path)); + if (account_name && account_key) + { + arguments->children.push_back(std::make_shared(*account_name)); + arguments->children.push_back(std::make_shared(*account_key)); + } + + return arguments; +} + } #endif diff --git a/src/Storages/ObjectStorage/Azure/Configuration.h b/src/Storages/ObjectStorage/Azure/Configuration.h index 72124465c462..c915696f2448 100644 --- a/src/Storages/ObjectStorage/Azure/Configuration.h +++ b/src/Storages/ObjectStorage/Azure/Configuration.h @@ -79,6 +79,8 @@ class StorageAzureConfiguration : public StorageObjectStorage::Configuration ContextPtr context, bool with_structure) override; + ASTPtr createArgsWithAccessData() const override; + protected: void fromNamedCollection(const NamedCollection & collection, ContextPtr context) override; void fromAST(ASTs & args, ContextPtr context, bool with_structure) override; @@ -86,6 +88,8 @@ class StorageAzureConfiguration : public StorageObjectStorage::Configuration std::string blob_path; std::vector blobs_paths; AzureBlobStorage::ConnectionParams connection_params; + std::optional account_name; + std::optional account_key; }; } diff --git a/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h b/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h index 395d690cb2dc..c4e7fdf1f7a2 100644 --- a/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h +++ b/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h @@ -1,6 +1,7 @@ #pragma once #include +#include #include #include #include @@ -13,7 +14,11 @@ #include #include #include -#include "Storages/ColumnsDescription.h" +#include +#include +#include +#include +#include #include #include @@ -30,6 +35,7 @@ namespace DB namespace ErrorCodes { extern const int FORMAT_VERSION_TOO_OLD; +extern const int LOGICAL_ERROR; } namespace StorageObjectStorageSetting @@ -259,15 +265,244 @@ class DataLakeConfiguration : public BaseStorageConfiguration, public std::enabl using StorageS3IcebergConfiguration = DataLakeConfiguration; #endif -#if USE_AZURE_BLOB_STORAGE +# if USE_AZURE_BLOB_STORAGE using StorageAzureIcebergConfiguration = DataLakeConfiguration; #endif -#if USE_HDFS +# if USE_HDFS using StorageHDFSIcebergConfiguration = DataLakeConfiguration; #endif using StorageLocalIcebergConfiguration = DataLakeConfiguration; + +/// Class detects storage type by `storage_type` parameter if exists +/// and uses appropriate implementation - S3, Azure, HDFS or Local +class StorageIcebergConfiguration : public StorageObjectStorage::Configuration, public std::enable_shared_from_this +{ + friend class StorageObjectStorage::Configuration; + +public: + ObjectStorageType getType() const override { return getImpl().getType(); } + + std::string getTypeName() const override { return getImpl().getTypeName(); } + std::string getEngineName() const override { return getImpl().getEngineName(); } + std::string getNamespaceType() const override { return getImpl().getNamespaceType(); } + + Path getPath() const override { return getImpl().getPath(); } + void setPath(const Path & path) override { getImpl().setPath(path); } + + const Paths & getPaths() const override { return getImpl().getPaths(); } + void setPaths(const Paths & paths) 
override { getImpl().setPaths(paths); } + + String getDataSourceDescription() const override { return getImpl().getDataSourceDescription(); } + String getNamespace() const override { return getImpl().getNamespace(); } + + StorageObjectStorage::QuerySettings getQuerySettings(const ContextPtr & context) const override + { return getImpl().getQuerySettings(context); } + + void addStructureAndFormatToArgsIfNeeded( + ASTs & args, const String & structure_, const String & format_, ContextPtr context, bool with_structure) override + { getImpl().addStructureAndFormatToArgsIfNeeded(args, structure_, format_, context, with_structure); } + + std::string getPathWithoutGlobs() const override { return getImpl().getPathWithoutGlobs(); } + + bool isArchive() const override { return getImpl().isArchive(); } + std::string getPathInArchive() const override { return getImpl().getPathInArchive(); } + + void check(ContextPtr context) const override { getImpl().check(context); } + void validateNamespace(const String & name) const override { getImpl().validateNamespace(name); } + + ObjectStoragePtr createObjectStorage(ContextPtr context, bool is_readonly) override + { return getImpl().createObjectStorage(context, is_readonly); } + StorageObjectStorage::ConfigurationPtr clone() override { return getImpl().clone(); } + bool isStaticConfiguration() const override { return getImpl().isStaticConfiguration(); } + + bool isDataLakeConfiguration() const override { return getImpl().isDataLakeConfiguration(); } + + bool hasExternalDynamicMetadata() override { return getImpl().hasExternalDynamicMetadata(); } + + std::shared_ptr getInitialSchemaByPath(const String & path) const override + { return getImpl().getInitialSchemaByPath(path); } + + std::shared_ptr getSchemaTransformer(const String & data_path) const override + { return getImpl().getSchemaTransformer(data_path); } + + ColumnsDescription updateAndGetCurrentSchema(ObjectStoragePtr object_storage, ContextPtr context) override + { return getImpl().updateAndGetCurrentSchema(object_storage, context); } + + ReadFromFormatInfo prepareReadingFromFormat( + ObjectStoragePtr object_storage, + const Strings & requested_columns, + const StorageSnapshotPtr & storage_snapshot, + bool supports_subset_of_columns, + ContextPtr local_context) override + { + return getImpl().prepareReadingFromFormat( + object_storage, + requested_columns, + storage_snapshot, + supports_subset_of_columns, + local_context); + } + + std::optional tryGetTableStructureFromMetadata() const override + { return getImpl().tryGetTableStructureFromMetadata(); } + + void update(ObjectStoragePtr object_storage, ContextPtr local_context) override + { return getImpl().update(object_storage, local_context); } + + void initialize( + ASTs & engine_args, + ContextPtr local_context, + bool with_table_structure, + std::shared_ptr settings) override + { + createDynamicConfiguration(engine_args, local_context); + getImpl().initialize(engine_args, local_context, with_table_structure, settings); + } + + ASTPtr createArgsWithAccessData() const override + { + return getImpl().createArgsWithAccessData(); + } + + const String & getFormat() const override { return getImpl().getFormat(); } + const String & getCompressionMethod() const override { return getImpl().getCompressionMethod(); } + const String & getStructure() const override { return getImpl().getStructure(); } + + void setFormat(const String & format_) override { getImpl().setFormat(format_); } + void setCompressionMethod(const String & compression_method_) override { 
getImpl().setCompressionMethod(compression_method_); } + void setStructure(const String & structure_) override { getImpl().setStructure(structure_); } + +protected: + void fromNamedCollection(const NamedCollection & collection, ContextPtr context) override + { return getImpl().fromNamedCollection(collection, context); } + void fromAST(ASTs & args, ContextPtr context, bool with_structure) override + { return getImpl().fromAST(args, context, with_structure); } + + /// Find storage_type argument and remove it from args if it exists. + /// Return storage type. + ObjectStorageType extractDynamicStorageType(ASTs & args, ContextPtr context, ASTPtr * type_arg = nullptr) const override + { + static const auto storage_type_name = "storage_type"; + + if (auto named_collection = tryGetNamedCollectionWithOverrides(args, context)) + { + if (named_collection->has(storage_type_name)) + { + return objectStorageTypeFromString(named_collection->get(storage_type_name)); + } + } + + auto type_it = args.end(); + + /// S3 by default for backward compatibility + /// Iceberg without storage_type == IcebergS3 + ObjectStorageType type = ObjectStorageType::S3; + + for (auto arg_it = args.begin(); arg_it != args.end(); ++arg_it) + { + const auto * type_ast_function = (*arg_it)->as(); + + if (type_ast_function && type_ast_function->name == "equals" + && type_ast_function->arguments && type_ast_function->arguments->children.size() == 2) + { + auto name = type_ast_function->arguments->children[0]->as(); + + if (name && name->name() == storage_type_name) + { + if (type_it != args.end()) + { + throw Exception( + ErrorCodes::BAD_ARGUMENTS, + "DataLake can have only one key-value argument: storage_type='type'."); + } + + auto value = type_ast_function->arguments->children[1]->as(); + + if (!value) + { + throw Exception( + ErrorCodes::BAD_ARGUMENTS, + "DataLake parameter 'storage_type' has wrong type, string literal expected."); + } + + if (value->value.getType() != Field::Types::String) + { + throw Exception( + ErrorCodes::BAD_ARGUMENTS, + "DataLake parameter 'storage_type' has wrong value type, string expected."); + } + + type = objectStorageTypeFromString(value->value.safeGet()); + + type_it = arg_it; + } + } + } + + if (type_it != args.end()) + { + if (type_arg) + *type_arg = *type_it; + args.erase(type_it); + } + + return type; + } + + void createDynamicConfiguration(ASTs & args, ContextPtr context) + { + ObjectStorageType type = extractDynamicStorageType(args, context); + createDynamicStorage(type); + } + +private: + inline StorageObjectStorage::Configuration & getImpl() const + { + if (!impl) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Dynamic DataLake storage not initialized"); + + return *impl; + } + + void createDynamicStorage(ObjectStorageType type) + { + if (impl) + { + if (impl->getType() == type) + return; + + throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't change datalake engine storage"); + } + + switch (type) + { +# if USE_AWS_S3 + case ObjectStorageType::S3: + impl = std::make_unique(); + break; +# endif +# if USE_AZURE_BLOB_STORAGE + case ObjectStorageType::Azure: + impl = std::make_unique(); + break; +# endif +# if USE_HDFS + case ObjectStorageType::HDFS: + impl = std::make_unique(); + break; +# endif + case ObjectStorageType::Local: + impl = std::make_unique(); + break; + default: + throw Exception(ErrorCodes::LOGICAL_ERROR, "Unsupported DataLake storage {}", type); + } + } + + std::shared_ptr impl; +}; #endif #if USE_PARQUET && USE_AWS_S3 diff --git a/src/Storages/ObjectStorage/DataLakes/HudiMetadata.cpp
b/src/Storages/ObjectStorage/DataLakes/HudiMetadata.cpp index 77ef769ed0e9..8e7eebae313d 100644 --- a/src/Storages/ObjectStorage/DataLakes/HudiMetadata.cpp +++ b/src/Storages/ObjectStorage/DataLakes/HudiMetadata.cpp @@ -44,7 +44,7 @@ Strings HudiMetadata::getDataFilesImpl() const { auto configuration_ptr = configuration.lock(); auto log = getLogger("HudiMetadata"); - const auto keys = listFiles(*object_storage, *configuration_ptr, "", Poco::toLower(configuration_ptr->format)); + const auto keys = listFiles(*object_storage, *configuration_ptr, "", Poco::toLower(configuration_ptr->getFormat())); using Partition = std::string; using FileID = std::string; diff --git a/src/Storages/ObjectStorage/HDFS/Configuration.cpp b/src/Storages/ObjectStorage/HDFS/Configuration.cpp index 143cdc756ea9..ec09b2f0e74e 100644 --- a/src/Storages/ObjectStorage/HDFS/Configuration.cpp +++ b/src/Storages/ObjectStorage/HDFS/Configuration.cpp @@ -111,23 +111,23 @@ void StorageHDFSConfiguration::fromAST(ASTs & args, ContextPtr context, bool wit if (args.size() > 1) { - format = checkAndGetLiteralArgument(args[1], "format_name"); + setFormat(checkAndGetLiteralArgument(args[1], "format_name")); } if (with_structure) { if (args.size() > 2) { - structure = checkAndGetLiteralArgument(args[2], "structure"); + setStructure(checkAndGetLiteralArgument(args[2], "structure")); } if (args.size() > 3) { - compression_method = checkAndGetLiteralArgument(args[3], "compression_method"); + setCompressionMethod(checkAndGetLiteralArgument(args[3], "compression_method")); } } else if (args.size() > 2) { - compression_method = checkAndGetLiteralArgument(args[2], "compression_method"); + setCompressionMethod(checkAndGetLiteralArgument(args[2], "compression_method")); } setURL(url_str); @@ -143,10 +143,10 @@ void StorageHDFSConfiguration::fromNamedCollection(const NamedCollection & colle else url_str = collection.get("url"); - format = collection.getOrDefault("format", "auto"); - compression_method = collection.getOrDefault("compression_method", - collection.getOrDefault("compression", "auto")); - structure = collection.getOrDefault("structure", "auto"); + setFormat(collection.getOrDefault("format", "auto")); + setCompressionMethod(collection.getOrDefault("compression_method", + collection.getOrDefault("compression", "auto"))); + setStructure(collection.getOrDefault("structure", "auto")); setURL(url_str); } @@ -236,6 +236,13 @@ void StorageHDFSConfiguration::addStructureAndFormatToArgsIfNeeded( } } +ASTPtr StorageHDFSConfiguration::createArgsWithAccessData() const +{ + auto arguments = std::make_shared(); + arguments->children.push_back(std::make_shared(url + path)); + return arguments; +} + } #endif diff --git a/src/Storages/ObjectStorage/HDFS/Configuration.h b/src/Storages/ObjectStorage/HDFS/Configuration.h index db8ab7f9e4db..f38382e173ed 100644 --- a/src/Storages/ObjectStorage/HDFS/Configuration.h +++ b/src/Storages/ObjectStorage/HDFS/Configuration.h @@ -65,6 +65,8 @@ class StorageHDFSConfiguration : public StorageObjectStorage::Configuration ContextPtr context, bool with_structure) override; + ASTPtr createArgsWithAccessData() const override; + private: void fromNamedCollection(const NamedCollection &, ContextPtr context) override; void fromAST(ASTs & args, ContextPtr, bool /* with_structure */) override; diff --git a/src/Storages/ObjectStorage/Local/Configuration.cpp b/src/Storages/ObjectStorage/Local/Configuration.cpp index bae84714273c..c6dfa3318331 100644 --- a/src/Storages/ObjectStorage/Local/Configuration.cpp +++ 
b/src/Storages/ObjectStorage/Local/Configuration.cpp @@ -24,9 +24,9 @@ extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; void StorageLocalConfiguration::fromNamedCollection(const NamedCollection & collection, ContextPtr) { path = collection.get("path"); - format = collection.getOrDefault("format", "auto"); - compression_method = collection.getOrDefault("compression_method", collection.getOrDefault("compression", "auto")); - structure = collection.getOrDefault("structure", "auto"); + setFormat(collection.getOrDefault("format", "auto")); + setCompressionMethod(collection.getOrDefault("compression_method", collection.getOrDefault("compression", "auto"))); + setStructure(collection.getOrDefault("structure", "auto")); paths = {path}; } @@ -46,23 +46,23 @@ void StorageLocalConfiguration::fromAST(ASTs & args, ContextPtr context, bool wi if (args.size() > 1) { - format = checkAndGetLiteralArgument(args[1], "format_name"); + setFormat(checkAndGetLiteralArgument(args[1], "format_name")); } if (with_structure) { if (args.size() > 2) { - structure = checkAndGetLiteralArgument(args[2], "structure"); + setStructure(checkAndGetLiteralArgument(args[2], "structure")); } if (args.size() > 3) { - compression_method = checkAndGetLiteralArgument(args[3], "compression_method"); + setCompressionMethod(checkAndGetLiteralArgument(args[3], "compression_method")); } } else if (args.size() > 2) { - compression_method = checkAndGetLiteralArgument(args[2], "compression_method"); + setCompressionMethod(checkAndGetLiteralArgument(args[2], "compression_method")); } paths = {path}; } diff --git a/src/Storages/ObjectStorage/ReadBufferIterator.cpp b/src/Storages/ObjectStorage/ReadBufferIterator.cpp index df9faa048c2b..ba9066daeb1b 100644 --- a/src/Storages/ObjectStorage/ReadBufferIterator.cpp +++ b/src/Storages/ObjectStorage/ReadBufferIterator.cpp @@ -38,8 +38,8 @@ ReadBufferIterator::ReadBufferIterator( , read_keys(read_keys_) , prev_read_keys_size(read_keys_.size()) { - if (configuration->format != "auto") - format = configuration->format; + if (configuration->getFormat() != "auto") + format = configuration->getFormat(); } SchemaCache::Key ReadBufferIterator::getKeyForSchemaCache(const ObjectInfo & object_info, const String & format_name) const @@ -152,7 +152,7 @@ std::unique_ptr ReadBufferIterator::recreateLastReadBuffer() const auto & path = current_object_info->isArchive() ? 
current_object_info->getPathToArchive() : current_object_info->getPath(); auto impl = StorageObjectStorageSource::createReadBuffer(*current_object_info, object_storage, context, getLogger("ReadBufferIterator")); - const auto compression_method = chooseCompressionMethod(current_object_info->getFileName(), configuration->compression_method); + const auto compression_method = chooseCompressionMethod(current_object_info->getFileName(), configuration->getCompressionMethod()); const auto zstd_window = static_cast(context->getSettingsRef()[Setting::zstd_window_log_max]); return wrapReadBufferWithCompressionMethod(std::move(impl), compression_method, zstd_window); @@ -268,13 +268,13 @@ ReadBufferIterator::Data ReadBufferIterator::next() using ObjectInfoInArchive = StorageObjectStorageSource::ArchiveIterator::ObjectInfoInArchive; if (const auto * object_info_in_archive = dynamic_cast(current_object_info.get())) { - compression_method = chooseCompressionMethod(filename, configuration->compression_method); + compression_method = chooseCompressionMethod(filename, configuration->getCompressionMethod()); const auto & archive_reader = object_info_in_archive->archive_reader; read_buf = archive_reader->readFile(object_info_in_archive->path_in_archive, /*throw_on_not_found=*/true); } else { - compression_method = chooseCompressionMethod(filename, configuration->compression_method); + compression_method = chooseCompressionMethod(filename, configuration->getCompressionMethod()); read_buf = StorageObjectStorageSource::createReadBuffer(*current_object_info, object_storage, getContext(), getLogger("ReadBufferIterator")); } diff --git a/src/Storages/ObjectStorage/S3/Configuration.cpp b/src/Storages/ObjectStorage/S3/Configuration.cpp index 9ffc449a1524..095cf385d880 100644 --- a/src/Storages/ObjectStorage/S3/Configuration.cpp +++ b/src/Storages/ObjectStorage/S3/Configuration.cpp @@ -74,7 +74,8 @@ static const std::unordered_set optional_configuration_keys = "max_single_part_upload_size", "max_connections", "expiration_window_seconds", - "no_sign_request" + "no_sign_request", + "storage_type", }; String StorageS3Configuration::getDataSourceDescription() const @@ -179,9 +180,9 @@ void StorageS3Configuration::fromNamedCollection(const NamedCollection & collect auth_settings[S3AuthSetting::expiration_window_seconds] = collection.getOrDefault("expiration_window_seconds", S3::DEFAULT_EXPIRATION_WINDOW_SECONDS); auth_settings[S3AuthSetting::session_token] = collection.getOrDefault("session_token", ""); - format = collection.getOrDefault("format", format); - compression_method = collection.getOrDefault("compression_method", collection.getOrDefault("compression", "auto")); - structure = collection.getOrDefault("structure", "auto"); + setFormat(collection.getOrDefault("format", getFormat())); + setCompressionMethod(collection.getOrDefault("compression_method", collection.getOrDefault("compression", "auto"))); + setStructure(collection.getOrDefault("structure", "auto")); request_settings = S3::S3RequestSettings(collection, settings, /* validate_settings */true); @@ -364,18 +365,18 @@ void StorageS3Configuration::fromAST(ASTs & args, ContextPtr context, bool with_ if (engine_args_to_idx.contains("format")) { - format = checkAndGetLiteralArgument(args[engine_args_to_idx["format"]], "format"); + auto format_ = checkAndGetLiteralArgument(args[engine_args_to_idx["format"]], "format"); /// Set format to configuration only of it's not 'auto', /// because we can have default format set in configuration. 
- if (format != "auto") - format = format; + if (format_ != "auto") + setFormat(format_); } if (engine_args_to_idx.contains("structure")) - structure = checkAndGetLiteralArgument(args[engine_args_to_idx["structure"]], "structure"); + setStructure(checkAndGetLiteralArgument(args[engine_args_to_idx["structure"]], "structure")); if (engine_args_to_idx.contains("compression_method")) - compression_method = checkAndGetLiteralArgument(args[engine_args_to_idx["compression_method"]], "compression_method"); + setCompressionMethod(checkAndGetLiteralArgument(args[engine_args_to_idx["compression_method"]], "compression_method")); if (engine_args_to_idx.contains("access_key_id")) auth_settings[S3AuthSetting::access_key_id] = checkAndGetLiteralArgument(args[engine_args_to_idx["access_key_id"]], "access_key_id"); @@ -586,6 +587,30 @@ void StorageS3Configuration::addStructureAndFormatToArgsIfNeeded( } } +ASTPtr StorageS3Configuration::createArgsWithAccessData() const +{ + auto arguments = std::make_shared(); + + arguments->children.push_back(std::make_shared(url.uri_str)); + if (auth_settings[S3AuthSetting::no_sign_request]) + { + arguments->children.push_back(std::make_shared("NOSIGN")); + } + else + { + arguments->children.push_back(std::make_shared(auth_settings[S3AuthSetting::access_key_id].value)); + arguments->children.push_back(std::make_shared(auth_settings[S3AuthSetting::secret_access_key].value)); + if (!auth_settings[S3AuthSetting::session_token].value.empty()) + arguments->children.push_back(std::make_shared(auth_settings[S3AuthSetting::session_token].value)); + if (getFormat() != "auto") + arguments->children.push_back(std::make_shared(getFormat())); + if (!getCompressionMethod().empty()) + arguments->children.push_back(std::make_shared(getCompressionMethod())); + } + + return arguments; +} + } #endif diff --git a/src/Storages/ObjectStorage/S3/Configuration.h b/src/Storages/ObjectStorage/S3/Configuration.h index ad2d136e0586..c9e9ffd7b8fc 100644 --- a/src/Storages/ObjectStorage/S3/Configuration.h +++ b/src/Storages/ObjectStorage/S3/Configuration.h @@ -97,6 +97,8 @@ class StorageS3Configuration : public StorageObjectStorage::Configuration ContextPtr context, bool with_structure) override; + ASTPtr createArgsWithAccessData() const override; + private: void fromNamedCollection(const NamedCollection & collection, ContextPtr context) override; void fromAST(ASTs & args, ContextPtr context, bool with_structure) override; diff --git a/src/Storages/ObjectStorage/StorageObjectStorage.cpp b/src/Storages/ObjectStorage/StorageObjectStorage.cpp index 229a6e47b545..a678b803ebe0 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorage.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorage.cpp @@ -58,6 +58,9 @@ String StorageObjectStorage::getPathSample(ContextPtr context) if (context->getSettingsRef()[Setting::use_hive_partitioning]) local_distributed_processing = false; + if (!configuration->isArchive() && !configuration->isPathWithGlobs() && !local_distributed_processing) + return configuration->getPath(); + auto file_iterator = StorageObjectStorageSource::createFileIterator( configuration, query_settings, @@ -70,9 +73,6 @@ String StorageObjectStorage::getPathSample(ContextPtr context) {} // file_progress_callback ); - if (!configuration->isArchive() && !configuration->isPathWithGlobs() && !local_distributed_processing) - return configuration->getPath(); - if (auto file = file_iterator->next(0)) return file->getPath(); return ""; @@ -99,7 +99,7 @@ StorageObjectStorage::StorageObjectStorage( , 
distributed_processing(distributed_processing_) , log(getLogger(fmt::format("Storage{}({})", configuration->getEngineName(), table_id_.getFullTableName()))) { - bool do_lazy_init = lazy_init && !columns_.empty() && !configuration->format.empty(); + bool do_lazy_init = lazy_init && !columns_.empty() && !configuration->getFormat().empty(); bool failed_init = false; auto do_init = [&]() { @@ -114,7 +114,7 @@ StorageObjectStorage::StorageObjectStorage( { // If we don't have format or schema yet, we can't ignore failed configuration update, // because relevant configuration is crucial for format and schema inference - if (mode <= LoadingStrictnessLevel::CREATE || columns_.empty() || (configuration->format == "auto")) + if (mode <= LoadingStrictnessLevel::CREATE || columns_.empty() || (configuration->getFormat() == "auto")) { throw; } @@ -131,7 +131,7 @@ StorageObjectStorage::StorageObjectStorage( std::string sample_path; ColumnsDescription columns{columns_}; - resolveSchemaAndFormat(columns, configuration->format, object_storage, configuration, format_settings, sample_path, context); + resolveSchemaAndFormat(columns, object_storage, configuration, format_settings, sample_path, context); configuration->check(context); StorageInMemoryMetadata metadata; @@ -171,23 +171,30 @@ String StorageObjectStorage::getName() const bool StorageObjectStorage::prefersLargeBlocks() const { - return FormatFactory::instance().checkIfOutputFormatPrefersLargeBlocks(configuration->format); + return FormatFactory::instance().checkIfOutputFormatPrefersLargeBlocks(configuration->getFormat()); } bool StorageObjectStorage::parallelizeOutputAfterReading(ContextPtr context) const { - return FormatFactory::instance().checkParallelizeOutputAfterReading(configuration->format, context); + return FormatFactory::instance().checkParallelizeOutputAfterReading(configuration->getFormat(), context); } bool StorageObjectStorage::supportsSubsetOfColumns(const ContextPtr & context) const { - return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(configuration->format, context, format_settings); + return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(configuration->getFormat(), context, format_settings); } void StorageObjectStorage::Configuration::update(ObjectStoragePtr object_storage_ptr, ContextPtr context) { IObjectStorage::ApplyNewSettingsOptions options{.allow_client_change = !isStaticConfiguration()}; object_storage_ptr->applyNewSettings(context->getConfigRef(), getTypeName() + ".", context, options); + updated = true; +} + +void StorageObjectStorage::Configuration::updateIfRequired(ObjectStoragePtr object_storage_ptr, ContextPtr local_context) +{ + if (!updated) + update(object_storage_ptr, local_context); } bool StorageObjectStorage::hasExternalDynamicMetadata() const @@ -273,7 +280,7 @@ class ReadFromObjectStorageStep : public SourceStepWithFilter num_streams = 1; } - const size_t max_parsing_threads = num_streams >= max_threads ? 1 : (max_threads / std::max(num_streams, 1ul)); + const size_t max_parsing_threads = (distributed_processing || num_streams >= max_threads) ? 
1 : (max_threads / std::max(num_streams, 1ul)); for (size_t i = 0; i < num_streams; ++i) { @@ -508,7 +515,7 @@ ColumnsDescription StorageObjectStorage::resolveSchemaFromData( ObjectInfos read_keys; auto iterator = createReadBufferIterator(object_storage, configuration, format_settings, read_keys, context); - auto schema = readSchemaFromFormat(configuration->format, format_settings, *iterator, context); + auto schema = readSchemaFromFormat(configuration->getFormat(), format_settings, *iterator, context); sample_path = iterator->getLastFilePath(); return schema; } @@ -529,7 +536,7 @@ std::string StorageObjectStorage::resolveFormatFromData( std::pair StorageObjectStorage::resolveSchemaAndFormatFromData( const ObjectStoragePtr & object_storage, - const ConfigurationPtr & configuration, + ConfigurationPtr & configuration, const std::optional & format_settings, std::string & sample_path, const ContextPtr & context) @@ -538,13 +545,13 @@ std::pair StorageObjectStorage::resolveSchemaAn auto iterator = createReadBufferIterator(object_storage, configuration, format_settings, read_keys, context); auto [columns, format] = detectFormatAndReadSchema(format_settings, *iterator, context); sample_path = iterator->getLastFilePath(); - configuration->format = format; + configuration->setFormat(format); return std::pair(columns, format); } void StorageObjectStorage::addInferredEngineArgsToCreateQuery(ASTs & args, const ContextPtr & context) const { - configuration->addStructureAndFormatToArgsIfNeeded(args, "", configuration->format, context, /*with_structure=*/false); + configuration->addStructureAndFormatToArgsIfNeeded(args, "", configuration->getFormat(), context, /*with_structure=*/false); } SchemaCache & StorageObjectStorage::getSchemaCache(const ContextPtr & context, const std::string & storage_type_name) @@ -579,36 +586,35 @@ SchemaCache & StorageObjectStorage::getSchemaCache(const ContextPtr & context, c } void StorageObjectStorage::Configuration::initialize( - Configuration & configuration_to_initialize, ASTs & engine_args, ContextPtr local_context, bool with_table_structure, StorageObjectStorageSettingsPtr settings) { if (auto named_collection = tryGetNamedCollectionWithOverrides(engine_args, local_context)) - configuration_to_initialize.fromNamedCollection(*named_collection, local_context); + fromNamedCollection(*named_collection, local_context); else - configuration_to_initialize.fromAST(engine_args, local_context, with_table_structure); + fromAST(engine_args, local_context, with_table_structure); - if (configuration_to_initialize.format == "auto") + if (format == "auto") { - if (configuration_to_initialize.isDataLakeConfiguration()) + if (isDataLakeConfiguration()) { - configuration_to_initialize.format = "Parquet"; + format = "Parquet"; } else { - configuration_to_initialize.format + format = FormatFactory::instance() - .tryGetFormatFromFileName(configuration_to_initialize.isArchive() ? configuration_to_initialize.getPathInArchive() : configuration_to_initialize.getPath()) + .tryGetFormatFromFileName(isArchive() ? 
getPathInArchive() : getPath()) .value_or("auto"); } } else - FormatFactory::instance().checkFormatName(configuration_to_initialize.format); + FormatFactory::instance().checkFormatName(format); - configuration_to_initialize.storage_settings = settings; - configuration_to_initialize.initialized = true; + storage_settings = settings; + initialized = true; } const StorageObjectStorageSettings & StorageObjectStorage::Configuration::getSettingsRef() const diff --git a/src/Storages/ObjectStorage/StorageObjectStorage.h b/src/Storages/ObjectStorage/StorageObjectStorage.h index 440774d30d5c..6022071a3b7d 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorage.h +++ b/src/Storages/ObjectStorage/StorageObjectStorage.h @@ -128,7 +128,7 @@ class StorageObjectStorage : public IStorage static std::pair resolveSchemaAndFormatFromData( const ObjectStoragePtr & object_storage, - const ConfigurationPtr & configuration, + ConfigurationPtr & configuration, const std::optional & format_settings, std::string & sample_path, const ContextPtr & context); @@ -168,8 +168,7 @@ class StorageObjectStorage::Configuration using Path = std::string; using Paths = std::vector; - static void initialize( - Configuration & configuration_to_initialize, + virtual void initialize( ASTs & engine_args, ContextPtr local_context, bool with_table_structure, @@ -248,21 +247,40 @@ class StorageObjectStorage::Configuration throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method iterate() is not implemented for configuration type {}", getTypeName()); } - String format = "auto"; - String compression_method = "auto"; - String structure = "auto"; - virtual void update(ObjectStoragePtr object_storage, ContextPtr local_context); + void updateIfRequired(ObjectStoragePtr object_storage, ContextPtr local_context); const StorageObjectStorageSettings & getSettingsRef() const; -protected: + /// Create arguments for table function with path and access parameters + virtual ASTPtr createArgsWithAccessData() const + { + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method createArgsWithAccessData is not supported by storage {}", getEngineName()); + } + virtual void fromNamedCollection(const NamedCollection & collection, ContextPtr context) = 0; virtual void fromAST(ASTs & args, ContextPtr context, bool with_structure) = 0; + virtual ObjectStorageType extractDynamicStorageType(ASTs & /* args */, ContextPtr /* context */, ASTPtr * /* type_arg */ = nullptr) const + { return ObjectStorageType::None; } + void assertInitialized() const; + virtual const String & getFormat() const { return format; } + virtual const String & getCompressionMethod() const { return compression_method; } + virtual const String & getStructure() const { return structure; } + + virtual void setFormat(const String & format_) { format = format_; } + virtual void setCompressionMethod(const String & compression_method_) { compression_method = compression_method_; } + virtual void setStructure(const String & structure_) { structure = structure_; } + +private: + String format = "auto"; + String compression_method = "auto"; + String structure = "auto"; + bool initialized = false; + std::atomic updated = false; StorageObjectStorageSettingsPtr storage_settings; }; diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp index 8fabcc368a5e..7fb3b8ce5b88 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp @@ -6,32 +6,45 @@ #include 
#include +#include +#include +#include +#include #include #include +#include +#include #include #include #include #include - namespace DB { namespace Setting { extern const SettingsBool use_hive_partitioning; + extern const SettingsString object_storage_cluster; } namespace ErrorCodes { extern const int LOGICAL_ERROR; + extern const int UNKNOWN_FUNCTION; + extern const int NOT_IMPLEMENTED; } + String StorageObjectStorageCluster::getPathSample(StorageInMemoryMetadata metadata, ContextPtr context) { auto query_settings = configuration->getQuerySettings(context); /// We don't want to throw an exception if there are no files with specified path. query_settings.throw_on_zero_files_match = false; + + if (!configuration->isArchive() && !configuration->isPathWithGlobs()) + return configuration->getPath(); + auto file_iterator = StorageObjectStorageSource::createFileIterator( configuration, query_settings, @@ -46,6 +59,7 @@ String StorageObjectStorageCluster::getPathSample(StorageInMemoryMetadata metada if (auto file = file_iterator->next(0)) return file->getPath(); + return ""; } @@ -53,18 +67,24 @@ StorageObjectStorageCluster::StorageObjectStorageCluster( const String & cluster_name_, ConfigurationPtr configuration_, ObjectStoragePtr object_storage_, + ContextPtr context_, const StorageID & table_id_, const ColumnsDescription & columns_, const ConstraintsDescription & constraints_, - ContextPtr context_) + const String & comment_, + std::optional format_settings_, + LoadingStrictnessLevel mode_, + ASTPtr partition_by_ +) : IStorageCluster( cluster_name_, table_id_, getLogger(fmt::format("{}({})", configuration_->getEngineName(), table_id_.table_name))) , configuration{configuration_} , object_storage(object_storage_) + , cluster_name_in_settings(false) { ColumnsDescription columns{columns_}; std::string sample_path; - resolveSchemaAndFormat(columns, configuration->format, object_storage, configuration, {}, sample_path, context_); + resolveSchemaAndFormat(columns, object_storage, configuration, {}, sample_path, context_); configuration->check(context_); StorageInMemoryMetadata metadata; @@ -76,6 +96,24 @@ StorageObjectStorageCluster::StorageObjectStorageCluster( setVirtuals(VirtualColumnUtils::getVirtualsForFileLikeStorage(metadata.columns, context_, sample_path)); setInMemoryMetadata(metadata); + + pure_storage = std::make_shared( + configuration, + object_storage, + context_, + getStorageID(), + getInMemoryMetadata().getColumns(), + getInMemoryMetadata().getConstraints(), + comment_, + format_settings_, + mode_, + /* distributed_processing */false, + partition_by_); + + auto virtuals_ = getVirtualsPtr(); + if (virtuals_) + pure_storage->setVirtuals(*virtuals_); + pure_storage->setInMemoryMetadata(getInMemoryMetadata()); } std::string StorageObjectStorageCluster::getName() const @@ -83,11 +121,133 @@ std::string StorageObjectStorageCluster::getName() const return configuration->getEngineName(); } +void StorageObjectStorageCluster::updateQueryForDistributedEngineIfNeeded(ASTPtr & query, ContextPtr context) +{ + // Change table engine on table function for distributed request + // CREATE TABLE t (...) ENGINE=IcebergS3(...) + // SELECT * FROM t + // change on + // SELECT * FROM icebergS3(...) 
+ // to execute on cluster nodes + + auto * select_query = query->as(); + if (!select_query || !select_query->tables()) + return; + + auto * tables = select_query->tables()->as(); + + if (tables->children.empty()) + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "Expected SELECT query from table with engine {}, got '{}'", + configuration->getEngineName(), query->formatForLogging()); + + auto * table_expression = tables->children[0]->as()->table_expression->as(); + + if (!table_expression) + return; + + if (!table_expression->database_and_table_name) + return; + + auto & table_identifier_typed = table_expression->database_and_table_name->as(); + + auto table_alias = table_identifier_typed.tryGetAlias(); + + auto storage_engine_name = configuration->getEngineName(); + if (storage_engine_name == "Iceberg") + { + switch (configuration->getType()) + { + case ObjectStorageType::S3: + storage_engine_name = "IcebergS3"; + break; + case ObjectStorageType::Azure: + storage_engine_name = "IcebergAzure"; + break; + case ObjectStorageType::HDFS: + storage_engine_name = "IcebergHDFS"; + break; + default: + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "Can't find table function for engine {}", + storage_engine_name + ); + } + } + + static std::unordered_map engine_to_function = { + {"S3", "s3"}, + {"Azure", "azureBlobStorage"}, + {"HDFS", "hdfs"}, + {"Iceberg", "iceberg"}, + {"IcebergS3", "icebergS3"}, + {"IcebergAzure", "icebergAzure"}, + {"IcebergHDFS", "icebergHDFS"}, + {"DeltaLake", "deltaLake"}, + {"Hudi", "hudi"} + }; + + auto p = engine_to_function.find(storage_engine_name); + if (p == engine_to_function.end()) + { + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "Can't find table function for engine {}", + storage_engine_name + ); + } + + std::string table_function_name = p->second; + + auto function_ast = std::make_shared(); + function_ast->name = table_function_name; + + auto cluster_name = getClusterName(context); + + if (cluster_name.empty()) + { + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "Can't be here without cluster name, no cluster name in query {}", + query->formatForLogging()); + } + + function_ast->arguments = configuration->createArgsWithAccessData(); + function_ast->children.push_back(function_ast->arguments); + function_ast->setAlias(table_alias); + + ASTPtr function_ast_ptr(function_ast); + + table_expression->database_and_table_name = nullptr; + table_expression->table_function = function_ast_ptr; + table_expression->children[0] = function_ast_ptr; + + auto settings = select_query->settings(); + if (settings) + { + auto & settings_ast = settings->as(); + settings_ast.changes.insertSetting("object_storage_cluster", cluster_name); + } + else + { + auto settings_ast_ptr = std::make_shared(); + settings_ast_ptr->is_standalone = false; + settings_ast_ptr->changes.setSetting("object_storage_cluster", cluster_name); + select_query->setExpression(ASTSelectQuery::Expression::SETTINGS, std::move(settings_ast_ptr)); + } + + cluster_name_in_settings = true; +} + void StorageObjectStorageCluster::updateQueryToSendIfNeeded( ASTPtr & query, const DB::StorageSnapshotPtr & storage_snapshot, const ContextPtr & context) { + updateQueryForDistributedEngineIfNeeded(query, context); + auto * table_function = extractTableFunctionFromSelectQuery(query); if (!table_function) { @@ -116,6 +276,8 @@ void StorageObjectStorageCluster::updateQueryToSendIfNeeded( configuration->getEngineName()); } + ASTPtr object_storage_type_arg; + configuration->extractDynamicStorageType(args, context, 
&object_storage_type_arg); ASTPtr settings_temporary_storage = nullptr; for (auto * it = args.begin(); it != args.end(); ++it) { @@ -128,19 +290,70 @@ } } - if (!endsWith(table_function->name, "Cluster")) - configuration->addStructureAndFormatToArgsIfNeeded(args, structure, configuration->format, context, /*with_structure=*/true); + if (cluster_name_in_settings || !endsWith(table_function->name, "Cluster")) + { + configuration->addStructureAndFormatToArgsIfNeeded(args, structure, configuration->getFormat(), context, /*with_structure=*/true); + + /// Convert to old-style *Cluster table function. + /// This allows using older ClickHouse versions in the cluster. + static std::unordered_map function_to_cluster_function = { + {"s3", "s3Cluster"}, + {"azureBlobStorage", "azureBlobStorageCluster"}, + {"hdfs", "hdfsCluster"}, + {"iceberg", "icebergS3Cluster"}, + {"icebergS3", "icebergS3Cluster"}, + {"icebergAzure", "icebergAzureCluster"}, + {"icebergHDFS", "icebergHDFSCluster"}, + {"deltaLake", "deltaLakeCluster"}, + {"hudi", "hudiCluster"}, + }; + + auto p = function_to_cluster_function.find(table_function->name); + if (p == function_to_cluster_function.end()) + { + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "Can't find cluster name for table function {}", + table_function->name); + } + + table_function->name = p->second; + + auto cluster_name = getClusterName(context); + auto cluster_name_arg = std::make_shared(cluster_name); + args.insert(args.begin(), cluster_name_arg); + + auto * select_query = query->as(); + if (!select_query) + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "Expected SELECT query from table function {}", + configuration->getEngineName()); + + auto settings = select_query->settings(); + if (settings) + { + auto & settings_ast = settings->as(); + if (settings_ast.changes.removeSetting("object_storage_cluster") && settings_ast.changes.empty()) + { + select_query->setExpression(ASTSelectQuery::Expression::SETTINGS, {}); + } + /// Do not throw if the setting is not found - `object_storage_cluster` can be a global setting.
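// An illustrative sketch (not part of the patch): the conversion above rewrites the query sent to
// the cluster nodes into the old-style *Cluster table function, so that older ClickHouse versions
// participating in the cluster can still execute it. Assuming a hypothetical bucket path and the
// cluster name 'my_cluster' taken from the `object_storage_cluster` setting, the rewrite is roughly:
//
//   -- what the initiator receives:
//   SELECT * FROM s3('s3://bucket/data.parquet', 'Parquet')
//   SETTINGS object_storage_cluster = 'my_cluster'
//
//   -- what is sent to the nodes (cluster name inserted as the first argument,
//   -- `object_storage_cluster` removed from the query-level SETTINGS clause):
//   SELECT * FROM s3Cluster('my_cluster', 's3://bucket/data.parquet', 'Parquet')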
+ } } else { ASTPtr cluster_name_arg = args.front(); args.erase(args.begin()); - configuration->addStructureAndFormatToArgsIfNeeded(args, structure, configuration->format, context, /*with_structure=*/true); + configuration->addStructureAndFormatToArgsIfNeeded(args, structure, configuration->getFormat(), context, /*with_structure=*/true); args.insert(args.begin(), cluster_name_arg); } if (settings_temporary_storage) { args.insert(args.end(), std::move(settings_temporary_storage)); } + if (object_storage_type_arg) + args.insert(args.end(), object_storage_type_arg); } RemoteQueryExecutor::Extension StorageObjectStorageCluster::getTaskIteratorExtension( @@ -165,4 +378,67 @@ RemoteQueryExecutor::Extension StorageObjectStorageCluster::getTaskIteratorExten return RemoteQueryExecutor::Extension{ .task_iterator = std::move(callback) }; } +void StorageObjectStorageCluster::readFallBackToPure( + QueryPlan & query_plan, + const Names & column_names, + const StorageSnapshotPtr & storage_snapshot, + SelectQueryInfo & query_info, + ContextPtr context, + QueryProcessingStage::Enum processed_stage, + size_t max_block_size, + size_t num_streams) +{ + pure_storage->read(query_plan, column_names, storage_snapshot, query_info, context, processed_stage, max_block_size, num_streams); +} + +SinkToStoragePtr StorageObjectStorageCluster::writeFallBackToPure( + const ASTPtr & query, + const StorageMetadataPtr & metadata_snapshot, + ContextPtr context, + bool async_insert) +{ + return pure_storage->write(query, metadata_snapshot, context, async_insert); +} + +String StorageObjectStorageCluster::getClusterName(ContextPtr context) const +{ + /// StorageObjectStorageCluster is always created, for both the cluster and the non-cluster variant. + /// The user can specify the cluster name in the table definition or, per query, in the + /// `object_storage_cluster` setting. When it is specified in both places, priority is given to the query setting. + /// When it is empty, the non-cluster implementation is used. + auto cluster_name_from_settings = context->getSettingsRef()[Setting::object_storage_cluster].value; + if (cluster_name_from_settings.empty()) + cluster_name_from_settings = getOriginalClusterName(); + return cluster_name_from_settings; +} + +QueryProcessingStage::Enum StorageObjectStorageCluster::getQueryProcessingStage( + ContextPtr context, QueryProcessingStage::Enum to_stage, const StorageSnapshotPtr & storage_snapshot, SelectQueryInfo & query_info) const +{ + /// Process the full query locally when falling back to pure storage. + if (getClusterName(context).empty()) + return QueryProcessingStage::Enum::FetchColumns; + + /// Distributed storage. + return IStorageCluster::getQueryProcessingStage(context, to_stage, storage_snapshot, query_info); +} + +void StorageObjectStorageCluster::truncate( + const ASTPtr & query, + const StorageMetadataPtr & metadata_snapshot, + ContextPtr local_context, + TableExclusiveLockHolder & lock_holder) +{ + /// Fall back to pure storage when no cluster name is set.
+ if (getClusterName(local_context).empty()) + return pure_storage->truncate(query, metadata_snapshot, local_context, lock_holder); + + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Truncate is not supported by storage {}", getName()); +} + +void StorageObjectStorageCluster::addInferredEngineArgsToCreateQuery(ASTs & args, const ContextPtr & context) const +{ + configuration->addStructureAndFormatToArgsIfNeeded(args, "", configuration->getFormat(), context, /*with_structure=*/false); +} + } diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.h b/src/Storages/ObjectStorage/StorageObjectStorageCluster.h index 0088ff28fc22..4f3c47acf8fb 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.h +++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.h @@ -17,10 +17,15 @@ class StorageObjectStorageCluster : public IStorageCluster const String & cluster_name_, ConfigurationPtr configuration_, ObjectStoragePtr object_storage_, + ContextPtr context_, const StorageID & table_id_, const ColumnsDescription & columns_, const ConstraintsDescription & constraints_, - ContextPtr context_); + const String & comment_, + std::optional format_settings_, + LoadingStrictnessLevel mode_, + ASTPtr partition_by_ = nullptr + ); std::string getName() const override; @@ -29,16 +34,82 @@ class StorageObjectStorageCluster : public IStorageCluster String getPathSample(StorageInMemoryMetadata metadata, ContextPtr context); + void setClusterNameInSettings(bool cluster_name_in_settings_) { cluster_name_in_settings = cluster_name_in_settings_; } + + String getClusterName(ContextPtr context) const override; + + bool hasExternalDynamicMetadata() const override + { + return (pure_storage && pure_storage->hasExternalDynamicMetadata()) + || configuration->hasExternalDynamicMetadata(); + } + + void updateExternalDynamicMetadata(ContextPtr context_ptr) override + { + if (pure_storage && pure_storage->hasExternalDynamicMetadata()) + pure_storage->updateExternalDynamicMetadata(context_ptr); + if (configuration->hasExternalDynamicMetadata()) + { + StorageInMemoryMetadata metadata; + metadata.setColumns(configuration->updateAndGetCurrentSchema(object_storage, context_ptr)); + IStorageCluster::setInMemoryMetadata(metadata); + } + } + + QueryProcessingStage::Enum getQueryProcessingStage(ContextPtr, QueryProcessingStage::Enum, const StorageSnapshotPtr &, SelectQueryInfo &) const override; + + void truncate( + const ASTPtr & query, + const StorageMetadataPtr & metadata_snapshot, + ContextPtr local_context, + TableExclusiveLockHolder &) override; + + void addInferredEngineArgsToCreateQuery(ASTs & args, const ContextPtr & context) const override; + private: void updateQueryToSendIfNeeded( ASTPtr & query, const StorageSnapshotPtr & storage_snapshot, const ContextPtr & context) override; + void readFallBackToPure( + QueryPlan & query_plan, + const Names & column_names, + const StorageSnapshotPtr & storage_snapshot, + SelectQueryInfo & query_info, + ContextPtr context, + QueryProcessingStage::Enum processed_stage, + size_t max_block_size, + size_t num_streams) override; + + SinkToStoragePtr writeFallBackToPure( + const ASTPtr & query, + const StorageMetadataPtr & metadata_snapshot, + ContextPtr context, + bool async_insert) override; + + /* + In case the table was created with `object_storage_cluster` setting, + modify the AST query object so that it uses the table function implementation + by mapping the engine name to table function name and setting `object_storage_cluster`. 
+ For a table like + CREATE TABLE table ENGINE=S3(...) SETTINGS object_storage_cluster='cluster' + this converts the request + SELECT * FROM table + to + SELECT * FROM s3(...) SETTINGS object_storage_cluster='cluster' + to make a distributed request over the cluster 'cluster'. + */ + void updateQueryForDistributedEngineIfNeeded(ASTPtr & query, ContextPtr context); + const String engine_name; - const StorageObjectStorage::ConfigurationPtr configuration; + StorageObjectStorage::ConfigurationPtr configuration; const ObjectStoragePtr object_storage; NamesAndTypesList virtual_columns; + bool cluster_name_in_settings; + + /// Non-clustered storage to fall back on for the pure (non-distributed) implementation when needed + std::shared_ptr pure_storage; }; } diff --git a/src/Storages/ObjectStorage/StorageObjectStorageSettings.cpp b/src/Storages/ObjectStorage/StorageObjectStorageSettings.cpp index f59914cae3f3..8a5f5467cf02 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageSettings.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageSettings.cpp @@ -22,6 +22,9 @@ If enabled, the engine would use delta-kernel-rs for DeltaLake metadata parsing )", 0) \ DECLARE(String, iceberg_metadata_file_path, "", R"( Explicit path to desired Iceberg metadata file, should be relative to path in object storage. Make sense for table function use case only. +)", 0) \ + DECLARE(String, object_storage_cluster, "", R"( +Cluster for distributed requests )", 0) \ // clang-format on diff --git a/src/Storages/ObjectStorage/StorageObjectStorageSink.cpp b/src/Storages/ObjectStorage/StorageObjectStorageSink.cpp index bd7cee407e62..263188301877 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageSink.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageSink.cpp @@ -32,7 +32,7 @@ StorageObjectStorageSink::StorageObjectStorageSink( { const auto & settings = context->getSettingsRef(); const auto path = blob_path.empty() ?
configuration->getPaths().back() : blob_path; - const auto chosen_compression_method = chooseCompressionMethod(path, configuration->compression_method); + const auto chosen_compression_method = chooseCompressionMethod(path, configuration->getCompressionMethod()); auto buffer = object_storage->writeObject( StoredObject(path), WriteMode::Rewrite, std::nullopt, DBMS_DEFAULT_BUFFER_SIZE, context->getWriteSettings()); @@ -44,7 +44,7 @@ StorageObjectStorageSink::StorageObjectStorageSink( static_cast(settings[Setting::output_format_compression_zstd_window_log])); writer = FormatFactory::instance().getOutputFormatParallelIfPossible( - configuration->format, *write_buf, sample_block, context, format_settings_); + configuration->getFormat(), *write_buf, sample_block, context, format_settings_); } void StorageObjectStorageSink::consume(Chunk & chunk) diff --git a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp index e398170493f6..3ad2bc19ac41 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp @@ -148,6 +148,8 @@ std::shared_ptr StorageObjectStorageSource::createFileIterator( throw Exception(ErrorCodes::BAD_ARGUMENTS, "Expression can not have wildcards inside {} name", configuration->getNamespaceType()); + configuration->updateIfRequired(object_storage, local_context); + std::unique_ptr iterator; if (configuration->isPathWithGlobs()) { @@ -358,7 +360,7 @@ Chunk StorageObjectStorageSource::generate() void StorageObjectStorageSource::addNumRowsToCache(const ObjectInfo & object_info, size_t num_rows) { const auto cache_key = getKeyForSchemaCache( - getUniqueStoragePathIdentifier(*configuration, object_info), configuration->format, format_settings, read_context); + getUniqueStoragePathIdentifier(*configuration, object_info), configuration->getFormat(), format_settings, read_context); schema_cache.addNumRows(cache_key, num_rows); } @@ -424,7 +426,7 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade const auto cache_key = getKeyForSchemaCache( getUniqueStoragePathIdentifier(*configuration, *object_info), - configuration->format, + configuration->getFormat(), format_settings, context_); @@ -455,13 +457,13 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade CompressionMethod compression_method; if (const auto * object_info_in_archive = dynamic_cast(object_info.get())) { - compression_method = chooseCompressionMethod(configuration->getPathInArchive(), configuration->compression_method); + compression_method = chooseCompressionMethod(configuration->getPathInArchive(), configuration->getCompressionMethod()); const auto & archive_reader = object_info_in_archive->archive_reader; read_buf = archive_reader->readFile(object_info_in_archive->path_in_archive, /*throw_on_not_found=*/true); } else { - compression_method = chooseCompressionMethod(object_info->getFileName(), configuration->compression_method); + compression_method = chooseCompressionMethod(object_info->getFileName(), configuration->getCompressionMethod()); read_buf = createReadBuffer(*object_info, object_storage, context_, log); } @@ -479,7 +481,7 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade auto input_format = FormatFactory::instance().getInput( - configuration->format, + configuration->getFormat(), *read_buf, initial_header, context_, diff --git a/src/Storages/ObjectStorage/Utils.cpp 
b/src/Storages/ObjectStorage/Utils.cpp index 73410d959e0d..d4f152bfd582 100644 --- a/src/Storages/ObjectStorage/Utils.cpp +++ b/src/Storages/ObjectStorage/Utils.cpp @@ -45,24 +45,27 @@ std::optional checkAndGetNewFileOnInsertIfNeeded( void resolveSchemaAndFormat( ColumnsDescription & columns, - std::string & format, ObjectStoragePtr object_storage, - const StorageObjectStorage::ConfigurationPtr & configuration, + StorageObjectStorage::ConfigurationPtr configuration, std::optional format_settings, std::string & sample_path, const ContextPtr & context) { if (columns.empty()) { - if (format == "auto") + if (configuration->getFormat() == "auto") + { + std::string format; std::tie(columns, format) = StorageObjectStorage::resolveSchemaAndFormatFromData(object_storage, configuration, format_settings, sample_path, context); + configuration->setFormat(format); + } else columns = StorageObjectStorage::resolveSchemaFromData(object_storage, configuration, format_settings, sample_path, context); } - else if (format == "auto") + else if (configuration->getFormat() == "auto") { - format = StorageObjectStorage::resolveFormatFromData(object_storage, configuration, format_settings, sample_path, context); + configuration->setFormat(StorageObjectStorage::resolveFormatFromData(object_storage, configuration, format_settings, sample_path, context)); } if (!columns.hasOnlyOrdinary()) diff --git a/src/Storages/ObjectStorage/Utils.h b/src/Storages/ObjectStorage/Utils.h index 7ee14f509799..17e30babb709 100644 --- a/src/Storages/ObjectStorage/Utils.h +++ b/src/Storages/ObjectStorage/Utils.h @@ -15,9 +15,8 @@ std::optional checkAndGetNewFileOnInsertIfNeeded( void resolveSchemaAndFormat( ColumnsDescription & columns, - std::string & format, ObjectStoragePtr object_storage, - const StorageObjectStorage::ConfigurationPtr & configuration, + StorageObjectStorage::ConfigurationPtr configuration, std::optional format_settings, std::string & sample_path, const ContextPtr & context); diff --git a/src/Storages/ObjectStorage/registerStorageObjectStorage.cpp b/src/Storages/ObjectStorage/registerStorageObjectStorage.cpp index f62b9cae37f0..d93d14618a73 100644 --- a/src/Storages/ObjectStorage/registerStorageObjectStorage.cpp +++ b/src/Storages/ObjectStorage/registerStorageObjectStorage.cpp @@ -7,6 +7,7 @@ #include #include #include +#include #include #include #include @@ -20,13 +21,18 @@ namespace ErrorCodes extern const int BAD_ARGUMENTS; } +namespace StorageObjectStorageSetting +{ + extern const StorageObjectStorageSettingsString object_storage_cluster; +} + namespace { // LocalObjectStorage is only supported for Iceberg Datalake operations where Avro format is required. For regular file access, use FileStorage instead. 
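// A minimal sketch (not part of the patch) of how a call site migrates to the new
// resolveSchemaAndFormat() signature shown above: the format string is no longer passed by
// reference, it now lives inside the configuration object.
//
//   // before:
//   resolveSchemaAndFormat(columns, configuration->format, object_storage, configuration,
//                          format_settings, sample_path, context);
//   // after:
//   resolveSchemaAndFormat(columns, object_storage, configuration,
//                          format_settings, sample_path, context);
//   // the resolved format is then available via configuration->getFormat()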
#if USE_AWS_S3 || USE_AZURE_BLOB_STORAGE || USE_HDFS || USE_AVRO -std::shared_ptr +StoragePtr createStorageObjectStorage(const StorageFactory::Arguments & args, StorageObjectStorage::ConfigurationPtr configuration) { auto & engine_args = args.engine_args; @@ -39,7 +45,9 @@ createStorageObjectStorage(const StorageFactory::Arguments & args, StorageObject if (args.storage_def->settings) storage_settings->loadFromQuery(*args.storage_def->settings); - StorageObjectStorage::Configuration::initialize(*configuration, args.engine_args, context, false, storage_settings); + auto cluster_name = (*storage_settings)[StorageObjectStorageSetting::object_storage_cluster].value; + +configuration->initialize(args.engine_args, context, false, storage_settings); // Use format settings from global server context + settings from // the SETTINGS clause of the create query. Settings from current @@ -63,7 +71,8 @@ createStorageObjectStorage(const StorageFactory::Arguments & args, StorageObject if (args.storage_def->partition_by) partition_by = args.storage_def->partition_by->clone(); - return std::make_shared( + return std::make_shared( + cluster_name, configuration, // We only want to perform write actions (e.g. create a container in Azure) when the table is being created, // and we want to avoid it when we load the table after a server restart. @@ -75,7 +84,6 @@ createStorageObjectStorage(const StorageFactory::Arguments & args, StorageObject args.comment, format_settings, args.mode, - /* distributed_processing */ false, partition_by); } @@ -172,21 +180,21 @@ void registerStorageObjectStorage(StorageFactory & factory) void registerStorageIceberg(StorageFactory & factory) { -#if USE_AWS_S3 factory.registerStorage( "Iceberg", [&](const StorageFactory::Arguments & args) { - auto configuration = std::make_shared(); + auto configuration = std::make_shared(); return createStorageObjectStorage(args, configuration); }, { .supports_settings = true, .supports_schema_inference = true, - .source_access_type = AccessType::S3, + .source_access_type = AccessType::NONE, .has_builtin_setting_fn = StorageObjectStorageSettings::hasBuiltin, }); +# if USE_AWS_S3 factory.registerStorage( "IcebergS3", [&](const StorageFactory::Arguments & args) diff --git a/src/Storages/ObjectStorageQueue/StorageObjectStorageQueue.cpp b/src/Storages/ObjectStorageQueue/StorageObjectStorageQueue.cpp index 0f6a2f7b09d6..2e99ad37ca7a 100644 --- a/src/Storages/ObjectStorageQueue/StorageObjectStorageQueue.cpp +++ b/src/Storages/ObjectStorageQueue/StorageObjectStorageQueue.cpp @@ -224,12 +224,12 @@ StorageObjectStorageQueue::StorageObjectStorageQueue( validateSettings(*queue_settings_, is_attach); object_storage = configuration->createObjectStorage(context_, /* is_readonly */true); - FormatFactory::instance().checkFormatName(configuration->format); + FormatFactory::instance().checkFormatName(configuration->getFormat()); configuration->check(context_); ColumnsDescription columns{columns_}; std::string sample_path; - resolveSchemaAndFormat(columns, configuration->format, object_storage, configuration, format_settings, sample_path, context_); + resolveSchemaAndFormat(columns, object_storage, configuration, format_settings, sample_path, context_); configuration->check(context_); StorageInMemoryMetadata storage_metadata; @@ -244,7 +244,7 @@ StorageObjectStorageQueue::StorageObjectStorageQueue( LOG_INFO(log, "Using zookeeper path: {}", zk_path.string()); auto table_metadata = ObjectStorageQueueMetadata::syncWithKeeper( - zk_path, *queue_settings_, 
storage_metadata.getColumns(), configuration_->format, context_, is_attach, log); + zk_path, *queue_settings_, storage_metadata.getColumns(), configuration_->getFormat(), context_, is_attach, log); ObjectStorageType storage_type = engine_name == "S3Queue" ? ObjectStorageType::S3 : ObjectStorageType::Azure; @@ -312,7 +312,7 @@ void StorageObjectStorageQueue::drop() bool StorageObjectStorageQueue::supportsSubsetOfColumns(const ContextPtr & context_) const { - return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(configuration->format, context_, format_settings); + return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(configuration->getFormat(), context_, format_settings); } class ReadFromObjectStorageQueue : public SourceStepWithFilter diff --git a/src/Storages/ObjectStorageQueue/StorageObjectStorageQueue.h b/src/Storages/ObjectStorageQueue/StorageObjectStorageQueue.h index b34b55603124..aac7783bcd75 100644 --- a/src/Storages/ObjectStorageQueue/StorageObjectStorageQueue.h +++ b/src/Storages/ObjectStorageQueue/StorageObjectStorageQueue.h @@ -55,7 +55,7 @@ class StorageObjectStorageQueue : public IStorage, WithContext ContextPtr local_context, AlterLockHolder & table_lock_holder) override; - const auto & getFormatName() const { return configuration->format; } + const auto & getFormatName() const { return configuration->getFormat(); } const fs::path & getZooKeeperPath() const { return zk_path; } diff --git a/src/Storages/ObjectStorageQueue/registerQueueStorage.cpp b/src/Storages/ObjectStorageQueue/registerQueueStorage.cpp index 9f11a35bd7c7..63d7f7a286a1 100644 --- a/src/Storages/ObjectStorageQueue/registerQueueStorage.cpp +++ b/src/Storages/ObjectStorageQueue/registerQueueStorage.cpp @@ -33,7 +33,7 @@ StoragePtr createQueueStorage(const StorageFactory::Arguments & args) throw Exception(ErrorCodes::BAD_ARGUMENTS, "External data source must have arguments"); auto configuration = std::make_shared(); - StorageObjectStorage::Configuration::initialize(*configuration, args.engine_args, args.getContext(), false, nullptr); + configuration->initialize(args.engine_args, args.getContext(), false, nullptr); // Use format settings from global server context + settings from // the SETTINGS clause of the create query. 
Settings from current diff --git a/src/Storages/extractTableFunctionFromSelectQuery.cpp b/src/Storages/extractTableFunctionFromSelectQuery.cpp index 57302036c889..c7f60240b3c7 100644 --- a/src/Storages/extractTableFunctionFromSelectQuery.cpp +++ b/src/Storages/extractTableFunctionFromSelectQuery.cpp @@ -20,7 +20,16 @@ ASTFunction * extractTableFunctionFromSelectQuery(ASTPtr & query) if (!table_expression->table_function) return nullptr; - return table_expression->table_function->as(); + auto * table_function = table_expression->table_function->as(); + return table_function; +} + +ASTExpressionList * extractTableFunctionArgumentsFromSelectQuery(ASTPtr & query) +{ + auto * table_function = extractTableFunctionFromSelectQuery(query); + if (!table_function) + return nullptr; + return table_function->arguments->as(); } } diff --git a/src/Storages/extractTableFunctionFromSelectQuery.h b/src/Storages/extractTableFunctionFromSelectQuery.h index c69cc7ce6c52..0511d59f6230 100644 --- a/src/Storages/extractTableFunctionFromSelectQuery.h +++ b/src/Storages/extractTableFunctionFromSelectQuery.h @@ -1,6 +1,7 @@ #pragma once #include +#include #include #include @@ -8,5 +9,6 @@ namespace DB { ASTFunction * extractTableFunctionFromSelectQuery(ASTPtr & query); +ASTExpressionList * extractTableFunctionArgumentsFromSelectQuery(ASTPtr & query); } diff --git a/src/TableFunctions/ITableFunction.h b/src/TableFunctions/ITableFunction.h index f24580608c5e..184c942022fc 100644 --- a/src/TableFunctions/ITableFunction.h +++ b/src/TableFunctions/ITableFunction.h @@ -78,7 +78,7 @@ class ITableFunction : public std::enable_shared_from_this virtual bool supportsReadingSubsetOfColumns(const ContextPtr &) { return true; } - virtual bool canBeUsedToCreateTable() const { return true; } + virtual void validateUseToCreateTable() const {} /// Create storage according to the query. StoragePtr diff --git a/src/TableFunctions/ITableFunctionCluster.h b/src/TableFunctions/ITableFunctionCluster.h index 2550e10f2702..ab2e3f809ba6 100644 --- a/src/TableFunctions/ITableFunctionCluster.h +++ b/src/TableFunctions/ITableFunctionCluster.h @@ -16,6 +16,7 @@ namespace ErrorCodes extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; extern const int CLUSTER_DOESNT_EXIST; extern const int LOGICAL_ERROR; + extern const int BAD_ARGUMENTS; } /// Base class for *Cluster table functions that require cluster_name for the first argument. 
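// A minimal sketch (not part of the patch), assuming a hypothetical call site on the CREATE TABLE
// path: with the interface change above, callers no longer branch on a boolean and throw
// themselves; they call the validating method, which throws for table functions that cannot be
// used to create a table (see the ITableFunctionCluster override below).
//
//   // before:
//   if (!table_function->canBeUsedToCreateTable())
//       throw Exception(ErrorCodes::BAD_ARGUMENTS,
//           "Table function '{}' cannot be used to create a table", table_function->getName());
//   // after:
//   table_function->validateUseToCreateTable();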
@@ -46,7 +47,10 @@ class ITableFunctionCluster : public Base throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected table function name: {}", table_function->name); } - bool canBeUsedToCreateTable() const override { return false; } + void validateUseToCreateTable() const override + { + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Table function '{}' cannot be used to create a table", getName()); + } protected: void parseArguments(const ASTPtr & ast, ContextPtr context) override diff --git a/src/TableFunctions/TableFunctionObjectStorage.cpp b/src/TableFunctions/TableFunctionObjectStorage.cpp index 27f84dd30142..cc78632f07d4 100644 --- a/src/TableFunctions/TableFunctionObjectStorage.cpp +++ b/src/TableFunctions/TableFunctionObjectStorage.cpp @@ -109,16 +109,16 @@ template ColumnsDescription TableFunctionObjectStorage< Definition, Configuration>::getActualTableStructure(ContextPtr context, bool is_insert_query) const { - if (configuration->structure == "auto") + if (configuration->getStructure() == "auto") { context->checkAccess(getSourceAccessType()); ColumnsDescription columns; auto storage = getObjectStorage(context, !is_insert_query); std::string sample_path; - resolveSchemaAndFormat(columns, configuration->format, storage, configuration, std::nullopt, sample_path, context); + resolveSchemaAndFormat(columns, storage, configuration, std::nullopt, sample_path, context); return columns; } - return parseColumnsListFromString(configuration->structure, context); + return parseColumnsListFromString(configuration->getStructure(), context); } template @@ -132,8 +132,8 @@ StoragePtr TableFunctionObjectStorage::executeImpl( chassert(configuration); ColumnsDescription columns; - if (configuration->structure != "auto") - columns = parseColumnsListFromString(configuration->structure, context); + if (configuration->getStructure() != "auto") + columns = parseColumnsListFromString(configuration->getStructure(), context); else if (!structure_hint.empty()) columns = structure_hint; else if (!cached_columns.empty()) @@ -156,10 +156,14 @@ StoragePtr TableFunctionObjectStorage::executeImpl( parallel_replicas_cluster_name, configuration, getObjectStorage(context, !is_insert_query), + context, StorageID(getDatabaseName(), table_name), columns, ConstraintsDescription{}, - context); + /* comment */ String{}, + /* format_settings */ std::nullopt, /// No format_settings + /* mode */ LoadingStrictnessLevel::CREATE, + /* partition_by */ nullptr); storage->startup(); return storage; @@ -186,17 +190,6 @@ void registerTableFunctionObjectStorage(TableFunctionFactory & factory) { UNUSED(factory); #if USE_AWS_S3 - factory.registerFunction>( - { - .documentation = - { - .description=R"(The table function can be used to read the data stored on AWS S3.)", - .examples{{"s3", "SELECT * FROM s3(url, access_key_id, secret_access_key)", ""} - }, - .category{""}}, - .allow_readonly = false - }); - factory.registerFunction>( { .documentation = @@ -229,38 +222,6 @@ void registerTableFunctionObjectStorage(TableFunctionFactory & factory) .allow_readonly = false }); #endif - -#if USE_AZURE_BLOB_STORAGE - factory.registerFunction>( - { - .documentation = - { - .description=R"(The table function can be used to read the data stored on Azure Blob Storage.)", - .examples{ - { - "azureBlobStorage", - "SELECT * FROM azureBlobStorage(connection_string|storage_account_url, container_name, blobpath, " - "[account_name, account_key, format, compression, structure])", "" - }} - }, - .allow_readonly = false - }); -#endif -#if USE_HDFS - 
factory.registerFunction>( - { - .documentation = - { - .description=R"(The table function can be used to read the data stored on HDFS virtual filesystem.)", - .examples{ - { - "hdfs", - "SELECT * FROM hdfs(url, format, compression, structure])", "" - }} - }, - .allow_readonly = false - }); -#endif } #if USE_AZURE_BLOB_STORAGE @@ -282,6 +243,10 @@ template class TableFunctionObjectStorage; +#if USE_AVRO +template class TableFunctionObjectStorage; +#endif + #if USE_AVRO && USE_AWS_S3 template class TableFunctionObjectStorage; #endif @@ -305,37 +270,6 @@ template class TableFunctionObjectStorage( - {.documentation - = {.description = R"(The table function can be used to read the Iceberg table stored on S3 object store. Alias to icebergS3)", - .examples{{"iceberg", "SELECT * FROM iceberg(url, access_key_id, secret_access_key)", ""}}, - .category{""}}, - .allow_readonly = false}); - factory.registerFunction( - {.documentation - = {.description = R"(The table function can be used to read the Iceberg table stored on S3 object store.)", - .examples{{"icebergS3", "SELECT * FROM icebergS3(url, access_key_id, secret_access_key)", ""}}, - .category{""}}, - .allow_readonly = false}); - -#endif -#if USE_AZURE_BLOB_STORAGE - factory.registerFunction( - {.documentation - = {.description = R"(The table function can be used to read the Iceberg table stored on Azure object store.)", - .examples{{"icebergAzure", "SELECT * FROM icebergAzure(url, access_key_id, secret_access_key)", ""}}, - .category{""}}, - .allow_readonly = false}); -#endif -#if USE_HDFS - factory.registerFunction( - {.documentation - = {.description = R"(The table function can be used to read the Iceberg table stored on HDFS virtual filesystem.)", - .examples{{"icebergHDFS", "SELECT * FROM icebergHDFS(url)", ""}}, - .category{""}}, - .allow_readonly = false}); -#endif factory.registerFunction( {.documentation = {.description = R"(The table function can be used to read the Iceberg table stored locally.)", @@ -346,42 +280,11 @@ void registerTableFunctionIceberg(TableFunctionFactory & factory) #endif -#if USE_AWS_S3 -#if USE_PARQUET && USE_DELTA_KERNEL_RS -void registerTableFunctionDeltaLake(TableFunctionFactory & factory) -{ - factory.registerFunction( - {.documentation - = {.description = R"(The table function can be used to read the DeltaLake table stored on object store.)", - .examples{{"deltaLake", "SELECT * FROM deltaLake(url, access_key_id, secret_access_key)", ""}}, - .category{""}}, - .allow_readonly = false}); -} -#endif - -void registerTableFunctionHudi(TableFunctionFactory & factory) -{ - factory.registerFunction( - {.documentation - = {.description = R"(The table function can be used to read the Hudi table stored on object store.)", - .examples{{"hudi", "SELECT * FROM hudi(url, access_key_id, secret_access_key)", ""}}, - .category{""}}, - .allow_readonly = false}); -} - -#endif - void registerDataLakeTableFunctions(TableFunctionFactory & factory) { UNUSED(factory); #if USE_AVRO registerTableFunctionIceberg(factory); #endif -#if USE_AWS_S3 -#if USE_PARQUET && USE_DELTA_KERNEL_RS - registerTableFunctionDeltaLake(factory); -#endif - registerTableFunctionHudi(factory); -#endif } } diff --git a/src/TableFunctions/TableFunctionObjectStorage.h b/src/TableFunctions/TableFunctionObjectStorage.h index 94d3542ad48f..9493baf18571 100644 --- a/src/TableFunctions/TableFunctionObjectStorage.h +++ b/src/TableFunctions/TableFunctionObjectStorage.h @@ -68,7 +68,7 @@ struct LocalDefinition struct IcebergDefinition { static constexpr auto name = 
"iceberg"; - static constexpr auto storage_type_name = "S3"; + static constexpr auto storage_type_name = "UNDEFINED"; }; struct IcebergS3Definition @@ -115,15 +115,16 @@ class TableFunctionObjectStorage : public ITableFunction String getName() const override { return name; } - bool hasStaticStructure() const override { return configuration->structure != "auto"; } + bool hasStaticStructure() const override { return configuration->getStructure() != "auto"; } - bool needStructureHint() const override { return configuration->structure == "auto"; } + bool needStructureHint() const override { return configuration->getStructure() == "auto"; } void setStructureHint(const ColumnsDescription & structure_hint_) override { structure_hint = structure_hint_; } bool supportsReadingSubsetOfColumns(const ContextPtr & context) override { - return configuration->format != "auto" && FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(configuration->format, context); + return configuration->getFormat() != "auto" + && FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(configuration->getFormat(), context); } std::unordered_set getVirtualsToCheckBeforeUsingStructureHint() const override @@ -133,7 +134,7 @@ class TableFunctionObjectStorage : public ITableFunction virtual void parseArgumentsImpl(ASTs & args, const ContextPtr & context) { - StorageObjectStorage::Configuration::initialize(*getConfiguration(), args, context, true, settings); + getConfiguration()->initialize(args, context, true, settings); } static void updateStructureAndFormatArgumentsIfNeeded( @@ -187,8 +188,9 @@ using TableFunctionLocal = TableFunctionObjectStorage; + # if USE_AWS_S3 -using TableFunctionIceberg = TableFunctionObjectStorage; using TableFunctionIcebergS3 = TableFunctionObjectStorage; # endif # if USE_AZURE_BLOB_STORAGE diff --git a/src/TableFunctions/TableFunctionObjectStorageCluster.cpp b/src/TableFunctions/TableFunctionObjectStorageCluster.cpp index e8e7009cff66..c30ce59e330c 100644 --- a/src/TableFunctions/TableFunctionObjectStorageCluster.cpp +++ b/src/TableFunctions/TableFunctionObjectStorageCluster.cpp @@ -22,8 +22,9 @@ StoragePtr TableFunctionObjectStorageCluster::execute auto configuration = Base::getConfiguration(); ColumnsDescription columns; - if (configuration->structure != "auto") - columns = parseColumnsListFromString(configuration->structure, context); + + if (configuration->getStructure() != "auto") + columns = parseColumnsListFromString(configuration->getStructure(), context); else if (!Base::structure_hint.empty()) columns = Base::structure_hint; else if (!cached_columns.empty()) @@ -45,7 +46,7 @@ StoragePtr TableFunctionObjectStorageCluster::execute /* format_settings */ std::nullopt, /// No format_settings /* mode */ LoadingStrictnessLevel::CREATE, /* distributed_processing */ true, - /*partition_by_=*/nullptr); + /* partition_by */ nullptr); } else { @@ -53,10 +54,14 @@ StoragePtr TableFunctionObjectStorageCluster::execute ITableFunctionCluster::cluster_name, configuration, object_storage, + context, StorageID(Base::getDatabaseName(), table_name), columns, ConstraintsDescription{}, - context); + /* comment */ String{}, + /* format_settings */ std::nullopt, /// No format_settings + /* mode */ LoadingStrictnessLevel::CREATE, + /* partition_by */ nullptr); } storage->startup(); @@ -111,32 +116,49 @@ void registerTableFunctionIcebergCluster(TableFunctionFactory & factory) { UNUSED(factory); -#if USE_AWS_S3 + factory.registerFunction( + {.documentation + = {.description = R"(The table function 
can be used to read the Iceberg table stored on any object store in parallel for many nodes in a specified cluster.)", + .examples{ +# if USE_AWS_S3 + {"icebergCluster", "SELECT * FROM icebergCluster(cluster, url, [, NOSIGN | access_key_id, secret_access_key, [session_token]], format, [,compression], storage_type='s3')", ""}, +# endif +# if USE_AZURE_BLOB_STORAGE + {"icebergCluster", "SELECT * FROM icebergCluster(cluster, connection_string|storage_account_url, container_name, blobpath, [account_name, account_key, format, compression], storage_type='azure')", ""}, +# endif +# if USE_HDFS + {"icebergCluster", "SELECT * FROM icebergCluster(cluster, uri, [format], [structure], [compression_method], storage_type='hdfs')", ""}, +# endif + }, + .category{""}}, + .allow_readonly = false}); + +# if USE_AWS_S3 factory.registerFunction( {.documentation = {.description = R"(The table function can be used to read the Iceberg table stored on S3 object store in parallel for many nodes in a specified cluster.)", .examples{{"icebergS3Cluster", "SELECT * FROM icebergS3Cluster(cluster, url, [, NOSIGN | access_key_id, secret_access_key, [session_token]], format, [,compression])", ""}}, .category{""}}, .allow_readonly = false}); -#endif +# endif -#if USE_AZURE_BLOB_STORAGE +# if USE_AZURE_BLOB_STORAGE factory.registerFunction( {.documentation = {.description = R"(The table function can be used to read the Iceberg table stored on Azure object store in parallel for many nodes in a specified cluster.)", .examples{{"icebergAzureCluster", "SELECT * FROM icebergAzureCluster(cluster, connection_string|storage_account_url, container_name, blobpath, [account_name, account_key, format, compression])", ""}}, .category{""}}, .allow_readonly = false}); -#endif +# endif -#if USE_HDFS +# if USE_HDFS factory.registerFunction( {.documentation = {.description = R"(The table function can be used to read the Iceberg table stored on HDFS virtual filesystem in parallel for many nodes in a specified cluster.)", .examples{{"icebergHDFSCluster", "SELECT * FROM icebergHDFSCluster(cluster, uri, [format], [structure], [compression_method])", ""}}, .category{""}}, .allow_readonly = false}); -#endif +# endif } #endif diff --git a/src/TableFunctions/TableFunctionObjectStorageCluster.h b/src/TableFunctions/TableFunctionObjectStorageCluster.h index 54afae400288..d03f7198d359 100644 --- a/src/TableFunctions/TableFunctionObjectStorageCluster.h +++ b/src/TableFunctions/TableFunctionObjectStorageCluster.h @@ -10,8 +10,6 @@ namespace DB class Context; -class StorageS3Settings; -class StorageAzureBlobSettings; class StorageS3Configuration; class StorageAzureConfiguration; @@ -33,6 +31,12 @@ struct HDFSClusterDefinition static constexpr auto storage_type_name = "HDFSCluster"; }; +struct IcebergClusterDefinition +{ + static constexpr auto name = "icebergCluster"; + static constexpr auto storage_type_name = "UNDEFINED"; +}; + struct IcebergS3ClusterDefinition { static constexpr auto name = "icebergS3Cluster"; @@ -91,9 +95,9 @@ class TableFunctionObjectStorageCluster : public ITableFunctionClusterstructure != "auto"; } + bool hasStaticStructure() const override { return Base::getConfiguration()->getStructure() != "auto"; } - bool needStructureHint() const override { return Base::getConfiguration()->structure == "auto"; } + bool needStructureHint() const override { return Base::getConfiguration()->getStructure() == "auto"; } void setStructureHint(const ColumnsDescription & structure_hint_) override { Base::structure_hint = structure_hint_; } }; @@ 
-110,6 +114,8 @@ using TableFunctionAzureBlobCluster = TableFunctionObjectStorageCluster; #endif +using TableFunctionIcebergCluster = TableFunctionObjectStorageCluster; + #if USE_AVRO && USE_AWS_S3 using TableFunctionIcebergS3Cluster = TableFunctionObjectStorageCluster; #endif diff --git a/src/TableFunctions/TableFunctionObjectStorageClusterFallback.cpp b/src/TableFunctions/TableFunctionObjectStorageClusterFallback.cpp new file mode 100644 index 000000000000..287cfe0b0dc2 --- /dev/null +++ b/src/TableFunctions/TableFunctionObjectStorageClusterFallback.cpp @@ -0,0 +1,377 @@ +#include +#include +#include +#include +#include + +namespace DB +{ + +namespace Setting +{ + extern const SettingsString object_storage_cluster; +} + +namespace ErrorCodes +{ + extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; + extern const int BAD_ARGUMENTS; +} + +struct S3ClusterFallbackDefinition +{ + static constexpr auto name = "s3"; + static constexpr auto storage_type_name = "S3"; + static constexpr auto storage_type_cluster_name = "S3Cluster"; +}; + +struct AzureClusterFallbackDefinition +{ + static constexpr auto name = "azureBlobStorage"; + static constexpr auto storage_type_name = "Azure"; + static constexpr auto storage_type_cluster_name = "AzureBlobStorageCluster"; +}; + +struct HDFSClusterFallbackDefinition +{ + static constexpr auto name = "hdfs"; + static constexpr auto storage_type_name = "HDFS"; + static constexpr auto storage_type_cluster_name = "HDFSCluster"; +}; + +struct IcebergClusterFallbackDefinition +{ + static constexpr auto name = "iceberg"; + static constexpr auto storage_type_name = "UNDEFINED"; + static constexpr auto storage_type_cluster_name = "IcebergCluster"; +}; + +struct IcebergS3ClusterFallbackDefinition +{ + static constexpr auto name = "icebergS3"; + static constexpr auto storage_type_name = "S3"; + static constexpr auto storage_type_cluster_name = "IcebergS3Cluster"; +}; + +struct IcebergAzureClusterFallbackDefinition +{ + static constexpr auto name = "icebergAzure"; + static constexpr auto storage_type_name = "Azure"; + static constexpr auto storage_type_cluster_name = "IcebergAzureCluster"; +}; + +struct IcebergHDFSClusterFallbackDefinition +{ + static constexpr auto name = "icebergHDFS"; + static constexpr auto storage_type_name = "HDFS"; + static constexpr auto storage_type_cluster_name = "IcebergHDFSCluster"; +}; + +struct DeltaLakeClusterFallbackDefinition +{ + static constexpr auto name = "deltaLake"; + static constexpr auto storage_type_name = "S3"; + static constexpr auto storage_type_cluster_name = "DeltaLakeS3Cluster"; +}; + +struct HudiClusterFallbackDefinition +{ + static constexpr auto name = "hudi"; + static constexpr auto storage_type_name = "S3"; + static constexpr auto storage_type_cluster_name = "HudiS3Cluster"; +}; + +template +void TableFunctionObjectStorageClusterFallback::parseArgumentsImpl(ASTs & args, const ContextPtr & context) +{ + if (args.empty()) + throw Exception( + ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, + "The function {} should have arguments. 
The first argument must be the cluster name and the rest are the arguments of " + "corresponding table function", + getName()); + + const auto & settings = context->getSettingsRef(); + + is_cluster_function = !settings[Setting::object_storage_cluster].value.empty(); + + if (is_cluster_function) + { + ASTPtr cluster_name_arg = std::make_shared(settings[Setting::object_storage_cluster].value); + args.insert(args.begin(), cluster_name_arg); + BaseCluster::parseArgumentsImpl(args, context); + args.erase(args.begin()); + } + else + BaseSimple::parseArgumentsImpl(args, context); +} + +template +StoragePtr TableFunctionObjectStorageClusterFallback::executeImpl( + const ASTPtr & ast_function, + ContextPtr context, + const std::string & table_name, + ColumnsDescription cached_columns, + bool is_insert_query) const +{ + if (is_cluster_function) + { + auto result = BaseCluster::executeImpl(ast_function, context, table_name, cached_columns, is_insert_query); + if (auto storage = typeid_cast>(result)) + storage->setClusterNameInSettings(true); + return result; + } + else + return BaseSimple::executeImpl(ast_function, context, table_name, cached_columns, is_insert_query); +} + +template +void TableFunctionObjectStorageClusterFallback::validateUseToCreateTable() const +{ + if (is_cluster_function) + throw Exception( + ErrorCodes::BAD_ARGUMENTS, + "Table function '{}' cannot be used to create a table in cluster mode", + getName()); +} + +#if USE_AWS_S3 +using TableFunctionS3ClusterFallback = TableFunctionObjectStorageClusterFallback; +#endif + +#if USE_AZURE_BLOB_STORAGE +using TableFunctionAzureClusterFallback = TableFunctionObjectStorageClusterFallback; +#endif + +#if USE_HDFS +using TableFunctionHDFSClusterFallback = TableFunctionObjectStorageClusterFallback; +#endif + +#if USE_AVRO +using TableFunctionIcebergClusterFallback = TableFunctionObjectStorageClusterFallback; +#endif + +#if USE_AVRO && USE_AWS_S3 +using TableFunctionIcebergS3ClusterFallback = TableFunctionObjectStorageClusterFallback; +#endif + +#if USE_AVRO && USE_AZURE_BLOB_STORAGE +using TableFunctionIcebergAzureClusterFallback = TableFunctionObjectStorageClusterFallback; +#endif + +#if USE_AVRO && USE_HDFS +using TableFunctionIcebergHDFSClusterFallback = TableFunctionObjectStorageClusterFallback; +#endif + +#if USE_AWS_S3 && USE_PARQUET && USE_DELTA_KERNEL_RS +using TableFunctionDeltaLakeClusterFallback = TableFunctionObjectStorageClusterFallback; +#endif + +#if USE_AWS_S3 +using TableFunctionHudiClusterFallback = TableFunctionObjectStorageClusterFallback; +#endif + +void registerTableFunctionObjectStorageClusterFallback(TableFunctionFactory & factory) +{ + UNUSED(factory); +#if USE_AWS_S3 + factory.registerFunction( + { + .documentation = { + .description=R"(The table function can be used to read the data stored on S3 in parallel for many nodes in a specified cluster or from single node.)", + .examples{ + {"s3", "SELECT * FROM s3(url, format, structure)", ""}, + {"s3", "SELECT * FROM s3(url, format, structure) SETTINGS object_storage_cluster='cluster'", ""} + }, + }, + .allow_readonly = false + } + ); +#endif + +#if USE_AZURE_BLOB_STORAGE + factory.registerFunction( + { + .documentation = { + .description=R"(The table function can be used to read the data stored on Azure Blob Storage in parallel for many nodes in a specified cluster or from single node.)", + .examples{ + { + "azureBlobStorage", + "SELECT * FROM azureBlobStorage(connection_string|storage_account_url, container_name, blobpath, " + "[account_name, account_key, format, 
+
+#if USE_AZURE_BLOB_STORAGE
+    factory.registerFunction(
+        {
+            .documentation = {
+                .description=R"(The table function can be used to read data stored on Azure Blob Storage in parallel from many nodes of a specified cluster or from a single node.)",
+                .examples{
+                    {
+                        "azureBlobStorage",
+                        "SELECT * FROM azureBlobStorage(connection_string|storage_account_url, container_name, blobpath, "
+                        "[account_name, account_key, format, compression, structure])", ""
+                    },
+                    {
+                        "azureBlobStorage",
+                        "SELECT * FROM azureBlobStorage(connection_string|storage_account_url, container_name, blobpath, "
+                        "[account_name, account_key, format, compression, structure]) "
+                        "SETTINGS object_storage_cluster='cluster'", ""
+                    },
+                }
+            },
+            .allow_readonly = false
+        }
+    );
+#endif
+
+#if USE_HDFS
+    factory.registerFunction(
+        {
+            .documentation = {
+                .description=R"(The table function can be used to read data stored on the HDFS virtual filesystem in parallel from many nodes of a specified cluster or from a single node.)",
+                .examples{
+                    {
+                        "hdfs",
+                        "SELECT * FROM hdfs(url, format, compression, structure)", ""
+                    },
+                    {
+                        "hdfs",
+                        "SELECT * FROM hdfs(url, format, compression, structure) "
+                        "SETTINGS object_storage_cluster='cluster'", ""
+                    },
+                }
+            },
+            .allow_readonly = false
+        }
+    );
+#endif
+
+#if USE_AVRO
+    factory.registerFunction(
+        {
+            .documentation = {
+                .description=R"(The table function can be used to read an Iceberg table stored in various object stores in parallel from many nodes of a specified cluster or from a single node.)",
+                .examples{
+                    {
+                        "iceberg",
+                        "SELECT * FROM iceberg(url, access_key_id, secret_access_key, storage_type='s3')", ""
+                    },
+                    {
+                        "iceberg",
+                        "SELECT * FROM iceberg(url, access_key_id, secret_access_key, storage_type='s3') "
+                        "SETTINGS object_storage_cluster='cluster'", ""
+                    },
+                    {
+                        "iceberg",
+                        "SELECT * FROM iceberg(url, access_key_id, secret_access_key, storage_type='azure')", ""
+                    },
+                    {
+                        "iceberg",
+                        "SELECT * FROM iceberg(url, storage_type='hdfs') SETTINGS object_storage_cluster='cluster'", ""
+                    },
+                }
+            },
+            .allow_readonly = false
+        }
+    );
+#endif
+
+#if USE_AVRO && USE_AWS_S3
+    factory.registerFunction(
+        {
+            .documentation = {
+                .description=R"(The table function can be used to read an Iceberg table stored in S3 object storage in parallel from many nodes of a specified cluster or from a single node.)",
+                .examples{
+                    {
+                        "icebergS3",
+                        "SELECT * FROM icebergS3(url, access_key_id, secret_access_key)", ""
+                    },
+                    {
+                        "icebergS3",
+                        "SELECT * FROM icebergS3(url, access_key_id, secret_access_key) "
+                        "SETTINGS object_storage_cluster='cluster'", ""
+                    },
+                }
+            },
+            .allow_readonly = false
+        }
+    );
+#endif
+
+#if USE_AVRO && USE_AZURE_BLOB_STORAGE
+    factory.registerFunction(
+        {
+            .documentation = {
+                .description=R"(The table function can be used to read an Iceberg table stored in Azure object storage in parallel from many nodes of a specified cluster or from a single node.)",
+                .examples{
+                    {
+                        "icebergAzure",
+                        "SELECT * FROM icebergAzure(url, access_key_id, secret_access_key)", ""
+                    },
+                    {
+                        "icebergAzure",
+                        "SELECT * FROM icebergAzure(url, access_key_id, secret_access_key) "
+                        "SETTINGS object_storage_cluster='cluster'", ""
+                    },
+                }
+            },
+            .allow_readonly = false
+        }
+    );
+#endif
+
+#if USE_AVRO && USE_HDFS
+    factory.registerFunction(
+        {
+            .documentation = {
+                .description=R"(The table function can be used to read an Iceberg table stored on the HDFS virtual filesystem in parallel from many nodes of a specified cluster or from a single node.)",
+                .examples{
+                    {
+                        "icebergHDFS",
+                        "SELECT * FROM icebergHDFS(url)", ""
+                    },
+                    {
+                        "icebergHDFS",
+                        "SELECT * FROM icebergHDFS(url) SETTINGS object_storage_cluster='cluster'", ""
+                    },
+                }
+            },
+            .allow_readonly = false
+        }
+    );
+#endif
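+
+    /// Analogous sketch for the data-lake variants (the endpoint and credentials below are
+    /// placeholders): with the setting supplied, a plain icebergS3() call is expected to behave
+    /// like the explicit icebergS3Cluster() syntax.
+    ///
+    ///     SELECT count(*)
+    ///     FROM icebergS3('http://minio:9000/warehouse/store/', 'minio_user', 'minio_password')
+    ///     SETTINGS object_storage_cluster = 'my_cluster';
+    ///
+    ///     SELECT count(*)
+    ///     FROM icebergS3Cluster('my_cluster', 'http://minio:9000/warehouse/store/', 'minio_user', 'minio_password');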
+
+#if USE_AWS_S3 && USE_PARQUET && USE_DELTA_KERNEL_RS
+    factory.registerFunction(
+        {
+            .documentation = {
+                .description=R"(The table function can be used to read a DeltaLake table stored in object storage in parallel from many nodes of a specified cluster or from a single node.)",
+                .examples{
+                    {
+                        "deltaLake",
+                        "SELECT * FROM deltaLake(url, access_key_id, secret_access_key)", ""
+                    },
+                    {
+                        "deltaLake",
+                        "SELECT * FROM deltaLake(url, access_key_id, secret_access_key) "
+                        "SETTINGS object_storage_cluster='cluster'", ""
+                    },
+                }
+            },
+            .allow_readonly = false
+        }
+    );
+#endif
+
+#if USE_AWS_S3
+    factory.registerFunction(
+        {
+            .documentation = {
+                .description=R"(The table function can be used to read a Hudi table stored in object storage in parallel from many nodes of a specified cluster or from a single node.)",
+                .examples{
+                    {
+                        "hudi",
+                        "SELECT * FROM hudi(url, access_key_id, secret_access_key)", ""
+                    },
+                    {
+                        "hudi",
+                        "SELECT * FROM hudi(url, access_key_id, secret_access_key) SETTINGS object_storage_cluster='cluster'", ""
+                    },
+                }
+            },
+            .allow_readonly = false
+        }
+    );
+#endif
+}
+
+}
diff --git a/src/TableFunctions/TableFunctionObjectStorageClusterFallback.h b/src/TableFunctions/TableFunctionObjectStorageClusterFallback.h
new file mode 100644
index 000000000000..28dfe22143dc
--- /dev/null
+++ b/src/TableFunctions/TableFunctionObjectStorageClusterFallback.h
@@ -0,0 +1,49 @@
+#pragma once
+#include "config.h"
+#include
+
+namespace DB
+{
+
+/**
+* Class implementing s3/hdfs/azureBlobStorage(...) table functions,
+* which allow choosing between the simple and the distributed (cluster) variant based on settings.
+* If the setting `object_storage_cluster` is empty,
+* the simple single-host variant is used; if the setting is not empty, the cluster variant is used.
+* `SELECT * FROM s3('s3://...', ...) SETTINGS object_storage_cluster='cluster'`
+* is equivalent to
+* `SELECT * FROM s3Cluster('cluster', 's3://...', ...)`
+*/
+
+template
+class TableFunctionObjectStorageClusterFallback : public Base
+{
+public:
+    using BaseCluster = Base;
+    using BaseSimple = BaseCluster::Base;
+
+    static constexpr auto name = Definition::name;
+
+    String getName() const override { return name; }
+
+    void validateUseToCreateTable() const override;
+
+private:
+    const char * getStorageTypeName() const override
+    {
+        return is_cluster_function ? 
Definition::storage_type_cluster_name : Definition::storage_type_name; + } + + StoragePtr executeImpl( + const ASTPtr & ast_function, + ContextPtr context, + const std::string & table_name, + ColumnsDescription cached_columns, + bool is_insert_query) const override; + + void parseArgumentsImpl(ASTs & args, const ContextPtr & context) override; + + bool is_cluster_function = false; +}; + +} diff --git a/src/TableFunctions/registerTableFunctions.cpp b/src/TableFunctions/registerTableFunctions.cpp index 131ca783f73f..c7b852b96fc8 100644 --- a/src/TableFunctions/registerTableFunctions.cpp +++ b/src/TableFunctions/registerTableFunctions.cpp @@ -65,6 +65,7 @@ void registerTableFunctions(bool use_legacy_mongodb_integration [[maybe_unused]] registerTableFunctionObjectStorage(factory); registerTableFunctionObjectStorageCluster(factory); + registerTableFunctionObjectStorageClusterFallback(factory); registerDataLakeTableFunctions(factory); registerDataLakeClusterTableFunctions(factory); } diff --git a/src/TableFunctions/registerTableFunctions.h b/src/TableFunctions/registerTableFunctions.h index 8b7d1b0cf60e..142948213352 100644 --- a/src/TableFunctions/registerTableFunctions.h +++ b/src/TableFunctions/registerTableFunctions.h @@ -61,6 +61,7 @@ void registerTableFunctionExplain(TableFunctionFactory & factory); void registerTableFunctionObjectStorage(TableFunctionFactory & factory); void registerTableFunctionObjectStorageCluster(TableFunctionFactory & factory); +void registerTableFunctionObjectStorageClusterFallback(TableFunctionFactory & factory); void registerDataLakeTableFunctions(TableFunctionFactory & factory); void registerDataLakeClusterTableFunctions(TableFunctionFactory & factory); diff --git a/tests/integration/test_s3_cluster/test.py b/tests/integration/test_s3_cluster/test.py index 5bbd48264dfe..959fd71eb9e4 100644 --- a/tests/integration/test_s3_cluster/test.py +++ b/tests/integration/test_s3_cluster/test.py @@ -292,6 +292,21 @@ def test_wrong_cluster(started_cluster): assert "not found" in error + error = node.query_and_get_error( + """ + SELECT count(*) from s3( + 'http://minio1:9001/root/data/{clickhouse,database}/*', + 'minio', 'minio123', 'CSV', 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') + UNION ALL + SELECT count(*) from s3( + 'http://minio1:9001/root/data/{clickhouse,database}/*', + 'minio', 'minio123', 'CSV', 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') + SETTINGS object_storage_cluster = 'non_existing_cluster' + """ + ) + + assert "not found" in error + def test_ambiguous_join(started_cluster): node = started_cluster.instances["s0_0_0"] @@ -362,6 +377,17 @@ def test_unset_skip_unavailable_shards(started_cluster): assert result == "10\n" + result = node.query( + """ + SELECT count(*) from s3( + 'http://minio1:9001/root/data/clickhouse/part1.csv', + 'minio', 'minio123', 'CSV', 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') + SETTINGS object_storage_cluster = 'cluster_non_existent_port' + """ + ) + + assert result == "10\n" + def test_distributed_insert_select_with_replicated(started_cluster): first_replica_first_shard = started_cluster.instances["s0_0_0"] @@ -584,6 +610,20 @@ def test_cluster_format_detection(started_cluster): assert result == expected_result + result = node.query( + """SELECT * FROM s3('http://minio1:9001/root/data/generated/*', 'minio', 'minio123') order by c1, c2 + SETTINGS object_storage_cluster = 'cluster_simple'""" + ) + + assert result == expected_result + + result = 
node.query( + """SELECT * FROM s3('http://minio1:9001/root/data/generated/*', 'minio', 'minio123', auto, 'a String, b UInt64') order by a, b + SETTINGS object_storage_cluster = 'cluster_simple'""" + ) + + assert result == expected_result + def test_cluster_default_expression(started_cluster): node = started_cluster.instances["s0_0_0"] @@ -917,7 +957,7 @@ def test_hive_partitioning(started_cluster, allow_experimental_analyzer): ) cluster_optimized_traffic = int(cluster_optimized_traffic) assert cluster_optimized_traffic == optimized_traffic - + node.query("SET allow_experimental_analyzer = DEFAULT") diff --git a/tests/integration/test_storage_iceberg/test.py b/tests/integration/test_storage_iceberg/test.py index 3da62c4e39b9..5b448d0a095b 100644 --- a/tests/integration/test_storage_iceberg/test.py +++ b/tests/integration/test_storage_iceberg/test.py @@ -631,6 +631,27 @@ def count_secondary_subqueries(started_cluster, query_id, expected, comment): assert int(cluster_secondary_queries) == expected +def count_secondary_subqueries(started_cluster, query_id, expected, comment): + for node_name, replica in started_cluster.instances.items(): + cluster_secondary_queries = ( + replica.query( + f""" + SELECT count(*) FROM system.query_log + WHERE + type = 'QueryFinish' + AND NOT is_initial_query + AND initial_query_id='{query_id}' + """ + ) + .strip() + ) + + logging.info( + f"[{node_name}] cluster_secondary_queries {comment}: {cluster_secondary_queries}" + ) + assert int(cluster_secondary_queries) == expected + + @pytest.mark.parametrize("format_version", ["1", "2"]) @pytest.mark.parametrize("storage_type", ["s3", "azure"]) def test_cluster_table_function(started_cluster, format_version, storage_type):