From a8041afc0fa923ce9a2e1dbceaecbed4c933ea63 Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Fri, 22 Aug 2025 13:19:53 +0200 Subject: [PATCH 1/4] Respect prefer_global_in_and_join setting with *Cluster functions --- src/Planner/PlannerJoinTree.cpp | 2 + src/Storages/IStorageCluster.cpp | 168 +++++++++++++++++- src/Storages/IStorageCluster.h | 1 + .../extractTableFunctionFromSelectQuery.cpp | 13 +- .../extractTableFunctionFromSelectQuery.h | 1 + tests/integration/test_s3_cluster/test.py | 64 +++++++ 6 files changed, 241 insertions(+), 8 deletions(-) diff --git a/src/Planner/PlannerJoinTree.cpp b/src/Planner/PlannerJoinTree.cpp index 70fb5a73a805..852d24e69cf9 100644 --- a/src/Planner/PlannerJoinTree.cpp +++ b/src/Planner/PlannerJoinTree.cpp @@ -1311,6 +1311,8 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expres /// Hopefully there is no other case when we read from Distributed up to FetchColumns. if (table_node && table_node->getStorage()->isRemote() && select_query_options.to_stage == QueryProcessingStage::FetchColumns) updated_actions_dag_outputs.push_back(output_node); + else if (table_function_node && table_function_node->getStorage()->isRemote()) + updated_actions_dag_outputs.push_back(output_node); } else updated_actions_dag_outputs.push_back(&rename_actions_dag.addAlias(*output_node, *column_identifier)); diff --git a/src/Storages/IStorageCluster.cpp b/src/Storages/IStorageCluster.cpp index 270bed6db180..6ccfe06967cb 100644 --- a/src/Storages/IStorageCluster.cpp +++ b/src/Storages/IStorageCluster.cpp @@ -23,6 +23,12 @@ #include #include #include +#include +#include +#include +#include +#include +#include #include #include @@ -41,6 +47,7 @@ namespace Setting extern const SettingsString cluster_for_parallel_replicas; extern const SettingsNonZeroUInt64 max_parallel_replicas; extern const SettingsUInt64 object_storage_max_nodes; + extern const SettingsBool prefer_global_in_and_join; } namespace ErrorCodes @@ -126,6 +133,149 @@ 
void ReadFromCluster::createExtension(const ActionsDAG::Node * predicate, size_t extension = storage->getTaskIteratorExtension(predicate, context, number_of_replicas); } +/* +Helping class to find in query tree first node of required type +*/ +class SearcherVisitor : public InDepthQueryTreeVisitorWithContext +{ +public: + using Base = InDepthQueryTreeVisitorWithContext; + using Base::Base; + + explicit SearcherVisitor(QueryTreeNodeType type_, ContextPtr context) : Base(context), type(type_) {} + + bool needChildVisit(QueryTreeNodePtr &, QueryTreeNodePtr & /*child*/) + { + return !passed_node; + } + + void enterImpl(QueryTreeNodePtr & node) + { + if (passed_node) + return; + + auto node_type = node->getNodeType(); + + if (node_type == type) + passed_node = node; + } + + QueryTreeNodePtr getNode() const { return passed_node; } + +private: + QueryTreeNodeType type; + QueryTreeNodePtr passed_node; +}; + +/* +Helping class to find all used columns with specific source +*/ +class CollectUsedColumnsForSourceVisitor : public InDepthQueryTreeVisitorWithContext +{ +public: + using Base = InDepthQueryTreeVisitorWithContext; + using Base::Base; + + explicit CollectUsedColumnsForSourceVisitor( + QueryTreeNodePtr source_, + ContextPtr context) + : Base(context) + , source(source_) {} + + void enterImpl(QueryTreeNodePtr & node) + { + auto node_type = node->getNodeType(); + + if (node_type != QueryTreeNodeType::COLUMN) + return; + + auto & column_node = node->as(); + auto column_source = column_node.getColumnSourceOrNull(); + if (!column_source) + return; + + if (column_source == source) + { + const auto & name = column_node.getColumnName(); + if (!names.count(name)) + { + columns.emplace_back(column_node.getColumn()); + names.insert(name); + } + } + } + + const NamesAndTypes & getColumns() const { return columns; } + +private: + std::unordered_set names; + QueryTreeNodePtr source; + NamesAndTypes columns; +}; + +/* +Try to make subquery to send on nodes +Converts + + SELECT 
s3.c1, s3.c2, t.c3 + FROM + s3Cluster(...) AS s3 + JOIN + localtable as t + ON s3.key == t.key + +to + + SELECT s3.c1, s3.c2, s3.key + FROM + s3Cluster(...) AS s3 +*/ +void IStorageCluster::updateQueryToSendWithGlobalJoinIfNeeded( + ASTPtr & query_to_send, + QueryTreeNodePtr query_tree, + const ContextPtr & context) +{ + if (!context->getSettingsRef()[Setting::prefer_global_in_and_join]) + return; + + SearcherVisitor join_searcher(QueryTreeNodeType::JOIN, context); + join_searcher.visit(query_tree); + if (!join_searcher.getNode()) + return; + + auto table_function = extractTableFunctionFromSelectQueryPtr(query_to_send); + auto query_tree_distributed = buildTableFunctionQueryTree(table_function, context); + auto & table_function_ast = table_function->as(); + query_tree_distributed->setAlias(table_function_ast.alias); + + SearcherVisitor table_function_searcher(QueryTreeNodeType::TABLE_FUNCTION, context); + table_function_searcher.visit(query_tree); + auto table_function_node = table_function_searcher.getNode(); + + if (!table_function_node) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't fiund table function node"); + + // Find add used columns from table function to make proper projection list + CollectUsedColumnsForSourceVisitor collector(table_function_node, context); + collector.visit(query_tree); + const auto & columns = collector.getColumns(); + + auto modified_query_tree = query_tree->clone(); + auto & query_node = modified_query_tree->as(); + + query_node.resolveProjectionColumns(columns); + auto column_nodes_to_select = std::make_shared(); + column_nodes_to_select->getNodes().reserve(columns.size()); + for (auto & column : columns) + column_nodes_to_select->getNodes().emplace_back(std::make_shared(column, table_function_node)); + query_node.getProjectionNode() = column_nodes_to_select; + + // Left only table function to send on cluster nodes + modified_query_tree = modified_query_tree->cloneAndReplace(query_node.getJoinTree(), query_tree_distributed); + 
+ query_to_send = queryNodeToDistributedSelectQuery(modified_query_tree); +} + /// The code executes on initiator void IStorageCluster::read( QueryPlan & query_plan, @@ -155,13 +305,15 @@ void IStorageCluster::read( Block sample_block; ASTPtr query_to_send = query_info.query; + updateQueryToSendWithGlobalJoinIfNeeded(query_to_send, query_info.query_tree, context); + if (context->getSettingsRef()[Setting::allow_experimental_analyzer]) { - sample_block = InterpreterSelectQueryAnalyzer::getSampleBlock(query_info.query, context, SelectQueryOptions(processed_stage)); + sample_block = InterpreterSelectQueryAnalyzer::getSampleBlock(query_to_send, context, SelectQueryOptions(processed_stage)); } else { - auto interpreter = InterpreterSelectQuery(query_info.query, context, SelectQueryOptions(processed_stage).analyze()); + auto interpreter = InterpreterSelectQuery(query_to_send, context, SelectQueryOptions(processed_stage).analyze()); sample_block = interpreter.getSampleBlock(); query_to_send = interpreter.getQueryInfo().query->clone(); } @@ -169,7 +321,7 @@ void IStorageCluster::read( updateQueryToSendIfNeeded(query_to_send, storage_snapshot, context); RestoreQualifiedNamesVisitor::Data data; - data.distributed_table = DatabaseAndTableWithAlias(*getTableExpression(query_info.query->as(), 0)); + data.distributed_table = DatabaseAndTableWithAlias(*getTableExpression(query_to_send->as(), 0)); data.remote_table.database = context->getCurrentDatabase(); data.remote_table.table = getName(); RestoreQualifiedNamesVisitor(data).visit(query_to_send); @@ -278,8 +430,16 @@ void ReadFromCluster::initializePipeline(QueryPipelineBuilder & pipeline, const } QueryProcessingStage::Enum IStorageCluster::getQueryProcessingStage( - ContextPtr context, QueryProcessingStage::Enum to_stage, const StorageSnapshotPtr &, SelectQueryInfo &) const + ContextPtr context, QueryProcessingStage::Enum to_stage, const StorageSnapshotPtr &, SelectQueryInfo & query_info) const { + if 
(context->getSettingsRef()[Setting::prefer_global_in_and_join]) + { + SearcherVisitor join_searcher(QueryTreeNodeType::JOIN, context); + join_searcher.visit(query_info.query_tree); + if (join_searcher.getNode()) + return QueryProcessingStage::Enum::FetchColumns; + } + /// Initiator executes query on remote node. if (context->getClientInfo().query_kind == ClientInfo::QueryKind::INITIAL_QUERY) if (to_stage >= QueryProcessingStage::Enum::WithMergeableState) diff --git a/src/Storages/IStorageCluster.h b/src/Storages/IStorageCluster.h index a119f5fc676d..754550243316 100644 --- a/src/Storages/IStorageCluster.h +++ b/src/Storages/IStorageCluster.h @@ -57,6 +57,7 @@ class IStorageCluster : public IStorage protected: virtual void updateBeforeRead(const ContextPtr &) {} virtual void updateQueryToSendIfNeeded(ASTPtr & /*query*/, const StorageSnapshotPtr & /*storage_snapshot*/, const ContextPtr & /*context*/) {} + void updateQueryToSendWithGlobalJoinIfNeeded(ASTPtr & query_to_send, QueryTreeNodePtr query_tree, const ContextPtr & context); virtual void readFallBackToPure( QueryPlan & /* query_plan */, diff --git a/src/Storages/extractTableFunctionFromSelectQuery.cpp b/src/Storages/extractTableFunctionFromSelectQuery.cpp index c7f60240b3c7..43fa6ef0bdc3 100644 --- a/src/Storages/extractTableFunctionFromSelectQuery.cpp +++ b/src/Storages/extractTableFunctionFromSelectQuery.cpp @@ -9,7 +9,7 @@ namespace DB { -ASTFunction * extractTableFunctionFromSelectQuery(ASTPtr & query) +ASTPtr extractTableFunctionFromSelectQueryPtr(ASTPtr & query) { auto * select_query = query->as(); if (!select_query || !select_query->tables()) @@ -17,11 +17,16 @@ ASTFunction * extractTableFunctionFromSelectQuery(ASTPtr & query) auto * tables = select_query->tables()->as(); auto * table_expression = tables->children[0]->as()->table_expression->as(); - if (!table_expression->table_function) + return table_expression->table_function; +} + +ASTFunction * extractTableFunctionFromSelectQuery(ASTPtr & query) +{ + 
auto table_function_ptr = extractTableFunctionFromSelectQueryPtr(query); + if (!table_function_ptr) return nullptr; - auto * table_function = table_expression->table_function->as(); - return table_function; + return table_function_ptr->as(); } ASTExpressionList * extractTableFunctionArgumentsFromSelectQuery(ASTPtr & query) diff --git a/src/Storages/extractTableFunctionFromSelectQuery.h b/src/Storages/extractTableFunctionFromSelectQuery.h index 87edf01c1c82..d6d276a90adf 100644 --- a/src/Storages/extractTableFunctionFromSelectQuery.h +++ b/src/Storages/extractTableFunctionFromSelectQuery.h @@ -7,6 +7,7 @@ namespace DB { +ASTPtr extractTableFunctionFromSelectQueryPtr(ASTPtr & query); ASTFunction * extractTableFunctionFromSelectQuery(ASTPtr & query); ASTExpressionList * extractTableFunctionArgumentsFromSelectQuery(ASTPtr & query); diff --git a/tests/integration/test_s3_cluster/test.py b/tests/integration/test_s3_cluster/test.py index 4659ed1d5cee..7c4ec8d9ef82 100644 --- a/tests/integration/test_s3_cluster/test.py +++ b/tests/integration/test_s3_cluster/test.py @@ -858,3 +858,67 @@ def test_cluster_hosts_limit(started_cluster): """ ) assert int(hosts_2) == 2 + + +def test_joins(started_cluster): + node = started_cluster.instances["s0_0_0"] + + # Table join_table only exists on the node 's0_0_0'. 
+ node.query( + """ + CREATE TABLE IF NOT EXISTS join_table ( + id UInt32, + name String + ) ENGINE=MergeTree() + ORDER BY id; + """ + ) + + node.query( + f""" + INSERT INTO join_table + SELECT value, concat(name, '_jt') FROM s3Cluster('cluster_simple', + 'http://minio1:9001/root/data/{{clickhouse,database}}/*', 'minio', '{minio_secret_key}', 'CSV', + 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))'); + """ + ) + + result1 = node.query( + f""" + SELECT t1.name, t2.name FROM + s3Cluster('cluster_simple', + 'http://minio1:9001/root/data/{{clickhouse,database}}/*', 'minio', '{minio_secret_key}', 'CSV', + 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') AS t1 + JOIN + join_table AS t2 + ON t1.value = t2.id + ORDER BY t1.name + SETTINGS prefer_global_in_and_join=1; + """ + ) + + res = list(map(str.split, result1.splitlines())) + + assert len(res) == 25 + + for line in res: + if len(line) == 2: + assert line[1] == f"{line[0]}_jt" + else: + assert line == ["_jt"] # for empty name + + result2 = node.query( + f""" + SELECT t1.name, t2.name FROM + join_table AS t2 + JOIN + s3Cluster('cluster_simple', + 'http://minio1:9001/root/data/{{clickhouse,database}}/*', 'minio', '{minio_secret_key}', 'CSV', + 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') AS t1 + ON t1.value = t2.id + ORDER BY t1.name + SETTINGS prefer_global_in_and_join=1; + """ + ) + + assert result1 == result2 From 628b035f9ff24122978a31f3aa831c601e499c7f Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Wed, 27 Aug 2025 14:51:34 +0200 Subject: [PATCH 2/4] Fix WHERE --- src/Storages/IStorageCluster.cpp | 106 ++++++++++++++++------ tests/integration/test_s3_cluster/test.py | 57 +++++++++++- 2 files changed, 133 insertions(+), 30 deletions(-) diff --git a/src/Storages/IStorageCluster.cpp b/src/Storages/IStorageCluster.cpp index 6ccfe06967cb..7c53c31dfd44 100644 --- a/src/Storages/IStorageCluster.cpp +++ 
b/src/Storages/IStorageCluster.cpp @@ -133,6 +133,9 @@ void ReadFromCluster::createExtension(const ActionsDAG::Node * predicate, size_t extension = storage->getTaskIteratorExtension(predicate, context, number_of_replicas); } +namespace +{ + /* Helping class to find in query tree first node of required type */ @@ -178,9 +181,12 @@ class CollectUsedColumnsForSourceVisitor : public InDepthQueryTreeVisitorWithCon explicit CollectUsedColumnsForSourceVisitor( QueryTreeNodePtr source_, - ContextPtr context) + ContextPtr context, + bool collect_columns_from_other_sources_ = false) : Base(context) - , source(source_) {} + , source(source_) + , collect_columns_from_other_sources(collect_columns_from_other_sources_) + {} void enterImpl(QueryTreeNodePtr & node) { @@ -194,7 +200,7 @@ class CollectUsedColumnsForSourceVisitor : public InDepthQueryTreeVisitorWithCon if (!column_source) return; - if (column_source == source) + if ((column_source == source) != collect_columns_from_other_sources) { const auto & name = column_node.getColumnName(); if (!names.count(name)) @@ -211,6 +217,9 @@ class CollectUsedColumnsForSourceVisitor : public InDepthQueryTreeVisitorWithCon std::unordered_set names; QueryTreeNodePtr source; NamesAndTypes columns; + bool collect_columns_from_other_sources; +}; + }; /* @@ -238,42 +247,66 @@ void IStorageCluster::updateQueryToSendWithGlobalJoinIfNeeded( if (!context->getSettingsRef()[Setting::prefer_global_in_and_join]) return; - SearcherVisitor join_searcher(QueryTreeNodeType::JOIN, context); - join_searcher.visit(query_tree); - if (!join_searcher.getNode()) - return; - - auto table_function = extractTableFunctionFromSelectQueryPtr(query_to_send); - auto query_tree_distributed = buildTableFunctionQueryTree(table_function, context); - auto & table_function_ast = table_function->as(); - query_tree_distributed->setAlias(table_function_ast.alias); + auto modified_query_tree = query_tree->clone(); + bool need_modify = false; + bool can_use_where_on_remote_nodes 
= true; SearcherVisitor table_function_searcher(QueryTreeNodeType::TABLE_FUNCTION, context); table_function_searcher.visit(query_tree); auto table_function_node = table_function_searcher.getNode(); - if (!table_function_node) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't fiund table function node"); + throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find table function node"); - // Find add used columns from table function to make proper projection list - CollectUsedColumnsForSourceVisitor collector(table_function_node, context); - collector.visit(query_tree); - const auto & columns = collector.getColumns(); + // Find columns from other sources in 'WHERE' condition + { + CollectUsedColumnsForSourceVisitor collector_where(table_function_node, context, true); + auto & query_node = query_tree->as(); + if (query_node.hasWhere()) + collector_where.visit(query_node.getWhere()); - auto modified_query_tree = query_tree->clone(); - auto & query_node = modified_query_tree->as(); + // Can't use 'WHERE' on remote node if it contains columns from other sources + if (!collector_where.getColumns().empty()) + can_use_where_on_remote_nodes = false; - query_node.resolveProjectionColumns(columns); - auto column_nodes_to_select = std::make_shared(); - column_nodes_to_select->getNodes().reserve(columns.size()); - for (auto & column : columns) - column_nodes_to_select->getNodes().emplace_back(std::make_shared(column, table_function_node)); - query_node.getProjectionNode() = column_nodes_to_select; + need_modify = true; + } + + SearcherVisitor join_searcher(QueryTreeNodeType::JOIN, context); + join_searcher.visit(query_tree); + if (join_searcher.getNode()) + { + auto table_function = extractTableFunctionFromSelectQueryPtr(query_to_send); + auto query_tree_distributed = buildTableFunctionQueryTree(table_function, context); + auto & table_function_ast = table_function->as(); + query_tree_distributed->setAlias(table_function_ast.alias); + + // Find add used columns from table 
function to make proper projection list + CollectUsedColumnsForSourceVisitor collector(table_function_node, context); + collector.visit(query_tree); + const auto & columns = collector.getColumns(); + + auto & query_node = modified_query_tree->as(); + query_node.resolveProjectionColumns(columns); + auto column_nodes_to_select = std::make_shared(); + column_nodes_to_select->getNodes().reserve(columns.size()); + for (auto & column : columns) + column_nodes_to_select->getNodes().emplace_back(std::make_shared(column, table_function_node)); + query_node.getProjectionNode() = column_nodes_to_select; + + // Left only table function to send on cluster nodes + modified_query_tree = modified_query_tree->cloneAndReplace(query_node.getJoinTree(), query_tree_distributed); + + need_modify = true; + } - // Left only table function to send on cluster nodes - modified_query_tree = modified_query_tree->cloneAndReplace(query_node.getJoinTree(), query_tree_distributed); + if (!can_use_where_on_remote_nodes) + { + auto & query_node = modified_query_tree->as(); + query_node.getWhere() = {}; + } - query_to_send = queryNodeToDistributedSelectQuery(modified_query_tree); + if (need_modify) + query_to_send = queryNodeToDistributedSelectQuery(modified_query_tree); } /// The code executes on initiator @@ -438,6 +471,21 @@ QueryProcessingStage::Enum IStorageCluster::getQueryProcessingStage( join_searcher.visit(query_info.query_tree); if (join_searcher.getNode()) return QueryProcessingStage::Enum::FetchColumns; + + SearcherVisitor table_function_searcher(QueryTreeNodeType::TABLE_FUNCTION, context); + table_function_searcher.visit(query_info.query_tree); + auto table_function_node = table_function_searcher.getNode(); + if (!table_function_node) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find table function node"); + + CollectUsedColumnsForSourceVisitor collector_where(table_function_node, context, true); + auto & query_node = query_info.query_tree->as(); + if (query_node.hasWhere()) + 
collector_where.visit(query_node.getWhere()); + + // Can't use 'WHERE' on remote node if it contains columns from other sources + if (!collector_where.getColumns().empty()) + return QueryProcessingStage::Enum::FetchColumns; } /// Initiator executes query on remote node. diff --git a/tests/integration/test_s3_cluster/test.py b/tests/integration/test_s3_cluster/test.py index 7c4ec8d9ef82..d0c04798ab54 100644 --- a/tests/integration/test_s3_cluster/test.py +++ b/tests/integration/test_s3_cluster/test.py @@ -898,7 +898,6 @@ def test_joins(started_cluster): ) res = list(map(str.split, result1.splitlines())) - assert len(res) == 25 for line in res: @@ -922,3 +921,59 @@ def test_joins(started_cluster): ) assert result1 == result2 + + # With WHERE clause with remote column only + result3 = node.query( + f""" + SELECT t1.name, t2.name FROM + s3Cluster('cluster_simple', + 'http://minio1:9001/root/data/{{clickhouse,database}}/*', 'minio', '{minio_secret_key}', 'CSV', + 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') AS t1 + JOIN + join_table AS t2 + ON t1.value = t2.id + WHERE (t1.value % 2) + ORDER BY t1.name + SETTINGS prefer_global_in_and_join=1; + """ + ) + + res = list(map(str.split, result3.splitlines())) + assert len(res) == 8 + + # With WHERE clause with local column only + result4 = node.query( + f""" + SELECT t1.name, t2.name FROM + s3Cluster('cluster_simple', + 'http://minio1:9001/root/data/{{clickhouse,database}}/*', 'minio', '{minio_secret_key}', 'CSV', + 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') AS t1 + JOIN + join_table AS t2 + ON t1.value = t2.id + WHERE (t2.id % 2) + ORDER BY t1.name + SETTINGS prefer_global_in_and_join=1; + """ + ) + + assert result3 == result4 + + # With WHERE clause with local and remote columns + result5 = node.query( + f""" + SELECT t1.name, t2.name FROM + s3Cluster('cluster_simple', + 'http://minio1:9001/root/data/{{clickhouse,database}}/*', 'minio', '{minio_secret_key}', 
'CSV', + 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') AS t1 + JOIN + join_table AS t2 + ON t1.value = t2.id + WHERE (t1.value % 2) AND ((t2.id % 3) == 2) + ORDER BY t1.name + SETTINGS prefer_global_in_and_join=1; + """ + ) + + res = list(map(str.split, result5.splitlines())) + assert len(res) == 6 From 77b4f1fcf05cd2957d9cffc1ccf70811938e19b5 Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Mon, 1 Sep 2025 18:25:05 +0200 Subject: [PATCH 3/4] Setting --- src/Core/Settings.cpp | 16 +++ src/Core/Settings.h | 1 + src/Core/SettingsChangesHistory.cpp | 1 + src/Core/SettingsEnums.cpp | 5 + src/Core/SettingsEnums.h | 10 ++ src/Storages/IStorageCluster.cpp | 122 +++++++++++----------- src/Storages/IStorageCluster.h | 5 +- tests/integration/test_s3_cluster/test.py | 10 +- 8 files changed, 102 insertions(+), 68 deletions(-) diff --git a/src/Core/Settings.cpp b/src/Core/Settings.cpp index 7e014dd62900..60bb33a0c927 100644 --- a/src/Core/Settings.cpp +++ b/src/Core/Settings.cpp @@ -1713,6 +1713,22 @@ Possible values: - `global` — Replaces the `IN`/`JOIN` query with `GLOBAL IN`/`GLOBAL JOIN.` - `allow` — Allows the use of these types of subqueries. )", IMPORTANT) \ + DECLARE(ObjectStorageClusterJoinMode, object_storage_cluster_join_mode, ObjectStorageClusterJoinMode::ALLOW, R"( +Changes the behaviour of object storage cluster function or table. + +ClickHouse applies this setting when the query contains the product of object storage cluster function or table, i.e. when the query for an object storage cluster function or table contains a non-GLOBAL subquery for the object storage cluster function or table. + +Restrictions: + +- Only applied for JOIN subqueries. +- Only if the FROM section uses an object storage cluster function or table. + +Possible values: + +- `local` — Replaces the database and table in the subquery with local ones for the destination server (shard), leaving the normal `IN`/`JOIN.` +- `global` — Unsupported for now.
Replaces the `IN`/`JOIN` query with `GLOBAL IN`/`GLOBAL JOIN.` +- `allow` — Default value. Allows the use of these types of subqueries. +)", 0) \ \ DECLARE(UInt64, max_concurrent_queries_for_all_users, 0, R"( Throw exception if the value of this setting is less or equal than the current number of simultaneously processed queries. diff --git a/src/Core/Settings.h b/src/Core/Settings.h index c93a1589c44d..1ade278234fb 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -58,6 +58,7 @@ class WriteBuffer; M(CLASS_NAME, DistributedCachePoolBehaviourOnLimit) /* Cloud only */ \ M(CLASS_NAME, DistributedDDLOutputMode) \ M(CLASS_NAME, DistributedProductMode) \ + M(CLASS_NAME, ObjectStorageClusterJoinMode) \ M(CLASS_NAME, Double) \ M(CLASS_NAME, EscapingRule) \ M(CLASS_NAME, Float) \ diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp index 70624b079351..04598f704ca1 100644 --- a/src/Core/SettingsChangesHistory.cpp +++ b/src/Core/SettingsChangesHistory.cpp @@ -72,6 +72,7 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory() // Altinity Antalya modifications atop of 25.6 {"object_storage_cluster", "", "", "New setting"}, {"object_storage_max_nodes", 0, 0, "New setting"}, + {"object_storage_cluster_join_mode", "allow", "allow", "New setting"}, }); addSettingsChanges(settings_changes_history, "25.6", { diff --git a/src/Core/SettingsEnums.cpp b/src/Core/SettingsEnums.cpp index bf6b3e52df62..71e1e20657be 100644 --- a/src/Core/SettingsEnums.cpp +++ b/src/Core/SettingsEnums.cpp @@ -90,6 +90,11 @@ IMPLEMENT_SETTING_ENUM(DistributedProductMode, ErrorCodes::UNKNOWN_DISTRIBUTED_P {"global", DistributedProductMode::GLOBAL}, {"allow", DistributedProductMode::ALLOW}}) +IMPLEMENT_SETTING_ENUM(ObjectStorageClusterJoinMode, ErrorCodes::BAD_ARGUMENTS, + {{"local", ObjectStorageClusterJoinMode::LOCAL}, + {"global", ObjectStorageClusterJoinMode::GLOBAL}, + {"allow", ObjectStorageClusterJoinMode::ALLOW}}) + 
IMPLEMENT_SETTING_ENUM(QueryResultCacheNondeterministicFunctionHandling, ErrorCodes::BAD_ARGUMENTS, {{"throw", QueryResultCacheNondeterministicFunctionHandling::Throw}, diff --git a/src/Core/SettingsEnums.h b/src/Core/SettingsEnums.h index a030c7fb741e..17ca34695b42 100644 --- a/src/Core/SettingsEnums.h +++ b/src/Core/SettingsEnums.h @@ -163,6 +163,16 @@ enum class DistributedProductMode : uint8_t DECLARE_SETTING_ENUM(DistributedProductMode) +/// The setting for executing object storage cluster function or table JOIN sections. +enum class ObjectStorageClusterJoinMode : uint8_t +{ + LOCAL, /// Convert to local query + GLOBAL, /// Convert to global query + ALLOW /// Enable +}; + +DECLARE_SETTING_ENUM(ObjectStorageClusterJoinMode) + /// How the query result cache handles queries with non-deterministic functions, e.g. now() enum class QueryResultCacheNondeterministicFunctionHandling : uint8_t { diff --git a/src/Storages/IStorageCluster.cpp b/src/Storages/IStorageCluster.cpp index 7c53c31dfd44..6655139147ac 100644 --- a/src/Storages/IStorageCluster.cpp +++ b/src/Storages/IStorageCluster.cpp @@ -47,7 +47,7 @@ namespace Setting extern const SettingsString cluster_for_parallel_replicas; extern const SettingsNonZeroUInt64 max_parallel_replicas; extern const SettingsUInt64 object_storage_max_nodes; - extern const SettingsBool prefer_global_in_and_join; + extern const SettingsObjectStorageClusterJoinMode object_storage_cluster_join_mode; } namespace ErrorCodes @@ -239,74 +239,67 @@ to FROM s3Cluster(...)
AS s3 */ -void IStorageCluster::updateQueryToSendWithGlobalJoinIfNeeded( +void IStorageCluster::updateQueryWithJoinToSendIfNeeded( ASTPtr & query_to_send, QueryTreeNodePtr query_tree, const ContextPtr & context) { - if (!context->getSettingsRef()[Setting::prefer_global_in_and_join]) - return; - - auto modified_query_tree = query_tree->clone(); - bool need_modify = false; - bool can_use_where_on_remote_nodes = true; - - SearcherVisitor table_function_searcher(QueryTreeNodeType::TABLE_FUNCTION, context); - table_function_searcher.visit(query_tree); - auto table_function_node = table_function_searcher.getNode(); - if (!table_function_node) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find table function node"); - - // Find columns from other sources in 'WHERE' condition + auto object_storage_cluster_join_mode = context->getSettingsRef()[Setting::object_storage_cluster_join_mode]; + switch (object_storage_cluster_join_mode) { - CollectUsedColumnsForSourceVisitor collector_where(table_function_node, context, true); - auto & query_node = query_tree->as(); - if (query_node.hasWhere()) - collector_where.visit(query_node.getWhere()); + case ObjectStorageClusterJoinMode::LOCAL: + { + auto modified_query_tree = query_tree->clone(); + bool need_modify = false; - // Can't use 'WHERE' on remote node if it contains columns from other sources - if (!collector_where.getColumns().empty()) - can_use_where_on_remote_nodes = false; + SearcherVisitor table_function_searcher(QueryTreeNodeType::TABLE_FUNCTION, context); + table_function_searcher.visit(query_tree); + auto table_function_node = table_function_searcher.getNode(); + if (!table_function_node) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find table function node"); - need_modify = true; - } + if (has_join) + { + auto table_function = extractTableFunctionFromSelectQueryPtr(query_to_send); + auto query_tree_distributed = buildTableFunctionQueryTree(table_function, context); + auto & table_function_ast = 
table_function->as(); + query_tree_distributed->setAlias(table_function_ast.alias); + + // Find add used columns from table function to make proper projection list + CollectUsedColumnsForSourceVisitor collector(table_function_node, context); + collector.visit(query_tree); + const auto & columns = collector.getColumns(); + + auto & query_node = modified_query_tree->as(); + query_node.resolveProjectionColumns(columns); + auto column_nodes_to_select = std::make_shared(); + column_nodes_to_select->getNodes().reserve(columns.size()); + for (auto & column : columns) + column_nodes_to_select->getNodes().emplace_back(std::make_shared(column, table_function_node)); + query_node.getProjectionNode() = column_nodes_to_select; + + // Left only table function to send on cluster nodes + modified_query_tree = modified_query_tree->cloneAndReplace(query_node.getJoinTree(), query_tree_distributed); + + need_modify = true; + } - SearcherVisitor join_searcher(QueryTreeNodeType::JOIN, context); - join_searcher.visit(query_tree); - if (join_searcher.getNode()) - { - auto table_function = extractTableFunctionFromSelectQueryPtr(query_to_send); - auto query_tree_distributed = buildTableFunctionQueryTree(table_function, context); - auto & table_function_ast = table_function->as(); - query_tree_distributed->setAlias(table_function_ast.alias); - - // Find add used columns from table function to make proper projection list - CollectUsedColumnsForSourceVisitor collector(table_function_node, context); - collector.visit(query_tree); - const auto & columns = collector.getColumns(); - - auto & query_node = modified_query_tree->as(); - query_node.resolveProjectionColumns(columns); - auto column_nodes_to_select = std::make_shared(); - column_nodes_to_select->getNodes().reserve(columns.size()); - for (auto & column : columns) - column_nodes_to_select->getNodes().emplace_back(std::make_shared(column, table_function_node)); - query_node.getProjectionNode() = column_nodes_to_select; - - // Left only table 
function to send on cluster nodes - modified_query_tree = modified_query_tree->cloneAndReplace(query_node.getJoinTree(), query_tree_distributed); - - need_modify = true; - } + if (has_local_columns_in_where) + { + auto & query_node = modified_query_tree->as(); + query_node.getWhere() = {}; + } - if (!can_use_where_on_remote_nodes) - { - auto & query_node = modified_query_tree->as(); - query_node.getWhere() = {}; + if (need_modify) + query_to_send = queryNodeToDistributedSelectQuery(modified_query_tree); + return; + } + case ObjectStorageClusterJoinMode::GLOBAL: + // TODO + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "`Global` mode for `object_storage_cluster_join_mode` setting is unimplemented for now"); + case ObjectStorageClusterJoinMode::ALLOW: // Do nothing special + return; } - - if (need_modify) - query_to_send = queryNodeToDistributedSelectQuery(modified_query_tree); } /// The code executes on initiator @@ -338,7 +331,7 @@ void IStorageCluster::read( Block sample_block; ASTPtr query_to_send = query_info.query; - updateQueryToSendWithGlobalJoinIfNeeded(query_to_send, query_info.query_tree, context); + updateQueryWithJoinToSendIfNeeded(query_to_send, query_info.query_tree, context); if (context->getSettingsRef()[Setting::allow_experimental_analyzer]) { @@ -465,12 +458,14 @@ void ReadFromCluster::initializePipeline(QueryPipelineBuilder & pipeline, const QueryProcessingStage::Enum IStorageCluster::getQueryProcessingStage( ContextPtr context, QueryProcessingStage::Enum to_stage, const StorageSnapshotPtr &, SelectQueryInfo & query_info) const { - if (context->getSettingsRef()[Setting::prefer_global_in_and_join]) + auto object_storage_cluster_join_mode = context->getSettingsRef()[Setting::object_storage_cluster_join_mode]; + + if (object_storage_cluster_join_mode != ObjectStorageClusterJoinMode::ALLOW) { SearcherVisitor join_searcher(QueryTreeNodeType::JOIN, context); join_searcher.visit(query_info.query_tree); if (join_searcher.getNode()) - return 
QueryProcessingStage::Enum::FetchColumns; + has_join = true; SearcherVisitor table_function_searcher(QueryTreeNodeType::TABLE_FUNCTION, context); table_function_searcher.visit(query_info.query_tree); @@ -485,6 +480,9 @@ QueryProcessingStage::Enum IStorageCluster::getQueryProcessingStage( // Can't use 'WHERE' on remote node if it contains columns from other sources if (!collector_where.getColumns().empty()) + has_local_columns_in_where = true; + + if (has_join || has_local_columns_in_where) return QueryProcessingStage::Enum::FetchColumns; } diff --git a/src/Storages/IStorageCluster.h b/src/Storages/IStorageCluster.h index 754550243316..b3d8674f2033 100644 --- a/src/Storages/IStorageCluster.h +++ b/src/Storages/IStorageCluster.h @@ -57,7 +57,7 @@ class IStorageCluster : public IStorage protected: virtual void updateBeforeRead(const ContextPtr &) {} virtual void updateQueryToSendIfNeeded(ASTPtr & /*query*/, const StorageSnapshotPtr & /*storage_snapshot*/, const ContextPtr & /*context*/) {} - void updateQueryToSendWithGlobalJoinIfNeeded(ASTPtr & query_to_send, QueryTreeNodePtr query_tree, const ContextPtr & context); + void updateQueryWithJoinToSendIfNeeded(ASTPtr & query_to_send, QueryTreeNodePtr query_tree, const ContextPtr & context); virtual void readFallBackToPure( QueryPlan & /* query_plan */, @@ -86,6 +86,9 @@ class IStorageCluster : public IStorage LoggerPtr log; String cluster_name; + + mutable bool has_join = false; + mutable bool has_local_columns_in_where = false; }; diff --git a/tests/integration/test_s3_cluster/test.py b/tests/integration/test_s3_cluster/test.py index d0c04798ab54..603896f4192b 100644 --- a/tests/integration/test_s3_cluster/test.py +++ b/tests/integration/test_s3_cluster/test.py @@ -893,7 +893,7 @@ def test_joins(started_cluster): join_table AS t2 ON t1.value = t2.id ORDER BY t1.name - SETTINGS prefer_global_in_and_join=1; + SETTINGS object_storage_cluster_join_mode='local'; """ ) @@ -916,7 +916,7 @@ def test_joins(started_cluster): 'name 
String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') AS t1 ON t1.value = t2.id ORDER BY t1.name - SETTINGS prefer_global_in_and_join=1; + SETTINGS object_storage_cluster_join_mode='local'; """ ) @@ -934,7 +934,7 @@ def test_joins(started_cluster): ON t1.value = t2.id WHERE (t1.value % 2) ORDER BY t1.name - SETTINGS prefer_global_in_and_join=1; + SETTINGS object_storage_cluster_join_mode='local'; """ ) @@ -953,7 +953,7 @@ def test_joins(started_cluster): ON t1.value = t2.id WHERE (t2.id % 2) ORDER BY t1.name - SETTINGS prefer_global_in_and_join=1; + SETTINGS object_storage_cluster_join_mode='local'; """ ) @@ -971,7 +971,7 @@ def test_joins(started_cluster): ON t1.value = t2.id WHERE (t1.value % 2) AND ((t2.id % 3) == 2) ORDER BY t1.name - SETTINGS prefer_global_in_and_join=1; + SETTINGS object_storage_cluster_join_mode='local'; """ ) From 70f47edf38c4acc36613412fd37da3a617f0fd56 Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Tue, 9 Sep 2025 20:04:38 +0200 Subject: [PATCH 4/4] Throw exception on allow_experimental_analyzer=0 && object_storage_cluster_join_mode --- src/Storages/IStorageCluster.cpp | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/Storages/IStorageCluster.cpp b/src/Storages/IStorageCluster.cpp index 6655139147ac..6a65199cd04e 100644 --- a/src/Storages/IStorageCluster.cpp +++ b/src/Storages/IStorageCluster.cpp @@ -53,6 +53,7 @@ namespace Setting namespace ErrorCodes { extern const int NOT_IMPLEMENTED; + extern const int LOGICAL_ERROR; } IStorageCluster::IStorageCluster( @@ -462,6 +463,10 @@ QueryProcessingStage::Enum IStorageCluster::getQueryProcessingStage( if (object_storage_cluster_join_mode != ObjectStorageClusterJoinMode::ALLOW) { + if (!context->getSettingsRef()[Setting::allow_experimental_analyzer]) + throw Exception(ErrorCodes::NOT_IMPLEMENTED, + "object_storage_cluster_join_mode!='allow' is not supported without allow_experimental_analyzer=true"); + SearcherVisitor join_searcher(QueryTreeNodeType::JOIN, context); 
join_searcher.visit(query_info.query_tree); if (join_searcher.getNode())