diff --git a/src/Core/Settings.cpp b/src/Core/Settings.cpp index cc18996d0a01..009461d08614 100644 --- a/src/Core/Settings.cpp +++ b/src/Core/Settings.cpp @@ -1760,6 +1760,22 @@ Possible values: - `global` — Replaces the `IN`/`JOIN` query with `GLOBAL IN`/`GLOBAL JOIN.` - `allow` — Allows the use of these types of subqueries. )", IMPORTANT) \ + DECLARE(ObjectStorageClusterJoinMode, object_storage_cluster_join_mode, ObjectStorageClusterJoinMode::ALLOW, R"( +Changes the behaviour of JOINs with an object storage cluster function or table. + +ClickHouse applies this setting when the query contains the product of an object storage cluster function or table, i.e. when the query for an object storage cluster function or table contains a non-GLOBAL subquery for that function or table. + +Restrictions: + +- Only applied for JOIN subqueries. +- Only if the FROM section uses an object storage cluster function or table. + +Possible values: + +- `local` — Rewrites the query so that only the object storage cluster function or table, with the columns the query actually uses from it, is sent to the cluster nodes; the `JOIN` itself is executed on the initiator. +- `global` — Unsupported for now. Would replace the `IN`/`JOIN` query with `GLOBAL IN`/`GLOBAL JOIN`. +- `allow` — Default value. Allows the use of these types of subqueries. +)", 0) \ \ DECLARE(UInt64, max_concurrent_queries_for_all_users, 0, R"( Throw exception if the value of this setting is less or equal than the current number of simultaneously processed queries. diff --git a/src/Core/Settings.h b/src/Core/Settings.h index 7f284dd78b4e..546c687e445a 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -58,6 +58,7 @@ class WriteBuffer; M(CLASS_NAME, DistributedCachePoolBehaviourOnLimit) /* Cloud only */ \ M(CLASS_NAME, DistributedDDLOutputMode) \ M(CLASS_NAME, DistributedProductMode) \ + M(CLASS_NAME, ObjectStorageClusterJoinMode) \ M(CLASS_NAME, Double) \ M(CLASS_NAME, EscapingRule) \ M(CLASS_NAME, Float) \ diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp index 923dddf1288e..bbee194e29ed 100644 --- a/src/Core/SettingsChangesHistory.cpp +++ b/src/Core/SettingsChangesHistory.cpp @@ -41,6 +41,7 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory() /// Note: please check if the key already exists to prevent duplicate entries.
addSettingsChanges(settings_changes_history, "25.8.9.2000", { + {"object_storage_cluster_join_mode", "allow", "allow", "New setting"}, {"lock_object_storage_task_distribution_ms", 500, 500, "Raised the value to 500 to avoid hoping tasks between executors."}, {"object_storage_cluster", "", "", "Antalya: New setting"}, {"object_storage_max_nodes", 0, 0, "Antalya: New setting"}, diff --git a/src/Core/SettingsEnums.cpp b/src/Core/SettingsEnums.cpp index 7a586f51168e..8e34fc296b1f 100644 --- a/src/Core/SettingsEnums.cpp +++ b/src/Core/SettingsEnums.cpp @@ -90,6 +90,11 @@ IMPLEMENT_SETTING_ENUM(DistributedProductMode, ErrorCodes::UNKNOWN_DISTRIBUTED_P {"global", DistributedProductMode::GLOBAL}, {"allow", DistributedProductMode::ALLOW}}) +IMPLEMENT_SETTING_ENUM(ObjectStorageClusterJoinMode, ErrorCodes::BAD_ARGUMENTS, + {{"local", ObjectStorageClusterJoinMode::LOCAL}, + {"global", ObjectStorageClusterJoinMode::GLOBAL}, + {"allow", ObjectStorageClusterJoinMode::ALLOW}}) + IMPLEMENT_SETTING_ENUM(QueryResultCacheNondeterministicFunctionHandling, ErrorCodes::BAD_ARGUMENTS, {{"throw", QueryResultCacheNondeterministicFunctionHandling::Throw}, diff --git a/src/Core/SettingsEnums.h b/src/Core/SettingsEnums.h index 18873a0790ae..d4472e339edf 100644 --- a/src/Core/SettingsEnums.h +++ b/src/Core/SettingsEnums.h @@ -163,6 +163,16 @@ enum class DistributedProductMode : uint8_t DECLARE_SETTING_ENUM(DistributedProductMode) +/// The setting for executing JOIN sections with an object storage cluster function or table. +enum class ObjectStorageClusterJoinMode : uint8_t +{ + LOCAL, /// Convert to local query + GLOBAL, /// Convert to global query + ALLOW /// Enable +}; + +DECLARE_SETTING_ENUM(ObjectStorageClusterJoinMode) + /// How the query result cache handles queries with non-deterministic functions, e.g.
now() enum class QueryResultCacheNondeterministicFunctionHandling : uint8_t { diff --git a/src/Interpreters/ClusterProxy/executeQuery.cpp b/src/Interpreters/ClusterProxy/executeQuery.cpp index 4d15eb28c8fd..df9018219431 100644 --- a/src/Interpreters/ClusterProxy/executeQuery.cpp +++ b/src/Interpreters/ClusterProxy/executeQuery.cpp @@ -464,6 +464,7 @@ void executeQuery( not_optimized_cluster->getName()); read_from_remote->setStepDescription("Read from remote replica"); + read_from_remote->setIsRemoteFunction(is_remote_function); plan->addStep(std::move(read_from_remote)); plan->addInterpreterContext(new_context); plans.emplace_back(std::move(plan)); diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index 002b84335d53..27405614f8f4 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -2982,8 +2982,11 @@ void Context::setCurrentQueryId(const String & query_id) client_info.current_query_id = query_id_to_set; - if (client_info.query_kind == ClientInfo::QueryKind::INITIAL_QUERY) + if (client_info.query_kind == ClientInfo::QueryKind::INITIAL_QUERY + && (getApplicationType() != ApplicationType::SERVER || client_info.initial_query_id.empty())) + { client_info.initial_query_id = client_info.current_query_id; + } } void Context::setBackgroundOperationTypeForContext(ClientInfo::BackgroundOperationType background_operation) diff --git a/src/Interpreters/InterpreterSelectQuery.cpp b/src/Interpreters/InterpreterSelectQuery.cpp index e0ffd41fcdd3..7b9aebf2433c 100644 --- a/src/Interpreters/InterpreterSelectQuery.cpp +++ b/src/Interpreters/InterpreterSelectQuery.cpp @@ -71,6 +71,7 @@ #include #include #include +#include #include #include #include @@ -83,6 +84,7 @@ #include #include #include +#include #include #include @@ -195,6 +197,7 @@ namespace Setting extern const SettingsUInt64 max_rows_to_transfer; extern const SettingsOverflowMode transfer_overflow_mode; extern const SettingsString implicit_table_at_top_level; + extern const SettingsBool use_hive_partitioning; } namespace ServerSetting @@ -1976,6 +1979,22 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, std::optional

(query_plan.getRootNode()->step.get())) + { + auto object_filter_step = std::make_unique<ObjectFilterStep>( + query_plan.getCurrentHeader(), + expressions.before_where->dag.clone(), + getSelectQuery().where()->getColumnName()); + + object_filter_step->setStepDescription("WHERE"); + query_plan.addStep(std::move(object_filter_step)); + } + } + if (from_aggregation_stage) { /// No need to aggregate anything, since this was done on remote shards. diff --git a/src/Planner/Planner.cpp b/src/Planner/Planner.cpp index bd33315d138d..98b3426b82fe 100644 --- a/src/Planner/Planner.cpp +++ b/src/Planner/Planner.cpp @@ -39,6 +39,7 @@ #include #include #include +#include #include #include @@ -53,6 +54,7 @@ #include #include #include +#include #include @@ -144,6 +146,7 @@ namespace Setting extern const SettingsUInt64 max_rows_to_transfer; extern const SettingsOverflowMode transfer_overflow_mode; extern const SettingsBool enable_parallel_blocks_marshalling; + extern const SettingsBool use_hive_partitioning; } namespace ServerSetting @@ -473,6 +476,19 @@ void addFilterStep( query_plan.addStep(std::move(where_step)); } +void addObjectFilterStep(QueryPlan & query_plan, + FilterAnalysisResult & filter_analysis_result, + const std::string & step_description) +{ + auto actions = std::move(filter_analysis_result.filter_actions->dag); + + auto where_step = std::make_unique<ObjectFilterStep>(query_plan.getCurrentHeader(), + std::move(actions), + filter_analysis_result.filter_column_name); + where_step->setStepDescription(step_description); + query_plan.addStep(std::move(where_step)); +} + Aggregator::Params getAggregatorParams(const PlannerContextPtr & planner_context, const AggregationAnalysisResult & aggregation_analysis_result, const QueryAnalysisResult & query_analysis_result, @@ -1787,6 +1803,16 @@ void Planner::buildPlanForQueryNode() if (query_processing_info.isSecondStage() || query_processing_info.isFromAggregationState()) { + if (settings[Setting::use_hive_partitioning] + && !query_processing_info.isFirstStage() + && expression_analysis_result.hasWhere()) + { + if (typeid_cast(query_plan.getRootNode()->step.get())) + { + addObjectFilterStep(query_plan, expression_analysis_result.getWhere(), "WHERE"); + } + } + if (query_processing_info.isFromAggregationState()) { /// Aggregation was performed on remote shards diff --git a/src/Planner/PlannerJoinTree.cpp b/src/Planner/PlannerJoinTree.cpp index dff3ebfb14c2..eed732ef6728 100644 --- a/src/Planner/PlannerJoinTree.cpp +++ b/src/Planner/PlannerJoinTree.cpp @@ -1372,6 +1372,8 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expres /// Hopefully there is no other case when we read from Distributed up to FetchColumns.
if (table_node && table_node->getStorage()->isRemote() && select_query_options.to_stage == QueryProcessingStage::FetchColumns) updated_actions_dag_outputs.push_back(output_node); + else if (table_function_node && table_function_node->getStorage()->isRemote()) + updated_actions_dag_outputs.push_back(output_node); } else updated_actions_dag_outputs.push_back(&rename_actions_dag.addAlias(*output_node, *column_identifier)); diff --git a/src/Processors/QueryPlan/ObjectFilterStep.cpp b/src/Processors/QueryPlan/ObjectFilterStep.cpp new file mode 100644 index 000000000000..a635aee729c7 --- /dev/null +++ b/src/Processors/QueryPlan/ObjectFilterStep.cpp @@ -0,0 +1,63 @@ +#include +#include +#include +#include +#include + +#include + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int INCORRECT_DATA; +} + +ObjectFilterStep::ObjectFilterStep( + SharedHeader input_header_, + ActionsDAG actions_dag_, + String filter_column_name_) + : actions_dag(std::move(actions_dag_)) + , filter_column_name(std::move(filter_column_name_)) +{ + input_headers.emplace_back(input_header_); + output_header = input_headers.front(); +} + +QueryPipelineBuilderPtr ObjectFilterStep::updatePipeline(QueryPipelineBuilders pipelines, const BuildQueryPipelineSettings & /* settings */) +{ + return std::move(pipelines.front()); +} + +void ObjectFilterStep::updateOutputHeader() +{ + output_header = input_headers.front(); +} + +void ObjectFilterStep::serialize(Serialization & ctx) const +{ + writeStringBinary(filter_column_name, ctx.out); + + actions_dag.serialize(ctx.out, ctx.registry); +} + +std::unique_ptr<IQueryPlanStep> ObjectFilterStep::deserialize(Deserialization & ctx) +{ + if (ctx.input_headers.size() != 1) + throw Exception(ErrorCodes::INCORRECT_DATA, "ObjectFilterStep must have one input stream"); + + String filter_column_name; + readStringBinary(filter_column_name, ctx.in); + + ActionsDAG actions_dag = ActionsDAG::deserialize(ctx.in, ctx.registry, ctx.context); + + return std::make_unique<ObjectFilterStep>(ctx.input_headers.front(), std::move(actions_dag), std::move(filter_column_name)); +} + +void registerObjectFilterStep(QueryPlanStepRegistry & registry) +{ + registry.registerStep("ObjectFilter", ObjectFilterStep::deserialize); +} + +} diff --git a/src/Processors/QueryPlan/ObjectFilterStep.h b/src/Processors/QueryPlan/ObjectFilterStep.h new file mode 100644 index 000000000000..ef35d20068ba --- /dev/null +++ b/src/Processors/QueryPlan/ObjectFilterStep.h @@ -0,0 +1,35 @@ +#pragma once +#include +#include + +namespace DB +{ + +/// Implements WHERE operation.
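+/// Unlike FilterStep, this step does not filter rows by itself: updatePipeline() returns the input pipeline unchanged. +/// It only carries the filter expression in the query plan so that optimizations (see optimizePrimaryKeyConditionAndLimit) +/// can push it down into object storage source steps, e.g. to prune objects by hive-partition columns derived from file paths.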
+class ObjectFilterStep : public IQueryPlanStep +{ +public: + ObjectFilterStep( + SharedHeader input_header_, + ActionsDAG actions_dag_, + String filter_column_name_); + + String getName() const override { return "ObjectFilter"; } + QueryPipelineBuilderPtr updatePipeline(QueryPipelineBuilders pipelines, const BuildQueryPipelineSettings & settings) override; + + const ActionsDAG & getExpression() const { return actions_dag; } + ActionsDAG & getExpression() { return actions_dag; } + const String & getFilterColumnName() const { return filter_column_name; } + + void serialize(Serialization & ctx) const override; + + static std::unique_ptr<IQueryPlanStep> deserialize(Deserialization & ctx); + +private: + void updateOutputHeader() override; + + ActionsDAG actions_dag; + String filter_column_name; +}; + +} diff --git a/src/Processors/QueryPlan/Optimizations/optimizePrimaryKeyConditionAndLimit.cpp b/src/Processors/QueryPlan/Optimizations/optimizePrimaryKeyConditionAndLimit.cpp index ce36c7bddb43..33408e02df87 100644 --- a/src/Processors/QueryPlan/Optimizations/optimizePrimaryKeyConditionAndLimit.cpp +++ b/src/Processors/QueryPlan/Optimizations/optimizePrimaryKeyConditionAndLimit.cpp @@ -3,6 +3,7 @@ #include #include #include +#include namespace DB::QueryPlanOptimizations { @@ -41,6 +42,10 @@ void optimizePrimaryKeyConditionAndLimit(const Stack & stack) /// So this is likely not needed. continue; } + else if (auto * object_filter_step = typeid_cast<ObjectFilterStep *>(iter->node->step.get())) + { + source_step_with_filter->addFilter(object_filter_step->getExpression().clone(), object_filter_step->getFilterColumnName()); + } else { break; } diff --git a/src/Processors/QueryPlan/QueryPlanStepRegistry.cpp b/src/Processors/QueryPlan/QueryPlanStepRegistry.cpp index 517f46fbfc96..283eece53c00 100644 --- a/src/Processors/QueryPlan/QueryPlanStepRegistry.cpp +++ b/src/Processors/QueryPlan/QueryPlanStepRegistry.cpp @@ -50,6 +50,7 @@ void registerFilterStep(QueryPlanStepRegistry & registry); void registerTotalsHavingStep(QueryPlanStepRegistry & registry); void registerExtremesStep(QueryPlanStepRegistry & registry); void registerJoinStep(QueryPlanStepRegistry & registry); +void registerObjectFilterStep(QueryPlanStepRegistry & registry); void registerReadFromTableStep(QueryPlanStepRegistry & registry); void registerReadFromTableFunctionStep(QueryPlanStepRegistry & registry); @@ -75,6 +76,7 @@ void QueryPlanStepRegistry::registerPlanSteps() registerReadFromTableStep(registry); registerReadFromTableFunctionStep(registry); + registerObjectFilterStep(registry); } } diff --git a/src/Processors/QueryPlan/ReadFromRemote.cpp b/src/Processors/QueryPlan/ReadFromRemote.cpp index 48a412ef781f..70f3b78b8245 100644 --- a/src/Processors/QueryPlan/ReadFromRemote.cpp +++ b/src/Processors/QueryPlan/ReadFromRemote.cpp @@ -508,7 +508,8 @@ void ReadFromRemote::addLazyPipe( my_stage = stage, my_storage = storage, add_agg_info, add_totals, add_extremes, async_read, async_query_sending, query_tree = shard.query_tree, planner_context = shard.planner_context, - pushed_down_filters, parallel_marshalling_threads]() mutable + pushed_down_filters, parallel_marshalling_threads, + my_is_remote_function = is_remote_function]() mutable -> QueryPipelineBuilder { auto current_settings = my_context->getSettingsRef(); @@ -603,6 +604,8 @@ void ReadFromRemote::addLazyPipe( {DataTypeUInt32().createColumnConst(1, my_shard.shard_info.shard_num), std::make_shared<DataTypeUInt32>(), "_shard_num"}}; auto remote_query_executor = std::make_shared<RemoteQueryExecutor>( std::move(connections), query_string, header, my_context,
my_throttler, my_scalars, my_external_tables, stage_to_use, my_shard.query_plan); + remote_query_executor->setRemoteFunction(my_is_remote_function); + remote_query_executor->setShardCount(my_shard_count); auto pipe = createRemoteSourcePipe( remote_query_executor, add_agg_info, add_totals, add_extremes, async_read, async_query_sending, parallel_marshalling_threads); @@ -693,6 +696,8 @@ void ReadFromRemote::addPipe( priority_func); remote_query_executor->setLogger(log); remote_query_executor->setPoolMode(PoolMode::GET_ONE); + remote_query_executor->setRemoteFunction(is_remote_function); + remote_query_executor->setShardCount(shard_count); if (!table_func_ptr) remote_query_executor->setMainTable(shard.main_table ? shard.main_table : main_table); @@ -713,6 +718,8 @@ void ReadFromRemote::addPipe( auto remote_query_executor = std::make_shared<RemoteQueryExecutor>( shard.shard_info.pool, query_string, shard.header, context, throttler, scalars, external_tables, stage_to_use, shard.query_plan); remote_query_executor->setLogger(log); + remote_query_executor->setRemoteFunction(is_remote_function); + remote_query_executor->setShardCount(shard_count); if (context->canUseTaskBasedParallelReplicas() || parallel_replicas_disabled) { diff --git a/src/Processors/QueryPlan/ReadFromRemote.h b/src/Processors/QueryPlan/ReadFromRemote.h index fec1549430e1..02e1536e05d1 100644 --- a/src/Processors/QueryPlan/ReadFromRemote.h +++ b/src/Processors/QueryPlan/ReadFromRemote.h @@ -45,6 +45,7 @@ class ReadFromRemote final : public SourceStepWithFilterBase void enableMemoryBoundMerging(); void enforceAggregationInOrder(const SortDescription & sort_description); + void setIsRemoteFunction(bool is_remote_function_ = true) { is_remote_function = is_remote_function_; } bool hasSerializedPlan() const; @@ -62,6 +63,7 @@ class ReadFromRemote final : public SourceStepWithFilterBase UInt32 shard_count; const String cluster_name; std::optional priority_func_factory; + bool is_remote_function = false; Pipes addPipes(const ClusterProxy::SelectStreamFactory::Shards & used_shards, const SharedHeader & out_header); diff --git a/src/QueryPipeline/RemoteQueryExecutor.cpp b/src/QueryPipeline/RemoteQueryExecutor.cpp index 1e45649ad699..83f33a52de41 100644 --- a/src/QueryPipeline/RemoteQueryExecutor.cpp +++ b/src/QueryPipeline/RemoteQueryExecutor.cpp @@ -408,7 +408,16 @@ void RemoteQueryExecutor::sendQueryUnlocked(ClientInfo::QueryKind query_kind, As auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(settings); ClientInfo modified_client_info = context->getClientInfo(); - modified_client_info.query_kind = query_kind; + + /// Not supported yet for multiple addresses, e.g. "remote('1.1.1.{1,2}')" + if (is_remote_function && (shard_count == 1)) + { + modified_client_info.setInitialQuery(); + modified_client_info.client_name = "ClickHouse server"; + modified_client_info.interface = ClientInfo::Interface::TCP; + } + else + modified_client_info.query_kind = query_kind; if (extension) modified_client_info.collaborate_with_initiator = true; diff --git a/src/QueryPipeline/RemoteQueryExecutor.h b/src/QueryPipeline/RemoteQueryExecutor.h index 11deb4f2c9b2..7ef8be9e27cc 100644 --- a/src/QueryPipeline/RemoteQueryExecutor.h +++ b/src/QueryPipeline/RemoteQueryExecutor.h @@ -224,6 +224,10 @@ class RemoteQueryExecutor void setLogger(LoggerPtr logger) { log = logger; } + void setRemoteFunction(bool is_remote_function_ = true) { is_remote_function = is_remote_function_; } + + void setShardCount(UInt32 shard_count_) { shard_count = shard_count_; } + + const Block & getHeader() const { return
*header; } const SharedHeader & getSharedHeader() const { return header; } @@ -320,6 +324,9 @@ class RemoteQueryExecutor bool packet_in_progress = false; #endif + bool is_remote_function = false; + UInt32 shard_count = 0; + /// Parts uuids, collected from remote replicas std::vector<UUID> duplicated_part_uuids; diff --git a/src/Storages/IStorageCluster.cpp b/src/Storages/IStorageCluster.cpp index 3b41df8e4477..bc62df76785c 100644 --- a/src/Storages/IStorageCluster.cpp +++ b/src/Storages/IStorageCluster.cpp @@ -15,7 +15,6 @@ #include #include #include -#include #include #include #include @@ -23,6 +22,12 @@ #include #include #include +#include +#include +#include +#include +#include +#include #include #include @@ -40,11 +45,13 @@ namespace Setting extern const SettingsBool parallel_replicas_local_plan; extern const SettingsString cluster_for_parallel_replicas; extern const SettingsNonZeroUInt64 max_parallel_replicas; + extern const SettingsObjectStorageClusterJoinMode object_storage_cluster_join_mode; extern const SettingsUInt64 object_storage_max_nodes; } namespace ErrorCodes { + extern const int LOGICAL_ERROR; extern const int NOT_IMPLEMENTED; } @@ -63,51 +70,6 @@ IStorageCluster::IStorageCluster( { } -class ReadFromCluster : public SourceStepWithFilter -{ -public: - std::string getName() const override { return "ReadFromCluster"; } - void initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) override; - void applyFilters(ActionDAGNodes added_filter_nodes) override; - - ReadFromCluster( - const Names & column_names_, - const SelectQueryInfo & query_info_, - const StorageSnapshotPtr & storage_snapshot_, - const ContextPtr & context_, - SharedHeader sample_block, - std::shared_ptr<IStorageCluster> storage_, - ASTPtr query_to_send_, - QueryProcessingStage::Enum processed_stage_, - ClusterPtr cluster_, - LoggerPtr log_) - : SourceStepWithFilter( - std::move(sample_block), - column_names_, - query_info_, - storage_snapshot_, - context_) - , storage(std::move(storage_)) - , query_to_send(std::move(query_to_send_)) - , processed_stage(processed_stage_) - , cluster(std::move(cluster_)) - , log(log_) - { - } - -private: - std::shared_ptr<IStorageCluster> storage; - ASTPtr query_to_send; - QueryProcessingStage::Enum processed_stage; - ClusterPtr cluster; - LoggerPtr log; - - std::optional<RemoteQueryExecutor::Extension> extension; - - void createExtension(const ActionsDAG::Node * predicate); - ContextPtr updateSettings(const Settings & settings); -}; - void ReadFromCluster::applyFilters(ActionDAGNodes added_filter_nodes) { SourceStepWithFilter::applyFilters(std::move(added_filter_nodes)); @@ -131,6 +93,175 @@ void ReadFromCluster::createExtension(const ActionsDAG::Node * predicate) cluster); } +namespace +{ + +/* +Helper class to find the first node of a required type in a query tree +*/ +class SearcherVisitor : public InDepthQueryTreeVisitorWithContext<SearcherVisitor> +{ +public: + using Base = InDepthQueryTreeVisitorWithContext<SearcherVisitor>; + using Base::Base; + + explicit SearcherVisitor(QueryTreeNodeType type_, ContextPtr context) : Base(context), type(type_) {} + + bool needChildVisit(QueryTreeNodePtr &, QueryTreeNodePtr & /*child*/) + { + return !passed_node; + } + + void enterImpl(QueryTreeNodePtr & node) + { + if (passed_node) + return; + + auto node_type = node->getNodeType(); + + if (node_type == type) + passed_node = node; + } + + QueryTreeNodePtr getNode() const { return passed_node; } + +private: + QueryTreeNodeType type; + QueryTreeNodePtr passed_node; +}; + +/* +Helper class to collect all columns used from a specific source +*/ +class
CollectUsedColumnsForSourceVisitor : public InDepthQueryTreeVisitorWithContext<CollectUsedColumnsForSourceVisitor> +{ +public: + using Base = InDepthQueryTreeVisitorWithContext<CollectUsedColumnsForSourceVisitor>; + using Base::Base; + + explicit CollectUsedColumnsForSourceVisitor( + QueryTreeNodePtr source_, + ContextPtr context, + bool collect_columns_from_other_sources_ = false) + : Base(context) + , source(source_) + , collect_columns_from_other_sources(collect_columns_from_other_sources_) + {} + + void enterImpl(QueryTreeNodePtr & node) + { + auto node_type = node->getNodeType(); + + if (node_type != QueryTreeNodeType::COLUMN) + return; + + auto & column_node = node->as<ColumnNode &>(); + auto column_source = column_node.getColumnSourceOrNull(); + if (!column_source) + return; + + if ((column_source == source) != collect_columns_from_other_sources) + { + const auto & name = column_node.getColumnName(); + if (!names.count(name)) + { + columns.emplace_back(column_node.getColumn()); + names.insert(name); + } + } + } + + const NamesAndTypes & getColumns() const { return columns; } + +private: + std::unordered_set<String> names; + QueryTreeNodePtr source; + NamesAndTypes columns; + bool collect_columns_from_other_sources; +}; + +} + +/* +Try to build the subquery to send to the cluster nodes. +Converts + + SELECT s3.c1, s3.c2, t.c3 + FROM + s3Cluster(...) AS s3 + JOIN + localtable AS t + ON s3.key == t.key + +to + + SELECT s3.c1, s3.c2, s3.key + FROM + s3Cluster(...) AS s3 +*/ +void IStorageCluster::updateQueryWithJoinToSendIfNeeded( + ASTPtr & query_to_send, + QueryTreeNodePtr query_tree, + const ContextPtr & context) +{ + auto object_storage_cluster_join_mode = context->getSettingsRef()[Setting::object_storage_cluster_join_mode]; + switch (object_storage_cluster_join_mode) + { + case ObjectStorageClusterJoinMode::LOCAL: + { + auto modified_query_tree = query_tree->clone(); + bool need_modify = false; + + SearcherVisitor table_function_searcher(QueryTreeNodeType::TABLE_FUNCTION, context); + table_function_searcher.visit(query_tree); + auto table_function_node = table_function_searcher.getNode(); + if (!table_function_node) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find table function node"); + + if (has_join) + { + auto table_function = extractTableFunctionASTPtrFromSelectQuery(query_to_send); + auto query_tree_distributed = buildTableFunctionQueryTree(table_function, context); + auto & table_function_ast = table_function->as<ASTFunction &>(); + query_tree_distributed->setAlias(table_function_ast.alias); + + // Find all columns used from the table function to build a proper projection list + CollectUsedColumnsForSourceVisitor collector(table_function_node, context); + collector.visit(query_tree); + const auto & columns = collector.getColumns(); + + auto & query_node = modified_query_tree->as<QueryNode &>(); + query_node.resolveProjectionColumns(columns); + auto column_nodes_to_select = std::make_shared<ListNode>(); + column_nodes_to_select->getNodes().reserve(columns.size()); + for (auto & column : columns) + column_nodes_to_select->getNodes().emplace_back(std::make_shared<ColumnNode>(column, table_function_node)); + query_node.getProjectionNode() = column_nodes_to_select; + + // Leave only the table function to send to the cluster nodes + modified_query_tree = modified_query_tree->cloneAndReplace(query_node.getJoinTree(), query_tree_distributed); + + need_modify = true; + } + + if (has_local_columns_in_where) + { + auto & query_node = modified_query_tree->as<QueryNode &>(); + query_node.getWhere() = {}; + } + + if (need_modify) + query_to_send = queryNodeToDistributedSelectQuery(modified_query_tree); + return; + } + case ObjectStorageClusterJoinMode::GLOBAL: + //
TODO + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "`Global` mode for `object_storage_cluster_join_mode` setting is not implemented yet"); + case ObjectStorageClusterJoinMode::ALLOW: // Do nothing special + return; + } +} + /// The code executes on initiator void IStorageCluster::read( QueryPlan & query_plan, @@ -160,13 +291,15 @@ void IStorageCluster::read( SharedHeader sample_block; ASTPtr query_to_send = query_info.query; + updateQueryWithJoinToSendIfNeeded(query_to_send, query_info.query_tree, context); + if (context->getSettingsRef()[Setting::allow_experimental_analyzer]) { - sample_block = InterpreterSelectQueryAnalyzer::getSampleBlock(query_info.query, context, SelectQueryOptions(processed_stage)); + sample_block = InterpreterSelectQueryAnalyzer::getSampleBlock(query_to_send, context, SelectQueryOptions(processed_stage)); } else { - auto interpreter = InterpreterSelectQuery(query_info.query, context, SelectQueryOptions(processed_stage).analyze()); + auto interpreter = InterpreterSelectQuery(query_to_send, context, SelectQueryOptions(processed_stage).analyze()); sample_block = interpreter.getSampleBlock(); query_to_send = interpreter.getQueryInfo().query->clone(); } @@ -174,7 +307,7 @@ void IStorageCluster::read( updateQueryToSendIfNeeded(query_to_send, storage_snapshot, context); RestoreQualifiedNamesVisitor::Data data; - data.distributed_table = DatabaseAndTableWithAlias(*getTableExpression(query_info.query->as<ASTSelectQuery &>(), 0)); + data.distributed_table = DatabaseAndTableWithAlias(*getTableExpression(query_to_send->as<ASTSelectQuery &>(), 0)); data.remote_table.database = context->getCurrentDatabase(); data.remote_table.table = getName(); RestoreQualifiedNamesVisitor(data).visit(query_to_send); @@ -283,8 +416,40 @@ void ReadFromCluster::initializePipeline(QueryPipelineBuilder & pipeline, const } QueryProcessingStage::Enum IStorageCluster::getQueryProcessingStage( - ContextPtr context, QueryProcessingStage::Enum to_stage, const StorageSnapshotPtr &, SelectQueryInfo &) const + ContextPtr context, QueryProcessingStage::Enum to_stage, const StorageSnapshotPtr &, SelectQueryInfo & query_info) const { + auto object_storage_cluster_join_mode = context->getSettingsRef()[Setting::object_storage_cluster_join_mode]; + + if (object_storage_cluster_join_mode != ObjectStorageClusterJoinMode::ALLOW) + { + if (!context->getSettingsRef()[Setting::allow_experimental_analyzer]) + throw Exception(ErrorCodes::NOT_IMPLEMENTED, + "object_storage_cluster_join_mode != 'allow' is not supported without allow_experimental_analyzer=true"); + + SearcherVisitor join_searcher(QueryTreeNodeType::JOIN, context); + join_searcher.visit(query_info.query_tree); + if (join_searcher.getNode()) + has_join = true; + + SearcherVisitor table_function_searcher(QueryTreeNodeType::TABLE_FUNCTION, context); + table_function_searcher.visit(query_info.query_tree); + auto table_function_node = table_function_searcher.getNode(); + if (!table_function_node) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find table function node"); + + CollectUsedColumnsForSourceVisitor collector_where(table_function_node, context, true); + auto & query_node = query_info.query_tree->as<QueryNode &>(); + if (query_node.hasWhere()) + collector_where.visit(query_node.getWhere()); + + // Can't use 'WHERE' on remote node if it contains columns from other sources + if (!collector_where.getColumns().empty()) + has_local_columns_in_where = true; + + if (has_join || has_local_columns_in_where) + return QueryProcessingStage::Enum::FetchColumns; + } + /// Initiator executes query on remote node.
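+ /// When object_storage_cluster_join_mode is not 'allow' and the query has a JOIN or a WHERE over local columns, FetchColumns has already been returned above, so the JOIN and that WHERE run on the initiator instead.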
if (context->getClientInfo().query_kind == ClientInfo::QueryKind::INITIAL_QUERY) if (to_stage >= QueryProcessingStage::Enum::WithMergeableState) diff --git a/src/Storages/IStorageCluster.h b/src/Storages/IStorageCluster.h index 17952ca759ba..c559acad8f75 100644 --- a/src/Storages/IStorageCluster.h +++ b/src/Storages/IStorageCluster.h @@ -3,6 +3,7 @@ #include #include #include +#include namespace DB { @@ -61,6 +62,7 @@ class IStorageCluster : public IStorage protected: virtual void updateBeforeRead(const ContextPtr &) {} virtual void updateQueryToSendIfNeeded(ASTPtr & /*query*/, const StorageSnapshotPtr & /*storage_snapshot*/, const ContextPtr & /*context*/) {} + void updateQueryWithJoinToSendIfNeeded(ASTPtr & query_to_send, QueryTreeNodePtr query_tree, const ContextPtr & context); virtual void readFallBackToPure( QueryPlan & /* query_plan */, @@ -89,7 +91,55 @@ class IStorageCluster : public IStorage LoggerPtr log; String cluster_name; + + mutable bool has_join = false; + mutable bool has_local_columns_in_where = false; }; +class ReadFromCluster : public SourceStepWithFilter +{ +public: + std::string getName() const override { return "ReadFromCluster"; } + void initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) override; + void applyFilters(ActionDAGNodes added_filter_nodes) override; + + ReadFromCluster( + const Names & column_names_, + const SelectQueryInfo & query_info_, + const StorageSnapshotPtr & storage_snapshot_, + const ContextPtr & context_, + SharedHeader sample_block, + std::shared_ptr<IStorageCluster> storage_, + ASTPtr query_to_send_, + QueryProcessingStage::Enum processed_stage_, + ClusterPtr cluster_, + LoggerPtr log_) + : SourceStepWithFilter( + std::move(sample_block), + column_names_, + query_info_, + storage_snapshot_, + context_) + , storage(std::move(storage_)) + , query_to_send(std::move(query_to_send_)) + , processed_stage(processed_stage_) + , cluster(std::move(cluster_)) + , log(log_) + { + } + +private: + std::shared_ptr<IStorageCluster> storage; + ASTPtr query_to_send; + QueryProcessingStage::Enum processed_stage; + ClusterPtr cluster; + LoggerPtr log; + + std::optional<RemoteQueryExecutor::Extension> extension; + + void createExtension(const ActionsDAG::Node * predicate); + ContextPtr updateSettings(const Settings & settings); +}; + } diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp index a94d5b71bc71..2751e2dea569 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp @@ -509,7 +509,7 @@ RemoteQueryExecutor::Extension StorageObjectStorageCluster::getTaskIteratorExten local_context, predicate, filter, - virtual_columns, + getVirtualsList(), hive_partition_columns_to_read_from_file_path, nullptr, local_context->getFileProgressCallback(), diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.h b/src/Storages/ObjectStorage/StorageObjectStorageCluster.h index 9a442ed572bf..bcb97ae60ef1 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.h +++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.h @@ -161,7 +161,6 @@ class StorageObjectStorageCluster : public IStorageCluster const String engine_name; StorageObjectStorageConfigurationPtr configuration; const ObjectStoragePtr object_storage; - NamesAndTypesList virtual_columns; NamesAndTypesList hive_partition_columns_to_read_from_file_path; bool cluster_name_in_settings; diff --git a/src/Storages/extractTableFunctionFromSelectQuery.cpp
b/src/Storages/extractTableFunctionFromSelectQuery.cpp index c7f60240b3c7..8477798b62b1 100644 --- a/src/Storages/extractTableFunctionFromSelectQuery.cpp +++ b/src/Storages/extractTableFunctionFromSelectQuery.cpp @@ -9,7 +9,7 @@ namespace DB { -ASTFunction * extractTableFunctionFromSelectQuery(ASTPtr & query) +ASTTableExpression * extractTableExpressionASTPtrFromSelectQuery(ASTPtr & query) { auto * select_query = query->as<ASTSelectQuery>(); if (!select_query || !select_query->tables()) @@ -17,11 +17,22 @@ ASTFunction * extractTableFunctionFromSelectQuery(ASTPtr & query) auto * tables = select_query->tables()->as<ASTTablesInSelectQuery>(); auto * table_expression = tables->children[0]->as<ASTTablesInSelectQueryElement>()->table_expression->as<ASTTableExpression>(); - if (!table_expression->table_function) + return table_expression; +} + +ASTPtr extractTableFunctionASTPtrFromSelectQuery(ASTPtr & query) +{ + auto * table_expression = extractTableExpressionASTPtrFromSelectQuery(query); + return table_expression ? table_expression->table_function : nullptr; +} + +ASTFunction * extractTableFunctionFromSelectQuery(ASTPtr & query) +{ + auto table_function_ast = extractTableFunctionASTPtrFromSelectQuery(query); + if (!table_function_ast) return nullptr; - auto * table_function = table_expression->table_function->as<ASTFunction>(); - return table_function; + return table_function_ast->as<ASTFunction>(); } ASTExpressionList * extractTableFunctionArgumentsFromSelectQuery(ASTPtr & query) diff --git a/src/Storages/extractTableFunctionFromSelectQuery.h b/src/Storages/extractTableFunctionFromSelectQuery.h index 87edf01c1c82..9834f3dc7573 100644 --- a/src/Storages/extractTableFunctionFromSelectQuery.h +++ b/src/Storages/extractTableFunctionFromSelectQuery.h @@ -6,7 +6,10 @@ namespace DB { +struct ASTTableExpression; +ASTTableExpression * extractTableExpressionASTPtrFromSelectQuery(ASTPtr & query); +ASTPtr extractTableFunctionASTPtrFromSelectQuery(ASTPtr & query); ASTFunction * extractTableFunctionFromSelectQuery(ASTPtr & query); ASTExpressionList * extractTableFunctionArgumentsFromSelectQuery(ASTPtr & query); diff --git a/tests/integration/test_s3_cluster/test.py b/tests/integration/test_s3_cluster/test.py index 25dca1c658b1..a4a306f0d34c 100644 --- a/tests/integration/test_s3_cluster/test.py +++ b/tests/integration/test_s3_cluster/test.py @@ -856,6 +856,311 @@ def test_cluster_hosts_limit(started_cluster): assert int(hosts_2) == 2 +def test_remote_hedged(started_cluster): + node = started_cluster.instances["s0_0_0"] + pure_s3 = node.query( + f""" + SELECT * from s3( + 'http://minio1:9001/root/data/{{clickhouse,database}}/*', + 'minio', '{minio_secret_key}', 'CSV', + 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') + ORDER BY (name, value, polygon) + LIMIT 1 + """ + ) + s3_distributed = node.query( + f""" + SELECT * from remote('s0_0_1', s3Cluster( + 'cluster_simple', + 'http://minio1:9001/root/data/{{clickhouse,database}}/*', 'minio', '{minio_secret_key}', 'CSV', + 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))')) + ORDER BY (name, value, polygon) + LIMIT 1 + SETTINGS use_hedged_requests=True + """ + ) + + assert TSV(pure_s3) == TSV(s3_distributed) + + +def test_remote_no_hedged(started_cluster): + node = started_cluster.instances["s0_0_0"] + pure_s3 = node.query( + f""" + SELECT * from s3( + 'http://minio1:9001/root/data/{{clickhouse,database}}/*', + 'minio', '{minio_secret_key}', 'CSV',
'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') + ORDER BY (name, value, polygon) + LIMIT 1 + """ + ) + s3_distributed = node.query( + f""" + SELECT * from remote('s0_0_1', s3Cluster( + 'cluster_simple', + 'http://minio1:9001/root/data/{{clickhouse,database}}/*', 'minio', '{minio_secret_key}', 'CSV', + 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))')) + ORDER BY (name, value, polygon) + LIMIT 1 + SETTINGS use_hedged_requests=False + """ + ) + + assert TSV(pure_s3) == TSV(s3_distributed) + + +@pytest.mark.parametrize("allow_experimental_analyzer", [0, 1]) +def test_hive_partitioning(started_cluster, allow_experimental_analyzer): + node = started_cluster.instances["s0_0_0"] + + node.query(f"SET allow_experimental_analyzer = {allow_experimental_analyzer}") + + for i in range(1, 5): + exists = node.query( + f""" + SELECT + count() + FROM s3('http://minio1:9001/root/data/hive/key={i}/*', 'minio', '{minio_secret_key}', 'Parquet', 'key Int32, value Int32') + GROUP BY ALL + FORMAT TSV + """ + ) + if int(exists) == 0: + node.query( + f""" + INSERT + INTO FUNCTION s3('http://minio1:9001/root/data/hive/key={i}/data.parquet', 'minio', '{minio_secret_key}', 'Parquet', 'key Int32, value Int32') + SELECT {i}, {i} + SETTINGS use_hive_partitioning = 0 + """ + ) + + query_id_full = str(uuid.uuid4()) + result = node.query( + f""" + SELECT count() + FROM s3('http://minio1:9001/root/data/hive/key=**.parquet', 'minio', '{minio_secret_key}', 'Parquet', 'key Int32, value Int32') + WHERE key <= 2 + FORMAT TSV + SETTINGS enable_filesystem_cache = 0, use_query_cache = 0, use_cache_for_count_from_files = 0, use_hive_partitioning = 0 + """, + query_id=query_id_full, + ) + result = int(result) + assert result == 2 + + query_id_optimized = str(uuid.uuid4()) + result = node.query( + f""" + SELECT count() + FROM s3('http://minio1:9001/root/data/hive/key=**.parquet', 'minio', '{minio_secret_key}', 'Parquet', 'key Int32, value Int32') + WHERE key <= 2 + FORMAT TSV + SETTINGS enable_filesystem_cache = 0, use_query_cache = 0, use_cache_for_count_from_files = 0, use_hive_partitioning = 1 + """, + query_id=query_id_optimized, + ) + result = int(result) + assert result == 2 + + query_id_cluster_full = str(uuid.uuid4()) + result = node.query( + f""" + SELECT count() + FROM s3Cluster(cluster_simple, 'http://minio1:9001/root/data/hive/key=**.parquet', 'minio', '{minio_secret_key}', 'Parquet', 'key Int32, value Int32') + WHERE key <= 2 + FORMAT TSV + SETTINGS enable_filesystem_cache = 0, use_query_cache = 0, use_cache_for_count_from_files = 0, use_hive_partitioning = 0 + """, + query_id=query_id_cluster_full, + ) + result = int(result) + assert result == 2 + + query_id_cluster_optimized = str(uuid.uuid4()) + result = node.query( + f""" + SELECT count() + FROM s3Cluster(cluster_simple, 'http://minio1:9001/root/data/hive/key=**.parquet', 'minio', '{minio_secret_key}', 'Parquet', 'key Int32, value Int32') + WHERE key <= 2 + FORMAT TSV + SETTINGS enable_filesystem_cache = 0, use_query_cache = 0, use_cache_for_count_from_files = 0, use_hive_partitioning = 1 + """, + query_id=query_id_cluster_optimized, + ) + result = int(result) + assert result == 2 + + node.query("SYSTEM FLUSH LOGS ON CLUSTER 'cluster_simple'") + + full_traffic = node.query( + f""" + SELECT sum(ProfileEvents['ReadBufferFromS3Bytes']) + FROM clusterAllReplicas(cluster_simple, system.query_log) + WHERE type='QueryFinish' AND initial_query_id='{query_id_full}' + FORMAT TSV + """ + ) + full_traffic = int(full_traffic) + 
assert full_traffic > 0 # 612*4 + + optimized_traffic = node.query( + f""" + SELECT sum(ProfileEvents['ReadBufferFromS3Bytes']) + FROM clusterAllReplicas(cluster_simple, system.query_log) + WHERE type='QueryFinish' AND initial_query_id='{query_id_optimized}' + FORMAT TSV + """ + ) + optimized_traffic = int(optimized_traffic) + assert optimized_traffic > 0 # 612*2 + assert full_traffic > optimized_traffic + + cluster_full_traffic = node.query( + f""" + SELECT sum(ProfileEvents['ReadBufferFromS3Bytes']) + FROM clusterAllReplicas(cluster_simple, system.query_log) + WHERE type='QueryFinish' AND initial_query_id='{query_id_cluster_full}' + FORMAT TSV + """ + ) + cluster_full_traffic = int(cluster_full_traffic) + assert cluster_full_traffic == full_traffic + + cluster_optimized_traffic = node.query( + f""" + SELECT sum(ProfileEvents['ReadBufferFromS3Bytes']) + FROM clusterAllReplicas(cluster_simple, system.query_log) + WHERE type='QueryFinish' AND initial_query_id='{query_id_cluster_optimized}' + FORMAT TSV + """ + ) + cluster_optimized_traffic = int(cluster_optimized_traffic) + assert cluster_optimized_traffic == optimized_traffic + + node.query("SET allow_experimental_analyzer = DEFAULT") + + +def test_joins(started_cluster): + node = started_cluster.instances["s0_0_0"] + + # Table join_table only exists on the node 's0_0_0'. + node.query( + """ + CREATE TABLE IF NOT EXISTS join_table ( + id UInt32, + name String + ) ENGINE=MergeTree() + ORDER BY id; + """ + ) + + node.query( + f""" + INSERT INTO join_table + SELECT value, concat(name, '_jt') FROM s3Cluster('cluster_simple', + 'http://minio1:9001/root/data/{{clickhouse,database}}/*', 'minio', '{minio_secret_key}', 'CSV', + 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))'); + """ + ) + + result1 = node.query( + f""" + SELECT t1.name, t2.name FROM + s3Cluster('cluster_simple', + 'http://minio1:9001/root/data/{{clickhouse,database}}/*', 'minio', '{minio_secret_key}', 'CSV', + 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') AS t1 + JOIN + join_table AS t2 + ON t1.value = t2.id + ORDER BY t1.name + SETTINGS object_storage_cluster_join_mode='local'; + """ + ) + + res = list(map(str.split, result1.splitlines())) + assert len(res) == 25 + + for line in res: + if len(line) == 2: + assert line[1] == f"{line[0]}_jt" + else: + assert line == ["_jt"] # for empty name + + result2 = node.query( + f""" + SELECT t1.name, t2.name FROM + join_table AS t2 + JOIN + s3Cluster('cluster_simple', + 'http://minio1:9001/root/data/{{clickhouse,database}}/*', 'minio', '{minio_secret_key}', 'CSV', + 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') AS t1 + ON t1.value = t2.id + ORDER BY t1.name + SETTINGS object_storage_cluster_join_mode='local'; + """ + ) + + assert result1 == result2 + + # With WHERE clause with remote column only + result3 = node.query( + f""" + SELECT t1.name, t2.name FROM + s3Cluster('cluster_simple', + 'http://minio1:9001/root/data/{{clickhouse,database}}/*', 'minio', '{minio_secret_key}', 'CSV', + 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') AS t1 + JOIN + join_table AS t2 + ON t1.value = t2.id + WHERE (t1.value % 2) + ORDER BY t1.name + SETTINGS object_storage_cluster_join_mode='local'; + """ + ) + + res = list(map(str.split, result3.splitlines())) + assert len(res) == 8 + + # With WHERE clause with local column only + result4 = node.query( + f""" + SELECT t1.name, t2.name FROM + s3Cluster('cluster_simple', + 
'http://minio1:9001/root/data/{{clickhouse,database}}/*', 'minio', '{minio_secret_key}', 'CSV', + 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') AS t1 + JOIN + join_table AS t2 + ON t1.value = t2.id + WHERE (t2.id % 2) + ORDER BY t1.name + SETTINGS object_storage_cluster_join_mode='local'; + """ + ) + + assert result3 == result4 + + # With WHERE clause with local and remote columns + result5 = node.query( + f""" + SELECT t1.name, t2.name FROM + s3Cluster('cluster_simple', + 'http://minio1:9001/root/data/{{clickhouse,database}}/*', 'minio', '{minio_secret_key}', 'CSV', + 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') AS t1 + JOIN + join_table AS t2 + ON t1.value = t2.id + WHERE (t1.value % 2) AND ((t2.id % 3) == 2) + ORDER BY t1.name + SETTINGS object_storage_cluster_join_mode='local'; + """ + ) + + res = list(map(str.split, result5.splitlines())) + assert len(res) == 6 + + def test_graceful_shutdown(started_cluster): node = started_cluster.instances["s0_0_0"] node_to_shutdown = started_cluster.instances["s0_1_0"] diff --git a/tests/integration/test_storage_iceberg/test.py b/tests/integration/test_storage_iceberg/test.py index 6f3dc5108912..eee2b36562f6 100644 --- a/tests/integration/test_storage_iceberg/test.py +++ b/tests/integration/test_storage_iceberg/test.py @@ -610,6 +610,22 @@ def make_query_from_table(alt_syntax=False): count_secondary_subqueries(started_cluster, query_id_pure_table_engine_cluster_with_type_in_nc, 1, "table engine with cluster setting with storage type in named collection") + # Cluster query sent through remote() so that node2 becomes the coordinator + table_function_expr_cluster = get_creation_expression( + storage_type, + TABLE_NAME, + started_cluster, + table_function=True, + run_on_cluster=True, + ) + select_remote_cluster = ( + instance.query(f"SELECT * FROM remote('node2',{table_function_expr_cluster})") + .strip() + .split() + ) + assert len(select_remote_cluster) == 600 + assert select_remote_cluster == select_regular + @pytest.mark.parametrize("format_version", ["1", "2"]) @pytest.mark.parametrize("storage_type", ["s3", "azure", "local"]) diff --git a/tests/queries/0_stateless/03550_analyzer_remote_view_columns.sql b/tests/queries/0_stateless/03550_analyzer_remote_view_columns.sql index 044e34d68b5d..7eb3f2e9b67d 100644 --- a/tests/queries/0_stateless/03550_analyzer_remote_view_columns.sql +++ b/tests/queries/0_stateless/03550_analyzer_remote_view_columns.sql @@ -39,4 +39,4 @@ WHERE AND log_comment = 'THIS IS A COMMENT TO MARK THE INITIAL QUERY' LIMIT 1) AND type = 'QueryFinish' - AND NOT is_initial_query; + AND query_id != initial_query_id;
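Usage sketch (a hedged illustration, not part of the patch; the cluster name `cluster_simple`, the `join_table` table, and the MinIO URL are fixtures borrowed from the integration test above). With object_storage_cluster_join_mode = 'local', the JOIN runs on the initiator and each cluster node receives only the rewritten single-table subquery:

SELECT t1.name, t2.name
FROM s3Cluster('cluster_simple', 'http://minio1:9001/root/data/*', 'minio', '<secret>', 'CSV',
               'name String, value UInt32') AS t1
JOIN join_table AS t2 ON t1.value = t2.id
ORDER BY t1.name
SETTINGS object_storage_cluster_join_mode = 'local';

-- Roughly what each node receives (projection reduced to the columns used from t1):
-- SELECT name, value FROM s3Cluster('cluster_simple', ...) AS t1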