Skip to content
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/Core/Settings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1763,12 +1763,12 @@ Possible values:
DECLARE(ObjectStorageClusterJoinMode, object_storage_cluster_join_mode, ObjectStorageClusterJoinMode::ALLOW, R"(
Changes the behaviour of object storage cluster function or table.

ClickHouse applies this setting when the query contains the product of object storage cluster function ot table, i.e. when the query for a object storage cluster function ot table contains a non-GLOBAL subquery for the object storage cluster function ot table.
ClickHouse applies this setting when the query contains the product of object storage cluster function or table, i.e. when the query for a object storage cluster function or table contains a non-GLOBAL subquery for the object storage cluster function or table.

Restrictions:

- Only applied for JOIN subqueries.
- Only if the FROM section uses a object storage cluster function ot table.
- Only if the FROM section uses a object storage cluster function or table.

Possible values:

Expand Down
2 changes: 2 additions & 0 deletions src/Core/SettingsChangesHistory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
{"lock_object_storage_task_distribution_ms", 500, 500, "Raised the value to 500 to avoid hoping tasks between executors."},
{"allow_retries_in_cluster_requests", false, false, "New setting"},
{"object_storage_remote_initiator", false, false, "New setting."},
{"allow_experimental_export_merge_tree_part", false, false, "New setting."},
{"export_merge_tree_part_overwrite_file_if_exists", false, false, "New setting."},
{"allow_experimental_export_merge_tree_part", false, true, "Turned ON by default for Antalya."},
{"iceberg_timezone_for_timestamptz", "UTC", "UTC", "New setting."}
});
Expand Down
2 changes: 1 addition & 1 deletion src/Core/SettingsEnums.h
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ enum class DistributedProductMode : uint8_t

DECLARE_SETTING_ENUM(DistributedProductMode)

/// The setting for executing object storage cluster function ot table JOIN sections.
/// The setting for executing object storage cluster function or table JOIN sections.
enum class ObjectStorageClusterJoinMode : uint8_t
{
LOCAL, /// Convert to local query
Expand Down
2 changes: 1 addition & 1 deletion src/Planner/PlannerJoinTree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1370,7 +1370,7 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expres
/// Overall, IStorage::read -> FetchColumns returns normal column names (except Distributed, which is inconsistent)
/// Interpreter::getQueryPlan -> FetchColumns returns identifiers (why?) and this the reason for the bug ^ in Distributed
/// Hopefully there is no other case when we read from Distributed up to FetchColumns.
if (table_node && table_node->getStorage()->isRemote() && select_query_options.to_stage == QueryProcessingStage::FetchColumns)
if (table_node && table_node->getStorage()->isRemote())
updated_actions_dag_outputs.push_back(output_node);
else if (table_function_node && table_function_node->getStorage()->isRemote())
updated_actions_dag_outputs.push_back(output_node);
Expand Down
156 changes: 107 additions & 49 deletions src/Storages/IStorageCluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@
#include <Analyzer/QueryTreeBuilder.h>
#include <Analyzer/QueryNode.h>
#include <Analyzer/ColumnNode.h>
#include <Analyzer/JoinNode.h>
#include <Analyzer/InDepthQueryTreeVisitor.h>
#include <Analyzer/Utils.h>
#include <Storages/StorageDistributed.h>
#include <TableFunctions/TableFunctionFactory.h>

Expand Down Expand Up @@ -112,7 +114,7 @@ class SearcherVisitor : public InDepthQueryTreeVisitorWithContext<SearcherVisito
using Base = InDepthQueryTreeVisitorWithContext<SearcherVisitor>;
using Base::Base;

explicit SearcherVisitor(QueryTreeNodeType type_, ContextPtr context) : Base(context), type(type_) {}
explicit SearcherVisitor(std::unordered_set<QueryTreeNodeType> types_, ContextPtr context) : Base(context), types(types_) {}

bool needChildVisit(QueryTreeNodePtr &, QueryTreeNodePtr & /*child*/)
{
Expand All @@ -126,15 +128,20 @@ class SearcherVisitor : public InDepthQueryTreeVisitorWithContext<SearcherVisito

auto node_type = node->getNodeType();

if (node_type == type)
if (types.contains(node_type))
{
passed_node = node;
passed_type = node_type;
}
}

QueryTreeNodePtr getNode() const { return passed_node; }
std::optional<QueryTreeNodeType> getType() const { return passed_type; }

private:
QueryTreeNodeType type;
std::unordered_set<QueryTreeNodeType> types;
QueryTreeNodePtr passed_node;
std::optional<QueryTreeNodeType> passed_type;
};

/*
Expand Down Expand Up @@ -216,49 +223,80 @@ void IStorageCluster::updateQueryWithJoinToSendIfNeeded(
{
case ObjectStorageClusterJoinMode::LOCAL:
{
auto modified_query_tree = query_tree->clone();
bool need_modify = false;

SearcherVisitor table_function_searcher(QueryTreeNodeType::TABLE_FUNCTION, context);
table_function_searcher.visit(query_tree);
auto table_function_node = table_function_searcher.getNode();
if (!table_function_node)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find table function node");
auto info = getQueryTreeInfo(query_tree, context);

if (has_join)
if (info.has_join || info.has_cross_join || info.has_local_columns_in_where)
{
auto table_function = extractTableFunctionASTPtrFromSelectQuery(query_to_send);
auto query_tree_distributed = buildTableFunctionQueryTree(table_function, context);
auto & table_function_ast = table_function->as<ASTFunction &>();
query_tree_distributed->setAlias(table_function_ast.alias);
auto modified_query_tree = query_tree->clone();

SearcherVisitor table_function_searcher({QueryTreeNodeType::TABLE, QueryTreeNodeType::TABLE_FUNCTION}, context);
table_function_searcher.visit(modified_query_tree);
auto table_function_node = table_function_searcher.getNode();
if (!table_function_node)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find table function node");

QueryTreeNodePtr query_tree_distributed;

auto & query_node = modified_query_tree->as<QueryNode &>();

if (info.has_join || info.has_cross_join)
{
if (table_function_searcher.getType().value() == QueryTreeNodeType::TABLE_FUNCTION)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do I get it right that you are extracting ONLY the table function from the original AST? Why doesn't it have to be done for tables as well?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean, I understand it is being done for tables when grabbing the left side. But why is it different. Is the query tree different for table function vs regular tables?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. It looks like the last two blocks work for table functions too!
But now I need to retest everything.

{
auto table_function = extractTableFunctionASTPtrFromSelectQuery(query_to_send);
query_tree_distributed = buildTableFunctionQueryTree(table_function, context);
auto & table_function_ast = table_function->as<ASTFunction &>();
query_tree_distributed->setAlias(table_function_ast.alias);
}
else if (info.has_join)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do I get it right that the iceberg table is always the left one?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes (actually not only Iceberg, but s3Cluster too). In other cases the code does not reach IStorageCluster.
When the join has a local table on the left and Iceberg on the right, ClickHouse processes the query as a local join and reads the right table only at the final stage, without attempting to send the left table to remote nodes as part of the subquery, so I did not touch that case.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm... So in a nutshell, there are two cases: the iceberg / s3cluster table is on the left, in which case we need to prevent local tables from reaching remote nodes; and the other case, in which the local table is resolved locally before reaching this code path?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. In this code the left table is always s3Cluster/icebergCluster or an Iceberg table.

{
auto join_node = query_node.getJoinTree();
query_tree_distributed = join_node->as<JoinNode>()->getLeftTableExpression()->clone();
}
else
{
SearcherVisitor join_searcher({QueryTreeNodeType::CROSS_JOIN}, context);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this become a method in QueryNode? For example: QueryNode::getCrossJoinTree

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All other methods in QueryNode don't search on request; they return pre-filled fields from the children vector. This is a very specific case, and I'm not sure it is worth adding this position and trying to fill it in all queries just for this single case.
Maybe it makes sense, but it would require reviewing all cross-join-related code and refactoring it.

join_searcher.visit(modified_query_tree);
auto cross_join_node = join_searcher.getNode();
if (!cross_join_node)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find CROSS JOIN node");
query_tree_distributed = cross_join_node->as<CrossJoinNode>()->getTableExpressions()[0]->clone();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does [0] mean left?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. CROSS JOIN has a vector of expressions and can have more than two sources, so element 0 is the left one. It can't be empty, as far as I understand.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps just add a comment, but not required

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add comment

}
}

// Find all columns used from the table function to build a proper projection list.
// This must be done before changing the WHERE condition.
CollectUsedColumnsForSourceVisitor collector(table_function_node, context);
collector.visit(query_tree);
collector.visit(modified_query_tree);
const auto & columns = collector.getColumns();

auto & query_node = modified_query_tree->as<QueryNode &>();
query_node.resolveProjectionColumns(columns);
auto column_nodes_to_select = std::make_shared<ListNode>();
column_nodes_to_select->getNodes().reserve(columns.size());
for (auto & column : columns)
column_nodes_to_select->getNodes().emplace_back(std::make_shared<ColumnNode>(column, table_function_node));
query_node.getProjectionNode() = column_nodes_to_select;

// Left only table function to send on cluster nodes
modified_query_tree = modified_query_tree->cloneAndReplace(query_node.getJoinTree(), query_tree_distributed);
if (info.has_local_columns_in_where)
{
if (query_node.getPrewhere())
removeExpressionsThatDoNotDependOnTableIdentifiers(query_node.getPrewhere(), table_function_node, context);
if (query_node.getWhere())
removeExpressionsThatDoNotDependOnTableIdentifiers(query_node.getWhere(), table_function_node, context);
}

need_modify = true;
}
query_node.getOrderByNode() = std::make_shared<ListNode>();
query_node.getGroupByNode() = std::make_shared<ListNode>();

if (has_local_columns_in_where)
{
auto & query_node = modified_query_tree->as<QueryNode &>();
query_node.getWhere() = {};
}
if (query_tree_distributed)
{
// Left only table function to send on cluster nodes
modified_query_tree = modified_query_tree->cloneAndReplace(query_node.getJoinTree(), query_tree_distributed);
}

if (need_modify)
query_to_send = queryNodeToDistributedSelectQuery(modified_query_tree);
}

return;
}
case ObjectStorageClusterJoinMode::GLOBAL:
Expand Down Expand Up @@ -492,38 +530,58 @@ void ReadFromCluster::initializePipeline(QueryPipelineBuilder & pipeline, const
pipeline.init(std::move(pipe));
}

QueryProcessingStage::Enum IStorageCluster::getQueryProcessingStage(
ContextPtr context, QueryProcessingStage::Enum to_stage, const StorageSnapshotPtr &, SelectQueryInfo & query_info) const
IStorageCluster::QueryTreeInfo IStorageCluster::getQueryTreeInfo(QueryTreeNodePtr query_tree, ContextPtr context)
{
auto object_storage_cluster_join_mode = context->getSettingsRef()[Setting::object_storage_cluster_join_mode];
QueryTreeInfo info;

if (object_storage_cluster_join_mode != ObjectStorageClusterJoinMode::ALLOW)
{
if (!context->getSettingsRef()[Setting::allow_experimental_analyzer])
throw Exception(ErrorCodes::NOT_IMPLEMENTED,
"object_storage_cluster_join_mode!='allow' is not supported without allow_experimental_analyzer=true");
SearcherVisitor join_searcher({QueryTreeNodeType::JOIN, QueryTreeNodeType::CROSS_JOIN}, context);
join_searcher.visit(query_tree);

SearcherVisitor join_searcher(QueryTreeNodeType::JOIN, context);
join_searcher.visit(query_info.query_tree);
if (join_searcher.getNode())
has_join = true;
if (join_searcher.getNode())
{
if (join_searcher.getType() == QueryTreeNodeType::JOIN)
info.has_join = true;
else
info.has_cross_join = true;
}

SearcherVisitor table_function_searcher(QueryTreeNodeType::TABLE_FUNCTION, context);
table_function_searcher.visit(query_info.query_tree);
auto table_function_node = table_function_searcher.getNode();
if (!table_function_node)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find table function node");
SearcherVisitor table_function_searcher({QueryTreeNodeType::TABLE, QueryTreeNodeType::TABLE_FUNCTION}, context);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: is this still a table_FUNCTION_visitor?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Table and table function. I'll try to come up with a better name. Actually, I am trying to get the left expression of the join here, so something like left_table_expression_searcher.

table_function_searcher.visit(query_tree);
auto table_function_node = table_function_searcher.getNode();
if (!table_function_node)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find table or table function node");

auto & query_node = query_tree->as<QueryNode &>();
if (query_node.hasWhere() || query_node.hasPrewhere())
{
CollectUsedColumnsForSourceVisitor collector_where(table_function_node, context, true);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand CollectUsedColumnsForSourceVisitor is meant to collect all columns for a given "source". As I understand, the source is provided in the constructor. But what makes it local? Doesn't the query_tree object you are passing represent the entire query tree? I don't get it, please educate me

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, query_tree contains everything, but in different places.
Columns can be in the select list, in the WHERE condition, in the join condition, etc.

SELECT table1.column1,... FROM table1 JOIN table2 ON table1.column2=table2.column2 WHERE table1.column3=...

and the collector traverses the tree and collects all of these cases into a single list.
All of these columns need to be added to the select list of the left-table subquery.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand the columns can be in different places, but let's say there are no local columns. Wouldn't collector_where.getColumns() be non-empty regardless? Btw, what makes a table local or not local assuming you can't check for its existence on the remote node?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, CollectUsedColumnsForSourceVisitor here finds all columns from other sources (the last boolean flag, collect_columns_from_other_sources_, in the constructor).
Columns without a source are skipped in any case.
So when the WHERE is something like table1.column1 = 1 AND table2.column2 = 2 AND (table1.column3 + table2.column4) = 3 AND randUniform(1,6) = 4, the result contains only table2.column2 and table2.column4. Actually, I don't know whether table2 is local or not, but the author of the query may know. If they used object_storage_cluster_join_mode='local', it needs to be processed as if it were local.

auto & query_node = query_info.query_tree->as<QueryNode &>();
if (query_node.hasPrewhere())
collector_where.visit(query_node.getPrewhere());
if (query_node.hasWhere())
collector_where.visit(query_node.getWhere());

// Can't use 'WHERE' on remote node if it contains columns from other sources
// SELECT x FROM datalake.table WHERE x IN local.table
// Need to modify 'WHERE' on remote node if it contains columns from other sources
if (!collector_where.getColumns().empty())
has_local_columns_in_where = true;
info.has_local_columns_in_where = true;
}

return info;
}

QueryProcessingStage::Enum IStorageCluster::getQueryProcessingStage(
ContextPtr context, QueryProcessingStage::Enum to_stage, const StorageSnapshotPtr &, SelectQueryInfo & query_info) const
{
auto object_storage_cluster_join_mode = context->getSettingsRef()[Setting::object_storage_cluster_join_mode];

if (object_storage_cluster_join_mode != ObjectStorageClusterJoinMode::ALLOW)
{
if (!context->getSettingsRef()[Setting::allow_experimental_analyzer])
throw Exception(ErrorCodes::NOT_IMPLEMENTED,
"object_storage_cluster_join_mode!='allow' is not supported without allow_experimental_analyzer=true");

if (has_join || has_local_columns_in_where)
auto info = getQueryTreeInfo(query_info.query_tree, context);
if (info.has_join || info.has_cross_join || info.has_local_columns_in_where)
return QueryProcessingStage::Enum::FetchColumns;
}

Expand Down
10 changes: 8 additions & 2 deletions src/Storages/IStorageCluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,14 @@ class IStorageCluster : public IStorage
LoggerPtr log;
String cluster_name;

mutable bool has_join = false;
mutable bool has_local_columns_in_where = false;
/// Flags describing a query tree, as returned by getQueryTreeInfo().
/// Replaces per-storage mutable state: the values are computed per call from the given query tree.
struct QueryTreeInfo
{
/// The JOIN / CROSS_JOIN search over the tree ended on a JOIN node.
bool has_join = false;
/// The JOIN / CROSS_JOIN search over the tree ended on a CROSS_JOIN node.
bool has_cross_join = false;
/// WHERE or PREWHERE references columns that belong to sources other than the found table / table function.
bool has_local_columns_in_where = false;
};

static QueryTreeInfo getQueryTreeInfo(QueryTreeNodePtr query_tree, ContextPtr context);
};


Expand Down
2 changes: 1 addition & 1 deletion src/Storages/ObjectStorage/StorageObjectStorageSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ Chunk StorageObjectStorageSource::generate()
{
chunk.addColumn(constant_column.first,
constant_column.second.name_and_type.type->createColumnConst(
chunk.getNumRows(), constant_column.second.value));
chunk.getNumRows(), constant_column.second.value)->convertToFullColumnIfConst());
}

#if USE_PARQUET && USE_AWS_S3
Expand Down
6 changes: 6 additions & 0 deletions src/Storages/extractTableFunctionFromSelectQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@ ASTPtr extractTableFunctionASTPtrFromSelectQuery(ASTPtr & query)
return table_expression ? table_expression->table_function : nullptr;
}

/// Returns the `database_and_table_name` AST of the table expression that
/// extractTableExpressionASTPtrFromSelectQuery() finds in `query`,
/// or nullptr when no table expression is found.
ASTPtr extractTableASTPtrFromSelectQuery(ASTPtr & query)
{
    auto * found_expression = extractTableExpressionASTPtrFromSelectQuery(query);
    if (!found_expression)
        return nullptr;

    return found_expression->database_and_table_name;
}

ASTFunction * extractTableFunctionFromSelectQuery(ASTPtr & query)
{
auto table_function_ast = extractTableFunctionASTPtrFromSelectQuery(query);
Expand Down
1 change: 1 addition & 0 deletions src/Storages/extractTableFunctionFromSelectQuery.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ struct ASTTableExpression;

ASTTableExpression * extractTableExpressionASTPtrFromSelectQuery(ASTPtr & query);
ASTPtr extractTableFunctionASTPtrFromSelectQuery(ASTPtr & query);
ASTPtr extractTableASTPtrFromSelectQuery(ASTPtr & query);
ASTFunction * extractTableFunctionFromSelectQuery(ASTPtr & query);
ASTExpressionList * extractTableFunctionArgumentsFromSelectQuery(ASTPtr & query);

Expand Down
12 changes: 12 additions & 0 deletions tests/integration/test_database_iceberg/configs/cluster.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
<clickhouse>
<!-- Defines one cluster, cluster_simple, with a single shard holding a single replica at node1:9000. -->
<remote_servers>
<cluster_simple>
<shard>
<replica>
<host>node1</host>
<port>9000</port>
</replica>
</shard>
</cluster_simple>
</remote_servers>
</clickhouse>
Loading
Loading