diff --git a/src/QueryPipeline/RemoteQueryExecutor.cpp b/src/QueryPipeline/RemoteQueryExecutor.cpp
index 37e60f3f5115..fa71621a0242 100644
--- a/src/QueryPipeline/RemoteQueryExecutor.cpp
+++ b/src/QueryPipeline/RemoteQueryExecutor.cpp
@@ -746,8 +746,12 @@ void RemoteQueryExecutor::processReadTaskRequest()
     if (!extension || !extension->task_iterator)
         throw Exception(ErrorCodes::LOGICAL_ERROR, "Distributed task iterator is not initialized");
 
+    if (!extension->replica_info)
+        throw Exception(ErrorCodes::LOGICAL_ERROR, "Replica info is not initialized");
+
     ProfileEvents::increment(ProfileEvents::ReadTaskRequestsReceived);
-    auto response = (*extension->task_iterator)();
+
+    auto response = (*extension->task_iterator)(extension->replica_info->number_of_current_replica);
     connections->sendReadTaskResponse(response);
 }
 
diff --git a/src/QueryPipeline/RemoteQueryExecutor.h b/src/QueryPipeline/RemoteQueryExecutor.h
index 32d0cca3480d..f3381828e84d 100644
--- a/src/QueryPipeline/RemoteQueryExecutor.h
+++ b/src/QueryPipeline/RemoteQueryExecutor.h
@@ -28,7 +28,7 @@ class RemoteQueryExecutorReadContext;
 class ParallelReplicasReadingCoordinator;
 
 /// This is the same type as StorageS3Source::IteratorWrapper
-using TaskIterator = std::function<String()>;
+using TaskIterator = std::function<String(size_t)>;
 
 /// This class allows one to launch queries on remote replicas of one shard and get results
 class RemoteQueryExecutor
diff --git a/src/Storages/IStorageCluster.cpp b/src/Storages/IStorageCluster.cpp
index 6b5e9f0e49ba..bbdcee5ac5d4 100644
--- a/src/Storages/IStorageCluster.cpp
+++ b/src/Storages/IStorageCluster.cpp
@@ -34,6 +34,7 @@ namespace Setting
     extern const SettingsBool async_query_sending_for_remote;
     extern const SettingsBool async_socket_for_remote;
     extern const SettingsBool skip_unavailable_shards;
+    extern const SettingsNonZeroUInt64 max_parallel_replicas;
 }
 
 namespace ErrorCodes
@@ -67,7 +68,17 @@ void ReadFromCluster::createExtension(const ActionsDAG::Node * predicate)
     if (extension)
         return;
 
-    extension = storage->getTaskIteratorExtension(predicate, context);
+    std::vector<String> ids_of_hosts;
+    for (const auto & shard : cluster->getShardsInfo())
+    {
+        if (shard.per_replica_pools.empty())
+            throw Exception(ErrorCodes::LOGICAL_ERROR, "Cluster {} with empty shard {}", cluster->getName(), shard.shard_num);
+        if (!shard.per_replica_pools[0])
+            throw Exception(ErrorCodes::LOGICAL_ERROR, "Cluster {}, shard {} with empty node", cluster->getName(), shard.shard_num);
+        ids_of_hosts.push_back(shard.per_replica_pools[0]->getAddress());
+    }
+
+    extension = storage->getTaskIteratorExtension(predicate, context, ids_of_hosts);
 }
 
 /// The code executes on initiator
@@ -155,8 +166,6 @@ SinkToStoragePtr IStorageCluster::write(
 
 void ReadFromCluster::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
 {
-    createExtension(nullptr);
-
     const Scalars & scalars = context->hasQueryContext() ? context->getQueryContext()->getScalars() : Scalars{};
     const bool add_agg_info = processed_stage == QueryProcessingStage::WithMergeableState;
 
@@ -164,29 +173,44 @@ void ReadFromCluster::initializePipeline(QueryPipelineBuilder & pipeline, const
     auto new_context = updateSettings(context->getSettingsRef());
     const auto & current_settings = new_context->getSettingsRef();
     auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(current_settings);
+
+    size_t replica_index = 0;
+
+    createExtension(nullptr);
+
     for (const auto & shard_info : cluster->getShardsInfo())
     {
-        auto try_results = shard_info.pool->getMany(timeouts, current_settings, PoolMode::GET_MANY);
-        for (auto & try_result : try_results)
-        {
-            auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
-                std::vector<IConnectionPool::Entry>{try_result},
-                queryToString(query_to_send),
-                getOutputHeader(),
-                new_context,
-                /*throttler=*/nullptr,
-                scalars,
-                Tables(),
-                processed_stage,
-                extension);
-
-            remote_query_executor->setLogger(log);
-            pipes.emplace_back(std::make_shared<RemoteSource>(
-                remote_query_executor,
-                add_agg_info,
-                current_settings[Setting::async_socket_for_remote],
-                current_settings[Setting::async_query_sending_for_remote]));
-        }
+        /// We treat every replica as a separate shard,
+        /// so each shard has only one address to connect to.
+        auto try_results = shard_info.pool->getMany(
+            timeouts,
+            current_settings,
+            PoolMode::GET_ONE,
+            {},
+            /*skip_unavailable_endpoints=*/true);
+
+        if (try_results.empty())
+            continue;
+
+        IConnections::ReplicaInfo replica_info{ .number_of_current_replica = replica_index++ };
+
+        auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
+            std::vector<IConnectionPool::Entry>{try_results.front()},
+            queryToString(query_to_send),
+            getOutputHeader(),
+            new_context,
+            /*throttler=*/nullptr,
+            scalars,
+            Tables(),
+            processed_stage,
+            RemoteQueryExecutor::Extension{.task_iterator = extension->task_iterator, .replica_info = std::move(replica_info)});
+
+        remote_query_executor->setLogger(log);
+        pipes.emplace_back(std::make_shared<RemoteSource>(
+            remote_query_executor,
+            add_agg_info,
+            current_settings[Setting::async_socket_for_remote],
+            current_settings[Setting::async_query_sending_for_remote]));
     }
 
     auto pipe = Pipe::unitePipes(std::move(pipes));
diff --git a/src/Storages/IStorageCluster.h b/src/Storages/IStorageCluster.h
index 2992c3bc2497..89155e4041e5 100644
--- a/src/Storages/IStorageCluster.h
+++ b/src/Storages/IStorageCluster.h
@@ -40,7 +40,10 @@ class IStorageCluster : public IStorage
     ClusterPtr getCluster(ContextPtr context) const { return getClusterImpl(context, cluster_name); }
 
     /// Query is needed for pruning by virtual columns (_file, _path)
-    virtual RemoteQueryExecutor::Extension getTaskIteratorExtension(const ActionsDAG::Node * predicate, const ContextPtr & context) const = 0;
+    virtual RemoteQueryExecutor::Extension getTaskIteratorExtension(
+        const ActionsDAG::Node * predicate,
+        const ContextPtr & context,
+        std::optional<std::vector<String>> ids_of_hosts = std::nullopt) const = 0;
 
     QueryProcessingStage::Enum getQueryProcessingStage(ContextPtr, QueryProcessingStage::Enum, const StorageSnapshotPtr &, SelectQueryInfo &) const override;
 
diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp
index affc68245ae4..cd44e6ca978d 100644
--- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp
+++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp
@@ -17,6 +17,8 @@
 #include
 #include
 #include
+#include <Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h>
+
 
 namespace DB
 {
@@ -281,19 +283,21 @@ void StorageObjectStorageCluster::updateQueryToSendIfNeeded(
 }
 
 RemoteQueryExecutor::Extension StorageObjectStorageCluster::getTaskIteratorExtension(
-    const ActionsDAG::Node * predicate, const ContextPtr & local_context) const
+    const ActionsDAG::Node * predicate,
+    const ContextPtr & local_context,
+    std::optional<std::vector<String>> ids_of_replicas) const
 {
     auto iterator = StorageObjectStorageSource::createFileIterator(
         configuration, configuration->getQuerySettings(local_context), object_storage, /* distributed_processing */false,
         local_context, predicate, getVirtualsList(), nullptr, local_context->getFileProgressCallback());
 
-    auto callback = std::make_shared<std::function<String()>>([iterator]() mutable -> String
-    {
-        auto object_info = iterator->next(0);
-        if (object_info)
-            return object_info->getPath();
-        return "";
-    });
+    auto task_distributor = std::make_shared<StorageObjectStorageStableTaskDistributor>(iterator, ids_of_replicas);
+
+    auto callback = std::make_shared<TaskIterator>(
+        [task_distributor](size_t number_of_current_replica) mutable -> String {
+            return task_distributor->getNextTask(number_of_current_replica).value_or("");
+        });
+
     return RemoteQueryExecutor::Extension{ .task_iterator = std::move(callback) };
 }
 
diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.h b/src/Storages/ObjectStorage/StorageObjectStorageCluster.h
index 492ea02c505a..f4f4d565d355 100644
--- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.h
+++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.h
@@ -30,7 +30,9 @@ class StorageObjectStorageCluster : public IStorageCluster
     std::string getName() const override;
 
     RemoteQueryExecutor::Extension getTaskIteratorExtension(
-        const ActionsDAG::Node * predicate, const ContextPtr & context) const override;
+        const ActionsDAG::Node * predicate,
+        const ContextPtr & context,
+        std::optional<std::vector<String>> ids_of_replicas) const override;
 
     String getPathSample(StorageInMemoryMetadata metadata, ContextPtr context);
 
diff --git a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp
new file mode 100644
index 000000000000..d2127a7f45c4
--- /dev/null
+++ b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp
@@ -0,0 +1,178 @@
+#include "StorageObjectStorageStableTaskDistributor.h"
+#include <Common/Exception.h>
+#include <Common/SipHash.h>
+#include <Common/logger_useful.h>
+
+namespace DB
+{
+
+namespace ErrorCodes
+{
+    extern const int LOGICAL_ERROR;
+}
+
+StorageObjectStorageStableTaskDistributor::StorageObjectStorageStableTaskDistributor(
+    std::shared_ptr<StorageObjectStorageSource::IIterator> iterator_,
+    std::optional<std::vector<String>> ids_of_nodes_)
+    : iterator(std::move(iterator_))
+    , connection_to_files(ids_of_nodes_.has_value() ? ids_of_nodes_.value().size() : 1)
+    , ids_of_nodes(ids_of_nodes_)
+    , iterator_exhausted(false)
+{
+}
+
+std::optional<String> StorageObjectStorageStableTaskDistributor::getNextTask(size_t number_of_current_replica)
+{
+    LOG_TRACE(
+        log,
+        "Received a new connection from replica {} looking for a file",
+        number_of_current_replica
+    );
+
+    // 1. Check pre-queued files first
+    if (auto file = getPreQueuedFile(number_of_current_replica))
+        return file;
+
+    // 2. Try to find a matching file from the iterator
+    if (auto file = getMatchingFileFromIterator(number_of_current_replica))
+        return file;
+
+    // 3. Process unprocessed files if the iterator is exhausted
+    return getAnyUnprocessedFile(number_of_current_replica);
+}
+
+size_t StorageObjectStorageStableTaskDistributor::getReplicaForFile(const String & file_path)
+{
+    if (!ids_of_nodes.has_value())
+        throw Exception(ErrorCodes::LOGICAL_ERROR, "No list of nodes inside Task Distributor.");
+
+    const auto & ids_of_nodes_value = ids_of_nodes.value();
+    size_t nodes_count = ids_of_nodes_value.size();
+
+    /// Trivial case
+    if (nodes_count < 2)
+        return 0;
+
+    /// Rendezvous hashing: pick the node with the highest hash(node_id + file_path),
+    /// so the assignment stays stable when nodes are added or removed.
+    size_t best_id = 0;
+    UInt64 best_weight = sipHash64(ids_of_nodes_value[0] + file_path);
+    for (size_t id = 1; id < nodes_count; ++id)
+    {
+        UInt64 weight = sipHash64(ids_of_nodes_value[id] + file_path);
+        if (weight > best_weight)
+        {
+            best_weight = weight;
+            best_id = id;
+        }
+    }
+    return best_id;
+}
+
+std::optional<String> StorageObjectStorageStableTaskDistributor::getPreQueuedFile(size_t number_of_current_replica)
+{
+    std::lock_guard lock(mutex);
+
+    auto & files = connection_to_files[number_of_current_replica];
+
+    while (!files.empty())
+    {
+        String next_file = files.back();
+        files.pop_back();
+
+        auto it = unprocessed_files.find(next_file);
+        if (it == unprocessed_files.end())
+            continue;
+
+        unprocessed_files.erase(it);
+
+        LOG_TRACE(
+            log,
+            "Assigning pre-queued file {} to replica {}",
+            next_file,
+            number_of_current_replica
+        );
+
+        return next_file;
+    }
+
+    return std::nullopt;
+}
+
+std::optional<String> StorageObjectStorageStableTaskDistributor::getMatchingFileFromIterator(size_t number_of_current_replica)
+{
+    {
+        std::lock_guard lock(mutex);
+        if (iterator_exhausted)
+            return std::nullopt;
+    }
+
+    while (true)
+    {
+        ObjectInfoPtr object_info;
+
+        {
+            std::lock_guard lock(mutex);
+            object_info = iterator->next(0);
+
+            if (!object_info)
+            {
+                iterator_exhausted = true;
+                break;
+            }
+        }
+
+        String file_path;
+
+        auto archive_object_info = std::dynamic_pointer_cast<StorageObjectStorageSource::ArchiveIterator::ObjectInfoInArchive>(object_info);
+        if (archive_object_info)
+        {
+            file_path = archive_object_info->getPathToArchive();
+        }
+        else
+        {
+            file_path = object_info->getPath();
+        }
+
+        size_t file_replica_idx = getReplicaForFile(file_path);
+        if (file_replica_idx == number_of_current_replica)
+        {
+            LOG_TRACE(
+                log,
+                "Found file {} for replica {}",
+                file_path,
+                number_of_current_replica
+            );
+
+            return file_path;
+        }
+
+        // Queue the file for its assigned replica
+        {
+            std::lock_guard lock(mutex);
+            unprocessed_files.insert(file_path);
+            connection_to_files[file_replica_idx].push_back(file_path);
+        }
+    }
+
+    return std::nullopt;
+}
+
+std::optional<String> StorageObjectStorageStableTaskDistributor::getAnyUnprocessedFile(size_t number_of_current_replica)
+{
+    std::lock_guard lock(mutex);
+
+    if (!unprocessed_files.empty())
+    {
+        auto it = unprocessed_files.begin();
+        String next_file = *it;
+        unprocessed_files.erase(it);
+
+        LOG_TRACE(
+            log,
+            "Iterator exhausted. Assigning unprocessed file {} to replica {}",
+            next_file,
+            number_of_current_replica
+        );
+
+        return next_file;
+    }
+
+    return std::nullopt;
+}
+
+}
diff --git a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h
new file mode 100644
index 000000000000..a87884885a45
--- /dev/null
+++ b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h
@@ -0,0 +1,44 @@
+#pragma once
+
+#include <Storages/ObjectStorage/StorageObjectStorageSource.h>
+#include <Common/Logger.h>
+#include <base/types.h>
+#include <memory>
+#include <mutex>
+#include <optional>
+#include <string>
+#include <unordered_set>
+#include <vector>
+
+namespace DB
+{
+
+class StorageObjectStorageStableTaskDistributor
+{
+public:
+    StorageObjectStorageStableTaskDistributor(
+        std::shared_ptr<StorageObjectStorageSource::IIterator> iterator_,
+        std::optional<std::vector<String>> ids_of_nodes_);
+
+    std::optional<String> getNextTask(size_t number_of_current_replica);
+
+private:
+    size_t getReplicaForFile(const String & file_path);
+    std::optional<String> getPreQueuedFile(size_t number_of_current_replica);
+    std::optional<String> getMatchingFileFromIterator(size_t number_of_current_replica);
+    std::optional<String> getAnyUnprocessedFile(size_t number_of_current_replica);
+
+    std::shared_ptr<StorageObjectStorageSource::IIterator> iterator;
+
+    std::vector<std::vector<String>> connection_to_files;
+    std::unordered_set<String> unprocessed_files;
+
+    std::optional<std::vector<String>> ids_of_nodes;
+
+    std::mutex mutex;
+    bool iterator_exhausted = false;
+
+    LoggerPtr log = getLogger("StorageClusterTaskDistributor");
+};
+
+}
diff --git a/src/Storages/StorageDistributed.cpp b/src/Storages/StorageDistributed.cpp
index debe646fcb1e..0627053bc177 100644
--- a/src/Storages/StorageDistributed.cpp
+++ b/src/Storages/StorageDistributed.cpp
@@ -1131,9 +1131,6 @@ std::optional<QueryPipeline> StorageDistributed::distributedWriteFromClusterStor
     if (filter)
         predicate = filter->getOutputs().at(0);
 
-    /// Select query is needed for pruining on virtual columns
-    auto extension = src_storage_cluster.getTaskIteratorExtension(predicate, local_context);
-
     auto dst_cluster = getCluster();
 
     auto new_query = std::dynamic_pointer_cast<ASTInsertQuery>(query.clone());
@@ -1160,8 +1157,13 @@ std::optional<QueryPipeline> StorageDistributed::distributedWriteFromClusterStor
     const auto & current_settings = query_context->getSettingsRef();
     auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(current_settings);
 
-    /// Here we take addresses from destination cluster and assume source table exists on these nodes
     const auto cluster = getCluster();
+
+    /// Select query is needed for pruning on virtual columns
+    auto extension = src_storage_cluster.getTaskIteratorExtension(predicate, local_context);
+
+    /// Here we take addresses from destination cluster and assume source table exists on these nodes
+    size_t replica_index = 0;
     for (const auto & replicas : cluster->getShardsInfo())
     {
         /// Skip unavailable hosts if necessary
@@ -1170,6 +1172,8 @@ std::optional<QueryPipeline> StorageDistributed::distributedWriteFromClusterStor
         /// There will be only one replica, because we consider each replica as a shard
         for (const auto & try_result : try_results)
         {
+            IConnections::ReplicaInfo replica_info{ .number_of_current_replica = replica_index++ };
+
             auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
                 std::vector<IConnectionPool::Entry>{try_result},
                 new_query_str,
@@ -1179,7 +1183,7 @@ std::optional<QueryPipeline> StorageDistributed::distributedWriteFromClusterStor
                 Scalars{},
                 Tables{},
                 QueryProcessingStage::Complete,
-                extension);
+                RemoteQueryExecutor::Extension{.task_iterator = extension.task_iterator, .replica_info = std::move(replica_info)});
 
             QueryPipeline remote_pipeline(std::make_shared<RemoteSource>(
                 remote_query_executor, false, settings[Setting::async_socket_for_remote], settings[Setting::async_query_sending_for_remote]));
diff --git a/src/Storages/StorageFileCluster.cpp b/src/Storages/StorageFileCluster.cpp
index c01738067c40..c4a2ca91ba23 100644
--- a/src/Storages/StorageFileCluster.cpp
+++ b/src/Storages/StorageFileCluster.cpp
@@ -74,10 +74,13 @@ void StorageFileCluster::updateQueryToSendIfNeeded(DB::ASTPtr & query, const Sto
         expression_list->children, storage_snapshot->metadata->getColumns().getAll().toNamesAndTypesDescription(), format_name, context);
 }
 
-RemoteQueryExecutor::Extension StorageFileCluster::getTaskIteratorExtension(const ActionsDAG::Node * predicate, const ContextPtr & context) const
+RemoteQueryExecutor::Extension StorageFileCluster::getTaskIteratorExtension(
+    const ActionsDAG::Node * predicate,
+    const ContextPtr & context,
+    std::optional<std::vector<String>>) const
 {
     auto iterator = std::make_shared<StorageFileSource::FilesIterator>(paths, std::nullopt, predicate, getVirtualsList(), context);
-    auto callback = std::make_shared<TaskIterator>([iter = std::move(iterator)]() mutable -> String { return iter->next(); });
+    auto callback = std::make_shared<TaskIterator>([iter = std::move(iterator)](size_t) mutable -> String { return iter->next(); });
     return RemoteQueryExecutor::Extension{.task_iterator = std::move(callback)};
 }
 
diff --git a/src/Storages/StorageFileCluster.h b/src/Storages/StorageFileCluster.h
index 9549f3a035c3..3329223739ae 100644
--- a/src/Storages/StorageFileCluster.h
+++ b/src/Storages/StorageFileCluster.h
@@ -27,7 +27,10 @@ class StorageFileCluster : public IStorageCluster
         const ConstraintsDescription & constraints_);
 
     std::string getName() const override { return "FileCluster"; }
-    RemoteQueryExecutor::Extension getTaskIteratorExtension(const ActionsDAG::Node * predicate, const ContextPtr & context) const override;
+    RemoteQueryExecutor::Extension getTaskIteratorExtension(
+        const ActionsDAG::Node * predicate,
+        const ContextPtr & context,
+        std::optional<std::vector<String>> ids_of_nodes) const override;
 
 private:
     void updateQueryToSendIfNeeded(ASTPtr & query, const StorageSnapshotPtr & storage_snapshot, const ContextPtr & context) override;
diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp
index e5c42d347839..f3717d907843 100644
--- a/src/Storages/StorageReplicatedMergeTree.cpp
+++ b/src/Storages/StorageReplicatedMergeTree.cpp
@@ -5987,7 +5987,6 @@ SinkToStoragePtr StorageReplicatedMergeTree::write(const ASTPtr & /*query*/, con
 std::optional<QueryPipeline> StorageReplicatedMergeTree::distributedWriteFromClusterStorage(const std::shared_ptr<IStorageCluster> & src_storage_cluster, const ASTInsertQuery & query, ContextPtr local_context)
 {
     const auto & settings = local_context->getSettingsRef();
-    auto extension = src_storage_cluster->getTaskIteratorExtension(nullptr, local_context);
 
     /// Here we won't check that the cluster formed from table replicas is a subset of a cluster specified in s3Cluster/hdfsCluster table function
     auto src_cluster = src_storage_cluster->getCluster(local_context);
@@ -6006,6 +6005,9 @@ std::optional<QueryPipeline> StorageReplicatedMergeTree::distributedWriteFromClu
     ContextMutablePtr query_context = Context::createCopy(local_context);
     query_context->increaseDistributedDepth();
 
+    auto extension = src_storage_cluster->getTaskIteratorExtension(nullptr, local_context);
+
+    size_t replica_index = 0;
     for (const auto & replicas : src_cluster->getShardsAddresses())
     {
         /// There will be only one replica, because we consider each replica as a shard
@@ -6020,6 +6022,8 @@ std::optional<QueryPipeline> StorageReplicatedMergeTree::distributedWriteFromClu
                 node.secure
             );
 
+            IConnections::ReplicaInfo replica_info{ .number_of_current_replica = replica_index++ };
+
             auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
                 connection,
                 query_str,
@@ -6029,7 +6033,7 @@ std::optional<QueryPipeline> StorageReplicatedMergeTree::distributedWriteFromClu
                 Scalars{},
                 Tables{},
                 QueryProcessingStage::Complete,
-                extension);
+                RemoteQueryExecutor::Extension{.task_iterator = extension.task_iterator, .replica_info = std::move(replica_info)});
 
             QueryPipeline remote_pipeline(std::make_shared<RemoteSource>(
                 remote_query_executor, false, settings[Setting::async_socket_for_remote], settings[Setting::async_query_sending_for_remote]));
diff --git a/src/Storages/StorageURLCluster.cpp b/src/Storages/StorageURLCluster.cpp
index 7beb73d2047f..56e5cd3b26a1 100644
--- a/src/Storages/StorageURLCluster.cpp
+++ b/src/Storages/StorageURLCluster.cpp
@@ -93,11 +93,14 @@ void StorageURLCluster::updateQueryToSendIfNeeded(ASTPtr & query, const StorageS
         expression_list->children, storage_snapshot->metadata->getColumns().getAll().toNamesAndTypesDescription(), format_name, context);
 }
 
-RemoteQueryExecutor::Extension StorageURLCluster::getTaskIteratorExtension(const ActionsDAG::Node * predicate, const ContextPtr & context) const
+RemoteQueryExecutor::Extension StorageURLCluster::getTaskIteratorExtension(
+    const ActionsDAG::Node * predicate,
+    const ContextPtr & context,
+    std::optional<std::vector<String>>) const
 {
     auto iterator = std::make_shared<StorageURLSource::DisclosedGlobIterator>(
         uri, context->getSettingsRef()[Setting::glob_expansion_max_elements], predicate, getVirtualsList(), context);
-    auto callback = std::make_shared<TaskIterator>([iter = std::move(iterator)]() mutable -> String { return iter->next(); });
+    auto callback = std::make_shared<TaskIterator>([iter = std::move(iterator)](size_t) mutable -> String { return iter->next(); });
     return RemoteQueryExecutor::Extension{.task_iterator = std::move(callback)};
 }
 
diff --git a/src/Storages/StorageURLCluster.h b/src/Storages/StorageURLCluster.h
index 31bffa062104..d09d2a36bd7e 100644
--- a/src/Storages/StorageURLCluster.h
+++ b/src/Storages/StorageURLCluster.h
@@ -30,7 +30,10 @@ class StorageURLCluster : public IStorageCluster
         const StorageURL::Configuration & configuration_);
 
     std::string getName() const override { return "URLCluster"; }
-    RemoteQueryExecutor::Extension getTaskIteratorExtension(const ActionsDAG::Node * predicate, const ContextPtr & context) const override;
+    RemoteQueryExecutor::Extension getTaskIteratorExtension(
+        const ActionsDAG::Node * predicate,
+        const ContextPtr & context,
+        std::optional<std::vector<String>> ids_of_replicas) const override;
 
 private:
     void updateQueryToSendIfNeeded(ASTPtr & query, const StorageSnapshotPtr & storage_snapshot, const ContextPtr & context) override;
diff --git a/tests/integration/test_s3_cache_locality/__init__.py b/tests/integration/test_s3_cache_locality/__init__.py
new file mode 100644
index 000000000000..e69de29bb2d1
diff --git a/tests/integration/test_s3_cache_locality/configs/cluster.xml b/tests/integration/test_s3_cache_locality/configs/cluster.xml
new file mode 100644
index 000000000000..db54c35374b9
--- /dev/null
+++ b/tests/integration/test_s3_cache_locality/configs/cluster.xml
@@ -0,0 +1,126 @@
+<clickhouse>
+
+    <remote_servers>
+
+        <cluster_12345>
+            <shard>
+                <replica>
+                    <host>clickhouse1</host>
+                    <port>9000</port>
+                </replica>
+                <replica>
+                    <host>clickhouse2</host>
+                    <port>9000</port>
+                </replica>
+                <replica>
+                    <host>clickhouse3</host>
+                    <port>9000</port>
+                </replica>
+                <replica>
+                    <host>clickhouse4</host>
+                    <port>9000</port>
+                </replica>
+                <replica>
+                    <host>clickhouse5</host>
+                    <port>9000</port>
+                </replica>
+            </shard>
+        </cluster_12345>
+
+        <cluster_1234>
+            <shard>
+                <replica>
+                    <host>clickhouse1</host>
+                    <port>9000</port>
+                </replica>
+                <replica>
+                    <host>clickhouse2</host>
+                    <port>9000</port>
+                </replica>
+                <replica>
+                    <host>clickhouse3</host>
+                    <port>9000</port>
+                </replica>
+                <replica>
+                    <host>clickhouse4</host>
+                    <port>9000</port>
+                </replica>
+            </shard>
+        </cluster_1234>
+
+        <cluster_2345>
+            <shard>
+                <replica>
+                    <host>clickhouse2</host>
+                    <port>9000</port>
+                </replica>
+                <replica>
+                    <host>clickhouse3</host>
+                    <port>9000</port>
+                </replica>
+                <replica>
+                    <host>clickhouse4</host>
+                    <port>9000</port>
+                </replica>
+                <replica>
+                    <host>clickhouse5</host>
+                    <port>9000</port>
+                </replica>
+            </shard>
+        </cluster_2345>
+
+        <cluster_34512>
+            <shard>
+                <replica>
+                    <host>clickhouse3</host>
+                    <port>9000</port>
+                </replica>
+                <replica>
+                    <host>clickhouse4</host>
+                    <port>9000</port>
+                </replica>
+                <replica>
+                    <host>clickhouse5</host>
+                    <port>9000</port>
+                </replica>
+                <replica>
+                    <host>clickhouse1</host>
+                    <port>9000</port>
+                </replica>
+                <replica>
+                    <host>clickhouse2</host>
+                    <port>9000</port>
+                </replica>
+            </shard>
+        </cluster_34512>
+
+        <cluster_4523>
+            <shard>
+                <replica>
+                    <host>clickhouse4</host>
+                    <port>9000</port>
+                </replica>
+                <replica>
+                    <host>clickhouse5</host>
+                    <port>9000</port>
+                </replica>
+                <replica>
+                    <host>clickhouse2</host>
+                    <port>9000</port>
+                </replica>
+                <replica>
+                    <host>clickhouse3</host>
+                    <port>9000</port>
+                </replica>
+            </shard>
+        </cluster_4523>
+
+    </remote_servers>
+
+    <filesystem_caches>
+        <raw_s3_cache>
+            <path>/var/lib/clickhouse/raw_s3_cache</path>
+            <max_size>10Gi</max_size>
+        </raw_s3_cache>
+    </filesystem_caches>
+</clickhouse>
diff --git a/tests/integration/test_s3_cache_locality/configs/named_collections.xml b/tests/integration/test_s3_cache_locality/configs/named_collections.xml
new file mode 100644
index 000000000000..511078d6f0d9
--- /dev/null
+++ b/tests/integration/test_s3_cache_locality/configs/named_collections.xml
@@ -0,0 +1,10 @@
+<clickhouse>
+    <named_collections>
+        <s3_conn>
+            <url>http://minio1:9001/root/data/*</url>
+            <access_key_id>minio</access_key_id>
+            <secret_access_key>minio123</secret_access_key>
+            <format>CSV</format>
+        </s3_conn>
+    </named_collections>
+</clickhouse>
diff --git a/tests/integration/test_s3_cache_locality/configs/users.xml b/tests/integration/test_s3_cache_locality/configs/users.xml
new file mode 100644
index 000000000000..4b6ba057ecb1
--- /dev/null
+++ b/tests/integration/test_s3_cache_locality/configs/users.xml
@@ -0,0 +1,9 @@
+<clickhouse>
+    <users>
+        <default>
+            <password></password>
+            <profile>default</profile>
+            <access_management>1</access_management>
+        </default>
+    </users>
+</clickhouse>
diff --git a/tests/integration/test_s3_cache_locality/test.py b/tests/integration/test_s3_cache_locality/test.py
new file mode 100644
index 000000000000..088c24576d10
--- /dev/null
+++ b/tests/integration/test_s3_cache_locality/test.py
@@ -0,0 +1,187 @@
+import csv
+import logging
+import os
+import shutil
+import uuid
+
+import pytest
+
+from helpers.cluster import ClickHouseCluster
+
+logging.getLogger().setLevel(logging.INFO)
+logging.getLogger().addHandler(logging.StreamHandler())
+
+SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
+
+
+def create_buckets_s3(cluster):
+    minio = cluster.minio_client
+
+    s3_data = []
+
+    for file_number in range(1000):
+        file_name = f"data/generated/file_{file_number}.csv"
+        os.makedirs(os.path.join(SCRIPT_DIR, "data/generated/"), exist_ok=True)
+        s3_data.append(file_name)
+        with open(os.path.join(SCRIPT_DIR, file_name), "w+", encoding="utf-8") as f:
+            # a String, b UInt64
+            data = []
+
+            # Make all files a bit different
+            data.append(
+                ["str_" + str(file_number), file_number]
+            )
+
+            writer = csv.writer(f)
+            writer.writerows(data)
+
+    for file in s3_data:
+        minio.fput_object(
+            bucket_name=cluster.minio_bucket,
+            object_name=file,
+            file_path=os.path.join(SCRIPT_DIR, file),
+        )
+
+    for obj in minio.list_objects(cluster.minio_bucket, recursive=True):
+        print(obj.object_name)
+
+
+@pytest.fixture(scope="module")
+def started_cluster():
+    try:
+        cluster = ClickHouseCluster(__file__)
+        # clickhouse0 is not a member of any cluster_* cluster
+        for i in range(6):
+            cluster.add_instance(
+                f"clickhouse{i}",
+                main_configs=["configs/cluster.xml", "configs/named_collections.xml"],
+                user_configs=["configs/users.xml"],
+                macros={"replica": f"clickhouse{i}"},
+                with_minio=True,
+                with_zookeeper=True,
+            )
+
+        logging.info("Starting cluster...")
+        cluster.start()
+        logging.info("Cluster started")
+
+        create_buckets_s3(cluster)
+
+        yield cluster
+    finally:
+        shutil.rmtree(os.path.join(SCRIPT_DIR, "data/generated/"))
+        cluster.shutdown()
+
+
+def check_s3_gets(cluster, node, expected_result, cluster_first, cluster_second, enable_filesystem_cache):
+    for host in list(cluster.instances.values()):
+        host.query("SYSTEM DROP FILESYSTEM CACHE 'raw_s3_cache'")
+
+    query_id_first = str(uuid.uuid4())
+    result_first = node.query(
+        f"""
+        SELECT count(*)
+        FROM s3Cluster('{cluster_first}', 'http://minio1:9001/root/data/generated/*', 'minio', 'minio123', 'CSV', 'a String, b UInt64')
+        WHERE b=42
+        SETTINGS
+            enable_filesystem_cache={enable_filesystem_cache},
+            filesystem_cache_name='raw_s3_cache'
+        """,
+        query_id=query_id_first
+    )
+    assert result_first == expected_result
+    query_id_second = str(uuid.uuid4())
+    result_second = node.query(
+        f"""
+        SELECT count(*)
+        FROM s3Cluster('{cluster_second}', 'http://minio1:9001/root/data/generated/*', 'minio', 'minio123', 'CSV', 'a String, b UInt64')
+        WHERE b=42
+        SETTINGS
+            enable_filesystem_cache={enable_filesystem_cache},
+            filesystem_cache_name='raw_s3_cache'
+        """,
+        query_id=query_id_second
+    )
+    assert result_second == expected_result
+
+    node.query("SYSTEM FLUSH LOGS")
+    node.query(f"SYSTEM FLUSH LOGS ON CLUSTER {cluster_first}")
+    node.query(f"SYSTEM FLUSH LOGS ON CLUSTER {cluster_second}")
+
+    s3_get_first = node.query(
+        f"""
+        SELECT sum(ProfileEvents['S3GetObject'])
+        FROM clusterAllReplicas('{cluster_first}', system.query_log)
+        WHERE type='QueryFinish'
+            AND initial_query_id='{query_id_first}'
+        """
+    )
+    s3_get_second = node.query(
+        f"""
+        SELECT sum(ProfileEvents['S3GetObject'])
+        FROM clusterAllReplicas('{cluster_second}', system.query_log)
+        WHERE type='QueryFinish'
+            AND initial_query_id='{query_id_second}'
+        """
+    )
+
+    return int(s3_get_first), int(s3_get_second)
+
+
+def check_s3_gets_repeat(cluster, node, expected_result, cluster_first, cluster_second, enable_filesystem_cache):
+    # Repeat the test several times and sum the counters to smooth out randomness
+    iterations = 10
+    s3_get_first_sum = 0
+    s3_get_second_sum = 0
+    for _ in range(iterations):
+        (s3_get_first, s3_get_second) = check_s3_gets(cluster, node, expected_result, cluster_first, cluster_second, enable_filesystem_cache)
+        s3_get_first_sum += s3_get_first
+        s3_get_second_sum += s3_get_second
+    return s3_get_first_sum, s3_get_second_sum
+
+
+def test_cache_locality(started_cluster):
+    node = started_cluster.instances["clickhouse0"]
+
+    expected_result = node.query(
+        """
+        SELECT count(*)
+        FROM s3('http://minio1:9001/root/data/generated/*', 'minio', 'minio123', 'CSV', 'a String, b UInt64')
+        WHERE b=42
+        """
+    )
+
+    # The algorithm does not give a 100% guarantee, so allow 10% dispersion
+    dispersion = 0.1
+
+    # No cache
+    (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_12345', 'cluster_12345', 0)
+    assert s3_get_second == s3_get_first
+
+    # With cache
+    (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_12345', 'cluster_12345', 1)
+    assert s3_get_second <= s3_get_first * dispersion
+
+    # Different nodes order
+    (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_12345', 'cluster_34512', 1)
+    assert s3_get_second <= s3_get_first * dispersion
+
+    # No last node
+    (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_12345', 'cluster_1234', 1)
+    assert s3_get_second <= s3_get_first * (0.2 + dispersion)
+
+    # No first node
+    (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_12345', 'cluster_2345', 1)
+    assert s3_get_second <= s3_get_first * (0.2 + dispersion)
+
+    # No first node, different nodes order
+    (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_12345', 'cluster_4523', 1)
+    assert s3_get_second <= s3_get_first * (0.2 + dispersion)
+
+    # Add new node, different nodes order
+    (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_4523', 'cluster_12345', 1)
+    assert s3_get_second <= s3_get_first * (0.2 + dispersion)
+
+    # New node and old node, different nodes order
+    (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_1234', 'cluster_4523', 1)
+    assert s3_get_second <= s3_get_first * (0.4375 + dispersion)