diff --git a/src/Interpreters/InterpreterSelectQuery.cpp b/src/Interpreters/InterpreterSelectQuery.cpp index 982b73541214..f17f63a7e6b9 100644 --- a/src/Interpreters/InterpreterSelectQuery.cpp +++ b/src/Interpreters/InterpreterSelectQuery.cpp @@ -68,6 +68,7 @@ #include #include #include +#include #include #include #include @@ -189,6 +190,7 @@ namespace Setting extern const SettingsUInt64 min_count_to_compile_aggregate_expression; extern const SettingsBool enable_software_prefetch_in_aggregation; extern const SettingsBool optimize_group_by_constant_keys; + extern const SettingsBool use_hive_partitioning; } namespace ServerSetting @@ -1965,6 +1967,22 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, std::optional

(query_plan.getRootNode()->step.get())) + { + auto object_filter_step = std::make_unique( + query_plan.getCurrentHeader(), + expressions.before_where->dag.clone(), + getSelectQuery().where()->getColumnName()); + + object_filter_step->setStepDescription("WHERE"); + query_plan.addStep(std::move(object_filter_step)); + } + } + if (from_aggregation_stage) { /// No need to aggregate anything, since this was done on remote shards. diff --git a/tests/integration/test_s3_cluster/test.py b/tests/integration/test_s3_cluster/test.py index 837be9cc6a0f..57a3a384c20c 100644 --- a/tests/integration/test_s3_cluster/test.py +++ b/tests/integration/test_s3_cluster/test.py @@ -721,8 +721,79 @@ def test_remote_no_hedged(started_cluster): assert TSV(pure_s3) == TSV(s3_distributed) -def test_hive_partitioning(started_cluster): +def test_distributed_s3_table_engine(started_cluster): + node = started_cluster.instances["s0_0_0"] + + resp_def = node.query( + """ + SELECT * from s3Cluster( + 'cluster_simple', + 'http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV', + 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') ORDER BY (name, value, polygon) + """ + ) + + node.query("DROP TABLE IF EXISTS single_node"); + node.query( + """ + CREATE TABLE single_node + (name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))) + ENGINE=S3('http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV') + """ + ) + query_id_engine_single_node = str(uuid.uuid4()) + resp_engine_single_node = node.query( + """ + SELECT * FROM single_node ORDER BY (name, value, polygon) + """, + query_id = query_id_engine_single_node + ) + assert resp_def == resp_engine_single_node + + node.query("DROP TABLE IF EXISTS distributed"); + node.query( + """ + CREATE TABLE distributed + (name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))) + ENGINE=S3('http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV') + SETTINGS object_storage_cluster='cluster_simple' + """ + ) + query_id_engine_distributed = str(uuid.uuid4()) + resp_engine_distributed = node.query( + """ + SELECT * FROM distributed ORDER BY (name, value, polygon) + """, + query_id = query_id_engine_distributed + ) + assert resp_def == resp_engine_distributed + + node.query("SYSTEM FLUSH LOGS ON CLUSTER 'cluster_simple'") + + hosts_engine_single_node = node.query( + f""" + SELECT uniq(hostname) + FROM clusterAllReplicas('cluster_simple', system.query_log) + WHERE type='QueryFinish' AND initial_query_id='{query_id_engine_single_node}' + """ + ) + assert int(hosts_engine_single_node) == 1 + hosts_engine_distributed = node.query( + f""" + SELECT uniq(hostname) + FROM clusterAllReplicas('cluster_simple', system.query_log) + WHERE type='QueryFinish' AND initial_query_id='{query_id_engine_distributed}' + """ + ) + assert int(hosts_engine_distributed) == 3 + + +@pytest.mark.parametrize("allow_experimental_analyzer", [0, 1]) +def test_hive_partitioning(started_cluster, allow_experimental_analyzer): node = started_cluster.instances["s0_0_0"] + + node.query(f"SET allow_experimental_analyzer = {allow_experimental_analyzer}") + for i in range(1, 5): exists = node.query( f""" @@ -846,69 +917,4 @@ def test_hive_partitioning(started_cluster): cluster_optimized_traffic = int(cluster_optimized_traffic) assert cluster_optimized_traffic == optimized_traffic - -def test_distributed_s3_table_engine(started_cluster): - node = started_cluster.instances["s0_0_0"] - - resp_def = node.query( - """ - SELECT * from s3Cluster( - 'cluster_simple', - 'http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV', - 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') ORDER BY (name, value, polygon) - """ - ) - - node.query("DROP TABLE IF EXISTS single_node"); - node.query( - """ - CREATE TABLE single_node - (name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))) - ENGINE=S3('http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV') - """ - ) - query_id_engine_single_node = str(uuid.uuid4()) - resp_engine_single_node = node.query( - """ - SELECT * FROM single_node ORDER BY (name, value, polygon) - """, - query_id = query_id_engine_single_node - ) - assert resp_def == resp_engine_single_node - - node.query("DROP TABLE IF EXISTS distributed"); - node.query( - """ - CREATE TABLE distributed - (name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))) - ENGINE=S3('http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV') - SETTINGS object_storage_cluster='cluster_simple' - """ - ) - query_id_engine_distributed = str(uuid.uuid4()) - resp_engine_distributed = node.query( - """ - SELECT * FROM distributed ORDER BY (name, value, polygon) - """, - query_id = query_id_engine_distributed - ) - assert resp_def == resp_engine_distributed - - node.query("SYSTEM FLUSH LOGS ON CLUSTER 'cluster_simple'") - - hosts_engine_single_node = node.query( - f""" - SELECT uniq(hostname) - FROM clusterAllReplicas('cluster_simple', system.query_log) - WHERE type='QueryFinish' AND initial_query_id='{query_id_engine_single_node}' - """ - ) - assert int(hosts_engine_single_node) == 1 - hosts_engine_distributed = node.query( - f""" - SELECT uniq(hostname) - FROM clusterAllReplicas('cluster_simple', system.query_log) - WHERE type='QueryFinish' AND initial_query_id='{query_id_engine_distributed}' - """ - ) - assert int(hosts_engine_distributed) == 3 + node.query("SET allow_experimental_analyzer = DEFAULT")