Skip to content
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion src/Interpreters/Cluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -775,6 +775,7 @@ Cluster::Cluster(Cluster::ReplicasAsShardsTag, const Cluster & from, const Setti

UInt32 shard_num = 0;
std::set<std::pair<String, int>> unique_hosts;
Addresses all_addresses;
for (size_t shard_index : collections::range(0, from.shards_info.size()))
{
auto create_shards_from_replicas = [&](std::span<const Address> replicas)
Expand All @@ -790,6 +791,7 @@ Cluster::Cluster(Cluster::ReplicasAsShardsTag, const Cluster & from, const Setti

if (address.is_local)
info.local_addresses.push_back(address);
all_addresses.push_back(address);

auto pool = ConnectionPoolFactory::instance().get(
static_cast<unsigned>(settings[Setting::distributed_connections_pool_size]),
Expand Down Expand Up @@ -838,14 +840,21 @@ Cluster::Cluster(Cluster::ReplicasAsShardsTag, const Cluster & from, const Setti
std::shuffle(shards_info.begin(), shards_info.end(), gen);
shards_info.resize(max_hosts);

Addresses all_addresses_;
Copy link
Collaborator

@arthurpassos arthurpassos Apr 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you put the entire logic that lives inside this if statement in a separate method? IMHO, it is now slightly more complex and it deserves a named method. Perhaps something like:

/*
Shuffle shards_info to avoid picking only the first `max_hosts` hosts. Adjust addresses accordingly.
*/
static void constrainShardInfoAndAddressesToMaxHosts(ShardInfo & shard_info, Addresses & addresses)
{
        pcg64_fast gen{randomSeed()};
        std::shuffle(shards_info.begin(), shards_info.end(), gen);
        shards_info.resize(max_hosts);
        
        // shuffled_addresses must have the same ordering as shards_info, that's why the re-ordering is done manually instead of making a call to `std::shuffle`
        Addresses shuffled_addresses;
        for (auto & shard_info : shards_info)
        {
            shuffled_addresses.emplace_back(addresses[shard_info.shard_num - 1]);
            shard_info.shard_num = ++shard_num;
        }

         addresses.swap(shuffled_addresses);
}

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved to a separate method.


shard_num = 0;
for (auto & shard_info : shards_info)
{
all_addresses_.push_back(all_addresses[shard_info.shard_num - 1]);
shard_info.shard_num = ++shard_num;
}

all_addresses.swap(all_addresses_);
}

for (size_t i = 0; i < shards_info.size(); ++i)
{
addresses_with_failover.emplace_back(shards_info[i].local_addresses);
addresses_with_failover.emplace_back(Addresses({all_addresses[shards_info[i].shard_num - 1]}));
slot_to_shard.insert(std::end(slot_to_shard), shards_info[i].weight, i);
}

Expand Down
Loading