
Commit 110557d

Fix hive partitioning in cluster functions for old analyzer
1 parent 024a0b4 commit 110557d
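
With hive-style partitioning, partition values are encoded in the object paths themselves (segments of the form key=value), so a WHERE condition on a partition column can discard whole objects before any data is read. Per the commit title, this pruning was missing for cluster functions under the old analyzer; the change below wires it in on the initiator. A minimal illustration of the kind of query affected — the bucket layout, credentials, column names, and the settings= keyword of the test harness's query helper are assumptions, not taken from this commit:

    # Hypothetical layout: root/data/city=<name>/*.csv, where `city` is a hive
    # partition column recovered from the object path rather than the file body.
    # With use_hive_partitioning = 1, the WHERE clause below should prune
    # non-matching objects on the initiator, even with the old analyzer.
    node.query(
        """
        SELECT count()
        FROM s3Cluster(
            'cluster_simple',
            'http://minio1:9001/root/data/city=*/*.csv', 'minio', 'minio123', 'CSV',
            'id UInt32')
        WHERE city = 'berlin'
        """,
        settings={"use_hive_partitioning": 1, "allow_experimental_analyzer": 0},
    )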

2 files changed (+91, −67 lines)

src/Interpreters/InterpreterSelectQuery.cpp

Lines changed: 18 additions & 0 deletions

@@ -68,6 +68,7 @@
 #include <Processors/QueryPlan/TotalsHavingStep.h>
 #include <Processors/QueryPlan/WindowStep.h>
 #include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
+#include <Processors/QueryPlan/ObjectFilterStep.h>
 #include <Processors/Sources/NullSource.h>
 #include <Processors/Sources/SourceFromSingleChunk.h>
 #include <Processors/Transforms/AggregatingTransform.h>
@@ -189,6 +190,7 @@ namespace Setting
     extern const SettingsUInt64 min_count_to_compile_aggregate_expression;
     extern const SettingsBool enable_software_prefetch_in_aggregation;
     extern const SettingsBool optimize_group_by_constant_keys;
+    extern const SettingsBool use_hive_partitioning;
 }

 namespace ServerSetting
@@ -1965,6 +1967,22 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, std::optional<P

     if (expressions.second_stage || from_aggregation_stage)
     {
+        if (settings[Setting::use_hive_partitioning]
+            && !expressions.first_stage
+            && expressions.hasWhere())
+        {
+            if (typeid_cast<ReadFromCluster *>(query_plan.getRootNode()->step.get()))
+            {
+                auto object_filter_step = std::make_unique<ObjectFilterStep>(
+                    query_plan.getCurrentHeader(),
+                    expressions.before_where->dag.clone(),
+                    getSelectQuery().where()->getColumnName());
+
+                object_filter_step->setStepDescription("WHERE");
+                query_plan.addStep(std::move(object_filter_step));
+            }
+        }
+
         if (from_aggregation_stage)
         {
             /// No need to aggregate anything, since this was done on remote shards.
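
The added block runs only when use_hive_partitioning is enabled, the server is executing the second stage of the query (the initiator, not the first stage), there is a WHERE clause, and the plan root is a ReadFromCluster step. It clones the WHERE clause's expression DAG into an ObjectFilterStep, so objects whose hive-partition path values cannot match are filtered out before remote reads begin. A hedged way to check that the step appears in a plan — the EXPLAIN output shape and the printed step name are assumptions, not taken from this commit:

    # Sketch: look for the object-filter step in the textual query plan.
    # "ObjectFilter" as the printed step name is an assumption; the step's
    # description, per the C++ above, is "WHERE".
    plan = node.query(
        """
        EXPLAIN
        SELECT count()
        FROM s3Cluster(
            'cluster_simple',
            'http://minio1:9001/root/data/city=*/*.csv', 'minio', 'minio123', 'CSV',
            'id UInt32')
        WHERE city = 'berlin'
        """,
        settings={"use_hive_partitioning": 1, "allow_experimental_analyzer": 0},
    )
    assert "ObjectFilter" in plan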

tests/integration/test_s3_cluster/test.py

Lines changed: 73 additions & 67 deletions

@@ -721,8 +721,79 @@ def test_remote_no_hedged(started_cluster):
     assert TSV(pure_s3) == TSV(s3_distributed)


-def test_hive_partitioning(started_cluster):
+def test_distributed_s3_table_engine(started_cluster):
+    node = started_cluster.instances["s0_0_0"]
+
+    resp_def = node.query(
+        """
+        SELECT * from s3Cluster(
+            'cluster_simple',
+            'http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV',
+            'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') ORDER BY (name, value, polygon)
+        """
+    )
+
+    node.query("DROP TABLE IF EXISTS single_node");
+    node.query(
+        """
+        CREATE TABLE single_node
+        (name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64))))
+        ENGINE=S3('http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV')
+        """
+    )
+    query_id_engine_single_node = str(uuid.uuid4())
+    resp_engine_single_node = node.query(
+        """
+        SELECT * FROM single_node ORDER BY (name, value, polygon)
+        """,
+        query_id = query_id_engine_single_node
+    )
+    assert resp_def == resp_engine_single_node
+
+    node.query("DROP TABLE IF EXISTS distributed");
+    node.query(
+        """
+        CREATE TABLE distributed
+        (name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64))))
+        ENGINE=S3('http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV')
+        SETTINGS object_storage_cluster='cluster_simple'
+        """
+    )
+    query_id_engine_distributed = str(uuid.uuid4())
+    resp_engine_distributed = node.query(
+        """
+        SELECT * FROM distributed ORDER BY (name, value, polygon)
+        """,
+        query_id = query_id_engine_distributed
+    )
+    assert resp_def == resp_engine_distributed
+
+    node.query("SYSTEM FLUSH LOGS ON CLUSTER 'cluster_simple'")
+
+    hosts_engine_single_node = node.query(
+        f"""
+        SELECT uniq(hostname)
+        FROM clusterAllReplicas('cluster_simple', system.query_log)
+        WHERE type='QueryFinish' AND initial_query_id='{query_id_engine_single_node}'
+        """
+    )
+    assert int(hosts_engine_single_node) == 1
+    hosts_engine_distributed = node.query(
+        f"""
+        SELECT uniq(hostname)
+        FROM clusterAllReplicas('cluster_simple', system.query_log)
+        WHERE type='QueryFinish' AND initial_query_id='{query_id_engine_distributed}'
+        """
+    )
+    assert int(hosts_engine_distributed) == 3
+
+
+@pytest.mark.parametrize("allow_experimental_analyzer", [0, 1])
+def test_hive_partitioning(started_cluster, allow_experimental_analyzer):
     node = started_cluster.instances["s0_0_0"]
+
+    node.query(f"SET allow_experimental_analyzer = {allow_experimental_analyzer}")
+
     for i in range(1, 5):
         exists = node.query(
             f"""
@@ -846,69 +917,4 @@ def test_hive_partitioning(started_cluster):
     cluster_optimized_traffic = int(cluster_optimized_traffic)
     assert cluster_optimized_traffic == optimized_traffic

-
-def test_distributed_s3_table_engine(started_cluster):
-    node = started_cluster.instances["s0_0_0"]
-
-    resp_def = node.query(
-        """
-        SELECT * from s3Cluster(
-            'cluster_simple',
-            'http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV',
-            'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') ORDER BY (name, value, polygon)
-        """
-    )
-
-    node.query("DROP TABLE IF EXISTS single_node");
-    node.query(
-        """
-        CREATE TABLE single_node
-        (name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64))))
-        ENGINE=S3('http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV')
-        """
-    )
-    query_id_engine_single_node = str(uuid.uuid4())
-    resp_engine_single_node = node.query(
-        """
-        SELECT * FROM single_node ORDER BY (name, value, polygon)
-        """,
-        query_id = query_id_engine_single_node
-    )
-    assert resp_def == resp_engine_single_node
-
-    node.query("DROP TABLE IF EXISTS distributed");
-    node.query(
-        """
-        CREATE TABLE distributed
-        (name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64))))
-        ENGINE=S3('http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV')
-        SETTINGS object_storage_cluster='cluster_simple'
-        """
-    )
-    query_id_engine_distributed = str(uuid.uuid4())
-    resp_engine_distributed = node.query(
-        """
-        SELECT * FROM distributed ORDER BY (name, value, polygon)
-        """,
-        query_id = query_id_engine_distributed
-    )
-    assert resp_def == resp_engine_distributed
-
-    node.query("SYSTEM FLUSH LOGS ON CLUSTER 'cluster_simple'")
-
-    hosts_engine_single_node = node.query(
-        f"""
-        SELECT uniq(hostname)
-        FROM clusterAllReplicas('cluster_simple', system.query_log)
-        WHERE type='QueryFinish' AND initial_query_id='{query_id_engine_single_node}'
-        """
-    )
-    assert int(hosts_engine_single_node) == 1
-    hosts_engine_distributed = node.query(
-        f"""
-        SELECT uniq(hostname)
-        FROM clusterAllReplicas('cluster_simple', system.query_log)
-        WHERE type='QueryFinish' AND initial_query_id='{query_id_engine_distributed}'
-        """
-    )
-    assert int(hosts_engine_distributed) == 3
+    node.query("SET allow_experimental_analyzer = DEFAULT")
