diff --git a/docs/en/engines/table-engines/special/tiered-distributed.md b/docs/en/engines/table-engines/special/tiered-distributed.md
new file mode 100644
index 000000000000..95a986ff48de
--- /dev/null
+++ b/docs/en/engines/table-engines/special/tiered-distributed.md
@@ -0,0 +1,106 @@
+---
+description: 'Hybrid unions multiple data sources behind per-layer predicates so queries behave like a single table while data is migrated or tiered.'
+slug: /engines/table-engines/special/tiered-distributed
+title: 'Hybrid Table Engine'
+sidebar_label: 'Hybrid'
+sidebar_position: 11
+---
+
+# Hybrid table engine
+
+`Hybrid` builds on top of the [Distributed](./distributed.md) table engine. It lets you expose several data sources as one logical table and assign every source its own predicate.
+The engine rewrites incoming queries so that each layer receives the original query plus its predicate. This keeps all of the Distributed optimizations (remote aggregation, `optimize_skip_unused_shards`,
+global JOIN pushdown, and so on) while you duplicate or migrate data across clusters, storage types, or formats.
+
+It keeps the same execution pipeline as `engine=Distributed` but can read from multiple underlying sources simultaneously, similar to `engine=Merge`, while still pushing logic down to each source.
+
+Typical use cases include:
+
+- Zero-downtime migrations where "old" and "new" replicas temporarily overlap.
+- Tiered storage, for example fresh data on a local cluster and historical data in S3.
+- Gradual roll-outs where only a subset of rows should be served from a new backend.
+
+By giving mutually exclusive predicates to the layers (for example, `date < watermark` and `date >= watermark`), you ensure that each row is read from exactly one source.
+
+## Engine definition
+
+```sql
+CREATE TABLE [IF NOT EXISTS] [db.]table_name
+(
+    column1 type1,
+    column2 type2,
+    ...
+)
+ENGINE = Hybrid(table_function_1, predicate_1 [, table_function_2, predicate_2 ...])
+```
+
+You must pass at least two arguments: the first table function and its predicate. Additional sources are appended as `table_function, predicate` pairs. The first table function is also used for `INSERT` statements.
+
+### Arguments and behavior
+
+- `table_function_n` must be a valid table function (for example `remote`, `remoteSecure`, `cluster`, `clusterAllReplicas`, `s3Cluster`) or a fully qualified table name (`database.table`). The first argument must be a table function, such as `remote` or `cluster`, because it instantiates the underlying `Distributed` storage.
+- `predicate_n` must be an expression that can be evaluated on the table columns. The engine combines it with the layer's `WHERE` clause using `AND`, so expressions like `event_date >= '2025-09-01'` or `id BETWEEN 10 AND 15` are typical.
+- The query planner picks the same processing stage for every layer as it does for the base `Distributed` plan, so remote aggregation, ORDER BY pushdown, `optimize_skip_unused_shards`, and the legacy/analyzer execution modes behave the same way.
+- `INSERT` statements are forwarded to the first table function only. If you need multi-destination writes, use explicit `INSERT` statements into the respective sources.
+- Align schemas across the layers. ClickHouse builds a common header; if the physical types differ you may need to add casts on one side or in the query, just as you would when reading from heterogeneous replicas.
+
+## Example: local cluster plus S3 historical tier
+
+The following commands illustrate a two-layer layout. Hot data stays on a local ClickHouse cluster, while historical rows come from public S3 Parquet files.
+
+```sql
+-- Local MergeTree table that keeps current data
+CREATE OR REPLACE TABLE btc_blocks_local
+(
+    `hash` FixedString(64),
+    `version` Int64,
+    `mediantime` DateTime64(9),
+    `nonce` Int64,
+    `bits` FixedString(8),
+    `difficulty` Float64,
+    `chainwork` FixedString(64),
+    `size` Int64,
+    `weight` Int64,
+    `coinbase_param` String,
+    `number` Int64,
+    `transaction_count` Int64,
+    `merkle_root` FixedString(64),
+    `stripped_size` Int64,
+    `timestamp` DateTime64(9),
+    `date` Date
+)
+ENGINE = MergeTree
+ORDER BY (timestamp)
+PARTITION BY toYYYYMM(date);
+
+-- Hybrid table that unions the local shard with historical data in S3
+CREATE OR REPLACE TABLE btc_blocks ENGINE = Hybrid(
+    remote('localhost:9000', currentDatabase(), 'btc_blocks_local'), date >= '2025-09-01',
+    s3('s3://aws-public-blockchain/v1.0/btc/blocks/**.parquet', NOSIGN), date < '2025-09-01'
+) AS btc_blocks_local;
+
+-- Writes target the first (remote) layer
+INSERT INTO btc_blocks
+SELECT *
+FROM s3('s3://aws-public-blockchain/v1.0/btc/blocks/**.parquet', NOSIGN)
+WHERE date BETWEEN '2025-09-01' AND '2025-09-30';
+
+-- Reads transparently combine both layers, each filtered by its predicate
+SELECT * FROM btc_blocks WHERE date = '2025-08-01'; -- data from s3
+SELECT * FROM btc_blocks WHERE date = '2025-09-05'; -- data from MergeTree (TODO: still analyzes s3)
+SELECT * FROM btc_blocks WHERE date IN ('2025-08-31', '2025-09-01'); -- data from both sources, each row read exactly once
+
+-- Run analytic queries as usual
+SELECT
+    date,
+    count(),
+    uniqExact(CAST(hash, 'Nullable(String)')) AS hashes,
+    sum(CAST(number, 'Nullable(Int64)')) AS blocks_seen
+FROM btc_blocks
+WHERE date BETWEEN '2025-08-01' AND '2025-09-30'
+GROUP BY date
+ORDER BY date;
+```
+
+Because the predicates are applied inside every layer, query features such as `ORDER BY`, `GROUP BY`, `LIMIT`, `JOIN`, and `EXPLAIN` behave as if you were reading from a single `Distributed` table. When sources expose different physical types (for example `FixedString(64)` versus `String` in Parquet), add explicit casts during ingestion or in the query, as shown above.
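+
+## Example: migrating between clusters with a watermark
+
+The same pattern covers the zero-downtime migration use case. The sketch below is illustrative only: the hostnames, database, table name, and watermark date are placeholders, and it assumes an `events` table with the same schema exists on both clusters.
+
+```sql
+CREATE TABLE events_hybrid
+(
+    `id` UInt64,
+    `event_date` Date,
+    `payload` String
+)
+ENGINE = Hybrid(
+    -- new cluster serves rows at or after the watermark and receives all INSERTs
+    remote('new-host:9000', 'default', 'events'), event_date >= '2025-09-01',
+    -- old cluster keeps serving rows from before the watermark until the backfill finishes
+    remote('old-host:9000', 'default', 'events'), event_date < '2025-09-01'
+);
+
+-- Reads union both clusters; because the predicates are mutually exclusive,
+-- each row is served by exactly one of them.
+SELECT count() FROM events_hybrid WHERE event_date >= '2025-08-01';
+```
+
+Once the historical data has been copied to the new cluster, the Hybrid table can be dropped (or recreated with a single layer) and clients pointed at the new cluster directly.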
diff --git a/src/Interpreters/ClusterProxy/SelectStreamFactory.cpp b/src/Interpreters/ClusterProxy/SelectStreamFactory.cpp index f57fae899df5..8259d57fa7f0 100644 --- a/src/Interpreters/ClusterProxy/SelectStreamFactory.cpp +++ b/src/Interpreters/ClusterProxy/SelectStreamFactory.cpp @@ -67,7 +67,8 @@ ASTPtr rewriteSelectQuery( const ASTPtr & query, const std::string & remote_database, const std::string & remote_table, - ASTPtr table_function_ptr) + ASTPtr table_function_ptr, + ASTPtr additional_filter) { auto modified_query_ast = query->clone(); @@ -80,8 +81,33 @@ ASTPtr rewriteSelectQuery( if (!context->getSettingsRef()[Setting::allow_experimental_analyzer]) { + // Apply additional filter if provided + if (additional_filter) + { + if (select_query.where()) + { + /// WHERE AND + select_query.setExpression( + ASTSelectQuery::Expression::WHERE, + makeASTFunction("and", select_query.where(), additional_filter->clone())); + } + else + { + /// No WHERE – simply set it + select_query.setExpression( + ASTSelectQuery::Expression::WHERE, additional_filter->clone()); + } + } + if (table_function_ptr) - select_query.addTableFunction(table_function_ptr); + { + select_query.addTableFunction(table_function_ptr->clone()); + + // Reset semantic table information for all column identifiers to prevent + // RestoreQualifiedNamesVisitor from adding wrong table names + ResetSemanticTableVisitor::Data data; + ResetSemanticTableVisitor(data).visit(modified_query_ast); + } else select_query.replaceDatabaseAndTable(remote_database, remote_table); @@ -93,6 +119,7 @@ ASTPtr rewriteSelectQuery( data.distributed_table = DatabaseAndTableWithAlias(*getTableExpression(query->as(), 0)); data.remote_table.database = remote_database; data.remote_table.table = remote_table; + RestoreQualifiedNamesVisitor(data).visit(modified_query_ast); } } diff --git a/src/Interpreters/ClusterProxy/SelectStreamFactory.h b/src/Interpreters/ClusterProxy/SelectStreamFactory.h index b61f7a0012af..47646a80e1aa 100644 --- a/src/Interpreters/ClusterProxy/SelectStreamFactory.h +++ b/src/Interpreters/ClusterProxy/SelectStreamFactory.h @@ -41,7 +41,8 @@ ASTPtr rewriteSelectQuery( const ASTPtr & query, const std::string & remote_database, const std::string & remote_table, - ASTPtr table_function_ptr = nullptr); + ASTPtr table_function_ptr = nullptr, + ASTPtr additional_filter = nullptr); using ColumnsDescriptionByShardNum = std::unordered_map; using AdditionalShardFilterGenerator = std::function; diff --git a/src/Interpreters/ClusterProxy/executeQuery.cpp b/src/Interpreters/ClusterProxy/executeQuery.cpp index df9018219431..5ed4c2862d01 100644 --- a/src/Interpreters/ClusterProxy/executeQuery.cpp +++ b/src/Interpreters/ClusterProxy/executeQuery.cpp @@ -15,6 +15,7 @@ #include #include #include +#include #include #include #include @@ -333,7 +334,8 @@ void executeQuery( const std::string & sharding_key_column_name, const DistributedSettings & distributed_settings, AdditionalShardFilterGenerator shard_filter_generator, - bool is_remote_function) + bool is_remote_function, + std::span additional_query_infos) { const Settings & settings = context->getSettingsRef(); @@ -361,6 +363,7 @@ void executeQuery( new_context->increaseDistributedDepth(); const size_t shards = cluster->getShardCount(); + const bool has_additional_query_infos = !additional_query_infos.empty(); if (context->getSettingsRef()[Setting::allow_experimental_analyzer]) { @@ -470,6 +473,29 @@ void executeQuery( plans.emplace_back(std::move(plan)); } + if (has_additional_query_infos) + { + if 
(!header) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Header is not initialized for local hybrid plan creation"); + + const Block & header_block = *header; + for (const auto & additional_query_info : additional_query_infos) + { + auto additional_plan = createLocalPlan( + additional_query_info.query, + header_block, + context, + processed_stage, + 0, /// shard_num is not applicable for local hybrid plans + 1, /// shard_count is not applicable for local hybrid plans + false, + false, + ""); + + plans.emplace_back(std::move(additional_plan)); + } + } + if (plans.empty()) return; @@ -485,6 +511,8 @@ void executeQuery( input_headers.emplace_back(plan->getCurrentHeader()); auto union_step = std::make_unique(std::move(input_headers)); + if (has_additional_query_infos) + union_step->setStepDescription("Hybrid"); query_plan.unitePlans(std::move(union_step), std::move(plans)); } diff --git a/src/Interpreters/ClusterProxy/executeQuery.h b/src/Interpreters/ClusterProxy/executeQuery.h index 0142e57a9120..8f1b8be42182 100644 --- a/src/Interpreters/ClusterProxy/executeQuery.h +++ b/src/Interpreters/ClusterProxy/executeQuery.h @@ -5,6 +5,7 @@ #include #include +#include namespace DB { @@ -88,7 +89,8 @@ void executeQuery( const std::string & sharding_key_column_name, const DistributedSettings & distributed_settings, AdditionalShardFilterGenerator shard_filter_generator, - bool is_remote_function); + bool is_remote_function, + std::span additional_query_infos = {}); std::optional executeInsertSelectWithParallelReplicas( const ASTInsertQuery & query_ast, diff --git a/src/Interpreters/TranslateQualifiedNamesVisitor.cpp b/src/Interpreters/TranslateQualifiedNamesVisitor.cpp index 7ef39f82cc29..0eb3c4fcefc3 100644 --- a/src/Interpreters/TranslateQualifiedNamesVisitor.cpp +++ b/src/Interpreters/TranslateQualifiedNamesVisitor.cpp @@ -399,4 +399,15 @@ void RestoreQualifiedNamesMatcher::visit(ASTIdentifier & identifier, ASTPtr &, D } } +void ResetSemanticTableMatcher::visit(ASTPtr & ast, Data & data) +{ + if (auto * t = ast->as()) + visit(*t, ast, data); +} + +void ResetSemanticTableMatcher::visit(ASTIdentifier & identifier, ASTPtr &, Data &) +{ + identifier.resetSemanticTable(); +} + } diff --git a/src/Interpreters/TranslateQualifiedNamesVisitor.h b/src/Interpreters/TranslateQualifiedNamesVisitor.h index 00c85d08873f..becff4845755 100644 --- a/src/Interpreters/TranslateQualifiedNamesVisitor.h +++ b/src/Interpreters/TranslateQualifiedNamesVisitor.h @@ -80,4 +80,33 @@ struct RestoreQualifiedNamesMatcher using RestoreQualifiedNamesVisitor = InDepthNodeVisitor; + +/// Reset semantic->table for all column identifiers in the AST. +/// +/// PROBLEM DESCRIPTION: +/// When an AST is passed through multiple query rewrites (e.g., in Hybrid -> remote), +/// the semantic->table information attached to ASTIdentifier nodes can become stale and +/// cause incorrect column qualification. This happens because: +/// +/// 1. During initial parsing, semantic->table is populated with the original table name +/// 2. When the query is rewritten (e.g., FROM clause changed from table to remote() function inside Hybrid), +/// the AST structure is modified but semantic->table information is preserved +/// 3. 
Subsequent visitors like RestoreQualifiedNamesVisitor called in remote() function over the same AST +/// may use this stale semantic->table information to incorrectly qualify column names with the original table name +/// +/// SOLUTION: +/// This visitor clears semantic->table for all column identifiers, ensuring that subsequent +/// visitors work with clean semantic information and don't apply stale table qualifications. +struct ResetSemanticTableMatcher +{ + // No data needed for this visitor + struct Data {}; + + static bool needChildVisit(const ASTPtr &, const ASTPtr &) { return true; } + static void visit(ASTPtr & ast, Data & data); + static void visit(ASTIdentifier & identifier, ASTPtr &, Data & data); +}; + +using ResetSemanticTableVisitor = InDepthNodeVisitor; + } diff --git a/src/Parsers/ASTIdentifier.cpp b/src/Parsers/ASTIdentifier.cpp index 490116cfc6cf..0767516aa74a 100644 --- a/src/Parsers/ASTIdentifier.cpp +++ b/src/Parsers/ASTIdentifier.cpp @@ -167,6 +167,17 @@ void ASTIdentifier::restoreTable() } } +void ASTIdentifier::resetSemanticTable() +{ + // Only reset semantic table for column identifiers (not table identifiers) + if (semantic && !semantic->special) + { + semantic->table.clear(); + semantic->can_be_alias = true; + semantic->membership = std::nullopt; + } +} + std::shared_ptr ASTIdentifier::createTable() const { if (name_parts.size() == 1) return std::make_shared(name_parts[0]); diff --git a/src/Parsers/ASTIdentifier.h b/src/Parsers/ASTIdentifier.h index 72dde7f644fb..3ea66264ca24 100644 --- a/src/Parsers/ASTIdentifier.h +++ b/src/Parsers/ASTIdentifier.h @@ -52,6 +52,7 @@ class ASTIdentifier : public ASTWithAlias void updateTreeHashImpl(SipHash & hash_state, bool ignore_alias) const override; void restoreTable(); // TODO(ilezhankin): get rid of this + void resetSemanticTable(); // Reset semantic to empty string (see ResetSemanticTableVisitor) std::shared_ptr createTable() const; // returns |nullptr| if identifier is not table. 
String full_name; diff --git a/src/Planner/PlannerActionsVisitor.cpp b/src/Planner/PlannerActionsVisitor.cpp index 50e54a113967..f52b63536f48 100644 --- a/src/Planner/PlannerActionsVisitor.cpp +++ b/src/Planner/PlannerActionsVisitor.cpp @@ -348,7 +348,7 @@ class ActionNodeNameHelper } default: { - throw Exception(ErrorCodes::LOGICAL_ERROR, "Invalid action query tree node {}", node->formatASTForErrorMessage()); + throw Exception(ErrorCodes::LOGICAL_ERROR, "Invalid action query tree node {} (node_type: {})", node->formatASTForErrorMessage(), static_cast(node_type)); } } diff --git a/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.cpp b/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.cpp index 4a1a09969d99..f524d946a8af 100644 --- a/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.cpp +++ b/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.cpp @@ -141,9 +141,9 @@ void DataFileMetaInfo::serialize(WriteBuffer & out) const size_t field_mask = 0; if (column.second.rows_count.has_value()) field_mask |= FIELD_MASK_ROWS; - if (column.second.rows_count.has_value()) + if (column.second.nulls_count.has_value()) field_mask |= FIELD_MASK_NULLS; - if (column.second.rows_count.has_value()) + if (column.second.hyperrectangle.has_value()) field_mask |= FIELD_MASK_RECT; writeIntBinary(field_mask, out); diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.h b/src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.h index 0bfdb758ee22..97098811c015 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.h +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.h @@ -1,6 +1,23 @@ #pragma once #include "config.h" +#include +#include + +#include + +namespace DB::Iceberg +{ + +struct ColumnInfo +{ + std::optional rows_count; + std::optional bytes_size; + std::optional nulls_count; + std::optional hyperrectangle; +}; + +} #if USE_AVRO @@ -38,14 +55,6 @@ enum class ManifestFileContentType String FileContentTypeToString(FileContentType type); -struct ColumnInfo -{ - std::optional rows_count; - std::optional bytes_size; - std::optional nulls_count; - std::optional hyperrectangle; -}; - struct PartitionSpecsEntry { Int32 source_id; diff --git a/src/Storages/StorageDistributed.cpp b/src/Storages/StorageDistributed.cpp index f5b86d8f783f..a3ffad98a47b 100644 --- a/src/Storages/StorageDistributed.cpp +++ b/src/Storages/StorageDistributed.cpp @@ -41,6 +41,7 @@ #include #include #include +#include #include #include #include @@ -87,6 +88,7 @@ #include #include +#include #include #include @@ -148,6 +150,27 @@ namespace CurrentMetrics namespace DB { +namespace +{ +void replaceCurrentDatabaseFunction(ASTPtr & ast, const ContextPtr & context) +{ + if (!ast) + return; + + if (auto * func = ast->as()) + { + if (func->name == "currentDatabase") + { + ast = evaluateConstantExpressionForDatabaseName(ast, context); + return; + } + } + + for (auto & child : ast->children) + replaceCurrentDatabaseFunction(child, context); +} +} + namespace Setting { extern const SettingsBool allow_experimental_analyzer; @@ -198,6 +221,8 @@ namespace ErrorCodes extern const int NOT_IMPLEMENTED; extern const int STORAGE_REQUIRES_PARAMETER; extern const int BAD_ARGUMENTS; + extern const int UNKNOWN_DATABASE; + extern const int UNKNOWN_TABLE; extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; extern const int INCORRECT_NUMBER_OF_COLUMNS; extern const int INFINITE_LOOP; @@ -522,6 +547,10 @@ QueryProcessingStage::Enum StorageDistributed::getQueryProcessingStage( if (to_stage == 
QueryProcessingStage::WithMergeableState) return QueryProcessingStage::WithMergeableState; + // TODO: check logic + if (!additional_table_functions.empty()) + nodes += additional_table_functions.size(); + /// If there is only one node, the query can be fully processed by the /// shard, initiator will work as a proxy only. if (nodes == 1) @@ -564,6 +593,9 @@ QueryProcessingStage::Enum StorageDistributed::getQueryProcessingStage( bool StorageDistributed::isShardingKeySuitsQueryTreeNodeExpression( const QueryTreeNodePtr & expr, const SelectQueryInfo & query_info) const { + if (!additional_table_functions.empty()) + return false; + ColumnsWithTypeAndName empty_input_columns; ColumnNodePtrWithHashSet empty_correlated_columns_set; // When comparing sharding key expressions, we need to ignore table qualifiers in column names @@ -604,6 +636,7 @@ bool StorageDistributed::isShardingKeySuitsQueryTreeNodeExpression( return allOutputsDependsOnlyOnAllowedNodes(sharding_key_dag, irreducibe_nodes, matches); } +// TODO: support additional table functions std::optional StorageDistributed::getOptimizedQueryProcessingStageAnalyzer(const SelectQueryInfo & query_info, const Settings & settings) const { bool optimize_sharding_key_aggregation = settings[Setting::optimize_skip_unused_shards] && settings[Setting::optimize_distributed_group_by_sharding_key] @@ -662,6 +695,7 @@ std::optional StorageDistributed::getOptimizedQueryP return QueryProcessingStage::Complete; } +// TODO: support additional table functions std::optional StorageDistributed::getOptimizedQueryProcessingStage(const SelectQueryInfo & query_info, const Settings & settings) const { bool optimize_sharding_key_aggregation = settings[Setting::optimize_skip_unused_shards] && settings[Setting::optimize_distributed_group_by_sharding_key] @@ -771,9 +805,11 @@ static bool requiresObjectColumns(const ColumnsDescription & all_columns, ASTPtr StorageSnapshotPtr StorageDistributed::getStorageSnapshot(const StorageMetadataPtr & metadata_snapshot, ContextPtr query_context) const { + /// TODO: support additional table functions return getStorageSnapshotForQuery(metadata_snapshot, nullptr, query_context); } +/// TODO: support additional table functions StorageSnapshotPtr StorageDistributed::getStorageSnapshotForQuery( const StorageMetadataPtr & metadata_snapshot, const ASTPtr & query, ContextPtr /*query_context*/) const { @@ -909,7 +945,8 @@ bool rewriteJoinToGlobalJoinIfNeeded(QueryTreeNodePtr join_tree) QueryTreeNodePtr buildQueryTreeDistributed(SelectQueryInfo & query_info, const StorageSnapshotPtr & distributed_storage_snapshot, const StorageID & remote_storage_id, - const ASTPtr & remote_table_function) + const ASTPtr & remote_table_function, + const ASTPtr & additional_filter = nullptr) { auto & planner_context = query_info.planner_context; const auto & query_context = planner_context->getQueryContext(); @@ -976,7 +1013,29 @@ QueryTreeNodePtr buildQueryTreeDistributed(SelectQueryInfo & query_info, replacement_table_expression->setAlias(query_info.table_expression->getAlias()); + + QueryTreeNodePtr filter; + + if (additional_filter) + { + const auto & context = query_info.planner_context->getQueryContext(); + + filter = buildQueryTree(additional_filter->clone(), query_context); + + QueryAnalysisPass(replacement_table_expression).run(filter, context); + } + auto query_tree_to_modify = query_info.query_tree->cloneAndReplace(query_info.table_expression, std::move(replacement_table_expression)); + + // Apply additional filter if provided + if (filter) + { + auto & 
query = query_tree_to_modify->as(); + query.getWhere() = query.hasWhere() + ? mergeConditionNodes({query.getWhere(), filter}, query_context) + : std::move(filter); + } + ReplaseAliasColumnsVisitor replase_alias_columns_visitor; replase_alias_columns_visitor.visit(query_tree_to_modify); @@ -995,6 +1054,7 @@ QueryTreeNodePtr buildQueryTreeDistributed(SelectQueryInfo & query_info, } return buildQueryTreeForShard(query_info.planner_context, query_tree_to_modify, /*allow_global_join_for_right_table*/ false); + } } @@ -1013,30 +1073,56 @@ void StorageDistributed::read( SelectQueryInfo modified_query_info = query_info; + std::vector additional_query_infos; + const auto & settings = local_context->getSettingsRef(); if (settings[Setting::allow_experimental_analyzer]) { - StorageID remote_storage_id = StorageID{remote_database, remote_table}; + StorageID remote_storage_id = StorageID::createEmpty(); + if (!remote_table_function_ptr) + remote_storage_id = StorageID{remote_database, remote_table}; auto query_tree_distributed = buildQueryTreeDistributed(modified_query_info, query_info.initial_storage_snapshot ? query_info.initial_storage_snapshot : storage_snapshot, remote_storage_id, - remote_table_function_ptr); + remote_table_function_ptr, + additional_filter); Block block = *InterpreterSelectQueryAnalyzer::getSampleBlock(query_tree_distributed, local_context, SelectQueryOptions(processed_stage).analyze()); /** For distributed tables we do not need constants in header, since we don't send them to remote servers. * Moreover, constants can break some functions like `hostName` that are constants only for local queries. */ for (auto & column : block) column.column = column.column->convertToFullColumnIfConst(); + header = std::make_shared(std::move(block)); modified_query_info.query = queryNodeToDistributedSelectQuery(query_tree_distributed); modified_query_info.query_tree = std::move(query_tree_distributed); - /// Return directly (with correct header) if no shard to query. - if (modified_query_info.getCluster()->getShardsInfo().empty()) + if (!additional_table_functions.empty()) + { + for (const auto & table_function_entry : additional_table_functions) + { + // Create a modified query info with the additional predicate + SelectQueryInfo additional_query_info = query_info; + + auto additional_query_tree = buildQueryTreeDistributed(additional_query_info, + query_info.initial_storage_snapshot ? query_info.initial_storage_snapshot : storage_snapshot, + table_function_entry.storage_id ? *table_function_entry.storage_id : StorageID::createEmpty(), + table_function_entry.storage_id ? 
nullptr : table_function_entry.table_function_ast, + table_function_entry.predicate_ast); + + additional_query_info.query = queryNodeToDistributedSelectQuery(additional_query_tree); + additional_query_info.query_tree = std::move(additional_query_tree); + + additional_query_infos.push_back(std::move(additional_query_info)); + } + } + + // For empty shards - avoid early return if we have additional table functions + if (modified_query_info.getCluster()->getShardsInfo().empty() && additional_table_functions.empty()) return; } else @@ -1045,9 +1131,37 @@ void StorageDistributed::read( modified_query_info.query = ClusterProxy::rewriteSelectQuery( local_context, modified_query_info.query, - remote_database, remote_table, remote_table_function_ptr); + remote_database, remote_table, remote_table_function_ptr, + additional_filter); + + if (!additional_table_functions.empty()) + { + for (const auto & table_function_entry : additional_table_functions) + { + SelectQueryInfo additional_query_info = query_info; + + if (table_function_entry.storage_id) + { + additional_query_info.query = ClusterProxy::rewriteSelectQuery( + local_context, additional_query_info.query, + table_function_entry.storage_id->database_name, table_function_entry.storage_id->table_name, + nullptr, + table_function_entry.predicate_ast); + } + else + { + additional_query_info.query = ClusterProxy::rewriteSelectQuery( + local_context, additional_query_info.query, + "", "", table_function_entry.table_function_ast, + table_function_entry.predicate_ast); + } + + additional_query_infos.push_back(std::move(additional_query_info)); + } + } - if (modified_query_info.getCluster()->getShardsInfo().empty()) + // For empty shards - avoid early return if we have additional table functions + if (modified_query_info.getCluster()->getShardsInfo().empty() && additional_table_functions.empty()) { Pipe pipe(std::make_shared(header)); auto read_from_pipe = std::make_unique(std::move(pipe)); @@ -1059,35 +1173,40 @@ void StorageDistributed::read( } const auto & snapshot_data = assert_cast(*storage_snapshot->data); - ClusterProxy::SelectStreamFactory select_stream_factory = - ClusterProxy::SelectStreamFactory( + + if (!modified_query_info.getCluster()->getShardsInfo().empty() || !additional_query_infos.empty()) + { + ClusterProxy::SelectStreamFactory select_stream_factory = + ClusterProxy::SelectStreamFactory( + header, + snapshot_data.objects_by_shard, + storage_snapshot, + processed_stage); + + auto shard_filter_generator = ClusterProxy::getShardFilterGeneratorForCustomKey( + *modified_query_info.getCluster(), local_context, getInMemoryMetadataPtr()->columns); + + ClusterProxy::executeQuery( + query_plan, header, - snapshot_data.objects_by_shard, - storage_snapshot, - processed_stage); - - auto shard_filter_generator = ClusterProxy::getShardFilterGeneratorForCustomKey( - *modified_query_info.getCluster(), local_context, getInMemoryMetadataPtr()->columns); - - ClusterProxy::executeQuery( - query_plan, - header, - processed_stage, - remote_storage, - remote_table_function_ptr, - select_stream_factory, - log, - local_context, - modified_query_info, - sharding_key_expr, - sharding_key_column_name, - *distributed_settings, - shard_filter_generator, - is_remote_function); - - /// This is a bug, it is possible only when there is no shards to query, and this is handled earlier. 
- if (!query_plan.isInitialized()) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Pipeline is not initialized"); + processed_stage, + remote_storage, + remote_table_function_ptr, + select_stream_factory, + log, + local_context, + modified_query_info, + sharding_key_expr, + sharding_key_column_name, + *distributed_settings, + shard_filter_generator, + is_remote_function, + additional_query_infos); + + /// This is a bug, it is possible only when there is no shards to query, and this is handled earlier. + if (!query_plan.isInitialized()) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Pipeline is not initialized"); + } } @@ -2071,6 +2190,17 @@ void StorageDistributed::delayInsertOrThrowIfNeeded() const } } +void StorageDistributed::setHybridLayout(std::vector additional_table_functions_) +{ + additional_table_functions = std::move(additional_table_functions_); + log = getLogger("Hybrid (" + getStorageID().table_name + ")"); + + auto virtuals = createVirtuals(); + // or _layer_index? + virtuals.addEphemeral("_table_index", std::make_shared(), "Index of the table function in Hybrid (0 for main table, 1+ for additional table functions)"); + setVirtuals(virtuals); +} + void registerStorageDistributed(StorageFactory & factory) { factory.registerStorage("Distributed", [](const StorageFactory::Arguments & args) @@ -2175,6 +2305,215 @@ void registerStorageDistributed(StorageFactory & factory) }); } +void registerStorageHybrid(StorageFactory & factory) +{ + // Register Hybrid engine + // TODO: consider moving it to a separate file / subclass of StorageDistributed + factory.registerStorage("Hybrid", [](const StorageFactory::Arguments & args) -> StoragePtr + { + ASTs & engine_args = args.engine_args; + + if (engine_args.size() < 2) + throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, + "Storage Hybrid requires at least 2 arguments, got {}", engine_args.size()); + + const ContextPtr & global_context = args.getContext(); + ContextPtr local_context = args.getLocalContext(); + if (!local_context) + local_context = global_context; + + // Validate first argument - must be a table function + ASTPtr first_arg = engine_args[0]; + if (const auto * func = first_arg->as()) + { + // Check if it's a valid table function name + if (!TableFunctionFactory::instance().isTableFunctionName(func->name)) + throw Exception(ErrorCodes::BAD_ARGUMENTS, + "First argument must be a table function, got: {}", func->name); + + // Check if it's one of the supported remote table functions + if (func->name != "remote" && func->name != "remoteSecure" && + func->name != "cluster" && func->name != "clusterAllReplicas") + throw Exception(ErrorCodes::BAD_ARGUMENTS, + "First argument must be one of: remote, remoteSecure, cluster, clusterAllReplicas, got: {}", func->name); + } + else + { + throw Exception(ErrorCodes::BAD_ARGUMENTS, + "First argument must be a table function, got: {}", first_arg->getID()); + } + + // Now handle the first table function (which must be a TableFunctionRemote) + auto table_function = TableFunctionFactory::instance().get(first_arg, local_context); + if (!table_function) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Invalid table function in Hybrid engine"); + + // For schema inference, we need to determine the columns first if they're not provided + ColumnsDescription columns_to_use = args.columns; + if (columns_to_use.empty()) + { + // Get the column structure from the table function + columns_to_use = table_function->getActualTableStructure(local_context, true); + } + + // Execute the table function to get 
the underlying storage + StoragePtr storage = table_function->execute( + first_arg, + local_context, + args.table_id.table_name, + columns_to_use, + false, // use_global_context = false + false); // is_insert_query = false + + // table function execution wraps the actual storage in a StorageTableFunctionProxy, to make initialize it lazily in queries + // here we need to get the nested storage + if (auto proxy = std::dynamic_pointer_cast(storage)) + { + storage = proxy->getNested(); + } + + // Cast to StorageDistributed to access its methods + auto distributed_storage = std::dynamic_pointer_cast(storage); + if (!distributed_storage) + { + // Debug: Print the actual type we got + std::string actual_type = storage ? storage->getName() : "nullptr"; + throw Exception(ErrorCodes::LOGICAL_ERROR, + "TableFunctionRemote did not return a StorageDistributed or StorageProxy, got: {}", actual_type); + } + + const auto physical_columns = columns_to_use.getAllPhysical(); + + auto validate_predicate = [&](ASTPtr & predicate, size_t argument_index) + { + try + { + auto syntax_result = TreeRewriter(local_context).analyze(predicate, physical_columns); + ExpressionAnalyzer(predicate, syntax_result, local_context).getActions(true); + } + catch (const Exception & e) + { + throw Exception(ErrorCodes::BAD_ARGUMENTS, + "Argument #{} must be a valid SQL expression: {}", argument_index, e.message()); + } + }; + + ASTPtr second_arg = engine_args[1]; + validate_predicate(second_arg, 1); + distributed_storage->setAdditionalFilter(second_arg); + + // Parse additional table function pairs (if any) + std::vector additional_table_functions; + for (size_t i = 2; i < engine_args.size(); i += 2) + { + if (i + 1 >= engine_args.size()) + throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, + "Table function pairs must have both table function and predicate, got odd number of arguments"); + + ASTPtr table_function_ast = engine_args[i]; + ASTPtr predicate_ast = engine_args[i + 1]; + + validate_predicate(predicate_ast, i + 1); + + // Validate table function or table identifier + if (const auto * func = table_function_ast->as()) + { + // It's a table function - validate it + if (!TableFunctionFactory::instance().isTableFunctionName(func->name)) + { + throw Exception(ErrorCodes::BAD_ARGUMENTS, + "Argument #{}: additional table function must be a valid table function, got: {}", i, func->name); + } + + // Normalize arguments (evaluate `currentDatabase()`, expand named collections, etc.). + // TableFunctionFactory::get mutates the AST in-place inside TableFunctionRemote::parseArguments. 
+ ASTPtr normalized_table_function_ast = table_function_ast->clone(); + auto additional_table_function = TableFunctionFactory::instance().get(normalized_table_function_ast, local_context); + (void)additional_table_function; + replaceCurrentDatabaseFunction(normalized_table_function_ast, local_context); + + // It's a table function - store the AST for later execution + additional_table_functions.emplace_back(normalized_table_function_ast, predicate_ast); + } + else if (const auto * ast_identifier = table_function_ast->as()) + { + // It's an identifier - try to convert it to a table identifier + auto table_identifier = ast_identifier->createTable(); + if (!table_identifier) + { + throw Exception(ErrorCodes::BAD_ARGUMENTS, + "Argument #{}: identifier '{}' cannot be converted to table identifier", i, ast_identifier->name()); + } + + try + { + // Parse table identifier to get StorageID + StorageID storage_id(table_identifier); + + // Sanity check: ensure the table identifier is fully qualified (has database name) + if (storage_id.database_name.empty()) + { + throw Exception(ErrorCodes::BAD_ARGUMENTS, + "Argument #{}: table identifier '{}' must be fully qualified (database.table)", i, ast_identifier->name()); + } + + // Sanity check: verify the table exists + try + { + auto database = DatabaseCatalog::instance().getDatabase(storage_id.database_name, local_context); + if (!database) + { + throw Exception(ErrorCodes::UNKNOWN_DATABASE, + "Database '{}' does not exist", storage_id.database_name); + } + + auto table = database->tryGetTable(storage_id.table_name, local_context); + if (!table) + { + throw Exception(ErrorCodes::UNKNOWN_TABLE, + "Table '{}.{}' does not exist", storage_id.database_name, storage_id.table_name); + } + } + catch (const Exception & e) + { + throw Exception(ErrorCodes::BAD_ARGUMENTS, + "Argument #{}: table '{}' validation failed: {}", i, ast_identifier->name(), e.message()); + } + + additional_table_functions.emplace_back(table_function_ast, predicate_ast, storage_id); + } + catch (const Exception & e) + { + throw Exception(ErrorCodes::BAD_ARGUMENTS, + "Argument #{}: invalid table identifier '{}': {}", i, ast_identifier->name(), e.message()); + } + } + else + { + throw Exception(ErrorCodes::BAD_ARGUMENTS, + "Argument #{}: additional argument must be either a table function or table identifier, got: {}", i, table_function_ast->getID()); + } + + } + + // Fix the database and table names - this is the same pattern used in InterpreterCreateQuery + // The TableFunctionRemote creates a StorageDistributed with "_table_function" database, + // but we need to rename it to the correct database and table names + distributed_storage->renameInMemory({args.table_id.database_name, args.table_id.table_name, args.table_id.uuid}); + + // Store additional table functions for later use + distributed_storage->setHybridLayout(std::move(additional_table_functions)); + + return distributed_storage; + }, + { + .supports_settings = false, + .supports_parallel_insert = true, + .supports_schema_inference = true, + .source_access_type = AccessTypeObjects::Source::REMOTE, + }); +} + bool StorageDistributed::initializeDiskOnConfigChange(const std::set & new_added_disks) { if (!storage_policy || !data_volume) diff --git a/src/Storages/StorageDistributed.h b/src/Storages/StorageDistributed.h index 75354edb7ffc..273a1c233c81 100644 --- a/src/Storages/StorageDistributed.h +++ b/src/Storages/StorageDistributed.h @@ -50,6 +50,25 @@ class StorageDistributed final : public IStorage, WithContext friend class 
StorageSystemDistributionQueue; public: + /// Structure to hold table function AST, predicate, and optional StorageID for table identifiers + struct TableFunctionEntry + { + ASTPtr table_function_ast; + ASTPtr predicate_ast; + std::optional storage_id; // For table identifiers instead of table functions + + TableFunctionEntry(ASTPtr table_function_ast_, ASTPtr predicate_ast_) + : table_function_ast(std::move(table_function_ast_)) + , predicate_ast(std::move(predicate_ast_)) + {} + + TableFunctionEntry(ASTPtr table_function_ast_, ASTPtr predicate_ast_, StorageID storage_id_) + : table_function_ast(std::move(table_function_ast_)) + , predicate_ast(std::move(predicate_ast_)) + , storage_id(std::move(storage_id_)) + {} + }; + StorageDistributed( const StorageID & id_, const ColumnsDescription & columns_, @@ -70,7 +89,12 @@ class StorageDistributed final : public IStorage, WithContext ~StorageDistributed() override; - std::string getName() const override { return "Distributed"; } + std::string getName() const override + { + return (additional_table_functions.empty() && !additional_filter) + ? "Distributed" + : "Hybrid"; + } bool supportsSampling() const override { return true; } bool supportsFinal() const override { return true; } @@ -149,6 +173,18 @@ class StorageDistributed final : public IStorage, WithContext size_t getShardCount() const; + /// Set additional filter for Hybrid engine + void setAdditionalFilter(ASTPtr filter) { additional_filter = std::move(filter); } + + /// Set additional table functions for Hybrid engine + void setHybridLayout(std::vector additional_table_functions_); + + /// Getter methods for ClusterProxy::executeQuery + StorageID getRemoteStorageID() const { return remote_storage; } + ExpressionActionsPtr getShardingKeyExpression() const { return sharding_key_expr; } + const DistributedSettings * getDistributedSettings() const { return distributed_settings.get(); } + bool isRemoteFunction() const { return is_remote_function; } + bool initializeDiskOnConfigChange(const std::set & new_added_disks) override; private: @@ -283,6 +319,12 @@ class StorageDistributed final : public IStorage, WithContext pcg64 rng; bool is_remote_function; + + /// Additional filter expression for Hybrid engine + ASTPtr additional_filter; + + /// Additional table functions for Hybrid engine + std::vector additional_table_functions; }; } diff --git a/src/Storages/registerStorages.cpp b/src/Storages/registerStorages.cpp index 3d050fb9633b..74cb706cccec 100644 --- a/src/Storages/registerStorages.cpp +++ b/src/Storages/registerStorages.cpp @@ -13,6 +13,7 @@ void registerStorageNull(StorageFactory & factory); void registerStorageMerge(StorageFactory & factory); void registerStorageBuffer(StorageFactory & factory); void registerStorageDistributed(StorageFactory & factory); +void registerStorageHybrid(StorageFactory & factory); void registerStorageMemory(StorageFactory & factory); void registerStorageFile(StorageFactory & factory); void registerStorageURL(StorageFactory & factory); @@ -122,6 +123,7 @@ void registerStorages() registerStorageMerge(factory); registerStorageBuffer(factory); registerStorageDistributed(factory); + registerStorageHybrid(factory); registerStorageMemory(factory); registerStorageFile(factory); registerStorageURL(factory); diff --git a/tests/queries/0_stateless/03642_tiered_distributed.reference b/tests/queries/0_stateless/03642_tiered_distributed.reference new file mode 100644 index 000000000000..d4ac38a21257 --- /dev/null +++ 
b/tests/queries/0_stateless/03642_tiered_distributed.reference @@ -0,0 +1,199 @@ +Check Hybrid engine is registered +Hybrid +Ensure no leftovers before validation checks +Expect error when Hybrid has no arguments +Expect error when Hybrid has a single literal argument +Expect error when Hybrid arguments are literals only +Expect error when first argument is a table function of the wrong subtype (can not construct Distributed from file) +Expect error when first argument is not a table function (scalar expression) +Expect error when first argument is a table function of the wrong subtype (can not construct Distributed from url) +Expect error when predicate references a missing column +Missing column + schema inference +Create Hybrid table with remote() and constant predicate (explicit column list) +CREATE TABLE default.test_tiered_distributed\n(\n `dummy` UInt8\n)\nENGINE = Hybrid(remote(\'localhost:9000\'), 1) +dummy UInt8 +0 +Row 1: +────── +database: default +name: test_tiered_distributed +engine: Hybrid +create_table_query: CREATE TABLE default.test_tiered_distributed (`dummy` UInt8) ENGINE = Hybrid(remote('localhost:9000'), 1) +engine_full: Hybrid(remote('localhost:9000'), 1) +Create Hybrid table with remote table function and predicate (inference) +CREATE TABLE default.test_tiered_distributed_numbers_range\n(\n `number` UInt64\n)\nENGINE = Hybrid(remote(\'localhost:9000\', \'system.numbers\'), number < 5) +0 +1 +2 +3 +4 +Create Hybrid table with two remote layers as table +CREATE TABLE default.test_tiered_distributed_numbers_dual\n(\n `number` UInt64\n)\nENGINE = Hybrid(remote(\'localhost:9000\', \'system.numbers\'), number < 5, remote(\'localhost:9000\', system.numbers), (number >= 10) AND (number <= 15))\nCOMMENT \'Generates all natural numbers, starting from 0 (to 2^64 - 1, and then again) in sorted order.\' +0 +1 +2 +3 +4 +10 +11 +12 +13 +14 +15 +0 +1 +2 +3 +4 +10 +11 +12 +13 +14 +15 +Create Hybrid table combining remote function and local table +0 +1 +2 +3 +4 +10 +11 +12 +13 +14 +15 +Verify Hybrid skips layer with always false predicate on the first layer +10 +11 +12 +13 +14 +15 +Verify Hybrid skips layer with always false predicate on the second layer +0 +1 +2 +Prepare local MergeTree table for multi-layer tests +Populate local table with sample data +Create Hybrid table with three layer pairs +Count rows across all layers +6 +Count rows from layers with id > 4 +1 +Count rows where value > 200 +3 +Count rows named Alice +1 +Select rows ordered by value descending (id > 2) +4 David 300.2 +5 Eve 250.1 +3 Charlie 150.7 +Limit results ordered by id +0 Invalid 2022-01-01 10:00:00 0.5 +1 Alice 2022-01-01 10:00:00 100.5 +2 Bob 2022-01-02 11:00:00 200.3 +Explain plan for filter on value +Union (Hybrid) + ReadFromRemote (Read from remote replica) + ReadFromRemote (Read from remote replica) + ReadFromRemote (Read from remote replica) +Union (Hybrid) + ReadFromRemote (Read from remote replica) + ReadFromRemote (Read from remote replica) + ReadFromRemote (Read from remote replica) +Union (Hybrid) + ReadFromRemote (Read from remote replica) + Expression ((Projection + Before ORDER BY)) + Expression (WHERE) + ReadFromMergeTree (default.test_tiered_local_data) + Expression ((Projection + Before ORDER BY)) + Expression (WHERE) + ReadFromMergeTree (default.test_tiered_local_data) +Union (Hybrid) + ReadFromRemote (Read from remote replica) + Expression ((Project names + Projection)) + Expression ((WHERE + Change column names to column identifiers)) + ReadFromMergeTree 
(default.test_tiered_local_data) + Expression ((Project names + Projection)) + Expression ((WHERE + Change column names to column identifiers)) + ReadFromMergeTree (default.test_tiered_local_data) +Aggregate values across name when filtering by event_time +David 1 300.2 +Eve 1 250.1 +Bob 1 200.3 +Charlie 1 150.7 +Verify additional_table_filters works consistently (legacy analyser) +2 Bob 200.3 +Verify additional_table_filters works consistently (new analyser) +2 Bob 200.3 +Clean up Hybrid table with three layer pairs +Clean up local helper table +Drop predicate filtering fixtures if they exist +Create local tables representing before/after watermark partitions +Create second local table with different value type +Insert rows before watermark into both tables +Insert rows after watermark into both tables +Create Hybrid table with analyzer disabled during reads +Insert row via Hybrid table (should go to first layer) +Verify that inserted row landed in first table +17 John 2025-09-25 400 +Verify that second table did not receive the inserted row +0 +Read predicate-filtered data with analyzer disabled and no localhost preference +14 David 2025-09-05 400 +15 Eve 2025-09-10 500 +16 Frank 2025-09-15 600 +17 John 2025-09-25 400 +21 Alice 2025-08-15 100 +22 Bob 2025-08-20 200 +23 Charlie 2025-08-25 300 +Read predicate-filtered data with analyzer enabled and no localhost preference +14 David 2025-09-05 400 +15 Eve 2025-09-10 500 +16 Frank 2025-09-15 600 +17 John 2025-09-25 400 +21 Alice 2025-08-15 100 +22 Bob 2025-08-20 200 +23 Charlie 2025-08-25 300 +Read predicate-filtered data with analyzer disabled and prefer localhost replica +14 David 2025-09-05 400 +15 Eve 2025-09-10 500 +16 Frank 2025-09-15 600 +17 John 2025-09-25 400 +21 Alice 2025-08-15 100 +22 Bob 2025-08-20 200 +23 Charlie 2025-08-25 300 +Read predicate-filtered data with analyzer enabled and prefer localhost replica +14 David 2025-09-05 400 +15 Eve 2025-09-10 500 +16 Frank 2025-09-15 600 +17 John 2025-09-25 400 +21 Alice 2025-08-15 100 +22 Bob 2025-08-20 200 +23 Charlie 2025-08-25 300 +Check if the subqueries were recorded in query_log +Row 1: +────── +type: QueryFinish +is_initial_query2: 1 +tbl: ['_table_function.remote','db.test_tiered_watermark'] +qry: SELECT * FROM test_tiered_watermark ORDER BY id DESC SETTINGS enable_analyzer = 1, prefer_localhost_replica = 0, log_queries=1, serialize_query_plan=0, log_comment = 'test_tiered_watermark', max_threads=1 FORMAT Null; +log_comment: test_tiered_watermark + +Row 2: +────── +type: QueryFinish +is_initial_query2: 0 +tbl: ['db.test_tiered_watermark_after'] +qry: SELECT `__table1`.`id` AS `id`, `__table1`.`name` AS `name`, `__table1`.`date` AS `date`, `__table1`.`value` AS `value` FROM `db`.`test_tiered_watermark_after` AS `__table1` WHERE `__table1`.`date` >= '2025-09-01' ORDER BY `__table1`.`id` DESC +log_comment: test_tiered_watermark + +Row 3: +────── +type: QueryFinish +is_initial_query2: 0 +tbl: ['db.test_tiered_watermark_before'] +qry: SELECT `__table1`.`id` AS `id`, `__table1`.`name` AS `name`, `__table1`.`date` AS `date`, `__table1`.`value` AS `value` FROM `db`.`test_tiered_watermark_before` AS `__table1` WHERE `__table1`.`date` < '2025-09-01' ORDER BY `__table1`.`id` DESC +log_comment: test_tiered_watermark +Clean up predicate filtering tables diff --git a/tests/queries/0_stateless/03642_tiered_distributed.sql b/tests/queries/0_stateless/03642_tiered_distributed.sql new file mode 100644 index 000000000000..4fd346bba166 --- /dev/null +++ 
b/tests/queries/0_stateless/03642_tiered_distributed.sql @@ -0,0 +1,345 @@ +-- Test Hybrid engine registration and basic validation +SELECT 'Check Hybrid engine is registered'; +SELECT name FROM system.table_engines WHERE name = 'Hybrid'; + +SELECT 'Ensure no leftovers before validation checks'; +DROP TABLE IF EXISTS test_tiered_distributed SYNC; +DROP TABLE IF EXISTS test_tiered_distributed_bad_args SYNC; +DROP TABLE IF EXISTS test_tiered_distributed_invalid_first_arg SYNC; + +SELECT 'Expect error when Hybrid has no arguments'; +CREATE TABLE test_tiered_distributed_bad_args (`id` UInt32,`name` String) ENGINE = Hybrid(); -- { serverError NUMBER_OF_ARGUMENTS_DOESNT_MATCH } + +SELECT 'Expect error when Hybrid has a single literal argument'; +CREATE TABLE test_tiered_distributed_bad_args (`id` UInt32,`name` String) ENGINE = Hybrid(1); -- { serverError NUMBER_OF_ARGUMENTS_DOESNT_MATCH } + +SELECT 'Expect error when Hybrid arguments are literals only'; +CREATE TABLE test_tiered_distributed_bad_args (`id` UInt32,`name` String) ENGINE = Hybrid(1, 1); -- { serverError BAD_ARGUMENTS } + +SELECT 'Expect error when first argument is a table function of the wrong subtype (can not construct Distributed from file)'; +CREATE TABLE test_tiered_distributed_invalid_first_arg (`id` UInt32, `name` String) ENGINE = Hybrid(file('foo.x'), 1); -- { serverError BAD_ARGUMENTS } + +SELECT 'Expect error when first argument is not a table function (scalar expression)'; +CREATE TABLE test_tiered_distributed_invalid_first_arg (`id` UInt32, `name` String) ENGINE = Hybrid(sin(3), 1); -- { serverError BAD_ARGUMENTS } + +SELECT 'Expect error when first argument is a table function of the wrong subtype (can not construct Distributed from url)'; +CREATE TABLE test_tiered_distributed_invalid_first_arg (`id` UInt32, `name` String) ENGINE = Hybrid(url('http://google.com', 'RawBLOB'), 1); -- { serverError BAD_ARGUMENTS } + +SELECT 'Expect error when predicate references a missing column'; +CREATE TABLE test_tiered_distributed_bad_args(`number` UInt64) ENGINE = Hybrid(remote('localhost:9000', system.numbers), number2 < 5); -- { serverError BAD_ARGUMENTS } + +SELECT 'Missing column + schema inference'; +CREATE TABLE test_tiered_distributed_bad_args ENGINE = Hybrid(remote('localhost:9000', system.numbers), number2 < 5); -- { serverError BAD_ARGUMENTS } + +DROP TABLE IF EXISTS test_tiered_distributed_bad_args SYNC; + +SELECT 'Create Hybrid table with remote() and constant predicate (explicit column list)'; +DROP TABLE IF EXISTS test_tiered_distributed SYNC; +CREATE TABLE test_tiered_distributed(`dummy` UInt8) ENGINE = Hybrid(remote('localhost:9000'), 1); +SHOW CREATE TABLE test_tiered_distributed; +DESCRIBE TABLE test_tiered_distributed; +SELECT * FROM test_tiered_distributed; +SELECT database, name, engine, create_table_query, engine_full FROM system.tables WHERE table = 'test_tiered_distributed' FORMAT Vertical; +DROP TABLE IF EXISTS test_tiered_distributed SYNC; + +SELECT 'Create Hybrid table with remote table function and predicate (inference)'; +DROP TABLE IF EXISTS test_tiered_distributed_numbers_range SYNC; +CREATE TABLE test_tiered_distributed_numbers_range ENGINE = Hybrid(remote('localhost:9000', system.numbers), number < 5); +SHOW CREATE TABLE test_tiered_distributed_numbers_range; +SELECT * FROM test_tiered_distributed_numbers_range ORDER BY number; +DROP TABLE IF EXISTS test_tiered_distributed_numbers_range SYNC; + +SELECT 'Create Hybrid table with two remote layers as table'; +DROP TABLE IF EXISTS 
test_tiered_distributed_numbers_dual SYNC; +CREATE TABLE test_tiered_distributed_numbers_dual ENGINE = Hybrid( + remote('localhost:9000', system.numbers), number < 5, + remote('localhost:9000', system.numbers), number BETWEEN 10 AND 15 +) AS system.numbers; + +SHOW CREATE TABLE test_tiered_distributed_numbers_dual; +SELECT * FROM test_tiered_distributed_numbers_dual ORDER BY number SETTINGS enable_analyzer = 0; +SELECT * FROM test_tiered_distributed_numbers_dual ORDER BY number SETTINGS enable_analyzer = 1; +DROP TABLE IF EXISTS test_tiered_distributed_numbers_dual SYNC; + +SELECT 'Create Hybrid table combining remote function and local table'; +DROP TABLE IF EXISTS test_tiered_distributed_numbers_mixed SYNC; +CREATE TABLE test_tiered_distributed_numbers_mixed +( + `number` UInt64 +) ENGINE = Hybrid( + remote('localhost:9000', system.numbers), number < 5, + system.numbers, number BETWEEN 10 AND 15 +); +SELECT * FROM test_tiered_distributed_numbers_mixed ORDER BY number; +DROP TABLE IF EXISTS test_tiered_distributed_numbers_mixed SYNC; + +SELECT 'Verify Hybrid skips layer with always false predicate on the first layer'; +DROP TABLE IF EXISTS test_tiered_distributed_numbers_skip_first SYNC; +CREATE TABLE test_tiered_distributed_numbers_skip_first +( + `number` UInt64 +) ENGINE = Hybrid( + remote('localhost:9000', system.numbers), 0, + system.numbers, number BETWEEN 10 AND 15 +); +SELECT * FROM test_tiered_distributed_numbers_skip_first ORDER BY number; +DROP TABLE IF EXISTS test_tiered_distributed_numbers_skip_first SYNC; + +SELECT 'Verify Hybrid skips layer with always false predicate on the second layer'; +DROP TABLE IF EXISTS test_tiered_distributed_numbers_skip_second SYNC; +CREATE TABLE test_tiered_distributed_numbers_skip_second +( + `number` UInt64 +) ENGINE = Hybrid( + remote('localhost:9000', system.numbers), number < 3, + system.numbers, 0 +); +SELECT * FROM test_tiered_distributed_numbers_skip_second ORDER BY number; +DROP TABLE IF EXISTS test_tiered_distributed_numbers_skip_second SYNC; + +----------------------------- + +SELECT 'Prepare local MergeTree table for multi-layer tests'; +DROP TABLE IF EXISTS test_tiered_local_data SYNC; +CREATE TABLE test_tiered_local_data +( + `id` UInt32, + `name` String, + `event_time` DateTime, + `value` Float64 +) ENGINE = MergeTree() +ORDER BY id; + +SELECT 'Populate local table with sample data'; +INSERT INTO test_tiered_local_data VALUES + (0, 'Invalid', '2022-01-01 10:00:00', 0.5), + (1, 'Alice', '2022-01-01 10:00:00', 100.5), + (2, 'Bob', '2022-01-02 11:00:00', 200.3), + (3, 'Charlie', '2022-01-03 12:00:00', 150.7), + (4, 'David', '2022-01-04 13:00:00', 300.2), + (5, 'Eve', '2022-01-05 14:00:00', 250.1); + +SELECT 'Create Hybrid table with three layer pairs'; +DROP TABLE IF EXISTS test_tiered_multi_layer SYNC; + +CREATE TABLE test_tiered_multi_layer +( + `id` UInt32, + `name` String, + `event_time` DateTime, + `value` Float64 +) +ENGINE = Hybrid( + remote('127.0.0.2:9000', currentDatabase(), 'test_tiered_local_data'), + id <= 2, + cluster('test_shard_localhost', currentDatabase(), 'test_tiered_local_data'), + id = 3, + remoteSecure('127.0.0.1:9440', currentDatabase(), 'test_tiered_local_data'), + id > 3 +); + +SELECT 'Count rows across all layers'; +SELECT count() FROM test_tiered_multi_layer; +SELECT 'Count rows from layers with id > 4'; +SELECT count() FROM test_tiered_multi_layer WHERE id > 4; +SELECT 'Count rows where value > 200'; +SELECT count() FROM test_tiered_multi_layer WHERE value > 200; +SELECT 'Count rows named Alice'; +SELECT 
count() AS alice_rows FROM test_tiered_multi_layer WHERE name = 'Alice'; + +SELECT 'Select rows ordered by value descending (id > 2)'; +SELECT id, name, value FROM test_tiered_multi_layer WHERE id > 2 ORDER BY value DESC; +SELECT 'Limit results ordered by id'; +SELECT * FROM test_tiered_multi_layer ORDER BY id LIMIT 3; +SELECT 'Explain plan for filter on value'; +EXPLAIN SELECT * FROM test_tiered_multi_layer WHERE value > 150 SETTINGS prefer_localhost_replica=0, enable_analyzer=0; +EXPLAIN SELECT * FROM test_tiered_multi_layer WHERE value > 150 SETTINGS prefer_localhost_replica=0, enable_analyzer=1; +EXPLAIN SELECT * FROM test_tiered_multi_layer WHERE value > 150 SETTINGS prefer_localhost_replica=1, enable_analyzer=0; +EXPLAIN SELECT * FROM test_tiered_multi_layer WHERE value > 150 SETTINGS prefer_localhost_replica=1, enable_analyzer=1; + +SELECT 'Aggregate values across name when filtering by event_time'; +SELECT + name, + count() AS count, + avg(value) AS avg_value +FROM test_tiered_multi_layer +WHERE event_time >= '2022-01-02' +GROUP BY name +ORDER BY avg_value DESC; + +SELECT 'Verify additional_table_filters works consistently (legacy analyser)'; +SELECT id, name, value +FROM test_tiered_multi_layer +WHERE id < 3 +ORDER BY id +SETTINGS additional_table_filters = {'test_tiered_multi_layer' : 'id > 1'}, allow_experimental_analyzer = 0; + +SELECT 'Verify additional_table_filters works consistently (new analyser)'; +SELECT id, name, value +FROM test_tiered_multi_layer +WHERE id < 3 +ORDER BY id +SETTINGS additional_table_filters = {'test_tiered_multi_layer' : 'id > 1'}, allow_experimental_analyzer = 1; + + +SELECT 'Clean up Hybrid table with three layer pairs'; +DROP TABLE IF EXISTS test_tiered_multi_layer SYNC; +SELECT 'Clean up local helper table'; +DROP TABLE IF EXISTS test_tiered_local_data SYNC; + +--------------------------------- + +-- Test Hybrid engine predicate filtering functionality + +SELECT 'Drop predicate filtering fixtures if they exist'; +DROP TABLE IF EXISTS test_tiered_watermark_after SYNC; +DROP TABLE IF EXISTS test_tiered_watermark_before SYNC; +DROP TABLE IF EXISTS test_tiered_watermark SYNC; + +SELECT 'Create local tables representing before/after watermark partitions'; +CREATE TABLE test_tiered_watermark_after +( + `id` UInt32, + `name` String, + `date` Date, + `value` UInt64 +) +ENGINE = MergeTree() +ORDER BY id; + +SELECT 'Create second local table with different value type'; +CREATE TABLE test_tiered_watermark_before +( + `id` Int32, + `name` Nullable(String), + `date` Date, + `value` Decimal128(0) +) +ENGINE = MergeTree() +ORDER BY id; + +SELECT 'Insert rows before watermark into both tables'; +INSERT INTO test_tiered_watermark_after VALUES + (11, 'Alice', '2025-08-15', 100), + (12, 'Bob', '2025-08-20', 200), + (13, 'Charlie', '2025-08-25', 300); +INSERT INTO test_tiered_watermark_before VALUES + (21, 'Alice', '2025-08-15', 100), + (22, 'Bob', '2025-08-20', 200), + (23, 'Charlie', '2025-08-25', 300); + +SELECT 'Insert rows after watermark into both tables'; +INSERT INTO test_tiered_watermark_after VALUES + (14, 'David', '2025-09-05', 400), + (15, 'Eve', '2025-09-10', 500), + (16, 'Frank', '2025-09-15', 600); +INSERT INTO test_tiered_watermark_before VALUES + (24, 'David', '2025-09-05', 400), + (25, 'Eve', '2025-09-10', 500), + (26, 'Frank', '2025-09-15', 600); + + +SELECT 'Create Hybrid table with analyzer disabled during reads'; +CREATE TABLE test_tiered_watermark +( + `id` UInt32, + `name` String, + `date` Date, + `value` UInt32 +) +ENGINE = Hybrid( + 
remote('127.0.0.1:9000', currentDatabase(), 'test_tiered_watermark_after'), + date >= '2025-09-01', + remote('127.0.0.1:9000', currentDatabase(), 'test_tiered_watermark_before'), + date < '2025-09-01' +); + +SELECT 'Insert row via Hybrid table (should go to first layer)'; +INSERT INTO test_tiered_watermark SETTINGS distributed_foreground_insert = 1 +VALUES (17, 'John', '2025-09-25', 400); + +SELECT 'Verify that inserted row landed in first table'; +SELECT * FROM test_tiered_watermark_after WHERE id = 17 ORDER BY id; +SELECT 'Verify that second table did not receive the inserted row'; +SELECT count() FROM test_tiered_watermark_before WHERE id = 17; + + +SELECT 'Read predicate-filtered data with analyzer disabled and no localhost preference'; +SELECT * FROM test_tiered_watermark ORDER BY id SETTINGS enable_analyzer = 0, prefer_localhost_replica = 0; +SELECT 'Read predicate-filtered data with analyzer enabled and no localhost preference'; +SELECT * FROM test_tiered_watermark ORDER BY id SETTINGS enable_analyzer = 1, prefer_localhost_replica = 0; +SELECT 'Read predicate-filtered data with analyzer disabled and prefer localhost replica'; +SELECT * FROM test_tiered_watermark ORDER BY id SETTINGS enable_analyzer = 0, prefer_localhost_replica = 1; +SELECT 'Read predicate-filtered data with analyzer enabled and prefer localhost replica'; +SELECT * FROM test_tiered_watermark ORDER BY id SETTINGS enable_analyzer = 1, prefer_localhost_replica = 1; + +-- other combinations of settings work, but give a bit different content in the query_log +-- See the problem around is_initial_query described in https://github.com/Altinity/ClickHouse/issues/1077 +SELECT 'Check if the subqueries were recorded in query_log'; + +SELECT * FROM test_tiered_watermark ORDER BY id DESC SETTINGS enable_analyzer = 1, prefer_localhost_replica = 0, log_queries=1, serialize_query_plan=0, log_comment = 'test_tiered_watermark', max_threads=1 FORMAT Null; +SYSTEM FLUSH LOGS; +SELECT + type, + query_id = initial_query_id AS is_initial_query2, + arraySort(arrayMap(x -> replaceAll(x, currentDatabase(), 'db'), tables)) as tbl, + replaceAll(query, currentDatabase(), 'db') as qry, + log_comment +FROM system.query_log +WHERE + event_time > now() - 300 AND type = 'QueryFinish' AND + initial_query_id IN ( + SELECT initial_query_id + FROM system.query_log + WHERE + event_time > now() - 300 + and log_comment = 'test_tiered_watermark' + and current_database = currentDatabase() + and query_id = initial_query_id ) +ORDER BY tbl, event_time_microseconds +FORMAT Vertical; + + +SELECT 'Clean up predicate filtering tables'; +DROP TABLE IF EXISTS test_tiered_watermark SYNC; +DROP TABLE IF EXISTS test_tiered_watermark_after SYNC; +DROP TABLE IF EXISTS test_tiered_watermark_before SYNC; + +-- TODO: +-- Code: 70. DB::Exception: Received from localhost:9000. DB::Exception: Conversion from AggregateFunction(sum, Decimal(38, 0)) to AggregateFunction(sum, UInt32) is not supported: while converting source column `sum(__table1.value)` to destination column `sum(__table1.value)`. (CANNOT_CONVERT_TYPE) +-- SELECT sum(value) FROM test_tiered_watermark; + +-- TODO: +-- Code: 47. DB::Exception: Received from localhost:9000. DB::Exception: Received from 127.0.0.2:9000. DB::Exception: Identifier '__table1._database' cannot be resolved from table with name __table1. In scope SELECT __table1._database AS _database, __table1._table AS row_count FROM default.test_tiered_watermark_after AS __table1 WHERE __table1.date >= '2025-09-01'. 
Maybe you meant: ['__table1._table']. (UNKNOWN_IDENTIFIER) +-- SELECT _database, _table, count() AS row_count FROM test_tiered_watermark GROUP BY _database, _table ORDER BY _database, _table; + +-- Other things which may need attention: +-- complex combinations? (overview / over Merge) +-- prefer_localhost_replica +-- threads versus local subquery pipeline part +-- ALTER support + +-- TODO +-- SELECT _table_index, count() AS row_count FROM test_debug_tiered GROUP BY _table_index ORDER BY _table_index; + +-- TODO +-- 1. Integration tests (similar to tests/queries/0_stateless) +-- - Base SELECT with date split: part in Distributed, part in S3 -> results should match a manual UNION ALL (with correct ORDER BY/aggregation). +-- - GROUP BY / ORDER BY / LIMIT: confirm the stage is selected correctly, finalization happens at the top, rows_before_limit_at_least is correct (createLocalPlan already keeps LIMIT). +-- - JOIN: with a small table on the initiator; check GLOBAL JOIN scenarios. Ensure remote layers behave the same as remote shard subqueries created through createLocalPlan. +-- - skipUnusedShards: with analyzer ensure layer conditions are respected (where FILTER DAG is available). +-- - Constants: hostName()/now() in SELECT across several layers -> ensure no discrepancies. +-- - EXPLAIN PLAN/PIPELINE: show child plans for layers and remote plans. +-- - Subqueries in logs. +-- - Different column sets/types: supertype in snapshot, converting actions on read. +-- - Object columns: same as Distributed — use ColumnsDescriptionByShardNum for layers if needed (optional for local layers; already implemented for Distributed). + +-- Condition with dictGet('a1_watermarks_dict', ...) + +-- access rights check + + +-- TODO: +-- test for distributed_aggregation_memory_efficient & enable_memory_bound_merging_of_aggregation_results +-- to avoid UNKNOWN_AGGREGATED_DATA_VARIANT when mixing different aggregation variants +-- from remote shards (with memory_bound) and local layers (without memory_bound)