Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/Core/Settings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6141,6 +6141,9 @@ Cache the list of objects returned by list objects calls in object storage
)", EXPERIMENTAL) \
DECLARE(Bool, object_storage_remote_initiator, false, R"(
Execute request to object storage as remote on one of object_storage_cluster nodes.
)", EXPERIMENTAL) \
DECLARE(UInt64, lock_object_storage_task_distribution_ms, 0, R"(
In object storage distribution queries do not distibute tasks on non-prefetched nodes until prefetched node is active.
)", EXPERIMENTAL) \
\

Expand Down
1 change: 1 addition & 0 deletions src/Core/SettingsChangesHistory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
{"object_storage_cluster", "", "", "New setting"},
{"object_storage_max_nodes", 0, 0, "New setting"},
{"use_object_storage_list_objects_cache", true, false, "New setting."},
{"lock_object_storage_task_distribution_ms", 0, 0, "New setting."},
});
addSettingsChanges(settings_changes_history, "25.3",
{
Expand Down
6 changes: 5 additions & 1 deletion src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ namespace Setting
{
extern const SettingsBool use_hive_partitioning;
extern const SettingsString object_storage_cluster;
extern const SettingsUInt64 lock_object_storage_task_distribution_ms;
}

namespace ErrorCodes
Expand Down Expand Up @@ -386,7 +387,10 @@ RemoteQueryExecutor::Extension StorageObjectStorageCluster::getTaskIteratorExten
}
}

auto task_distributor = std::make_shared<StorageObjectStorageStableTaskDistributor>(iterator, ids_of_hosts);
auto task_distributor = std::make_shared<StorageObjectStorageStableTaskDistributor>(
iterator,
ids_of_hosts,
local_context->getSettingsRef()[Setting::lock_object_storage_task_distribution_ms]);

auto callback = std::make_shared<TaskIterator>(
[task_distributor](size_t number_of_current_replica) mutable -> String {
Expand Down
19 changes: 18 additions & 1 deletion src/Storages/ObjectStorage/StorageObjectStorageSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Storages/Cache/SchemaCache.h>
#include <Storages/ObjectStorage/StorageObjectStorage.h>
#include <Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h>
#include <Storages/ObjectStorage/DataLakes/DeltaLake/ObjectInfoWithPartitionColumns.h>
#include <Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h>
#include <Storages/VirtualColumnUtils.h>
Expand Down Expand Up @@ -430,16 +431,32 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade
ObjectInfoPtr object_info;
auto query_settings = configuration->getQuerySettings(context_);

bool not_a_path = false;

do
{
not_a_path = false;
object_info = file_iterator->next(processor);

if (!object_info || object_info->getPath().empty())
return {};

StorageObjectStorageStableTaskDistributor::CommandInTaskResponse command(object_info->getPath());
if (command.is_parsed())
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not elegant, but for backward compatibility left string as filename and JSON as something with additional information.
Can be extended in future.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like storing a JSON in a variable named as PATH, which is highly misleading... If you want to pass some extra data as JSON, please do it explicitly.

Either by storing some extra information in ObjectInfo's ObjectMetadata, by updating ObjectMetadata with extra field (preferred), OR inserting a special key into ObjectMetadata::attributes with stringified JSON (less desirable), OR doing something else.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved into RelativePathWithMetadata

{
auto retry_after_us = command.get_retry_after_us();
if (retry_after_us.has_value())
{
not_a_path = true;
/// TODO: Make asyncronous waiting without sleep in thread
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

/// TODO: Make asyncronous waiting without sleep in thread

When do you plan to implement this TODO?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I need more deep understanding how to do it, plan to make separate PR later.
In any case need to do it before sending to upstream, now it is just experimental level of code quality.

sleepForMicroseconds(std::min(100000ul, retry_after_us.value()));
continue;
}
}

object_info->loadMetadata(object_storage);
}
while (query_settings.skip_empty_files && object_info->metadata->size_bytes == 0);
while (not_a_path || (query_settings.skip_empty_files && object_info->metadata->size_bytes == 0));

QueryPipelineBuilder builder;
std::shared_ptr<ISource> source;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,21 @@
#include <consistent_hashing.h>
#include <optional>

#include <Poco/JSON/Object.h>
#include <Poco/JSON/Parser.h>
#include <Poco/JSON/JSONException.h>

namespace DB
{

StorageObjectStorageStableTaskDistributor::StorageObjectStorageStableTaskDistributor(
std::shared_ptr<IObjectIterator> iterator_,
std::vector<std::string> ids_of_nodes_)
std::vector<std::string> ids_of_nodes_,
uint64_t lock_object_storage_task_distribution_ms_)
: iterator(std::move(iterator_))
, connection_to_files(ids_of_nodes_.size())
, ids_of_nodes(ids_of_nodes_)
, lock_object_storage_task_distribution_us(lock_object_storage_task_distribution_ms_ * 1000)
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Poco::Timestamp is in microseconds, so here convert from milliseconds to microseconds to avoid multiplications or divides later.

Copy link
Member

@Enmk Enmk Jul 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a slight chance of overflow with large positive value (mot likely unintentionally, due to misconfiguration or error) becoming a negative one, since Poco::Timestamp::TimeDiff is signed int64.

IDK if it is even worth it to do some overflow checks here, but please makes sure that negative values do not break anything down the road.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add checks for lock_object_storage_task_distribution_ms in getTaskIteratorExtension before constructor called.

, iterator_exhausted(false)
{
}
Expand All @@ -24,6 +30,8 @@ std::optional<String> StorageObjectStorageStableTaskDistributor::getNextTask(siz
number_of_current_replica
);

saveLastNodeActivity(number_of_current_replica);

// 1. Check pre-queued files first
if (auto file = getPreQueuedFile(number_of_current_replica))
return file;
Expand Down Expand Up @@ -148,7 +156,7 @@ std::optional<String> StorageObjectStorageStableTaskDistributor::getMatchingFile
// Queue file for its assigned replica
{
std::lock_guard lock(mutex);
unprocessed_files.insert(file_path);
unprocessed_files[file_path] = number_of_current_replica;
connection_to_files[file_replica_idx].push_back(file_path);
}
}
Expand All @@ -158,25 +166,96 @@ std::optional<String> StorageObjectStorageStableTaskDistributor::getMatchingFile

std::optional<String> StorageObjectStorageStableTaskDistributor::getAnyUnprocessedFile(size_t number_of_current_replica)
{
/// Limit time of node activity to keep task in queue
Poco::Timestamp activity_limit;
Poco::Timestamp oldest_activity;
if (lock_object_storage_task_distribution_us)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lock_object_storage_task_distribution_us >= 0, since lock_object_storage_task_distribution_us is SIGNED

activity_limit -= lock_object_storage_task_distribution_us;

std::lock_guard lock(mutex);

if (!unprocessed_files.empty())
{
auto it = unprocessed_files.begin();
String next_file = *it;
unprocessed_files.erase(it);

while (it != unprocessed_files.end())
{
auto last_activity = last_node_activity.find(it->second);
if (!lock_object_storage_task_distribution_us
|| last_activity == last_node_activity.end()
|| activity_limit > last_activity->second)
{
String next_file = it->first;
unprocessed_files.erase(it);

LOG_TRACE(
log,
"Iterator exhausted. Assigning unprocessed file {} to replica {}",
next_file,
number_of_current_replica
);

return next_file;
}

oldest_activity = std::min(oldest_activity, last_activity->second);
++it;
}

LOG_TRACE(
log,
"Iterator exhausted. Assigning unprocessed file {} to replica {}",
next_file,
number_of_current_replica
"No unprocessed file for replica {}, need to retry after {} us",
number_of_current_replica,
oldest_activity - activity_limit
);

return next_file;
/// All unprocessed files owned by alive replicas with recenlty activity
/// Need to retry after (oldest_activity - activity_limit) microseconds
CommandInTaskResponse response;
response.set_retry_after_us(oldest_activity - activity_limit);
return response.to_string();
}

return std::nullopt;
}

void StorageObjectStorageStableTaskDistributor::saveLastNodeActivity(size_t number_of_current_replica)
{
Poco::Timestamp now;
std::lock_guard lock(mutex);
last_node_activity[number_of_current_replica] = now;
}

StorageObjectStorageStableTaskDistributor::CommandInTaskResponse::CommandInTaskResponse(const std::string & task)
{
Poco::JSON::Parser parser;
try
{
auto json = parser.parse(task).extract<Poco::JSON::Object::Ptr>();
if (!json)
return;

successfully_parsed = true;

if (json->has("retry_after_us"))
retry_after_us = json->getValue<size_t>("retry_after_us");
}
catch (const Poco::JSON::JSONException &)
{ /// Not a JSON
return;
}
}

std::string StorageObjectStorageStableTaskDistributor::CommandInTaskResponse::to_string() const
{
Poco::JSON::Object json;
if (retry_after_us.has_value())
json.set("retry_after_us", retry_after_us.value());

std::ostringstream oss;
oss.exceptions(std::ios::failbit);
Poco::JSON::Stringifier::stringify(json, oss);
return oss.str();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,11 @@
#include <Interpreters/Cluster.h>
#include <Storages/ObjectStorage/StorageObjectStorageSource.h>
#include <Storages/ObjectStorageQueue/ObjectStorageQueueSource.h>

#include <Poco/Timestamp.h>

#include <unordered_set>
#include <unordered_map>
#include <vector>
#include <mutex>
#include <memory>
Expand All @@ -16,9 +20,28 @@ namespace DB
class StorageObjectStorageStableTaskDistributor
{
public:
class CommandInTaskResponse
{
public:
CommandInTaskResponse() {}
CommandInTaskResponse(const std::string & task);

bool is_parsed() const { return successfully_parsed; }
void set_retry_after_us(uint64_t time_us) { retry_after_us = time_us; }

std::string to_string() const;

std::optional<uint64_t> get_retry_after_us() const { return retry_after_us; }

private:
bool successfully_parsed = false;
std::optional<uint64_t> retry_after_us;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not Poco::Timestamp::TimeDiff ?

};

StorageObjectStorageStableTaskDistributor(
std::shared_ptr<IObjectIterator> iterator_,
std::vector<std::string> ids_of_nodes_);
std::vector<std::string> ids_of_nodes_,
uint64_t lock_object_storage_task_distribution_ms_);

std::optional<String> getNextTask(size_t number_of_current_replica);

Expand All @@ -28,12 +51,17 @@ class StorageObjectStorageStableTaskDistributor
std::optional<String> getMatchingFileFromIterator(size_t number_of_current_replica);
std::optional<String> getAnyUnprocessedFile(size_t number_of_current_replica);

void saveLastNodeActivity(size_t number_of_current_replica);

std::shared_ptr<IObjectIterator> iterator;

std::vector<std::vector<String>> connection_to_files;
std::unordered_set<String> unprocessed_files;
/// Map of unprocessed files in format filename => number of prefetched replica
std::unordered_map<String, size_t> unprocessed_files;

std::vector<std::string> ids_of_nodes;
std::unordered_map<size_t, Poco::Timestamp> last_node_activity;
Poco::Timestamp::TimeDiff lock_object_storage_task_distribution_us;

std::mutex mutex;
bool iterator_exhausted = false;
Expand Down
Loading
Loading