diff --git a/src/Storages/StorageMerge.cpp b/src/Storages/StorageMerge.cpp
index 272f35303bd1..7d666cc4937a 100644
--- a/src/Storages/StorageMerge.cpp
+++ b/src/Storages/StorageMerge.cpp
@@ -1,48 +1,55 @@
-#include
-#include
-#include
-#include
-#include
-#include
-#include
-#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
 #include
-#include
 #include
-#include
 #include
 #include
-#include
-#include
 #include
+#include
+#include
 #include
-#include
-#include
-#include
-#include
+#include
 #include
-#include
-#include
 #include
-#include
-#include
-#include
-#include
-#include
-#include "DataTypes/IDataType.h"
-#include
-#include
+#include
+#include
+#include
+#include
 #include
 #include
-#include
-#include
-#include
 #include
+#include
+#include
 #include
-#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
 #include
-#include
-
+#include
+#include
+#include
+#include
 
 namespace
 {
@@ -398,6 +405,7 @@ ReadFromMerge::ReadFromMerge(
     , context(std::move(context_))
     , common_processed_stage(processed_stage)
 {
+    createChildPlans();
 }
 
 void ReadFromMerge::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
@@ -408,6 +416,65 @@ void ReadFromMerge::initializePipeline(QueryPipelineBuilder & pipeline, const Bu
         return;
     }
 
+    QueryPlanResourceHolder resources;
+    std::vector> pipelines;
+
+    chassert(selected_tables.size() == child_plans.size());
+    chassert(selected_tables.size() == table_aliases.size());
+    auto table_it = selected_tables.begin();
+    for (size_t i = 0; i < selected_tables.size(); ++i, ++table_it)
+    {
+        auto & plan = child_plans.at(i);
+        const auto & table = *table_it;
+
+        const auto storage = std::get<1>(table);
+        const auto storage_metadata_snapshot = storage->getInMemoryMetadataPtr();
+        const auto nested_storage_snaphsot = storage->getStorageSnapshot(storage_metadata_snapshot, context);
+
+        auto modified_query_info = getModifiedQueryInfo(query_info, context, table, nested_storage_snaphsot);
+
+        auto source_pipeline = createSources(
+            plan, nested_storage_snaphsot, modified_query_info, common_processed_stage, common_header, table_aliases.at(i), table, context);
+
+        if (source_pipeline && source_pipeline->initialized())
+        {
+            resources.storage_holders.push_back(std::get<1>(table));
+            resources.table_locks.push_back(std::get<2>(table));
+
+            pipelines.emplace_back(std::move(source_pipeline));
+        }
+    }
+
+    if (pipelines.empty())
+    {
+        pipeline.init(Pipe(std::make_shared(output_stream->header)));
+        return;
+    }
+
+    pipeline = QueryPipelineBuilder::unitePipelines(std::move(pipelines));
+
+    if (!query_info.input_order_info)
+    {
+        size_t tables_count = selected_tables.size();
+        Float64 num_streams_multiplier = std::min(
+            static_cast(tables_count),
+            std::max(1UL, static_cast(context->getSettingsRef().max_streams_multiplier_for_merge_tables)));
+        size_t num_streams = static_cast(requested_num_streams * num_streams_multiplier);
+
+        // It's possible to have many tables read from merge, resize(num_streams) might open too many files at the same time.
+        // Using narrowPipe instead. But in case of reading in order of primary key, we cannot do it,
+        // because narrowPipe doesn't preserve order.
+        pipeline.narrow(num_streams);
+    }
+
+    pipeline.addResources(std::move(resources));
+}
+
+void ReadFromMerge::createChildPlans()
+{
+    if (selected_tables.empty())
+        return;
+
     size_t tables_count = selected_tables.size();
     Float64 num_streams_multiplier = std::min(static_cast(tables_count),
         std::max(1UL, static_cast(context->getSettingsRef().max_streams_multiplier_for_merge_tables)));
@@ -438,11 +505,6 @@ void ReadFromMerge::initializePipeline(QueryPipelineBuilder & pipeline, const Bu
         query_info.input_order_info = input_sorting_info;
     }
 
-    auto sample_block = merge_storage_snapshot->getMetadataForQuery()->getSampleBlock();
-
-    std::vector> pipelines;
-    QueryPlanResourceHolder resources;
-
     for (const auto & table : selected_tables)
     {
         size_t current_need_streams = tables_count >= num_streams ? 1 : (num_streams / tables_count);
@@ -460,7 +522,7 @@ void ReadFromMerge::initializePipeline(QueryPipelineBuilder & pipeline, const Bu
         if (sampling_requested && !storage->supportsSampling())
            throw Exception(ErrorCodes::SAMPLING_NOT_SUPPORTED, "Illegal SAMPLE: table doesn't support sampling");
 
-        Aliases aliases;
+        auto & aliases = table_aliases.emplace_back();
         auto storage_metadata_snapshot = storage->getInMemoryMetadataPtr();
         auto nested_storage_snaphsot = storage->getStorageSnapshot(storage_metadata_snapshot, context);
 
@@ -479,6 +541,8 @@ void ReadFromMerge::initializePipeline(QueryPipelineBuilder & pipeline, const Bu
         ASTPtr required_columns_expr_list = std::make_shared();
         ASTPtr column_expr;
 
+        auto sample_block = merge_storage_snapshot->getMetadataForQuery()->getSampleBlock();
+
         for (const auto & column : column_names)
         {
             const auto column_default = storage_columns.getDefault(column);
@@ -515,42 +579,16 @@ void ReadFromMerge::initializePipeline(QueryPipelineBuilder & pipeline, const Bu
             }
         }
 
-        auto source_pipeline = createSources(
+        child_plans.emplace_back(createPlanForTable(
             nested_storage_snaphsot,
             modified_query_info,
             common_processed_stage,
             required_max_block_size,
-            common_header,
-            aliases,
             table,
             column_names_as_aliases.empty() ? column_names : column_names_as_aliases,
             context,
-            current_streams);
-
-        if (source_pipeline && source_pipeline->initialized())
-        {
-            resources.storage_holders.push_back(std::get<1>(table));
-            resources.table_locks.push_back(std::get<2>(table));
-
-            pipelines.emplace_back(std::move(source_pipeline));
-        }
+            current_streams));
     }
-
-    if (pipelines.empty())
-    {
-        pipeline.init(Pipe(std::make_shared(output_stream->header)));
-        return;
-    }
-
-    pipeline = QueryPipelineBuilder::unitePipelines(std::move(pipelines));
-
-    if (!query_info.input_order_info)
-        // It's possible to have many tables read from merge, resize(num_streams) might open too many files at the same time.
-        // Using narrowPipe instead. But in case of reading in order of primary key, we cannot do it,
-        // because narrowPipe doesn't preserve order.
-        pipeline.narrow(num_streams);
-
-    pipeline.addResources(std::move(resources));
 }
 
 SelectQueryInfo ReadFromMerge::getModifiedQueryInfo(const SelectQueryInfo & query_info,
@@ -616,23 +654,121 @@ SelectQueryInfo ReadFromMerge::getModifiedQueryInfo(const SelectQueryInfo & quer
     return modified_query_info;
 }
 
+bool recursivelyApplyToReadingSteps(QueryPlan::Node * node, const std::function & func)
+{
+    bool ok = true;
+    for (auto * child : node->children)
+        ok &= recursivelyApplyToReadingSteps(child, func);
+
+    // This code is mainly meant to be used to call `requestReadingInOrder` on child steps.
+    // In this case it is ok if one child will read in order and other will not (though I don't know when it is possible),
+    // the only important part is to acknowledge this at the parent and don't rely on any particular ordering of input data.
+    if (!ok)
+        return false;
+
+    if (auto * read_from_merge_tree = typeid_cast(node->step.get()))
+        ok &= func(*read_from_merge_tree);
+
+    return ok;
+}
+
 QueryPipelineBuilderPtr ReadFromMerge::createSources(
+    QueryPlan & plan,
     const StorageSnapshotPtr & storage_snapshot,
     SelectQueryInfo & modified_query_info,
     const QueryProcessingStage::Enum & processed_stage,
-    const UInt64 max_block_size,
     const Block & header,
     const Aliases & aliases,
     const StorageWithLockAndName & storage_with_lock,
+    ContextMutablePtr modified_context,
+    bool concat_streams) const
+{
+    if (!plan.isInitialized())
+        return std::make_unique();
+
+    QueryPipelineBuilderPtr builder;
+
+    const auto & [database_name, storage, _, table_name] = storage_with_lock;
+    bool allow_experimental_analyzer = modified_context->getSettingsRef().allow_experimental_analyzer;
+    auto storage_stage
+        = storage->getQueryProcessingStage(modified_context, QueryProcessingStage::Complete, storage_snapshot, modified_query_info);
+
+    builder = plan.buildQueryPipeline(
+        QueryPlanOptimizationSettings::fromContext(modified_context), BuildQueryPipelineSettings::fromContext(modified_context));
+
+    if (processed_stage > storage_stage || (allow_experimental_analyzer && processed_stage != QueryProcessingStage::FetchColumns))
+    {
+        /** Materialization is needed, since from distributed storage the constants come materialized.
+          * If you do not do this, different types (Const and non-Const) columns will be produced in different threads,
+          * And this is not allowed, since all code is based on the assumption that in the block stream all types are the same.
+          */
+        builder->addSimpleTransform([](const Block & stream_header) { return std::make_shared(stream_header); });
+    }
+
+    if (builder->initialized())
+    {
+        if (concat_streams && builder->getNumStreams() > 1)
+        {
+            // It's possible to have many tables read from merge, resize(1) might open too many files at the same time.
+            // Using concat instead.
+            builder->addTransform(std::make_shared(builder->getHeader(), builder->getNumStreams()));
+        }
+
+        /// Add virtual columns if we don't already have them.
+
+        Block pipe_header = builder->getHeader();
+
+        if (has_database_virtual_column && !pipe_header.has("_database"))
+        {
+            ColumnWithTypeAndName column;
+            column.name = "_database";
+            column.type = std::make_shared(std::make_shared());
+            column.column = column.type->createColumnConst(0, Field(database_name));
+
+            auto adding_column_dag = ActionsDAG::makeAddingColumnActions(std::move(column));
+            auto adding_column_actions = std::make_shared(
+                std::move(adding_column_dag), ExpressionActionsSettings::fromContext(modified_context, CompileExpressions::yes));
+
+            builder->addSimpleTransform([&](const Block & stream_header)
+                { return std::make_shared(stream_header, adding_column_actions); });
+        }
+
+        if (has_table_virtual_column && !pipe_header.has("_table"))
+        {
+            ColumnWithTypeAndName column;
+            column.name = "_table";
+            column.type = std::make_shared(std::make_shared());
+            column.column = column.type->createColumnConst(0, Field(table_name));
+
+            auto adding_column_dag = ActionsDAG::makeAddingColumnActions(std::move(column));
+            auto adding_column_actions = std::make_shared(
+                std::move(adding_column_dag), ExpressionActionsSettings::fromContext(modified_context, CompileExpressions::yes));
+
+            builder->addSimpleTransform([&](const Block & stream_header)
+                { return std::make_shared(stream_header, adding_column_actions); });
+        }
+
+        /// Subordinary tables could have different but convertible types, like numeric types of different width.
+        /// We must return streams with structure equals to structure of Merge table.
+        convertingSourceStream(header, storage_snapshot->metadata, aliases, modified_context, *builder, processed_stage);
+    }
+
+    return builder;
+}
+
+QueryPlan ReadFromMerge::createPlanForTable(
+    const StorageSnapshotPtr & storage_snapshot,
+    SelectQueryInfo & modified_query_info,
+    const QueryProcessingStage::Enum & processed_stage,
+    UInt64 max_block_size,
+    const StorageWithLockAndName & storage_with_lock,
     Names real_column_names,
     ContextMutablePtr modified_context,
-    size_t streams_num,
-    bool concat_streams)
+    size_t streams_num)
 {
     const auto & [database_name, storage, _, table_name] = storage_with_lock;
 
     auto & modified_select = modified_query_info.query->as();
 
-    QueryPipelineBuilderPtr builder;
     if (!InterpreterSelectQuery::isQueryWithFinal(modified_query_info) && storage->needRewriteQueryWithFinal(real_column_names))
     {
         /// NOTE: It may not work correctly in some cases, because query was analyzed without final.
@@ -647,14 +783,14 @@ QueryPipelineBuilderPtr ReadFromMerge::createSources(
         storage_snapshot,
         modified_query_info);
 
+    QueryPlan plan;
+
     if (processed_stage <= storage_stage || (allow_experimental_analyzer && processed_stage == QueryProcessingStage::FetchColumns))
     {
         /// If there are only virtual columns in query, you must request at least one other column.
         if (real_column_names.empty())
             real_column_names.push_back(ExpressionActions::getSmallestColumn(storage_snapshot->metadata->getColumns().getAllPhysical()).name);
 
-        QueryPlan & plan = child_plans.emplace_back();
-
         StorageView * view = dynamic_cast(storage.get());
         if (!view || allow_experimental_analyzer)
         {
@@ -688,16 +824,7 @@ QueryPipelineBuilderPtr ReadFromMerge::createSources(
         if (!plan.isInitialized())
             return {};
 
-        if (auto * read_from_merge_tree = typeid_cast(plan.getRootNode()->step.get()))
-        {
-            size_t filters_dags_size = filter_dags.size();
-            for (size_t i = 0; i < filters_dags_size; ++i)
-                read_from_merge_tree->addFilter(filter_dags[i], filter_nodes.nodes[i]);
-        }
-
-        builder = plan.buildQueryPipeline(
-            QueryPlanOptimizationSettings::fromContext(modified_context),
-            BuildQueryPipelineSettings::fromContext(modified_context));
+        applyFilters(plan);
     }
     else if (processed_stage > storage_stage || (allow_experimental_analyzer && processed_stage != QueryProcessingStage::FetchColumns))
     {
@@ -705,15 +832,14 @@ QueryPipelineBuilderPtr ReadFromMerge::createSources(
         modified_context->setSetting("max_threads", streams_num);
         modified_context->setSetting("max_streams_to_max_threads_ratio", 1);
 
-        QueryPlan & plan = child_plans.emplace_back();
-
         if (allow_experimental_analyzer)
         {
             InterpreterSelectQueryAnalyzer interpreter(modified_query_info.query_tree,
                 modified_context,
                 SelectQueryOptions(processed_stage).ignoreProjections());
-            builder = std::make_unique(interpreter.buildQueryPipeline());
-            plan = std::move(interpreter.getPlanner()).extractQueryPlan();
+            auto & planner = interpreter.getPlanner();
+            planner.buildQueryPlanIfNeeded();
+            plan = std::move(planner).extractQueryPlan();
         }
         else
         {
@@ -722,71 +848,11 @@ QueryPipelineBuilderPtr ReadFromMerge::createSources(
             InterpreterSelectQuery interpreter{modified_query_info.query,
                 modified_context,
                 SelectQueryOptions(processed_stage).ignoreProjections()};
-            builder = std::make_unique(interpreter.buildQueryPipeline(plan));
-        }
-
-        /** Materialization is needed, since from distributed storage the constants come materialized.
-          * If you do not do this, different types (Const and non-Const) columns will be produced in different threads,
-          * And this is not allowed, since all code is based on the assumption that in the block stream all types are the same.
-          */
-        builder->addSimpleTransform([](const Block & stream_header) { return std::make_shared(stream_header); });
-    }
-
-    if (builder->initialized())
-    {
-        if (concat_streams && builder->getNumStreams() > 1)
-        {
-            // It's possible to have many tables read from merge, resize(1) might open too many files at the same time.
-            // Using concat instead.
-            builder->addTransform(std::make_shared(builder->getHeader(), builder->getNumStreams()));
-        }
-
-        /// Add virtual columns if we don't already have them.
-
-        Block pipe_header = builder->getHeader();
-
-        if (has_database_virtual_column && !pipe_header.has("_database"))
-        {
-            ColumnWithTypeAndName column;
-            column.name = "_database";
-            column.type = std::make_shared(std::make_shared());
-            column.column = column.type->createColumnConst(0, Field(database_name));
-
-            auto adding_column_dag = ActionsDAG::makeAddingColumnActions(std::move(column));
-            auto adding_column_actions = std::make_shared(
-                std::move(adding_column_dag),
-                ExpressionActionsSettings::fromContext(modified_context, CompileExpressions::yes));
-
-            builder->addSimpleTransform([&](const Block & stream_header)
-            {
-                return std::make_shared(stream_header, adding_column_actions);
-            });
-        }
-
-        if (has_table_virtual_column && !pipe_header.has("_table"))
-        {
-            ColumnWithTypeAndName column;
-            column.name = "_table";
-            column.type = std::make_shared(std::make_shared());
-            column.column = column.type->createColumnConst(0, Field(table_name));
-
-            auto adding_column_dag = ActionsDAG::makeAddingColumnActions(std::move(column));
-            auto adding_column_actions = std::make_shared(
-                std::move(adding_column_dag),
-                ExpressionActionsSettings::fromContext(modified_context, CompileExpressions::yes));
-
-            builder->addSimpleTransform([&](const Block & stream_header)
-            {
-                return std::make_shared(stream_header, adding_column_actions);
-            });
+            interpreter.buildQueryPlan(plan);
         }
-
-        /// Subordinary tables could have different but convertible types, like numeric types of different width.
-        /// We must return streams with structure equals to structure of Merge table.
-        convertingSourceStream(header, storage_snapshot->metadata, aliases, modified_context, *builder, processed_stage);
     }
 
-    return builder;
+    return plan;
 }
 
 StorageMerge::StorageListWithLocks StorageMerge::getSelectedTables(
@@ -1014,10 +1080,47 @@ bool ReadFromMerge::requestReadingInOrder(InputOrderInfoPtr order_info_)
     if (order_info_->direction != 1 && InterpreterSelectQuery::isQueryWithFinal(query_info))
         return false;
 
+    auto request_read_in_order = [order_info_](ReadFromMergeTree & read_from_merge_tree)
+    {
+        return read_from_merge_tree.requestReadingInOrder(
+            order_info_->used_prefix_of_sorting_key_size, order_info_->direction, order_info_->limit);
+    };
+
+    bool ok = true;
+    for (const auto & plan : child_plans)
+        if (plan.isInitialized())
+            ok &= recursivelyApplyToReadingSteps(plan.getRootNode(), request_read_in_order);
+
+    if (!ok)
+        return false;
+
     order_info = order_info_;
+    query_info.input_order_info = order_info;
     return true;
 }
 
+void ReadFromMerge::applyFilters(const QueryPlan & plan) const
+{
+    auto apply_filters = [this](ReadFromMergeTree & read_from_merge_tree)
+    {
+        size_t filters_dags_size = filter_dags.size();
+        for (size_t i = 0; i < filters_dags_size; ++i)
+            read_from_merge_tree.addFilter(filter_dags[i], filter_nodes.nodes[i]);
+
+        read_from_merge_tree.applyFilters();
+        return true;
+    };
+
+    recursivelyApplyToReadingSteps(plan.getRootNode(), apply_filters);
+}
+
+void ReadFromMerge::applyFilters()
+{
+    for (const auto & plan : child_plans)
+        if (plan.isInitialized())
+            applyFilters(plan);
+}
+
 IStorage::ColumnSizeByName StorageMerge::getColumnSizes() const
 {
     ColumnSizeByName column_sizes;
diff --git a/src/Storages/StorageMerge.h b/src/Storages/StorageMerge.h
index babf0dd92e86..80a5fa335f72 100644
--- a/src/Storages/StorageMerge.h
+++ b/src/Storages/StorageMerge.h
@@ -1,9 +1,10 @@
 #pragma once
 
-#include
-#include
-#include
+#include
 #include
+#include
+#include
+#include
 
 
 namespace DB
@@ -146,6 +147,8 @@ class ReadFromMerge final : public SourceStepWithFilter
     /// Returns `false` if requested reading cannot be performed.
     bool requestReadingInOrder(InputOrderInfoPtr order_info_);
 
+    void applyFilters() override;
+
 private:
     const size_t required_max_block_size;
     const size_t requested_num_streams;
@@ -177,23 +180,37 @@ class ReadFromMerge final : public SourceStepWithFilter
 
     using Aliases = std::vector;
 
-    static SelectQueryInfo getModifiedQueryInfo(const SelectQueryInfo & query_info,
-        const ContextPtr & modified_context,
-        const StorageWithLockAndName & storage_with_lock_and_name,
-        const StorageSnapshotPtr & storage_snapshot);
+    std::vector table_aliases;
 
-    QueryPipelineBuilderPtr createSources(
+    void createChildPlans();
+
+    void applyFilters(const QueryPlan & plan) const;
+
+    QueryPlan createPlanForTable(
         const StorageSnapshotPtr & storage_snapshot,
         SelectQueryInfo & query_info,
         const QueryProcessingStage::Enum & processed_stage,
         UInt64 max_block_size,
+        const StorageWithLockAndName & storage_with_lock,
+        Names real_column_names,
+        ContextMutablePtr modified_context,
+        size_t streams_num);
+
+    QueryPipelineBuilderPtr createSources(
+        QueryPlan & plan,
+        const StorageSnapshotPtr & storage_snapshot,
+        SelectQueryInfo & modified_query_info,
+        const QueryProcessingStage::Enum & processed_stage,
         const Block & header,
         const Aliases & aliases,
         const StorageWithLockAndName & storage_with_lock,
-        Names real_column_names,
         ContextMutablePtr modified_context,
-        size_t streams_num,
-        bool concat_streams = false);
+        bool concat_streams = false) const;
+
+    static SelectQueryInfo getModifiedQueryInfo(const SelectQueryInfo & query_info,
+        const ContextPtr & modified_context,
+        const StorageWithLockAndName & storage_with_lock_and_name,
+        const StorageSnapshotPtr & storage_snapshot);
 
     static void convertingSourceStream(
         const Block & header,
diff --git a/tests/queries/0_stateless/02875_merge_engine_set_index.reference b/tests/queries/0_stateless/02875_merge_engine_set_index.reference
new file mode 100644
index 000000000000..00750edc07d6
--- /dev/null
+++ b/tests/queries/0_stateless/02875_merge_engine_set_index.reference
@@ -0,0 +1 @@
+3
diff --git a/tests/queries/0_stateless/02875_merge_engine_set_index.sh b/tests/queries/0_stateless/02875_merge_engine_set_index.sh
new file mode 100755
index 000000000000..57b5db374c19
--- /dev/null
+++ b/tests/queries/0_stateless/02875_merge_engine_set_index.sh
@@ -0,0 +1,65 @@
+#!/usr/bin/env bash
+
+# shellcheck disable=SC2154
+
+CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
+# shellcheck source=../shell_config.sh
+. "$CURDIR"/../shell_config.sh
+
+
+$CLICKHOUSE_CLIENT -nq "
+  CREATE TABLE t1
+  (
+    a UInt32,
+    b UInt32
+  )
+  ENGINE = MergeTree
+  ORDER BY (a, b)
+  SETTINGS index_granularity = 8192;
+
+  INSERT INTO t1 SELECT number, number FROM numbers_mt(1e6);
+
+  CREATE TABLE t2
+  (
+    a UInt32,
+    b UInt32
+  )
+  ENGINE = MergeTree
+  ORDER BY (a, b)
+  SETTINGS index_granularity = 8192;
+
+  INSERT INTO t2 VALUES (1, 1) (2, 2) (3, 3);
+
+  CREATE TABLE t
+  (
+    a UInt32,
+    b UInt32
+  )
+  ENGINE = Merge(currentDatabase(), 't*');"
+
+query_id="${CLICKHOUSE_DATABASE}_merge_engine_set_index_$RANDOM$RANDOM"
+$CLICKHOUSE_CLIENT --query_id="$query_id" --multiquery -q "
+SELECT
+    a,
+    b
+FROM t
+WHERE (a, b) IN (
+    SELECT DISTINCT
+        a,
+        b
+    FROM t2
+)
+GROUP BY
+    a,
+    b
+ORDER BY
+    a ASC,
+    b DESC
+FORMAT Null;"
+
+$CLICKHOUSE_CLIENT -nq "
+SYSTEM FLUSH LOGS;
+
+SELECT ProfileEvents['SelectedMarks']
+FROM system.query_log
+WHERE event_date >= yesterday() AND current_database = currentDatabase() AND (query_id = '$query_id') AND (type = 'QueryFinish');"