diff --git a/src/Core/Settings.cpp b/src/Core/Settings.cpp index 71fb52bff1c4..d3e5fd55a8be 100644 --- a/src/Core/Settings.cpp +++ b/src/Core/Settings.cpp @@ -6891,8 +6891,18 @@ Cache the list of objects returned by list objects calls in object storage DECLARE(Bool, distributed_plan_optimize_exchanges, true, R"( Removes unnecessary exchanges in distributed query plan. Disable it for debugging. )", 0) \ - DECLARE(UInt64, lock_object_storage_task_distribution_ms, 0, R"( -In object storage distribution queries do not distibute tasks on non-prefetched nodes until prefetched node is active. + DECLARE(UInt64, lock_object_storage_task_distribution_ms, 500, R"( +In object storage distribution queries do not distribute tasks on non-prefetched nodes until prefetched node is active. +Determines how long the free executor node (one that finished processing all of its assigned tasks) should wait before "stealing" tasks from queue of currently busy executor nodes. + +Possible values: + +- 0 - steal tasks immediately after freeing up. +- >0 - wait for specified period of time before stealing tasks. + +Having this `>0` helps with cache reuse and might improve overall query time. +Because busy node might have warmed-up caches for this specific task, while free node needs to fetch lots of data from S3. +Which might take longer than just waiting for the busy node and generate extra traffic. )", EXPERIMENTAL) \ DECLARE(String, distributed_plan_force_exchange_kind, "", R"( Force specified kind of Exchange operators between distributed query stages. diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp index 0d1dd9cf0835..b392b47bc6fb 100644 --- a/src/Core/SettingsChangesHistory.cpp +++ b/src/Core/SettingsChangesHistory.cpp @@ -67,7 +67,11 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory() /// controls new feature and it's 'true' by default, use 'false' as previous_value). 
/// It's used to implement `compatibility` setting (see https://github.com/ClickHouse/ClickHouse/issues/35972) /// Note: please check if the key already exists to prevent duplicate entries. - addSettingsChanges(settings_changes_history, "25.6.5.2000", + addSettingsChanges(settings_changes_history, "25.6.5.20364", + { + {"lock_object_storage_task_distribution_ms", 0, 500, "Raised the value to 500 to avoid hopping tasks between executors."}, + }); + addSettingsChanges(settings_changes_history, "25.6.5.20000", { {"allow_experimental_database_iceberg", false, true, "Turned ON by default for Antalya"}, {"allow_experimental_database_unity_catalog", false, true, "Turned ON by default for Antalya"}, diff --git a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp index a08b7f8c91b1..482b3a6bcb31 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp @@ -195,7 +195,7 @@ std::optional StorageObjectStorageStableTaskDistributor::getMatchingFile // Queue file for its assigned replica { std::lock_guard lock(mutex); - unprocessed_files[file_path] = number_of_current_replica; + unprocessed_files[file_path] = file_replica_idx; connection_to_files[file_replica_idx].push_back(file_path); } } @@ -286,7 +286,11 @@ void StorageObjectStorageStableTaskDistributor::rescheduleTasksFromReplica(size_ replica_to_files_to_be_processed.erase(number_of_current_replica); for (const auto & file_path : processed_file_list_ptr->second) - unprocessed_files[file_path] = getReplicaForFile(file_path); + { + auto file_replica_idx = getReplicaForFile(file_path); + unprocessed_files[file_path] = file_replica_idx; + connection_to_files[file_replica_idx].push_back(file_path); + } } } diff --git a/tests/integration/test_s3_cache_locality/test.py b/tests/integration/test_s3_cache_locality/test.py index 
da85e78a5643..7d2fc2a2ada4 100644 --- a/tests/integration/test_s3_cache_locality/test.py +++ b/tests/integration/test_s3_cache_locality/test.py @@ -86,8 +86,7 @@ def check_s3_gets(cluster, node, expected_result, cluster_first, cluster_second, "filesystem_cache_name": "'raw_s3_cache'", } - if lock_object_storage_task_distribution_ms > 0: - settings["lock_object_storage_task_distribution_ms"] = lock_object_storage_task_distribution_ms + settings["lock_object_storage_task_distribution_ms"] = lock_object_storage_task_distribution_ms query_id_first = str(uuid.uuid4()) result_first = node.query(