diff --git a/src/Core/Settings.cpp b/src/Core/Settings.cpp index 5070f4fbdfe6..26fb7048d93e 100644 --- a/src/Core/Settings.cpp +++ b/src/Core/Settings.cpp @@ -6880,6 +6880,9 @@ Possible values: - '' - do not force any kind of Exchange operators, let the optimizer choose, - 'Persisted' - use temporary files in object storage, - 'Streaming' - stream exchange data over network. +)", EXPERIMENTAL) \ + DECLARE(Bool, object_storage_remote_initiator, false, R"( +Execute the request to object storage remotely on one of the object_storage_cluster nodes. )", EXPERIMENTAL) \ \ /** Experimental timeSeries* aggregate functions. */ \ @@ -6887,6 +6890,7 @@ Possible values: Experimental timeSeries* aggregate functions for Prometheus-like timeseries resampling, rate, delta calculation. )", EXPERIMENTAL, allow_experimental_ts_to_grid_aggregate_function) \ \ + /* ####################################################### */ \ /* ############ END OF EXPERIMENTAL FEATURES ############# */ \ /* ####################################################### */ \ diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp index ebc3d26dcf08..8849fb3666d2 100644 --- a/src/Core/SettingsChangesHistory.cpp +++ b/src/Core/SettingsChangesHistory.cpp @@ -76,6 +76,7 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory() {"lock_object_storage_task_distribution_ms", 0, 0, "New setting."}, {"object_storage_cluster", "", "", "New setting"}, {"object_storage_max_nodes", 0, 0, "New setting"}, + {"object_storage_remote_initiator", false, false, "New setting."}, }); addSettingsChanges(settings_changes_history, "25.6", { diff --git a/src/Databases/DataLake/DataLakeConstants.h b/src/Databases/DataLake/DataLakeConstants.h index eaa8f5a276e6..02f6a7dcfcd7 100644 --- a/src/Databases/DataLake/DataLakeConstants.h +++ b/src/Databases/DataLake/DataLakeConstants.h @@ -8,6 +8,7 @@ namespace DataLake { static constexpr auto DATABASE_ENGINE_NAME = "DataLakeCatalog"; +static constexpr auto DATABASE_ALIAS_NAME = "Iceberg"; static constexpr std::string_view FILE_PATH_PREFIX = "file:/"; /// Some catalogs (Unity or Glue) may store not only Iceberg/DeltaLake tables but other kinds of "tables" diff --git a/src/Databases/DataLake/DatabaseDataLake.cpp b/src/Databases/DataLake/DatabaseDataLake.cpp index 7e647c0fd06e..1631dab31a2c 100644 --- a/src/Databases/DataLake/DatabaseDataLake.cpp +++ b/src/Databases/DataLake/DatabaseDataLake.cpp @@ -646,6 +646,11 @@ void registerDatabaseDataLake(DatabaseFactory & factory) throw Exception(ErrorCodes::BAD_ARGUMENTS, "Engine `{}` must have arguments", database_engine_name); } + if (database_engine_name == "Iceberg" && catalog_type != DatabaseDataLakeCatalogType::ICEBERG_REST) + { + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Engine `Iceberg` supports only the `rest` catalog type"); + } + for (auto & engine_arg : engine_args) engine_arg = evaluateConstantExpressionOrIdentifierAsLiteral(engine_arg, args.context); @@ -724,6 +729,7 @@ void registerDatabaseDataLake(DatabaseFactory & factory) std::move(engine_for_tables)); }; factory.registerDatabase("DataLakeCatalog", create_fn, { .supports_arguments = true, .supports_settings = true }); + factory.registerDatabase("Iceberg", create_fn, { .supports_arguments = true, .supports_settings = true }); } } diff --git a/src/Databases/DataLake/ICatalog.cpp b/src/Databases/DataLake/ICatalog.cpp index d2afc528505d..1d3a43cacd50 100644 --- a/src/Databases/DataLake/ICatalog.cpp +++ b/src/Databases/DataLake/ICatalog.cpp @@ -70,13 +70,19 @@ void 
TableMetadata::setLocation(const std::string & location_) auto pos_to_path = location_.substr(pos_to_bucket).find('/'); if (pos_to_path == std::string::npos) - throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Unexpected location format: {}", location_); - - pos_to_path = pos_to_bucket + pos_to_path; + { // empty path + location_without_path = location_; + path.clear(); + bucket = location_.substr(pos_to_bucket); + } + else + { + pos_to_path = pos_to_bucket + pos_to_path; - location_without_path = location_.substr(0, pos_to_path); - path = location_.substr(pos_to_path + 1); - bucket = location_.substr(pos_to_bucket, pos_to_path - pos_to_bucket); + location_without_path = location_.substr(0, pos_to_path); + path = location_.substr(pos_to_path + 1); + bucket = location_.substr(pos_to_bucket, pos_to_path - pos_to_bucket); + } LOG_TEST(getLogger("TableMetadata"), "Parsed location without path: {}, path: {}", diff --git a/src/IO/ReadBufferFromS3.cpp b/src/IO/ReadBufferFromS3.cpp index 1a27e98c4ae1..bdbe26342678 100644 --- a/src/IO/ReadBufferFromS3.cpp +++ b/src/IO/ReadBufferFromS3.cpp @@ -444,6 +444,12 @@ Aws::S3::Model::GetObjectResult ReadBufferFromS3::sendRequest(size_t attempt, si log, "Read S3 object. Bucket: {}, Key: {}, Version: {}, Offset: {}", bucket, key, version_id.empty() ? "Latest" : version_id, range_begin); } + else + { + LOG_TEST( + log, "Read S3 object. Bucket: {}, Key: {}, Version: {}", + bucket, key, version_id.empty() ? "Latest" : version_id); + } ProfileEvents::increment(ProfileEvents::S3GetObject); if (client_ptr->isClientForDisk()) diff --git a/src/IO/S3/Client.cpp b/src/IO/S3/Client.cpp index 879b90624953..49f521fc28a2 100644 --- a/src/IO/S3/Client.cpp +++ b/src/IO/S3/Client.cpp @@ -384,7 +384,7 @@ Model::HeadObjectOutcome Client::HeadObject(HeadObjectRequest & request) const auto bucket_uri = getURIForBucket(bucket); if (!bucket_uri) { - if (auto maybe_error = updateURIForBucketForHead(bucket); maybe_error.has_value()) + if (auto maybe_error = updateURIForBucketForHead(bucket, request.GetKey()); maybe_error.has_value()) return *maybe_error; if (auto region = getRegionForBucket(bucket); !region.empty()) @@ -589,7 +589,6 @@ Client::doRequest(RequestType & request, RequestFn request_fn) const if (auto uri = getURIForBucket(bucket); uri.has_value()) request.overrideURI(std::move(*uri)); - bool found_new_endpoint = false; // if we found correct endpoint after 301 responses, update the cache for future requests SCOPE_EXIT( @@ -869,12 +868,15 @@ std::optional Client::getURIFromError(const Aws::S3::S3Error & error) c } // Do a list request because head requests don't have body in response -std::optional Client::updateURIForBucketForHead(const std::string & bucket) const +// S3 Tables don't support ListObjects, so as a workaround we use GetObject instead +std::optional Client::updateURIForBucketForHead(const std::string & bucket, const std::string & key) const { - ListObjectsV2Request req; + GetObjectRequest req; req.SetBucket(bucket); - req.SetMaxKeys(1); - auto result = ListObjectsV2(req); + req.SetKey(key); + req.SetRange("bytes=0-1"); + auto result = GetObject(req); + if (result.IsSuccess()) return std::nullopt; return result.GetError(); diff --git a/src/IO/S3/Client.h b/src/IO/S3/Client.h index d03bd70ca310..ac82dad77452 100644 --- a/src/IO/S3/Client.h +++ b/src/IO/S3/Client.h @@ -279,7 +279,7 @@ class Client : private Aws::S3::S3Client void updateURIForBucket(const std::string & bucket, S3::URI new_uri) const; std::optional getURIFromError(const Aws::S3::S3Error & 
error) const; - std::optional updateURIForBucketForHead(const std::string & bucket) const; + std::optional updateURIForBucketForHead(const std::string & bucket, const std::string & key) const; std::optional getURIForBucket(const std::string & bucket) const; diff --git a/src/IO/S3/URI.cpp b/src/IO/S3/URI.cpp index cfcdd21a5b90..10ca3d647e06 100644 --- a/src/IO/S3/URI.cpp +++ b/src/IO/S3/URI.cpp @@ -158,10 +158,72 @@ URI::URI(const std::string & uri_, bool allow_archive_path_syntax) validateKey(key, uri); } +bool URI::isAWSRegion(std::string_view region) +{ + /// List from https://docs.aws.amazon.com/general/latest/gr/s3.html + static const std::unordered_set regions = { + "us-east-2", + "us-east-1", + "us-west-1", + "us-west-2", + "af-south-1", + "ap-east-1", + "ap-south-2", + "ap-southeast-3", + "ap-southeast-5", + "ap-southeast-4", + "ap-south-1", + "ap-northeast-3", + "ap-northeast-2", + "ap-southeast-1", + "ap-southeast-2", + "ap-east-2", + "ap-southeast-7", + "ap-northeast-1", + "ca-central-1", + "ca-west-1", + "eu-central-1", + "eu-west-1", + "eu-west-2", + "eu-south-1", + "eu-west-3", + "eu-south-2", + "eu-north-1", + "eu-central-2", + "il-central-1", + "mx-central-1", + "me-south-1", + "me-central-1", + "sa-east-1", + "us-gov-east-1", + "us-gov-west-1" }; + + /// 's3-us-west-2' is a legacy region format for S3 storage, equal to 'us-west-2' + /// See https://docs.aws.amazon.com/AmazonS3/latest/userguide/VirtualHosting.html#VirtualHostingBackwardsCompatibility + if (region.substr(0, 3) == "s3-") + region = region.substr(3); + + return regions.contains(region); +} + void URI::addRegionToURI(const std::string &region) { if (auto pos = endpoint.find(".amazonaws.com"); pos != std::string::npos) + { + if (pos > 0) + { /// Check if the region is already in the endpoint to avoid adding it a second time + auto prev_pos = endpoint.find_last_of("/.", pos - 1); + if (prev_pos == std::string::npos) + prev_pos = 0; + else + ++prev_pos; + std::string_view endpoint_region = std::string_view(endpoint).substr(prev_pos, pos - prev_pos); + if (isAWSRegion(endpoint_region)) + return; + } endpoint = endpoint.substr(0, pos) + "." 
+ region + endpoint.substr(pos); + } } void URI::validateBucket(const String & bucket, const Poco::URI & uri) diff --git a/src/IO/S3/URI.h b/src/IO/S3/URI.h index d455cd1908f0..d3b7e04b265a 100644 --- a/src/IO/S3/URI.h +++ b/src/IO/S3/URI.h @@ -42,6 +42,10 @@ struct URI static void validateBucket(const std::string & bucket, const Poco::URI & uri); static void validateKey(const std::string & key, const Poco::URI & uri); + /// Returns true if 'region' string is an AWS S3 region + /// https://docs.aws.amazon.com/general/latest/gr/s3.html + static bool isAWSRegion(std::string_view region); + private: std::pair> getURIAndArchivePattern(const std::string & source); }; diff --git a/src/Parsers/ASTSetQuery.cpp b/src/Parsers/ASTSetQuery.cpp index bfda0d6f9b83..a672551ab8d7 100644 --- a/src/Parsers/ASTSetQuery.cpp +++ b/src/Parsers/ASTSetQuery.cpp @@ -129,7 +129,8 @@ void ASTSetQuery::formatImpl(WriteBuffer & ostr, const FormatSettings & format, return true; } - if (DataLake::DATABASE_ENGINE_NAME == state.create_engine_name) + if (DataLake::DATABASE_ENGINE_NAME == state.create_engine_name + || DataLake::DATABASE_ALIAS_NAME == state.create_engine_name) { if (DataLake::SETTINGS_TO_HIDE.contains(change.name)) { diff --git a/src/Parsers/FunctionSecretArgumentsFinder.h b/src/Parsers/FunctionSecretArgumentsFinder.h index f145760a3a26..9ab6689637d0 100644 --- a/src/Parsers/FunctionSecretArgumentsFinder.h +++ b/src/Parsers/FunctionSecretArgumentsFinder.h @@ -716,7 +716,7 @@ class FunctionSecretArgumentsFinder /// S3('url', 'access_key_id', 'secret_access_key') findS3DatabaseSecretArguments(); } - else if (engine_name == "DataLakeCatalog") + else if (engine_name == "DataLakeCatalog" || engine_name == "Iceberg") { findDataLakeCatalogSecretArguments(); } diff --git a/src/Storages/IStorageCluster.cpp b/src/Storages/IStorageCluster.cpp index a5f471fbed25..0e8a2e69651f 100644 --- a/src/Storages/IStorageCluster.cpp +++ b/src/Storages/IStorageCluster.cpp @@ -1,5 +1,8 @@ #include +#include +#include + #include #include #include @@ -13,6 +16,7 @@ #include #include #include +#include #include #include #include @@ -22,6 +26,9 @@ #include #include #include +#include +#include +#include #include #include @@ -40,6 +47,7 @@ namespace Setting extern const SettingsString cluster_for_parallel_replicas; extern const SettingsNonZeroUInt64 max_parallel_replicas; extern const SettingsUInt64 object_storage_max_nodes; + extern const SettingsBool object_storage_remote_initiator; } namespace ErrorCodes @@ -73,7 +81,7 @@ void ReadFromCluster::createExtension(const ActionsDAG::Node * predicate) if (extension) return; - extension = storage->getTaskIteratorExtension(predicate, context, cluster); + extension = storage->getTaskIteratorExtension(predicate, filter_actions_dag, context, cluster); } /// The code executes on initiator @@ -97,15 +105,16 @@ void IStorageCluster::read( storage_snapshot->check(column_names); - updateBeforeRead(context); - auto cluster = getClusterImpl(context, cluster_name_from_settings, context->getSettingsRef()[Setting::object_storage_max_nodes]); + const auto & settings = context->getSettingsRef(); + + auto cluster = getClusterImpl(context, cluster_name_from_settings, settings[Setting::object_storage_max_nodes]); /// Calculate the header. 
This is significant, because some columns could be thrown away in some cases like query with count(*) Block sample_block; ASTPtr query_to_send = query_info.query; - if (context->getSettingsRef()[Setting::allow_experimental_analyzer]) + if (settings[Setting::allow_experimental_analyzer]) { sample_block = InterpreterSelectQueryAnalyzer::getSampleBlock(query_info.query, context, SelectQueryOptions(processed_stage)); } @@ -118,6 +127,17 @@ void IStorageCluster::read( updateQueryToSendIfNeeded(query_to_send, storage_snapshot, context); + if (settings[Setting::object_storage_remote_initiator]) + { + auto storage_and_context = convertToRemote(cluster, context, cluster_name_from_settings, query_to_send); + auto src_distributed = std::dynamic_pointer_cast(storage_and_context.storage); + auto modified_query_info = query_info; + modified_query_info.cluster = src_distributed->getCluster(); + auto new_storage_snapshot = storage_and_context.storage->getStorageSnapshot(storage_snapshot->metadata, storage_and_context.context); + storage_and_context.storage->read(query_plan, column_names, new_storage_snapshot, modified_query_info, storage_and_context.context, processed_stage, max_block_size, num_streams); + return; + } + RestoreQualifiedNamesVisitor::Data data; data.distributed_table = DatabaseAndTableWithAlias(*getTableExpression(query_info.query->as(), 0)); data.remote_table.database = context->getCurrentDatabase(); @@ -145,6 +165,62 @@ void IStorageCluster::read( query_plan.addStep(std::move(reading)); } +IStorageCluster::RemoteCallVariables IStorageCluster::convertToRemote( + ClusterPtr cluster, + ContextPtr context, + const std::string & cluster_name_from_settings, + ASTPtr query_to_send) +{ + auto host_addresses = cluster->getShardsAddresses(); + if (host_addresses.empty()) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Empty cluster {}", cluster_name_from_settings); + + static pcg64 rng(randomSeed()); + size_t shard_num = rng() % host_addresses.size(); + auto shard_addresses = host_addresses[shard_num]; + /// After getClusterImpl each shard must have exactly 1 replica + if (shard_addresses.size() != 1) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Size of shard {} in cluster {} is not equal to 1", shard_num, cluster_name_from_settings); + auto host_name = shard_addresses[0].toString(); + + LOG_INFO(log, "Chose remote initiator '{}'", host_name); + + bool secure = shard_addresses[0].secure == Protocol::Secure::Enable; + std::string remote_function_name = secure ? 
"remoteSecure" : "remote"; + + /// Clean object_storage_remote_initiator setting to avoid infinite remote call + auto new_context = Context::createCopy(context); + new_context->setSetting("object_storage_remote_initiator", false); + + auto * select_query = query_to_send->as(); + if (!select_query) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected SELECT query"); + + auto query_settings = select_query->settings(); + if (query_settings) + { + auto & settings_ast = query_settings->as(); + if (settings_ast.changes.removeSetting("object_storage_remote_initiator") && settings_ast.changes.empty()) + { + select_query->setExpression(ASTSelectQuery::Expression::SETTINGS, {}); + } + } + + ASTTableExpression * table_expression = extractTableExpressionASTPtrFromSelectQuery(query_to_send); + if (!table_expression) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find table expression"); + + auto remote_query = makeASTFunction(remote_function_name, std::make_shared(host_name), table_expression->table_function); + + table_expression->table_function = remote_query; + + auto remote_function = TableFunctionFactory::instance().get(remote_query, new_context); + + auto storage = remote_function->execute(query_to_send, new_context, remote_function_name); + + return RemoteCallVariables{storage, new_context}; +} + SinkToStoragePtr IStorageCluster::write( const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, diff --git a/src/Storages/IStorageCluster.h b/src/Storages/IStorageCluster.h index ad5c41d17eec..b7c4b940d394 100644 --- a/src/Storages/IStorageCluster.h +++ b/src/Storages/IStorageCluster.h @@ -45,6 +45,7 @@ class IStorageCluster : public IStorage /// Query is needed for pruning by virtual columns (_file, _path) virtual RemoteQueryExecutor::Extension getTaskIteratorExtension( const ActionsDAG::Node * predicate, + const std::optional & filter_actions_dag, const ContextPtr & context, ClusterPtr cluster) const = 0; @@ -59,9 +60,20 @@ class IStorageCluster : public IStorage virtual String getClusterName(ContextPtr /* context */) const { return getOriginalClusterName(); } protected: - virtual void updateBeforeRead(const ContextPtr &) {} virtual void updateQueryToSendIfNeeded(ASTPtr & /*query*/, const StorageSnapshotPtr & /*storage_snapshot*/, const ContextPtr & /*context*/) {} + struct RemoteCallVariables + { + StoragePtr storage; + ContextPtr context; + }; + + RemoteCallVariables convertToRemote( + ClusterPtr cluster, + ContextPtr context, + const std::string & cluster_name_from_settings, + ASTPtr query_to_send); + virtual void readFallBackToPure( QueryPlan & /* query_plan */, const Names & /* column_names */, diff --git a/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h b/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h index 221868c0b63c..4768936e3627 100644 --- a/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h +++ b/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h @@ -18,6 +18,7 @@ #include #include #include +#include #include #include @@ -40,6 +41,7 @@ namespace ErrorCodes namespace DataLakeStorageSetting { extern DataLakeStorageSettingsBool allow_dynamic_metadata_for_data_lakes; + extern const DataLakeStorageSettingsString iceberg_metadata_file_path; } @@ -96,6 +98,16 @@ class DataLakeConfiguration : public BaseStorageConfiguration, public std::enabl return std::nullopt; } + std::optional tryGetSamplePathFromMetadata() const override + { + if (!current_metadata) + return std::nullopt; + auto data_files = current_metadata->getDataFiles(); + 
if (!data_files.empty()) + return data_files[0]; + return std::nullopt; + } + std::optional totalRows(ContextPtr local_context) override { assertInitializedDL(); @@ -173,6 +185,42 @@ class DataLakeConfiguration : public BaseStorageConfiguration, public std::enabl current_metadata->modifyFormatSettings(settings_); } + ASTPtr createArgsWithAccessData() const override + { + auto res = BaseStorageConfiguration::createArgsWithAccessData(); + + auto iceberg_metadata_file_path = (*settings)[DataLakeStorageSetting::iceberg_metadata_file_path]; + + if (iceberg_metadata_file_path.changed) + { + auto * arguments = res->template as(); + if (!arguments) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Arguments are not an expression list"); + + bool has_settings = false; + + for (auto & arg : arguments->children) + { + if (auto * settings_ast = arg->template as()) + { + has_settings = true; + settings_ast->changes.setSetting("iceberg_metadata_file_path", iceberg_metadata_file_path.value); + break; + } + } + + if (!has_settings) + { + std::shared_ptr settings_ast = std::make_shared(); + settings_ast->is_standalone = false; + settings_ast->changes.setSetting("iceberg_metadata_file_path", iceberg_metadata_file_path.value); + arguments->children.push_back(settings_ast); + } + } + + return res; + } + private: DataLakeMetadataPtr current_metadata; LoggerPtr log = getLogger("DataLakeConfiguration"); @@ -487,6 +535,11 @@ class StorageIcebergConfiguration : public StorageObjectStorage::Configuration, void assertInitialized() const override { getImpl().assertInitialized(); } + std::optional tryGetSamplePathFromMetadata() const override + { + return getImpl().tryGetSamplePathFromMetadata(); + } + private: inline StorageObjectStorage::Configuration & getImpl() const { diff --git a/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.h b/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.h index e68a673c740f..a3cf16cd6673 100644 --- a/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.h +++ b/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.h @@ -35,6 +35,8 @@ class DeltaLakeMetadata final : public IDataLakeMetadata DeltaLakeMetadata(ObjectStoragePtr object_storage_, ConfigurationObserverPtr configuration_, ContextPtr context_); + Strings getDataFiles() const override { return data_files; } + NamesAndTypesList getTableSchema() const override { return schema; } DeltaLakePartitionColumns getPartitionColumns() const { return partition_columns; } diff --git a/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadataDeltaKernel.cpp b/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadataDeltaKernel.cpp index a00568642dc3..e54bc89e3a39 100644 --- a/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadataDeltaKernel.cpp +++ b/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadataDeltaKernel.cpp @@ -35,6 +35,11 @@ bool DeltaLakeMetadataDeltaKernel::update(const ContextPtr &) return table_snapshot->update(); } +Strings DeltaLakeMetadataDeltaKernel::getDataFiles() const +{ + throwNotImplemented("getDataFiles()"); +} + ObjectIterator DeltaLakeMetadataDeltaKernel::iterate( const ActionsDAG * filter_dag, FileProgressCallback callback, diff --git a/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadataDeltaKernel.h b/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadataDeltaKernel.h index 679214d5c489..cfc57b791040 100644 --- a/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadataDeltaKernel.h +++ b/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadataDeltaKernel.h @@ -33,6 +33,8 @@ class DeltaLakeMetadataDeltaKernel 
final : public IDataLakeMetadata bool update(const ContextPtr & context) override; + Strings getDataFiles() const override; + NamesAndTypesList getTableSchema() const override; DB::ReadFromFormatInfo prepareReadingFromFormat( diff --git a/src/Storages/ObjectStorage/DataLakes/HudiMetadata.cpp b/src/Storages/ObjectStorage/DataLakes/HudiMetadata.cpp index 774e39554edc..6398ad34a4b6 100644 --- a/src/Storages/ObjectStorage/DataLakes/HudiMetadata.cpp +++ b/src/Storages/ObjectStorage/DataLakes/HudiMetadata.cpp @@ -91,7 +91,7 @@ HudiMetadata::HudiMetadata(ObjectStoragePtr object_storage_, ConfigurationObserv { } -Strings HudiMetadata::getDataFiles(const ActionsDAG *) const +Strings HudiMetadata::getDataFiles() const { if (data_files.empty()) data_files = getDataFilesImpl(); @@ -99,12 +99,12 @@ Strings HudiMetadata::getDataFiles(const ActionsDAG *) const } ObjectIterator HudiMetadata::iterate( - const ActionsDAG * filter_dag, + const ActionsDAG * /* filter_dag */, FileProgressCallback callback, size_t /* list_batch_size */, ContextPtr /* context */) const { - return createKeysIterator(getDataFiles(filter_dag), object_storage, callback); + return createKeysIterator(getDataFiles(), object_storage, callback); } } diff --git a/src/Storages/ObjectStorage/DataLakes/HudiMetadata.h b/src/Storages/ObjectStorage/DataLakes/HudiMetadata.h index 7fd94e0d14c4..2c23269b928a 100644 --- a/src/Storages/ObjectStorage/DataLakes/HudiMetadata.h +++ b/src/Storages/ObjectStorage/DataLakes/HudiMetadata.h @@ -19,6 +19,8 @@ class HudiMetadata final : public IDataLakeMetadata, private WithContext HudiMetadata(ObjectStoragePtr object_storage_, ConfigurationObserverPtr configuration_, ContextPtr context_); + Strings getDataFiles() const override; + NamesAndTypesList getTableSchema() const override { return {}; } bool operator ==(const IDataLakeMetadata & other) const override @@ -50,7 +52,6 @@ class HudiMetadata final : public IDataLakeMetadata, private WithContext mutable Strings data_files; Strings getDataFilesImpl() const; - Strings getDataFiles(const ActionsDAG * filter_dag) const; }; } diff --git a/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h b/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h index 39f66faf106f..049a1fa8d60a 100644 --- a/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h +++ b/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h @@ -21,6 +21,9 @@ class IDataLakeMetadata : boost::noncopyable virtual bool operator==(const IDataLakeMetadata & other) const = 0; + /// List all data files. + /// For better parallelization, iterate() method should be used. + virtual Strings getDataFiles() const = 0; /// Return iterator to `data files`. 
using FileProgressCallback = std::function; virtual ObjectIterator iterate( diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp index c2b8caa64c55..b09986652faa 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp @@ -144,7 +144,8 @@ IcebergMetadata::IcebergMetadata( Int32 format_version_, const Poco::JSON::Object::Ptr & metadata_object_, IcebergMetadataFilesCachePtr cache_ptr) - : object_storage(std::move(object_storage_)) + : WithContext(context_) + , object_storage(std::move(object_storage_)) , configuration(std::move(configuration_)) , schema_processor(IcebergSchemaProcessor()) , log(getLogger("IcebergMetadata")) @@ -528,7 +529,7 @@ void IcebergMetadata::updateSnapshot(ContextPtr local_context, Poco::JSON::Objec relevant_snapshot = IcebergSnapshot{ getManifestList(local_context, getProperFilePathFromMetadataInfo( - snapshot->getValue(f_manifest_list), configuration_ptr->getPathForRead().path, table_location)), + snapshot->getValue(f_manifest_list), configuration_ptr->getPathForRead().path, table_location, configuration_ptr->getNamespace())), relevant_snapshot_id, total_rows, total_bytes}; if (!snapshot->has(f_schema_id)) @@ -710,7 +711,7 @@ ManifestFileCacheKeys IcebergMetadata::getManifestList(ContextPtr local_context, for (size_t i = 0; i < manifest_list_deserializer.rows(); ++i) { const std::string file_path = manifest_list_deserializer.getValueFromRowByName(i, f_manifest_path, TypeIndex::String).safeGet(); - const auto manifest_file_name = getProperFilePathFromMetadataInfo(file_path, configuration_ptr->getPathForRead().path, table_location); + const auto manifest_file_name = getProperFilePathFromMetadataInfo(file_path, configuration_ptr->getPathForRead().path, table_location, configuration_ptr->getNamespace()); Int64 added_sequence_number = 0; if (format_version > 1) added_sequence_number = manifest_list_deserializer.getValueFromRowByName(i, f_sequence_number, TypeIndex::Int64).safeGet(); @@ -846,6 +847,7 @@ ManifestFilePtr IcebergMetadata::getManifestFile(ContextPtr local_context, const schema_processor, inherited_sequence_number, table_location, + configuration_ptr->getNamespace(), local_context); }; @@ -858,7 +860,7 @@ ManifestFilePtr IcebergMetadata::getManifestFile(ContextPtr local_context, const return create_fn(); } -Strings IcebergMetadata::getDataFiles(const ActionsDAG * filter_dag, ContextPtr local_context) const +Strings IcebergMetadata::getDataFilesImpl(const ActionsDAG * filter_dag, ContextPtr local_context) const { bool use_partition_pruning = filter_dag && local_context->getSettingsRef()[Setting::use_iceberg_partition_pruning]; @@ -980,7 +982,7 @@ ObjectIterator IcebergMetadata::iterate( ContextPtr local_context) const { SharedLockGuard lock(mutex); - return createKeysIterator(getDataFiles(filter_dag, local_context), object_storage, callback); + return createKeysIterator(getDataFilesImpl(filter_dag, local_context), object_storage, callback); } NamesAndTypesList IcebergMetadata::getTableSchema() const diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h index 411ffa296d17..0a6d16112625 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h @@ -27,7 +27,7 @@ namespace DB { -class IcebergMetadata : public 
IDataLakeMetadata +class IcebergMetadata : public IDataLakeMetadata, private WithContext { public: using ConfigurationObserverPtr = StorageObjectStorage::ConfigurationObserverPtr; @@ -45,6 +45,11 @@ class IcebergMetadata : public IDataLakeMetadata const Poco::JSON::Object::Ptr & metadata_object, IcebergMetadataFilesCachePtr cache_ptr); + /// Get data files. On first request it reads manifest_list file and iterates through manifest files to find all data files. + /// All subsequent calls when the same data snapshot is relevant will return saved list of files (because it cannot be changed + /// without changing metadata file). Drops on every snapshot update. + Strings getDataFiles() const override { return getDataFilesImpl(nullptr, getContext()); } + /// Get table schema parsed from metadata. NamesAndTypesList getTableSchema() const override; @@ -111,7 +116,7 @@ class IcebergMetadata : public IDataLakeMetadata mutable std::mutex cached_unprunned_files_for_last_processed_snapshot_mutex; void updateState(const ContextPtr & local_context, Poco::JSON::Object::Ptr metadata_object, bool metadata_file_changed) TSA_REQUIRES(mutex); - Strings getDataFiles(const ActionsDAG * filter_dag, ContextPtr local_context) const; + Strings getDataFilesImpl(const ActionsDAG * filter_dag, ContextPtr local_context) const; void updateSnapshot(ContextPtr local_context, Poco::JSON::Object::Ptr metadata_object) TSA_REQUIRES(mutex); ManifestFileCacheKeys getManifestList(ContextPtr local_context, const String & filename) const; diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.cpp index 22419d812f45..3f94ad70fe6e 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.cpp @@ -128,6 +128,7 @@ ManifestFileContent::ManifestFileContent( const IcebergSchemaProcessor & schema_processor, Int64 inherited_sequence_number, const String & table_location, + const String & common_namespace, DB::ContextPtr context) { this->schema_id = schema_id_; @@ -192,7 +193,11 @@ ManifestFileContent::ManifestFileContent( } const auto status = ManifestEntryStatus(manifest_file_deserializer.getValueFromRowByName(i, f_status, TypeIndex::Int32).safeGet()); - const auto file_path = getProperFilePathFromMetadataInfo(manifest_file_deserializer.getValueFromRowByName(i, c_data_file_file_path, TypeIndex::String).safeGet(), common_path, table_location); + const auto file_path = getProperFilePathFromMetadataInfo( + manifest_file_deserializer.getValueFromRowByName(i, c_data_file_file_path, TypeIndex::String).safeGet(), + common_path, + table_location, + common_namespace); /// NOTE: This is weird, because in manifest file partition looks like this: /// { diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.h b/src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.h index 0fc613c65946..9d49c9ef548d 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.h +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.h @@ -94,6 +94,7 @@ class ManifestFileContent const DB::IcebergSchemaProcessor & schema_processor, Int64 inherited_sequence_number, const std::string & table_location, + const std::string & common_namespace, DB::ContextPtr context); const std::vector & getFiles() const; diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.cpp index 56d7b9565fb1..33af1fe07eaa 100644 --- 
a/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.cpp @@ -28,7 +28,11 @@ using namespace DB; // This function is used to get the file path inside the directory which corresponds to iceberg table from the full blob path which is written in manifest and metadata files. // For example, if the full blob path is s3://bucket/table_name/data/00000-1-1234567890.avro, the function will return table_name/data/00000-1-1234567890.avro // Common path should end with "" or "/". -std::string getProperFilePathFromMetadataInfo(std::string_view data_path, std::string_view common_path, std::string_view table_location) +std::string getProperFilePathFromMetadataInfo( + std::string_view data_path, + std::string_view common_path, + std::string_view table_location, + std::string_view common_namespace) { auto trim_backward_slash = [](std::string_view str) -> std::string_view { @@ -84,7 +88,20 @@ std::string getProperFilePathFromMetadataInfo(std::string_view data_path, std::s } else { - throw ::DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Expected to find '{}' in data path: '{}'", common_path, data_path); + /// Data files can have different path + pos = data_path.find("://"); + if (pos == std::string::npos) + throw ::DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Unexpected data path: '{}'", data_path); + pos = data_path.find('/', pos + 3); + if (pos == std::string::npos) + throw ::DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Unexpected data path: '{}'", data_path); + if (data_path.substr(pos + 1).starts_with(common_namespace)) + { + auto new_pos = data_path.find('/', pos + 1); + if (new_pos - pos == common_namespace.length() + 1) /// bucket in the path + pos = new_pos; + } + return std::string(data_path.substr(pos)); } } diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.h b/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.h index 432751be8832..300df4492aa6 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.h +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.h @@ -10,7 +10,11 @@ namespace Iceberg { -std::string getProperFilePathFromMetadataInfo(std::string_view data_path, std::string_view common_path, std::string_view table_location); +std::string getProperFilePathFromMetadataInfo( + std::string_view data_path, + std::string_view common_path, + std::string_view table_location, + std::string_view common_namespace); } diff --git a/src/Storages/ObjectStorage/StorageObjectStorage.cpp b/src/Storages/ObjectStorage/StorageObjectStorage.cpp index 9ee0ad779f27..bd46d8412649 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorage.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorage.cpp @@ -98,7 +98,8 @@ StorageObjectStorage::StorageObjectStorage( bool distributed_processing_, ASTPtr partition_by_, bool is_table_function, - bool lazy_init) + bool lazy_init, + std::optional sample_path_) : IStorage(table_id_) , configuration(configuration_) , object_storage(object_storage_) @@ -145,7 +146,7 @@ StorageObjectStorage::StorageObjectStorage( /// (e.g. read always follows constructor immediately). 
update_configuration_on_read_write = !is_table_function || !updated_configuration; - std::string sample_path; + std::string sample_path = sample_path_.value_or(""); ColumnsDescription columns{columns_in_table_or_function_definition}; if (need_resolve_columns_or_format) @@ -315,6 +316,11 @@ std::optional StorageObjectStorage::Configuration::tryGetTab throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method tryGetTableStructureFromMetadata is not implemented for basic configuration"); } +std::optional StorageObjectStorage::Configuration::tryGetSamplePathFromMetadata() const +{ + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method tryGetSamplePathFromMetadata is not implemented for basic configuration"); +} + void StorageObjectStorage::read( QueryPlan & query_plan, const Names & column_names, diff --git a/src/Storages/ObjectStorage/StorageObjectStorage.h b/src/Storages/ObjectStorage/StorageObjectStorage.h index 55a6eaf4f451..721cab5b8c6a 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorage.h +++ b/src/Storages/ObjectStorage/StorageObjectStorage.h @@ -78,7 +78,8 @@ class StorageObjectStorage : public IStorage bool distributed_processing_ = false, ASTPtr partition_by_ = nullptr, bool is_table_function_ = false, - bool lazy_init = false); + bool lazy_init = false, + std::optional sample_path_ = std::nullopt); String getName() const override; @@ -288,6 +289,7 @@ class StorageObjectStorage::Configuration virtual void initPartitionStrategy(ASTPtr partition_by, const ColumnsDescription & columns, ContextPtr context); virtual std::optional tryGetTableStructureFromMetadata() const; + virtual std::optional tryGetSamplePathFromMetadata() const; virtual bool supportsFileIterator() const { return false; } virtual bool supportsWrites() const { return true; } diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp index a65ed0325e29..f2a9615fb86f 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp @@ -166,7 +166,8 @@ StorageObjectStorageCluster::StorageObjectStorageCluster( /* distributed_processing */false, partition_by, /* is_table_function */false, - /* lazy_init */lazy_init); + /* lazy_init */lazy_init, + sample_path); auto virtuals_ = getVirtualsPtr(); if (virtuals_) @@ -440,12 +441,13 @@ void StorageObjectStorageCluster::updateQueryToSendIfNeeded( RemoteQueryExecutor::Extension StorageObjectStorageCluster::getTaskIteratorExtension( const ActionsDAG::Node * predicate, + const std::optional & filter_actions_dag, const ContextPtr & local_context, ClusterPtr cluster) const { auto iterator = StorageObjectStorageSource::createFileIterator( configuration, configuration->getQuerySettings(local_context), object_storage, /* distributed_processing */false, - local_context, predicate, {}, getVirtualsList(), hive_partition_columns_to_read_from_file_path, nullptr, local_context->getFileProgressCallback(), /*ignore_archive_globs=*/true, /*skip_object_metadata=*/true); + local_context, predicate, filter_actions_dag, getVirtualsList(), hive_partition_columns_to_read_from_file_path, nullptr, local_context->getFileProgressCallback(), /*ignore_archive_globs=*/true, /*skip_object_metadata=*/true); std::vector ids_of_hosts; for (const auto & shard : cluster->getShardsInfo()) diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.h b/src/Storages/ObjectStorage/StorageObjectStorageCluster.h index ba9be238c3a6..2cceaa7fa6b2 100644 --- 
a/src/Storages/ObjectStorage/StorageObjectStorageCluster.h +++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.h @@ -30,6 +30,7 @@ class StorageObjectStorageCluster : public IStorageCluster RemoteQueryExecutor::Extension getTaskIteratorExtension( const ActionsDAG::Node * predicate, + const std::optional & filter_actions_dag, const ContextPtr & context, ClusterPtr cluster) const override; diff --git a/src/Storages/StorageDistributed.cpp b/src/Storages/StorageDistributed.cpp index 0ae57212024d..c5a89813f79c 100644 --- a/src/Storages/StorageDistributed.cpp +++ b/src/Storages/StorageDistributed.cpp @@ -1317,7 +1317,7 @@ std::optional StorageDistributed::distributedWriteFromClusterStor const auto cluster = getCluster(); /// Select query is needed for pruining on virtual columns - auto extension = src_storage_cluster.getTaskIteratorExtension(predicate, local_context, cluster); + auto extension = src_storage_cluster.getTaskIteratorExtension(predicate, filter, local_context, cluster); /// Here we take addresses from destination cluster and assume source table exists on these nodes size_t replica_index = 0; diff --git a/src/Storages/StorageFileCluster.cpp b/src/Storages/StorageFileCluster.cpp index 9fe7b164a7a7..de825df3a255 100644 --- a/src/Storages/StorageFileCluster.cpp +++ b/src/Storages/StorageFileCluster.cpp @@ -96,6 +96,7 @@ void StorageFileCluster::updateQueryToSendIfNeeded(DB::ASTPtr & query, const Sto RemoteQueryExecutor::Extension StorageFileCluster::getTaskIteratorExtension( const ActionsDAG::Node * predicate, + const std::optional & /* filter_actions_dag */, const ContextPtr & context, ClusterPtr) const { diff --git a/src/Storages/StorageFileCluster.h b/src/Storages/StorageFileCluster.h index 5fb08a48eec6..354285602ec4 100644 --- a/src/Storages/StorageFileCluster.h +++ b/src/Storages/StorageFileCluster.h @@ -29,6 +29,7 @@ class StorageFileCluster : public IStorageCluster std::string getName() const override { return "FileCluster"; } RemoteQueryExecutor::Extension getTaskIteratorExtension( const ActionsDAG::Node * predicate, + const std::optional & filter_actions_dag, const ContextPtr & context, ClusterPtr) const override; diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index 9ec0231a8c71..565d0d3fb5f3 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -6086,7 +6086,7 @@ std::optional StorageReplicatedMergeTree::distributedWriteFromClu ContextMutablePtr query_context = Context::createCopy(local_context); query_context->increaseDistributedDepth(); - auto extension = src_storage_cluster->getTaskIteratorExtension(nullptr, local_context, src_cluster); + auto extension = src_storage_cluster->getTaskIteratorExtension(nullptr, {}, local_context, src_cluster); size_t replica_index = 0; for (const auto & replicas : src_cluster->getShardsAddresses()) diff --git a/src/Storages/StorageURLCluster.cpp b/src/Storages/StorageURLCluster.cpp index 13be67789df8..5570392477e8 100644 --- a/src/Storages/StorageURLCluster.cpp +++ b/src/Storages/StorageURLCluster.cpp @@ -128,6 +128,7 @@ void StorageURLCluster::updateQueryToSendIfNeeded(ASTPtr & query, const StorageS RemoteQueryExecutor::Extension StorageURLCluster::getTaskIteratorExtension( const ActionsDAG::Node * predicate, + const std::optional & /* filter_actions_dag */, const ContextPtr & context, ClusterPtr) const { diff --git a/src/Storages/StorageURLCluster.h b/src/Storages/StorageURLCluster.h index e360eb22d701..2fefa108c965 
100644 --- a/src/Storages/StorageURLCluster.h +++ b/src/Storages/StorageURLCluster.h @@ -32,6 +32,7 @@ class StorageURLCluster : public IStorageCluster std::string getName() const override { return "URLCluster"; } RemoteQueryExecutor::Extension getTaskIteratorExtension( const ActionsDAG::Node * predicate, + const std::optional & filter_actions_dag, const ContextPtr & context, ClusterPtr) const override; diff --git a/src/Storages/extractTableFunctionFromSelectQuery.cpp b/src/Storages/extractTableFunctionFromSelectQuery.cpp index c7f60240b3c7..2f457dadee3b 100644 --- a/src/Storages/extractTableFunctionFromSelectQuery.cpp +++ b/src/Storages/extractTableFunctionFromSelectQuery.cpp @@ -9,7 +9,7 @@ namespace DB { -ASTFunction * extractTableFunctionFromSelectQuery(ASTPtr & query) +ASTTableExpression * extractTableExpressionASTPtrFromSelectQuery(ASTPtr & query) { auto * select_query = query->as(); if (!select_query || !select_query->tables()) @@ -17,10 +17,22 @@ ASTFunction * extractTableFunctionFromSelectQuery(ASTPtr & query) auto * tables = select_query->tables()->as(); auto * table_expression = tables->children[0]->as()->table_expression->as(); - if (!table_expression->table_function) + return table_expression; +} + +ASTPtr extractTableFunctionASTPtrFromSelectQuery(ASTPtr & query) +{ + auto table_expression = extractTableExpressionASTPtrFromSelectQuery(query); + return table_expression ? table_expression->table_function : nullptr; +} + +ASTFunction * extractTableFunctionFromSelectQuery(ASTPtr & query) +{ + auto table_function_ast = extractTableFunctionASTPtrFromSelectQuery(query); + if (!table_function_ast) return nullptr; - auto * table_function = table_expression->table_function->as(); + auto * table_function = table_function_ast->as(); return table_function; } diff --git a/src/Storages/extractTableFunctionFromSelectQuery.h b/src/Storages/extractTableFunctionFromSelectQuery.h index 87edf01c1c82..9834f3dc7573 100644 --- a/src/Storages/extractTableFunctionFromSelectQuery.h +++ b/src/Storages/extractTableFunctionFromSelectQuery.h @@ -6,7 +6,10 @@ namespace DB { +struct ASTTableExpression; +ASTTableExpression * extractTableExpressionASTPtrFromSelectQuery(ASTPtr & query); +ASTPtr extractTableFunctionASTPtrFromSelectQuery(ASTPtr & query); ASTFunction * extractTableFunctionFromSelectQuery(ASTPtr & query); ASTExpressionList * extractTableFunctionArgumentsFromSelectQuery(ASTPtr & query); diff --git a/src/TableFunctions/TableFunctionRemote.h b/src/TableFunctions/TableFunctionRemote.h index 0f75bf2b854c..4de60a79aea3 100644 --- a/src/TableFunctions/TableFunctionRemote.h +++ b/src/TableFunctions/TableFunctionRemote.h @@ -26,6 +26,8 @@ class TableFunctionRemote : public ITableFunction bool needStructureConversion() const override { return false; } + void setRemoteTableFunction(ASTPtr remote_table_function_ptr_) { remote_table_function_ptr = remote_table_function_ptr_; } + private: StoragePtr executeImpl(const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns, bool is_insert_query) const override; diff --git a/tests/integration/test_database_iceberg/test.py b/tests/integration/test_database_iceberg/test.py index 10800a99cb1b..88b5e327293d 100644 --- a/tests/integration/test_database_iceberg/test.py +++ b/tests/integration/test_database_iceberg/test.py @@ -72,6 +72,8 @@ DEFAULT_SORT_ORDER = SortOrder(SortField(source_id=2, transform=IdentityTransform())) +AVAILABLE_ENGINES = ["DataLakeCatalog", "Iceberg"] + def list_namespaces(): response = 
requests.get(f"{BASE_URL_LOCAL}/namespaces") @@ -122,7 +124,7 @@ def generate_record(): def create_clickhouse_iceberg_database( - started_cluster, node, name, additional_settings={} + started_cluster, node, name, additional_settings={}, engine='DataLakeCatalog' ): settings = { "catalog_type": "rest", @@ -136,7 +138,7 @@ def create_clickhouse_iceberg_database( f""" DROP DATABASE IF EXISTS {name}; SET allow_experimental_database_iceberg=true; -CREATE DATABASE {name} ENGINE = DataLakeCatalog('{BASE_URL}', 'minio', '{minio_secret_key}') +CREATE DATABASE {name} ENGINE = {engine}('{BASE_URL}', 'minio', '{minio_secret_key}') SETTINGS {",".join((k+"="+repr(v) for k, v in settings.items()))} """ ) @@ -169,7 +171,8 @@ def started_cluster(): cluster.shutdown() -def test_list_tables(started_cluster): +@pytest.mark.parametrize("engine", AVAILABLE_ENGINES) +def test_list_tables(started_cluster, engine): node = started_cluster.instances["node1"] root_namespace = f"clickhouse_{uuid.uuid4()}" @@ -200,7 +203,7 @@ def test_list_tables(started_cluster): for namespace in [namespace_1, namespace_2]: assert len(catalog.list_tables(namespace)) == 0 - create_clickhouse_iceberg_database(started_cluster, node, CATALOG_NAME) + create_clickhouse_iceberg_database(started_cluster, node, CATALOG_NAME, engine=engine) tables_list = "" for table in namespace_1_tables: @@ -235,7 +238,8 @@ def test_list_tables(started_cluster): ) -def test_many_namespaces(started_cluster): +@pytest.mark.parametrize("engine", AVAILABLE_ENGINES) +def test_many_namespaces(started_cluster, engine): node = started_cluster.instances["node1"] root_namespace_1 = f"A_{uuid.uuid4()}" root_namespace_2 = f"B_{uuid.uuid4()}" @@ -256,7 +260,7 @@ def test_many_namespaces(started_cluster): for table in tables: create_table(catalog, namespace, table) - create_clickhouse_iceberg_database(started_cluster, node, CATALOG_NAME) + create_clickhouse_iceberg_database(started_cluster, node, CATALOG_NAME, engine=engine) for namespace in namespaces: for table in tables: @@ -268,7 +272,8 @@ def test_many_namespaces(started_cluster): ) -def test_select(started_cluster): +@pytest.mark.parametrize("engine", AVAILABLE_ENGINES) +def test_select(started_cluster, engine): node = started_cluster.instances["node1"] test_ref = f"test_list_tables_{uuid.uuid4()}" @@ -296,7 +301,7 @@ def test_select(started_cluster): df = pa.Table.from_pylist(data) table.append(df) - create_clickhouse_iceberg_database(started_cluster, node, CATALOG_NAME) + create_clickhouse_iceberg_database(started_cluster, node, CATALOG_NAME, engine=engine) expected = DEFAULT_CREATE_TABLE.format(CATALOG_NAME, namespace, table_name) assert expected == node.query( @@ -310,7 +315,8 @@ def test_select(started_cluster): assert int(node.query(f"SELECT count() FROM system.iceberg_history WHERE table = '{namespace}.{table_name}' and database = '{CATALOG_NAME}'").strip()) == 1 -def test_hide_sensitive_info(started_cluster): +@pytest.mark.parametrize("engine", AVAILABLE_ENGINES) +def test_hide_sensitive_info(started_cluster, engine): node = started_cluster.instances["node1"] test_ref = f"test_hide_sensitive_info_{uuid.uuid4()}" @@ -328,6 +334,7 @@ def test_hide_sensitive_info(started_cluster): node, CATALOG_NAME, additional_settings={"catalog_credential": "SECRET_1"}, + engine=engine, ) assert "SECRET_1" not in node.query(f"SHOW CREATE DATABASE {CATALOG_NAME}") @@ -336,11 +343,13 @@ def test_hide_sensitive_info(started_cluster): node, CATALOG_NAME, additional_settings={"auth_header": "SECRET_2"}, + engine=engine, ) assert 
"SECRET_2" not in node.query(f"SHOW CREATE DATABASE {CATALOG_NAME}") -def test_tables_with_same_location(started_cluster): +@pytest.mark.parametrize("engine", AVAILABLE_ENGINES) +def test_tables_with_same_location(started_cluster, engine): node = started_cluster.instances["node1"] test_ref = f"test_tables_with_same_location_{uuid.uuid4()}" @@ -371,7 +380,7 @@ def record(key): df = pa.Table.from_pylist(data) table_2.append(df) - create_clickhouse_iceberg_database(started_cluster, node, CATALOG_NAME) + create_clickhouse_iceberg_database(started_cluster, node, CATALOG_NAME, engine=engine) assert 'aaa\naaa\naaa' == node.query(f"SELECT symbol FROM {CATALOG_NAME}.`{namespace}.{table_name}`").strip() assert 'bbb\nbbb\nbbb' == node.query(f"SELECT symbol FROM {CATALOG_NAME}.`{namespace}.{table_name_2}`").strip() diff --git a/tests/integration/test_storage_iceberg/test.py b/tests/integration/test_storage_iceberg/test.py index 30549ead3554..ed672f4d4dd0 100644 --- a/tests/integration/test_storage_iceberg/test.py +++ b/tests/integration/test_storage_iceberg/test.py @@ -2259,7 +2259,10 @@ def check_validity_and_get_prunned_files_general(instance, table_name, settings1 @pytest.mark.parametrize("storage_type", ["s3", "azure", "local"]) -def test_partition_pruning(started_cluster, storage_type): +@pytest.mark.parametrize("run_on_cluster", [False, True]) +def test_partition_pruning(started_cluster, storage_type, run_on_cluster): + if run_on_cluster and storage_type == "local": + pytest.skip("Local storage is not supported on cluster") instance = started_cluster.instances["node1"] spark = started_cluster.spark_session TABLE_NAME = "test_partition_pruning_" + storage_type + "_" + get_uuid_str() @@ -2306,7 +2309,7 @@ def execute_spark_query(query: str): ) creation_expression = get_creation_expression( - storage_type, TABLE_NAME, started_cluster, table_function=True + storage_type, TABLE_NAME, started_cluster, table_function=True, run_on_cluster=run_on_cluster ) def check_validity_and_get_prunned_files(select_expression): @@ -3112,7 +3115,10 @@ def test_explicit_metadata_file(started_cluster, storage_type): create_iceberg_table(storage_type, instance, TABLE_NAME, started_cluster, explicit_metadata_path="../metadata/v11.metadata.json") @pytest.mark.parametrize("storage_type", ["s3", "azure", "local"]) -def test_minmax_pruning_with_null(started_cluster, storage_type): +@pytest.mark.parametrize("run_on_cluster", [False, True]) +def test_minmax_pruning_with_null(started_cluster, storage_type, run_on_cluster): + if run_on_cluster and storage_type == "local": + pytest.skip("Local storage is not supported on cluster") instance = started_cluster.instances["node1"] spark = started_cluster.spark_session TABLE_NAME = "test_minmax_pruning_with_null" + storage_type + "_" + get_uuid_str() @@ -3182,7 +3188,7 @@ def execute_spark_query(query: str): ) creation_expression = get_creation_expression( - storage_type, TABLE_NAME, started_cluster, table_function=True + storage_type, TABLE_NAME, started_cluster, table_function=True, run_on_cluster=run_on_cluster ) def check_validity_and_get_prunned_files(select_expression):