Merged
29 commits:
cd1a38f rename files (filimonov, Nov 11, 2025)
6ad282e rename layers to segments (filimonov, Nov 11, 2025)
5d47f07 store headers (filimonov, Nov 11, 2025)
e59a1f3 more renames (filimonov, Nov 11, 2025)
6ac3b3c more renames2 (filimonov, Nov 11, 2025)
abf4a3b test for missing column (filimonov, Nov 11, 2025)
3110105 allow using tables from the current database implicitly (filimonov, Nov 19, 2025)
bcb0f1f Mark Hybrid experimental (filimonov, Nov 19, 2025)
a13a1e5 almost working draft (filimonov, Nov 19, 2025)
0e0e76b better (filimonov, Nov 19, 2025)
cac851f fix merge issue (filimonov, Nov 19, 2025)
2f1e11d misplaced setting (filimonov, Nov 19, 2025)
6edfd3a better explantation for cached header (filimonov, Nov 21, 2025)
78a9ba1 hybrid_table_auto_cast_columns=true (filimonov, Nov 21, 2025)
d48f1bf Tightened buildQueryTreeDistributed to avoid unused default params an… (filimonov, Nov 21, 2025)
df02469 redundant move/copy (filimonov, Nov 21, 2025)
c152623 better comment for HybridCastVisitor (filimonov, Nov 21, 2025)
bb3a5f2 better signature, avoid pointer (filimonov, Nov 21, 2025)
ac8ad7d review comments (filimonov, Nov 21, 2025)
113cfe6 Prevent creation of the Hybrid tables when some segments does not pro… (filimonov, Nov 21, 2025)
eda9305 moving the rewrite to a analyzer pass, seem to work much better (filimonov, Nov 21, 2025)
2186269 finishing (filimonov, Nov 21, 2025)
90cbc5e better (filimonov, Nov 21, 2025)
0896a85 comment (filimonov, Nov 21, 2025)
6f8377c Get rid of experimatal marker for hybrid_table_auto_cast_columns flag… (filimonov, Nov 21, 2025)
87f0a4e Trigger CI/CD (filimonov, Nov 22, 2025)
6940b68 Merge branch 'antalya-25.8' into mf_25.8_hybrid2 (Enmk, Nov 23, 2025)
9015f22 Merge branch 'antalya-25.8' into mf_25.8_hybrid2 (Enmk, Nov 24, 2025)
8c0d982 Merge branch 'antalya-25.8' into mf_25.8_hybrid2 (Enmk, Nov 25, 2025)
@@ -1,6 +1,6 @@
---
description: 'Hybrid unions multiple data sources behind per-layer predicates so queries behave like a single table while data is migrated or tiered.'
slug: /engines/table-engines/special/tiered-distributed
description: 'Hybrid unions multiple data sources behind per-segment predicates so queries behave like a single table while data is migrated or tiered.'
slug: /engines/table-engines/special/hybrid
title: 'Hybrid Table Engine'
sidebar_label: 'Hybrid'
sidebar_position: 11
@@ -9,7 +9,7 @@ sidebar_position: 11
# Hybrid table engine

`Hybrid` builds on top of the [Distributed](./distributed.md) table engine. It lets you expose several data sources as one logical table and assign every source its own predicate.
The engine rewrites incoming queries so that each layer receives the original query plus its predicate. This keeps all of the Distributed optimisations (remote aggregation, `skip_unused_shards`,
The engine rewrites incoming queries so that each segment receives the original query plus its predicate. This keeps all of the Distributed optimisations (remote aggregation, `skip_unused_shards`,
global JOIN pushdown, and so on) while you duplicate or migrate data across clusters, storage types, or formats.

It keeps the same execution pipeline as `engine=Distributed` but can read from multiple underlying sources simultaneously—similar to `engine=Merge`—while still pushing logic down to each source.
@@ -20,7 +20,21 @@ Typical use cases include:
- Tiered storage, for example fresh data on a local cluster and historical data in S3.
- Gradual roll-outs where only a subset of rows should be served from a new backend.

By giving mutually exclusive predicates to the layers (for example, `date < watermark` and `date >= watermark`), you ensure that each row is read from exactly one source.
By giving mutually exclusive predicates to the segments (for example, `date < watermark` and `date >= watermark`), you ensure that each row is read from exactly one source.
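A minimal sketch of such a watermark split (hypothetical table and bucket names; enable the engine first, as shown in the next section):

```sql
CREATE TABLE events ENGINE = Hybrid(
    remote('hot-cluster', default.events_local), date >= '2025-09-01',
    s3('s3://archive-bucket/events/**.parquet', NOSIGN), date < '2025-09-01'
) AS events_local;
```

Every row satisfies exactly one of the two predicates, so it is read from exactly one segment.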

## Enable the engine

The Hybrid engine is experimental. Enable it per session (or in the user profile) before creating tables:

```sql
SET allow_experimental_hybrid_table = 1;
```

### Automatic type alignment

Hybrid segments can evolve independently, so the same logical column may use different physical types. With `hybrid_table_auto_cast_columns = 1` (enabled by default; requires `allow_experimental_analyzer = 1`), the engine inserts the necessary `CAST` operations into each rewritten query so every shard receives the schema defined by the Hybrid table. Set the flag to `0` to opt out if it causes issues.
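For instance, suppose the Hybrid table declares `value UInt64` while one Parquet segment stores it as `Int64` (a hypothetical schema for illustration). The rewritten per-segment query then carries the cast:

```sql
SET allow_experimental_analyzer = 1;
SET hybrid_table_auto_cast_columns = 1; -- the default

-- The query sent to the Parquet segment effectively becomes
--   SELECT max(CAST(value, 'UInt64')) FROM <segment> WHERE <predicate>
SELECT max(value) FROM events;
```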

Segment schemas are cached when you create or attach a Hybrid table. If you alter a segment later (for example, change a column type), refresh the Hybrid table (detach/attach or recreate it) so the cached headers stay in sync with the new schema; otherwise the auto-cast feature may miss the change and queries can fail with header/type errors.
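A refresh after such a change could look like this (hypothetical table name):

```sql
-- After altering a segment's column type, re-cache the segment headers:
DETACH TABLE events;
ATTACH TABLE events;
```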

## Engine definition

@@ -39,14 +53,14 @@ You must pass at least two arguments – the first table function and its predic
### Arguments and behaviour

- `table_function_n` must be a valid table function (for example `remote`, `remoteSecure`, `cluster`, `clusterAllReplicas`, `s3Cluster`) or a fully qualified table name (`database.table`). The first argument must be a table function—such as `remote` or `cluster`—because it instantiates the underlying `Distributed` storage.
- `predicate_n` must be an expression that can be evaluated on the table columns. The engine adds it to the layer's query with an additional `AND`, so expressions like `event_date >= '2025-09-01'` or `id BETWEEN 10 AND 15` are typical.
- The query planner picks the same processing stage for every layer as it does for the base `Distributed` plan, so remote aggregation, ORDER BY pushdown, `skip_unused_shards`, and the legacy/analyzer execution modes behave the same way.
- `predicate_n` must be an expression that can be evaluated on the table columns. The engine adds it to the segment's query with an additional `AND`, so expressions like `event_date >= '2025-09-01'` or `id BETWEEN 10 AND 15` are typical.
- The query planner picks the same processing stage for every segment as it does for the base `Distributed` plan, so remote aggregation, ORDER BY pushdown, `skip_unused_shards`, and the legacy/analyzer execution modes behave the same way.
- `INSERT` statements are forwarded to the first table function only. If you need multi-destination writes, use explicit `INSERT` statements into the respective sources (see the sketch after this list).
- Align schemas across the layers. ClickHouse builds a common header; if the physical types differ you may need to add casts on one side or in the query, just as you would when reading from heterogeneous replicas.
- Align schemas across the segments. ClickHouse builds a common header and rejects creation if any segment misses a column defined in the Hybrid schema. If the physical types differ you may need to add casts on one side or in the query, just as you would when reading from heterogeneous replicas.
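As a sketch of the explicit multi-destination writes mentioned above (hypothetical segment targets; adapt the names to your layout):

```sql
-- Hybrid forwards INSERTs to the first segment only, so address each
-- segment directly when both tiers must receive rows.
INSERT INTO FUNCTION remote('hot-cluster', default.events_local)
SELECT * FROM staging_events WHERE date >= '2025-09-01';

INSERT INTO FUNCTION s3('s3://archive-bucket/events/2025-08.parquet', 'Parquet')
SELECT * FROM staging_events WHERE date < '2025-09-01';
```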

## Example: local cluster plus S3 historical tier

The following commands illustrate a two-layer layout. Hot data stays on a local ClickHouse cluster, while historical rows come from public S3 Parquet files.
The following commands illustrate a two-segment layout. Hot data stays on a local ClickHouse cluster, while historical rows come from public S3 Parquet files.

```sql
-- Local MergeTree table that keeps current data
@@ -79,7 +93,7 @@ CREATE OR REPLACE TABLE btc_blocks ENGINE = Hybrid(
s3('s3://aws-public-blockchain/v1.0/btc/blocks/**.parquet', NOSIGN), date < '2025-09-01'
) AS btc_blocks_local;

-- Writes target the first (remote) layer
-- Writes target the first (remote) segment
INSERT INTO btc_blocks
SELECT *
FROM s3('s3://aws-public-blockchain/v1.0/btc/blocks/**.parquet', NOSIGN)
@@ -103,4 +117,4 @@ GROUP BY date
ORDER BY date;
```

Because the predicates are applied inside every layer, queries such as `ORDER BY`, `GROUP BY`, `LIMIT`, `JOIN`, and `EXPLAIN` behave as if you were reading from a single `Distributed` table. When sources expose different physical types (for example `FixedString(64)` versus `String` in Parquet), add explicit casts during ingestion or in the query, as shown above.
Because the predicates are applied inside every segment, queries such as `ORDER BY`, `GROUP BY`, `LIMIT`, `JOIN`, and `EXPLAIN` behave as if you were reading from a single `Distributed` table. When sources expose different physical types (for example `FixedString(64)` versus `String` in Parquet), add explicit casts during ingestion or in the query, as shown above.
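If you prefer explicit casts over the automatic alignment, a query-side sketch (assuming a hypothetical `hash` column stored as `FixedString(64)` locally and `String` in Parquet):

```sql
SELECT CAST(hash AS String) AS hash_str
FROM btc_blocks
WHERE date = '2025-08-31'
LIMIT 10;
```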
150 changes: 150 additions & 0 deletions src/Analyzer/Passes/HybridCastsPass.cpp
@@ -0,0 +1,150 @@
#include <Analyzer/Passes/HybridCastsPass.h>

#include <Analyzer/QueryTreeBuilder.h>
#include <Analyzer/QueryTreePassManager.h>
#include <Analyzer/Passes/QueryAnalysisPass.h>
#include <Analyzer/Utils.h>
#include <Analyzer/Resolve/IdentifierResolver.h>
#include <Analyzer/QueryNode.h>
#include <Analyzer/TableNode.h>
#include <Analyzer/UnionNode.h>
#include <Analyzer/FunctionNode.h>
#include <Analyzer/ColumnNode.h>
#include <Analyzer/InDepthQueryTreeVisitor.h>

#include <Storages/IStorage.h>
#include <Storages/StorageDistributed.h>

#include <Core/Settings.h>
#include <Core/SettingsEnums.h>
#include <Common/Exception.h>

namespace DB
{

namespace Setting
{
extern const SettingsBool hybrid_table_auto_cast_columns;
}

namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}

namespace
{

/// Collect Hybrid table expressions that require casts to normalize headers across segments.
///
/// Hybrid is currently exposed only as an engine (TableNode). If it ever gets a table function
/// wrapper, this visitor must also look at TableFunctionNode and unwrap to the underlying
/// StorageDistributed so cached casts can be picked up there as well.
class HybridCastTablesCollector : public InDepthQueryTreeVisitor<HybridCastTablesCollector>
{
public:
explicit HybridCastTablesCollector(std::unordered_map<const IQueryTreeNode *, ColumnsDescription> & cast_map_)
: cast_map(cast_map_)
{}

static bool needChildVisit(QueryTreeNodePtr &, QueryTreeNodePtr &) { return true; }

void visitImpl(QueryTreeNodePtr & node)
{
const auto * table = node->as<TableNode>();
if (!table)
return;

const auto * storage = table->getStorage().get();
if (const auto * distributed = typeid_cast<const StorageDistributed *>(storage))
{
ColumnsDescription to_cast = distributed->getColumnsToCast();
if (!to_cast.empty())
cast_map.emplace(node.get(), std::move(to_cast)); // if the same table_expression repeats, emplace keeps the first entry
}
}

private:
std::unordered_map<const IQueryTreeNode *, ColumnsDescription> & cast_map;
};

// Visitor replaces all usages of the column with CAST(column, type) in the query tree.
class HybridCastVisitor : public InDepthQueryTreeVisitor<HybridCastVisitor>
{
public:
HybridCastVisitor(
const std::unordered_map<const IQueryTreeNode *, ColumnsDescription> & cast_map_,
ContextPtr context_)
: cast_map(cast_map_)
, context(std::move(context_))
{}

bool shouldTraverseTopToBottom() const { return false; }

static bool needChildVisit(QueryTreeNodePtr &, QueryTreeNodePtr &)
{
/// Traverse all child nodes so casts also apply inside subqueries and UNION branches.
return true;
}

void visitImpl(QueryTreeNodePtr & node)
{
auto * column_node = node->as<ColumnNode>();
if (!column_node)
return;

auto column_source = column_node->getColumnSourceOrNull();
if (!column_source)
return;

auto it = cast_map.find(column_source.get());
if (it == cast_map.end())
return;

const auto & column_name = column_node->getColumnName();
auto expected_column_opt = it->second.tryGetPhysical(column_name);
if (!expected_column_opt)
return;

auto column_clone = std::static_pointer_cast<ColumnNode>(column_node->clone());

auto cast_node = buildCastFunction(column_clone, expected_column_opt->type, context);
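/// Preserve the user-visible column name: without an alias the cast would surface in the header as _CAST(column, 'Type').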
const auto & alias = node->getAlias();
if (!alias.empty())
cast_node->setAlias(alias);
else
cast_node->setAlias(expected_column_opt->name);

node = cast_node;
}

private:
const std::unordered_map<const IQueryTreeNode *, ColumnsDescription> & cast_map;
ContextPtr context;
};


} // namespace

void HybridCastsPass::run(QueryTreeNodePtr & query_tree_node, ContextPtr context)
{
const auto & settings = context->getSettingsRef();
if (!settings[Setting::hybrid_table_auto_cast_columns])
return;

auto * query = query_tree_node->as<QueryNode>();
if (!query)
return;

std::unordered_map<const IQueryTreeNode *, ColumnsDescription> cast_map;
HybridCastTablesCollector collector(cast_map);
collector.visit(query_tree_node);
if (cast_map.empty())
return;

HybridCastVisitor visitor(cast_map, context);
visitor.visit(query_tree_node);
}

}
32 changes: 32 additions & 0 deletions src/Analyzer/Passes/HybridCastsPass.h
@@ -0,0 +1,32 @@
#pragma once

#include <Analyzer/IQueryTreePass.h>
#include <Interpreters/Context_fwd.h>

namespace DB
{

/// Adds CASTs for Hybrid segments when physical types differ from the Hybrid schema
///
/// It normalizes headers coming from different segments when the table structure in some segments
/// differs from the Hybrid table definition. For example column X is UInt32 in the Hybrid table,
/// but Int64 in an additional segment.
///
/// Without these casts ConvertingActions may fail to reconcile mismatched headers when casts are impossible
/// (e.g. AggregateFunction states carry hashed data tied to their argument type and cannot be recast), for example:
/// "Conversion from AggregateFunction(uniq, Decimal(38, 0)) to AggregateFunction(uniq, UInt64) is not supported"
/// (CANNOT_CONVERT_TYPE).
///
/// Per-segment casts are not reliable because WithMergeState strips aliases, so merged pipelines
/// from different segments would return different headers (with or without CAST), leading to errors
/// like "Cannot find column `max(value)` in source stream, there are only columns: [max(_CAST(value, 'UInt64'))]"
/// (THERE_IS_NO_COLUMN).
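///
/// Illustration (hypothetical query): `SELECT max(value) FROM hybrid_table` is rewritten so every
/// segment computes max(_CAST(value, 'UInt64')) under the original alias, keeping merged headers identical.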
class HybridCastsPass : public IQueryTreePass
{
public:
String getName() override { return "HybridCastsPass"; }
String getDescription() override { return "Inject casts for Hybrid columns to match schema types"; }
void run(QueryTreeNodePtr & query_tree_node, ContextPtr context) override;
};

}
3 changes: 3 additions & 0 deletions src/Analyzer/QueryTreePassManager.cpp
@@ -48,6 +48,7 @@
#include <Analyzer/Passes/SumIfToCountIfPass.h>
#include <Analyzer/Passes/UniqInjectiveFunctionsEliminationPass.h>
#include <Analyzer/Passes/UniqToCountPass.h>
#include <Analyzer/Passes/HybridCastsPass.h>
#include <Analyzer/Utils.h>

namespace DB
@@ -309,6 +310,8 @@ void addQueryTreePasses(QueryTreePassManager & manager, bool only_analyze)
manager.addPass(std::make_unique<ShardNumColumnToFunctionPass>());

manager.addPass(std::make_unique<OptimizeDateOrDateTimeConverterWithPreimagePass>());

manager.addPass(std::make_unique<HybridCastsPass>());
}

}
6 changes: 6 additions & 0 deletions src/Core/Settings.cpp
@@ -6926,6 +6926,12 @@ Allows creation of tables with the [TimeSeries](../../engines/table-engines/inte
- 0 — the [TimeSeries](../../engines/table-engines/integrations/time-series.md) table engine is disabled.
- 1 — the [TimeSeries](../../engines/table-engines/integrations/time-series.md) table engine is enabled.
)", EXPERIMENTAL) \
DECLARE(Bool, allow_experimental_hybrid_table, false, R"(
Allows creation of tables with the [Hybrid](../../engines/table-engines/special/hybrid.md) table engine.
)", EXPERIMENTAL) \
DECLARE(Bool, hybrid_table_auto_cast_columns, true, R"(
Automatically cast columns to the schema defined in Hybrid tables when remote segments expose different physical types. Works only with the analyzer. Enabled by default; it does nothing if (experimental) Hybrid tables are disabled, and can be turned off if it causes issues. Segment schemas are cached when the Hybrid table is created or attached; if a segment schema changes later, detach/attach or recreate the Hybrid table so the cached headers stay in sync.
)", 0) \
DECLARE(Bool, allow_experimental_codecs, false, R"(
If it is set to true, allow to specify experimental compression codecs (but we don't have those yet and this option does nothing).
)", EXPERIMENTAL) \
4 changes: 3 additions & 1 deletion src/Core/SettingsChangesHistory.cpp
@@ -53,7 +53,9 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
{"export_merge_tree_partition_max_retries", 3, 3, "New setting."},
{"export_merge_tree_partition_manifest_ttl", 180, 180, "New setting."},
{"export_merge_tree_part_file_already_exists_policy", "skip", "skip", "New setting."},
{"iceberg_timezone_for_timestamptz", "UTC", "UTC", "New setting."}
{"iceberg_timezone_for_timestamptz", "UTC", "UTC", "New setting."},
{"hybrid_table_auto_cast_columns", true, true, "New setting to automatically cast Hybrid table columns when segments disagree on types. Default enabled."},
{"allow_experimental_hybrid_table", false, false, "Added new setting to allow the Hybrid table engine."}
});
addSettingsChanges(settings_changes_history, "25.8",
{
1 change: 1 addition & 0 deletions src/Databases/enableAllExperimentalSettings.cpp
@@ -61,6 +61,7 @@ void enableAllExperimentalSettings(ContextMutablePtr context)
context->setSetting("allow_experimental_ytsaurus_table_engine", 1);
context->setSetting("allow_experimental_ytsaurus_dictionary_source", 1);
context->setSetting("allow_experimental_time_series_aggregate_functions", 1);
context->setSetting("allow_experimental_hybrid_table", 1);
context->setSetting("allow_experimental_lightweight_update", 1);
context->setSetting("allow_experimental_insert_into_iceberg", 1);
context->setSetting("allow_experimental_iceberg_compaction", 1);