From 258bbbb9bc6a595d2ce74fe6dc36640e58c76c6f Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Thu, 12 Jun 2025 16:26:25 +0200 Subject: [PATCH 1/5] lock_object_storage_task_distribution_ms setting --- src/Core/Settings.cpp | 3 + src/Core/SettingsChangesHistory.cpp | 1 + .../StorageObjectStorageCluster.cpp | 6 +- .../StorageObjectStorageSource.cpp | 19 +++- ...rageObjectStorageStableTaskDistributor.cpp | 95 +++++++++++++++++-- ...torageObjectStorageStableTaskDistributor.h | 32 ++++++- .../test_s3_cache_locality/test.py | 69 +++++++------- 7 files changed, 181 insertions(+), 44 deletions(-) diff --git a/src/Core/Settings.cpp b/src/Core/Settings.cpp index 380a2ab33e03..1aff9006c305 100644 --- a/src/Core/Settings.cpp +++ b/src/Core/Settings.cpp @@ -6141,6 +6141,9 @@ Cache the list of objects returned by list objects calls in object storage )", EXPERIMENTAL) \ DECLARE(Bool, object_storage_remote_initiator, false, R"( Execute request to object storage as remote on one of object_storage_cluster nodes. +)", EXPERIMENTAL) \ + DECLARE(UInt64, lock_object_storage_task_distribution_ms, 0, R"( +In object storage distribution queries do not distibute tasks on non-prefetched nodes until prefetched node is active. )", EXPERIMENTAL) \ \ diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp index 987a1ee7598d..feff390f4aae 100644 --- a/src/Core/SettingsChangesHistory.cpp +++ b/src/Core/SettingsChangesHistory.cpp @@ -72,6 +72,7 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory() {"object_storage_cluster", "", "", "New setting"}, {"object_storage_max_nodes", 0, 0, "New setting"}, {"use_object_storage_list_objects_cache", true, false, "New setting."}, + {"lock_object_storage_task_distribution_ms", 0, 0, "New setting."}, }); addSettingsChanges(settings_changes_history, "25.3", { diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp index 6fdb2c0d4b9d..7758822f7284 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp @@ -27,6 +27,7 @@ namespace Setting { extern const SettingsBool use_hive_partitioning; extern const SettingsString object_storage_cluster; + extern const SettingsUInt64 lock_object_storage_task_distribution_ms; } namespace ErrorCodes @@ -386,7 +387,10 @@ RemoteQueryExecutor::Extension StorageObjectStorageCluster::getTaskIteratorExten } } - auto task_distributor = std::make_shared(iterator, ids_of_hosts); + auto task_distributor = std::make_shared( + iterator, + ids_of_hosts, + local_context->getSettingsRef()[Setting::lock_object_storage_task_distribution_ms]); auto callback = std::make_shared( [task_distributor](size_t number_of_current_replica) mutable -> String { diff --git a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp index f51e0e5860f2..91795c81c205 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp @@ -19,6 +19,7 @@ #include #include #include +#include #include #include #include @@ -430,16 +431,32 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade ObjectInfoPtr object_info; auto query_settings = configuration->getQuerySettings(context_); + bool not_a_path = false; + do { + not_a_path = false; object_info = file_iterator->next(processor); if (!object_info || object_info->getPath().empty()) return {}; + StorageObjectStorageStableTaskDistributor::CommandInTaskResponse command(object_info->getPath()); + if (command.is_parsed()) + { + auto retry_after_us = command.get_retry_after_us(); + if (retry_after_us.has_value()) + { + not_a_path = true; + /// TODO: Make asyncronous waiting without sleep in thread + sleepForMicroseconds(std::min(100000ul, retry_after_us.value())); + continue; + } + } + object_info->loadMetadata(object_storage); } - while (query_settings.skip_empty_files && object_info->metadata->size_bytes == 0); + while (not_a_path || (query_settings.skip_empty_files && object_info->metadata->size_bytes == 0)); QueryPipelineBuilder builder; std::shared_ptr source; diff --git a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp index d9ca7b344637..fcdf4bc2cb0c 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp @@ -3,15 +3,21 @@ #include #include +#include +#include +#include + namespace DB { StorageObjectStorageStableTaskDistributor::StorageObjectStorageStableTaskDistributor( std::shared_ptr iterator_, - std::vector ids_of_nodes_) + std::vector ids_of_nodes_, + uint64_t lock_object_storage_task_distribution_ms_) : iterator(std::move(iterator_)) , connection_to_files(ids_of_nodes_.size()) , ids_of_nodes(ids_of_nodes_) + , lock_object_storage_task_distribution_us(lock_object_storage_task_distribution_ms_ * 1000) , iterator_exhausted(false) { } @@ -24,6 +30,8 @@ std::optional StorageObjectStorageStableTaskDistributor::getNextTask(siz number_of_current_replica ); + saveLastNodeActivity(number_of_current_replica); + // 1. Check pre-queued files first if (auto file = getPreQueuedFile(number_of_current_replica)) return file; @@ -148,7 +156,7 @@ std::optional StorageObjectStorageStableTaskDistributor::getMatchingFile // Queue file for its assigned replica { std::lock_guard lock(mutex); - unprocessed_files.insert(file_path); + unprocessed_files[file_path] = number_of_current_replica; connection_to_files[file_replica_idx].push_back(file_path); } } @@ -158,25 +166,96 @@ std::optional StorageObjectStorageStableTaskDistributor::getMatchingFile std::optional StorageObjectStorageStableTaskDistributor::getAnyUnprocessedFile(size_t number_of_current_replica) { + /// Limit time of node activity to keep task in queue + Poco::Timestamp activity_limit; + Poco::Timestamp oldest_activity; + if (lock_object_storage_task_distribution_us) + activity_limit -= lock_object_storage_task_distribution_us; + std::lock_guard lock(mutex); if (!unprocessed_files.empty()) { auto it = unprocessed_files.begin(); - String next_file = *it; - unprocessed_files.erase(it); + + while (it != unprocessed_files.end()) + { + auto last_activity = last_node_activity.find(it->second); + if (!lock_object_storage_task_distribution_us + || last_activity == last_node_activity.end() + || activity_limit > last_activity->second) + { + String next_file = it->first; + unprocessed_files.erase(it); + + LOG_TRACE( + log, + "Iterator exhausted. Assigning unprocessed file {} to replica {}", + next_file, + number_of_current_replica + ); + + return next_file; + } + + oldest_activity = std::min(oldest_activity, last_activity->second); + ++it; + } LOG_TRACE( log, - "Iterator exhausted. Assigning unprocessed file {} to replica {}", - next_file, - number_of_current_replica + "No unprocessed file for replica {}, need to retry after {} us", + number_of_current_replica, + oldest_activity - activity_limit ); - return next_file; + /// All unprocessed files owned by alive replicas with recenlty activity + /// Need to retry after (oldest_activity - activity_limit) microseconds + CommandInTaskResponse response; + response.set_retry_after_us(oldest_activity - activity_limit); + return response.to_string(); } return std::nullopt; } +void StorageObjectStorageStableTaskDistributor::saveLastNodeActivity(size_t number_of_current_replica) +{ + Poco::Timestamp now; + std::lock_guard lock(mutex); + last_node_activity[number_of_current_replica] = now; +} + +StorageObjectStorageStableTaskDistributor::CommandInTaskResponse::CommandInTaskResponse(const std::string & task) +{ + Poco::JSON::Parser parser; + try + { + auto json = parser.parse(task).extract(); + if (!json) + return; + + successfully_parsed = true; + + if (json->has("retry_after_us")) + retry_after_us = json->getValue("retry_after_us"); + } + catch (const Poco::JSON::JSONException &) + { /// Not a JSON + return; + } +} + +std::string StorageObjectStorageStableTaskDistributor::CommandInTaskResponse::to_string() const +{ + Poco::JSON::Object json; + if (retry_after_us.has_value()) + json.set("retry_after_us", retry_after_us.value()); + + std::ostringstream oss; + oss.exceptions(std::ios::failbit); + Poco::JSON::Stringifier::stringify(json, oss); + return oss.str(); +} + } diff --git a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h index 678ff4372f5f..9b0992904800 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h +++ b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h @@ -5,7 +5,11 @@ #include #include #include + +#include + #include +#include #include #include #include @@ -16,9 +20,28 @@ namespace DB class StorageObjectStorageStableTaskDistributor { public: + class CommandInTaskResponse + { + public: + CommandInTaskResponse() {} + CommandInTaskResponse(const std::string & task); + + bool is_parsed() const { return successfully_parsed; } + void set_retry_after_us(uint64_t time_us) { retry_after_us = time_us; } + + std::string to_string() const; + + std::optional get_retry_after_us() const { return retry_after_us; } + + private: + bool successfully_parsed = false; + std::optional retry_after_us; + }; + StorageObjectStorageStableTaskDistributor( std::shared_ptr iterator_, - std::vector ids_of_nodes_); + std::vector ids_of_nodes_, + uint64_t lock_object_storage_task_distribution_ms_); std::optional getNextTask(size_t number_of_current_replica); @@ -28,12 +51,17 @@ class StorageObjectStorageStableTaskDistributor std::optional getMatchingFileFromIterator(size_t number_of_current_replica); std::optional getAnyUnprocessedFile(size_t number_of_current_replica); + void saveLastNodeActivity(size_t number_of_current_replica); + std::shared_ptr iterator; std::vector> connection_to_files; - std::unordered_set unprocessed_files; + /// Map of unprocessed files in format filename => number of prefetched replica + std::unordered_map unprocessed_files; std::vector ids_of_nodes; + std::unordered_map last_node_activity; + Poco::Timestamp::TimeDiff lock_object_storage_task_distribution_us; std::mutex mutex; bool iterator_exhausted = false; diff --git a/tests/integration/test_s3_cache_locality/test.py b/tests/integration/test_s3_cache_locality/test.py index da667fad35b9..e49c255b4285 100644 --- a/tests/integration/test_s3_cache_locality/test.py +++ b/tests/integration/test_s3_cache_locality/test.py @@ -15,12 +15,12 @@ SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__)) -def create_buckets_s3(cluster): +def create_buckets_s3(cluster, files=1000): minio = cluster.minio_client s3_data = [] - for file_number in range(1000): + for file_number in range(files): file_name = f"data/generated/file_{file_number}.csv" os.makedirs(os.path.join(SCRIPT_DIR, "data/generated/"), exist_ok=True) s3_data.append(file_name) @@ -60,6 +60,7 @@ def started_cluster(): macros={"replica": f"clickhouse{i}"}, with_minio=True, with_zookeeper=True, + stay_alive=True, ) logging.info("Starting cluster...") @@ -70,13 +71,21 @@ def started_cluster(): yield cluster finally: - shutil.rmtree(os.path.join(SCRIPT_DIR, "data/generated/")) + shutil.rmtree(os.path.join(SCRIPT_DIR, "data/generated/"), ignore_errors=True) cluster.shutdown() -def check_s3_gets(cluster, node, expected_result, cluster_first, cluster_second, enable_filesystem_cache): +def check_s3_gets(cluster, node, expected_result, cluster_first, cluster_second, enable_filesystem_cache, lock=False): for host in list(cluster.instances.values()): - host.query("SYSTEM DROP FILESYSTEM CACHE 'raw_s3_cache'", timeout=30) + host.query("SYSTEM DROP FILESYSTEM CACHE 'raw_s3_cache'", timeout=30, ignore_error=True) + + settings = { + "enable_filesystem_cache": enable_filesystem_cache, + "filesystem_cache_name": "'raw_s3_cache'", + } + + if lock: + settings["lock_object_storage_task_distribution_ms"] = 30000 query_id_first = str(uuid.uuid4()) result_first = node.query( @@ -84,9 +93,7 @@ def check_s3_gets(cluster, node, expected_result, cluster_first, cluster_second, SELECT count(*) FROM s3Cluster('{cluster_first}', 'http://minio1:9001/root/data/generated/*', 'minio', '{minio_secret_key}', 'CSV', 'a String, b UInt64') WHERE b=42 - SETTINGS - enable_filesystem_cache={enable_filesystem_cache}, - filesystem_cache_name='raw_s3_cache' + SETTINGS {",".join(f"{k}={v}" for k, v in settings.items())} """, query_id=query_id_first, timeout=30, @@ -98,18 +105,15 @@ def check_s3_gets(cluster, node, expected_result, cluster_first, cluster_second, SELECT count(*) FROM s3Cluster('{cluster_second}', 'http://minio1:9001/root/data/generated/*', 'minio', '{minio_secret_key}', 'CSV', 'a String, b UInt64') WHERE b=42 - SETTINGS - enable_filesystem_cache={enable_filesystem_cache}, - filesystem_cache_name='raw_s3_cache' + SETTINGS {",".join(f"{k}={v}" for k, v in settings.items())} """, query_id=query_id_second, timeout=30, ) assert result_second == expected_result - node.query("SYSTEM FLUSH LOGS", timeout=30) - node.query(f"SYSTEM FLUSH LOGS ON CLUSTER {cluster_first}", timeout=30) - node.query(f"SYSTEM FLUSH LOGS ON CLUSTER {cluster_second}", timeout=30) + for host in list(cluster.instances.values()): + host.query("SYSTEM FLUSH LOGS", timeout=30) s3_get_first = node.query( f""" @@ -133,19 +137,20 @@ def check_s3_gets(cluster, node, expected_result, cluster_first, cluster_second, return int(s3_get_first), int(s3_get_second) -def check_s3_gets_repeat(cluster, node, expected_result, cluster_first, cluster_second, enable_filesystem_cache): +def check_s3_gets_repeat(cluster, node, expected_result, cluster_first, cluster_second, enable_filesystem_cache, lock=False): # Repeat test several times to get average result - iterations = 10 + iterations = 1 if lock else 10 s3_get_first_sum = 0 s3_get_second_sum = 0 for _ in range(iterations): - (s3_get_first, s3_get_second) = check_s3_gets(cluster, node, expected_result, cluster_first, cluster_second, enable_filesystem_cache) + (s3_get_first, s3_get_second) = check_s3_gets(cluster, node, expected_result, cluster_first, cluster_second, enable_filesystem_cache, lock) s3_get_first_sum += s3_get_first s3_get_second_sum += s3_get_second return s3_get_first_sum, s3_get_second_sum -def test_cache_locality(started_cluster): +@pytest.mark.parametrize("lock", [False, True]) +def test_cache_locality(started_cluster, lock): node = started_cluster.instances["clickhouse0"] expected_result = node.query( @@ -157,36 +162,36 @@ def test_cache_locality(started_cluster): ) # Algorithm does not give 100% guarantee, so add 10% on dispersion - dispersion = 0.1 + dispersion = 0.0 if lock else 0.1 # No cache - (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_12345', 'cluster_12345', 0) + (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_12345', 'cluster_12345', 0, lock) assert s3_get_second == s3_get_first # With cache - (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_12345', 'cluster_12345', 1) + (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_12345', 'cluster_12345', 1, lock) assert s3_get_second <= s3_get_first * dispersion # Different nodes order - (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_12345', 'cluster_34512', 1) + (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_12345', 'cluster_34512', 1, lock) assert s3_get_second <= s3_get_first * dispersion # No last node - (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_12345', 'cluster_1234', 1) - assert s3_get_second <= s3_get_first * (0.2 + dispersion) + (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_12345', 'cluster_1234', 1, lock) + assert s3_get_second <= s3_get_first * (0.211 + dispersion) # actual value - 24 for 100 files, 211 for 1000 # No first node - (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_12345', 'cluster_2345', 1) - assert s3_get_second <= s3_get_first * (0.2 + dispersion) + (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_12345', 'cluster_2345', 1, lock) + assert s3_get_second <= s3_get_first * (0.189 + dispersion) # actual value - 12 for 100 files, 189 for 1000 # No first node, different nodes order - (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_12345', 'cluster_4523', 1) - assert s3_get_second <= s3_get_first * (0.2 + dispersion) + (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_12345', 'cluster_4523', 1, lock) + assert s3_get_second <= s3_get_first * (0.189 + dispersion) # Add new node, different nodes order - (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_4523', 'cluster_12345', 1) - assert s3_get_second <= s3_get_first * (0.2 + dispersion) + (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_4523', 'cluster_12345', 1, lock) + assert s3_get_second <= s3_get_first * (0.189 + dispersion) # New node and old node, different nodes order - (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_1234', 'cluster_4523', 1) - assert s3_get_second <= s3_get_first * (0.4375 + dispersion) + (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_1234', 'cluster_4523', 1, lock) + assert s3_get_second <= s3_get_first * (0.400 + dispersion) # actual value - 36 for 100 files, 400 for 1000 From 27fb928df72900316a1bb8c0b59680fb0534fff7 Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Mon, 23 Jun 2025 13:06:50 +0200 Subject: [PATCH 2/5] Remove timeouts --- tests/integration/test_s3_cache_locality/test.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/tests/integration/test_s3_cache_locality/test.py b/tests/integration/test_s3_cache_locality/test.py index e49c255b4285..567dbbc25ce4 100644 --- a/tests/integration/test_s3_cache_locality/test.py +++ b/tests/integration/test_s3_cache_locality/test.py @@ -77,7 +77,7 @@ def started_cluster(): def check_s3_gets(cluster, node, expected_result, cluster_first, cluster_second, enable_filesystem_cache, lock=False): for host in list(cluster.instances.values()): - host.query("SYSTEM DROP FILESYSTEM CACHE 'raw_s3_cache'", timeout=30, ignore_error=True) + host.query("SYSTEM DROP FILESYSTEM CACHE 'raw_s3_cache'", ignore_error=True) settings = { "enable_filesystem_cache": enable_filesystem_cache, @@ -96,7 +96,6 @@ def check_s3_gets(cluster, node, expected_result, cluster_first, cluster_second, SETTINGS {",".join(f"{k}={v}" for k, v in settings.items())} """, query_id=query_id_first, - timeout=30, ) assert result_first == expected_result query_id_second = str(uuid.uuid4()) @@ -108,12 +107,11 @@ def check_s3_gets(cluster, node, expected_result, cluster_first, cluster_second, SETTINGS {",".join(f"{k}={v}" for k, v in settings.items())} """, query_id=query_id_second, - timeout=30, ) assert result_second == expected_result for host in list(cluster.instances.values()): - host.query("SYSTEM FLUSH LOGS", timeout=30) + host.query("SYSTEM FLUSH LOGS") s3_get_first = node.query( f""" @@ -122,7 +120,6 @@ def check_s3_gets(cluster, node, expected_result, cluster_first, cluster_second, WHERE type='QueryFinish' AND initial_query_id='{query_id_first}' """, - timeout=30, ) s3_get_second = node.query( f""" @@ -131,7 +128,6 @@ def check_s3_gets(cluster, node, expected_result, cluster_first, cluster_second, WHERE type='QueryFinish' AND initial_query_id='{query_id_second}' """, - timeout=30, ) return int(s3_get_first), int(s3_get_second) From fd36f73833fb9d96f11b0e1b2d2624e0603f3091 Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Tue, 24 Jun 2025 13:49:19 +0200 Subject: [PATCH 3/5] Faster test --- tests/integration/test_s3_cache_locality/test.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/integration/test_s3_cache_locality/test.py b/tests/integration/test_s3_cache_locality/test.py index 567dbbc25ce4..7aac310c1c2e 100644 --- a/tests/integration/test_s3_cache_locality/test.py +++ b/tests/integration/test_s3_cache_locality/test.py @@ -110,8 +110,8 @@ def check_s3_gets(cluster, node, expected_result, cluster_first, cluster_second, ) assert result_second == expected_result - for host in list(cluster.instances.values()): - host.query("SYSTEM FLUSH LOGS") + node.query(f"SYSTEM FLUSH LOGS ON CLUSTER {cluster_first}") + node.query(f"SYSTEM FLUSH LOGS ON CLUSTER {cluster_second}") s3_get_first = node.query( f""" From d851e30dcccf59e262b56e3f3dd73ef0d7fe6ec8 Mon Sep 17 00:00:00 2001 From: Vasily Nemkov Date: Tue, 1 Jul 2025 15:26:51 +0200 Subject: [PATCH 4/5] Moved changes to 25.3 section --- src/Core/SettingsChangesHistory.cpp | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp index feff390f4aae..9fb07c7ef5b5 100644 --- a/src/Core/SettingsChangesHistory.cpp +++ b/src/Core/SettingsChangesHistory.cpp @@ -66,13 +66,17 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory() /// controls new feature and it's 'true' by default, use 'false' as previous_value). /// It's used to implement `compatibility` setting (see https://github.com/ClickHouse/ClickHouse/issues/35972) /// Note: please check if the key already exists to prevent duplicate entries. + addSettingsChanges(settings_changes_history, "25.3.3.20000", + { + // Altinity Antalya modifications atop of 25.3 + {"lock_object_storage_task_distribution_ms", 0, 0, "New setting."}, + }); addSettingsChanges(settings_changes_history, "25.2.1.20000", { // Altinity Antalya modifications atop of 25.2 {"object_storage_cluster", "", "", "New setting"}, {"object_storage_max_nodes", 0, 0, "New setting"}, {"use_object_storage_list_objects_cache", true, false, "New setting."}, - {"lock_object_storage_task_distribution_ms", 0, 0, "New setting."}, }); addSettingsChanges(settings_changes_history, "25.3", { From d1f4b4c276d32f4c507d63d737feb2f7348ed24f Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Wed, 2 Jul 2025 18:01:04 +0200 Subject: [PATCH 5/5] Fix after review --- src/Disks/ObjectStorages/IObjectStorage.cpp | 36 ++++++++++++++++ src/Disks/ObjectStorages/IObjectStorage.h | 32 ++++++++++++-- .../StorageObjectStorageCluster.cpp | 15 ++++++- .../StorageObjectStorageSource.cpp | 14 ++++--- ...rageObjectStorageStableTaskDistributor.cpp | 42 ++----------------- ...torageObjectStorageStableTaskDistributor.h | 18 -------- .../test_s3_cache_locality/test.py | 36 ++++++++-------- 7 files changed, 109 insertions(+), 84 deletions(-) diff --git a/src/Disks/ObjectStorages/IObjectStorage.cpp b/src/Disks/ObjectStorages/IObjectStorage.cpp index f729b7ce6913..f8917e20e7de 100644 --- a/src/Disks/ObjectStorages/IObjectStorage.cpp +++ b/src/Disks/ObjectStorages/IObjectStorage.cpp @@ -8,6 +8,10 @@ #include #include +#include +#include +#include + namespace DB { @@ -107,4 +111,36 @@ void RelativePathWithMetadata::loadMetadata(ObjectStoragePtr object_storage) } } +RelativePathWithMetadata::CommandInTaskResponse::CommandInTaskResponse(const std::string & task) +{ + Poco::JSON::Parser parser; + try + { + auto json = parser.parse(task).extract(); + if (!json) + return; + + successfully_parsed = true; + + if (json->has("retry_after_us")) + retry_after_us = json->getValue("retry_after_us"); + } + catch (const Poco::JSON::JSONException &) + { /// Not a JSON + return; + } +} + +std::string RelativePathWithMetadata::CommandInTaskResponse::to_string() const +{ + Poco::JSON::Object json; + if (retry_after_us.has_value()) + json.set("retry_after_us", retry_after_us.value()); + + std::ostringstream oss; + oss.exceptions(std::ios::failbit); + Poco::JSON::Stringifier::stringify(json, oss); + return oss.str(); +} + } diff --git a/src/Disks/ObjectStorages/IObjectStorage.h b/src/Disks/ObjectStorages/IObjectStorage.h index 417fa10e5212..15bacb1ca0eb 100644 --- a/src/Disks/ObjectStorages/IObjectStorage.h +++ b/src/Disks/ObjectStorages/IObjectStorage.h @@ -66,15 +66,37 @@ struct ObjectMetadata struct RelativePathWithMetadata { + class CommandInTaskResponse + { + public: + CommandInTaskResponse() {} + CommandInTaskResponse(const std::string & task); + + bool is_parsed() const { return successfully_parsed; } + void set_retry_after_us(Poco::Timestamp::TimeDiff time_us) { retry_after_us = time_us; } + + std::string to_string() const; + + std::optional get_retry_after_us() const { return retry_after_us; } + + private: + bool successfully_parsed = false; + std::optional retry_after_us; + }; + String relative_path; std::optional metadata; + CommandInTaskResponse command; RelativePathWithMetadata() = default; - explicit RelativePathWithMetadata(String relative_path_, std::optional metadata_ = std::nullopt) - : relative_path(std::move(relative_path_)) - , metadata(std::move(metadata_)) - {} + explicit RelativePathWithMetadata(const String & task_string, std::optional metadata_ = std::nullopt) + : metadata(std::move(metadata_)) + , command(task_string) + { + if (!command.is_parsed()) + relative_path = task_string; + } virtual ~RelativePathWithMetadata() = default; @@ -85,6 +107,8 @@ struct RelativePathWithMetadata virtual size_t fileSizeInArchive() const { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not an archive"); } void loadMetadata(ObjectStoragePtr object_storage); + + const CommandInTaskResponse & getCommand() const { return command; } }; struct ObjectKeyWithMetadata diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp index 7758822f7284..cdfad7f646cb 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp @@ -35,6 +35,7 @@ namespace ErrorCodes extern const int LOGICAL_ERROR; extern const int UNKNOWN_FUNCTION; extern const int NOT_IMPLEMENTED; + extern const int INVALID_SETTING_VALUE; } @@ -387,10 +388,22 @@ RemoteQueryExecutor::Extension StorageObjectStorageCluster::getTaskIteratorExten } } + uint64_t lock_object_storage_task_distribution_ms = local_context->getSettingsRef()[Setting::lock_object_storage_task_distribution_ms]; + + /// Check value to avoid negative result after conversion in microseconds. + /// Poco::Timestamp::TimeDiff is signed int 64. + static const uint64_t lock_object_storage_task_distribution_ms_max = 0x0020000000000000ULL; + if (lock_object_storage_task_distribution_ms > lock_object_storage_task_distribution_ms_max) + throw Exception(ErrorCodes::INVALID_SETTING_VALUE, + "Value lock_object_storage_task_distribution_ms is too big: {}, allowed maximum is {}", + lock_object_storage_task_distribution_ms, + lock_object_storage_task_distribution_ms_max + ); + auto task_distributor = std::make_shared( iterator, ids_of_hosts, - local_context->getSettingsRef()[Setting::lock_object_storage_task_distribution_ms]); + lock_object_storage_task_distribution_ms); auto callback = std::make_shared( [task_distributor](size_t number_of_current_replica) mutable -> String { diff --git a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp index 91795c81c205..814b99d78032 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp @@ -438,22 +438,26 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade not_a_path = false; object_info = file_iterator->next(processor); - if (!object_info || object_info->getPath().empty()) + if (!object_info) return {}; - StorageObjectStorageStableTaskDistributor::CommandInTaskResponse command(object_info->getPath()); - if (command.is_parsed()) + if (object_info->getCommand().is_parsed()) { - auto retry_after_us = command.get_retry_after_us(); + auto retry_after_us = object_info->getCommand().get_retry_after_us(); if (retry_after_us.has_value()) { not_a_path = true; /// TODO: Make asyncronous waiting without sleep in thread - sleepForMicroseconds(std::min(100000ul, retry_after_us.value())); + /// Now this sleep is on executor node in worker thread + /// Does not block query initiator + sleepForMicroseconds(std::min(Poco::Timestamp::TimeDiff(100000ul), retry_after_us.value())); continue; } } + if (object_info->getPath().empty()) + return {}; + object_info->loadMetadata(object_storage); } while (not_a_path || (query_settings.skip_empty_files && object_info->metadata->size_bytes == 0)); diff --git a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp index fcdf4bc2cb0c..b9c6e1e3ce0b 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp @@ -3,10 +3,6 @@ #include #include -#include -#include -#include - namespace DB { @@ -169,7 +165,7 @@ std::optional StorageObjectStorageStableTaskDistributor::getAnyUnprocess /// Limit time of node activity to keep task in queue Poco::Timestamp activity_limit; Poco::Timestamp oldest_activity; - if (lock_object_storage_task_distribution_us) + if (lock_object_storage_task_distribution_us > 0) activity_limit -= lock_object_storage_task_distribution_us; std::lock_guard lock(mutex); @@ -181,7 +177,7 @@ std::optional StorageObjectStorageStableTaskDistributor::getAnyUnprocess while (it != unprocessed_files.end()) { auto last_activity = last_node_activity.find(it->second); - if (!lock_object_storage_task_distribution_us + if (lock_object_storage_task_distribution_us <= 0 || last_activity == last_node_activity.end() || activity_limit > last_activity->second) { @@ -211,7 +207,7 @@ std::optional StorageObjectStorageStableTaskDistributor::getAnyUnprocess /// All unprocessed files owned by alive replicas with recenlty activity /// Need to retry after (oldest_activity - activity_limit) microseconds - CommandInTaskResponse response; + RelativePathWithMetadata::CommandInTaskResponse response; response.set_retry_after_us(oldest_activity - activity_limit); return response.to_string(); } @@ -226,36 +222,4 @@ void StorageObjectStorageStableTaskDistributor::saveLastNodeActivity(size_t numb last_node_activity[number_of_current_replica] = now; } -StorageObjectStorageStableTaskDistributor::CommandInTaskResponse::CommandInTaskResponse(const std::string & task) -{ - Poco::JSON::Parser parser; - try - { - auto json = parser.parse(task).extract(); - if (!json) - return; - - successfully_parsed = true; - - if (json->has("retry_after_us")) - retry_after_us = json->getValue("retry_after_us"); - } - catch (const Poco::JSON::JSONException &) - { /// Not a JSON - return; - } -} - -std::string StorageObjectStorageStableTaskDistributor::CommandInTaskResponse::to_string() const -{ - Poco::JSON::Object json; - if (retry_after_us.has_value()) - json.set("retry_after_us", retry_after_us.value()); - - std::ostringstream oss; - oss.exceptions(std::ios::failbit); - Poco::JSON::Stringifier::stringify(json, oss); - return oss.str(); -} - } diff --git a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h index 9b0992904800..2132ba95a752 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h +++ b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h @@ -20,24 +20,6 @@ namespace DB class StorageObjectStorageStableTaskDistributor { public: - class CommandInTaskResponse - { - public: - CommandInTaskResponse() {} - CommandInTaskResponse(const std::string & task); - - bool is_parsed() const { return successfully_parsed; } - void set_retry_after_us(uint64_t time_us) { retry_after_us = time_us; } - - std::string to_string() const; - - std::optional get_retry_after_us() const { return retry_after_us; } - - private: - bool successfully_parsed = false; - std::optional retry_after_us; - }; - StorageObjectStorageStableTaskDistributor( std::shared_ptr iterator_, std::vector ids_of_nodes_, diff --git a/tests/integration/test_s3_cache_locality/test.py b/tests/integration/test_s3_cache_locality/test.py index 7aac310c1c2e..b7ce32c8b825 100644 --- a/tests/integration/test_s3_cache_locality/test.py +++ b/tests/integration/test_s3_cache_locality/test.py @@ -75,7 +75,8 @@ def started_cluster(): cluster.shutdown() -def check_s3_gets(cluster, node, expected_result, cluster_first, cluster_second, enable_filesystem_cache, lock=False): +def check_s3_gets(cluster, node, expected_result, cluster_first, cluster_second, enable_filesystem_cache, + lock_object_storage_task_distribution_ms): for host in list(cluster.instances.values()): host.query("SYSTEM DROP FILESYSTEM CACHE 'raw_s3_cache'", ignore_error=True) @@ -84,8 +85,8 @@ def check_s3_gets(cluster, node, expected_result, cluster_first, cluster_second, "filesystem_cache_name": "'raw_s3_cache'", } - if lock: - settings["lock_object_storage_task_distribution_ms"] = 30000 + if lock_object_storage_task_distribution_ms > 0: + settings["lock_object_storage_task_distribution_ms"] = lock_object_storage_task_distribution_ms query_id_first = str(uuid.uuid4()) result_first = node.query( @@ -133,20 +134,21 @@ def check_s3_gets(cluster, node, expected_result, cluster_first, cluster_second, return int(s3_get_first), int(s3_get_second) -def check_s3_gets_repeat(cluster, node, expected_result, cluster_first, cluster_second, enable_filesystem_cache, lock=False): +def check_s3_gets_repeat(cluster, node, expected_result, cluster_first, cluster_second, enable_filesystem_cache, + lock_object_storage_task_distribution_ms): # Repeat test several times to get average result - iterations = 1 if lock else 10 + iterations = 1 if lock_object_storage_task_distribution_ms > 0 else 10 s3_get_first_sum = 0 s3_get_second_sum = 0 for _ in range(iterations): - (s3_get_first, s3_get_second) = check_s3_gets(cluster, node, expected_result, cluster_first, cluster_second, enable_filesystem_cache, lock) + (s3_get_first, s3_get_second) = check_s3_gets(cluster, node, expected_result, cluster_first, cluster_second, enable_filesystem_cache, lock_object_storage_task_distribution_ms) s3_get_first_sum += s3_get_first s3_get_second_sum += s3_get_second return s3_get_first_sum, s3_get_second_sum -@pytest.mark.parametrize("lock", [False, True]) -def test_cache_locality(started_cluster, lock): +@pytest.mark.parametrize("lock_object_storage_task_distribution_ms ", [0, 30000]) +def test_cache_locality(started_cluster, lock_object_storage_task_distribution_ms): node = started_cluster.instances["clickhouse0"] expected_result = node.query( @@ -158,36 +160,36 @@ def test_cache_locality(started_cluster, lock): ) # Algorithm does not give 100% guarantee, so add 10% on dispersion - dispersion = 0.0 if lock else 0.1 + dispersion = 0.0 if lock_object_storage_task_distribution_ms > 0 else 0.1 # No cache - (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_12345', 'cluster_12345', 0, lock) + (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_12345', 'cluster_12345', 0, lock_object_storage_task_distribution_ms) assert s3_get_second == s3_get_first # With cache - (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_12345', 'cluster_12345', 1, lock) + (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_12345', 'cluster_12345', 1, lock_object_storage_task_distribution_ms) assert s3_get_second <= s3_get_first * dispersion # Different nodes order - (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_12345', 'cluster_34512', 1, lock) + (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_12345', 'cluster_34512', 1, lock_object_storage_task_distribution_ms) assert s3_get_second <= s3_get_first * dispersion # No last node - (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_12345', 'cluster_1234', 1, lock) + (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_12345', 'cluster_1234', 1, lock_object_storage_task_distribution_ms) assert s3_get_second <= s3_get_first * (0.211 + dispersion) # actual value - 24 for 100 files, 211 for 1000 # No first node - (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_12345', 'cluster_2345', 1, lock) + (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_12345', 'cluster_2345', 1, lock_object_storage_task_distribution_ms) assert s3_get_second <= s3_get_first * (0.189 + dispersion) # actual value - 12 for 100 files, 189 for 1000 # No first node, different nodes order - (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_12345', 'cluster_4523', 1, lock) + (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_12345', 'cluster_4523', 1, lock_object_storage_task_distribution_ms) assert s3_get_second <= s3_get_first * (0.189 + dispersion) # Add new node, different nodes order - (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_4523', 'cluster_12345', 1, lock) + (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_4523', 'cluster_12345', 1, lock_object_storage_task_distribution_ms) assert s3_get_second <= s3_get_first * (0.189 + dispersion) # New node and old node, different nodes order - (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_1234', 'cluster_4523', 1, lock) + (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_1234', 'cluster_4523', 1, lock_object_storage_task_distribution_ms) assert s3_get_second <= s3_get_first * (0.400 + dispersion) # actual value - 36 for 100 files, 400 for 1000