diff --git a/src/Interpreters/ClusterProxy/executeQuery.cpp b/src/Interpreters/ClusterProxy/executeQuery.cpp
index c4cd7849897b..acc93b7d8dea 100644
--- a/src/Interpreters/ClusterProxy/executeQuery.cpp
+++ b/src/Interpreters/ClusterProxy/executeQuery.cpp
@@ -443,6 +443,7 @@ void executeQuery(
             not_optimized_cluster->getName());
 
         read_from_remote->setStepDescription("Read from remote replica");
+        read_from_remote->setIsRemoteFunction(is_remote_function);
         plan->addStep(std::move(read_from_remote));
         plan->addInterpreterContext(new_context);
         plans.emplace_back(std::move(plan));
diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp
index b66bdd104a99..8ae8391f98b0 100644
--- a/src/Interpreters/Context.cpp
+++ b/src/Interpreters/Context.cpp
@@ -2844,8 +2844,11 @@ void Context::setCurrentQueryId(const String & query_id)
 
     client_info.current_query_id = query_id_to_set;
 
-    if (client_info.query_kind == ClientInfo::QueryKind::INITIAL_QUERY)
+    if (client_info.query_kind == ClientInfo::QueryKind::INITIAL_QUERY
+        && (getApplicationType() != ApplicationType::SERVER || client_info.initial_query_id.empty()))
+    {
         client_info.initial_query_id = client_info.current_query_id;
+    }
 }
 
 void Context::setBackgroundOperationTypeForContext(ClientInfo::BackgroundOperationType background_operation)
diff --git a/src/Interpreters/InterpreterSelectQuery.cpp b/src/Interpreters/InterpreterSelectQuery.cpp
index 32f05f89a9d6..789e0fabb4da 100644
--- a/src/Interpreters/InterpreterSelectQuery.cpp
+++ b/src/Interpreters/InterpreterSelectQuery.cpp
@@ -68,6 +68,7 @@
 #include
 #include
 #include
+#include <Processors/QueryPlan/ObjectFilterStep.h>
 #include
 #include
 #include
@@ -189,6 +190,7 @@ namespace Setting
     extern const SettingsUInt64 min_count_to_compile_aggregate_expression;
     extern const SettingsBool enable_software_prefetch_in_aggregation;
     extern const SettingsBool optimize_group_by_constant_keys;
+    extern const SettingsBool use_hive_partitioning;
 }
 
 namespace ServerSetting
@@ -1965,6 +1967,22 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, std::optional<Pipe> prepared_pipe)
 
     if (expressions.second_stage || from_aggregation_stage)
     {
+        if (settings[Setting::use_hive_partitioning]
+            && !expressions.first_stage
+            && expressions.hasWhere())
+        {
+            if (typeid_cast<ReadFromCluster *>(query_plan.getRootNode()->step.get()))
+            {
+                auto object_filter_step = std::make_unique<ObjectFilterStep>(
+                    query_plan.getCurrentHeader(),
+                    expressions.before_where->dag.clone(),
+                    getSelectQuery().where()->getColumnName());
+
+                object_filter_step->setStepDescription("WHERE");
+                query_plan.addStep(std::move(object_filter_step));
+            }
+        }
+
         if (from_aggregation_stage)
         {
             /// No need to aggregate anything, since this was done on remote shards.
diff --git a/src/Planner/Planner.cpp b/src/Planner/Planner.cpp
index 4dba9e6b16b8..109977f87ae9 100644
--- a/src/Planner/Planner.cpp
+++ b/src/Planner/Planner.cpp
@@ -37,6 +37,7 @@
 #include
 #include
 #include
+#include <Processors/QueryPlan/ObjectFilterStep.h>
 #include
 #include
 
@@ -133,6 +134,7 @@ namespace Setting
     extern const SettingsUInt64 min_count_to_compile_aggregate_expression;
     extern const SettingsBool enable_software_prefetch_in_aggregation;
     extern const SettingsBool optimize_group_by_constant_keys;
+    extern const SettingsBool use_hive_partitioning;
 }
 
 namespace ServerSetting
@@ -413,6 +415,19 @@ void addFilterStep(QueryPlan & query_plan,
     query_plan.addStep(std::move(where_step));
 }
 
+void addObjectFilterStep(QueryPlan & query_plan,
+    FilterAnalysisResult & filter_analysis_result,
+    const std::string & step_description)
+{
+    auto actions = std::move(filter_analysis_result.filter_actions->dag);
+
+    auto where_step = std::make_unique<ObjectFilterStep>(query_plan.getCurrentHeader(),
+        std::move(actions),
+        filter_analysis_result.filter_column_name);
+    where_step->setStepDescription(step_description);
+    query_plan.addStep(std::move(where_step));
+}
+
 Aggregator::Params getAggregatorParams(const PlannerContextPtr & planner_context,
     const AggregationAnalysisResult & aggregation_analysis_result,
     const QueryAnalysisResult & query_analysis_result,
@@ -1677,6 +1692,16 @@ void Planner::buildPlanForQueryNode()
 
     if (query_processing_info.isSecondStage() || query_processing_info.isFromAggregationState())
     {
+        if (settings[Setting::use_hive_partitioning]
+            && !query_processing_info.isFirstStage()
+            && expression_analysis_result.hasWhere())
+        {
+            if (typeid_cast<ReadFromCluster *>(query_plan.getRootNode()->step.get()))
+            {
+                addObjectFilterStep(query_plan, expression_analysis_result.getWhere(), "WHERE");
+            }
+        }
+
         if (query_processing_info.isFromAggregationState())
         {
             /// Aggregation was performed on remote shards
diff --git a/src/Processors/QueryPlan/ObjectFilterStep.cpp b/src/Processors/QueryPlan/ObjectFilterStep.cpp
new file mode 100644
index 000000000000..2ae2294a571b
--- /dev/null
+++ b/src/Processors/QueryPlan/ObjectFilterStep.cpp
@@ -0,0 +1,63 @@
+#include
+#include
+#include
+#include
+#include
+
+#include
+
+namespace DB
+{
+
+namespace ErrorCodes
+{
+    extern const int INCORRECT_DATA;
+}
+
+ObjectFilterStep::ObjectFilterStep(
+    const Header & input_header_,
+    ActionsDAG actions_dag_,
+    String filter_column_name_)
+    : actions_dag(std::move(actions_dag_))
+    , filter_column_name(std::move(filter_column_name_))
+{
+    input_headers.emplace_back(input_header_);
+    output_header = input_headers.front();
+}
+
+QueryPipelineBuilderPtr ObjectFilterStep::updatePipeline(QueryPipelineBuilders pipelines, const BuildQueryPipelineSettings & /* settings */)
+{
+    return std::move(pipelines.front());
+}
+
+void ObjectFilterStep::updateOutputHeader()
+{
+    output_header = input_headers.front();
+}
+
+void ObjectFilterStep::serialize(Serialization & ctx) const
+{
+    writeStringBinary(filter_column_name, ctx.out);
+
+    actions_dag.serialize(ctx.out, ctx.registry);
+}
+
+std::unique_ptr<IQueryPlanStep> ObjectFilterStep::deserialize(Deserialization & ctx)
+{
+    if (ctx.input_headers.size() != 1)
+        throw Exception(ErrorCodes::INCORRECT_DATA, "ObjectFilterStep must have one input stream");
+
+    String filter_column_name;
+    readStringBinary(filter_column_name, ctx.in);
+
+    ActionsDAG actions_dag = ActionsDAG::deserialize(ctx.in, ctx.registry, ctx.context);
+
+    return std::make_unique<ObjectFilterStep>(ctx.input_headers.front(), std::move(actions_dag), std::move(filter_column_name));
+}
+
+void registerObjectFilterStep(QueryPlanStepRegistry & registry)
+{
+    registry.registerStep("ObjectFilter", ObjectFilterStep::deserialize);
+}
+
+}
diff --git a/src/Processors/QueryPlan/ObjectFilterStep.h b/src/Processors/QueryPlan/ObjectFilterStep.h
new file mode 100644
index 000000000000..f72cb00c86ab
--- /dev/null
+++ b/src/Processors/QueryPlan/ObjectFilterStep.h
@@ -0,0 +1,35 @@
+#pragma once
+#include <Processors/QueryPlan/IQueryPlanStep.h>
+#include <Interpreters/ActionsDAG.h>
+
+namespace DB
+{
+
+/// Implements WHERE operation.
+class ObjectFilterStep : public IQueryPlanStep
+{
+public:
+    ObjectFilterStep(
+        const Header & input_header_,
+        ActionsDAG actions_dag_,
+        String filter_column_name_);
+
+    String getName() const override { return "ObjectFilter"; }
+    QueryPipelineBuilderPtr updatePipeline(QueryPipelineBuilders pipelines, const BuildQueryPipelineSettings & settings) override;
+
+    const ActionsDAG & getExpression() const { return actions_dag; }
+    ActionsDAG & getExpression() { return actions_dag; }
+    const String & getFilterColumnName() const { return filter_column_name; }
+
+    void serialize(Serialization & ctx) const override;
+
+    static std::unique_ptr<IQueryPlanStep> deserialize(Deserialization & ctx);
+
+private:
+    void updateOutputHeader() override;
+
+    ActionsDAG actions_dag;
+    String filter_column_name;
+};
+
+}
diff --git a/src/Processors/QueryPlan/Optimizations/optimizePrimaryKeyConditionAndLimit.cpp b/src/Processors/QueryPlan/Optimizations/optimizePrimaryKeyConditionAndLimit.cpp
index ce36c7bddb43..33408e02df87 100644
--- a/src/Processors/QueryPlan/Optimizations/optimizePrimaryKeyConditionAndLimit.cpp
+++ b/src/Processors/QueryPlan/Optimizations/optimizePrimaryKeyConditionAndLimit.cpp
@@ -3,6 +3,7 @@
 #include
 #include
 #include
+#include <Processors/QueryPlan/ObjectFilterStep.h>
 
 namespace DB::QueryPlanOptimizations
 {
@@ -41,6 +42,10 @@ void optimizePrimaryKeyConditionAndLimit(const Stack & stack)
             /// So this is likely not needed.
             continue;
         }
+        else if (auto * object_filter_step = typeid_cast<ObjectFilterStep *>(iter->node->step.get()))
+        {
+            source_step_with_filter->addFilter(object_filter_step->getExpression().clone(), object_filter_step->getFilterColumnName());
+        }
         else
         {
             break;
diff --git a/src/Processors/QueryPlan/QueryPlanStepRegistry.cpp b/src/Processors/QueryPlan/QueryPlanStepRegistry.cpp
index a3f0d87f10de..ffeccbfd3e0b 100644
--- a/src/Processors/QueryPlan/QueryPlanStepRegistry.cpp
+++ b/src/Processors/QueryPlan/QueryPlanStepRegistry.cpp
@@ -49,6 +49,7 @@ void registerFilterStep(QueryPlanStepRegistry & registry);
 void registerTotalsHavingStep(QueryPlanStepRegistry & registry);
 void registerExtremesStep(QueryPlanStepRegistry & registry);
 void registerJoinStep(QueryPlanStepRegistry & registry);
+void registerObjectFilterStep(QueryPlanStepRegistry & registry);
 
 void QueryPlanStepRegistry::registerPlanSteps()
 {
@@ -67,6 +68,7 @@
     registerTotalsHavingStep(registry);
     registerExtremesStep(registry);
     registerJoinStep(registry);
+    registerObjectFilterStep(registry);
 }
 
 }
diff --git a/src/Processors/QueryPlan/ReadFromRemote.cpp b/src/Processors/QueryPlan/ReadFromRemote.cpp
index e150c029ff76..65be58bd635c 100644
--- a/src/Processors/QueryPlan/ReadFromRemote.cpp
+++ b/src/Processors/QueryPlan/ReadFromRemote.cpp
@@ -459,7 +459,7 @@ void ReadFromRemote::addLazyPipe(Pipes & pipes, const ClusterProxy::SelectStreamFactory::Shard & shard,
         my_stage = stage, my_storage = storage,
         add_agg_info, add_totals, add_extremes, async_read, async_query_sending,
         query_tree = shard.query_tree, planner_context = shard.planner_context,
-        pushed_down_filters]() mutable
+        pushed_down_filters, my_is_remote_function = is_remote_function]() mutable
         -> QueryPipelineBuilder
     {
         auto current_settings = my_context->getSettingsRef();
@@ -543,6 +543,8 @@
             {DataTypeUInt32().createColumnConst(1, my_shard.shard_info.shard_num), std::make_shared<DataTypeUInt32>(), "_shard_num"}};
         auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
             std::move(connections), query_string, header, my_context, my_throttler, my_scalars, my_external_tables, my_stage);
+        remote_query_executor->setRemoteFunction(my_is_remote_function);
+        remote_query_executor->setShardCount(my_shard_count);
 
         auto pipe = createRemoteSourcePipe(remote_query_executor, add_agg_info, add_totals, add_extremes, async_read, async_query_sending);
         QueryPipelineBuilder builder;
@@ -627,6 +629,8 @@
             priority_func);
         remote_query_executor->setLogger(log);
         remote_query_executor->setPoolMode(PoolMode::GET_ONE);
+        remote_query_executor->setRemoteFunction(is_remote_function);
+        remote_query_executor->setShardCount(shard_count);
 
         if (!table_func_ptr)
             remote_query_executor->setMainTable(shard.main_table ? shard.main_table : main_table);
@@ -646,6 +650,8 @@
         auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
             shard.shard_info.pool, query_string, shard.header, context, throttler, scalars, external_tables, stage);
         remote_query_executor->setLogger(log);
+        remote_query_executor->setRemoteFunction(is_remote_function);
+        remote_query_executor->setShardCount(shard_count);
 
         if (context->canUseTaskBasedParallelReplicas())
         {
diff --git a/src/Processors/QueryPlan/ReadFromRemote.h b/src/Processors/QueryPlan/ReadFromRemote.h
index cfd7e82bba14..55cd1fe8eae0 100644
--- a/src/Processors/QueryPlan/ReadFromRemote.h
+++ b/src/Processors/QueryPlan/ReadFromRemote.h
@@ -46,6 +46,7 @@ class ReadFromRemote final : public SourceStepWithFilterBase
 
     void enableMemoryBoundMerging();
     void enforceAggregationInOrder();
+    void setIsRemoteFunction(bool is_remote_function_ = true) { is_remote_function = is_remote_function_; }
 
 private:
     ClusterProxy::SelectStreamFactory::Shards shards;
@@ -61,6 +62,7 @@ class ReadFromRemote final : public SourceStepWithFilterBase
     UInt32 shard_count;
     const String cluster_name;
     std::optional<GetPriorityForLoadBalancing::Func> priority_func_factory;
+    bool is_remote_function = false;
 
     Pipes addPipes(const ClusterProxy::SelectStreamFactory::Shards & used_shards, const Header & out_header);
     void addLazyPipe(Pipes & pipes, const ClusterProxy::SelectStreamFactory::Shard & shard, const Header & out_header);
diff --git a/src/QueryPipeline/RemoteQueryExecutor.cpp b/src/QueryPipeline/RemoteQueryExecutor.cpp
index 1b7df321b7f0..beb7c517662f 100644
--- a/src/QueryPipeline/RemoteQueryExecutor.cpp
+++ b/src/QueryPipeline/RemoteQueryExecutor.cpp
@@ -405,7 +405,16 @@ void RemoteQueryExecutor::sendQueryUnlocked(ClientInfo::QueryKind query_kind, AsyncCallback async_callback)
     auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(settings);
 
     ClientInfo modified_client_info = context->getClientInfo();
-    modified_client_info.query_kind = query_kind;
+
+    /// Doesn't support "remote('1.1.1.{1,2}')" (multiple addresses) for now.
+    if (is_remote_function && (shard_count == 1))
+    {
+        modified_client_info.setInitialQuery();
+        modified_client_info.client_name = "ClickHouse server";
+        modified_client_info.interface = ClientInfo::Interface::TCP;
+    }
+    else
+        modified_client_info.query_kind = query_kind;
 
     if (!duplicated_part_uuids.empty())
         connections->sendIgnoredPartUUIDs(duplicated_part_uuids);
diff --git a/src/QueryPipeline/RemoteQueryExecutor.h b/src/QueryPipeline/RemoteQueryExecutor.h
index f2d7f8ca8823..f3381828e84d 100644
--- a/src/QueryPipeline/RemoteQueryExecutor.h
+++ b/src/QueryPipeline/RemoteQueryExecutor.h
@@ -212,6 +212,10 @@ class RemoteQueryExecutor
 
     void setLogger(LoggerPtr logger) { log = logger; }
 
+    void setRemoteFunction(bool is_remote_function_ = true) { is_remote_function = is_remote_function_; }
+
+    void setShardCount(UInt32 shard_count_) { shard_count = shard_count_; }
+
     const Block & getHeader() const { return header; }
 
     IConnections & getConnections() { return *connections; }
@@ -301,6 +305,9 @@ class RemoteQueryExecutor
 
     bool packet_in_progress = false;
 
+    bool is_remote_function = false;
+    UInt32 shard_count = 0;
+
     /// Parts uuids, collected from remote replicas
     std::vector<UUID> duplicated_part_uuids;
 
diff --git a/src/Storages/IStorageCluster.cpp b/src/Storages/IStorageCluster.cpp
index 9cb37cbf032f..ec447c0738ab 100644
--- a/src/Storages/IStorageCluster.cpp
+++ b/src/Storages/IStorageCluster.cpp
@@ -14,7 +14,6 @@
 #include
 #include
 #include
-#include <Processors/QueryPlan/SourceStepWithFilter.h>
 #include
 #include
 #include
@@ -57,51 +56,6 @@ IStorageCluster::IStorageCluster(
 {
 }
 
-class ReadFromCluster : public SourceStepWithFilter
-{
-public:
-    std::string getName() const override { return "ReadFromCluster"; }
-    void initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) override;
-    void applyFilters(ActionDAGNodes added_filter_nodes) override;
-
-    ReadFromCluster(
-        const Names & column_names_,
-        const SelectQueryInfo & query_info_,
-        const StorageSnapshotPtr & storage_snapshot_,
-        const ContextPtr & context_,
-        Block sample_block,
-        std::shared_ptr<IStorageCluster> storage_,
-        ASTPtr query_to_send_,
-        QueryProcessingStage::Enum processed_stage_,
-        ClusterPtr cluster_,
-        LoggerPtr log_)
-        : SourceStepWithFilter(
-            std::move(sample_block),
-            column_names_,
-            query_info_,
-            storage_snapshot_,
-            context_)
-        , storage(std::move(storage_))
-        , query_to_send(std::move(query_to_send_))
-        , processed_stage(processed_stage_)
-        , cluster(std::move(cluster_))
-        , log(log_)
-    {
-    }
-
-private:
-    std::shared_ptr<IStorageCluster> storage;
-    ASTPtr query_to_send;
-    QueryProcessingStage::Enum processed_stage;
-    ClusterPtr cluster;
-    LoggerPtr log;
-
-    std::optional<RemoteQueryExecutor::Extension> extension;
-
-    void createExtension(const ActionsDAG::Node * predicate);
-    ContextPtr updateSettings(const Settings & settings);
-};
-
 void ReadFromCluster::applyFilters(ActionDAGNodes added_filter_nodes)
 {
     SourceStepWithFilter::applyFilters(std::move(added_filter_nodes));
diff --git a/src/Storages/IStorageCluster.h b/src/Storages/IStorageCluster.h
index 98082829d422..e66ba8222b8a 100644
--- a/src/Storages/IStorageCluster.h
+++ b/src/Storages/IStorageCluster.h
@@ -4,6 +4,7 @@
 #include
 #include
 #include
+#include <Processors/QueryPlan/SourceStepWithFilter.h>
 
 namespace DB
 {
@@ -89,4 +90,50 @@ class IStorageCluster : public IStorage
 };
 
+class ReadFromCluster : public SourceStepWithFilter
+{
+public:
+    std::string getName() const override { return "ReadFromCluster"; }
+    void initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) override;
+    void applyFilters(ActionDAGNodes added_filter_nodes) override;
+
+    ReadFromCluster(
+        const Names & column_names_,
+        const SelectQueryInfo & query_info_,
+        const StorageSnapshotPtr & storage_snapshot_,
+        const ContextPtr & context_,
+        Block sample_block,
+        std::shared_ptr<IStorageCluster> storage_,
+        ASTPtr query_to_send_,
+        QueryProcessingStage::Enum processed_stage_,
+        ClusterPtr cluster_,
+        LoggerPtr log_)
+        : SourceStepWithFilter(
+            std::move(sample_block),
+            column_names_,
+            query_info_,
+            storage_snapshot_,
+            context_)
+        , storage(std::move(storage_))
+        , query_to_send(std::move(query_to_send_))
+        , processed_stage(processed_stage_)
+        , cluster(std::move(cluster_))
+        , log(log_)
+    {
+    }
+
+private:
+    std::shared_ptr<IStorageCluster> storage;
+    ASTPtr query_to_send;
+    QueryProcessingStage::Enum processed_stage;
+    ClusterPtr cluster;
+    LoggerPtr log;
+
+    std::optional<RemoteQueryExecutor::Extension> extension;
+
+    void createExtension(const ActionsDAG::Node * predicate);
+    ContextPtr updateSettings(const Settings & settings);
+};
+
+
 }
diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp
index fd43360b171f..0ced844d4584 100644
--- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp
+++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp
@@ -92,7 +92,9 @@ StorageObjectStorageCluster::StorageObjectStorageCluster(
     metadata.setColumns(columns);
     metadata.setConstraints(constraints_);
 
-    if (sample_path.empty() && context_->getSettingsRef()[Setting::use_hive_partitioning])
+    if (sample_path.empty()
+        && context_->getSettingsRef()[Setting::use_hive_partitioning]
+        && !configuration->withPartitionWildcard())
         sample_path = getPathSample(metadata, context_);
 
     setVirtuals(VirtualColumnUtils::getVirtualsForFileLikeStorage(metadata.columns, context_, sample_path));
@@ -364,7 +366,7 @@ RemoteQueryExecutor::Extension StorageObjectStorageCluster::getTaskIteratorExtension(
 {
     auto iterator = StorageObjectStorageSource::createFileIterator(
         configuration, configuration->getQuerySettings(local_context), object_storage, /* distributed_processing */false,
-        local_context, predicate, virtual_columns, nullptr, local_context->getFileProgressCallback(), /*ignore_archive_globs=*/true);
+        local_context, predicate, getVirtualsList(), nullptr, local_context->getFileProgressCallback(), /*ignore_archive_globs=*/true);
 
     std::vector<std::string> ids_of_hosts;
     for (const auto & shard : cluster->getShardsInfo())
diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.h b/src/Storages/ObjectStorage/StorageObjectStorageCluster.h
index 3e464d0d03c8..fd4270518976 100644
--- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.h
+++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.h
@@ -107,7 +107,6 @@ class StorageObjectStorageCluster : public IStorageCluster
     const String engine_name;
     StorageObjectStorage::ConfigurationPtr configuration;
     const ObjectStoragePtr object_storage;
-    NamesAndTypesList virtual_columns;
     bool cluster_name_in_settings;
 
     /// non-clustered storage to fall back on pure realisation if needed
diff --git a/tests/integration/test_storage_iceberg/test.py b/tests/integration/test_storage_iceberg/test.py
index 5b448d0a095b..8b4a2008d9c0 100644
--- a/tests/integration/test_storage_iceberg/test.py
+++ b/tests/integration/test_storage_iceberg/test.py
@@ -844,6 +844,14 @@ def make_query_from_table(alt_syntax=False):
     count_secondary_subqueries(started_cluster, query_id_pure_table_engine_with_type_in_nc, 0, "table engine with storage type in named collection")
     count_secondary_subqueries(started_cluster, query_id_pure_table_engine_cluster_with_type_in_nc, 1, "table engine with cluster setting with storage type in named collection")
 
+    select_remote_cluster = (
+        instance.query(f"SELECT * FROM remote('node2',{table_function_expr_cluster})")
+        .strip()
+        .split()
+    )
+    assert len(select_remote_cluster) == 600
+    assert select_remote_cluster == select_regular
+
 
 @pytest.mark.parametrize("format_version", ["1", "2"])
 @pytest.mark.parametrize("storage_type", ["s3", "azure", "local"])
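
Usage illustration (not part of the patch; the host name, bucket URL, and partition column below are hypothetical). With this change, a WHERE on a hive-partition column is kept in the plan as an ObjectFilterStep on top of ReadFromCluster, so the file iterator built by getTaskIteratorExtension() can prune non-matching paths before any data is read, and remote() over a single address forwards the query as an initial query:

    -- files laid out hive-style: .../data/date=2024-01-01/a.parquet, .../data/date=2024-01-02/b.parquet
    SET use_hive_partitioning = 1;
    SELECT count()
    FROM remote('node2', s3Cluster('default', 'https://bucket.s3.amazonaws.com/data/**.parquet'))
    WHERE date = '2024-01-01';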