Merged
Changes from 4 commits
13 changes: 2 additions & 11 deletions src/Core/SettingsChangesHistory.cpp
@@ -48,6 +48,8 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
{"object_storage_max_nodes", 0, 0, "New setting"},
{"allow_retries_in_cluster_requests", false, false, "New setting"},
{"object_storage_remote_initiator", false, false, "New setting."},
{"allow_experimental_export_merge_tree_part", false, false, "New setting."},
{"export_merge_tree_part_overwrite_file_if_exists", false, false, "New setting."},
});
addSettingsChanges(settings_changes_history, "25.8",
{
@@ -143,24 +145,13 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
{"distributed_plan_force_shuffle_aggregation", 0, 0, "New experimental setting"},
{"allow_experimental_insert_into_iceberg", false, false, "New setting."},
/// RELEASE CLOSED
{"allow_experimental_database_iceberg", false, true, "Turned ON by default for Antalya"},
{"allow_experimental_database_unity_catalog", false, true, "Turned ON by default for Antalya"},
{"allow_experimental_database_glue_catalog", false, true, "Turned ON by default for Antalya"},
{"output_format_parquet_enum_as_byte_array", true, true, "Enable writing Enum as byte array in Parquet by default"},
{"lock_object_storage_task_distribution_ms", 0, 0, "New setting."},
{"object_storage_cluster", "", "", "New setting"},
{"object_storage_max_nodes", 0, 0, "New setting"},
});
addSettingsChanges(settings_changes_history, "25.6.5.2000",
{
{"allow_experimental_database_iceberg", false, true, "Turned ON by default for Antalya"},
{"allow_experimental_database_unity_catalog", false, true, "Turned ON by default for Antalya"},
{"allow_experimental_database_glue_catalog", false, true, "Turned ON by default for Antalya"},
{"output_format_parquet_enum_as_byte_array", true, true, "Enable writing Enum as byte array in Parquet by default"},
{"object_storage_cluster", "", "", "New setting"},
{"object_storage_max_nodes", 0, 0, "New setting"},
{"allow_experimental_export_merge_tree_part", false, false, "New setting."},
{"export_merge_tree_part_overwrite_file_if_exists", false, false, "New setting."},
});
addSettingsChanges(settings_changes_history, "25.6",
{
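The moved entries keep each change recorded under a single release. A minimal sketch of the map's shape, in Python for illustration (simplified: the real map in src/Core/SettingsChangesHistory.cpp stores ClickHouse Field values, not Python tuples):

settings_changes_history: dict[str, list[tuple]] = {}

def add_settings_changes(history: dict, version: str, changes: list[tuple]) -> None:
    """Append change records under the release that introduced them."""
    history.setdefault(version, []).extend(changes)

add_settings_changes(settings_changes_history, "25.6.5.2000", [
    # (name, previous_default, new_default, comment)
    ("allow_experimental_export_merge_tree_part", False, False, "New setting."),
    ("export_merge_tree_part_overwrite_file_if_exists", False, False, "New setting."),
])
assert len(settings_changes_history["25.6.5.2000"]) == 2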
3 changes: 3 additions & 0 deletions src/Interpreters/InterpreterInsertQuery.cpp
@@ -772,6 +772,9 @@ InterpreterInsertQuery::distributedWriteIntoReplicatedMergeTreeFromClusterStorag
if (!src_storage_cluster)
return {};

if (src_storage_cluster->getOriginalClusterName().empty())
return {};

if (!isInsertSelectTrivialEnoughForDistributedExecution(query))
return {};

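The added guard makes the fallback explicit: a cluster-backed source without a named cluster cannot drive a distributed INSERT ... SELECT, so the interpreter returns early and uses the ordinary pipeline. A hedged Python paraphrase of the precondition chain (illustrative names, not the real API):

def try_distributed_write(src_storage_cluster, trivial_query: bool):
    # Mirrors the early-return chain above: any failed precondition falls
    # back to the ordinary, non-distributed insert pipeline.
    if src_storage_cluster is None:
        return None
    if not src_storage_cluster.get("original_cluster_name"):
        return None  # the check added in this diff
    if not trivial_query:
        return None
    return "distributed insert-select plan"

assert try_distributed_write({"original_cluster_name": ""}, True) is None
assert try_distributed_write({"original_cluster_name": "cluster_simple"}, True) is not None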
2 changes: 1 addition & 1 deletion src/Planner/PlannerJoinTree.cpp
@@ -1370,7 +1370,7 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expres
/// Overall, IStorage::read -> FetchColumns returns normal column names (except Distributed, which is inconsistent)
/// Interpreter::getQueryPlan -> FetchColumns returns identifiers (why?) and this is the reason for the bug ^ in Distributed
/// Hopefully there is no other case when we read from Distributed up to FetchColumns.
if (table_node && table_node->getStorage()->isRemote() && select_query_options.to_stage == QueryProcessingStage::FetchColumns)
if (table_node && table_node->getStorage()->isRemote())
updated_actions_dag_outputs.push_back(output_node);
else if (table_function_node && table_function_node->getStorage()->isRemote())
updated_actions_dag_outputs.push_back(output_node);
109 changes: 71 additions & 38 deletions src/Storages/IStorageCluster.cpp
@@ -31,7 +31,9 @@
#include <Analyzer/QueryTreeBuilder.h>
#include <Analyzer/QueryNode.h>
#include <Analyzer/ColumnNode.h>
#include <Analyzer/JoinNode.h>
#include <Analyzer/InDepthQueryTreeVisitor.h>
#include <Analyzer/Utils.h>
#include <Storages/StorageDistributed.h>
#include <TableFunctions/TableFunctionFactory.h>

@@ -112,7 +114,7 @@ class SearcherVisitor : public InDepthQueryTreeVisitorWithContext<SearcherVisito
using Base = InDepthQueryTreeVisitorWithContext<SearcherVisitor>;
using Base::Base;

explicit SearcherVisitor(QueryTreeNodeType type_, ContextPtr context) : Base(context), type(type_) {}
explicit SearcherVisitor(std::unordered_set<QueryTreeNodeType> types_, ContextPtr context) : Base(context), types(types_) {}

bool needChildVisit(QueryTreeNodePtr &, QueryTreeNodePtr & /*child*/)
{
@@ -126,15 +128,20 @@ class SearcherVisitor : public InDepthQueryTreeVisitorWithContext<SearcherVisito

auto node_type = node->getNodeType();

if (node_type == type)
if (types.contains(node_type))
{
passed_node = node;
passed_type = node_type;
}
}

QueryTreeNodePtr getNode() const { return passed_node; }
std::optional<QueryTreeNodeType> getType() const { return passed_type; }

private:
QueryTreeNodeType type;
std::unordered_set<QueryTreeNodeType> types;
QueryTreeNodePtr passed_node;
std::optional<QueryTreeNodeType> passed_type;
};

/*
@@ -216,49 +223,69 @@ void IStorageCluster::updateQueryWithJoinToSendIfNeeded(
{
case ObjectStorageClusterJoinMode::LOCAL:
{
auto modified_query_tree = query_tree->clone();
bool need_modify = false;
if (has_join || has_local_columns_in_where)
{
auto modified_query_tree = query_tree->clone();

SearcherVisitor table_function_searcher(QueryTreeNodeType::TABLE_FUNCTION, context);
table_function_searcher.visit(query_tree);
auto table_function_node = table_function_searcher.getNode();
if (!table_function_node)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find table function node");
SearcherVisitor table_function_searcher({QueryTreeNodeType::TABLE, QueryTreeNodeType::TABLE_FUNCTION}, context);
table_function_searcher.visit(modified_query_tree);
auto table_function_node = table_function_searcher.getNode();
if (!table_function_node)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find table function node");

if (has_join)
{
auto table_function = extractTableFunctionASTPtrFromSelectQuery(query_to_send);
auto query_tree_distributed = buildTableFunctionQueryTree(table_function, context);
auto & table_function_ast = table_function->as<ASTFunction &>();
query_tree_distributed->setAlias(table_function_ast.alias);
QueryTreeNodePtr query_tree_distributed;

auto & query_node = modified_query_tree->as<QueryNode &>();

if (has_join)
{
if (table_function_searcher.getType().value() == QueryTreeNodeType::TABLE_FUNCTION)
Collaborator: Do I get it right that you are extracting ONLY the table function from the original AST? Why doesn't it have to be done for tables as well?

Collaborator: I mean, I understand it is being done for tables when grabbing the left side. But why is it different? Is the query tree different for table functions vs. regular tables?

Author: Good point. Looks like the last two blocks work for table functions too! But now I need to retest everything.

{
auto table_function = extractTableFunctionASTPtrFromSelectQuery(query_to_send);
query_tree_distributed = buildTableFunctionQueryTree(table_function, context);
auto & table_function_ast = table_function->as<ASTFunction &>();
query_tree_distributed->setAlias(table_function_ast.alias);
}
else
{
auto join_node = query_node.getJoinTree();
query_tree_distributed = join_node->as<JoinNode>()->getLeftTableExpression()->clone();
}
}

// Find and add used columns from the table function to build a proper projection list
// This must be done before changing the WHERE condition
CollectUsedColumnsForSourceVisitor collector(table_function_node, context);
collector.visit(query_tree);
collector.visit(modified_query_tree);
const auto & columns = collector.getColumns();

auto & query_node = modified_query_tree->as<QueryNode &>();
query_node.resolveProjectionColumns(columns);
auto column_nodes_to_select = std::make_shared<ListNode>();
column_nodes_to_select->getNodes().reserve(columns.size());
for (auto & column : columns)
column_nodes_to_select->getNodes().emplace_back(std::make_shared<ColumnNode>(column, table_function_node));
query_node.getProjectionNode() = column_nodes_to_select;

// Leave only the table function to send to the cluster nodes
modified_query_tree = modified_query_tree->cloneAndReplace(query_node.getJoinTree(), query_tree_distributed);
if (has_local_columns_in_where)
{
if (query_node.getPrewhere())
removeExpressionsThatDoNotDependOnTableIdentifiers(query_node.getPrewhere(), table_function_node, context);
if (query_node.getWhere())
removeExpressionsThatDoNotDependOnTableIdentifiers(query_node.getWhere(), table_function_node, context);
}

need_modify = true;
}
query_node.getOrderByNode() = std::make_shared<ListNode>();
query_node.getGroupByNode() = std::make_shared<ListNode>();

if (has_local_columns_in_where)
{
auto & query_node = modified_query_tree->as<QueryNode &>();
query_node.getWhere() = {};
}
if (query_tree_distributed)
{
// Leave only the table function to send to the cluster nodes
modified_query_tree = modified_query_tree->cloneAndReplace(query_node.getJoinTree(), query_tree_distributed);
}

if (need_modify)
query_to_send = queryNodeToDistributedSelectQuery(modified_query_tree);
}

return;
}
case ObjectStorageClusterJoinMode::GLOBAL:
@@ -501,25 +528,31 @@ QueryProcessingStage::Enum IStorageCluster::getQueryProcessingStage(
throw Exception(ErrorCodes::NOT_IMPLEMENTED,
"object_storage_cluster_join_mode!='allow' is not supported without allow_experimental_analyzer=true");

SearcherVisitor join_searcher(QueryTreeNodeType::JOIN, context);
SearcherVisitor join_searcher({QueryTreeNodeType::JOIN}, context);
join_searcher.visit(query_info.query_tree);
if (join_searcher.getNode())
has_join = true;

SearcherVisitor table_function_searcher(QueryTreeNodeType::TABLE_FUNCTION, context);
SearcherVisitor table_function_searcher({QueryTreeNodeType::TABLE, QueryTreeNodeType::TABLE_FUNCTION}, context);
table_function_searcher.visit(query_info.query_tree);
auto table_function_node = table_function_searcher.getNode();
if (!table_function_node)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find table function node");
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find table or table function node");

CollectUsedColumnsForSourceVisitor collector_where(table_function_node, context, true);
auto & query_node = query_info.query_tree->as<QueryNode &>();
if (query_node.hasWhere())
collector_where.visit(query_node.getWhere());

// Can't use 'WHERE' on remote node if it contains columns from other sources
if (!collector_where.getColumns().empty())
has_local_columns_in_where = true;
if (query_node.hasWhere() || query_node.hasPrewhere())
{
CollectUsedColumnsForSourceVisitor collector_where(table_function_node, context, true);
if (query_node.hasPrewhere())
Collaborator: It's quite odd that the original code didn't take PREWHERE into account... If those are actually needed, we need to place a mental pin here that after backporting prewhere+row_policy we'll need to add hasRowLevelFilter here as well.

collector_where.visit(query_node.getPrewhere());
if (query_node.hasWhere())
collector_where.visit(query_node.getWhere());

// SELECT x FROM datalake.table WHERE x IN local.table
// Need to modify 'WHERE' on remote node if it contains columns from other sources
if (!collector_where.getColumns().empty())
has_local_columns_in_where = true;
}

if (has_join || has_local_columns_in_where)
return QueryProcessingStage::Enum::FetchColumns;
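A standalone sketch of the generalized searcher, in Python with a toy node model (the real visitor walks ClickHouse query-tree nodes): matching a set of types and remembering which one fired is what lets the LOCAL branch treat plain tables and table functions uniformly.

from dataclasses import dataclass, field

@dataclass
class Node:
    type: str                         # e.g. "TABLE", "TABLE_FUNCTION", "JOIN"
    children: list = field(default_factory=list)

class Searcher:
    def __init__(self, types):
        self.types = set(types)
        self.node = None
        self.node_type = None

    def visit(self, node):
        # As in the C++ visitor, each match overwrites the previous one,
        # so the last matching node in visit order wins.
        if node.type in self.types:
            self.node, self.node_type = node, node.type
        for child in node.children:
            self.visit(child)

tree = Node("QUERY", [Node("JOIN", [Node("TABLE_FUNCTION"), Node("TABLE")])])
searcher = Searcher({"TABLE", "TABLE_FUNCTION"})
searcher.visit(tree)
assert searcher.node_type == "TABLE"

The net effect of the LOCAL branch, illustratively: for SELECT name FROM s3Cluster(...) WHERE value IN (SELECT id FROM local_table), the initiator answers at FetchColumns stage, each node receives roughly SELECT name, value FROM s3(...), and the IN filter plus the final projection run on the initiator (query texts are illustrative, not captured from a trace).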
6 changes: 6 additions & 0 deletions src/Storages/extractTableFunctionFromSelectQuery.cpp
@@ -26,6 +26,12 @@ ASTPtr extractTableFunctionASTPtrFromSelectQuery(ASTPtr & query)
return table_expression ? table_expression->table_function : nullptr;
}

ASTPtr extractTableASTPtrFromSelectQuery(ASTPtr & query)
{
auto table_expression = extractTableExpressionASTPtrFromSelectQuery(query);
return table_expression ? table_expression->database_and_table_name : nullptr;
}

ASTFunction * extractTableFunctionFromSelectQuery(ASTPtr & query)
{
auto table_function_ast = extractTableFunctionASTPtrFromSelectQuery(query);
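The new helper mirrors its table-function sibling: an ASTTableExpression carries either a database-qualified table name or a table function, and each helper returns its side or null. A tiny Python analogue with hypothetical structures:

from dataclasses import dataclass
from typing import Optional

@dataclass
class TableExpression:
    database_and_table_name: Optional[str] = None  # set for FROM db.table
    table_function: Optional[str] = None           # set for FROM s3(...)

def extract_table(expr: Optional[TableExpression]) -> Optional[str]:
    return expr.database_and_table_name if expr else None

def extract_table_function(expr: Optional[TableExpression]) -> Optional[str]:
    return expr.table_function if expr else None

assert extract_table(TableExpression(database_and_table_name="db.tbl")) == "db.tbl"
assert extract_table_function(TableExpression(table_function="s3(...)")) == "s3(...)"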
1 change: 1 addition & 0 deletions src/Storages/extractTableFunctionFromSelectQuery.h
@@ -10,6 +10,7 @@ struct ASTTableExpression;

ASTTableExpression * extractTableExpressionASTPtrFromSelectQuery(ASTPtr & query);
ASTPtr extractTableFunctionASTPtrFromSelectQuery(ASTPtr & query);
ASTPtr extractTableASTPtrFromSelectQuery(ASTPtr & query);
ASTFunction * extractTableFunctionFromSelectQuery(ASTPtr & query);
ASTExpressionList * extractTableFunctionArgumentsFromSelectQuery(ASTPtr & query);

87 changes: 86 additions & 1 deletion tests/integration/test_database_iceberg/test.py
@@ -14,19 +14,21 @@
import pytz
from minio import Minio
from pyiceberg.catalog import load_catalog
from pyiceberg.partitioning import PartitionField, PartitionSpec
from pyiceberg.partitioning import PartitionField, PartitionSpec, UNPARTITIONED_PARTITION_SPEC
from pyiceberg.schema import Schema
from pyiceberg.table.sorting import SortField, SortOrder
from pyiceberg.transforms import DayTransform, IdentityTransform
from pyiceberg.types import (
DoubleType,
LongType,
FloatType,
NestedField,
StringType,
StructType,
TimestampType,
TimestamptzType
)
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER

from helpers.cluster import ClickHouseCluster, ClickHouseInstance, is_arm
from helpers.config_cluster import minio_secret_key, minio_access_key
@@ -609,3 +611,86 @@ def test_table_with_slash(started_cluster):
create_clickhouse_iceberg_database(started_cluster, node, CATALOG_NAME)
node.query(f"INSERT INTO {CATALOG_NAME}.`{root_namespace}.{table_encoded_name}` VALUES (NULL, 'AAPL', 193.24, 193.31, tuple('bot'));", settings={"allow_experimental_insert_into_iceberg": 1, 'write_full_path_in_iceberg_metadata': 1})
assert node.query(f"SELECT * FROM {CATALOG_NAME}.`{root_namespace}.{table_encoded_name}`") == "\\N\tAAPL\t193.24\t193.31\t('bot')\n"


def test_cluster_joins(started_cluster):
Collaborator: This test seems to be passing on the current Antalya branch:

test_database_iceberg/test.py::test_cluster_joins PASSED

Is it expected?

Author: Nice catch, the test doesn't cover the case with a local right table, thanks!

Author: And not on a cluster...

Collaborator: Can you add a test for the CROSS JOIN case as well?

Author: Done.

node = started_cluster.instances["node1"]

test_ref = f"test_join_tables_{uuid.uuid4()}"
table_name = f"{test_ref}_table"
table_name_2 = f"{test_ref}_table_2"

root_namespace = f"{test_ref}_namespace"

catalog = load_catalog_impl(started_cluster)
catalog.create_namespace(root_namespace)

schema = Schema(
NestedField(
field_id=1,
name="tag",
field_type=LongType(),
required=False
),
NestedField(
field_id=2,
name="name",
field_type=StringType(),
required=False,
),
)
table = create_table(catalog, root_namespace, table_name, schema,
partition_spec=UNPARTITIONED_PARTITION_SPEC, sort_order=UNSORTED_SORT_ORDER)
data = [{"tag": 1, "name": "John"}, {"tag": 2, "name": "Jack"}]
df = pa.Table.from_pylist(data)
table.append(df)

schema2 = Schema(
NestedField(
field_id=1,
name="id",
field_type=LongType(),
required=False
),
NestedField(
field_id=2,
name="second_name",
field_type=StringType(),
required=False,
),
)
table2 = create_table(catalog, root_namespace, table_name_2, schema2,
partition_spec=UNPARTITIONED_PARTITION_SPEC, sort_order=UNSORTED_SORT_ORDER)
data = [{"id": 1, "second_name": "Dow"}, {"id": 2, "second_name": "Sparrow"}]
df = pa.Table.from_pylist(data)
table2.append(df)

create_clickhouse_iceberg_database(started_cluster, node, CATALOG_NAME)

res = node.query(
f"""
SELECT t1.name,t2.second_name
FROM {CATALOG_NAME}.`{root_namespace}.{table_name}` AS t1
JOIN {CATALOG_NAME}.`{root_namespace}.{table_name_2}` AS t2
ON t1.tag=t2.id
ORDER BY ALL
SETTINGS object_storage_cluster_join_mode='local'
"""
)

assert res == "Jack\tSparrow\nJohn\tDow\n"

res = node.query(
f"""
SELECT name
FROM {CATALOG_NAME}.`{root_namespace}.{table_name}`
WHERE tag in (
SELECT id
FROM {CATALOG_NAME}.`{root_namespace}.{table_name_2}`
)
ORDER BY ALL
SETTINGS object_storage_cluster_join_mode='local'
"""
)

assert res == "Jack\nJohn\n"
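
The CROSS JOIN coverage the reviewer asked for was reportedly added, but that commit is not visible in this hunk; a hypothetical sketch of such a check, reusing the two tables created above (not the committed test):

res = node.query(
    f"""
    SELECT t1.name, t2.second_name
    FROM {CATALOG_NAME}.`{root_namespace}.{table_name}` AS t1
    CROSS JOIN {CATALOG_NAME}.`{root_namespace}.{table_name_2}` AS t2
    ORDER BY ALL
    SETTINGS object_storage_cluster_join_mode='local'
    """
)
assert len(res.splitlines()) == 4  # 2 x 2 rows from the Cartesian product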
14 changes: 14 additions & 0 deletions tests/integration/test_s3_cluster/test.py
@@ -1163,6 +1163,20 @@ def test_joins(started_cluster):
res = list(map(str.split, result5.splitlines()))
assert len(res) == 6

result6 = node.query(
f"""
SELECT name FROM
s3Cluster('cluster_simple',
'http://minio1:9001/root/data/{{clickhouse,database}}/*', 'minio', '{minio_secret_key}', 'CSV',
'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))')
WHERE value IN (SELECT id FROM join_table)
ORDER BY name
SETTINGS object_storage_cluster_join_mode='local';
"""
)
res = list(map(str.split, result6.splitlines()))
assert len(res) == 25


def test_graceful_shutdown(started_cluster):
node = started_cluster.instances["s0_0_0"]