Skip to content

Commit a379172

Browse files
authored
Merge pull request #952 from Altinity/feature/antalya-25.6.5/rendezvous_hashing
25.6.5 Antalya port of #709, #760, #866 - Rendezvous hashing
2 parents 8bcbf47 + 7152388 commit a379172

22 files changed

+591
-39
lines changed

src/Core/Settings.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6869,6 +6869,9 @@ Default number of tasks for parallel reading in distributed query. Tasks are spr
68696869
DECLARE(Bool, distributed_plan_optimize_exchanges, true, R"(
68706870
Removes unnecessary exchanges in distributed query plan. Disable it for debugging.
68716871
)", 0) \
6872+
DECLARE(UInt64, lock_object_storage_task_distribution_ms, 0, R"(
6873+
In object storage distribution queries do not distibute tasks on non-prefetched nodes until prefetched node is active.
6874+
)", EXPERIMENTAL) \
68726875
DECLARE(String, distributed_plan_force_exchange_kind, "", R"(
68736876
Force specified kind of Exchange operators between distributed query stages.
68746877

src/Core/SettingsChangesHistory.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
7373
{"allow_experimental_database_unity_catalog", false, true, "Turned ON by default for Antalya"},
7474
{"allow_experimental_database_glue_catalog", false, true, "Turned ON by default for Antalya"},
7575
{"output_format_parquet_enum_as_byte_array", true, true, "Enable writing Enum as byte array in Parquet by default"},
76+
{"lock_object_storage_task_distribution_ms", 0, 0, "New setting."},
7677
{"object_storage_cluster", "", "", "New setting"},
7778
{"object_storage_max_nodes", 0, 0, "New setting"},
7879
});

src/Disks/ObjectStorages/IObjectStorage.cpp

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,10 @@
88
#include <Common/Exception.h>
99
#include <Common/ObjectStorageKeyGenerator.h>
1010

11+
#include <Poco/JSON/Object.h>
12+
#include <Poco/JSON/Parser.h>
13+
#include <Poco/JSON/JSONException.h>
14+
1115

1216
namespace DB
1317
{
@@ -97,4 +101,36 @@ WriteSettings IObjectStorage::patchSettings(const WriteSettings & write_settings
97101
return write_settings;
98102
}
99103

104+
RelativePathWithMetadata::CommandInTaskResponse::CommandInTaskResponse(const std::string & task)
105+
{
106+
Poco::JSON::Parser parser;
107+
try
108+
{
109+
auto json = parser.parse(task).extract<Poco::JSON::Object::Ptr>();
110+
if (!json)
111+
return;
112+
113+
successfully_parsed = true;
114+
115+
if (json->has("retry_after_us"))
116+
retry_after_us = json->getValue<size_t>("retry_after_us");
117+
}
118+
catch (const Poco::JSON::JSONException &)
119+
{ /// Not a JSON
120+
return;
121+
}
122+
}
123+
124+
std::string RelativePathWithMetadata::CommandInTaskResponse::to_string() const
125+
{
126+
Poco::JSON::Object json;
127+
if (retry_after_us.has_value())
128+
json.set("retry_after_us", retry_after_us.value());
129+
130+
std::ostringstream oss;
131+
oss.exceptions(std::ios::failbit);
132+
Poco::JSON::Stringifier::stringify(json, oss);
133+
return oss.str();
134+
}
135+
100136
}

src/Disks/ObjectStorages/IObjectStorage.h

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -103,15 +103,37 @@ struct ObjectMetadata
103103

104104
struct RelativePathWithMetadata
105105
{
106+
class CommandInTaskResponse
107+
{
108+
public:
109+
CommandInTaskResponse() = default;
110+
explicit CommandInTaskResponse(const std::string & task);
111+
112+
bool is_parsed() const { return successfully_parsed; }
113+
void set_retry_after_us(Poco::Timestamp::TimeDiff time_us) { retry_after_us = time_us; }
114+
115+
std::string to_string() const;
116+
117+
std::optional<Poco::Timestamp::TimeDiff> get_retry_after_us() const { return retry_after_us; }
118+
119+
private:
120+
bool successfully_parsed = false;
121+
std::optional<Poco::Timestamp::TimeDiff> retry_after_us;
122+
};
123+
106124
String relative_path;
107125
std::optional<ObjectMetadata> metadata;
126+
CommandInTaskResponse command;
108127

109128
RelativePathWithMetadata() = default;
110129

111-
explicit RelativePathWithMetadata(String relative_path_, std::optional<ObjectMetadata> metadata_ = std::nullopt)
112-
: relative_path(std::move(relative_path_))
113-
, metadata(std::move(metadata_))
114-
{}
130+
explicit RelativePathWithMetadata(const String & task_string, std::optional<ObjectMetadata> metadata_ = std::nullopt)
131+
: metadata(std::move(metadata_))
132+
, command(task_string)
133+
{
134+
if (!command.is_parsed())
135+
relative_path = task_string;
136+
}
115137

116138
virtual ~RelativePathWithMetadata() = default;
117139

@@ -120,6 +142,8 @@ struct RelativePathWithMetadata
120142
virtual bool isArchive() const { return false; }
121143
virtual std::string getPathToArchive() const { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not an archive"); }
122144
virtual size_t fileSizeInArchive() const { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not an archive"); }
145+
146+
const CommandInTaskResponse & getCommand() const { return command; }
123147
};
124148

125149
struct ObjectKeyWithMetadata

src/Storages/IStorageCluster.cpp

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -65,19 +65,15 @@ void ReadFromCluster::applyFilters(ActionDAGNodes added_filter_nodes)
6565
if (filter_actions_dag)
6666
predicate = filter_actions_dag->getOutputs().at(0);
6767

68-
auto max_replicas_to_use = static_cast<UInt64>(cluster->getShardsInfo().size());
69-
if (context->getSettingsRef()[Setting::max_parallel_replicas] > 1)
70-
max_replicas_to_use = std::min(max_replicas_to_use, context->getSettingsRef()[Setting::max_parallel_replicas].value);
71-
72-
createExtension(predicate, max_replicas_to_use);
68+
createExtension(predicate);
7369
}
7470

75-
void ReadFromCluster::createExtension(const ActionsDAG::Node * predicate, size_t number_of_replicas)
71+
void ReadFromCluster::createExtension(const ActionsDAG::Node * predicate)
7672
{
7773
if (extension)
7874
return;
7975

80-
extension = storage->getTaskIteratorExtension(predicate, context, number_of_replicas);
76+
extension = storage->getTaskIteratorExtension(predicate, context, cluster);
8177
}
8278

8379
/// The code executes on initiator
@@ -178,7 +174,7 @@ void ReadFromCluster::initializePipeline(QueryPipelineBuilder & pipeline, const
178174
if (current_settings[Setting::max_parallel_replicas] > 1)
179175
max_replicas_to_use = std::min(max_replicas_to_use, current_settings[Setting::max_parallel_replicas].value);
180176

181-
createExtension(nullptr, max_replicas_to_use);
177+
createExtension(nullptr);
182178

183179
for (const auto & shard_info : cluster->getShardsInfo())
184180
{

src/Storages/IStorageCluster.h

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,10 @@ class IStorageCluster : public IStorage
4343
ClusterPtr getCluster(ContextPtr context) const { return getClusterImpl(context, cluster_name); }
4444

4545
/// Query is needed for pruning by virtual columns (_file, _path)
46-
virtual RemoteQueryExecutor::Extension getTaskIteratorExtension(const ActionsDAG::Node * predicate, const ContextPtr & context, size_t number_of_replicas) const = 0;
46+
virtual RemoteQueryExecutor::Extension getTaskIteratorExtension(
47+
const ActionsDAG::Node * predicate,
48+
const ContextPtr & context,
49+
ClusterPtr cluster) const = 0;
4750

4851
QueryProcessingStage::Enum getQueryProcessingStage(ContextPtr, QueryProcessingStage::Enum, const StorageSnapshotPtr &, SelectQueryInfo &) const override;
4952

@@ -130,7 +133,7 @@ class ReadFromCluster : public SourceStepWithFilter
130133

131134
std::optional<RemoteQueryExecutor::Extension> extension;
132135

133-
void createExtension(const ActionsDAG::Node * predicate, size_t number_of_replicas);
136+
void createExtension(const ActionsDAG::Node * predicate);
134137
ContextPtr updateSettings(const Settings & settings);
135138
};
136139

src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,12 +30,14 @@ namespace DB
3030
namespace Setting
3131
{
3232
extern const SettingsBool use_hive_partitioning;
33+
extern const SettingsUInt64 lock_object_storage_task_distribution_ms;
3334
extern const SettingsString object_storage_cluster;
3435
}
3536

3637
namespace ErrorCodes
3738
{
3839
extern const int LOGICAL_ERROR;
40+
extern const int INVALID_SETTING_VALUE;
3941
}
4042

4143
String StorageObjectStorageCluster::getPathSample(ContextPtr context)
@@ -437,13 +439,43 @@ void StorageObjectStorageCluster::updateQueryToSendIfNeeded(
437439
}
438440

439441
RemoteQueryExecutor::Extension StorageObjectStorageCluster::getTaskIteratorExtension(
440-
const ActionsDAG::Node * predicate, const ContextPtr & local_context, const size_t number_of_replicas) const
442+
const ActionsDAG::Node * predicate,
443+
const ContextPtr & local_context,
444+
ClusterPtr cluster) const
441445
{
442446
auto iterator = StorageObjectStorageSource::createFileIterator(
443447
configuration, configuration->getQuerySettings(local_context), object_storage, /* distributed_processing */false,
444448
local_context, predicate, {}, getVirtualsList(), hive_partition_columns_to_read_from_file_path, nullptr, local_context->getFileProgressCallback(), /*ignore_archive_globs=*/true, /*skip_object_metadata=*/true);
445449

446-
auto task_distributor = std::make_shared<StorageObjectStorageStableTaskDistributor>(iterator, number_of_replicas);
450+
std::vector<std::string> ids_of_hosts;
451+
for (const auto & shard : cluster->getShardsInfo())
452+
{
453+
if (shard.per_replica_pools.empty())
454+
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cluster {} with empty shard {}", cluster->getName(), shard.shard_num);
455+
for (const auto & replica : shard.per_replica_pools)
456+
{
457+
if (!replica)
458+
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cluster {}, shard {} with empty node", cluster->getName(), shard.shard_num);
459+
ids_of_hosts.push_back(replica->getAddress());
460+
}
461+
}
462+
463+
uint64_t lock_object_storage_task_distribution_ms = local_context->getSettingsRef()[Setting::lock_object_storage_task_distribution_ms];
464+
465+
/// Check value to avoid negative result after conversion in microseconds.
466+
/// Poco::Timestamp::TimeDiff is signed int 64.
467+
static const uint64_t lock_object_storage_task_distribution_ms_max = 0x0020000000000000ULL;
468+
if (lock_object_storage_task_distribution_ms > lock_object_storage_task_distribution_ms_max)
469+
throw Exception(ErrorCodes::INVALID_SETTING_VALUE,
470+
"Value lock_object_storage_task_distribution_ms is too big: {}, allowed maximum is {}",
471+
lock_object_storage_task_distribution_ms,
472+
lock_object_storage_task_distribution_ms_max
473+
);
474+
475+
auto task_distributor = std::make_shared<StorageObjectStorageStableTaskDistributor>(
476+
iterator,
477+
ids_of_hosts,
478+
lock_object_storage_task_distribution_ms);
447479

448480
auto callback = std::make_shared<TaskIterator>(
449481
[task_distributor](size_t number_of_current_replica) mutable -> String

src/Storages/ObjectStorage/StorageObjectStorageCluster.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,9 @@ class StorageObjectStorageCluster : public IStorageCluster
2929
std::string getName() const override;
3030

3131
RemoteQueryExecutor::Extension getTaskIteratorExtension(
32-
const ActionsDAG::Node * predicate, const ContextPtr & context, size_t number_of_replicas) const override;
32+
const ActionsDAG::Node * predicate,
33+
const ContextPtr & context,
34+
ClusterPtr cluster) const override;
3335

3436
String getPathSample(ContextPtr context);
3537

src/Storages/ObjectStorage/StorageObjectStorageSource.cpp

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
#include <QueryPipeline/QueryPipelineBuilder.h>
2020
#include <Storages/Cache/SchemaCache.h>
2121
#include <Storages/ObjectStorage/StorageObjectStorage.h>
22+
#include <Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h>
2223
#include <Storages/ObjectStorage/DataLakes/DeltaLake/ObjectInfoWithPartitionColumns.h>
2324
#include <Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h>
2425
#include <Storages/VirtualColumnUtils.h>
@@ -442,11 +443,31 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade
442443
ObjectInfoPtr object_info;
443444
auto query_settings = configuration->getQuerySettings(context_);
444445

446+
bool not_a_path = false;
447+
445448
do
446449
{
450+
not_a_path = false;
447451
object_info = file_iterator->next(processor);
448452

449-
if (!object_info || object_info->getPath().empty())
453+
if (!object_info)
454+
return {};
455+
456+
if (object_info->getCommand().is_parsed())
457+
{
458+
auto retry_after_us = object_info->getCommand().get_retry_after_us();
459+
if (retry_after_us.has_value())
460+
{
461+
not_a_path = true;
462+
/// TODO: Make asyncronous waiting without sleep in thread
463+
/// Now this sleep is on executor node in worker thread
464+
/// Does not block query initiator
465+
sleepForMicroseconds(std::min(Poco::Timestamp::TimeDiff(100000ul), retry_after_us.value()));
466+
continue;
467+
}
468+
}
469+
470+
if (object_info->getPath().empty())
450471
return {};
451472

452473
if (!object_info->metadata)
@@ -465,7 +486,7 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade
465486
object_info->metadata = object_storage->getObjectMetadata(path);
466487
}
467488
}
468-
while (query_settings.skip_empty_files && object_info->metadata->size_bytes == 0);
489+
while (not_a_path || (query_settings.skip_empty_files && object_info->metadata->size_bytes == 0));
469490

470491
QueryPipelineBuilder builder;
471492
std::shared_ptr<ISource> source;

0 commit comments

Comments
 (0)