From ee84f2b0313d14b4c35a9309d57f5d9d0b011db2 Mon Sep 17 00:00:00 2001
From: Anton Ivashkin
Date: Wed, 29 Oct 2025 16:29:36 +0100
Subject: [PATCH 1/5] icebergLocalCluster table function

---
 .../ObjectStorage/Local/Configuration.cpp     | 16 ++++++++++
 .../ObjectStorage/Local/Configuration.h       |  2 ++
 .../StorageObjectStorageCluster.cpp           |  2 ++
 .../StorageObjectStorageDefinitions.h         |  7 +++++
 .../TableFunctionObjectStorage.cpp            | 10 +------
 .../TableFunctionObjectStorageCluster.cpp     |  7 +++++
 .../TableFunctionObjectStorageCluster.h       |  1 +
 ...leFunctionObjectStorageClusterFallback.cpp | 29 +++++++++++++++++++
 src/TableFunctions/registerTableFunctions.cpp |  1 -
 src/TableFunctions/registerTableFunctions.h   |  1 -
 10 files changed, 65 insertions(+), 11 deletions(-)

diff --git a/src/Storages/ObjectStorage/Local/Configuration.cpp b/src/Storages/ObjectStorage/Local/Configuration.cpp
index 9ff67e836342..58eeac06b0e2 100644
--- a/src/Storages/ObjectStorage/Local/Configuration.cpp
+++ b/src/Storages/ObjectStorage/Local/Configuration.cpp
@@ -81,4 +81,20 @@ StorageObjectStorageQuerySettings StorageLocalConfiguration::getQuerySettings(co
         .ignore_non_existent_file = false};
 }
 
+ASTPtr StorageLocalConfiguration::createArgsWithAccessData() const
+{
+    auto arguments = std::make_shared<ASTExpressionList>();
+
+    arguments->children.push_back(std::make_shared<ASTLiteral>(path.path));
+    if (getFormat() != "auto")
+        arguments->children.push_back(std::make_shared<ASTLiteral>(getFormat()));
+    if (getStructure() != "auto")
+        arguments->children.push_back(std::make_shared<ASTLiteral>(getStructure()));
+    if (getCompressionMethod() != "auto")
+        arguments->children.push_back(std::make_shared<ASTLiteral>(getCompressionMethod()));
+
+    return arguments;
+}
+
+
 }
diff --git a/src/Storages/ObjectStorage/Local/Configuration.h b/src/Storages/ObjectStorage/Local/Configuration.h
index 231e33f84d35..207d297147c1 100644
--- a/src/Storages/ObjectStorage/Local/Configuration.h
+++ b/src/Storages/ObjectStorage/Local/Configuration.h
@@ -60,6 +60,8 @@ class StorageLocalConfiguration : public StorageObjectStorageConfiguration
     void addStructureAndFormatToArgsIfNeeded(ASTs &, const String &, const String &, ContextPtr, bool) override { }
 
+    ASTPtr createArgsWithAccessData() const override;
+
 private:
     void fromNamedCollection(const NamedCollection & collection, ContextPtr context) override;
     void fromAST(ASTs & args, ContextPtr context, bool with_structure) override;
diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp
index efae7b129d7a..b317d4dd8826 100644
--- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp
+++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp
@@ -295,6 +295,7 @@ void StorageObjectStorageCluster::updateQueryForDistributedEngineIfNeeded(ASTPtr
         {"IcebergS3", "icebergS3"},
         {"IcebergAzure", "icebergAzure"},
         {"IcebergHDFS", "icebergHDFS"},
+        {"IcebergLocal", "icebergLocal"},
         {"DeltaLake", "deltaLake"},
         {"DeltaLakeS3", "deltaLakeS3"},
         {"DeltaLakeAzure", "deltaLakeAzure"},
@@ -416,6 +417,7 @@ void StorageObjectStorageCluster::updateQueryToSendIfNeeded(
         {"icebergS3", "icebergS3Cluster"},
         {"icebergAzure", "icebergAzureCluster"},
         {"icebergHDFS", "icebergHDFSCluster"},
+        {"icebergLocal", "icebergLocalCluster"},
         {"deltaLake", "deltaLakeCluster"},
         {"deltaLakeS3", "deltaLakeS3Cluster"},
         {"deltaLakeAzure", "deltaLakeAzureCluster"},
diff --git a/src/Storages/ObjectStorage/StorageObjectStorageDefinitions.h b/src/Storages/ObjectStorage/StorageObjectStorageDefinitions.h
index 3163e517542b..ef00c15750ab 100644
--- a/src/Storages/ObjectStorage/StorageObjectStorageDefinitions.h
+++ b/src/Storages/ObjectStorage/StorageObjectStorageDefinitions.h
@@ -155,6 +155,13 @@ struct IcebergHDFSClusterDefinition
     static constexpr auto non_clustered_storage_engine_name = IcebergHDFSDefinition::storage_engine_name;
 };
 
+struct IcebergLocalClusterDefinition
+{
+    static constexpr auto name = "icebergLocalCluster";
+    static constexpr auto storage_engine_name = "IcebergLocalCluster";
+    static constexpr auto non_clustered_storage_engine_name = IcebergLocalDefinition::storage_engine_name;
+};
+
 struct DeltaLakeClusterDefinition
 {
     static constexpr auto name = "deltaLakeCluster";
diff --git a/src/TableFunctions/TableFunctionObjectStorage.cpp b/src/TableFunctions/TableFunctionObjectStorage.cpp
index a7b8762a7e6b..1d04e20debb6 100644
--- a/src/TableFunctions/TableFunctionObjectStorage.cpp
+++ b/src/TableFunctions/TableFunctionObjectStorage.cpp
@@ -296,6 +296,7 @@ template class TableFunctionObjectStorage;
+template class TableFunctionObjectStorage<IcebergLocalDefinition, StorageLocalIcebergConfiguration>;
 #endif
 
 #if USE_AVRO && USE_AWS_S3
@@ -334,13 +335,4 @@ void registerTableFunctionIceberg(TableFunctionFactory & factory)
             .allow_readonly = false});
 }
 #endif
-
-
-void registerDataLakeTableFunctions(TableFunctionFactory & factory)
-{
-    UNUSED(factory);
-#if USE_AVRO
-    registerTableFunctionIceberg(factory);
-#endif
-}
 }
diff --git a/src/TableFunctions/TableFunctionObjectStorageCluster.cpp b/src/TableFunctions/TableFunctionObjectStorageCluster.cpp
index 5d5a6fa15134..89892f45bf1b 100644
--- a/src/TableFunctions/TableFunctionObjectStorageCluster.cpp
+++ b/src/TableFunctions/TableFunctionObjectStorageCluster.cpp
@@ -155,6 +155,13 @@ void registerTableFunctionIcebergCluster(TableFunctionFactory & factory)
             .category = FunctionDocumentation::Category::TableFunction},
          .allow_readonly = false});
 
+    factory.registerFunction<TableFunctionIcebergLocalCluster>(
+        {.documentation
+         = {.description = R"(The table function can be used to read the Iceberg table stored on shared storage in parallel for many nodes in a specified cluster.)",
+            .examples{{IcebergLocalClusterDefinition::name, "SELECT * FROM icebergLocalCluster(cluster, filename, format, [,compression])", ""}},
+            .category = FunctionDocumentation::Category::TableFunction},
+         .allow_readonly = false});
+
 # if USE_AWS_S3
     factory.registerFunction(
         {.documentation
diff --git a/src/TableFunctions/TableFunctionObjectStorageCluster.h b/src/TableFunctions/TableFunctionObjectStorageCluster.h
index 2529c36da62b..06044e480590 100644
--- a/src/TableFunctions/TableFunctionObjectStorageCluster.h
+++ b/src/TableFunctions/TableFunctionObjectStorageCluster.h
@@ -62,6 +62,7 @@ using TableFunctionHDFSCluster = TableFunctionObjectStorageCluster;
+using TableFunctionIcebergLocalCluster = TableFunctionObjectStorageCluster<IcebergLocalClusterDefinition, StorageLocalIcebergConfiguration>;
 #endif
 
 #if USE_AVRO && USE_AWS_S3
diff --git a/src/TableFunctions/TableFunctionObjectStorageClusterFallback.cpp b/src/TableFunctions/TableFunctionObjectStorageClusterFallback.cpp
index c0fbea05a3f5..0ec8b178f6a3 100644
--- a/src/TableFunctions/TableFunctionObjectStorageClusterFallback.cpp
+++ b/src/TableFunctions/TableFunctionObjectStorageClusterFallback.cpp
@@ -67,6 +67,13 @@ struct IcebergHDFSClusterFallbackDefinition
     static constexpr auto storage_engine_cluster_name = "IcebergHDFSCluster";
 };
 
+struct IcebergLocalClusterFallbackDefinition
+{
+    static constexpr auto name = "icebergLocal";
+    static constexpr auto storage_engine_name = "Local";
+    static constexpr auto storage_engine_cluster_name = "IcebergLocalCluster";
+};
+
 struct DeltaLakeClusterFallbackDefinition
 {
     static constexpr auto name = "deltaLake";
@@ -163,6 +170,7 @@ using TableFunctionHDFSClusterFallback = TableFunctionObjectStorageClusterFallba
 #if USE_AVRO
 using TableFunctionIcebergClusterFallback = TableFunctionObjectStorageClusterFallback;
+using TableFunctionIcebergLocalClusterFallback = TableFunctionObjectStorageClusterFallback<IcebergLocalClusterFallbackDefinition, TableFunctionIcebergLocalCluster>;
 #endif
 
 #if USE_AVRO && USE_AWS_S3
@@ -286,6 +294,27 @@ void registerTableFunctionObjectStorageClusterFallback(TableFunctionFactory & fa
             .allow_readonly = false
         }
     );
+
+    factory.registerFunction<TableFunctionIcebergLocalClusterFallback>(
+        {
+            .documentation = {
+                .description=R"(The table function can be used to read the Iceberg table stored on shared disk in parallel for many nodes in a specified cluster or from single node.)",
+                .examples{
+                    {
+                        "icebergLocal",
+                        "SELECT * FROM icebergLocal(filename)", ""
+                    },
+                    {
+                        "icebergLocal",
+                        "SELECT * FROM icebergLocal(filename) "
+                        "SETTINGS object_storage_cluster='cluster'", ""
+                    },
+                },
+                .category = FunctionDocumentation::Category::TableFunction
+            },
+            .allow_readonly = false
+        }
+    );
 #endif
 
 #if USE_AVRO && USE_AWS_S3
diff --git a/src/TableFunctions/registerTableFunctions.cpp b/src/TableFunctions/registerTableFunctions.cpp
index 2f2709ed3a93..c8497fbfdda2 100644
--- a/src/TableFunctions/registerTableFunctions.cpp
+++ b/src/TableFunctions/registerTableFunctions.cpp
@@ -69,7 +69,6 @@ void registerTableFunctions()
     registerTableFunctionObjectStorage(factory);
     registerTableFunctionObjectStorageCluster(factory);
     registerTableFunctionObjectStorageClusterFallback(factory);
-    registerDataLakeTableFunctions(factory);
     registerDataLakeClusterTableFunctions(factory);
 
 #if USE_YTSAURUS
diff --git a/src/TableFunctions/registerTableFunctions.h b/src/TableFunctions/registerTableFunctions.h
index c1dcb14568d7..84f0418bc4e1 100644
--- a/src/TableFunctions/registerTableFunctions.h
+++ b/src/TableFunctions/registerTableFunctions.h
@@ -70,7 +70,6 @@ void registerTableFunctionExplain(TableFunctionFactory & factory);
 void registerTableFunctionObjectStorage(TableFunctionFactory & factory);
 void registerTableFunctionObjectStorageCluster(TableFunctionFactory & factory);
 void registerTableFunctionObjectStorageClusterFallback(TableFunctionFactory & factory);
-void registerDataLakeTableFunctions(TableFunctionFactory & factory);
 void registerDataLakeClusterTableFunctions(TableFunctionFactory & factory);
 void registerTableFunctionTimeSeries(TableFunctionFactory & factory);

From 31448574d1d2fa3705fc4079ddec3c2a3a6beb4d Mon Sep 17 00:00:00 2001
From: Anton Ivashkin
Date: Mon, 3 Nov 2025 20:00:50 +0100
Subject: [PATCH 2/5] Update on reads, tests

---
 src/Databases/DataLake/DatabaseDataLake.cpp   | 15 +++++-----
 src/Storages/IStorageCluster.cpp              |  2 ++
 src/Storages/IStorageCluster.h                |  2 ++
 .../StorageObjectStorageCluster.cpp           | 17 +++++++++++
 .../StorageObjectStorageCluster.h             |  6 +++-
 .../registerStorageObjectStorage.cpp          |  4 ++-
 .../TableFunctionObjectStorage.cpp            |  6 ++--
 .../TableFunctionObjectStorageCluster.cpp     |  3 +-
 tests/integration/helpers/iceberg_utils.py    | 28 +++++++++++--------
 .../configs/config.d/cluster.xml              |  8 ++++++
 .../integration/test_storage_iceberg/test.py  |  4 +--
 11 files changed, 68 insertions(+), 27 deletions(-)

diff --git a/src/Databases/DataLake/DatabaseDataLake.cpp b/src/Databases/DataLake/DatabaseDataLake.cpp
index aeac09799035..eb5d52c75d8e 100644
--- a/src/Databases/DataLake/DatabaseDataLake.cpp
+++ b/src/Databases/DataLake/DatabaseDataLake.cpp
@@ -434,17 +434,18 @@ StoragePtr DatabaseDataLake::tryGetTableImpl(const String & name, ContextPtr con
         configuration,
         configuration->createObjectStorage(context_copy, /* is_readonly */ false),
         StorageID(getDatabaseName(), name),
-        /* columns */columns,
-        /* constraints */ConstraintsDescription{},
-        /* partition_by */nullptr,
+        /* columns */ columns,
+        /* constraints */ ConstraintsDescription{},
+        /* partition_by */ nullptr,
         context_copy,
-        /* comment */"",
+        /* comment */ "",
         getFormatSettings(context_copy),
         LoadingStrictnessLevel::CREATE,
         getCatalog(),
-        /* if_not_exists*/true,
-        /* is_datalake_query*/true,
-        /* lazy_init */true);
+        /* if_not_exists */ true,
+        /* is_datalake_query */ true,
+        /* is_table_function */ true,
+        /* lazy_init */ true);
 }
 
 void DatabaseDataLake::dropTable( /// NOLINT
diff --git a/src/Storages/IStorageCluster.cpp b/src/Storages/IStorageCluster.cpp
index 076ec5b5a28b..79eca5401a15 100644
--- a/src/Storages/IStorageCluster.cpp
+++ b/src/Storages/IStorageCluster.cpp
@@ -288,6 +288,8 @@ void IStorageCluster::read(
         return;
     }
 
+    updateConfigurationIfNeeded(context);
+
     storage_snapshot->check(column_names);
 
     const auto & settings = context->getSettingsRef();
diff --git a/src/Storages/IStorageCluster.h b/src/Storages/IStorageCluster.h
index 362f938d120b..3f328d699a2d 100644
--- a/src/Storages/IStorageCluster.h
+++ b/src/Storages/IStorageCluster.h
@@ -97,6 +97,8 @@ class IStorageCluster : public IStorage
         throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method writeFallBackToPure is not supported by storage {}", getName());
     }
 
+    virtual void updateConfigurationIfNeeded(ContextPtr /* context */) {}
+
 private:
     static ClusterPtr getClusterImpl(ContextPtr context, const String & cluster_name_, size_t max_hosts = 0);
diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp
index b317d4dd8826..c4f2f3797056 100644
--- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp
+++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp
@@ -91,6 +91,7 @@ StorageObjectStorageCluster::StorageObjectStorageCluster(
     std::shared_ptr catalog,
     bool if_not_exists,
     bool is_datalake_query,
+    bool is_table_function,
     bool lazy_init)
     : IStorageCluster(
         cluster_name_, table_id_, getLogger(fmt::format("{}({})", configuration_->getEngineName(), table_id_.table_name)))
@@ -145,6 +146,10 @@ StorageObjectStorageCluster::StorageObjectStorageCluster(
         tryLogCurrentException(log_);
     }
 
+    // For tables need to update configuration on each read
+    // because data can be changed after previous update
+    update_configuration_on_read_write = !is_table_function;
+
     ColumnsDescription columns{columns_in_table_or_function_definition};
     std::string sample_path;
     if (need_resolve_columns_or_format)
@@ -743,6 +748,18 @@ IDataLakeMetadata * StorageObjectStorageCluster::getExternalMetadata(ContextPtr
     return configuration->getExternalMetadata();
 }
 
+void StorageObjectStorageCluster::updateConfigurationIfNeeded(ContextPtr context)
+{
+    if (update_configuration_on_read_write)
+    {
+        configuration->update(
+            object_storage,
+            context,
+            /* if_not_updated_before */false,
+            /* check_consistent_with_previous_metadata */false);
+    }
+}
+
 void StorageObjectStorageCluster::checkAlterIsPossible(const AlterCommands & commands, ContextPtr context) const
 {
     if (getClusterName(context).empty())
diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.h b/src/Storages/ObjectStorage/StorageObjectStorageCluster.h
index f8b05846f09c..35ac5a4937e2 100644
--- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.h
+++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.h
@@ -25,7 +25,8 @@ class StorageObjectStorageCluster : public IStorageCluster
         std::shared_ptr catalog,
         bool if_not_exists,
         bool is_datalake_query,
-        bool lazy_init = false);
+        bool is_table_function,
+        bool lazy_init);
 
     std::string getName() const override;
 
@@ -154,6 +155,8 @@ class StorageObjectStorageCluster : public IStorageCluster
         ContextPtr context,
         bool async_insert) override;
 
+    void updateConfigurationIfNeeded(ContextPtr context) override;
+
     /*
       In case the table was created with `object_storage_cluster` setting,
       modify the AST query object so that it uses the table function implementation
@@ -176,6 +179,7 @@ class StorageObjectStorageCluster : public IStorageCluster
     /// non-clustered storage to fall back on pure realisation if needed
     std::shared_ptr pure_storage;
+    bool update_configuration_on_read_write;
 };
 
 }
diff --git a/src/Storages/ObjectStorage/registerStorageObjectStorage.cpp b/src/Storages/ObjectStorage/registerStorageObjectStorage.cpp
index bc1b96c7198b..cc031aad68a1 100644
--- a/src/Storages/ObjectStorage/registerStorageObjectStorage.cpp
+++ b/src/Storages/ObjectStorage/registerStorageObjectStorage.cpp
@@ -98,7 +98,9 @@ createStorageObjectStorage(const StorageFactory::Arguments & args, StorageObject
         args.mode,
         configuration->getCatalog(context, args.query.attach),
         args.query.if_not_exists,
-        /* is_datalake_query*/ false);
+        /* is_datalake_query */ false,
+        /* is_table_function */ false,
+        /* lazy_init */ false);
 }
 #endif
diff --git a/src/TableFunctions/TableFunctionObjectStorage.cpp b/src/TableFunctions/TableFunctionObjectStorage.cpp
index 1d04e20debb6..145581a35989 100644
--- a/src/TableFunctions/TableFunctionObjectStorage.cpp
+++ b/src/TableFunctions/TableFunctionObjectStorage.cpp
@@ -204,9 +204,11 @@ StoragePtr TableFunctionObjectStorage::
         /* comment */ String{},
         /* format_settings */ std::nullopt, /// No format_settings
         /* mode */ LoadingStrictnessLevel::CREATE,
-        configuration->getCatalog(context, /*attach*/ false),
+        configuration->getCatalog(context, /* attach */ false),
         /* if_not_exists */ false,
-        /* is_datalake_query*/ false);
+        /* is_datalake_query */ false,
+        /* is_table_function */ true,
+        /* lazy_init */ false);
 
     storage->startup();
     return storage;
diff --git a/src/TableFunctions/TableFunctionObjectStorageCluster.cpp b/src/TableFunctions/TableFunctionObjectStorageCluster.cpp
index 89892f45bf1b..b6567ce3d760 100644
--- a/src/TableFunctions/TableFunctionObjectStorageCluster.cpp
+++ b/src/TableFunctions/TableFunctionObjectStorageCluster.cpp
@@ -76,7 +76,8 @@ StoragePtr TableFunctionObjectStorageCluster
+
+
+
+                    node1
+                    9000
+
+
+
diff --git a/tests/integration/test_storage_iceberg/test.py b/tests/integration/test_storage_iceberg/test.py
index 791234dece09..5ee8f9e65f87 100644
--- a/tests/integration/test_storage_iceberg/test.py
+++ b/tests/integration/test_storage_iceberg/test.py
@@ -1214,7 +1214,7 @@ def test_filesystem_cache(started_cluster, storage_type):
 @pytest.mark.parametrize(
     "storage_type, run_on_cluster",
-    [("s3", False), ("s3", True), ("azure", False), ("azure", True), ("local", False)],
+    [("s3", False), ("s3", True), ("azure", False), ("azure", True), ("local", False), ("local", True)],
 )
 def test_partition_pruning(started_cluster, storage_type, run_on_cluster):
     instance = started_cluster.instances["node1"]
@@ -2053,8 +2053,6 @@ def test_explicit_metadata_file(started_cluster, storage_type):
 @pytest.mark.parametrize("storage_type", ["s3", "azure", "local"])
 @pytest.mark.parametrize("run_on_cluster", [False, True])
 def test_minmax_pruning_with_null(started_cluster, storage_type, run_on_cluster):
-    if run_on_cluster and storage_type == "local":
-        pytest.skip("Local storage is not supported on cluster")
     instance = started_cluster.instances["node1"]
     spark = started_cluster.spark_session
     TABLE_NAME = "test_minmax_pruning_with_null" + storage_type + "_" + get_uuid_str()

From ed20ac774446264bb9fc332fe6d76af1fc418528 Mon Sep 17 00:00:00 2001
From: Anton Ivashkin
Date: Tue, 4 Nov 2025 09:39:21 +0100
Subject: [PATCH 3/5] More tests

---
 tests/integration/helpers/iceberg_utils.py     | 14 +++++++++++++-
 .../configs/config.d/cluster.xml               |  8 --------
 tests/integration/test_storage_iceberg/test.py | 18 ++++++++++++++++--
 3 files changed, 29 insertions(+), 11 deletions(-)

diff --git a/tests/integration/helpers/iceberg_utils.py b/tests/integration/helpers/iceberg_utils.py
index a2b1b61447cc..5f2c01a8542e 100644
--- a/tests/integration/helpers/iceberg_utils.py
+++ b/tests/integration/helpers/iceberg_utils.py
@@ -291,7 +291,7 @@ def get_creation_expression(
         if run_on_cluster:
             assert table_function
             return f"""
-                iceberg{engine_part}Cluster('cluster_single_node', {storage_arg}, path = '/iceberg_data/default/{table_name}/', format={format})
+                iceberg{engine_part}Cluster('cluster_simple', {storage_arg}, path = '/iceberg_data/default/{table_name}/', format={format})
             """
         else:
             if table_function:
@@ -477,6 +477,17 @@ def default_upload_directory(
         raise Exception(f"Unknown iceberg storage type: {storage_type}")
 
 
+def additional_upload_directory(
+    started_cluster, node, storage_type, local_path, remote_path, **kwargs
+):
+    if storage_type == "local":
+        return LocalUploader(started_cluster.instances[node]).upload_directory(
+            local_path, remote_path, **kwargs
+        )
+    else:
+        raise Exception(f"Unknown iceberg storage type for additional uploading: {storage_type}")
+
+
 def default_download_directory(
     started_cluster, storage_type, remote_path, local_path, **kwargs
 ):
@@ -500,6 +511,7 @@ def execute_spark_query_general(
     )
     return
 
+
 def get_last_snapshot(path_to_table):
     import json
     import os
diff --git a/tests/integration/test_storage_iceberg/configs/config.d/cluster.xml b/tests/integration/test_storage_iceberg/configs/config.d/cluster.xml
index 74f07fc38dec..54c08b27abe8 100644
--- a/tests/integration/test_storage_iceberg/configs/config.d/cluster.xml
+++ b/tests/integration/test_storage_iceberg/configs/config.d/cluster.xml
@@ -16,13 +16,5 @@
-
-
-
-                    node1
-                    9000
-
-
-
diff --git a/tests/integration/test_storage_iceberg/test.py b/tests/integration/test_storage_iceberg/test.py
index 5ee8f9e65f87..99b179793dae 100644
--- a/tests/integration/test_storage_iceberg/test.py
+++ b/tests/integration/test_storage_iceberg/test.py
@@ -38,6 +38,7 @@ from helpers.iceberg_utils import (
     default_upload_directory,
+    additional_upload_directory,
     default_download_directory,
     execute_spark_query_general,
     get_creation_expression,
@@ -410,7 +411,7 @@ def count_secondary_subqueries(started_cluster, query_id, expected, comment):
 @pytest.mark.parametrize("format_version", ["1", "2"])
-@pytest.mark.parametrize("storage_type", ["s3", "azure"])
+@pytest.mark.parametrize("storage_type", ["s3", "azure", "local"])
 def test_cluster_table_function(started_cluster, format_version, storage_type):
     instance = started_cluster.instances["node1"]
@@ -443,6 +444,19 @@ def add_df(mode):
         logging.info(f"Adding another dataframe. result files: {files}")
 
+        if storage_type == "local":
+            # For local storage we need to upload data from each node
+            for node_name, replica in started_cluster.instances.items():
+                if node_name == "node1":
+                    continue
+                additional_upload_directory(
+                    started_cluster,
+                    node_name,
+                    storage_type,
+                    f"/iceberg_data/default/{TABLE_NAME}/",
+                    f"/iceberg_data/default/{TABLE_NAME}/",
+                )
+
         return files
 
     files = add_df(mode="overwrite")
@@ -480,7 +494,7 @@ def make_query_from_function(
         storage_type_in_named_collection=storage_type_in_named_collection,
     )
     query_id = str(uuid.uuid4())
-    settings = "SETTINGS object_storage_cluster='cluster_simple'" if alt_syntax else ""
+    settings = f"SETTINGS object_storage_cluster='cluster_simple'" if (alt_syntax and not run_on_cluster) else ""
     if remote:
         query = f"SELECT * FROM remote('node2', {expr}) {settings}"
     else:

From 3f0edef1dc48900d71c67416dfd520424ee19b0d Mon Sep 17 00:00:00 2001
From: Anton Ivashkin
Date: Tue, 4 Nov 2025 10:17:32 +0100
Subject: [PATCH 4/5] Fix tests

---
 tests/integration/helpers/iceberg_utils.py     | 10 +++++++++-
 tests/integration/test_storage_iceberg/test.py |  2 ++
 2 files changed, 11 insertions(+), 1 deletion(-)

diff --git a/tests/integration/helpers/iceberg_utils.py b/tests/integration/helpers/iceberg_utils.py
index 5f2c01a8542e..e71a07f2eedd 100644
--- a/tests/integration/helpers/iceberg_utils.py
+++ b/tests/integration/helpers/iceberg_utils.py
@@ -500,7 +500,7 @@ def execute_spark_query_general(
-    spark, started_cluster, storage_type: str, table_name: str, query: str
+    spark, started_cluster, storage_type: str, table_name: str, query: str, additional_nodes=[]
 ):
     spark.sql(query)
     default_upload_directory(
@@ -509,6 +509,14 @@ def execute_spark_query_general(
         f"/iceberg_data/default/{table_name}/",
         f"/iceberg_data/default/{table_name}/",
     )
+    for node in additional_nodes:
+        additional_upload_directory(
+            started_cluster,
+            node,
+            storage_type,
+            f"/iceberg_data/default/{table_name}/",
+            f"/iceberg_data/default/{table_name}/",
+        )
     return
diff --git a/tests/integration/test_storage_iceberg/test.py b/tests/integration/test_storage_iceberg/test.py
index 99b179793dae..25d6f8f66f10 100644
--- a/tests/integration/test_storage_iceberg/test.py
+++ b/tests/integration/test_storage_iceberg/test.py
@@ -1242,6 +1242,7 @@ def execute_spark_query(query: str):
         storage_type,
         TABLE_NAME,
         query,
+        additional_nodes=["node2", "node3"] if storage_type=="local" else [],
     )
 
     execute_spark_query(
@@ -2078,6 +2079,7 @@ def execute_spark_query(query: str):
         storage_type,
         TABLE_NAME,
         query,
+        additional_nodes=["node2", "node3"] if storage_type=="local" else [],
     )
 
     execute_spark_query(

From 54ce5bc15841a6a3e98dba5bb44d98a1b8cf5be2 Mon Sep 17 00:00:00 2001
From: Anton Ivashkin
Date: Tue, 11 Nov 2025 11:24:55 +0100
Subject: [PATCH 5/5] Fix test

---
 tests/integration/helpers/iceberg_utils.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/tests/integration/helpers/iceberg_utils.py b/tests/integration/helpers/iceberg_utils.py
index e71a07f2eedd..9d0abb6fecea 100644
--- a/tests/integration/helpers/iceberg_utils.py
+++ b/tests/integration/helpers/iceberg_utils.py
@@ -500,7 +500,7 @@ def execute_spark_query_general(
-    spark, started_cluster, storage_type: str, table_name: str, query: str, additional_nodes=[]
+    spark, started_cluster, storage_type: str, table_name: str, query: str, additional_nodes=None
 ):
     spark.sql(query)
     default_upload_directory(
@@ -509,6 +509,7 @@ def execute_spark_query_general(
         f"/iceberg_data/default/{table_name}/",
         f"/iceberg_data/default/{table_name}/",
     )
+    additional_nodes = additional_nodes or []
     for node in additional_nodes:
         additional_upload_directory(
             started_cluster,