diff --git a/src/Core/Settings.cpp b/src/Core/Settings.cpp index 0cf2cbfc5d11..792dacc93ba7 100644 --- a/src/Core/Settings.cpp +++ b/src/Core/Settings.cpp @@ -6111,8 +6111,12 @@ Possible values: )", EXPERIMENTAL) \ DECLARE(Bool, use_object_storage_list_objects_cache, false, R"( Cache the list of objects returned by list objects calls in object storage +)", EXPERIMENTAL) \ + DECLARE(Bool, object_storage_remote_initiator, false, R"( +Execute request to object storage as remote on one of object_storage_cluster nodes. )", EXPERIMENTAL) \ \ + /* ####################################################### */ \ /* ############ END OF EXPERIMENTAL FEATURES ############# */ \ /* ####################################################### */ \ diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp index c9d35accbaa7..e26edbd4d424 100644 --- a/src/Core/SettingsChangesHistory.cpp +++ b/src/Core/SettingsChangesHistory.cpp @@ -69,6 +69,7 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory() // Altinity Antalya modifications atop of 25.2 {"object_storage_cluster", "", "", "New setting"}, {"object_storage_max_nodes", 0, 0, "New setting"}, + {"object_storage_remote_initiator", false, false, "New setting."}, {"use_iceberg_metadata_files_cache", true, true, "New setting"}, {"iceberg_timestamp_ms", 0, 0, "New setting."}, {"iceberg_snapshot_id", 0, 0, "New setting."}, diff --git a/src/IO/ReadBufferFromS3.cpp b/src/IO/ReadBufferFromS3.cpp index dfa3b73be6ac..2854ff581fbd 100644 --- a/src/IO/ReadBufferFromS3.cpp +++ b/src/IO/ReadBufferFromS3.cpp @@ -430,6 +430,12 @@ Aws::S3::Model::GetObjectResult ReadBufferFromS3::sendRequest(size_t attempt, si log, "Read S3 object. Bucket: {}, Key: {}, Version: {}, Offset: {}", bucket, key, version_id.empty() ? "Latest" : version_id, range_begin); } + else + { + LOG_TEST( + log, "Read S3 object. Bucket: {}, Key: {}, Version: {}", + bucket, key, version_id.empty() ? "Latest" : version_id); + } ProfileEvents::increment(ProfileEvents::S3GetObject); if (client_ptr->isClientForDisk()) diff --git a/src/Storages/IStorageCluster.cpp b/src/Storages/IStorageCluster.cpp index 8f66771aad6b..08b919e15bf8 100644 --- a/src/Storages/IStorageCluster.cpp +++ b/src/Storages/IStorageCluster.cpp @@ -1,5 +1,8 @@ #include +#include +#include + #include #include #include @@ -13,6 +16,7 @@ #include #include #include +#include #include #include #include @@ -22,6 +26,9 @@ #include #include #include +#include +#include +#include #include #include @@ -36,6 +43,7 @@ namespace Setting extern const SettingsBool skip_unavailable_shards; extern const SettingsNonZeroUInt64 max_parallel_replicas; extern const SettingsUInt64 object_storage_max_nodes; + extern const SettingsBool object_storage_remote_initiator; } namespace ErrorCodes @@ -93,15 +101,16 @@ void IStorageCluster::read( storage_snapshot->check(column_names); - updateBeforeRead(context); - auto cluster = getClusterImpl(context, cluster_name_from_settings, context->getSettingsRef()[Setting::object_storage_max_nodes]); + const auto & settings = context->getSettingsRef(); + + auto cluster = getClusterImpl(context, cluster_name_from_settings, settings[Setting::object_storage_max_nodes]); /// Calculate the header. This is significant, because some columns could be thrown away in some cases like query with count(*) Block sample_block; ASTPtr query_to_send = query_info.query; - if (context->getSettingsRef()[Setting::allow_experimental_analyzer]) + if (settings[Setting::allow_experimental_analyzer]) { sample_block = InterpreterSelectQueryAnalyzer::getSampleBlock(query_info.query, context, SelectQueryOptions(processed_stage)); } @@ -114,6 +123,17 @@ void IStorageCluster::read( updateQueryToSendIfNeeded(query_to_send, storage_snapshot, context); + if (settings[Setting::object_storage_remote_initiator]) + { + auto storage_and_context = convertToRemote(cluster, context, cluster_name_from_settings, query_to_send); + auto src_distributed = std::dynamic_pointer_cast(storage_and_context.storage); + auto modified_query_info = query_info; + modified_query_info.cluster = src_distributed->getCluster(); + auto new_storage_snapshot = storage_and_context.storage->getStorageSnapshot(storage_snapshot->metadata, storage_and_context.context); + storage_and_context.storage->read(query_plan, column_names, new_storage_snapshot, modified_query_info, storage_and_context.context, processed_stage, max_block_size, num_streams); + return; + } + RestoreQualifiedNamesVisitor::Data data; data.distributed_table = DatabaseAndTableWithAlias(*getTableExpression(query_info.query->as(), 0)); data.remote_table.database = context->getCurrentDatabase(); @@ -141,6 +161,62 @@ void IStorageCluster::read( query_plan.addStep(std::move(reading)); } +IStorageCluster::RemoteCallVariables IStorageCluster::convertToRemote( + ClusterPtr cluster, + ContextPtr context, + const std::string & cluster_name_from_settings, + ASTPtr query_to_send) +{ + auto host_addresses = cluster->getShardsAddresses(); + if (host_addresses.empty()) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Empty cluster {}", cluster_name_from_settings); + + static pcg64 rng(randomSeed()); + size_t shard_num = rng() % host_addresses.size(); + auto shard_addresses = host_addresses[shard_num]; + /// After getClusterImpl each shard must have exactly 1 replica + if (shard_addresses.size() != 1) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Size of shard {} in cluster {} is not equal 1", shard_num, cluster_name_from_settings); + auto host_name = shard_addresses[0].toString(); + + LOG_INFO(log, "Choose remote initiator '{}'", host_name); + + bool secure = shard_addresses[0].secure == Protocol::Secure::Enable; + std::string remote_function_name = secure ? "remoteSecure" : "remote"; + + /// Clean object_storage_remote_initiator setting to avoid infinite remote call + auto new_context = Context::createCopy(context); + new_context->setSetting("object_storage_remote_initiator", false); + + auto * select_query = query_to_send->as(); + if (!select_query) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected SELECT query"); + + auto query_settings = select_query->settings(); + if (query_settings) + { + auto & settings_ast = query_settings->as(); + if (settings_ast.changes.removeSetting("object_storage_remote_initiator") && settings_ast.changes.empty()) + { + select_query->setExpression(ASTSelectQuery::Expression::SETTINGS, {}); + } + } + + ASTTableExpression * table_expression = extractTableExpressionASTPtrFromSelectQuery(query_to_send); + if (!table_expression) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find table expression"); + + auto remote_query = makeASTFunction(remote_function_name, std::make_shared(host_name), table_expression->table_function); + + table_expression->table_function = remote_query; + + auto remote_function = TableFunctionFactory::instance().get(remote_query, new_context); + + auto storage = remote_function->execute(query_to_send, new_context, remote_function_name); + + return RemoteCallVariables{storage, new_context}; +} + SinkToStoragePtr IStorageCluster::write( const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, diff --git a/src/Storages/IStorageCluster.h b/src/Storages/IStorageCluster.h index 1fbea4f4d96a..00e9eb2141f2 100644 --- a/src/Storages/IStorageCluster.h +++ b/src/Storages/IStorageCluster.h @@ -58,9 +58,20 @@ class IStorageCluster : public IStorage virtual String getClusterName(ContextPtr /* context */) const { return getOriginalClusterName(); } protected: - virtual void updateBeforeRead(const ContextPtr &) {} virtual void updateQueryToSendIfNeeded(ASTPtr & /*query*/, const StorageSnapshotPtr & /*storage_snapshot*/, const ContextPtr & /*context*/) {} + struct RemoteCallVariables + { + StoragePtr storage; + ContextPtr context; + }; + + RemoteCallVariables convertToRemote( + ClusterPtr cluster, + ContextPtr context, + const std::string & cluster_name_from_settings, + ASTPtr query_to_send); + virtual void readFallBackToPure( QueryPlan & /* query_plan */, const Names & /* column_names */, diff --git a/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h b/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h index 80a8fedd289a..5acb42d2a7dc 100644 --- a/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h +++ b/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h @@ -83,6 +83,16 @@ class DataLakeConfiguration : public BaseStorageConfiguration, public std::enabl return std::nullopt; } + std::optional tryGetSamplePathFromMetadata() const override + { + if (!current_metadata) + return std::nullopt; + auto data_files = current_metadata->getDataFiles(); + if (!data_files.empty()) + return data_files[0]; + return std::nullopt; + } + std::optional totalRows() override { if (!current_metadata) @@ -468,7 +478,12 @@ class StorageIcebergConfiguration : public StorageObjectStorage::Configuration, createDynamicStorage(type); } - virtual void assertInitialized() const override { return getImpl().assertInitialized(); } + void assertInitialized() const override { return getImpl().assertInitialized(); } + + std::optional tryGetSamplePathFromMetadata() const override + { + return getImpl().tryGetSamplePathFromMetadata(); + } private: inline StorageObjectStorage::Configuration & getImpl() const diff --git a/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.h b/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.h index c9d4e865b059..95c527c560ce 100644 --- a/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.h +++ b/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.h @@ -39,6 +39,8 @@ class DeltaLakeMetadata final : public IDataLakeMetadata DeltaLakeMetadata(ObjectStoragePtr object_storage_, ConfigurationObserverPtr configuration_, ContextPtr context_); + Strings getDataFiles() const override { return data_files; } + NamesAndTypesList getTableSchema() const override { return schema; } DeltaLakePartitionColumns getPartitionColumns() const { return partition_columns; } diff --git a/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadataDeltaKernel.cpp b/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadataDeltaKernel.cpp index bc8a3e52e84f..830264f3d88f 100644 --- a/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadataDeltaKernel.cpp +++ b/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadataDeltaKernel.cpp @@ -33,6 +33,11 @@ bool DeltaLakeMetadataDeltaKernel::update(const ContextPtr &) return table_snapshot->update(); } +Strings DeltaLakeMetadataDeltaKernel::getDataFiles() const +{ + throwNotImplemented("getDataFiles()"); +} + ObjectIterator DeltaLakeMetadataDeltaKernel::iterate( const ActionsDAG * filter_dag, FileProgressCallback callback, diff --git a/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadataDeltaKernel.h b/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadataDeltaKernel.h index f7f1f333cb6a..5147e358c1e1 100644 --- a/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadataDeltaKernel.h +++ b/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadataDeltaKernel.h @@ -41,6 +41,8 @@ class DeltaLakeMetadataDeltaKernel final : public IDataLakeMetadata bool update(const ContextPtr & context) override; + Strings getDataFiles() const override; + NamesAndTypesList getTableSchema() const override; NamesAndTypesList getReadSchema() const override; diff --git a/src/Storages/ObjectStorage/DataLakes/HudiMetadata.cpp b/src/Storages/ObjectStorage/DataLakes/HudiMetadata.cpp index cdfe432b7e62..d028da51799f 100644 --- a/src/Storages/ObjectStorage/DataLakes/HudiMetadata.cpp +++ b/src/Storages/ObjectStorage/DataLakes/HudiMetadata.cpp @@ -91,7 +91,7 @@ HudiMetadata::HudiMetadata(ObjectStoragePtr object_storage_, ConfigurationObserv { } -Strings HudiMetadata::getDataFiles(const ActionsDAG *) const +Strings HudiMetadata::getDataFiles() const { if (data_files.empty()) data_files = getDataFilesImpl(); @@ -99,11 +99,11 @@ Strings HudiMetadata::getDataFiles(const ActionsDAG *) const } ObjectIterator HudiMetadata::iterate( - const ActionsDAG * filter_dag, + const ActionsDAG * /* filter_dag */, FileProgressCallback callback, size_t /* list_batch_size */) const { - return createKeysIterator(getDataFiles(filter_dag), object_storage, callback); + return createKeysIterator(getDataFiles(), object_storage, callback); } } diff --git a/src/Storages/ObjectStorage/DataLakes/HudiMetadata.h b/src/Storages/ObjectStorage/DataLakes/HudiMetadata.h index a64ecfeb55dd..5b515dc1f37e 100644 --- a/src/Storages/ObjectStorage/DataLakes/HudiMetadata.h +++ b/src/Storages/ObjectStorage/DataLakes/HudiMetadata.h @@ -19,6 +19,8 @@ class HudiMetadata final : public IDataLakeMetadata, private WithContext HudiMetadata(ObjectStoragePtr object_storage_, ConfigurationObserverPtr configuration_, ContextPtr context_); + Strings getDataFiles() const override; + NamesAndTypesList getTableSchema() const override { return {}; } bool operator ==(const IDataLakeMetadata & other) const override @@ -49,7 +51,6 @@ class HudiMetadata final : public IDataLakeMetadata, private WithContext mutable Strings data_files; Strings getDataFilesImpl() const; - Strings getDataFiles(const ActionsDAG * filter_dag) const; }; } diff --git a/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h b/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h index fe1fa151b9a0..1984750351bc 100644 --- a/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h +++ b/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h @@ -20,6 +20,9 @@ class IDataLakeMetadata : boost::noncopyable virtual bool operator==(const IDataLakeMetadata & other) const = 0; + /// List all data files. + /// For better parallelization, iterate() method should be used. + virtual Strings getDataFiles() const = 0; /// Return iterator to `data files`. using FileProgressCallback = std::function; virtual ObjectIterator iterate( diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp index be7093338fb1..8f7259d842d1 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp @@ -609,7 +609,7 @@ ManifestFilePtr IcebergMetadata::getManifestFile(const String & filename, Int64 return create_fn(); } -Strings IcebergMetadata::getDataFiles(const ActionsDAG * filter_dag) const +Strings IcebergMetadata::getDataFilesImpl(const ActionsDAG * filter_dag) const { if (!relevant_snapshot) return {}; @@ -716,7 +716,7 @@ ObjectIterator IcebergMetadata::iterate( FileProgressCallback callback, size_t /* list_batch_size */) const { - return createKeysIterator(getDataFiles(filter_dag), object_storage, callback); + return createKeysIterator(getDataFilesImpl(filter_dag), object_storage, callback); } } diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h index 3f4005d153c4..23b0cd04b897 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h @@ -42,6 +42,11 @@ class IcebergMetadata : public IDataLakeMetadata, private WithContext const Poco::JSON::Object::Ptr & metadata_object, IcebergMetadataFilesCachePtr cache_ptr); + /// Get data files. On first request it reads manifest_list file and iterates through manifest files to find all data files. + /// All subsequent calls when the same data snapshot is relevant will return saved list of files (because it cannot be changed + /// without changing metadata file). Drops on every snapshot update. + Strings getDataFiles() const override { return getDataFilesImpl(nullptr); } + /// Get table schema parsed from metadata. NamesAndTypesList getTableSchema() const override { @@ -118,7 +123,7 @@ class IcebergMetadata : public IDataLakeMetadata, private WithContext void updateState(const ContextPtr & local_context, bool metadata_file_changed); - Strings getDataFiles(const ActionsDAG * filter_dag) const; + Strings getDataFilesImpl(const ActionsDAG * filter_dag) const; void updateSnapshot(); diff --git a/src/Storages/ObjectStorage/StorageObjectStorage.cpp b/src/Storages/ObjectStorage/StorageObjectStorage.cpp index 97fcd9833f30..eea47cbf9044 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorage.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorage.cpp @@ -90,7 +90,8 @@ StorageObjectStorage::StorageObjectStorage( bool distributed_processing_, ASTPtr partition_by_, bool is_table_function_, - bool lazy_init) + bool lazy_init, + std::optional sample_path_) : IStorage(table_id_) , configuration(configuration_) , object_storage(object_storage_) @@ -130,7 +131,7 @@ StorageObjectStorage::StorageObjectStorage( if (!do_lazy_init) do_init(); - std::string sample_path; + std::string sample_path = sample_path_.value_or(""); ColumnsDescription columns{columns_}; resolveSchemaAndFormat(columns, object_storage, configuration, format_settings, sample_path, context); configuration->check(context); @@ -352,6 +353,11 @@ std::optional StorageObjectStorage::Configuration::tryGetTab throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method tryGetTableStructureFromMetadata is not implemented for basic configuration"); } +std::optional StorageObjectStorage::Configuration::tryGetSamplePathFromMetadata() const +{ + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method tryGetSamplePathFromMetadata is not implemented for basic configuration"); +} + void StorageObjectStorage::read( QueryPlan & query_plan, const Names & column_names, @@ -522,9 +528,7 @@ ColumnsDescription StorageObjectStorage::resolveSchemaFromData( auto table_structure = configuration->tryGetTableStructureFromMetadata(); if (table_structure) - { return table_structure.value(); - } } ObjectInfos read_keys; diff --git a/src/Storages/ObjectStorage/StorageObjectStorage.h b/src/Storages/ObjectStorage/StorageObjectStorage.h index dfcbd709af8c..2aef633a64d4 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorage.h +++ b/src/Storages/ObjectStorage/StorageObjectStorage.h @@ -72,7 +72,8 @@ class StorageObjectStorage : public IStorage bool distributed_processing_ = false, ASTPtr partition_by_ = nullptr, bool is_table_function_ = false, - bool lazy_init = false); + bool lazy_init = false, + std::optional sample_path_ = std::nullopt); String getName() const override; @@ -246,6 +247,7 @@ class StorageObjectStorage::Configuration ContextPtr local_context); virtual std::optional tryGetTableStructureFromMetadata() const; + virtual std::optional tryGetSamplePathFromMetadata() const; virtual bool supportsFileIterator() const { return false; } virtual ObjectIterator iterate( diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp index 404a89b5a3c0..ee2f59f64d99 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp @@ -111,7 +111,10 @@ StorageObjectStorageCluster::StorageObjectStorageCluster( format_settings_, mode_, /* distributed_processing */false, - partition_by_); + partition_by_, + /* is_table_function */false, + /* lazy_init */false, + sample_path); auto virtuals_ = getVirtualsPtr(); if (virtuals_) diff --git a/src/Storages/extractTableFunctionArgumentsFromSelectQuery.cpp b/src/Storages/extractTableFunctionArgumentsFromSelectQuery.cpp index 711cd62a5ff3..1d7d7bcf557b 100644 --- a/src/Storages/extractTableFunctionArgumentsFromSelectQuery.cpp +++ b/src/Storages/extractTableFunctionArgumentsFromSelectQuery.cpp @@ -11,7 +11,7 @@ namespace DB { -ASTFunction * extractTableFunctionFromSelectQuery(ASTPtr & query) +ASTTableExpression * extractTableExpressionASTPtrFromSelectQuery(ASTPtr & query) { auto * select_query = query->as(); if (!select_query || !select_query->tables()) @@ -19,10 +19,22 @@ ASTFunction * extractTableFunctionFromSelectQuery(ASTPtr & query) auto * tables = select_query->tables()->as(); auto * table_expression = tables->children[0]->as()->table_expression->as(); - if (!table_expression->table_function) + return table_expression; +} + +ASTPtr extractTableFunctionASTPtrFromSelectQuery(ASTPtr & query) +{ + auto table_expression = extractTableExpressionASTPtrFromSelectQuery(query); + return table_expression ? table_expression->table_function : nullptr; +} + +ASTFunction * extractTableFunctionFromSelectQuery(ASTPtr & query) +{ + auto table_function_ast = extractTableFunctionASTPtrFromSelectQuery(query); + if (!table_function_ast) return nullptr; - auto * table_function = table_expression->table_function->as(); + auto * table_function = table_function_ast->as(); return table_function; } diff --git a/src/Storages/extractTableFunctionArgumentsFromSelectQuery.h b/src/Storages/extractTableFunctionArgumentsFromSelectQuery.h index 87edf01c1c82..9834f3dc7573 100644 --- a/src/Storages/extractTableFunctionArgumentsFromSelectQuery.h +++ b/src/Storages/extractTableFunctionArgumentsFromSelectQuery.h @@ -6,7 +6,10 @@ namespace DB { +struct ASTTableExpression; +ASTTableExpression * extractTableExpressionASTPtrFromSelectQuery(ASTPtr & query); +ASTPtr extractTableFunctionASTPtrFromSelectQuery(ASTPtr & query); ASTFunction * extractTableFunctionFromSelectQuery(ASTPtr & query); ASTExpressionList * extractTableFunctionArgumentsFromSelectQuery(ASTPtr & query); diff --git a/src/TableFunctions/TableFunctionRemote.h b/src/TableFunctions/TableFunctionRemote.h index 0f75bf2b854c..4de60a79aea3 100644 --- a/src/TableFunctions/TableFunctionRemote.h +++ b/src/TableFunctions/TableFunctionRemote.h @@ -26,6 +26,8 @@ class TableFunctionRemote : public ITableFunction bool needStructureConversion() const override { return false; } + void setRemoteTableFunction(ASTPtr remote_table_function_ptr_) { remote_table_function_ptr = remote_table_function_ptr_; } + private: StoragePtr executeImpl(const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns, bool is_insert_query) const override; diff --git a/tests/integration/test_cluster_discovery/test.py b/tests/integration/test_cluster_discovery/test.py index ebcb910479ca..be0c5b2f640d 100644 --- a/tests/integration/test_cluster_discovery/test.py +++ b/tests/integration/test_cluster_discovery/test.py @@ -125,6 +125,13 @@ def test_cluster_discovery_startup_and_stop(start_cluster): def test_cluster_discovery_macros(start_cluster): + # wait for all nodes to be started + check_nodes_count = functools.partial( + check_on_cluster, what="count()", msg="Wrong nodes count in cluster" + ) + total_nodes = len(nodes) - 1 + check_nodes_count([nodes["node_observer"]], total_nodes) + # check macros res = nodes["node_observer"].query( "SELECT sum(number) FROM clusterAllReplicas('{autocluster}', system.numbers) WHERE number=1"