diff --git a/src/Core/Settings.cpp b/src/Core/Settings.cpp
index 4f68cc7deeff..5a9531ba861a 100644
--- a/src/Core/Settings.cpp
+++ b/src/Core/Settings.cpp
@@ -7071,6 +7071,9 @@ Allow Iceberg read optimization based on Iceberg metadata.
 )", EXPERIMENTAL) \
     DECLARE(Bool, allow_retries_in_cluster_requests, false, R"(
 Allow retries in cluster request, when one node goes offline
+)", EXPERIMENTAL) \
+    DECLARE(Bool, object_storage_remote_initiator, false, R"(
+Execute the request to object storage as a remote query on one of the object_storage_cluster nodes.
 )", EXPERIMENTAL) \
     \
     /** Experimental timeSeries* aggregate functions. */ \
diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp
index 0e1ecc49a550..177de3cbcb34 100644
--- a/src/Core/SettingsChangesHistory.cpp
+++ b/src/Core/SettingsChangesHistory.cpp
@@ -47,6 +47,7 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
         {"object_storage_cluster", "", "", "New setting"},
         {"object_storage_max_nodes", 0, 0, "New setting"},
         {"allow_retries_in_cluster_requests", false, false, "New setting"},
+        {"object_storage_remote_initiator", false, false, "New setting"},
     });
 
     addSettingsChanges(settings_changes_history, "25.8", {
diff --git a/src/Databases/DataLake/DataLakeConstants.h b/src/Databases/DataLake/DataLakeConstants.h
index eaa8f5a276e6..02f6a7dcfcd7 100644
--- a/src/Databases/DataLake/DataLakeConstants.h
+++ b/src/Databases/DataLake/DataLakeConstants.h
@@ -8,6 +8,7 @@ namespace DataLake
 {
 
 static constexpr auto DATABASE_ENGINE_NAME = "DataLakeCatalog";
+static constexpr auto DATABASE_ALIAS_NAME = "Iceberg";
 static constexpr std::string_view FILE_PATH_PREFIX = "file:/";
 
 /// Some catalogs (Unity or Glue) may store not only Iceberg/DeltaLake tables but other kinds of "tables"
diff --git a/src/Databases/DataLake/DatabaseDataLake.cpp b/src/Databases/DataLake/DatabaseDataLake.cpp
index 2ef8a48897b4..aeac09799035 100644
--- a/src/Databases/DataLake/DatabaseDataLake.cpp
+++ b/src/Databases/DataLake/DatabaseDataLake.cpp
@@ -732,6 +732,11 @@ void registerDatabaseDataLake(DatabaseFactory & factory)
             throw Exception(ErrorCodes::BAD_ARGUMENTS, "Engine `{}` must have arguments", database_engine_name);
         }
 
+        if (database_engine_name == "Iceberg" && catalog_type != DatabaseDataLakeCatalogType::ICEBERG_REST)
+        {
+            throw Exception(ErrorCodes::BAD_ARGUMENTS, "Engine `Iceberg` supports only the `rest` catalog type");
+        }
+
         for (auto & engine_arg : engine_args)
             engine_arg = evaluateConstantExpressionOrIdentifierAsLiteral(engine_arg, args.context);
 
@@ -813,6 +818,7 @@
             args.uuid);
     };
     factory.registerDatabase("DataLakeCatalog", create_fn, { .supports_arguments = true, .supports_settings = true });
+    factory.registerDatabase("Iceberg", create_fn, { .supports_arguments = true, .supports_settings = true });
 }
 
 }
diff --git a/src/IO/ReadBufferFromS3.cpp b/src/IO/ReadBufferFromS3.cpp
index 3bfc8628f294..afc01b0adc0f 100644
--- a/src/IO/ReadBufferFromS3.cpp
+++ b/src/IO/ReadBufferFromS3.cpp
@@ -445,6 +445,12 @@ Aws::S3::Model::GetObjectResult ReadBufferFromS3::sendRequest(size_t attempt, si
             log, "Read S3 object. Bucket: {}, Key: {}, Version: {}, Offset: {}",
             bucket, key, version_id.empty() ? "Latest" : version_id, range_begin);
     }
+    else
+    {
+        LOG_TEST(
+            log, "Read S3 object. Bucket: {}, Key: {}, Version: {}",
+            bucket, key, version_id.empty() ? "Latest" : version_id);
+    }
 
     ProfileEvents::increment(ProfileEvents::S3GetObject);
     if (client_ptr->isClientForDisk())
diff --git a/src/Parsers/ASTSetQuery.cpp b/src/Parsers/ASTSetQuery.cpp
index 94e2ee5fbfc2..f5962b64c954 100644
--- a/src/Parsers/ASTSetQuery.cpp
+++ b/src/Parsers/ASTSetQuery.cpp
@@ -129,7 +129,8 @@ void ASTSetQuery::formatImpl(WriteBuffer & ostr, const FormatSettings & format,
             return true;
         }
 
-        if (DataLake::DATABASE_ENGINE_NAME == state.create_engine_name)
+        if (DataLake::DATABASE_ENGINE_NAME == state.create_engine_name
+            || DataLake::DATABASE_ALIAS_NAME == state.create_engine_name)
         {
             if (DataLake::SETTINGS_TO_HIDE.contains(change.name))
             {
diff --git a/src/Parsers/FunctionSecretArgumentsFinder.h b/src/Parsers/FunctionSecretArgumentsFinder.h
index 8587fb4c21aa..b614dcb393f6 100644
--- a/src/Parsers/FunctionSecretArgumentsFinder.h
+++ b/src/Parsers/FunctionSecretArgumentsFinder.h
@@ -736,7 +736,7 @@ class FunctionSecretArgumentsFinder
             /// S3('url', 'access_key_id', 'secret_access_key')
             findS3DatabaseSecretArguments();
         }
-        else if (engine_name == "DataLakeCatalog")
+        else if (engine_name == "DataLakeCatalog" || engine_name == "Iceberg")
        {
             findDataLakeCatalogSecretArguments();
         }
diff --git a/src/Storages/IStorageCluster.cpp b/src/Storages/IStorageCluster.cpp
index bc62df76785c..076ec5b5a28b 100644
--- a/src/Storages/IStorageCluster.cpp
+++ b/src/Storages/IStorageCluster.cpp
@@ -1,5 +1,8 @@
 #include
 
+#include
+#include
+
 #include
 #include
 #include
@@ -13,6 +16,7 @@
 #include
 #include
 #include
+#include
 #include
 #include
 #include
@@ -28,6 +32,8 @@
 #include
 #include
 #include
+#include
+#include
 #include
 #include
 
@@ -47,6 +53,7 @@ namespace Setting
     extern const SettingsNonZeroUInt64 max_parallel_replicas;
     extern const SettingsObjectStorageClusterJoinMode object_storage_cluster_join_mode;
     extern const SettingsUInt64 object_storage_max_nodes;
+    extern const SettingsBool object_storage_remote_initiator;
 }
 namespace ErrorCodes
 {
@@ -283,8 +290,9 @@ void IStorageCluster::read(
 
     storage_snapshot->check(column_names);
 
-    updateBeforeRead(context);
-    auto cluster = getClusterImpl(context, cluster_name_from_settings, context->getSettingsRef()[Setting::object_storage_max_nodes]);
+    const auto & settings = context->getSettingsRef();
+
+    auto cluster = getClusterImpl(context, cluster_name_from_settings, settings[Setting::object_storage_max_nodes]);
 
     /// Calculate the header. This is significant, because some columns could be thrown away in some cases like query with count(*)
@@ -293,7 +301,7 @@ void IStorageCluster::read(
 
     updateQueryWithJoinToSendIfNeeded(query_to_send, query_info.query_tree, context);
 
-    if (context->getSettingsRef()[Setting::allow_experimental_analyzer])
+    if (settings[Setting::allow_experimental_analyzer])
     {
         sample_block = InterpreterSelectQueryAnalyzer::getSampleBlock(query_to_send, context, SelectQueryOptions(processed_stage));
     }
@@ -306,6 +314,17 @@
 
     updateQueryToSendIfNeeded(query_to_send, storage_snapshot, context);
 
+    if (settings[Setting::object_storage_remote_initiator])
+    {
+        auto storage_and_context = convertToRemote(cluster, context, cluster_name_from_settings, query_to_send);
+        auto src_distributed = std::dynamic_pointer_cast(storage_and_context.storage);
+        auto modified_query_info = query_info;
+        modified_query_info.cluster = src_distributed->getCluster();
+        auto new_storage_snapshot = storage_and_context.storage->getStorageSnapshot(storage_snapshot->metadata, storage_and_context.context);
+        storage_and_context.storage->read(query_plan, column_names, new_storage_snapshot, modified_query_info, storage_and_context.context, processed_stage, max_block_size, num_streams);
+        return;
+    }
+
     RestoreQualifiedNamesVisitor::Data data;
     data.distributed_table = DatabaseAndTableWithAlias(*getTableExpression(query_to_send->as(), 0));
     data.remote_table.database = context->getCurrentDatabase();
@@ -333,6 +352,62 @@
     query_plan.addStep(std::move(reading));
 }
 
+IStorageCluster::RemoteCallVariables IStorageCluster::convertToRemote(
+    ClusterPtr cluster,
+    ContextPtr context,
+    const std::string & cluster_name_from_settings,
+    ASTPtr query_to_send)
+{
+    auto host_addresses = cluster->getShardsAddresses();
+    if (host_addresses.empty())
+        throw Exception(ErrorCodes::LOGICAL_ERROR, "Empty cluster {}", cluster_name_from_settings);
+
+    static pcg64 rng(randomSeed());
+    size_t shard_num = rng() % host_addresses.size();
+    auto shard_addresses = host_addresses[shard_num];
+    /// After getClusterImpl each shard must have exactly 1 replica
+    if (shard_addresses.size() != 1)
+        throw Exception(ErrorCodes::LOGICAL_ERROR, "Size of shard {} in cluster {} is not equal to 1", shard_num, cluster_name_from_settings);
+    auto host_name = shard_addresses[0].toString();
+
+    LOG_INFO(log, "Chose remote initiator '{}'", host_name);
+
+    bool secure = shard_addresses[0].secure == Protocol::Secure::Enable;
+    std::string remote_function_name = secure ? "remoteSecure" : "remote";
+
+    /// Clear the object_storage_remote_initiator setting in the forwarded query to avoid an infinite chain of remote calls
+    auto new_context = Context::createCopy(context);
+    new_context->setSetting("object_storage_remote_initiator", false);
+
+    auto * select_query = query_to_send->as();
+    if (!select_query)
+        throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected SELECT query");
+
+    auto query_settings = select_query->settings();
+    if (query_settings)
+    {
+        auto & settings_ast = query_settings->as();
+        if (settings_ast.changes.removeSetting("object_storage_remote_initiator") && settings_ast.changes.empty())
+        {
+            select_query->setExpression(ASTSelectQuery::Expression::SETTINGS, {});
+        }
+    }
+
+    ASTTableExpression * table_expression = extractTableExpressionASTPtrFromSelectQuery(query_to_send);
+    if (!table_expression)
+        throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find table expression");
+
+    auto remote_query = makeASTFunction(remote_function_name, std::make_shared(host_name), table_expression->table_function);
+
+    table_expression->table_function = remote_query;
+
+    auto remote_function = TableFunctionFactory::instance().get(remote_query, new_context);
+
+    auto storage = remote_function->execute(query_to_send, new_context, remote_function_name);
+
+    return RemoteCallVariables{storage, new_context};
+}
+
 SinkToStoragePtr IStorageCluster::write(
     const ASTPtr & query,
     const StorageMetadataPtr & metadata_snapshot,
diff --git a/src/Storages/IStorageCluster.h b/src/Storages/IStorageCluster.h
index c559acad8f75..362f938d120b 100644
--- a/src/Storages/IStorageCluster.h
+++ b/src/Storages/IStorageCluster.h
@@ -60,10 +60,21 @@ class IStorageCluster : public IStorage
     virtual String getClusterName(ContextPtr /* context */) const { return getOriginalClusterName(); }
 
 protected:
-    virtual void updateBeforeRead(const ContextPtr &) {}
     virtual void updateQueryToSendIfNeeded(ASTPtr & /*query*/, const StorageSnapshotPtr & /*storage_snapshot*/, const ContextPtr & /*context*/) {}
     void updateQueryWithJoinToSendIfNeeded(ASTPtr & query_to_send, QueryTreeNodePtr query_tree, const ContextPtr & context);
 
+    struct RemoteCallVariables
+    {
+        StoragePtr storage;
+        ContextPtr context;
+    };
+
+    RemoteCallVariables convertToRemote(
+        ClusterPtr cluster,
+        ContextPtr context,
+        const std::string & cluster_name_from_settings,
+        ASTPtr query_to_send);
+
     virtual void readFallBackToPure(
         QueryPlan & /* query_plan */,
         const Names & /* column_names */,
diff --git a/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h b/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h
index da4b87bdd9b6..c9dd5f1b242c 100644
--- a/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h
+++ b/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h
@@ -19,6 +19,7 @@
 #include
 #include
 #include
+#include
 #include
 #include
 
@@ -42,20 +43,21 @@ namespace ErrorCodes
 
 namespace DataLakeStorageSetting
 {
-    extern DataLakeStorageSettingsBool allow_dynamic_metadata_for_data_lakes;
-    extern DataLakeStorageSettingsDatabaseDataLakeCatalogType storage_catalog_type;
-    extern DataLakeStorageSettingsString object_storage_endpoint;
-    extern DataLakeStorageSettingsString storage_aws_access_key_id;
-    extern DataLakeStorageSettingsString storage_aws_secret_access_key;
-    extern DataLakeStorageSettingsString storage_region;
-    extern DataLakeStorageSettingsString storage_catalog_url;
-    extern DataLakeStorageSettingsString storage_warehouse;
-    extern DataLakeStorageSettingsString storage_catalog_credential;
-
-    extern DataLakeStorageSettingsString storage_auth_scope;
-    extern DataLakeStorageSettingsString storage_auth_header;
-    extern DataLakeStorageSettingsString storage_oauth_server_uri;
-    extern DataLakeStorageSettingsBool storage_oauth_server_use_request_body;
+    extern const DataLakeStorageSettingsBool allow_dynamic_metadata_for_data_lakes;
+    extern const DataLakeStorageSettingsDatabaseDataLakeCatalogType storage_catalog_type;
+    extern const DataLakeStorageSettingsString object_storage_endpoint;
+    extern const DataLakeStorageSettingsString storage_aws_access_key_id;
+    extern const DataLakeStorageSettingsString storage_aws_secret_access_key;
+    extern const DataLakeStorageSettingsString storage_region;
+    extern const DataLakeStorageSettingsString storage_catalog_url;
+    extern const DataLakeStorageSettingsString storage_warehouse;
+    extern const DataLakeStorageSettingsString storage_catalog_credential;
+
+    extern const DataLakeStorageSettingsString storage_auth_scope;
+    extern const DataLakeStorageSettingsString storage_auth_header;
+    extern const DataLakeStorageSettingsString storage_oauth_server_uri;
+    extern const DataLakeStorageSettingsBool storage_oauth_server_use_request_body;
+    extern const DataLakeStorageSettingsString iceberg_metadata_file_path;
 }
 
 template
@@ -324,6 +326,42 @@ class DataLakeConfiguration : public BaseStorageConfiguration, public std::enabl
         current_metadata->addDeleteTransformers(object_info, builder, format_settings, local_context);
     }
 
+    ASTPtr createArgsWithAccessData() const override
+    {
+        auto res = BaseStorageConfiguration::createArgsWithAccessData();
+
+        auto iceberg_metadata_file_path = (*settings)[DataLakeStorageSetting::iceberg_metadata_file_path];
+
+        if (iceberg_metadata_file_path.changed)
+        {
+            auto * arguments = res->template as();
+            if (!arguments)
+                throw Exception(ErrorCodes::LOGICAL_ERROR, "Arguments are not an expression list");
+
+            bool has_settings = false;
+
+            for (auto & arg : arguments->children)
+            {
+                if (auto * settings_ast = arg->template as())
+                {
+                    has_settings = true;
+                    settings_ast->changes.setSetting("iceberg_metadata_file_path", iceberg_metadata_file_path.value);
+                    break;
+                }
+            }
+
+            if (!has_settings)
+            {
+                std::shared_ptr settings_ast = std::make_shared();
+                settings_ast->is_standalone = false;
+                settings_ast->changes.setSetting("iceberg_metadata_file_path", iceberg_metadata_file_path.value);
+                arguments->children.push_back(settings_ast);
+            }
+        }
+
+        return res;
+    }
+
 private:
     DataLakeMetadataPtr current_metadata;
     LoggerPtr log = getLogger("DataLakeConfiguration");
diff --git a/src/Storages/ObjectStorage/DataLakes/HudiMetadata.cpp b/src/Storages/ObjectStorage/DataLakes/HudiMetadata.cpp
index 3ea71077b1b7..b6471c4ba03d 100644
--- a/src/Storages/ObjectStorage/DataLakes/HudiMetadata.cpp
+++ b/src/Storages/ObjectStorage/DataLakes/HudiMetadata.cpp
@@ -91,7 +91,7 @@ HudiMetadata::HudiMetadata(ObjectStoragePtr object_storage_, StorageObjectStorag
 {
 }
 
-Strings HudiMetadata::getDataFiles(const ActionsDAG *) const
+Strings HudiMetadata::getDataFiles() const
 {
     if (data_files.empty())
         data_files = getDataFilesImpl();
@@ -99,12 +99,12 @@ ObjectIterator HudiMetadata::iterate(
-    const ActionsDAG * filter_dag,
+    const ActionsDAG * /* filter_dag */,
     FileProgressCallback callback,
     size_t /* list_batch_size */,
     ContextPtr /* context */) const
 {
-    return createKeysIterator(getDataFiles(filter_dag), object_storage, callback);
+    return createKeysIterator(getDataFiles(), object_storage, callback);
 }
 
 }
diff --git a/src/Storages/ObjectStorage/DataLakes/HudiMetadata.h b/src/Storages/ObjectStorage/DataLakes/HudiMetadata.h
index 47f2e8d95366..147b39893006 100644
--- a/src/Storages/ObjectStorage/DataLakes/HudiMetadata.h
+++ b/src/Storages/ObjectStorage/DataLakes/HudiMetadata.h
@@ -60,7 +60,7 @@ class HudiMetadata final : public IDataLakeMetadata, private WithContext
     mutable Strings data_files;
 
     Strings getDataFilesImpl() const;
-    Strings getDataFiles(const ActionsDAG * filter_dag) const;
+    Strings getDataFiles() const;
 };
 
 }
diff --git a/src/Storages/ObjectStorage/StorageObjectStorage.cpp b/src/Storages/ObjectStorage/StorageObjectStorage.cpp
index b07e6f886679..aed11bfa5b6a 100644
--- a/src/Storages/ObjectStorage/StorageObjectStorage.cpp
+++ b/src/Storages/ObjectStorage/StorageObjectStorage.cpp
@@ -108,7 +108,8 @@ StorageObjectStorage::StorageObjectStorage(
     bool distributed_processing_,
     ASTPtr partition_by_,
     bool is_table_function,
-    bool lazy_init)
+    bool lazy_init,
+    std::optional sample_path_)
     : IStorage(table_id_)
     , configuration(configuration_)
     , object_storage(object_storage_)
@@ -156,7 +157,7 @@ StorageObjectStorage::StorageObjectStorage(
     /// (e.g. read always follows constructor immediately).
     update_configuration_on_read_write = !is_table_function || !updated_configuration;
 
-    std::string sample_path;
+    std::string sample_path = sample_path_.value_or("");
     ColumnsDescription columns{columns_in_table_or_function_definition};
 
     if (need_resolve_columns_or_format)
diff --git a/src/Storages/ObjectStorage/StorageObjectStorage.h b/src/Storages/ObjectStorage/StorageObjectStorage.h
index c69641b37baa..d5b6dd026cde 100644
--- a/src/Storages/ObjectStorage/StorageObjectStorage.h
+++ b/src/Storages/ObjectStorage/StorageObjectStorage.h
@@ -56,7 +56,8 @@ class StorageObjectStorage : public IStorage
         bool distributed_processing_ = false,
         ASTPtr partition_by_ = nullptr,
         bool is_table_function_ = false,
-        bool lazy_init = false);
+        bool lazy_init = false,
+        std::optional sample_path_ = std::nullopt);
 
     String getName() const override;
 
diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp
index 8e3d3c794a71..7920ecc74b48 100644
--- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp
+++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp
@@ -188,7 +188,8 @@ StorageObjectStorageCluster::StorageObjectStorageCluster(
         /* distributed_processing */false,
         partition_by,
         /* is_table_function */false,
-        /* lazy_init */lazy_init);
+        /* lazy_init */lazy_init,
+        sample_path);
 
     auto virtuals_ = getVirtualsPtr();
     if (virtuals_)
diff --git a/src/TableFunctions/TableFunctionRemote.h b/src/TableFunctions/TableFunctionRemote.h
index e58d30cf48df..498339231153 100644
--- a/src/TableFunctions/TableFunctionRemote.h
+++ b/src/TableFunctions/TableFunctionRemote.h
@@ -26,6 +26,8 @@ class TableFunctionRemote : public ITableFunction
 
     bool needStructureConversion() const override { return false; }
 
+    void setRemoteTableFunction(ASTPtr remote_table_function_ptr_) { remote_table_function_ptr = remote_table_function_ptr_; }
+
 private:
     StoragePtr executeImpl(const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns, bool is_insert_query) const override;
 
diff --git a/tests/integration/test_database_iceberg/test.py b/tests/integration/test_database_iceberg/test.py
index ed368abb762a..6fa8113f5e37 100644
--- a/tests/integration/test_database_iceberg/test.py
+++ b/tests/integration/test_database_iceberg/test.py
@@ -72,6 +72,8 @@
 DEFAULT_SORT_ORDER = SortOrder(SortField(source_id=2, transform=IdentityTransform()))
 
+AVAILABLE_ENGINES = ["DataLakeCatalog", "Iceberg"]
+
 
 def list_namespaces():
     response = requests.get(f"{BASE_URL_LOCAL}/namespaces")
 
@@ -122,7 +124,7 @@ def generate_record():
 
 
 def create_clickhouse_iceberg_database(
-    started_cluster, node, name, additional_settings={}
+    started_cluster, node, name, additional_settings={}, engine='DataLakeCatalog'
 ):
     settings = {
         "catalog_type": "rest",
@@ -137,7 +139,7 @@ def create_clickhouse_iceberg_database(
 DROP DATABASE IF EXISTS {name};
 SET allow_database_iceberg=true;
 SET write_full_path_in_iceberg_metadata=1;
-CREATE DATABASE {name} ENGINE = DataLakeCatalog('{BASE_URL}', 'minio', '{minio_secret_key}')
+CREATE DATABASE {name} ENGINE = {engine}('{BASE_URL}', 'minio', '{minio_secret_key}')
 SETTINGS {",".join((k+"="+repr(v) for k, v in settings.items()))}
 """
     )
@@ -202,7 +204,8 @@ def started_cluster():
         cluster.shutdown()
 
 
-def test_list_tables(started_cluster):
+@pytest.mark.parametrize("engine", AVAILABLE_ENGINES)
+def test_list_tables(started_cluster, engine):
     node = started_cluster.instances["node1"]
 
     root_namespace = f"clickhouse_{uuid.uuid4()}"
@@ -233,7 +236,7 @@
     for namespace in [namespace_1, namespace_2]:
         assert len(catalog.list_tables(namespace)) == 0
 
-    create_clickhouse_iceberg_database(started_cluster, node, CATALOG_NAME)
+    create_clickhouse_iceberg_database(started_cluster, node, CATALOG_NAME, engine=engine)
 
     tables_list = ""
     for table in namespace_1_tables:
@@ -268,7 +271,8 @@
     )
 
 
-def test_many_namespaces(started_cluster):
+@pytest.mark.parametrize("engine", AVAILABLE_ENGINES)
+def test_many_namespaces(started_cluster, engine):
     node = started_cluster.instances["node1"]
     root_namespace_1 = f"A_{uuid.uuid4()}"
     root_namespace_2 = f"B_{uuid.uuid4()}"
@@ -289,7 +293,7 @@
         for table in tables:
             create_table(catalog, namespace, table)
 
-    create_clickhouse_iceberg_database(started_cluster, node, CATALOG_NAME)
+    create_clickhouse_iceberg_database(started_cluster, node, CATALOG_NAME, engine=engine)
 
     for namespace in namespaces:
         for table in tables:
@@ -301,7 +305,8 @@
     )
 
 
-def test_select(started_cluster):
+@pytest.mark.parametrize("engine", AVAILABLE_ENGINES)
+def test_select(started_cluster, engine):
     node = started_cluster.instances["node1"]
 
     test_ref = f"test_list_tables_{uuid.uuid4()}"
@@ -329,7 +334,7 @@
     df = pa.Table.from_pylist(data)
     table.append(df)
 
-    create_clickhouse_iceberg_database(started_cluster, node, CATALOG_NAME)
+    create_clickhouse_iceberg_database(started_cluster, node, CATALOG_NAME, engine=engine)
 
     expected = DEFAULT_CREATE_TABLE.format(CATALOG_NAME, namespace, table_name)
     assert expected == node.query(
@@ -343,7 +348,8 @@
     assert int(node.query(f"SELECT count() FROM system.iceberg_history WHERE table = '{namespace}.{table_name}' and database = '{CATALOG_NAME}'").strip()) == 1
 
 
-def test_hide_sensitive_info(started_cluster):
+@pytest.mark.parametrize("engine", AVAILABLE_ENGINES)
+def test_hide_sensitive_info(started_cluster, engine):
     node = started_cluster.instances["node1"]
 
     test_ref = f"test_hide_sensitive_info_{uuid.uuid4()}"
@@ -361,6 +367,7 @@
         node,
         CATALOG_NAME,
         additional_settings={"catalog_credential": "SECRET_1"},
+        engine=engine,
     )
     assert "SECRET_1" not in node.query(f"SHOW CREATE DATABASE {CATALOG_NAME}")
 
@@ -369,11 +376,13 @@
         node,
         CATALOG_NAME,
         additional_settings={"auth_header": "SECRET_2"},
+        engine=engine,
     )
     assert "SECRET_2" not in node.query(f"SHOW CREATE DATABASE {CATALOG_NAME}")
 
 
-def test_tables_with_same_location(started_cluster):
+@pytest.mark.parametrize("engine", AVAILABLE_ENGINES)
+def test_tables_with_same_location(started_cluster, engine):
     node = started_cluster.instances["node1"]
 
     test_ref = f"test_tables_with_same_location_{uuid.uuid4()}"
@@ -404,7 +413,7 @@ def record(key):
     df = pa.Table.from_pylist(data)
     table_2.append(df)
 
-    create_clickhouse_iceberg_database(started_cluster, node, CATALOG_NAME)
+    create_clickhouse_iceberg_database(started_cluster, node, CATALOG_NAME, engine=engine)
 
     assert 'aaa\naaa\naaa' == node.query(f"SELECT symbol FROM {CATALOG_NAME}.`{namespace}.{table_name}`").strip()
     assert 'bbb\nbbb\nbbb' == node.query(f"SELECT symbol FROM {CATALOG_NAME}.`{namespace}.{table_name_2}`").strip()
diff --git a/tests/integration/test_storage_iceberg/test.py b/tests/integration/test_storage_iceberg/test.py
index 1f2c1969a91d..bcce89ac357a 100644
--- a/tests/integration/test_storage_iceberg/test.py
+++ b/tests/integration/test_storage_iceberg/test.py
@@ -1214,7 +1214,7 @@ def test_filesystem_cache(started_cluster, storage_type):
 
 @pytest.mark.parametrize(
     "storage_type, run_on_cluster",
-    [("s3", False), ("s3", True), ("azure", False), ("local", False)],
+    [("s3", False), ("s3", True), ("azure", False), ("azure", True), ("local", False)],
 )
 def test_partition_pruning(started_cluster, storage_type, run_on_cluster):
     instance = started_cluster.instances["node1"]
@@ -2051,7 +2051,10 @@ def test_explicit_metadata_file(started_cluster, storage_type):
 
 
 @pytest.mark.parametrize("storage_type", ["s3", "azure", "local"])
-def test_minmax_pruning_with_null(started_cluster, storage_type):
+@pytest.mark.parametrize("run_on_cluster", [False, True])
+def test_minmax_pruning_with_null(started_cluster, storage_type, run_on_cluster):
+    if run_on_cluster and storage_type == "local":
+        pytest.skip("Local storage is not supported on cluster")
     instance = started_cluster.instances["node1"]
     spark = started_cluster.spark_session
     TABLE_NAME = "test_minmax_pruning_with_null" + storage_type + "_" + get_uuid_str()
@@ -2121,7 +2124,7 @@ def execute_spark_query(query: str):
     )
 
     creation_expression = get_creation_expression(
-        storage_type, TABLE_NAME, started_cluster, table_function=True
+        storage_type, TABLE_NAME, started_cluster, table_function=True, run_on_cluster=run_on_cluster
    )
 
    def check_validity_and_get_prunned_files(select_expression):