106 changes: 106 additions & 0 deletions docs/en/engines/table-engines/special/tiered-distributed.md
@@ -0,0 +1,106 @@
---
description: 'Hybrid unions multiple data sources behind per-layer predicates so queries behave like a single table while data is migrated or tiered.'
slug: /engines/table-engines/special/tiered-distributed
title: 'Hybrid Table Engine'
sidebar_label: 'Hybrid'
sidebar_position: 11
---

# Hybrid table engine

`Hybrid` builds on top of the [Distributed](./distributed.md) table engine. It lets you expose several data sources as one logical table and assign every source its own predicate.
The engine rewrites incoming queries so that each layer receives the original query plus its predicate. This keeps all of the Distributed optimisations (remote aggregation, `skip_unused_shards`,
global JOIN pushdown, and so on) while you duplicate or migrate data across clusters, storage types, or formats.

It keeps the same execution pipeline as `engine=Distributed` but can read from multiple underlying sources simultaneously—similar to `engine=Merge`—while still pushing logic down to each source.

Typical use cases include:

- Zero-downtime migrations where "old" and "new" replicas temporarily overlap.
- Tiered storage, for example fresh data on a local cluster and historical data in S3.
- Gradual roll-outs where only a subset of rows should be served from a new backend.

By giving mutually exclusive predicates to the layers (for example, `date < watermark` and `date >= watermark`), you ensure that each row is read from exactly one source.

## Engine definition

```sql
CREATE TABLE [IF NOT EXISTS] [db.]table_name
(
column1 type1,
column2 type2,
...
)
ENGINE = Hybrid(table_function_1, predicate_1 [, table_function_2, predicate_2 ...])
```

You must pass at least two arguments – the first table function and its predicate. Additional sources are appended as `table_function, predicate` pairs. The first table function is also used for `INSERT` statements.
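For illustration, a table that layers three sources might look like the following sketch. Everything here (host addresses, database and table names, and the date boundaries) is hypothetical; it only shows how the `table_function, predicate` pairs line up:

```sql
-- Hypothetical three-layer layout: hot data on a local cluster,
-- warm data on a second cluster, cold data in S3 Parquet files.
CREATE TABLE events_hybrid
(
    event_date Date,
    user_id    UInt64,
    payload    String
)
ENGINE = Hybrid(
    remote('hot-host:9000', 'db', 'events_local'),        event_date >= '2025-09-01',
    remote('warm-host:9000', 'db', 'events_local'),       event_date BETWEEN '2025-01-01' AND '2025-08-31',
    s3('s3://example-bucket/events/**.parquet', NOSIGN),  event_date < '2025-01-01'
);

-- INSERTs are forwarded to the first pair only (the hot layer in this sketch).
INSERT INTO events_hybrid VALUES ('2025-09-10', 42, 'hello');
```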

### Arguments and behaviour

- `table_function_n` must be a valid table function (for example `remote`, `remoteSecure`, `cluster`, `clusterAllReplicas`, `s3Cluster`) or a fully qualified table name (`database.table`). The first argument must be a table function—such as `remote` or `cluster`—because it instantiates the underlying `Distributed` storage.
- `predicate_n` must be an expression that can be evaluated on the table columns. The engine appends it to the layer's query with an additional `AND`, so expressions like `event_date >= '2025-09-01'` or `id BETWEEN 10 AND 15` are typical (see the rewrite sketch after this list).
- The query planner picks the same processing stage for every layer as it does for the base `Distributed` plan, so remote aggregation, ORDER BY pushdown, `skip_unused_shards`, and the legacy/analyzer execution modes behave the same way.
- `INSERT` statements are forwarded to the first table function only. If you need multi-destination writes, use explicit `INSERT` statements into the respective sources.
- Align schemas across the layers. ClickHouse builds a common header; if the physical types differ you may need to add casts on one side or in the query, just as you would when reading from heterogeneous replicas.
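To make the rewrite concrete, here is a rough sketch of what each layer receives for a two-layer table. The table and column names are illustrative, and the exact rewritten query text may differ from what the comments show:

```sql
-- Suppose a table was created as (names are hypothetical):
--   ENGINE = Hybrid(remote('host:9000', 'db', 'events_local'), event_date >= '2025-09-01',
--                   s3('s3://example-bucket/events/**.parquet', NOSIGN), event_date < '2025-09-01')
-- and the user runs:
SELECT count() FROM events_hybrid WHERE user_id = 42;

-- The first layer receives approximately:
--   SELECT count() FROM remote('host:9000', 'db', 'events_local')
--   WHERE user_id = 42 AND event_date >= '2025-09-01'
-- and the second layer receives approximately:
--   SELECT count() FROM s3('s3://example-bucket/events/**.parquet', NOSIGN)
--   WHERE user_id = 42 AND event_date < '2025-09-01'
-- The per-layer results are then combined by a union step, with the usual
-- Distributed processing stages still applied on top.
```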

## Example: local cluster plus S3 historical tier

The following commands illustrate a two-layer layout. Hot data stays on a local ClickHouse cluster, while historical rows come from public S3 Parquet files.

```sql
-- Local MergeTree table that keeps current data
CREATE OR REPLACE TABLE btc_blocks_local
(
`hash` FixedString(64),
`version` Int64,
`mediantime` DateTime64(9),
`nonce` Int64,
`bits` FixedString(8),
`difficulty` Float64,
`chainwork` FixedString(64),
`size` Int64,
`weight` Int64,
`coinbase_param` String,
`number` Int64,
`transaction_count` Int64,
`merkle_root` FixedString(64),
`stripped_size` Int64,
`timestamp` DateTime64(9),
`date` Date
)
ENGINE = MergeTree
ORDER BY (timestamp)
PARTITION BY toYYYYMM(date);

-- Hybrid table that unions the local shard with historical data in S3
CREATE OR REPLACE TABLE btc_blocks ENGINE = Hybrid(
remote('localhost:9000', currentDatabase(), 'btc_blocks_local'), date >= '2025-09-01',
s3('s3://aws-public-blockchain/v1.0/btc/blocks/**.parquet', NOSIGN), date < '2025-09-01'
) AS btc_blocks_local;

-- Writes target the first (remote) layer
INSERT INTO btc_blocks
SELECT *
FROM s3('s3://aws-public-blockchain/v1.0/btc/blocks/**.parquet', NOSIGN)
WHERE date BETWEEN '2025-09-01' AND '2025-09-30';

-- Reads transparently combine data from both layers
SELECT * FROM btc_blocks WHERE date = '2025-08-01'; -- data from S3
SELECT * FROM btc_blocks WHERE date = '2025-09-05'; -- data from MergeTree (the S3 layer is currently still analyzed)
SELECT * FROM btc_blocks WHERE date IN ('2025-08-31', '2025-09-01'); -- data from both sources, each row read from exactly one layer


-- Run analytic queries as usual
SELECT
date,
count(),
uniqExact(CAST(hash, 'Nullable(String)')) AS hashes,
sum(CAST(number, 'Nullable(Int64)')) AS blocks_seen
FROM btc_blocks
WHERE date BETWEEN '2025-08-01' AND '2025-09-30'
GROUP BY date
ORDER BY date;
```

Because the predicates are applied inside every layer, query features such as `ORDER BY`, `GROUP BY`, `LIMIT`, `JOIN`, and `EXPLAIN` behave as if you were reading from a single `Distributed` table. When sources expose different physical types (for example `FixedString(64)` versus `String` in Parquet), add explicit casts during ingestion or in the query, as shown above.
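If you want to check how a Hybrid query is planned, you can run `EXPLAIN` against the table as usual; in this implementation the layers are combined by a union step whose description is set to `Hybrid`, so each layer should appear as its own branch under it. A minimal sketch (the exact plan output depends on the query and settings):

```sql
EXPLAIN
SELECT count()
FROM btc_blocks
WHERE date BETWEEN '2025-08-01' AND '2025-09-30';
-- Expect one branch per layer in the plan, merged by a union step described as "Hybrid".
```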
31 changes: 29 additions & 2 deletions src/Interpreters/ClusterProxy/SelectStreamFactory.cpp
@@ -67,7 +67,8 @@ ASTPtr rewriteSelectQuery(
const ASTPtr & query,
const std::string & remote_database,
const std::string & remote_table,
ASTPtr table_function_ptr)
ASTPtr table_function_ptr,
ASTPtr additional_filter)
{
auto modified_query_ast = query->clone();

@@ -80,8 +81,33 @@

if (!context->getSettingsRef()[Setting::allow_experimental_analyzer])
{
// Apply additional filter if provided
if (additional_filter)
{
if (select_query.where())
{
/// WHERE <old> AND <filter>
select_query.setExpression(
ASTSelectQuery::Expression::WHERE,
makeASTFunction("and", select_query.where(), additional_filter->clone()));
}
else
{
/// No WHERE – simply set it
select_query.setExpression(
ASTSelectQuery::Expression::WHERE, additional_filter->clone());
}
}

if (table_function_ptr)
select_query.addTableFunction(table_function_ptr);
{
select_query.addTableFunction(table_function_ptr->clone());

// Reset semantic table information for all column identifiers to prevent
// RestoreQualifiedNamesVisitor from adding wrong table names
ResetSemanticTableVisitor::Data data;
ResetSemanticTableVisitor(data).visit(modified_query_ast);
}
else
select_query.replaceDatabaseAndTable(remote_database, remote_table);

@@ -93,6 +119,7 @@ ASTPtr rewriteSelectQuery(
data.distributed_table = DatabaseAndTableWithAlias(*getTableExpression(query->as<ASTSelectQuery &>(), 0));
data.remote_table.database = remote_database;
data.remote_table.table = remote_table;

RestoreQualifiedNamesVisitor(data).visit(modified_query_ast);
}
}
3 changes: 2 additions & 1 deletion src/Interpreters/ClusterProxy/SelectStreamFactory.h
@@ -41,7 +41,8 @@ ASTPtr rewriteSelectQuery(
const ASTPtr & query,
const std::string & remote_database,
const std::string & remote_table,
ASTPtr table_function_ptr = nullptr);
ASTPtr table_function_ptr = nullptr,
ASTPtr additional_filter = nullptr);

using ColumnsDescriptionByShardNum = std::unordered_map<UInt32, ColumnsDescription>;
using AdditionalShardFilterGenerator = std::function<ASTPtr(uint64_t)>;
30 changes: 29 additions & 1 deletion src/Interpreters/ClusterProxy/executeQuery.cpp
@@ -15,6 +15,7 @@
#include <Parsers/ASTInsertQuery.h>
#include <Planner/Utils.h>
#include <Processors/QueryPlan/ParallelReplicasLocalPlan.h>
#include <Processors/QueryPlan/DistributedCreateLocalPlan.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/QueryPlan/ReadFromLocalReplica.h>
#include <Processors/QueryPlan/ReadFromPreparedSource.h>
@@ -333,7 +334,8 @@ void executeQuery(
const std::string & sharding_key_column_name,
const DistributedSettings & distributed_settings,
AdditionalShardFilterGenerator shard_filter_generator,
bool is_remote_function)
bool is_remote_function,
std::span<const SelectQueryInfo> additional_query_infos)
{
const Settings & settings = context->getSettingsRef();

@@ -361,6 +363,7 @@
new_context->increaseDistributedDepth();

const size_t shards = cluster->getShardCount();
const bool has_additional_query_infos = !additional_query_infos.empty();

if (context->getSettingsRef()[Setting::allow_experimental_analyzer])
{
@@ -470,6 +473,29 @@
plans.emplace_back(std::move(plan));
}

if (has_additional_query_infos)
{
if (!header)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Header is not initialized for local hybrid plan creation");

const Block & header_block = *header;
for (const auto & additional_query_info : additional_query_infos)
{
auto additional_plan = createLocalPlan(
additional_query_info.query,
header_block,
context,
processed_stage,
0, /// shard_num is not applicable for local hybrid plans
1, /// shard_count is not applicable for local hybrid plans
false,
false,
"");

plans.emplace_back(std::move(additional_plan));
}
}

if (plans.empty())
return;

@@ -485,6 +511,8 @@
input_headers.emplace_back(plan->getCurrentHeader());

auto union_step = std::make_unique<UnionStep>(std::move(input_headers));
if (has_additional_query_infos)
union_step->setStepDescription("Hybrid");
query_plan.unitePlans(std::move(union_step), std::move(plans));
}

4 changes: 3 additions & 1 deletion src/Interpreters/ClusterProxy/executeQuery.h
@@ -5,6 +5,7 @@
#include <Parsers/IAST_fwd.h>

#include <optional>
#include <span>

namespace DB
{
@@ -88,7 +89,8 @@ void executeQuery(
const std::string & sharding_key_column_name,
const DistributedSettings & distributed_settings,
AdditionalShardFilterGenerator shard_filter_generator,
bool is_remote_function);
bool is_remote_function,
std::span<const SelectQueryInfo> additional_query_infos = {});

std::optional<QueryPipeline> executeInsertSelectWithParallelReplicas(
const ASTInsertQuery & query_ast,
11 changes: 11 additions & 0 deletions src/Interpreters/TranslateQualifiedNamesVisitor.cpp
@@ -399,4 +399,15 @@ void RestoreQualifiedNamesMatcher::visit(ASTIdentifier & identifier, ASTPtr &, D
}
}

void ResetSemanticTableMatcher::visit(ASTPtr & ast, Data & data)
{
if (auto * t = ast->as<ASTIdentifier>())
visit(*t, ast, data);
}

void ResetSemanticTableMatcher::visit(ASTIdentifier & identifier, ASTPtr &, Data &)
{
identifier.resetSemanticTable();
}

}
29 changes: 29 additions & 0 deletions src/Interpreters/TranslateQualifiedNamesVisitor.h
@@ -80,4 +80,33 @@ struct RestoreQualifiedNamesMatcher

using RestoreQualifiedNamesVisitor = InDepthNodeVisitor<RestoreQualifiedNamesMatcher, true>;


/// Reset semantic->table for all column identifiers in the AST.
///
/// PROBLEM DESCRIPTION:
/// When an AST is passed through multiple query rewrites (e.g., in Hybrid -> remote),
/// the semantic->table information attached to ASTIdentifier nodes can become stale and
/// cause incorrect column qualification. This happens because:
///
/// 1. During initial parsing, semantic->table is populated with the original table name
/// 2. When the query is rewritten (e.g., FROM clause changed from table to remote() function inside Hybrid),
/// the AST structure is modified but semantic->table information is preserved
/// 3. Subsequent visitors like RestoreQualifiedNamesVisitor called in remote() function over the same AST
/// may use this stale semantic->table information to incorrectly qualify column names with the original table name
///
/// SOLUTION:
/// This visitor clears semantic->table for all column identifiers, ensuring that subsequent
/// visitors work with clean semantic information and don't apply stale table qualifications.
struct ResetSemanticTableMatcher
{
// No data needed for this visitor
struct Data {};

static bool needChildVisit(const ASTPtr &, const ASTPtr &) { return true; }
static void visit(ASTPtr & ast, Data & data);
static void visit(ASTIdentifier & identifier, ASTPtr &, Data & data);
};

using ResetSemanticTableVisitor = InDepthNodeVisitor<ResetSemanticTableMatcher, true>;

}
11 changes: 11 additions & 0 deletions src/Parsers/ASTIdentifier.cpp
@@ -167,6 +167,17 @@ void ASTIdentifier::restoreTable()
}
}

void ASTIdentifier::resetSemanticTable()
{
// Only reset semantic table for column identifiers (not table identifiers)
if (semantic && !semantic->special)
{
semantic->table.clear();
semantic->can_be_alias = true;
semantic->membership = std::nullopt;
}
}

std::shared_ptr<ASTTableIdentifier> ASTIdentifier::createTable() const
{
if (name_parts.size() == 1) return std::make_shared<ASTTableIdentifier>(name_parts[0]);
1 change: 1 addition & 0 deletions src/Parsers/ASTIdentifier.h
@@ -52,6 +52,7 @@ class ASTIdentifier : public ASTWithAlias
void updateTreeHashImpl(SipHash & hash_state, bool ignore_alias) const override;

void restoreTable(); // TODO(ilezhankin): get rid of this
void resetSemanticTable(); // Reset semantic to empty string (see ResetSemanticTableVisitor)
std::shared_ptr<ASTTableIdentifier> createTable() const; // returns |nullptr| if identifier is not table.

String full_name;
2 changes: 1 addition & 1 deletion src/Planner/PlannerActionsVisitor.cpp
@@ -348,7 +348,7 @@ class ActionNodeNameHelper
}
default:
{
throw Exception(ErrorCodes::LOGICAL_ERROR, "Invalid action query tree node {}", node->formatASTForErrorMessage());
throw Exception(ErrorCodes::LOGICAL_ERROR, "Invalid action query tree node {} (node_type: {})", node->formatASTForErrorMessage(), static_cast<int>(node_type));
}
}

4 changes: 2 additions & 2 deletions src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.cpp
@@ -141,9 +141,9 @@ void DataFileMetaInfo::serialize(WriteBuffer & out) const
size_t field_mask = 0;
if (column.second.rows_count.has_value())
field_mask |= FIELD_MASK_ROWS;
if (column.second.rows_count.has_value())
if (column.second.nulls_count.has_value())
field_mask |= FIELD_MASK_NULLS;
if (column.second.rows_count.has_value())
if (column.second.hyperrectangle.has_value())
field_mask |= FIELD_MASK_RECT;
writeIntBinary(field_mask, out);

25 changes: 17 additions & 8 deletions src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.h
@@ -1,6 +1,23 @@
#pragma once

#include "config.h"
#include <Core/Range.h>
#include <Core/Types.h>

#include <optional>

namespace DB::Iceberg
{

struct ColumnInfo
{
std::optional<Int64> rows_count;
std::optional<Int64> bytes_size;
std::optional<Int64> nulls_count;
std::optional<DB::Range> hyperrectangle;
};

}

#if USE_AVRO

@@ -38,14 +55,6 @@ enum class ManifestFileContentType

String FileContentTypeToString(FileContentType type);

struct ColumnInfo
{
std::optional<Int64> rows_count;
std::optional<Int64> bytes_size;
std::optional<Int64> nulls_count;
std::optional<DB::Range> hyperrectangle;
};

struct PartitionSpecsEntry
{
Int32 source_id;