From 1a16befb2699baabe754fb1bab7c5d214b6f4b10 Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Fri, 6 Jun 2025 16:58:31 +0200 Subject: [PATCH 1/6] SYSTEM PRESHUTDOWN to allow graceful shutdown node --- docs/en/sql-reference/statements/system.md | 6 ++ src/Access/Common/AccessType.h | 2 +- src/Interpreters/ClusterDiscovery.cpp | 27 +++++++ src/Interpreters/ClusterDiscovery.h | 3 + src/Interpreters/Context.cpp | 24 +++++- src/Interpreters/Context.h | 5 ++ src/Interpreters/InterpreterSystemQuery.cpp | 8 ++ src/Parsers/ASTSystemQuery.cpp | 1 + src/Parsers/ASTSystemQuery.h | 1 + src/QueryPipeline/RemoteQueryExecutor.cpp | 5 ++ src/QueryPipeline/RemoteQueryExecutor.h | 2 + .../RemoteQueryExecutorReadContext.cpp | 38 ++++++--- .../StorageObjectStorageSource.cpp | 22 ++++- .../StorageObjectStorageSource.h | 4 +- .../test_s3_cluster/data/graceful/part0.csv | 1 + .../test_s3_cluster/data/graceful/part1.csv | 1 + .../test_s3_cluster/data/graceful/part2.csv | 1 + .../test_s3_cluster/data/graceful/part3.csv | 1 + .../test_s3_cluster/data/graceful/part4.csv | 1 + .../test_s3_cluster/data/graceful/part5.csv | 1 + .../test_s3_cluster/data/graceful/part6.csv | 1 + .../test_s3_cluster/data/graceful/part7.csv | 1 + .../test_s3_cluster/data/graceful/part8.csv | 1 + .../test_s3_cluster/data/graceful/part9.csv | 1 + .../test_s3_cluster/data/graceful/partA.csv | 1 + .../test_s3_cluster/data/graceful/partB.csv | 1 + .../test_s3_cluster/data/graceful/partC.csv | 1 + .../test_s3_cluster/data/graceful/partD.csv | 1 + .../test_s3_cluster/data/graceful/partE.csv | 1 + .../test_s3_cluster/data/graceful/partF.csv | 1 + tests/integration/test_s3_cluster/test.py | 81 ++++++++++++++++++- 31 files changed, 227 insertions(+), 18 deletions(-) create mode 100644 tests/integration/test_s3_cluster/data/graceful/part0.csv create mode 100644 tests/integration/test_s3_cluster/data/graceful/part1.csv create mode 100644 tests/integration/test_s3_cluster/data/graceful/part2.csv create mode 100644 tests/integration/test_s3_cluster/data/graceful/part3.csv create mode 100644 tests/integration/test_s3_cluster/data/graceful/part4.csv create mode 100644 tests/integration/test_s3_cluster/data/graceful/part5.csv create mode 100644 tests/integration/test_s3_cluster/data/graceful/part6.csv create mode 100644 tests/integration/test_s3_cluster/data/graceful/part7.csv create mode 100644 tests/integration/test_s3_cluster/data/graceful/part8.csv create mode 100644 tests/integration/test_s3_cluster/data/graceful/part9.csv create mode 100644 tests/integration/test_s3_cluster/data/graceful/partA.csv create mode 100644 tests/integration/test_s3_cluster/data/graceful/partB.csv create mode 100644 tests/integration/test_s3_cluster/data/graceful/partC.csv create mode 100644 tests/integration/test_s3_cluster/data/graceful/partD.csv create mode 100644 tests/integration/test_s3_cluster/data/graceful/partE.csv create mode 100644 tests/integration/test_s3_cluster/data/graceful/partF.csv diff --git a/docs/en/sql-reference/statements/system.md b/docs/en/sql-reference/statements/system.md index 4ad4d94f7e34..c10e200daee6 100644 --- a/docs/en/sql-reference/statements/system.md +++ b/docs/en/sql-reference/statements/system.md @@ -206,6 +206,12 @@ SYSTEM RELOAD USERS [ON CLUSTER cluster_name] Normally shuts down ClickHouse (like `service clickhouse-server stop` / `kill {$pid_clickhouse-server}`) +## PRESHUTDOWN {#preshutdown} + + + +Prepare node for graceful shutdown. Unregister in autodiscovered clusters, stop accepting distributed requests to object storages (s3Cluster, icebergCluster, etc.). + ## KILL {#kill} Aborts ClickHouse process (like `kill -9 {$ pid_clickhouse-server}`) diff --git a/src/Access/Common/AccessType.h b/src/Access/Common/AccessType.h index 6a951560586f..7c0686f30522 100644 --- a/src/Access/Common/AccessType.h +++ b/src/Access/Common/AccessType.h @@ -163,7 +163,7 @@ enum class AccessType : uint8_t \ M(TABLE_ENGINE, "TABLE ENGINE", TABLE_ENGINE, ALL) \ \ - M(SYSTEM_SHUTDOWN, "SYSTEM KILL, SHUTDOWN", GLOBAL, SYSTEM) \ + M(SYSTEM_SHUTDOWN, "SYSTEM KILL, SHUTDOWN, PRESHUTDOWN", GLOBAL, SYSTEM) \ M(SYSTEM_DROP_DNS_CACHE, "SYSTEM DROP DNS, DROP DNS CACHE, DROP DNS", GLOBAL, SYSTEM_DROP_CACHE) \ M(SYSTEM_DROP_CONNECTIONS_CACHE, "SYSTEM DROP CONNECTIONS CACHE, DROP CONNECTIONS CACHE", GLOBAL, SYSTEM_DROP_CACHE) \ M(SYSTEM_PREWARM_MARK_CACHE, "SYSTEM PREWARM MARK, PREWARM MARK CACHE, PREWARM MARKS", GLOBAL, SYSTEM_DROP_CACHE) \ diff --git a/src/Interpreters/ClusterDiscovery.cpp b/src/Interpreters/ClusterDiscovery.cpp index 033f1379543c..d1a57a57771c 100644 --- a/src/Interpreters/ClusterDiscovery.cpp +++ b/src/Interpreters/ClusterDiscovery.cpp @@ -455,12 +455,30 @@ void ClusterDiscovery::registerInZk(zkutil::ZooKeeperPtr & zk, ClusterInfo & inf return; } + if (context->isPreShutdownCalled()) + { + LOG_DEBUG(log, "PreShutdown called, skip self-registering current node {} in cluster {}", current_node_name, info.name); + return; + } + LOG_DEBUG(log, "Registering current node {} in cluster {}", current_node_name, info.name); zk->createOrUpdate(node_path, info.current_node.serialize(), zkutil::CreateMode::Ephemeral); LOG_DEBUG(log, "Current node {} registered in cluster {}", current_node_name, info.name); } +void ClusterDiscovery::unregisterFromZk(zkutil::ZooKeeperPtr & zk, ClusterInfo & info) +{ + if (info.current_node_is_observer) + return; + + String node_path = getShardsListPath(info.zk_root) / current_node_name; + LOG_DEBUG(log, "Removing current node {} from cluster {}", current_node_name, info.name); + + zk->remove(node_path); + LOG_DEBUG(log, "Current node {} removed from cluster {}", current_node_name, info.name); +} + void ClusterDiscovery::initialUpdate() { LOG_DEBUG(log, "Initializing"); @@ -506,6 +524,15 @@ void ClusterDiscovery::initialUpdate() is_initialized = true; } +void ClusterDiscovery::unregisterAll() +{ + for (auto & [_, info] : clusters_info) + { + auto zk = context->getDefaultOrAuxiliaryZooKeeper(info.zk_name); + unregisterFromZk(zk, info); + } +} + void ClusterDiscovery::findDynamicClusters( std::unordered_map & info, std::unordered_set * unchanged_roots) diff --git a/src/Interpreters/ClusterDiscovery.h b/src/Interpreters/ClusterDiscovery.h index c0e4af3b86f3..00658c4eed6a 100644 --- a/src/Interpreters/ClusterDiscovery.h +++ b/src/Interpreters/ClusterDiscovery.h @@ -38,6 +38,8 @@ class ClusterDiscovery ~ClusterDiscovery(); + void unregisterAll(); + private: struct NodeInfo { @@ -125,6 +127,7 @@ class ClusterDiscovery void initialUpdate(); void registerInZk(zkutil::ZooKeeperPtr & zk, ClusterInfo & info); + void unregisterFromZk(zkutil::ZooKeeperPtr & zk, ClusterInfo & info); Strings getNodeNames(zkutil::ZooKeeperPtr & zk, const String & zk_root, diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index da90eb37938b..42c29d6f1b5b 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -579,6 +579,7 @@ struct ContextSharedPart : boost::noncopyable std::map server_ports; std::atomic shutdown_called = false; + std::atomic preshutdown_called = false; Stopwatch uptime_watch TSA_GUARDED_BY(mutex); @@ -926,6 +927,11 @@ struct ContextSharedPart : boost::noncopyable total_memory_tracker.resetPageCache(); } + void preShutdown() + { + preshutdown_called = true; + } + bool hasTraceCollector() const { return trace_collector.has_value(); @@ -4644,7 +4650,6 @@ std::shared_ptr Context::getCluster(const std::string & cluster_name) c throw Exception(ErrorCodes::CLUSTER_DOESNT_EXIST, "Requested cluster '{}' not found", cluster_name); } - std::shared_ptr Context::tryGetCluster(const std::string & cluster_name) const { std::shared_ptr res = nullptr; @@ -4663,6 +4668,13 @@ std::shared_ptr Context::tryGetCluster(const std::string & cluster_name return res; } +void Context::unregisterInDynamicClusters() +{ + std::lock_guard lock(shared->clusters_mutex); + if (!shared->cluster_discovery) + return; + shared->cluster_discovery->unregisterAll(); +} void Context::reloadClusterConfig() const { @@ -5540,12 +5552,20 @@ void Context::stopServers(const ServerType & server_type) const shared->stop_servers_callback(server_type); } - void Context::shutdown() TSA_NO_THREAD_SAFETY_ANALYSIS { shared->shutdown(); } +void Context::preShutdown() +{ + shared->preshutdown_called = true; +} + +bool Context::isPreShutdownCalled() const +{ + return shared->preshutdown_called; +} Context::ApplicationType Context::getApplicationType() const { diff --git a/src/Interpreters/Context.h b/src/Interpreters/Context.h index f26899ea3c72..d0fff7a5a57a 100644 --- a/src/Interpreters/Context.h +++ b/src/Interpreters/Context.h @@ -1296,6 +1296,7 @@ class Context: public ContextData, public std::enable_shared_from_this size_t getClustersVersion() const; void startClusterDiscovery(); + void unregisterInDynamicClusters(); /// Sets custom cluster, but doesn't update configuration void setCluster(const String & cluster_name, const std::shared_ptr & cluster); @@ -1408,6 +1409,10 @@ class Context: public ContextData, public std::enable_shared_from_this void shutdown(); + /// Stop some works to allow graceful shutdown later + void preShutdown(); + bool isPreShutdownCalled() const; + bool isInternalQuery() const { return is_internal_query; } void setInternalQuery(bool internal) { is_internal_query = internal; } diff --git a/src/Interpreters/InterpreterSystemQuery.cpp b/src/Interpreters/InterpreterSystemQuery.cpp index ca029ae076b0..de7fa474f263 100644 --- a/src/Interpreters/InterpreterSystemQuery.cpp +++ b/src/Interpreters/InterpreterSystemQuery.cpp @@ -335,6 +335,13 @@ BlockIO InterpreterSystemQuery::execute() throw ErrnoException(ErrorCodes::CANNOT_KILL, "System call kill(0, SIGTERM) failed"); break; } + case Type::PRESHUTDOWN: + { + getContext()->checkAccess(AccessType::SYSTEM_SHUTDOWN); + getContext()->preShutdown(); + getContext()->unregisterInDynamicClusters(); + break; + } case Type::KILL: { getContext()->checkAccess(AccessType::SYSTEM_SHUTDOWN); @@ -1527,6 +1534,7 @@ AccessRightsElements InterpreterSystemQuery::getRequiredAccessForDDLOnCluster() switch (query.type) { case Type::SHUTDOWN: + case Type::PRESHUTDOWN: case Type::KILL: case Type::SUSPEND: { diff --git a/src/Parsers/ASTSystemQuery.cpp b/src/Parsers/ASTSystemQuery.cpp index 4c463107a0c3..b6852143cfb4 100644 --- a/src/Parsers/ASTSystemQuery.cpp +++ b/src/Parsers/ASTSystemQuery.cpp @@ -453,6 +453,7 @@ void ASTSystemQuery::formatImpl(WriteBuffer & ostr, const FormatSettings & setti } case Type::KILL: case Type::SHUTDOWN: + case Type::PRESHUTDOWN: case Type::DROP_DNS_CACHE: case Type::DROP_CONNECTIONS_CACHE: case Type::DROP_MMAP_CACHE: diff --git a/src/Parsers/ASTSystemQuery.h b/src/Parsers/ASTSystemQuery.h index 13af5fe76ea0..0c8d17023da0 100644 --- a/src/Parsers/ASTSystemQuery.h +++ b/src/Parsers/ASTSystemQuery.h @@ -17,6 +17,7 @@ class ASTSystemQuery : public IAST, public ASTQueryWithOnCluster { UNKNOWN, SHUTDOWN, + PRESHUTDOWN, KILL, SUSPEND, DROP_DNS_CACHE, diff --git a/src/QueryPipeline/RemoteQueryExecutor.cpp b/src/QueryPipeline/RemoteQueryExecutor.cpp index eb709d2c49cb..3945e438b3a1 100644 --- a/src/QueryPipeline/RemoteQueryExecutor.cpp +++ b/src/QueryPipeline/RemoteQueryExecutor.cpp @@ -1010,6 +1010,11 @@ void RemoteQueryExecutor::setProfileInfoCallback(ProfileInfoCallback callback) profile_info_callback = std::move(callback); } +bool RemoteQueryExecutor::skipUnavailableShards() const +{ + return context->getSettingsRef()[Setting::skip_unavailable_shards]; +} + bool RemoteQueryExecutor::needToSkipUnavailableShard() const { return context->getSettingsRef()[Setting::skip_unavailable_shards] && (0 == connections->size()); diff --git a/src/QueryPipeline/RemoteQueryExecutor.h b/src/QueryPipeline/RemoteQueryExecutor.h index c32d2fbce19e..ad09d6052b0b 100644 --- a/src/QueryPipeline/RemoteQueryExecutor.h +++ b/src/QueryPipeline/RemoteQueryExecutor.h @@ -227,6 +227,8 @@ class RemoteQueryExecutor IConnections & getConnections() { return *connections; } + bool skipUnavailableShards() const; + bool needToSkipUnavailableShard() const; bool isReplicaUnavailable() const { return extension && extension->parallel_reading_coordinator && connections->size() == 0; } diff --git a/src/QueryPipeline/RemoteQueryExecutorReadContext.cpp b/src/QueryPipeline/RemoteQueryExecutorReadContext.cpp index 9090d045daae..09ba3ee3cd41 100644 --- a/src/QueryPipeline/RemoteQueryExecutorReadContext.cpp +++ b/src/QueryPipeline/RemoteQueryExecutorReadContext.cpp @@ -16,6 +16,7 @@ namespace ErrorCodes extern const int CANNOT_READ_FROM_SOCKET; extern const int CANNOT_OPEN_FILE; extern const int SOCKET_TIMEOUT; + extern const int ATTEMPT_TO_READ_AFTER_EOF; } RemoteQueryExecutorReadContext::RemoteQueryExecutorReadContext( @@ -54,19 +55,38 @@ void RemoteQueryExecutorReadContext::Task::run(AsyncCallback async_callback, Sus if (read_context.executor.needToSkipUnavailableShard()) return; - while (true) - { - read_context.has_read_packet_part = PacketPart::None; + bool has_data_packets = false; - if (read_context.read_packet_type_separately) + try + { + while (true) { - read_context.packet.type = read_context.executor.getConnections().receivePacketTypeUnlocked(async_callback); - read_context.has_read_packet_part = PacketPart::Type; + read_context.has_read_packet_part = PacketPart::None; + + if (read_context.read_packet_type_separately) + { + read_context.packet.type = read_context.executor.getConnections().receivePacketTypeUnlocked(async_callback); + read_context.has_read_packet_part = PacketPart::Type; + suspend_callback(); + } + read_context.packet = read_context.executor.getConnections().receivePacketUnlocked(async_callback); + read_context.has_read_packet_part = PacketPart::Body; + if (read_context.packet.type == Protocol::Server::Data) + has_data_packets = true; suspend_callback(); } - read_context.packet = read_context.executor.getConnections().receivePacketUnlocked(async_callback); - read_context.has_read_packet_part = PacketPart::Body; - suspend_callback(); + } + catch (const Exception & e) + { + if (e.code() == ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF) + { + if (!has_data_packets && read_context.executor.skipUnavailableShards()) + { + read_context.has_read_packet_part = PacketPart::None; + return; + } + } + throw; } } diff --git a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp index 2f21601a984a..9755d3930c22 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp @@ -151,7 +151,10 @@ std::shared_ptr StorageObjectStorageSource::createFileIterator( if (distributed_processing) { - auto distributed_iterator = std::make_unique(local_context->getReadTaskCallback(), local_context->getSettingsRef()[Setting::max_threads]); + auto distributed_iterator = std::make_unique( + local_context->getReadTaskCallback(), + local_context->getSettingsRef()[Setting::max_threads], + local_context); if (is_archive) return std::make_shared(object_storage, configuration, std::move(distributed_iterator), local_context, nullptr); @@ -1050,9 +1053,16 @@ StorageObjectStorageSource::ReaderHolder::operator=(ReaderHolder && other) noexc } StorageObjectStorageSource::ReadTaskIterator::ReadTaskIterator( - const ReadTaskCallback & callback_, size_t max_threads_count) - : callback(callback_) + const ReadTaskCallback & callback_, size_t max_threads_count, ContextPtr context_) + : WithContext(context_) + , callback(callback_) { + if (getContext()->isPreShutdownCalled()) + { + LOG_DEBUG(getLogger("StorageObjectStorageSource"), "PRESHUTDOWN called, stop getting new tasks"); + return; + } + ThreadPool pool( CurrentMetrics::StorageObjectStorageThreads, CurrentMetrics::StorageObjectStorageThreadsActive, @@ -1082,6 +1092,12 @@ StorageObjectStorage::ObjectInfoPtr StorageObjectStorageSource::ReadTaskIterator size_t current_index = index.fetch_add(1, std::memory_order_relaxed); if (current_index >= buffer.size()) { + if (getContext()->isPreShutdownCalled()) + { + LOG_DEBUG(getLogger("StorageObjectStorageSource"), "PRESHUTDOWN called, stop getting new tasks"); + return nullptr; + } + auto key = callback(); if (key.empty()) return nullptr; diff --git a/src/Storages/ObjectStorage/StorageObjectStorageSource.h b/src/Storages/ObjectStorage/StorageObjectStorageSource.h index 6b08b02f9245..5e4345f3ad37 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageSource.h +++ b/src/Storages/ObjectStorage/StorageObjectStorageSource.h @@ -153,10 +153,10 @@ class StorageObjectStorageSource : public SourceWithKeyCondition void lazyInitialize(); }; -class StorageObjectStorageSource::ReadTaskIterator : public IObjectIterator +class StorageObjectStorageSource::ReadTaskIterator : public IObjectIterator, WithContext { public: - ReadTaskIterator(const ReadTaskCallback & callback_, size_t max_threads_count); + ReadTaskIterator(const ReadTaskCallback & callback_, size_t max_threads_count, ContextPtr context_); ObjectInfoPtr next(size_t) override; diff --git a/tests/integration/test_s3_cluster/data/graceful/part0.csv b/tests/integration/test_s3_cluster/data/graceful/part0.csv new file mode 100644 index 000000000000..2a8ceabbea58 --- /dev/null +++ b/tests/integration/test_s3_cluster/data/graceful/part0.csv @@ -0,0 +1 @@ +0,"Foo" \ No newline at end of file diff --git a/tests/integration/test_s3_cluster/data/graceful/part1.csv b/tests/integration/test_s3_cluster/data/graceful/part1.csv new file mode 100644 index 000000000000..1950012fffd2 --- /dev/null +++ b/tests/integration/test_s3_cluster/data/graceful/part1.csv @@ -0,0 +1 @@ +1,"Bar" \ No newline at end of file diff --git a/tests/integration/test_s3_cluster/data/graceful/part2.csv b/tests/integration/test_s3_cluster/data/graceful/part2.csv new file mode 100644 index 000000000000..dc782d5adf9b --- /dev/null +++ b/tests/integration/test_s3_cluster/data/graceful/part2.csv @@ -0,0 +1 @@ +2,"Foo" \ No newline at end of file diff --git a/tests/integration/test_s3_cluster/data/graceful/part3.csv b/tests/integration/test_s3_cluster/data/graceful/part3.csv new file mode 100644 index 000000000000..6e581549d23c --- /dev/null +++ b/tests/integration/test_s3_cluster/data/graceful/part3.csv @@ -0,0 +1 @@ +3,"Bar" \ No newline at end of file diff --git a/tests/integration/test_s3_cluster/data/graceful/part4.csv b/tests/integration/test_s3_cluster/data/graceful/part4.csv new file mode 100644 index 000000000000..bb5a4d956c51 --- /dev/null +++ b/tests/integration/test_s3_cluster/data/graceful/part4.csv @@ -0,0 +1 @@ +4,"Foo" \ No newline at end of file diff --git a/tests/integration/test_s3_cluster/data/graceful/part5.csv b/tests/integration/test_s3_cluster/data/graceful/part5.csv new file mode 100644 index 000000000000..5cb2c6be144b --- /dev/null +++ b/tests/integration/test_s3_cluster/data/graceful/part5.csv @@ -0,0 +1 @@ +5,"Bar" \ No newline at end of file diff --git a/tests/integration/test_s3_cluster/data/graceful/part6.csv b/tests/integration/test_s3_cluster/data/graceful/part6.csv new file mode 100644 index 000000000000..e2e2428d100d --- /dev/null +++ b/tests/integration/test_s3_cluster/data/graceful/part6.csv @@ -0,0 +1 @@ +6,"Foo" \ No newline at end of file diff --git a/tests/integration/test_s3_cluster/data/graceful/part7.csv b/tests/integration/test_s3_cluster/data/graceful/part7.csv new file mode 100644 index 000000000000..3c819a315c20 --- /dev/null +++ b/tests/integration/test_s3_cluster/data/graceful/part7.csv @@ -0,0 +1 @@ +7,"Bar" \ No newline at end of file diff --git a/tests/integration/test_s3_cluster/data/graceful/part8.csv b/tests/integration/test_s3_cluster/data/graceful/part8.csv new file mode 100644 index 000000000000..72f39e512be3 --- /dev/null +++ b/tests/integration/test_s3_cluster/data/graceful/part8.csv @@ -0,0 +1 @@ +8,"Foo" \ No newline at end of file diff --git a/tests/integration/test_s3_cluster/data/graceful/part9.csv b/tests/integration/test_s3_cluster/data/graceful/part9.csv new file mode 100644 index 000000000000..f288cb2051dd --- /dev/null +++ b/tests/integration/test_s3_cluster/data/graceful/part9.csv @@ -0,0 +1 @@ +9,"Bar" \ No newline at end of file diff --git a/tests/integration/test_s3_cluster/data/graceful/partA.csv b/tests/integration/test_s3_cluster/data/graceful/partA.csv new file mode 100644 index 000000000000..da99f68ba784 --- /dev/null +++ b/tests/integration/test_s3_cluster/data/graceful/partA.csv @@ -0,0 +1 @@ +10,"Foo" \ No newline at end of file diff --git a/tests/integration/test_s3_cluster/data/graceful/partB.csv b/tests/integration/test_s3_cluster/data/graceful/partB.csv new file mode 100644 index 000000000000..46591e0be815 --- /dev/null +++ b/tests/integration/test_s3_cluster/data/graceful/partB.csv @@ -0,0 +1 @@ +11,"Bar" \ No newline at end of file diff --git a/tests/integration/test_s3_cluster/data/graceful/partC.csv b/tests/integration/test_s3_cluster/data/graceful/partC.csv new file mode 100644 index 000000000000..24af8010b5c6 --- /dev/null +++ b/tests/integration/test_s3_cluster/data/graceful/partC.csv @@ -0,0 +1 @@ +12,"Foo" \ No newline at end of file diff --git a/tests/integration/test_s3_cluster/data/graceful/partD.csv b/tests/integration/test_s3_cluster/data/graceful/partD.csv new file mode 100644 index 000000000000..0365a5024871 --- /dev/null +++ b/tests/integration/test_s3_cluster/data/graceful/partD.csv @@ -0,0 +1 @@ +13,"Bar" \ No newline at end of file diff --git a/tests/integration/test_s3_cluster/data/graceful/partE.csv b/tests/integration/test_s3_cluster/data/graceful/partE.csv new file mode 100644 index 000000000000..3143c0eed915 --- /dev/null +++ b/tests/integration/test_s3_cluster/data/graceful/partE.csv @@ -0,0 +1 @@ +14,"Foo" \ No newline at end of file diff --git a/tests/integration/test_s3_cluster/data/graceful/partF.csv b/tests/integration/test_s3_cluster/data/graceful/partF.csv new file mode 100644 index 000000000000..d0306b9bb806 --- /dev/null +++ b/tests/integration/test_s3_cluster/data/graceful/partF.csv @@ -0,0 +1 @@ +15,"Bar" \ No newline at end of file diff --git a/tests/integration/test_s3_cluster/test.py b/tests/integration/test_s3_cluster/test.py index 1df0f36f9321..9b15bbd8a6d2 100644 --- a/tests/integration/test_s3_cluster/test.py +++ b/tests/integration/test_s3_cluster/test.py @@ -4,11 +4,13 @@ import shutil import uuid -from email.errors import HeaderParseError +import time +import threading import pytest from helpers.cluster import ClickHouseCluster +from helpers.client import QueryRuntimeException from helpers.config_cluster import minio_secret_key from helpers.mock_servers import start_mock_servers from helpers.test_tools import TSV @@ -22,6 +24,22 @@ "data/clickhouse/part123.csv", "data/database/part2.csv", "data/database/partition675.csv", + "data/graceful/part0.csv", + "data/graceful/part1.csv", + "data/graceful/part2.csv", + "data/graceful/part3.csv", + "data/graceful/part4.csv", + "data/graceful/part5.csv", + "data/graceful/part6.csv", + "data/graceful/part7.csv", + "data/graceful/part8.csv", + "data/graceful/part9.csv", + "data/graceful/partA.csv", + "data/graceful/partB.csv", + "data/graceful/partC.csv", + "data/graceful/partD.csv", + "data/graceful/partE.csv", + "data/graceful/partF.csv", ] @@ -77,6 +95,7 @@ def started_cluster(): macros={"replica": "node1", "shard": "shard1"}, with_minio=True, with_zookeeper=True, + stay_alive=True, ) cluster.add_instance( "s0_0_1", @@ -84,6 +103,7 @@ def started_cluster(): user_configs=["configs/users.xml"], macros={"replica": "replica2", "shard": "shard1"}, with_zookeeper=True, + stay_alive=True, ) cluster.add_instance( "s0_1_0", @@ -91,6 +111,7 @@ def started_cluster(): user_configs=["configs/users.xml"], macros={"replica": "replica1", "shard": "shard2"}, with_zookeeper=True, + stay_alive=True, ) logging.info("Starting cluster...") @@ -1019,3 +1040,61 @@ def test_cluster_hosts_limit(started_cluster): """ ) assert int(hosts_2) == 2 + + +def test_graceful_shutdown(started_cluster): + node = started_cluster.instances["s0_0_0"] + node_to_shutdown = started_cluster.instances["s0_1_0"] + + expected = TSV("64\tBar\t8\n56\tFoo\t8\n") + + num_lock = threading.Lock() + errors = 0 + + def query_cycle(): + nonlocal errors + try: + i = 0 + while i < 10: + i += 1 + # Query time 3-4 seconds + # Processing single object 1-2 seconds + result = node.query(f""" + SELECT sum(value),name,sum(sleep(1)+1) as sleep FROM s3Cluster( + 'cluster_simple', + 'http://minio1:9001/root/data/graceful/*', 'minio', '{minio_secret_key}', 'CSV', + 'value UInt32, name String') + GROUP BY name + ORDER BY name + SETTINGS max_threads=2 + """) + with num_lock: + if TSV(result) != expected: + errors += 1 + if errors >= 1: + break + except QueryRuntimeException: + with num_lock: + errors += 1 + + threads = [] + + for _ in range(10): + thread = threading.Thread(target=query_cycle) + thread.start() + threads.append(thread) + time.sleep(0.2) + + time.sleep(3) + + node_to_shutdown.query("SYSTEM PRESHUTDOWN") + + # enough time to complete processing of objects, started before "SYSTEM PRESHUTDOWN" + time.sleep(3) + + node_to_shutdown.stop_clickhouse(kill=True) + + for thread in threads: + thread.join() + + assert errors == 0 From 11c40c45590b364f55fe9f134060e15cf9574133 Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Thu, 12 Jun 2025 12:26:43 +0200 Subject: [PATCH 2/6] Fix tests --- src/Interpreters/Context.cpp | 1 + tests/integration/test_s3_cluster/test.py | 2 ++ tests/queries/0_stateless/01271_show_privileges.reference | 2 +- 3 files changed, 4 insertions(+), 1 deletion(-) diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index 42c29d6f1b5b..4d95e8b98955 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -748,6 +748,7 @@ struct ContextSharedPart : boost::noncopyable */ void shutdown() TSA_NO_THREAD_SAFETY_ANALYSIS { + preshutdown_called = true; bool is_shutdown_called = shutdown_called.exchange(true); if (is_shutdown_called) return; diff --git a/tests/integration/test_s3_cluster/test.py b/tests/integration/test_s3_cluster/test.py index 9b15bbd8a6d2..014cc4fc8c9b 100644 --- a/tests/integration/test_s3_cluster/test.py +++ b/tests/integration/test_s3_cluster/test.py @@ -1097,4 +1097,6 @@ def query_cycle(): for thread in threads: thread.join() + node_to_shutdown.start_clickhouse() + assert errors == 0 diff --git a/tests/queries/0_stateless/01271_show_privileges.reference b/tests/queries/0_stateless/01271_show_privileges.reference index b504ab6f4aa1..0f9ff76e16b8 100644 --- a/tests/queries/0_stateless/01271_show_privileges.reference +++ b/tests/queries/0_stateless/01271_show_privileges.reference @@ -112,7 +112,7 @@ NAMED COLLECTION ['NAMED COLLECTION USAGE','USE NAMED COLLECTION'] NAMED_COLLECT NAMED COLLECTION ADMIN ['NAMED COLLECTION CONTROL'] NAMED_COLLECTION ALL SET DEFINER [] DEFINER ALL TABLE ENGINE ['TABLE ENGINE'] TABLE_ENGINE ALL -SYSTEM SHUTDOWN ['SYSTEM KILL','SHUTDOWN'] GLOBAL SYSTEM +SYSTEM SHUTDOWN ['SYSTEM KILL','SHUTDOWN','PRESHUTDOWN'] GLOBAL SYSTEM SYSTEM DROP DNS CACHE ['SYSTEM DROP DNS','DROP DNS CACHE','DROP DNS'] GLOBAL SYSTEM DROP CACHE SYSTEM DROP CONNECTIONS CACHE ['SYSTEM DROP CONNECTIONS CACHE','DROP CONNECTIONS CACHE'] GLOBAL SYSTEM DROP CACHE SYSTEM PREWARM MARK CACHE ['SYSTEM PREWARM MARK','PREWARM MARK CACHE','PREWARM MARKS'] GLOBAL SYSTEM DROP CACHE From 43356706bd7f49cb032afcf3c091555db06be3aa Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Fri, 13 Jun 2025 16:59:48 +0200 Subject: [PATCH 3/6] I hate coroutines --- .../RemoteQueryExecutorReadContext.cpp | 23 ++++++++----------- .../RemoteQueryExecutorReadContext.h | 1 + 2 files changed, 11 insertions(+), 13 deletions(-) diff --git a/src/QueryPipeline/RemoteQueryExecutorReadContext.cpp b/src/QueryPipeline/RemoteQueryExecutorReadContext.cpp index 09ba3ee3cd41..37c349f11632 100644 --- a/src/QueryPipeline/RemoteQueryExecutorReadContext.cpp +++ b/src/QueryPipeline/RemoteQueryExecutorReadContext.cpp @@ -55,11 +55,9 @@ void RemoteQueryExecutorReadContext::Task::run(AsyncCallback async_callback, Sus if (read_context.executor.needToSkipUnavailableShard()) return; - bool has_data_packets = false; - - try + while (true) { - while (true) + try { read_context.has_read_packet_part = PacketPart::None; @@ -72,21 +70,20 @@ void RemoteQueryExecutorReadContext::Task::run(AsyncCallback async_callback, Sus read_context.packet = read_context.executor.getConnections().receivePacketUnlocked(async_callback); read_context.has_read_packet_part = PacketPart::Body; if (read_context.packet.type == Protocol::Server::Data) - has_data_packets = true; - suspend_callback(); + read_context.has_data_packets = true; } - } - catch (const Exception & e) - { - if (e.code() == ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF) + catch (const Exception & e) { - if (!has_data_packets && read_context.executor.skipUnavailableShards()) + if (e.code() == ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF + && !read_context.has_data_packets.load() && read_context.executor.skipUnavailableShards()) { read_context.has_read_packet_part = PacketPart::None; - return; } + else + throw; } - throw; + + suspend_callback(); } } diff --git a/src/QueryPipeline/RemoteQueryExecutorReadContext.h b/src/QueryPipeline/RemoteQueryExecutorReadContext.h index abde6cb93ef3..e306aa3b3e47 100644 --- a/src/QueryPipeline/RemoteQueryExecutorReadContext.h +++ b/src/QueryPipeline/RemoteQueryExecutorReadContext.h @@ -85,6 +85,7 @@ class RemoteQueryExecutorReadContext : public AsyncTaskExecutor /// None -> Type -> Body -> None /// None -> Body -> None std::atomic has_read_packet_part = PacketPart::None; + std::atomic_bool has_data_packets = false; Packet packet; RemoteQueryExecutor & executor; From 3c51d2506c041a92c8391cd12cc11a1ecf182bbf Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Wed, 25 Jun 2025 17:35:57 +0200 Subject: [PATCH 4/6] Change PRESUTDOWN on STOP SWARM command --- programs/server/Server.cpp | 2 ++ src/Access/Common/AccessType.h | 3 +- src/Interpreters/ClusterDiscovery.cpp | 17 ++++++++-- src/Interpreters/ClusterDiscovery.h | 1 + src/Interpreters/Context.cpp | 34 ++++++++++++++----- src/Interpreters/Context.h | 6 ++-- src/Interpreters/InterpreterSystemQuery.cpp | 28 ++++++++++----- src/Parsers/ASTSystemQuery.cpp | 3 +- src/Parsers/ASTSystemQuery.h | 3 +- .../StorageObjectStorageSource.cpp | 8 ++--- tests/integration/test_s3_cluster/test.py | 4 +-- .../01271_show_privileges.reference | 3 +- 12 files changed, 81 insertions(+), 31 deletions(-) diff --git a/programs/server/Server.cpp b/programs/server/Server.cpp index 904c100dc555..c453682fa855 100644 --- a/programs/server/Server.cpp +++ b/programs/server/Server.cpp @@ -2701,6 +2701,8 @@ try is_cancelled = true; + global_context->stopSwarm(); + LOG_DEBUG(log, "Waiting for current connections to close."); size_t current_connections = 0; diff --git a/src/Access/Common/AccessType.h b/src/Access/Common/AccessType.h index 7c0686f30522..83f4d948e941 100644 --- a/src/Access/Common/AccessType.h +++ b/src/Access/Common/AccessType.h @@ -163,7 +163,7 @@ enum class AccessType : uint8_t \ M(TABLE_ENGINE, "TABLE ENGINE", TABLE_ENGINE, ALL) \ \ - M(SYSTEM_SHUTDOWN, "SYSTEM KILL, SHUTDOWN, PRESHUTDOWN", GLOBAL, SYSTEM) \ + M(SYSTEM_SHUTDOWN, "SYSTEM KILL, SHUTDOWN", GLOBAL, SYSTEM) \ M(SYSTEM_DROP_DNS_CACHE, "SYSTEM DROP DNS, DROP DNS CACHE, DROP DNS", GLOBAL, SYSTEM_DROP_CACHE) \ M(SYSTEM_DROP_CONNECTIONS_CACHE, "SYSTEM DROP CONNECTIONS CACHE, DROP CONNECTIONS CACHE", GLOBAL, SYSTEM_DROP_CACHE) \ M(SYSTEM_PREWARM_MARK_CACHE, "SYSTEM PREWARM MARK, PREWARM MARK CACHE, PREWARM MARKS", GLOBAL, SYSTEM_DROP_CACHE) \ @@ -200,6 +200,7 @@ enum class AccessType : uint8_t M(SYSTEM_TTL_MERGES, "SYSTEM STOP TTL MERGES, SYSTEM START TTL MERGES, STOP TTL MERGES, START TTL MERGES", TABLE, SYSTEM) \ M(SYSTEM_FETCHES, "SYSTEM STOP FETCHES, SYSTEM START FETCHES, STOP FETCHES, START FETCHES", TABLE, SYSTEM) \ M(SYSTEM_MOVES, "SYSTEM STOP MOVES, SYSTEM START MOVES, STOP MOVES, START MOVES", TABLE, SYSTEM) \ + M(SYSTEM_SWARM, "SYSTEM STOP SWARM, SYSTEM START SWARM, STOP SWARM, START SWARM", GLOBAL, SYSTEM) \ M(SYSTEM_PULLING_REPLICATION_LOG, "SYSTEM STOP PULLING REPLICATION LOG, SYSTEM START PULLING REPLICATION LOG", TABLE, SYSTEM) \ M(SYSTEM_CLEANUP, "SYSTEM STOP CLEANUP, SYSTEM START CLEANUP", TABLE, SYSTEM) \ M(SYSTEM_VIEWS, "SYSTEM REFRESH VIEW, SYSTEM START VIEWS, SYSTEM STOP VIEWS, SYSTEM START VIEW, SYSTEM STOP VIEW, SYSTEM CANCEL VIEW, REFRESH VIEW, START VIEWS, STOP VIEWS, START VIEW, STOP VIEW, CANCEL VIEW", VIEW, SYSTEM) \ diff --git a/src/Interpreters/ClusterDiscovery.cpp b/src/Interpreters/ClusterDiscovery.cpp index d1a57a57771c..ad1c59ce385c 100644 --- a/src/Interpreters/ClusterDiscovery.cpp +++ b/src/Interpreters/ClusterDiscovery.cpp @@ -391,7 +391,9 @@ bool ClusterDiscovery::upsertCluster(ClusterInfo & cluster_info) return true; }; - if (!cluster_info.current_node_is_observer && !contains(node_uuids, current_node_name)) + if (!cluster_info.current_node_is_observer + && !context->isStopSwarmCalled() + && !contains(node_uuids, current_node_name)) { LOG_ERROR(log, "Can't find current node in cluster '{}', will register again", cluster_info.name); registerInZk(zk, cluster_info); @@ -455,9 +457,9 @@ void ClusterDiscovery::registerInZk(zkutil::ZooKeeperPtr & zk, ClusterInfo & inf return; } - if (context->isPreShutdownCalled()) + if (context->isStopSwarmCalled()) { - LOG_DEBUG(log, "PreShutdown called, skip self-registering current node {} in cluster {}", current_node_name, info.name); + LOG_DEBUG(log, "STOP SWARM called, skip self-registering current node {} in cluster {}", current_node_name, info.name); return; } @@ -524,6 +526,15 @@ void ClusterDiscovery::initialUpdate() is_initialized = true; } +void ClusterDiscovery::registerAll() +{ + for (auto & [_, info] : clusters_info) + { + auto zk = context->getDefaultOrAuxiliaryZooKeeper(info.zk_name); + registerInZk(zk, info); + } +} + void ClusterDiscovery::unregisterAll() { for (auto & [_, info] : clusters_info) diff --git a/src/Interpreters/ClusterDiscovery.h b/src/Interpreters/ClusterDiscovery.h index 00658c4eed6a..4dce9019537d 100644 --- a/src/Interpreters/ClusterDiscovery.h +++ b/src/Interpreters/ClusterDiscovery.h @@ -38,6 +38,7 @@ class ClusterDiscovery ~ClusterDiscovery(); + void registerAll(); void unregisterAll(); private: diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index 4d95e8b98955..0932882452f2 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -579,7 +579,7 @@ struct ContextSharedPart : boost::noncopyable std::map server_ports; std::atomic shutdown_called = false; - std::atomic preshutdown_called = false; + std::atomic stop_swarm_called = false; Stopwatch uptime_watch TSA_GUARDED_BY(mutex); @@ -748,7 +748,7 @@ struct ContextSharedPart : boost::noncopyable */ void shutdown() TSA_NO_THREAD_SAFETY_ANALYSIS { - preshutdown_called = true; + stop_swarm_called = true; bool is_shutdown_called = shutdown_called.exchange(true); if (is_shutdown_called) return; @@ -928,9 +928,14 @@ struct ContextSharedPart : boost::noncopyable total_memory_tracker.resetPageCache(); } - void preShutdown() + void stopSwarm() { - preshutdown_called = true; + stop_swarm_called = true; + } + + void startSwarm() + { + stop_swarm_called = false; } bool hasTraceCollector() const @@ -4677,6 +4682,14 @@ void Context::unregisterInDynamicClusters() shared->cluster_discovery->unregisterAll(); } +void Context::registerInDynamicClusters() +{ + std::lock_guard lock(shared->clusters_mutex); + if (!shared->cluster_discovery) + return; + shared->cluster_discovery->registerAll(); +} + void Context::reloadClusterConfig() const { while (true) @@ -5558,14 +5571,19 @@ void Context::shutdown() TSA_NO_THREAD_SAFETY_ANALYSIS shared->shutdown(); } -void Context::preShutdown() +void Context::stopSwarm() +{ + shared->stop_swarm_called = true; +} + +void Context::startSwarm() { - shared->preshutdown_called = true; + shared->stop_swarm_called = false; } -bool Context::isPreShutdownCalled() const +bool Context::isStopSwarmCalled() const { - return shared->preshutdown_called; + return shared->stop_swarm_called; } Context::ApplicationType Context::getApplicationType() const diff --git a/src/Interpreters/Context.h b/src/Interpreters/Context.h index d0fff7a5a57a..ec76c79a214e 100644 --- a/src/Interpreters/Context.h +++ b/src/Interpreters/Context.h @@ -1296,6 +1296,7 @@ class Context: public ContextData, public std::enable_shared_from_this size_t getClustersVersion() const; void startClusterDiscovery(); + void registerInDynamicClusters(); void unregisterInDynamicClusters(); /// Sets custom cluster, but doesn't update configuration @@ -1410,8 +1411,9 @@ class Context: public ContextData, public std::enable_shared_from_this void shutdown(); /// Stop some works to allow graceful shutdown later - void preShutdown(); - bool isPreShutdownCalled() const; + void stopSwarm(); + void startSwarm(); + bool isStopSwarmCalled() const; bool isInternalQuery() const { return is_internal_query; } void setInternalQuery(bool internal) { is_internal_query = internal; } diff --git a/src/Interpreters/InterpreterSystemQuery.cpp b/src/Interpreters/InterpreterSystemQuery.cpp index de7fa474f263..c3ae2e65027e 100644 --- a/src/Interpreters/InterpreterSystemQuery.cpp +++ b/src/Interpreters/InterpreterSystemQuery.cpp @@ -335,13 +335,6 @@ BlockIO InterpreterSystemQuery::execute() throw ErrnoException(ErrorCodes::CANNOT_KILL, "System call kill(0, SIGTERM) failed"); break; } - case Type::PRESHUTDOWN: - { - getContext()->checkAccess(AccessType::SYSTEM_SHUTDOWN); - getContext()->preShutdown(); - getContext()->unregisterInDynamicClusters(); - break; - } case Type::KILL: { getContext()->checkAccess(AccessType::SYSTEM_SHUTDOWN); @@ -722,6 +715,20 @@ BlockIO InterpreterSystemQuery::execute() case Type::START_MOVES: startStopAction(ActionLocks::PartsMove, true); break; + case Type::STOP_SWARM: + { + getContext()->checkAccess(AccessType::SYSTEM_SWARM); + getContext()->stopSwarm(); + getContext()->unregisterInDynamicClusters(); + break; + } + case Type::START_SWARM: + { + getContext()->checkAccess(AccessType::SYSTEM_SWARM); + getContext()->startSwarm(); + getContext()->registerInDynamicClusters(); + break; + } case Type::STOP_FETCHES: startStopAction(ActionLocks::PartsFetch, false); break; @@ -1534,7 +1541,6 @@ AccessRightsElements InterpreterSystemQuery::getRequiredAccessForDDLOnCluster() switch (query.type) { case Type::SHUTDOWN: - case Type::PRESHUTDOWN: case Type::KILL: case Type::SUSPEND: { @@ -1631,6 +1637,12 @@ AccessRightsElements InterpreterSystemQuery::getRequiredAccessForDDLOnCluster() required_access.emplace_back(AccessType::SYSTEM_MOVES, query.getDatabase(), query.getTable()); break; } + case Type::STOP_SWARM: + case Type::START_SWARM: + { + required_access.emplace_back(AccessType::SYSTEM_SWARM); + break; + } case Type::STOP_PULLING_REPLICATION_LOG: case Type::START_PULLING_REPLICATION_LOG: { diff --git a/src/Parsers/ASTSystemQuery.cpp b/src/Parsers/ASTSystemQuery.cpp index b6852143cfb4..0beabf18e570 100644 --- a/src/Parsers/ASTSystemQuery.cpp +++ b/src/Parsers/ASTSystemQuery.cpp @@ -453,7 +453,6 @@ void ASTSystemQuery::formatImpl(WriteBuffer & ostr, const FormatSettings & setti } case Type::KILL: case Type::SHUTDOWN: - case Type::PRESHUTDOWN: case Type::DROP_DNS_CACHE: case Type::DROP_CONNECTIONS_CACHE: case Type::DROP_MMAP_CACHE: @@ -496,6 +495,8 @@ void ASTSystemQuery::formatImpl(WriteBuffer & ostr, const FormatSettings & setti case Type::DROP_PAGE_CACHE: case Type::STOP_REPLICATED_DDL_QUERIES: case Type::START_REPLICATED_DDL_QUERIES: + case Type::STOP_SWARM: + case Type::START_SWARM: break; case Type::UNKNOWN: case Type::END: diff --git a/src/Parsers/ASTSystemQuery.h b/src/Parsers/ASTSystemQuery.h index 0c8d17023da0..833675d59866 100644 --- a/src/Parsers/ASTSystemQuery.h +++ b/src/Parsers/ASTSystemQuery.h @@ -17,7 +17,6 @@ class ASTSystemQuery : public IAST, public ASTQueryWithOnCluster { UNKNOWN, SHUTDOWN, - PRESHUTDOWN, KILL, SUSPEND, DROP_DNS_CACHE, @@ -83,6 +82,8 @@ class ASTSystemQuery : public IAST, public ASTQueryWithOnCluster START_FETCHES, STOP_MOVES, START_MOVES, + STOP_SWARM, + START_SWARM, STOP_REPLICATED_SENDS, START_REPLICATED_SENDS, STOP_REPLICATION_QUEUES, diff --git a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp index 9755d3930c22..e672f9030805 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp @@ -1057,9 +1057,9 @@ StorageObjectStorageSource::ReadTaskIterator::ReadTaskIterator( : WithContext(context_) , callback(callback_) { - if (getContext()->isPreShutdownCalled()) + if (getContext()->isStopSwarmCalled()) { - LOG_DEBUG(getLogger("StorageObjectStorageSource"), "PRESHUTDOWN called, stop getting new tasks"); + LOG_DEBUG(getLogger("StorageObjectStorageSource"), "STOP SWARM called, stop getting new tasks"); return; } @@ -1092,9 +1092,9 @@ StorageObjectStorage::ObjectInfoPtr StorageObjectStorageSource::ReadTaskIterator size_t current_index = index.fetch_add(1, std::memory_order_relaxed); if (current_index >= buffer.size()) { - if (getContext()->isPreShutdownCalled()) + if (getContext()->isStopSwarmCalled()) { - LOG_DEBUG(getLogger("StorageObjectStorageSource"), "PRESHUTDOWN called, stop getting new tasks"); + LOG_DEBUG(getLogger("StorageObjectStorageSource"), "STOP SWARM called, stop getting new tasks"); return nullptr; } diff --git a/tests/integration/test_s3_cluster/test.py b/tests/integration/test_s3_cluster/test.py index 014cc4fc8c9b..c5e6b8c5055f 100644 --- a/tests/integration/test_s3_cluster/test.py +++ b/tests/integration/test_s3_cluster/test.py @@ -1087,9 +1087,9 @@ def query_cycle(): time.sleep(3) - node_to_shutdown.query("SYSTEM PRESHUTDOWN") + node_to_shutdown.query("SYSTEM STOP SWARM") - # enough time to complete processing of objects, started before "SYSTEM PRESHUTDOWN" + # enough time to complete processing of objects, started before "SYSTEM STOP SWARM" time.sleep(3) node_to_shutdown.stop_clickhouse(kill=True) diff --git a/tests/queries/0_stateless/01271_show_privileges.reference b/tests/queries/0_stateless/01271_show_privileges.reference index 0f9ff76e16b8..ebf59913c5bd 100644 --- a/tests/queries/0_stateless/01271_show_privileges.reference +++ b/tests/queries/0_stateless/01271_show_privileges.reference @@ -112,7 +112,7 @@ NAMED COLLECTION ['NAMED COLLECTION USAGE','USE NAMED COLLECTION'] NAMED_COLLECT NAMED COLLECTION ADMIN ['NAMED COLLECTION CONTROL'] NAMED_COLLECTION ALL SET DEFINER [] DEFINER ALL TABLE ENGINE ['TABLE ENGINE'] TABLE_ENGINE ALL -SYSTEM SHUTDOWN ['SYSTEM KILL','SHUTDOWN','PRESHUTDOWN'] GLOBAL SYSTEM +SYSTEM SHUTDOWN ['SYSTEM KILL','SHUTDOWN'] GLOBAL SYSTEM SYSTEM DROP DNS CACHE ['SYSTEM DROP DNS','DROP DNS CACHE','DROP DNS'] GLOBAL SYSTEM DROP CACHE SYSTEM DROP CONNECTIONS CACHE ['SYSTEM DROP CONNECTIONS CACHE','DROP CONNECTIONS CACHE'] GLOBAL SYSTEM DROP CACHE SYSTEM PREWARM MARK CACHE ['SYSTEM PREWARM MARK','PREWARM MARK CACHE','PREWARM MARKS'] GLOBAL SYSTEM DROP CACHE @@ -149,6 +149,7 @@ SYSTEM MERGES ['SYSTEM STOP MERGES','SYSTEM START MERGES','STOP MERGES','START M SYSTEM TTL MERGES ['SYSTEM STOP TTL MERGES','SYSTEM START TTL MERGES','STOP TTL MERGES','START TTL MERGES'] TABLE SYSTEM SYSTEM FETCHES ['SYSTEM STOP FETCHES','SYSTEM START FETCHES','STOP FETCHES','START FETCHES'] TABLE SYSTEM SYSTEM MOVES ['SYSTEM STOP MOVES','SYSTEM START MOVES','STOP MOVES','START MOVES'] TABLE SYSTEM +SYSTEM SWARM ['SYSTEM STOP SWARM','SYSTEM START SWARM','STOP SWARM','START SWARM'] GLOBAL SYSTEM SYSTEM PULLING REPLICATION LOG ['SYSTEM STOP PULLING REPLICATION LOG','SYSTEM START PULLING REPLICATION LOG'] TABLE SYSTEM SYSTEM CLEANUP ['SYSTEM STOP CLEANUP','SYSTEM START CLEANUP'] TABLE SYSTEM SYSTEM VIEWS ['SYSTEM REFRESH VIEW','SYSTEM START VIEWS','SYSTEM STOP VIEWS','SYSTEM START VIEW','SYSTEM STOP VIEW','SYSTEM CANCEL VIEW','REFRESH VIEW','START VIEWS','STOP VIEWS','START VIEW','STOP VIEW','CANCEL VIEW'] VIEW SYSTEM From 60a4473571bff77f4c78f0e3aa10014ee4013d2c Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Thu, 3 Jul 2025 16:06:39 +0200 Subject: [PATCH 5/6] IsSwarmModeEnabled metric --- programs/server/Server.cpp | 4 ++- src/Access/Common/AccessType.h | 2 +- src/Common/CurrentMetrics.cpp | 1 + src/Interpreters/ClusterDiscovery.cpp | 6 ++-- src/Interpreters/Context.cpp | 29 +++++++------------ src/Interpreters/Context.h | 6 ++-- src/Interpreters/InterpreterSystemQuery.cpp | 12 ++++---- src/Parsers/ASTSystemQuery.cpp | 4 +-- src/Parsers/ASTSystemQuery.h | 4 +-- .../StorageObjectStorageSource.cpp | 8 ++--- tests/integration/test_s3_cluster/test.py | 4 +-- .../01271_show_privileges.reference | 2 +- 12 files changed, 39 insertions(+), 43 deletions(-) diff --git a/programs/server/Server.cpp b/programs/server/Server.cpp index c453682fa855..1b55e426941b 100644 --- a/programs/server/Server.cpp +++ b/programs/server/Server.cpp @@ -2292,6 +2292,8 @@ try } + global_context->startSwarmMode(); + { std::lock_guard lock(servers_lock); /// We should start interserver communications before (and more important shutdown after) tables. @@ -2701,7 +2703,7 @@ try is_cancelled = true; - global_context->stopSwarm(); + global_context->stopSwarmMode(); LOG_DEBUG(log, "Waiting for current connections to close."); diff --git a/src/Access/Common/AccessType.h b/src/Access/Common/AccessType.h index 83f4d948e941..ccdc21f9cbce 100644 --- a/src/Access/Common/AccessType.h +++ b/src/Access/Common/AccessType.h @@ -200,7 +200,7 @@ enum class AccessType : uint8_t M(SYSTEM_TTL_MERGES, "SYSTEM STOP TTL MERGES, SYSTEM START TTL MERGES, STOP TTL MERGES, START TTL MERGES", TABLE, SYSTEM) \ M(SYSTEM_FETCHES, "SYSTEM STOP FETCHES, SYSTEM START FETCHES, STOP FETCHES, START FETCHES", TABLE, SYSTEM) \ M(SYSTEM_MOVES, "SYSTEM STOP MOVES, SYSTEM START MOVES, STOP MOVES, START MOVES", TABLE, SYSTEM) \ - M(SYSTEM_SWARM, "SYSTEM STOP SWARM, SYSTEM START SWARM, STOP SWARM, START SWARM", GLOBAL, SYSTEM) \ + M(SYSTEM_SWARM, "SYSTEM STOP SWARM MODE, SYSTEM START SWARM MODE, STOP SWARM MODE, START SWARM MODE", GLOBAL, SYSTEM) \ M(SYSTEM_PULLING_REPLICATION_LOG, "SYSTEM STOP PULLING REPLICATION LOG, SYSTEM START PULLING REPLICATION LOG", TABLE, SYSTEM) \ M(SYSTEM_CLEANUP, "SYSTEM STOP CLEANUP, SYSTEM START CLEANUP", TABLE, SYSTEM) \ M(SYSTEM_VIEWS, "SYSTEM REFRESH VIEW, SYSTEM START VIEWS, SYSTEM STOP VIEWS, SYSTEM START VIEW, SYSTEM STOP VIEW, SYSTEM CANCEL VIEW, REFRESH VIEW, START VIEWS, STOP VIEWS, START VIEW, STOP VIEW, CANCEL VIEW", VIEW, SYSTEM) \ diff --git a/src/Common/CurrentMetrics.cpp b/src/Common/CurrentMetrics.cpp index ea3bd714930f..6cf960218232 100644 --- a/src/Common/CurrentMetrics.cpp +++ b/src/Common/CurrentMetrics.cpp @@ -430,6 +430,7 @@ M(StartupScriptsExecutionState, "State of startup scripts execution: 0 = not finished, 1 = success, 2 = failure.") \ \ M(IsServerShuttingDown, "Indicates if the server is shutting down: 0 = no, 1 = yes") \ + M(IsSwarmModeEnabled, "Indicates if the swarm mode enabled or not: 0 = disabled, 1 = enabled") \ \ M(TotalMergeFailures, "Number of all failed merges since startup, including the ones that were aborted") \ M(NonAbortedMergeFailures, "Number of failed merges since startup, excluding the merges that were aborted") \ diff --git a/src/Interpreters/ClusterDiscovery.cpp b/src/Interpreters/ClusterDiscovery.cpp index ad1c59ce385c..de2442446576 100644 --- a/src/Interpreters/ClusterDiscovery.cpp +++ b/src/Interpreters/ClusterDiscovery.cpp @@ -392,7 +392,7 @@ bool ClusterDiscovery::upsertCluster(ClusterInfo & cluster_info) }; if (!cluster_info.current_node_is_observer - && !context->isStopSwarmCalled() + && context->isSwarmModeEnabled() && !contains(node_uuids, current_node_name)) { LOG_ERROR(log, "Can't find current node in cluster '{}', will register again", cluster_info.name); @@ -457,9 +457,9 @@ void ClusterDiscovery::registerInZk(zkutil::ZooKeeperPtr & zk, ClusterInfo & inf return; } - if (context->isStopSwarmCalled()) + if (!context->isSwarmModeEnabled()) { - LOG_DEBUG(log, "STOP SWARM called, skip self-registering current node {} in cluster {}", current_node_name, info.name); + LOG_DEBUG(log, "STOP SWARM MODE called, skip self-registering current node {} in cluster {}", current_node_name, info.name); return; } diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index 0932882452f2..cafd36e085d4 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -188,6 +188,7 @@ namespace CurrentMetrics extern const Metric IcebergCatalogThreads; extern const Metric IcebergCatalogThreadsActive; extern const Metric IcebergCatalogThreadsScheduled; + extern const Metric IsSwarmModeEnabled; } @@ -579,7 +580,7 @@ struct ContextSharedPart : boost::noncopyable std::map server_ports; std::atomic shutdown_called = false; - std::atomic stop_swarm_called = false; + std::atomic swarm_mode_enabled = true; Stopwatch uptime_watch TSA_GUARDED_BY(mutex); @@ -748,7 +749,7 @@ struct ContextSharedPart : boost::noncopyable */ void shutdown() TSA_NO_THREAD_SAFETY_ANALYSIS { - stop_swarm_called = true; + swarm_mode_enabled = false; bool is_shutdown_called = shutdown_called.exchange(true); if (is_shutdown_called) return; @@ -928,16 +929,6 @@ struct ContextSharedPart : boost::noncopyable total_memory_tracker.resetPageCache(); } - void stopSwarm() - { - stop_swarm_called = true; - } - - void startSwarm() - { - stop_swarm_called = false; - } - bool hasTraceCollector() const { return trace_collector.has_value(); @@ -5571,19 +5562,21 @@ void Context::shutdown() TSA_NO_THREAD_SAFETY_ANALYSIS shared->shutdown(); } -void Context::stopSwarm() +void Context::stopSwarmMode() { - shared->stop_swarm_called = true; + CurrentMetrics::set(CurrentMetrics::IsSwarmModeEnabled, 0); + shared->swarm_mode_enabled = false; } -void Context::startSwarm() +void Context::startSwarmMode() { - shared->stop_swarm_called = false; + shared->swarm_mode_enabled = true; + CurrentMetrics::set(CurrentMetrics::IsSwarmModeEnabled, 1); } -bool Context::isStopSwarmCalled() const +bool Context::isSwarmModeEnabled() const { - return shared->stop_swarm_called; + return shared->swarm_mode_enabled; } Context::ApplicationType Context::getApplicationType() const diff --git a/src/Interpreters/Context.h b/src/Interpreters/Context.h index ec76c79a214e..70487cb14adf 100644 --- a/src/Interpreters/Context.h +++ b/src/Interpreters/Context.h @@ -1411,9 +1411,9 @@ class Context: public ContextData, public std::enable_shared_from_this void shutdown(); /// Stop some works to allow graceful shutdown later - void stopSwarm(); - void startSwarm(); - bool isStopSwarmCalled() const; + void stopSwarmMode(); + void startSwarmMode(); + bool isSwarmModeEnabled() const; bool isInternalQuery() const { return is_internal_query; } void setInternalQuery(bool internal) { is_internal_query = internal; } diff --git a/src/Interpreters/InterpreterSystemQuery.cpp b/src/Interpreters/InterpreterSystemQuery.cpp index c3ae2e65027e..6e43737d8d14 100644 --- a/src/Interpreters/InterpreterSystemQuery.cpp +++ b/src/Interpreters/InterpreterSystemQuery.cpp @@ -715,18 +715,18 @@ BlockIO InterpreterSystemQuery::execute() case Type::START_MOVES: startStopAction(ActionLocks::PartsMove, true); break; - case Type::STOP_SWARM: + case Type::STOP_SWARM_MODE: { getContext()->checkAccess(AccessType::SYSTEM_SWARM); - getContext()->stopSwarm(); + getContext()->stopSwarmMode(); getContext()->unregisterInDynamicClusters(); break; } - case Type::START_SWARM: + case Type::START_SWARM_MODE: { getContext()->checkAccess(AccessType::SYSTEM_SWARM); - getContext()->startSwarm(); getContext()->registerInDynamicClusters(); + getContext()->startSwarmMode(); break; } case Type::STOP_FETCHES: @@ -1637,8 +1637,8 @@ AccessRightsElements InterpreterSystemQuery::getRequiredAccessForDDLOnCluster() required_access.emplace_back(AccessType::SYSTEM_MOVES, query.getDatabase(), query.getTable()); break; } - case Type::STOP_SWARM: - case Type::START_SWARM: + case Type::STOP_SWARM_MODE: + case Type::START_SWARM_MODE: { required_access.emplace_back(AccessType::SYSTEM_SWARM); break; diff --git a/src/Parsers/ASTSystemQuery.cpp b/src/Parsers/ASTSystemQuery.cpp index 0beabf18e570..57f9c47ca9f0 100644 --- a/src/Parsers/ASTSystemQuery.cpp +++ b/src/Parsers/ASTSystemQuery.cpp @@ -495,8 +495,8 @@ void ASTSystemQuery::formatImpl(WriteBuffer & ostr, const FormatSettings & setti case Type::DROP_PAGE_CACHE: case Type::STOP_REPLICATED_DDL_QUERIES: case Type::START_REPLICATED_DDL_QUERIES: - case Type::STOP_SWARM: - case Type::START_SWARM: + case Type::STOP_SWARM_MODE: + case Type::START_SWARM_MODE: break; case Type::UNKNOWN: case Type::END: diff --git a/src/Parsers/ASTSystemQuery.h b/src/Parsers/ASTSystemQuery.h index 833675d59866..db93f9e1b816 100644 --- a/src/Parsers/ASTSystemQuery.h +++ b/src/Parsers/ASTSystemQuery.h @@ -82,8 +82,8 @@ class ASTSystemQuery : public IAST, public ASTQueryWithOnCluster START_FETCHES, STOP_MOVES, START_MOVES, - STOP_SWARM, - START_SWARM, + STOP_SWARM_MODE, + START_SWARM_MODE, STOP_REPLICATED_SENDS, START_REPLICATED_SENDS, STOP_REPLICATION_QUEUES, diff --git a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp index e672f9030805..66c03d1c43b3 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp @@ -1057,9 +1057,9 @@ StorageObjectStorageSource::ReadTaskIterator::ReadTaskIterator( : WithContext(context_) , callback(callback_) { - if (getContext()->isStopSwarmCalled()) + if (!getContext()->isSwarmModeEnabled()) { - LOG_DEBUG(getLogger("StorageObjectStorageSource"), "STOP SWARM called, stop getting new tasks"); + LOG_DEBUG(getLogger("StorageObjectStorageSource"), "STOP SWARM MODE called, stop getting new tasks"); return; } @@ -1092,9 +1092,9 @@ StorageObjectStorage::ObjectInfoPtr StorageObjectStorageSource::ReadTaskIterator size_t current_index = index.fetch_add(1, std::memory_order_relaxed); if (current_index >= buffer.size()) { - if (getContext()->isStopSwarmCalled()) + if (!getContext()->isSwarmModeEnabled()) { - LOG_DEBUG(getLogger("StorageObjectStorageSource"), "STOP SWARM called, stop getting new tasks"); + LOG_DEBUG(getLogger("StorageObjectStorageSource"), "STOP SWARM MODE called, stop getting new tasks"); return nullptr; } diff --git a/tests/integration/test_s3_cluster/test.py b/tests/integration/test_s3_cluster/test.py index c5e6b8c5055f..691aafdd34ee 100644 --- a/tests/integration/test_s3_cluster/test.py +++ b/tests/integration/test_s3_cluster/test.py @@ -1087,9 +1087,9 @@ def query_cycle(): time.sleep(3) - node_to_shutdown.query("SYSTEM STOP SWARM") + node_to_shutdown.query("SYSTEM STOP SWARM MODE") - # enough time to complete processing of objects, started before "SYSTEM STOP SWARM" + # enough time to complete processing of objects, started before "SYSTEM STOP SWARM MODE" time.sleep(3) node_to_shutdown.stop_clickhouse(kill=True) diff --git a/tests/queries/0_stateless/01271_show_privileges.reference b/tests/queries/0_stateless/01271_show_privileges.reference index ebf59913c5bd..2d4bc4ecc150 100644 --- a/tests/queries/0_stateless/01271_show_privileges.reference +++ b/tests/queries/0_stateless/01271_show_privileges.reference @@ -149,7 +149,7 @@ SYSTEM MERGES ['SYSTEM STOP MERGES','SYSTEM START MERGES','STOP MERGES','START M SYSTEM TTL MERGES ['SYSTEM STOP TTL MERGES','SYSTEM START TTL MERGES','STOP TTL MERGES','START TTL MERGES'] TABLE SYSTEM SYSTEM FETCHES ['SYSTEM STOP FETCHES','SYSTEM START FETCHES','STOP FETCHES','START FETCHES'] TABLE SYSTEM SYSTEM MOVES ['SYSTEM STOP MOVES','SYSTEM START MOVES','STOP MOVES','START MOVES'] TABLE SYSTEM -SYSTEM SWARM ['SYSTEM STOP SWARM','SYSTEM START SWARM','STOP SWARM','START SWARM'] GLOBAL SYSTEM +SYSTEM SWARM ['SYSTEM STOP SWARM MODE','SYSTEM START SWARM MODE','STOP SWARM MODE','START SWARM MODE'] GLOBAL SYSTEM SYSTEM PULLING REPLICATION LOG ['SYSTEM STOP PULLING REPLICATION LOG','SYSTEM START PULLING REPLICATION LOG'] TABLE SYSTEM SYSTEM CLEANUP ['SYSTEM STOP CLEANUP','SYSTEM START CLEANUP'] TABLE SYSTEM SYSTEM VIEWS ['SYSTEM REFRESH VIEW','SYSTEM START VIEWS','SYSTEM STOP VIEWS','SYSTEM START VIEW','SYSTEM STOP VIEW','SYSTEM CANCEL VIEW','REFRESH VIEW','START VIEWS','STOP VIEWS','START VIEW','STOP VIEW','CANCEL VIEW'] VIEW SYSTEM From 0e7ffe0b731dea3db348cdba83200fdf479a87c5 Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Thu, 10 Jul 2025 18:32:05 +0200 Subject: [PATCH 6/6] Atomic asyncronous stop/start swarm --- src/Interpreters/ClusterDiscovery.cpp | 42 ++++++++++++++----- src/Interpreters/ClusterDiscovery.h | 9 ++++ src/Interpreters/Context.cpp | 25 +++++++---- src/Interpreters/Context.h | 14 ++++--- src/Interpreters/InterpreterSystemQuery.cpp | 8 ++-- .../RemoteQueryExecutorReadContext.cpp | 3 ++ 6 files changed, 74 insertions(+), 27 deletions(-) diff --git a/src/Interpreters/ClusterDiscovery.cpp b/src/Interpreters/ClusterDiscovery.cpp index de2442446576..6c328a3f7beb 100644 --- a/src/Interpreters/ClusterDiscovery.cpp +++ b/src/Interpreters/ClusterDiscovery.cpp @@ -108,6 +108,13 @@ class ClusterDiscovery::Flags cv.notify_one(); } + void wakeup() + { + std::unique_lock lk(mu); + any_need_update = true; + cv.notify_one(); + } + private: std::condition_variable cv; std::mutex mu; @@ -528,20 +535,14 @@ void ClusterDiscovery::initialUpdate() void ClusterDiscovery::registerAll() { - for (auto & [_, info] : clusters_info) - { - auto zk = context->getDefaultOrAuxiliaryZooKeeper(info.zk_name); - registerInZk(zk, info); - } + register_change_flag = RegisterChangeFlag::RCF_REGISTER_ALL; + clusters_to_update->wakeup(); } void ClusterDiscovery::unregisterAll() { - for (auto & [_, info] : clusters_info) - { - auto zk = context->getDefaultOrAuxiliaryZooKeeper(info.zk_name); - unregisterFromZk(zk, info); - } + register_change_flag = RegisterChangeFlag::RCF_UNREGISTER_ALL; + clusters_to_update->wakeup(); } void ClusterDiscovery::findDynamicClusters( @@ -767,6 +768,27 @@ bool ClusterDiscovery::runMainThread(std::function up_to_date_callback) { up_to_date_callback(); } + + RegisterChangeFlag flag = register_change_flag.exchange(RegisterChangeFlag::RCF_NONE); + + if (flag == RegisterChangeFlag::RCF_REGISTER_ALL) + { + LOG_DEBUG(log, "Register in all dynamic clusters"); + for (auto & [_, info] : clusters_info) + { + auto zk = context->getDefaultOrAuxiliaryZooKeeper(info.zk_name); + registerInZk(zk, info); + } + } + else if (flag == RegisterChangeFlag::RCF_UNREGISTER_ALL) + { + LOG_DEBUG(log, "Unregister in all dynamic clusters"); + for (auto & [_, info] : clusters_info) + { + auto zk = context->getDefaultOrAuxiliaryZooKeeper(info.zk_name); + unregisterFromZk(zk, info); + } + } } LOG_DEBUG(log, "Worker thread stopped"); return finished; diff --git a/src/Interpreters/ClusterDiscovery.h b/src/Interpreters/ClusterDiscovery.h index 4dce9019537d..2d3bbe489f4e 100644 --- a/src/Interpreters/ClusterDiscovery.h +++ b/src/Interpreters/ClusterDiscovery.h @@ -211,6 +211,15 @@ class ClusterDiscovery std::shared_ptr>> multicluster_discovery_paths; MultiVersion::Version macros; + + enum RegisterChangeFlag + { + RCF_NONE, + RCF_REGISTER_ALL, + RCF_UNREGISTER_ALL, + }; + + std::atomic register_change_flag = RegisterChangeFlag::RCF_NONE; }; } diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index cafd36e085d4..a9b02aa69622 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -750,6 +750,7 @@ struct ContextSharedPart : boost::noncopyable void shutdown() TSA_NO_THREAD_SAFETY_ANALYSIS { swarm_mode_enabled = false; + CurrentMetrics::set(CurrentMetrics::IsSwarmModeEnabled, 0); bool is_shutdown_called = shutdown_called.exchange(true); if (is_shutdown_called) return; @@ -4665,7 +4666,7 @@ std::shared_ptr Context::tryGetCluster(const std::string & cluster_name return res; } -void Context::unregisterInDynamicClusters() +void Context::unregisterInAutodiscoveryClusters() { std::lock_guard lock(shared->clusters_mutex); if (!shared->cluster_discovery) @@ -4673,7 +4674,7 @@ void Context::unregisterInDynamicClusters() shared->cluster_discovery->unregisterAll(); } -void Context::registerInDynamicClusters() +void Context::registerInAutodiscoveryClusters() { std::lock_guard lock(shared->clusters_mutex); if (!shared->cluster_discovery) @@ -5562,16 +5563,24 @@ void Context::shutdown() TSA_NO_THREAD_SAFETY_ANALYSIS shared->shutdown(); } -void Context::stopSwarmMode() +bool Context::stopSwarmMode() { - CurrentMetrics::set(CurrentMetrics::IsSwarmModeEnabled, 0); - shared->swarm_mode_enabled = false; + bool expected_is_enabled = true; + bool is_stopped_now = shared->swarm_mode_enabled.compare_exchange_strong(expected_is_enabled, false); + if (is_stopped_now) + CurrentMetrics::set(CurrentMetrics::IsSwarmModeEnabled, 0); + // return true if stop successful + return is_stopped_now; } -void Context::startSwarmMode() +bool Context::startSwarmMode() { - shared->swarm_mode_enabled = true; - CurrentMetrics::set(CurrentMetrics::IsSwarmModeEnabled, 1); + bool expected_is_enabled = false; + bool is_started_now = shared->swarm_mode_enabled.compare_exchange_strong(expected_is_enabled, true); + if (is_started_now) + CurrentMetrics::set(CurrentMetrics::IsSwarmModeEnabled, 1); + // return true if start successful + return is_started_now; } bool Context::isSwarmModeEnabled() const diff --git a/src/Interpreters/Context.h b/src/Interpreters/Context.h index 70487cb14adf..bc97a6d03c02 100644 --- a/src/Interpreters/Context.h +++ b/src/Interpreters/Context.h @@ -1296,8 +1296,8 @@ class Context: public ContextData, public std::enable_shared_from_this size_t getClustersVersion() const; void startClusterDiscovery(); - void registerInDynamicClusters(); - void unregisterInDynamicClusters(); + void registerInAutodiscoveryClusters(); + void unregisterInAutodiscoveryClusters(); /// Sets custom cluster, but doesn't update configuration void setCluster(const String & cluster_name, const std::shared_ptr & cluster); @@ -1410,9 +1410,13 @@ class Context: public ContextData, public std::enable_shared_from_this void shutdown(); - /// Stop some works to allow graceful shutdown later - void stopSwarmMode(); - void startSwarmMode(); + /// Stop some works to allow graceful shutdown later. + /// Returns true if stop successful. + bool stopSwarmMode(); + /// Resume some works if we change our mind. + /// Returns true if start successful. + bool startSwarmMode(); + /// Return current swarm mode state. bool isSwarmModeEnabled() const; bool isInternalQuery() const { return is_internal_query; } diff --git a/src/Interpreters/InterpreterSystemQuery.cpp b/src/Interpreters/InterpreterSystemQuery.cpp index 6e43737d8d14..c7a3b70b1cf0 100644 --- a/src/Interpreters/InterpreterSystemQuery.cpp +++ b/src/Interpreters/InterpreterSystemQuery.cpp @@ -718,15 +718,15 @@ BlockIO InterpreterSystemQuery::execute() case Type::STOP_SWARM_MODE: { getContext()->checkAccess(AccessType::SYSTEM_SWARM); - getContext()->stopSwarmMode(); - getContext()->unregisterInDynamicClusters(); + if (getContext()->stopSwarmMode()) + getContext()->unregisterInAutodiscoveryClusters(); break; } case Type::START_SWARM_MODE: { getContext()->checkAccess(AccessType::SYSTEM_SWARM); - getContext()->registerInDynamicClusters(); - getContext()->startSwarmMode(); + if (getContext()->startSwarmMode()) + getContext()->registerInAutodiscoveryClusters(); break; } case Type::STOP_FETCHES: diff --git a/src/QueryPipeline/RemoteQueryExecutorReadContext.cpp b/src/QueryPipeline/RemoteQueryExecutorReadContext.cpp index 37c349f11632..934909da40fd 100644 --- a/src/QueryPipeline/RemoteQueryExecutorReadContext.cpp +++ b/src/QueryPipeline/RemoteQueryExecutorReadContext.cpp @@ -74,6 +74,9 @@ void RemoteQueryExecutorReadContext::Task::run(AsyncCallback async_callback, Sus } catch (const Exception & e) { + /// If cluster node unxepectedly shutted down (kill/segfault/power off/etc.) socket just closes. + /// If initiator did not process any data packets before, this fact can be ignored. + /// Unprocessed tasks will be executed on other nodes. if (e.code() == ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF && !read_context.has_data_packets.load() && read_context.executor.skipUnavailableShards()) {