diff --git a/src/Interpreters/ClusterDiscovery.cpp b/src/Interpreters/ClusterDiscovery.cpp index ab2ec886e76d..75a5fa8f44fc 100644 --- a/src/Interpreters/ClusterDiscovery.cpp +++ b/src/Interpreters/ClusterDiscovery.cpp @@ -34,8 +34,8 @@ namespace DB namespace ErrorCodes { extern const int KEEPER_EXCEPTION; - extern const int LOGICAL_ERROR; extern const int NO_ELEMENTS_IN_CONFIG; + extern const int EXCESSIVE_ELEMENT_IN_CONFIG; } namespace FailPoints @@ -53,44 +53,45 @@ fs::path getShardsListPath(const String & zk_root) } -/* - * Holds boolean flags for fixed set of keys. - * Flags can be concurrently set from different threads, and consumer can wait for it. - */ template -class ClusterDiscovery::ConcurrentFlags +class ClusterDiscovery::Flags { public: template - ConcurrentFlags(It begin, It end) + Flags(It begin, It end) { for (auto it = begin; it != end; ++it) flags.emplace(*it, false); } - void set(const T & key) + void set(const T & key, bool value = true) { - auto it = flags.find(key); - if (it == flags.end()) - throw DB::Exception(ErrorCodes::LOGICAL_ERROR, "Unknown value '{}'", key); - it->second = true; - any_need_update = true; + std::unique_lock lk(mu); + if (stop_flag) + return; + flags[key] = value; + any_need_update |= value; cv.notify_one(); } - /// waits unit at least one flag is set - /// caller should handle all set flags (or set it again manually) - /// note: keys of returen map should not be changed! - /// @param finished - output parameter indicates that stop() was called - std::unordered_map & wait(std::chrono::milliseconds timeout, bool & finished) + void remove(const T & key) + { + std::unique_lock lk(mu); + if (!stop_flag) + flags.erase(key); + } + + std::unordered_map wait(std::chrono::milliseconds timeout, bool & finished) { std::unique_lock lk(mu); cv.wait_for(lk, timeout, [this]() -> bool { return any_need_update || stop_flag; }); finished = stop_flag; - /// all set flags expected to be handled by caller any_need_update = false; - return flags; + auto res = flags; + for (auto & f : flags) + f.second = false; + return res; } void stop() @@ -105,8 +106,8 @@ class ClusterDiscovery::ConcurrentFlags std::mutex mu; /// flag indicates that update is required - std::unordered_map flags; - std::atomic_bool any_need_update = true; + std::unordered_map flags; + bool any_need_update = true; bool stop_flag = false; }; @@ -123,23 +124,60 @@ ClusterDiscovery::ClusterDiscovery( Poco::Util::AbstractConfiguration::Keys config_keys; config.keys(config_prefix, config_keys); + multicluster_discovery_paths = std::make_shared>>(); + for (const auto & key : config_keys) { String cluster_config_prefix = config_prefix + "." + key + ".discovery"; if (!config.has(cluster_config_prefix)) continue; - String zk_name_and_root = config.getString(cluster_config_prefix + ".path"); - if (zk_name_and_root.empty()) - throw Exception(ErrorCodes::NO_ELEMENTS_IN_CONFIG, "ZooKeeper path for cluster '{}' is empty", key); - String zk_root = zkutil::extractZooKeeperPath(zk_name_and_root, true); - String zk_name = zkutil::extractZooKeeperName(zk_name_and_root); + String zk_name_and_root = config.getString(cluster_config_prefix + ".path", ""); + String zk_multicluster_name_and_root = config.getString(cluster_config_prefix + ".multicluster_root_path", ""); + bool is_observer = ConfigHelper::getBool(config, cluster_config_prefix + ".observer"); const auto & password = config.getString(cluster_config_prefix + ".password", ""); const auto & cluster_secret = config.getString(cluster_config_prefix + ".secret", ""); if (!password.empty() && !cluster_secret.empty()) throw Exception(ErrorCodes::NO_ELEMENTS_IN_CONFIG, "Both 'password' and 'secret' are specified for cluster '{}', only one option can be used at the same time", key); + if (!zk_multicluster_name_and_root.empty()) + { + if (!zk_name_and_root.empty()) + throw Exception( + ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG, + "Autodiscovery cluster node {} has 'path' and 'multicluster_root_path' subnodes simultaneously", + key); + if (!is_observer) + throw Exception( + ErrorCodes::NO_ELEMENTS_IN_CONFIG, + "Autodiscovery cluster node {} must be in observer mode", + key); + + String zk_root = zkutil::extractZooKeeperPath(zk_multicluster_name_and_root, true); + String zk_name = zkutil::extractZooKeeperName(zk_multicluster_name_and_root); + + auto mcd = std::make_shared( + /* zk_name */ zk_name, + /* zk_path */ zk_root, + /* is_secure_connection */ config.getBool(cluster_config_prefix + ".secure", false), + /* username */ config.getString(cluster_config_prefix + ".user", context->getUserName()), + /* password */ password, + /* cluster_secret */ cluster_secret + ); + + multicluster_discovery_paths->push_back( + mcd + ); + continue; + } + + if (zk_name_and_root.empty()) + throw Exception(ErrorCodes::NO_ELEMENTS_IN_CONFIG, "ZooKeeper path for cluster '{}' is empty", key); + + String zk_root = zkutil::extractZooKeeperPath(zk_name_and_root, true); + String zk_name = zkutil::extractZooKeeperName(zk_name_and_root); + clusters_info.emplace( key, ClusterInfo( @@ -153,7 +191,7 @@ ClusterDiscovery::ClusterDiscovery( /* port= */ context->getTCPPort(), /* secure= */ config.getBool(cluster_config_prefix + ".secure", false), /* shard_id= */ config.getUInt(cluster_config_prefix + ".shard", 0), - /* observer_mode= */ ConfigHelper::getBool(config, cluster_config_prefix + ".observer"), + /* observer_mode= */ is_observer, /* invisible= */ ConfigHelper::getBool(config, cluster_config_prefix + ".invisible") ) ); @@ -166,6 +204,14 @@ ClusterDiscovery::ClusterDiscovery( LOG_TRACE(log, "Clusters in discovery mode: {}", fmt::join(clusters_info_names, ", ")); clusters_to_update = std::make_shared(clusters_info_names.begin(), clusters_info_names.end()); + + /// Init get_nodes_callbacks after init clusters_to_update. + for (const auto & e : clusters_info) + get_nodes_callbacks[e.first] = std::make_shared( + [cluster_name = e.first, my_clusters_to_update = clusters_to_update](auto) + { + my_clusters_to_update->set(cluster_name); + }); } /// List node in zookeper for cluster @@ -173,12 +219,35 @@ Strings ClusterDiscovery::getNodeNames(zkutil::ZooKeeperPtr & zk, const String & zk_root, const String & cluster_name, int * version, - bool set_callback) + bool set_callback, + size_t zk_root_index) { - auto watch_callback = [cluster_name, my_clusters_to_update = clusters_to_update](auto) { my_clusters_to_update->set(cluster_name); }; - Coordination::Stat stat; - Strings nodes = zk->getChildrenWatch(getShardsListPath(zk_root), &stat, set_callback ? watch_callback : Coordination::WatchCallback{}); + Strings nodes; + + if (set_callback) + { + auto callback = get_nodes_callbacks.find(cluster_name); + if (callback == get_nodes_callbacks.end()) + { + auto watch_dynamic_callback = std::make_shared([ + cluster_name, + zk_root_index, + my_clusters_to_update = clusters_to_update, + my_discovery_paths = multicluster_discovery_paths + ](auto) + { + (*my_discovery_paths)[zk_root_index - 1]->need_update = true; + my_clusters_to_update->set(cluster_name); + }); + auto res = get_nodes_callbacks.insert(std::make_pair(cluster_name, watch_dynamic_callback)); + callback = res.first; + } + nodes = zk->getChildrenWatch(getShardsListPath(zk_root), &stat, *(callback->second)); + } + else + nodes = zk->getChildrenWatch(getShardsListPath(zk_root), &stat, Coordination::WatchCallback{}); + if (version) *version = stat.cversion; return nodes; @@ -287,21 +356,20 @@ static bool contains(const Strings & list, const String & value) /// Reads data from zookeeper and tries to update cluster. /// Returns true on success (or no update required). -bool ClusterDiscovery::updateCluster(ClusterInfo & cluster_info) +bool ClusterDiscovery::upsertCluster(ClusterInfo & cluster_info) { LOG_DEBUG(log, "Updating cluster '{}'", cluster_info.name); auto zk = context->getDefaultOrAuxiliaryZooKeeper(cluster_info.zk_name); int start_version; - Strings node_uuids = getNodeNames(zk, cluster_info.zk_root, cluster_info.name, &start_version, false); + Strings node_uuids = getNodeNames(zk, cluster_info.zk_root, cluster_info.name, &start_version, false, cluster_info.zk_root_index); auto & nodes_info = cluster_info.nodes_info; - auto on_exit = [this, start_version, &zk, &cluster_info, &nodes_info]() { /// in case of successful update we still need to check if configuration of cluster still valid and also set watch callback int current_version; - getNodeNames(zk, cluster_info.zk_root, cluster_info.name, ¤t_version, true); + getNodeNames(zk, cluster_info.zk_root, cluster_info.name, ¤t_version, true, cluster_info.zk_root_index); if (current_version != start_version) { @@ -333,24 +401,37 @@ bool ClusterDiscovery::updateCluster(ClusterInfo & cluster_info) } nodes_info = getNodes(zk, cluster_info.zk_root, node_uuids); - if (nodes_info.empty()) - { - LOG_WARNING(log, "Can't get nodes info for '{}'", cluster_info.name); - return false; - } if (bool ok = on_exit(); !ok) return false; LOG_DEBUG(log, "Updating system.clusters record for '{}' with {} nodes", cluster_info.name, cluster_info.nodes_info.size()); - auto cluster = makeCluster(cluster_info); + if (nodes_info.empty()) + { + String name = cluster_info.name; + /// cluster_info removed inside removeCluster, can't use reference to name. + removeCluster(name); + return true; + } + auto cluster = makeCluster(cluster_info); std::lock_guard lock(mutex); cluster_impls[cluster_info.name] = cluster; + return true; } +void ClusterDiscovery::removeCluster(const String & name) +{ + { + std::lock_guard lock(mutex); + cluster_impls.erase(name); + } + clusters_to_update->remove(name); + LOG_DEBUG(log, "Dynamic cluster '{}' removed successfully", name); +} + void ClusterDiscovery::registerInZk(zkutil::ZooKeeperPtr & zk, ClusterInfo & info) { /// Create root node in observer mode not to get 'No node' error @@ -384,23 +465,111 @@ void ClusterDiscovery::initialUpdate() throw Exception(ErrorCodes::KEEPER_EXCEPTION, "Failpoint cluster_discovery_faults is triggered"); }); + for (auto & path : (*multicluster_discovery_paths)) + { + auto zk = context->getDefaultOrAuxiliaryZooKeeper(path->zk_name); + + zk->createAncestors(path->zk_path); + zk->createIfNotExists(path->zk_path, ""); + + auto watch_callback = [&path](auto) { path->need_update = true; }; + zk->getChildrenWatch(path->zk_path, nullptr, watch_callback); + } + + findDynamicClusters(clusters_info); + for (auto & [_, info] : clusters_info) { auto zk = context->getDefaultOrAuxiliaryZooKeeper(info.zk_name); registerInZk(zk, info); - if (!updateCluster(info)) + if (!upsertCluster(info)) { LOG_WARNING(log, "Error on initial cluster '{}' update, will retry in background", info.name); clusters_to_update->set(info.name); } + else if (info.zk_root_index) + clusters_to_update->set(info.name, false); } + LOG_DEBUG(log, "Initialized"); is_initialized = true; } +void ClusterDiscovery::findDynamicClusters( + std::unordered_map & info, + std::unordered_set * unchanged_roots) +{ + using namespace std::chrono_literals; + + constexpr auto force_update_interval = 2min; + + size_t zk_root_index = 0; + + for (auto & path : (*multicluster_discovery_paths)) + { + ++zk_root_index; + + if (unchanged_roots) + { + if (!path->need_update.exchange(false)) + { + /// force updating periodically + bool force_update = path->watch.elapsedSeconds() > std::chrono::seconds(force_update_interval).count(); + if (!force_update) + { + unchanged_roots->insert(zk_root_index); + continue; + } + } + } + + auto zk = context->getDefaultOrAuxiliaryZooKeeper(path->zk_name); + + auto clusters = zk->getChildren(path->zk_path); + + for (const auto & cluster : clusters) + { + auto p = clusters_info.find(cluster); + if (p != clusters_info.end() && !p->second.zk_root_index) + { /// Not a warning - node can register itsefs in one cluster and discover other clusters + LOG_TRACE(log, "Found dynamic duplicate of cluster '{}' in config and Keeper, skipped", cluster); + continue; + } + + if (info.contains(cluster)) + { /// Possible with several root paths, it's a configuration error + LOG_WARNING(log, "Found dynamic duplicate of cluster '{}' in Keeper, skipped record by path {}:{}", + cluster, path->zk_name, path->zk_path); + continue; + } + + info.emplace( + cluster, + ClusterInfo( + /* name_= */ cluster, + /* zk_name_= */ path->zk_name, + /* zk_root_= */ path->zk_path + "/" + cluster, + /* host_name= */ "", + /* username= */ path->username, + /* password= */ path->password, + /* cluster_secret= */ path->cluster_secret, + /* port= */ context->getTCPPort(), + /* secure= */ path->is_secure_connection, + /* shard_id= */ 0, + /* observer_mode= */ true, + /* invisible= */ false, + /* zk_root_index= */ zk_root_index + ) + ); + } + + path->watch.restart(); + } +} + void ClusterDiscovery::start() { - if (clusters_info.empty()) + if (clusters_info.empty() && multicluster_discovery_paths->empty()) { LOG_DEBUG(log, "No defined clusters for discovery"); return; @@ -461,8 +630,36 @@ bool ClusterDiscovery::runMainThread(std::function up_to_date_callback) while (!finished) { bool all_up_to_date = true; - auto & clusters = clusters_to_update->wait(5s, finished); - for (auto & [cluster_name, need_update] : clusters) + + std::unordered_map new_dynamic_clusters_info; + std::unordered_set unchanged_roots; + findDynamicClusters(new_dynamic_clusters_info, &unchanged_roots); + + std::unordered_set clusters_to_insert; + std::unordered_set clusters_to_remove; + + for (const auto & [cluster_name, info] : clusters_info) + { + if (!info.zk_root_index) + continue; + if (!new_dynamic_clusters_info.erase(cluster_name)) + { + if (!unchanged_roots.contains(info.zk_root_index)) + clusters_to_remove.insert(cluster_name); + } + } + /// new_dynamic_clusters_info now contains only new clusters + + for (const auto & [cluster_name, _] : new_dynamic_clusters_info) + clusters_to_insert.insert(cluster_name); + + for (const auto & cluster_name : clusters_to_remove) + removeCluster(cluster_name); + + clusters_info.merge(new_dynamic_clusters_info); + + auto clusters = clusters_to_update->wait(5s, finished); + for (const auto & [cluster_name, need_update] : clusters) { auto cluster_info_it = clusters_info.find(cluster_name); if (cluster_info_it == clusters_info.end()) @@ -470,9 +667,9 @@ bool ClusterDiscovery::runMainThread(std::function up_to_date_callback) LOG_ERROR(log, "Unknown cluster '{}'", cluster_name); continue; } - auto & cluster_info = cluster_info_it->second; - if (!need_update.exchange(false)) + auto & cluster_info = cluster_info_it->second; + if (!need_update) { /// force updating periodically bool force_update = cluster_info.watch.elapsedSeconds() > std::chrono::seconds(force_update_interval).count(); @@ -480,7 +677,7 @@ bool ClusterDiscovery::runMainThread(std::function up_to_date_callback) continue; } - if (updateCluster(cluster_info)) + if (upsertCluster(cluster_info)) { cluster_info.watch.restart(); LOG_DEBUG(log, "Cluster '{}' updated successfully", cluster_name); @@ -489,11 +686,34 @@ bool ClusterDiscovery::runMainThread(std::function up_to_date_callback) { all_up_to_date = false; /// no need to trigger convar, will retry after timeout in `wait` - need_update = true; + clusters_to_update->set(cluster_name); LOG_WARNING(log, "Cluster '{}' wasn't updated, will retry", cluster_name); } } + for (const auto & cluster_name : clusters_to_insert) + { + auto cluster_info_it = clusters_info.find(cluster_name); + if (cluster_info_it == clusters_info.end()) + { + LOG_ERROR(log, "Unknown dynamic cluster '{}'", cluster_name); + continue; + } + auto & cluster_info = cluster_info_it->second; + if (upsertCluster(cluster_info)) + { + cluster_info.watch.restart(); + LOG_DEBUG(log, "Dynamic cluster '{}' inserted successfully", cluster_name); + } + else + { + all_up_to_date = false; + /// no need to trigger convar, will retry after timeout in `wait` + clusters_to_update->set(cluster_name); + LOG_WARNING(log, "Dynamic cluster '{}' wasn't inserted, will retry", cluster_name); + } + } + if (all_up_to_date) { up_to_date_callback(); diff --git a/src/Interpreters/ClusterDiscovery.h b/src/Interpreters/ClusterDiscovery.h index b253473ce3e4..b8f8176ed47c 100644 --- a/src/Interpreters/ClusterDiscovery.h +++ b/src/Interpreters/ClusterDiscovery.h @@ -88,6 +88,10 @@ class ClusterDiscovery String password; String cluster_secret; + /// For dynamic clusters, index+1 in multicluster_discovery_paths where cluster was found + /// 0 for static ckusters + size_t zk_root_index; + ClusterInfo(const String & name_, const String & zk_name_, const String & zk_root_, @@ -99,7 +103,9 @@ class ClusterDiscovery bool secure, size_t shard_id, bool observer_mode, - bool invisible) + bool invisible, + size_t zk_root_index_ = 0 + ) : name(name_) , zk_name(zk_name_) , zk_root(zk_root_) @@ -110,6 +116,7 @@ class ClusterDiscovery , username(username_) , password(password_) , cluster_secret(cluster_secret_) + , zk_root_index(zk_root_index_) { } }; @@ -121,19 +128,23 @@ class ClusterDiscovery Strings getNodeNames(zkutil::ZooKeeperPtr & zk, const String & zk_root, const String & cluster_name, - int * version = nullptr, - bool set_callback = true); + int * version, + bool set_callback, + size_t zk_root_index); NodesInfo getNodes(zkutil::ZooKeeperPtr & zk, const String & zk_root, const Strings & node_uuids); ClusterPtr makeCluster(const ClusterInfo & cluster_info); bool needUpdate(const Strings & node_uuids, const NodesInfo & nodes); - bool updateCluster(ClusterInfo & cluster_info); + bool upsertCluster(ClusterInfo & cluster_info); + void removeCluster(const String & name); bool runMainThread(std::function up_to_date_callback); void shutdown(); + void findDynamicClusters(std::unordered_map & info, std::unordered_set * unchanged_roots = nullptr); + /// cluster name -> cluster info (zk root, set of nodes) std::unordered_map clusters_info; @@ -141,14 +152,18 @@ class ClusterDiscovery String current_node_name; - template class ConcurrentFlags; - using UpdateFlags = ConcurrentFlags; + template class Flags; + using UpdateFlags = Flags; /// Cluster names to update. /// The `shared_ptr` is used because it's passed to watch callback. /// It prevents accessing to invalid object after ClusterDiscovery is destroyed. std::shared_ptr clusters_to_update; + /// Hold the callback pointers of each cluster. + /// To avoid registering callbacks for the same path multiple times. + std::unordered_map get_nodes_callbacks; + mutable std::mutex mutex; std::unordered_map cluster_impls; @@ -156,6 +171,39 @@ class ClusterDiscovery ThreadFromGlobalPool main_thread; LoggerPtr log; + + struct MulticlusterDiscovery + { + const String zk_name; + const String zk_path; + bool is_secure_connection; + String username; + String password; + String cluster_secret; + + Stopwatch watch; + mutable std::atomic_bool need_update; + + MulticlusterDiscovery(const String & zk_name_, + const String & zk_path_, + bool is_secure_connection_, + const String & username_, + const String & password_, + const String & cluster_secret_) + : zk_name(zk_name_) + , zk_path(zk_path_) + , is_secure_connection(is_secure_connection_) + , username(username_) + , password(password_) + , cluster_secret(cluster_secret_) + { + need_update = true; + } + + String getFullPath() const { return zk_name + ":" + zk_path; } + }; + + std::shared_ptr>> multicluster_discovery_paths; }; } diff --git a/tests/integration/test_cluster_discovery/config/config_dynamic_cluster1.xml b/tests/integration/test_cluster_discovery/config/config_dynamic_cluster1.xml new file mode 100644 index 000000000000..451a43b08a83 --- /dev/null +++ b/tests/integration/test_cluster_discovery/config/config_dynamic_cluster1.xml @@ -0,0 +1,11 @@ + + 1 + + + + /clickhouse/discovery/test_auto_cluster1 + 1 + + + + diff --git a/tests/integration/test_cluster_discovery/config/config_dynamic_cluster2.xml b/tests/integration/test_cluster_discovery/config/config_dynamic_cluster2.xml new file mode 100644 index 000000000000..8b51241477e6 --- /dev/null +++ b/tests/integration/test_cluster_discovery/config/config_dynamic_cluster2.xml @@ -0,0 +1,11 @@ + + 1 + + + + /clickhouse/discovery/test_auto_cluster2 + 1 + + + + diff --git a/tests/integration/test_cluster_discovery/config/config_dynamic_cluster3.xml b/tests/integration/test_cluster_discovery/config/config_dynamic_cluster3.xml new file mode 100644 index 000000000000..35ccf5cadc74 --- /dev/null +++ b/tests/integration/test_cluster_discovery/config/config_dynamic_cluster3.xml @@ -0,0 +1,11 @@ + + 1 + + + + /clickhouse/discovery2/test_auto_cluster3 + 1 + + + + diff --git a/tests/integration/test_cluster_discovery/config/config_dynamic_cluster_observer.xml b/tests/integration/test_cluster_discovery/config/config_dynamic_cluster_observer.xml new file mode 100644 index 000000000000..224ce1ce66bc --- /dev/null +++ b/tests/integration/test_cluster_discovery/config/config_dynamic_cluster_observer.xml @@ -0,0 +1,17 @@ + + 1 + + + + + /clickhouse/discovery + + + + + + /clickhouse/discovery2 + + + + diff --git a/tests/integration/test_cluster_discovery/test_dynamic_clusters.py b/tests/integration/test_cluster_discovery/test_dynamic_clusters.py new file mode 100644 index 000000000000..ece8012da19a --- /dev/null +++ b/tests/integration/test_cluster_discovery/test_dynamic_clusters.py @@ -0,0 +1,95 @@ +import time +import pytest +import json + +from helpers.cluster import ClickHouseCluster + +cluster = ClickHouseCluster(__file__) + +shard_configs = { + "node0": ["config/config_dynamic_cluster1.xml"], + "node1": ["config/config_dynamic_cluster1.xml"], + "node2": ["config/config_dynamic_cluster2.xml"], + "node3": ["config/config_dynamic_cluster3.xml"], + "node_observer": [], +} + +nodes = { + node_name: cluster.add_instance( + node_name, + main_configs=shard_config + ["config/config_dynamic_cluster_observer.xml"], + stay_alive=True, + with_zookeeper=True, + ) + for node_name, shard_config in shard_configs.items() +} + + +@pytest.fixture(scope="module") +def start_cluster(): + try: + cluster.start() + yield cluster + finally: + cluster.shutdown() + + +def get_clusters_hosts(node, expected): + count = 30 + while True: + resp = node.query("SELECT cluster, host_name FROM system.clusters ORDER BY cluster, host_name FORMAT JSONCompact") + hosts = json.loads(resp)["data"] + if count <= 0 or len(hosts) == expected: + break + time.sleep(1) + count -= 1 + return hosts + + +def test_cluster_discovery_startup_and_stop(start_cluster): + """ + Start cluster, check nodes count in system.clusters, + then stop/start some nodes and check that it (dis)appeared in cluster. + """ + + for node in ["node0", "node1", "node2", "node3", "node_observer"]: + nodes[node].stop_clickhouse() + + for node in ["node0", "node1", "node2", "node_observer"]: + nodes[node].start_clickhouse() + + expect1 = [["test_auto_cluster1", "node0"], ["test_auto_cluster1", "node1"], ["test_auto_cluster2", "node2"]] + for node in ["node0", "node1", "node2", "node_observer"]: + clusters = get_clusters_hosts(nodes[node], 3) + assert clusters == expect1 + + # Kill cluster test_auto_cluster2 + nodes["node2"].stop_clickhouse(kill=True) + + expect2 = [["test_auto_cluster1", "node0"], ["test_auto_cluster1", "node1"]] + for node in ["node0", "node1", "node_observer"]: + clusters = get_clusters_hosts(nodes[node], 2) + assert clusters == expect2 + + # Kill node in cluster test_auto_cluster1 + nodes["node1"].stop_clickhouse(kill=True) + + expect3 = [["test_auto_cluster1", "node0"]] + for node in ["node0", "node_observer"]: + clusters = get_clusters_hosts(nodes[node], 1) + assert clusters == expect3 + + # Restore cluster test_auto_cluster2 + nodes["node2"].start_clickhouse() + + expect4 = [["test_auto_cluster1", "node0"], ["test_auto_cluster2", "node2"]] + for node in ["node0", "node2", "node_observer"]: + clusters = get_clusters_hosts(nodes[node], 2) + assert clusters == expect4 + + nodes["node3"].start_clickhouse() + + expect5 = [["test_auto_cluster1", "node0"], ["test_auto_cluster2", "node2"], ["test_auto_cluster3", "node3"]] + for node in ["node0", "node2", "node3", "node_observer"]: + clusters = get_clusters_hosts(nodes[node], 3) + assert clusters == expect5