Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions src/Core/Settings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6891,8 +6891,18 @@ Cache the list of objects returned by list objects calls in object storage
DECLARE(Bool, distributed_plan_optimize_exchanges, true, R"(
Removes unnecessary exchanges in distributed query plan. Disable it for debugging.
)", 0) \
DECLARE(UInt64, lock_object_storage_task_distribution_ms, 0, R"(
In object storage distribution queries do not distibute tasks on non-prefetched nodes until prefetched node is active.
DECLARE(UInt64, lock_object_storage_task_distribution_ms, 500, R"(
In object storage distribution queries do not distribute tasks on non-prefetched nodes until prefetched node is active.
Determines how long a free executor node (one that has finished processing all of its assigned tasks) should wait before "stealing" tasks from the queue of currently busy executor nodes.

Possible values:

- 0 - steal tasks immediately after freeing up.
- >0 - wait for specified period of time before stealing tasks.

Setting this to a value `>0` helps with cache reuse and might improve overall query time,
because a busy node might have warmed-up caches for this specific task, while a free node would need to fetch lots of data from S3,
which might take longer than just waiting for the busy node and would generate extra traffic.
)", EXPERIMENTAL) \
DECLARE(String, distributed_plan_force_exchange_kind, "", R"(
Force specified kind of Exchange operators between distributed query stages.
Expand Down
6 changes: 5 additions & 1 deletion src/Core/SettingsChangesHistory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,11 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
/// controls new feature and it's 'true' by default, use 'false' as previous_value).
/// It's used to implement `compatibility` setting (see https://github.com/ClickHouse/ClickHouse/issues/35972)
/// Note: please check if the key already exists to prevent duplicate entries.
addSettingsChanges(settings_changes_history, "25.6.5.2000",
/// `lock_object_storage_task_distribution_ms`: default raised from 0 to 500 in this version.
/// Entries are {name, previous_value, new_value, reason}. previous_value must be the old default (0),
/// otherwise the `compatibility` setting cannot restore the pre-change behaviour.
addSettingsChanges(settings_changes_history, "25.6.5.20364",
{
{"lock_object_storage_task_distribution_ms", 0, 500, "Raised the value to 500 to avoid hopping tasks between executors."},
});
addSettingsChanges(settings_changes_history, "25.6.5.20000",
{
{"allow_experimental_database_iceberg", false, true, "Turned ON by default for Antalya"},
{"allow_experimental_database_unity_catalog", false, true, "Turned ON by default for Antalya"},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ std::optional<String> StorageObjectStorageStableTaskDistributor::getMatchingFile
// Queue file for its assigned replica
{
std::lock_guard lock(mutex);
unprocessed_files[file_path] = number_of_current_replica;
unprocessed_files[file_path] = file_replica_idx;
connection_to_files[file_replica_idx].push_back(file_path);
}
}
Expand Down Expand Up @@ -286,7 +286,11 @@ void StorageObjectStorageStableTaskDistributor::rescheduleTasksFromReplica(size_

replica_to_files_to_be_processed.erase(number_of_current_replica);
for (const auto & file_path : processed_file_list_ptr->second)
unprocessed_files[file_path] = getReplicaForFile(file_path);
{
auto file_replica_idx = getReplicaForFile(file_path);
unprocessed_files[file_path] = file_replica_idx;
connection_to_files[file_replica_idx].push_back(file_path);
}
}

}
3 changes: 1 addition & 2 deletions tests/integration/test_s3_cache_locality/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,7 @@ def check_s3_gets(cluster, node, expected_result, cluster_first, cluster_second,
"filesystem_cache_name": "'raw_s3_cache'",
}

if lock_object_storage_task_distribution_ms > 0:
settings["lock_object_storage_task_distribution_ms"] = lock_object_storage_task_distribution_ms
settings["lock_object_storage_task_distribution_ms"] = lock_object_storage_task_distribution_ms

query_id_first = str(uuid.uuid4())
result_first = node.query(
Expand Down
Loading