diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp
index cd431ae934db..7eb6b8eba36b 100644
--- a/src/Core/SettingsChangesHistory.cpp
+++ b/src/Core/SettingsChangesHistory.cpp
@@ -64,9 +64,15 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
     /// controls new feature and it's 'true' by default, use 'false' as previous_value).
     /// It's used to implement `compatibility` setting (see https://github.com/ClickHouse/ClickHouse/issues/35972)
     /// Note: please check if the key already exists to prevent duplicate entries.
+    addSettingsChanges(settings_changes_history, "25.2.1.20000",
+    {
+        // Altinity Antalya modifications atop of 25.2
+        {"object_storage_cluster", "", "", "New setting"},
+        {"object_storage_max_nodes", 0, 0, "New setting"},
+    });
     addSettingsChanges(settings_changes_history, "24.12.2.20000",
     {
-        // Altinity Antalya modifications
+        // Altinity Antalya modifications atop of 24.12
         {"input_format_parquet_use_metadata_cache", true, true, "New setting, turned ON by default"}, // https://github.com/Altinity/ClickHouse/pull/586
     });
     addSettingsChanges(settings_changes_history, "24.12",
diff --git a/src/Storages/IStorage.h b/src/Storages/IStorage.h
index 7feebbbd7921..89d59f6cd4e4 100644
--- a/src/Storages/IStorage.h
+++ b/src/Storages/IStorage.h
@@ -66,6 +66,9 @@ class ConditionSelectivityEstimator;
 
 class ActionsDAG;
 
+class IObjectStorage;
+using ObjectStoragePtr = std::shared_ptr<IObjectStorage>;
+
 struct ColumnSize
 {
     size_t marks = 0;
diff --git a/src/Storages/ObjectStorage/registerStorageObjectStorage.cpp b/src/Storages/ObjectStorage/registerStorageObjectStorage.cpp
index ba1703b72303..12567d0c5893 100644
--- a/src/Storages/ObjectStorage/registerStorageObjectStorage.cpp
+++ b/src/Storages/ObjectStorage/registerStorageObjectStorage.cpp
@@ -76,7 +76,7 @@ createStorageObjectStorage(const StorageFactory::Arguments & args, StorageObject
         // We only want to perform write actions (e.g. create a container in Azure) when the table is being created,
         // and we want to avoid it when we load the table after a server restart.
         configuration->createObjectStorage(context, /* is_readonly */ args.mode != LoadingStrictnessLevel::CREATE),
-        args.getContext(), /// Use global context.
+        args.getContext(),
         args.table_id,
         args.columns,
         args.constraints,
diff --git a/src/TableFunctions/TableFunctionObjectStorage.cpp b/src/TableFunctions/TableFunctionObjectStorage.cpp
index 7849094a71d7..62d017c854d6 100644
--- a/src/TableFunctions/TableFunctionObjectStorage.cpp
+++ b/src/TableFunctions/TableFunctionObjectStorage.cpp
@@ -140,17 +140,6 @@ void registerTableFunctionObjectStorage(TableFunctionFactory & factory)
 {
     UNUSED(factory);
 #if USE_AWS_S3
-    factory.registerFunction<TableFunctionObjectStorage<S3Definition, StorageS3Configuration>>(
-        {
-            .documentation =
-                {
-                    .description=R"(The table function can be used to read the data stored on AWS S3.)",
-                    .examples{{"s3", "SELECT * FROM s3(url, access_key_id, secret_access_key)", ""}
-                    },
-                    .category{""}},
-            .allow_readonly = false
-        });
-
     factory.registerFunction<TableFunctionObjectStorage<GCSDefinition, StorageS3Configuration>>(
         {
             .documentation =
@@ -234,29 +223,6 @@ void registerTableFunctionIceberg(TableFunctionFactory & factory)
                 .examples{{"iceberg", "SELECT * FROM iceberg(url, access_key_id, secret_access_key)", ""}},
                 .category{""}},
             .allow_readonly = false});
-    factory.registerFunction<TableFunctionIcebergS3>(
-        {.documentation
-             = {.description = R"(The table function can be used to read the Iceberg table stored on S3 object store.)",
-                .examples{{"icebergS3", "SELECT * FROM icebergS3(url, access_key_id, secret_access_key)", ""}},
-                .category{""}},
-         .allow_readonly = false});
-
-#endif
-#if USE_AZURE_BLOB_STORAGE
-    factory.registerFunction<TableFunctionIcebergAzure>(
-        {.documentation
-             = {.description = R"(The table function can be used to read the Iceberg table stored on Azure object store.)",
-                .examples{{"icebergAzure", "SELECT * FROM icebergAzure(url, access_key_id, secret_access_key)", ""}},
-                .category{""}},
-         .allow_readonly = false});
-#endif
-#if USE_HDFS
-    factory.registerFunction<TableFunctionIcebergHDFS>(
-        {.documentation
-             = {.description = R"(The table function can be used to read the Iceberg table stored on HDFS virtual filesystem.)",
-                .examples{{"icebergHDFS", "SELECT * FROM icebergHDFS(url)", ""}},
-                .category{""}},
-         .allow_readonly = false});
 #endif
     factory.registerFunction<TableFunctionIcebergLocal>(
         {.documentation
@@ -268,42 +234,11 @@ void registerTableFunctionIceberg(TableFunctionFactory & factory)
 
 #endif
 
-#if USE_AWS_S3
-#if USE_PARQUET && USE_DELTA_KERNEL_RS
-void registerTableFunctionDeltaLake(TableFunctionFactory & factory)
-{
-    factory.registerFunction<TableFunctionDeltaLake>(
-        {.documentation
-             = {.description = R"(The table function can be used to read the DeltaLake table stored on object store.)",
-                .examples{{"deltaLake", "SELECT * FROM deltaLake(url, access_key_id, secret_access_key)", ""}},
-                .category{""}},
-         .allow_readonly = false});
-}
-#endif
-
-void registerTableFunctionHudi(TableFunctionFactory & factory)
-{
-    factory.registerFunction<TableFunctionHudi>(
-        {.documentation
-             = {.description = R"(The table function can be used to read the Hudi table stored on object store.)",
-                .examples{{"hudi", "SELECT * FROM hudi(url, access_key_id, secret_access_key)", ""}},
-                .category{""}},
-         .allow_readonly = false});
-}
-
-#endif
-
 void registerDataLakeTableFunctions(TableFunctionFactory & factory)
 {
     UNUSED(factory);
 #if USE_AVRO
     registerTableFunctionIceberg(factory);
 #endif
-#if USE_AWS_S3
-#if USE_PARQUET && USE_DELTA_KERNEL_RS
-    registerTableFunctionDeltaLake(factory);
-#endif
-    registerTableFunctionHudi(factory);
-#endif
 }
 
 }
diff --git a/src/TableFunctions/TableFunctionObjectStorageClusterFallback.cpp b/src/TableFunctions/TableFunctionObjectStorageClusterFallback.cpp
index 1b08e85321fd..6fe9f6853dce 100644
--- a/src/TableFunctions/TableFunctionObjectStorageClusterFallback.cpp
+++ b/src/TableFunctions/TableFunctionObjectStorageClusterFallback.cpp
@@ -278,6 +278,7 @@ void registerTableFunctionObjectStorageClusterFallback(TableFunctionFactory & fa
 
     );
 #endif
+
 #if USE_AWS_S3 && USE_PARQUET && USE_DELTA_KERNEL_RS
     factory.registerFunction(
     {
diff --git a/tests/integration/test_s3_cluster/test.py b/tests/integration/test_s3_cluster/test.py
index 63e546429151..837be9cc6a0f 100644
--- a/tests/integration/test_s3_cluster/test.py
+++ b/tests/integration/test_s3_cluster/test.py
@@ -721,73 +721,6 @@ def test_remote_no_hedged(started_cluster):
     assert TSV(pure_s3) == TSV(s3_distributed)
 
 
-def test_distributed_s3_table_engine(started_cluster):
-    node = started_cluster.instances["s0_0_0"]
-
-    resp_def = node.query(
-        """
-    SELECT * from s3Cluster(
-        'cluster_simple',
-        'http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV',
-        'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') ORDER BY (name, value, polygon)
-    """
-    )
-
-    node.query("DROP TABLE IF EXISTS single_node");
-    node.query(
-        """
-    CREATE TABLE single_node
-        (name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64))))
-        ENGINE=S3('http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV')
-    """
-    )
-    query_id_engine_single_node = str(uuid.uuid4())
-    resp_engine_single_node = node.query(
-        """
-    SELECT * FROM single_node ORDER BY (name, value, polygon)
-    """,
-        query_id = query_id_engine_single_node
-    )
-    assert resp_def == resp_engine_single_node
-
-    node.query("DROP TABLE IF EXISTS distributed");
-    node.query(
-        """
-    CREATE TABLE distributed
-        (name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64))))
-        ENGINE=S3('http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV')
-        SETTINGS object_storage_cluster='cluster_simple'
-    """
-    )
-    query_id_engine_distributed = str(uuid.uuid4())
-    resp_engine_distributed = node.query(
-        """
-    SELECT * FROM distributed ORDER BY (name, value, polygon)
-    """,
-        query_id = query_id_engine_distributed
-    )
-    assert resp_def == resp_engine_distributed
-
-    node.query("SYSTEM FLUSH LOGS ON CLUSTER 'cluster_simple'")
-
-    hosts_engine_single_node = node.query(
-        f"""
-    SELECT uniq(hostname)
-    FROM clusterAllReplicas('cluster_simple', system.query_log)
-    WHERE type='QueryFinish' AND initial_query_id='{query_id_engine_single_node}'
-    """
-    )
-    assert int(hosts_engine_single_node) == 1
-    hosts_engine_distributed = node.query(
-        f"""
-    SELECT uniq(hostname)
-    FROM clusterAllReplicas('cluster_simple', system.query_log)
-    WHERE type='QueryFinish' AND initial_query_id='{query_id_engine_distributed}'
-    """
-    )
-    assert int(hosts_engine_distributed) == 3
-
-
 def test_hive_partitioning(started_cluster):
     node = started_cluster.instances["s0_0_0"]
     for i in range(1, 5):
@@ -912,3 +845,70 @@ def test_hive_partitioning(started_cluster):
     )
     cluster_optimized_traffic = int(cluster_optimized_traffic)
     assert cluster_optimized_traffic == optimized_traffic
+
+
+def test_distributed_s3_table_engine(started_cluster):
+    node = started_cluster.instances["s0_0_0"]
+
+    resp_def = node.query(
+        """
+    SELECT * from s3Cluster(
+        'cluster_simple',
+        'http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV',
+        'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') ORDER BY (name, value, polygon)
+    """
+    )
+
+    node.query("DROP TABLE IF EXISTS single_node");
+    node.query(
+        """
+    CREATE TABLE single_node
+        (name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64))))
+        ENGINE=S3('http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV')
+    """
+    )
+    query_id_engine_single_node = str(uuid.uuid4())
+    resp_engine_single_node = node.query(
+        """
+    SELECT * FROM single_node ORDER BY (name, value, polygon)
+    """,
+        query_id = query_id_engine_single_node
+    )
+    assert resp_def == resp_engine_single_node
+
+    node.query("DROP TABLE IF EXISTS distributed");
+    node.query(
+        """
+    CREATE TABLE distributed
+        (name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64))))
+        ENGINE=S3('http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV')
+        SETTINGS object_storage_cluster='cluster_simple'
+    """
+    )
+    query_id_engine_distributed = str(uuid.uuid4())
+    resp_engine_distributed = node.query(
+        """
+    SELECT * FROM distributed ORDER BY (name, value, polygon)
+    """,
+        query_id = query_id_engine_distributed
+    )
+    assert resp_def == resp_engine_distributed
+
+    node.query("SYSTEM FLUSH LOGS ON CLUSTER 'cluster_simple'")
+
+    hosts_engine_single_node = node.query(
+        f"""
+    SELECT uniq(hostname)
+    FROM clusterAllReplicas('cluster_simple', system.query_log)
+    WHERE type='QueryFinish' AND initial_query_id='{query_id_engine_single_node}'
+    """
+    )
+    assert int(hosts_engine_single_node) == 1
+    hosts_engine_distributed = node.query(
+        f"""
+    SELECT uniq(hostname)
+    FROM clusterAllReplicas('cluster_simple', system.query_log)
+    WHERE type='QueryFinish' AND initial_query_id='{query_id_engine_distributed}'
+    """
+    )
+    assert int(hosts_engine_distributed) == 3