Merged
47 commits
3485578
Test for remote+s3Cluster
ianton-ru Apr 4, 2025
32a8fbb
Use INITIAL_QUERY for remote() call
ianton-ru Dec 10, 2024
ca122a5
Avoid CLIENT_INFO_DOES_NOT_MATCH error
ianton-ru Dec 10, 2024
18a5350
Workaround for remote with multiple shards
ianton-ru Dec 10, 2024
8ac5a73
Workaround for CLIENT_INFO_DOES_NOT_MATCH with 'TCP not equal to HTTP'
ianton-ru Dec 10, 2024
6375939
Keep initial_query_id for remote with INITIAL_QUERY
ianton-ru Dec 10, 2024
8f526b8
Left QueryID for server only
ianton-ru Jan 3, 2025
4957183
Fixes after a few comments
ianton-ru Jan 14, 2025
c640138
s3Cluster hive optimization
ianton-ru Dec 24, 2024
c23eeb0
Style fix
ianton-ru Dec 30, 2024
96deb62
Fix tidy build
ianton-ru Dec 30, 2024
82bcc63
Fix test
ianton-ru Dec 30, 2024
096c406
Do not use ObjectFilter when not required
ianton-ru Jan 2, 2025
1f0d9f6
Fix test
ianton-ru Jan 2, 2025
1117652
Alternative syntax for object storage cluster functions
ianton-ru Jan 15, 2025
622489c
Fix build
ianton-ru Jan 17, 2025
2625d14
Rename settings
ianton-ru Jan 17, 2025
ab06c3b
Fix build
ianton-ru Apr 4, 2025
9bd2ea3
Distributed request to tables with Object Storage Engines
ianton-ru Jan 30, 2025
7a54424
Fix tests
ianton-ru Feb 7, 2025
fe89fa2
Fixes after review
ianton-ru Feb 13, 2025
536c4d2
More fixes after review
ianton-ru Feb 14, 2025
50fc94f
Rename getTableFunctionArguments to addPathAndAccessKeysToArgs
ianton-ru Feb 14, 2025
5cb7da7
More refactoring
ianton-ru Feb 14, 2025
b98865a
Add ability to choose object storage cluster in select query
ianton-ru Feb 18, 2025
eaba354
Fix write to pure engine
ianton-ru Feb 18, 2025
ca0b390
Fix virtual columns for pure storage
ianton-ru Feb 19, 2025
7e6de42
More fixes
ianton-ru Feb 19, 2025
3db68b0
Do not update configuration twice
ianton-ru Feb 20, 2025
96b0b48
Create pure storage on create
ianton-ru Feb 20, 2025
e4d2bf8
Simplify
ianton-ru Feb 26, 2025
25b081c
Rename cluster_name_ variable to cluster_name_from_settings
ianton-ru Feb 27, 2025
45474c3
Changes after review
ianton-ru Feb 27, 2025
ba8eec2
Fix initialization order
ianton-ru Mar 1, 2025
0344ac4
Fix after merge
ianton-ru Mar 3, 2025
b66f70f
std::atomic update
ianton-ru Mar 5, 2025
2914c67
Fix build
ianton-ru Apr 4, 2025
e1427d7
Autodiscovery dynamic clusters
ianton-ru Feb 11, 2025
31a1892
Simplify code
ianton-ru Feb 21, 2025
6ea1ac2
Fix watching for empty cluster nodes
ianton-ru Feb 21, 2025
8ee7470
Fix style and tidy build
ianton-ru Feb 21, 2025
7c3391f
Use single structure for update flags
ianton-ru Feb 21, 2025
fa2d408
Fix use of string after removal
ianton-ru Feb 21, 2025
becb052
Fix build
ianton-ru Apr 4, 2025
eafa208
Limit parsing threads for distributed case
ianton-ru Feb 20, 2025
7519f28
Merge branch 'antalya-25.2' into antalya-25.2-ivashkin
Enmk Apr 6, 2025
061f036
Merge branch 'antalya-25.2' into antalya-25.2-ivashkin
Enmk Apr 6, 2025
8 changes: 7 additions & 1 deletion src/Core/SettingsChangesHistory.cpp
@@ -64,9 +64,15 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
     /// controls new feature and it's 'true' by default, use 'false' as previous_value).
     /// It's used to implement `compatibility` setting (see https://github.com/ClickHouse/ClickHouse/issues/35972)
     /// Note: please check if the key already exists to prevent duplicate entries.
+    addSettingsChanges(settings_changes_history, "25.2.1.20000",
+    {
+        // Altinity Antalya modifications atop of 25.2
+        {"object_storage_cluster", "", "", "New setting"},
+        {"object_storage_max_nodes", 0, 0, "New setting"},
+    });
     addSettingsChanges(settings_changes_history, "24.12.2.20000",
     {
-        // Altinity Antalya modifications
+        // Altinity Antalya modifications atop of 24.12
         {"input_format_parquet_use_metadata_cache", true, true, "New setting, turned ON by default"}, // https://github.com/Altinity/ClickHouse/pull/586
     });
     addSettingsChanges(settings_changes_history, "24.12",
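For context, the table-level form of the new object_storage_cluster setting is exercised by the integration test at the bottom of this diff; a minimal sketch follows (the CREATE TABLE is taken from that test, the query-level form is an assumption based on the "Add ability to choose object storage cluster in select query" commit, and the object_storage_max_nodes comment is an assumption from the setting's name):

    -- Table-level: the engine dispatches reads over the named cluster (as in the test below).
    CREATE TABLE distributed
    (name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64))))
    ENGINE=S3('http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV')
    SETTINGS object_storage_cluster='cluster_simple';

    -- Assumed query-level override; object_storage_max_nodes=2 would presumably cap
    -- how many cluster nodes participate (0, the default, meaning no limit).
    SELECT * FROM distributed ORDER BY (name, value, polygon)
    SETTINGS object_storage_cluster='cluster_simple', object_storage_max_nodes=2;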
3 changes: 3 additions & 0 deletions src/Storages/IStorage.h
@@ -66,6 +66,9 @@ class ConditionSelectivityEstimator;
 
 class ActionsDAG;
 
+class IObjectStorage;
+using ObjectStoragePtr = std::shared_ptr<IObjectStorage>;
+
 struct ColumnSize
 {
     size_t marks = 0;
@@ -76,7 +76,7 @@ createStorageObjectStorage(const StorageFactory::Arguments & args, StorageObject
         // We only want to perform write actions (e.g. create a container in Azure) when the table is being created,
         // and we want to avoid it when we load the table after a server restart.
         configuration->createObjectStorage(context, /* is_readonly */ args.mode != LoadingStrictnessLevel::CREATE),
-        args.getContext(), /// Use global context.
+        args.getContext(),
         args.table_id,
         args.columns,
         args.constraints,
65 changes: 0 additions & 65 deletions src/TableFunctions/TableFunctionObjectStorage.cpp
@@ -140,17 +140,6 @@ void registerTableFunctionObjectStorage(TableFunctionFactory & factory)
 {
     UNUSED(factory);
 #if USE_AWS_S3
-    factory.registerFunction<TableFunctionObjectStorage<S3Definition, StorageS3Configuration>>(
-    {
-        .documentation =
-        {
-            .description=R"(The table function can be used to read the data stored on AWS S3.)",
-            .examples{{"s3", "SELECT * FROM s3(url, access_key_id, secret_access_key)", ""}
-        },
-        .category{""}},
-        .allow_readonly = false
-    });
-
     factory.registerFunction<TableFunctionObjectStorage<GCSDefinition, StorageS3Configuration>>(
     {
         .documentation =

@@ -234,29 +223,6 @@ void registerTableFunctionIceberg(TableFunctionFactory & factory)
             .examples{{"iceberg", "SELECT * FROM iceberg(url, access_key_id, secret_access_key)", ""}},
             .category{""}},
         .allow_readonly = false});
-    factory.registerFunction<TableFunctionIcebergS3>(
-        {.documentation
-             = {.description = R"(The table function can be used to read the Iceberg table stored on S3 object store.)",
-                .examples{{"icebergS3", "SELECT * FROM icebergS3(url, access_key_id, secret_access_key)", ""}},
-                .category{""}},
-         .allow_readonly = false});
-
-#endif
-#if USE_AZURE_BLOB_STORAGE
-    factory.registerFunction<TableFunctionIcebergAzure>(
-        {.documentation
-             = {.description = R"(The table function can be used to read the Iceberg table stored on Azure object store.)",
-                .examples{{"icebergAzure", "SELECT * FROM icebergAzure(url, access_key_id, secret_access_key)", ""}},
-                .category{""}},
-         .allow_readonly = false});
-#endif
-#if USE_HDFS
-    factory.registerFunction<TableFunctionIcebergHDFS>(
-        {.documentation
-             = {.description = R"(The table function can be used to read the Iceberg table stored on HDFS virtual filesystem.)",
-                .examples{{"icebergHDFS", "SELECT * FROM icebergHDFS(url)", ""}},
-                .category{""}},
-         .allow_readonly = false});
-#endif
 #endif
     factory.registerFunction<TableFunctionIcebergLocal>(
         {.documentation

@@ -268,42 +234,11 @@ void registerTableFunctionIceberg(TableFunctionFactory & factory)
 #endif
 
 
-#if USE_AWS_S3
-#if USE_PARQUET && USE_DELTA_KERNEL_RS
-void registerTableFunctionDeltaLake(TableFunctionFactory & factory)
-{
-    factory.registerFunction<TableFunctionDeltaLake>(
-        {.documentation
-             = {.description = R"(The table function can be used to read the DeltaLake table stored on object store.)",
-                .examples{{"deltaLake", "SELECT * FROM deltaLake(url, access_key_id, secret_access_key)", ""}},
-                .category{""}},
-         .allow_readonly = false});
-}
-#endif
-
-void registerTableFunctionHudi(TableFunctionFactory & factory)
-{
-    factory.registerFunction<TableFunctionHudi>(
-        {.documentation
-             = {.description = R"(The table function can be used to read the Hudi table stored on object store.)",
-                .examples{{"hudi", "SELECT * FROM hudi(url, access_key_id, secret_access_key)", ""}},
-                .category{""}},
-         .allow_readonly = false});
-}
-
-#endif
-
 void registerDataLakeTableFunctions(TableFunctionFactory & factory)
 {
     UNUSED(factory);
 #if USE_AVRO
     registerTableFunctionIceberg(factory);
 #endif
-#if USE_AWS_S3
-#if USE_PARQUET && USE_DELTA_KERNEL_RS
-    registerTableFunctionDeltaLake(factory);
-#endif
-    registerTableFunctionHudi(factory);
-#endif
 }
 }
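The deregistered functions keep their documented call syntax; the examples below are taken verbatim from the removed documentation strings (registration presumably moves to the ClusterFallback path shown below, so the plain names remain callable):

    SELECT * FROM s3(url, access_key_id, secret_access_key)
    SELECT * FROM icebergS3(url, access_key_id, secret_access_key)
    SELECT * FROM icebergAzure(url, access_key_id, secret_access_key)
    SELECT * FROM icebergHDFS(url)
    SELECT * FROM deltaLake(url, access_key_id, secret_access_key)
    SELECT * FROM hudi(url, access_key_id, secret_access_key)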
@@ -278,6 +278,7 @@ void registerTableFunctionObjectStorageClusterFallback(TableFunctionFactory & fa
     );
 #endif
 
+
 #if USE_AWS_S3 && USE_PARQUET && USE_DELTA_KERNEL_RS
     factory.registerFunction<TableFunctionDeltaLakeClusterFallback>(
     {
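A sketch of the intended fallback behavior, assuming the ClusterFallback variants respond to the object_storage_cluster setting like the table engine elsewhere in this PR (the URL and credentials here are hypothetical, mirroring the integration test):

    -- Default: object_storage_cluster is empty, so the call runs on the initiator only.
    SELECT * FROM deltaLake('http://minio1:9001/root/data/delta/', 'minio', 'minio123');

    -- With the setting, the same plain call presumably fans out over the named cluster.
    SELECT * FROM deltaLake('http://minio1:9001/root/data/delta/', 'minio', 'minio123')
    SETTINGS object_storage_cluster='cluster_simple';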
134 changes: 67 additions & 67 deletions tests/integration/test_s3_cluster/test.py
@@ -721,73 +721,6 @@ def test_remote_no_hedged(started_cluster):
     assert TSV(pure_s3) == TSV(s3_distributed)
 
 
-def test_distributed_s3_table_engine(started_cluster):
-    node = started_cluster.instances["s0_0_0"]
-
-    resp_def = node.query(
-        """
-        SELECT * from s3Cluster(
-            'cluster_simple',
-            'http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV',
-            'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') ORDER BY (name, value, polygon)
-        """
-    )
-
-    node.query("DROP TABLE IF EXISTS single_node");
-    node.query(
-        """
-        CREATE TABLE single_node
-        (name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64))))
-        ENGINE=S3('http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV')
-        """
-    )
-    query_id_engine_single_node = str(uuid.uuid4())
-    resp_engine_single_node = node.query(
-        """
-        SELECT * FROM single_node ORDER BY (name, value, polygon)
-        """,
-        query_id = query_id_engine_single_node
-    )
-    assert resp_def == resp_engine_single_node
-
-    node.query("DROP TABLE IF EXISTS distributed");
-    node.query(
-        """
-        CREATE TABLE distributed
-        (name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64))))
-        ENGINE=S3('http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV')
-        SETTINGS object_storage_cluster='cluster_simple'
-        """
-    )
-    query_id_engine_distributed = str(uuid.uuid4())
-    resp_engine_distributed = node.query(
-        """
-        SELECT * FROM distributed ORDER BY (name, value, polygon)
-        """,
-        query_id = query_id_engine_distributed
-    )
-    assert resp_def == resp_engine_distributed
-
-    node.query("SYSTEM FLUSH LOGS ON CLUSTER 'cluster_simple'")
-
-    hosts_engine_single_node = node.query(
-        f"""
-        SELECT uniq(hostname)
-        FROM clusterAllReplicas('cluster_simple', system.query_log)
-        WHERE type='QueryFinish' AND initial_query_id='{query_id_engine_single_node}'
-        """
-    )
-    assert int(hosts_engine_single_node) == 1
-    hosts_engine_distributed = node.query(
-        f"""
-        SELECT uniq(hostname)
-        FROM clusterAllReplicas('cluster_simple', system.query_log)
-        WHERE type='QueryFinish' AND initial_query_id='{query_id_engine_distributed}'
-        """
-    )
-    assert int(hosts_engine_distributed) == 3
-
-
 def test_hive_partitioning(started_cluster):
     node = started_cluster.instances["s0_0_0"]
     for i in range(1, 5):
@@ -912,3 +845,70 @@ def test_hive_partitioning(started_cluster):
     )
     cluster_optimized_traffic = int(cluster_optimized_traffic)
     assert cluster_optimized_traffic == optimized_traffic
+
+
+def test_distributed_s3_table_engine(started_cluster):
+    node = started_cluster.instances["s0_0_0"]
+
+    resp_def = node.query(
+        """
+        SELECT * from s3Cluster(
+            'cluster_simple',
+            'http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV',
+            'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') ORDER BY (name, value, polygon)
+        """
+    )
+
+    node.query("DROP TABLE IF EXISTS single_node");
+    node.query(
+        """
+        CREATE TABLE single_node
+        (name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64))))
+        ENGINE=S3('http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV')
+        """
+    )
+    query_id_engine_single_node = str(uuid.uuid4())
+    resp_engine_single_node = node.query(
+        """
+        SELECT * FROM single_node ORDER BY (name, value, polygon)
+        """,
+        query_id = query_id_engine_single_node
+    )
+    assert resp_def == resp_engine_single_node
+
+    node.query("DROP TABLE IF EXISTS distributed");
+    node.query(
+        """
+        CREATE TABLE distributed
+        (name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64))))
+        ENGINE=S3('http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV')
+        SETTINGS object_storage_cluster='cluster_simple'
+        """
+    )
+    query_id_engine_distributed = str(uuid.uuid4())
+    resp_engine_distributed = node.query(
+        """
+        SELECT * FROM distributed ORDER BY (name, value, polygon)
+        """,
+        query_id = query_id_engine_distributed
+    )
+    assert resp_def == resp_engine_distributed
+
+    node.query("SYSTEM FLUSH LOGS ON CLUSTER 'cluster_simple'")
+
+    hosts_engine_single_node = node.query(
+        f"""
+        SELECT uniq(hostname)
+        FROM clusterAllReplicas('cluster_simple', system.query_log)
+        WHERE type='QueryFinish' AND initial_query_id='{query_id_engine_single_node}'
+        """
+    )
+    assert int(hosts_engine_single_node) == 1
+    hosts_engine_distributed = node.query(
+        f"""
+        SELECT uniq(hostname)
+        FROM clusterAllReplicas('cluster_simple', system.query_log)
+        WHERE type='QueryFinish' AND initial_query_id='{query_id_engine_distributed}'
+        """
+    )
+    assert int(hosts_engine_distributed) == 3