Skip to content

Commit 1f821d4

Browse files
Enmkianton-ru
authored andcommitted
Merge pull request #1042 from Altinity/bugfix/antalya-25.6.5/lock_object_storage_task_distribution_ms_lost_host
Antalya 25.6: Fix lock_object_storage_task_distribution_ms, Changed lock_object_storage_task_distribution_ms value to 500
1 parent 3be47a7 commit 1f821d4

4 files changed

Lines changed: 23 additions & 6 deletions

File tree

src/Core/Settings.cpp

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7001,8 +7001,18 @@ Default number of tasks for parallel reading in distributed query. Tasks are spr
70017001
DECLARE(Bool, distributed_plan_optimize_exchanges, true, R"(
70027002
Removes unnecessary exchanges in distributed query plan. Disable it for debugging.
70037003
)", 0) \
7004-
DECLARE(UInt64, lock_object_storage_task_distribution_ms, 0, R"(
7005-
In object storage distribution queries do not distibute tasks on non-prefetched nodes until prefetched node is active.
7004+
DECLARE(UInt64, lock_object_storage_task_distribution_ms, 500, R"(
7005+
In object storage distribution queries do not distribute tasks on non-prefetched nodes until prefetched node is active.
7006+
Determines how long the free executor node (one that finished processing all of it assigned tasks) should wait before "stealing" tasks from queue of currently busy executor nodes.
7007+
7008+
Possible values:
7009+
7010+
- 0 - steal tasks immediately after freeing up.
7011+
- >0 - wait for specified period of time before stealing tasks.
7012+
7013+
Having this `>0` helps with cache reuse and might improve overall query time.
7014+
Because busy node might have warmed-up caches for this specific task, while free node needs to fetch lots of data from S3.
7015+
Which might take longer than just waiting for the busy node and generate extra traffic.
70067016
)", EXPERIMENTAL) \
70077017
DECLARE(String, distributed_plan_force_exchange_kind, "", R"(
70087018
Force specified kind of Exchange operators between distributed query stages.

src/Core/SettingsChangesHistory.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,10 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
3939
/// controls new feature and it's 'true' by default, use 'false' as previous_value).
4040
/// It's used to implement `compatibility` setting (see https://github.com/ClickHouse/ClickHouse/issues/35972)
4141
/// Note: please check if the key already exists to prevent duplicate entries.
42+
addSettingsChanges(settings_changes_history, "25.8.1.20364",
43+
{
44+
{"lock_object_storage_task_distribution_ms", 500, 500, "Raised the value to 500 to avoid hoping tasks between executors."},
45+
});
4246
addSettingsChanges(settings_changes_history, "25.8",
4347
{
4448
{"output_format_json_quote_64bit_integers", true, false, "Disable quoting of the 64 bit integers in JSON by default"},

src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,7 @@ ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getMatchingFileFromIter
189189
// Queue file for its assigned replica
190190
{
191191
std::lock_guard lock(mutex);
192-
unprocessed_files.emplace(file_path, std::make_pair(object_info, number_of_current_replica));
192+
unprocessed_files.emplace(file_path, std::make_pair(object_info, file_replica_idx));
193193
connection_to_files[file_replica_idx].push_back(object_info);
194194
}
195195
}
@@ -281,7 +281,11 @@ void StorageObjectStorageStableTaskDistributor::rescheduleTasksFromReplica(size_
281281

282282
replica_to_files_to_be_processed.erase(number_of_current_replica);
283283
for (const auto & file : processed_file_list_ptr->second)
284-
unprocessed_files.emplace(file->getPath(), std::make_pair(file, getReplicaForFile(file->getPath())));
284+
{
285+
auto file_replica_idx = getReplicaForFile(file->getPath());
286+
unprocessed_files.emplace(file->getPath(), std::make_pair(file, file_replica_idx));
287+
connection_to_files[file_replica_idx].push_back(file);
288+
}
285289
}
286290

287291
}

tests/integration/test_s3_cache_locality/test.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -86,8 +86,7 @@ def check_s3_gets(cluster, node, expected_result, cluster_first, cluster_second,
8686
"filesystem_cache_name": "'raw_s3_cache'",
8787
}
8888

89-
if lock_object_storage_task_distribution_ms > 0:
90-
settings["lock_object_storage_task_distribution_ms"] = lock_object_storage_task_distribution_ms
89+
settings["lock_object_storage_task_distribution_ms"] = lock_object_storage_task_distribution_ms
9190

9291
query_id_first = str(uuid.uuid4())
9392
result_first = node.query(

0 commit comments

Comments
 (0)