diff --git a/src/Common/CurrentMetrics.cpp b/src/Common/CurrentMetrics.cpp index 6cf960218232..bfb492a812aa 100644 --- a/src/Common/CurrentMetrics.cpp +++ b/src/Common/CurrentMetrics.cpp @@ -10,6 +10,7 @@ M(Merge, "Number of executing background merges") \ M(MergeParts, "Number of source parts participating in current background merges") \ M(Move, "Number of currently executing moves") \ + M(Export, "Number of currently executing exports") \ M(PartMutation, "Number of mutations (ALTER DELETE/UPDATE)") \ M(ReplicatedFetch, "Number of data parts being fetched from replica") \ M(ReplicatedSend, "Number of data parts being sent to replicas") \ diff --git a/src/Common/ProfileEvents.cpp b/src/Common/ProfileEvents.cpp index 14cadc3b3587..91ce75d3a32f 100644 --- a/src/Common/ProfileEvents.cpp +++ b/src/Common/ProfileEvents.cpp @@ -28,6 +28,10 @@ M(AsyncInsertBytes, "Data size in bytes of asynchronous INSERT queries.", ValueType::Bytes) \ M(AsyncInsertRows, "Number of rows inserted by asynchronous INSERT queries.", ValueType::Number) \ M(AsyncInsertCacheHits, "Number of times a duplicate hash id has been found in asynchronous INSERT hash id cache.", ValueType::Number) \ + M(PartsExports, "Number of successful part exports.", ValueType::Number) \ + M(PartsExportFailures, "Number of failed part exports.", ValueType::Number) \ + M(PartsExportDuplicated, "Number of part exports that failed because target already exists.", ValueType::Number) \ + M(PartsExportTotalMilliseconds, "Total time spent on part export operations.", ValueType::Milliseconds) \ M(FailedQuery, "Number of failed queries.", ValueType::Number) \ M(FailedSelectQuery, "Same as FailedQuery, but only for SELECT queries.", ValueType::Number) \ M(FailedInsertQuery, "Same as FailedQuery, but only for INSERT queries.", ValueType::Number) \ diff --git a/src/Core/Settings.cpp b/src/Core/Settings.cpp index 44141270985f..023c7951fc91 100644 --- a/src/Core/Settings.cpp +++ b/src/Core/Settings.cpp @@ -6703,6 +6703,9 @@ Allows to change the behaviour of the result type of `dateTrunc` function. Possible values: - 0 - When the second argument is `DateTime64/Date32` the return type will be `DateTime64/Date32` regardless of the time unit in the first argument. - 1 - For `Date32` the result is always `Date`. For `DateTime64` the result is `DateTime` for time units `second` and higher. 
+)", 0) \ + DECLARE(Bool, export_merge_tree_part_overwrite_file_if_exists, false, R"( +Overwrite file if it already exists when exporting a merge tree part )", 0) \ \ /* ####################################################### */ \ diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp index 8dbfaff5c02f..714ac3796000 100644 --- a/src/Core/SettingsChangesHistory.cpp +++ b/src/Core/SettingsChangesHistory.cpp @@ -79,6 +79,7 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory() {"object_storage_cluster_join_mode", "allow", "allow", "New setting"}, {"object_storage_remote_initiator", false, false, "New setting."}, {"allow_experimental_export_merge_tree_part", false, false, "New setting."}, + {"export_merge_tree_part_overwrite_file_if_exists", false, false, "New setting."}, }); addSettingsChanges(settings_changes_history, "25.6", { diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index 6bc374c48c1e..14723c28025f 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -34,6 +34,7 @@ #include #include #include +#include #include #include #include @@ -465,6 +466,7 @@ struct ContextSharedPart : boost::noncopyable GlobalOvercommitTracker global_overcommit_tracker; MergeList merge_list; /// The list of executable merge (for (Replicated)?MergeTree) MovesList moves_list; /// The list of executing moves (for (Replicated)?MergeTree) + ExportsList exports_list; /// The list of executing exports (for (Replicated)?MergeTree) ReplicatedFetchList replicated_fetch_list; RefreshSet refresh_set; /// The list of active refreshes (for MaterializedView) ConfigurationPtr users_config TSA_GUARDED_BY(mutex); /// Config with the users, profiles and quotas sections. @@ -1158,6 +1160,8 @@ MergeList & Context::getMergeList() { return shared->merge_list; } const MergeList & Context::getMergeList() const { return shared->merge_list; } MovesList & Context::getMovesList() { return shared->moves_list; } const MovesList & Context::getMovesList() const { return shared->moves_list; } +ExportsList & Context::getExportsList() { return shared->exports_list; } +const ExportsList & Context::getExportsList() const { return shared->exports_list; } ReplicatedFetchList & Context::getReplicatedFetchList() { return shared->replicated_fetch_list; } const ReplicatedFetchList & Context::getReplicatedFetchList() const { return shared->replicated_fetch_list; } RefreshSet & Context::getRefreshSet() { return shared->refresh_set; } diff --git a/src/Interpreters/Context.h b/src/Interpreters/Context.h index 14c61ca2f176..297f12d58a7d 100644 --- a/src/Interpreters/Context.h +++ b/src/Interpreters/Context.h @@ -88,6 +88,7 @@ class AsynchronousMetrics; class BackgroundSchedulePool; class MergeList; class MovesList; +class ExportsList; class ReplicatedFetchList; class RefreshSet; class Cluster; @@ -1141,6 +1142,9 @@ class Context: public ContextData, public std::enable_shared_from_this MovesList & getMovesList(); const MovesList & getMovesList() const; + ExportsList & getExportsList(); + const ExportsList & getExportsList() const; + ReplicatedFetchList & getReplicatedFetchList(); const ReplicatedFetchList & getReplicatedFetchList() const; diff --git a/src/Storages/IStorage.h b/src/Storages/IStorage.h index c86c06ca64a3..19a30d952c8f 100644 --- a/src/Storages/IStorage.h +++ b/src/Storages/IStorage.h @@ -444,24 +444,20 @@ class IStorage : public std::enable_shared_from_this, public TypePromo return false; } - struct ImportStats - { - ExecutionStatus status; - 
std::size_t elapsed_ns = 0; - std::size_t bytes_on_disk = 0; - std::size_t read_rows = 0; - std::size_t read_bytes = 0; - std::string file_path = ""; - }; - + /* +It is currently only implemented in StorageObjectStorage. + It is meant to be used to import merge tree data parts into object storage. It is similar to the write API, + but it won't re-partition the data and should allow the filename to be set by the caller. + */ virtual SinkToStoragePtr import( const std::string & /* file_name */, Block & /* block_with_partition_values */, - ContextPtr /* context */, - std::function /* stats_log */) - { - throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Import is not implemented for storage {}", getName()); - } + std::string & /* destination_file_path */, + bool /* overwrite_if_exists */, + ContextPtr /* context */) + { + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Import is not implemented for storage {}", getName()); + } /** Writes the data to a table in distributed manner. diff --git a/src/Storages/MergeTree/ExportList.cpp b/src/Storages/MergeTree/ExportList.cpp new file mode 100644 index 000000000000..63ecc6d7c1f1 --- /dev/null +++ b/src/Storages/MergeTree/ExportList.cpp @@ -0,0 +1,66 @@ +#include + +namespace DB +{ + +ExportsListElement::ExportsListElement( + const StorageID & source_table_id_, + const StorageID & destination_table_id_, + UInt64 part_size_, + const String & part_name_, + const String & target_file_name_, + UInt64 total_rows_to_read_, + UInt64 total_size_bytes_compressed_, + UInt64 total_size_bytes_uncompressed_, + time_t create_time_, + const ContextPtr & context) +: source_table_id(source_table_id_) +, destination_table_id(destination_table_id_) +, part_size(part_size_) +, part_name(part_name_) +, destination_file_path(target_file_name_) +, total_rows_to_read(total_rows_to_read_) +, total_size_bytes_compressed(total_size_bytes_compressed_) +, total_size_bytes_uncompressed(total_size_bytes_uncompressed_) +, create_time(create_time_) +{ + thread_group = ThreadGroup::createForBackgroundProcess(context); +} + +ExportsListElement::~ExportsListElement() +{ + background_memory_tracker.adjustOnBackgroundTaskEnd(&thread_group->memory_tracker); +} + +ExportInfo ExportsListElement::getInfo() const +{ + ExportInfo res; + res.source_database = source_table_id.database_name; + res.source_table = source_table_id.table_name; + res.destination_database = destination_table_id.database_name; + res.destination_table = destination_table_id.table_name; + res.part_name = part_name; + res.destination_file_path = destination_file_path; + res.rows_read = rows_read; + res.total_rows_to_read = total_rows_to_read; + res.total_size_bytes_compressed = total_size_bytes_compressed; + res.total_size_bytes_uncompressed = total_size_bytes_uncompressed; + res.bytes_read_uncompressed = bytes_read_uncompressed; + res.memory_usage = getMemoryUsage(); + res.peak_memory_usage = getPeakMemoryUsage(); + res.create_time = create_time; + res.elapsed = elapsed; + return res; +} + +UInt64 ExportsListElement::getMemoryUsage() const +{ + return thread_group->memory_tracker.get(); +} + +UInt64 ExportsListElement::getPeakMemoryUsage() const +{ + return thread_group->memory_tracker.getPeak(); +} + +} diff --git a/src/Storages/MergeTree/ExportList.h b/src/Storages/MergeTree/ExportList.h new file mode 100644 index 000000000000..ade18b69480c --- /dev/null +++ b/src/Storages/MergeTree/ExportList.h @@ -0,0 +1,90 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +namespace CurrentMetrics +{ 
+ extern const Metric Export; +} + +namespace DB +{ + +struct ExportInfo +{ + String source_database; + String source_table; + String destination_database; + String destination_table; + String part_name; + String destination_file_path; + UInt64 rows_read; + UInt64 total_rows_to_read; + UInt64 total_size_bytes_compressed; + UInt64 total_size_bytes_uncompressed; + UInt64 bytes_read_uncompressed; + UInt64 memory_usage; + UInt64 peak_memory_usage; + time_t create_time = 0; + Float64 elapsed; +}; + +struct ExportsListElement : private boost::noncopyable +{ + const StorageID source_table_id; + const StorageID destination_table_id; + const UInt64 part_size; + const String part_name; + const String destination_file_path; + UInt64 rows_read {0}; + UInt64 total_rows_to_read {0}; + UInt64 total_size_bytes_compressed {0}; + UInt64 total_size_bytes_uncompressed {0}; + UInt64 bytes_read_uncompressed {0}; + time_t create_time {0}; + Float64 elapsed {0}; + + Stopwatch watch; + ThreadGroupPtr thread_group; + + ExportsListElement( + const StorageID & source_table_id_, + const StorageID & destination_table_id_, + UInt64 part_size_, + const String & part_name_, + const String & destination_file_path_, + UInt64 total_rows_to_read_, + UInt64 total_size_bytes_compressed_, + UInt64 total_size_bytes_uncompressed_, + time_t create_time_, + const ContextPtr & context); + + ~ExportsListElement(); + + ExportInfo getInfo() const; + + UInt64 getMemoryUsage() const; + UInt64 getPeakMemoryUsage() const; +}; + + +class ExportsList final : public BackgroundProcessList +{ +private: + using Parent = BackgroundProcessList; + +public: + ExportsList() + : Parent(CurrentMetrics::Export) + {} +}; + +using ExportsListEntry = BackgroundProcessListEntry; + +} diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index 77604d5de286..bdceeca0abdc 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -1,6 +1,7 @@ #include #include +#include #include #include #include @@ -112,6 +113,7 @@ #include #include #include +#include #include #include @@ -153,6 +155,10 @@ namespace ProfileEvents extern const Event LoadedDataPartsMicroseconds; extern const Event RestorePartsSkippedFiles; extern const Event RestorePartsSkippedBytes; + extern const Event PartsExports; + extern const Event PartsExportTotalMilliseconds; + extern const Event PartsExportFailures; + extern const Event PartsExportDuplicated; } namespace CurrentMetrics @@ -197,6 +203,8 @@ namespace Setting extern const SettingsUInt64 merge_tree_storage_snapshot_sleep_ms; extern const SettingsBool allow_experimental_export_merge_tree_part; extern const SettingsUInt64 min_bytes_to_use_direct_io; + extern const SettingsBool export_merge_tree_part_overwrite_file_if_exists; + extern const SettingsBool output_format_parallel_formatting; } namespace MergeTreeSetting @@ -309,6 +317,7 @@ namespace ErrorCodes extern const int CANNOT_FORGET_PARTITION; extern const int DATA_TYPE_CANNOT_BE_USED_IN_KEY; extern const int UNKNOWN_TABLE; + extern const int FILE_ALREADY_EXISTS; } static void checkSuspiciousIndices(const ASTFunction * index_function) @@ -5931,9 +5940,15 @@ void MergeTreeData::exportPartToTable(const PartitionCommand & command, ContextP part_name, getStorageID().getFullTableName()); { + MergeTreeExportManifest manifest( + dest_storage->getStorageID(), + part, + query_context->getSettingsRef()[Setting::export_merge_tree_part_overwrite_file_if_exists], + 
query_context->getSettingsRef()[Setting::output_format_parallel_formatting]); + std::lock_guard lock(export_manifests_mutex); - if (!export_manifests.emplace(dest_storage->getStorageID(), part).second) + if (!export_manifests.emplace(std::move(manifest)).second) { throw Exception(ErrorCodes::ABORTED, "Data part '{}' is already being exported to table '{}'", part_name, dest_storage->getStorageID().getFullTableName()); @@ -5947,24 +5962,6 @@ void MergeTreeData::exportPartToTableImpl( const MergeTreeExportManifest & manifest, ContextPtr local_context) { - std::function part_log_wrapper = [this, manifest](ImportStats stats) { - const auto & data_part = manifest.data_part; - - writePartLog( - PartLogElement::Type::EXPORT_PART, - stats.status, - stats.elapsed_ns, - data_part->name, - data_part, - {data_part}, - nullptr, - nullptr); - - std::lock_guard inner_lock(export_manifests_mutex); - - export_manifests.erase(manifest); - }; - auto metadata_snapshot = getInMemoryMetadataPtr(); Names columns_to_read = metadata_snapshot->getColumns().getNamesOfPhysical(); StorageSnapshotPtr storage_snapshot = getStorageSnapshot(metadata_snapshot, local_context); @@ -5991,17 +5988,31 @@ void MergeTreeData::exportPartToTableImpl( throw Exception(ErrorCodes::UNKNOWN_TABLE, "Failed to reconstruct destination storage: {}", destination_storage_id_name); } - auto sink = destination_storage->import( - manifest.data_part->name, - block_with_partition_values, - local_context, - part_log_wrapper); + SinkToStoragePtr sink; + std::string destination_file_path; - /// Most likely the file has already been imported, so we can just return - if (!sink) + try { - std::lock_guard inner_lock(export_manifests_mutex); + auto context_copy = Context::createCopy(local_context); + context_copy->setSetting("output_format_parallel_formatting", manifest.parallel_formatting); + sink = destination_storage->import( + manifest.data_part->name + "_" + manifest.data_part->checksums.getTotalChecksumHex(), + block_with_partition_values, + destination_file_path, + manifest.overwrite_file_if_exists, + context_copy); + } + catch (const Exception & e) + { + if (e.code() == ErrorCodes::FILE_ALREADY_EXISTS) + { + ProfileEvents::increment(ProfileEvents::PartsExportDuplicated); + } + + ProfileEvents::increment(ProfileEvents::PartsExportFailures); + + std::lock_guard inner_lock(export_manifests_mutex); export_manifests.erase(manifest); return; } @@ -6042,15 +6053,79 @@ void MergeTreeData::exportPartToTableImpl( local_context, getLogger("ExportPartition")); + auto exports_list_entry = getContext()->getExportsList().insert( + getStorageID(), + manifest.destination_storage_id, + manifest.data_part->getBytesOnDisk(), + manifest.data_part->name, + destination_file_path, + manifest.data_part->rows_count, + manifest.data_part->getBytesOnDisk(), + manifest.data_part->getBytesUncompressedOnDisk(), + manifest.create_time, + local_context); + + ThreadGroupSwitcher switcher((*exports_list_entry)->thread_group, ""); + QueryPlanOptimizationSettings optimization_settings(local_context); auto pipeline_settings = BuildQueryPipelineSettings(local_context); auto builder = plan_for_part.buildQueryPipeline(optimization_settings, pipeline_settings); + + builder->setProgressCallback([&exports_list_entry](const Progress & progress) + { + (*exports_list_entry)->bytes_read_uncompressed += progress.read_bytes; + (*exports_list_entry)->rows_read += progress.read_rows; + (*exports_list_entry)->elapsed = (*exports_list_entry)->watch.elapsedSeconds(); + }); + auto pipeline = 
QueryPipelineBuilder::getPipeline(std::move(*builder)); pipeline.complete(sink); - CompletedPipelineExecutor exec(pipeline); - exec.execute(); + try + { + CompletedPipelineExecutor exec(pipeline); + exec.execute(); + + std::lock_guard inner_lock(export_manifests_mutex); + writePartLog( + PartLogElement::Type::EXPORT_PART, + {}, + static_cast((*exports_list_entry)->elapsed * 1000000000), + manifest.data_part->name, + manifest.data_part, + {manifest.data_part}, + nullptr, + nullptr, + exports_list_entry.get()); + + export_manifests.erase(manifest); + + ProfileEvents::increment(ProfileEvents::PartsExports); + ProfileEvents::increment(ProfileEvents::PartsExportTotalMilliseconds, static_cast((*exports_list_entry)->elapsed * 1000)); + } + catch (...) + { + tryLogCurrentException(__PRETTY_FUNCTION__, fmt::format("while exporting the part {}. User should retry.", manifest.data_part->name)); + + ProfileEvents::increment(ProfileEvents::PartsExportFailures); + + std::lock_guard inner_lock(export_manifests_mutex); + writePartLog( + PartLogElement::Type::EXPORT_PART, + ExecutionStatus::fromCurrentException("", true), + static_cast((*exports_list_entry)->elapsed * 1000000000), + manifest.data_part->name, + manifest.data_part, + {manifest.data_part}, + nullptr, + nullptr, + exports_list_entry.get()); + + export_manifests.erase(manifest); + + throw; + } } void MergeTreeData::movePartitionToShard(const ASTPtr & /*partition*/, bool /*move_part*/, const String & /*to*/, ContextPtr /*query_context*/) @@ -8637,7 +8712,8 @@ void MergeTreeData::writePartLog( const DataPartPtr & result_part, const DataPartsVector & source_parts, const MergeListEntry * merge_entry, - std::shared_ptr profile_counters) + std::shared_ptr profile_counters, + const ExportsListEntry * exports_entry) try { auto table_id = getStorageID(); @@ -8705,6 +8781,13 @@ try part_log_elem.rows = (*merge_entry)->rows_written; part_log_elem.peak_memory_usage = (*merge_entry)->getMemoryTracker().getPeak(); } + else if (exports_entry) + { + part_log_elem.rows_read = (*exports_entry)->rows_read; + part_log_elem.bytes_read_uncompressed = (*exports_entry)->bytes_read_uncompressed; + part_log_elem.peak_memory_usage = (*exports_entry)->getPeakMemoryUsage(); + part_log_elem.path_on_disk = (*exports_entry)->destination_file_path; + } if (profile_counters) { diff --git a/src/Storages/MergeTree/MergeTreeData.h b/src/Storages/MergeTree/MergeTreeData.h index 7e148a7d0539..b9007af0e6f0 100644 --- a/src/Storages/MergeTree/MergeTreeData.h +++ b/src/Storages/MergeTree/MergeTreeData.h @@ -19,6 +19,7 @@ #include #include #include +#include #include #include #include @@ -1509,7 +1510,8 @@ class MergeTreeData : public IStorage, public WithMutableContext const DataPartPtr & result_part, const DataPartsVector & source_parts, const MergeListEntry * merge_entry, - std::shared_ptr profile_counters); + std::shared_ptr profile_counters, + const ExportsListEntry * exports_entry = nullptr); /// If part is assigned to merge or mutation (possibly replicated) /// Should be overridden by children, because they can have different diff --git a/src/Storages/MergeTree/MergeTreeExportManifest.h b/src/Storages/MergeTree/MergeTreeExportManifest.h index 89aed701f993..36831fd132ba 100644 --- a/src/Storages/MergeTree/MergeTreeExportManifest.h +++ b/src/Storages/MergeTree/MergeTreeExportManifest.h @@ -8,9 +8,24 @@ struct MergeTreeExportManifest { using DataPartPtr = std::shared_ptr; + + MergeTreeExportManifest( + const StorageID & destination_storage_id_, + const DataPartPtr & data_part_, + 
bool overwrite_file_if_exists_, + bool parallel_formatting_) + : destination_storage_id(destination_storage_id_), + data_part(data_part_), + overwrite_file_if_exists(overwrite_file_if_exists_), + parallel_formatting(parallel_formatting_), + create_time(time(nullptr)) {} + StorageID destination_storage_id; DataPartPtr data_part; - time_t create_time = time(nullptr); + bool overwrite_file_if_exists; + bool parallel_formatting; + + time_t create_time; mutable bool in_progress = false; bool operator<(const MergeTreeExportManifest & rhs) const diff --git a/src/Storages/ObjectStorage/MergeTree/StorageObjectStorageImporterSink.cpp b/src/Storages/ObjectStorage/MergeTree/StorageObjectStorageImporterSink.cpp deleted file mode 100644 index 82a1bbfc81ea..000000000000 --- a/src/Storages/ObjectStorage/MergeTree/StorageObjectStorageImporterSink.cpp +++ /dev/null @@ -1,65 +0,0 @@ - -#include - -namespace DB -{ - -StorageObjectStorageImporterSink::StorageObjectStorageImporterSink( - const std::string & path_, - const ObjectStoragePtr & object_storage_, - const ConfigurationPtr & configuration_, - const std::optional & format_settings_, - const Block & sample_block_, - const std::function & part_log_, - const ContextPtr & context_) - : SinkToStorage(sample_block_) - , object_storage(object_storage_) - , configuration(configuration_) - , format_settings(format_settings_) - , sample_block(sample_block_) - , context(context_) - , part_log(part_log_) -{ - stats.file_path = path_; - sink = std::make_shared( - stats.file_path, - object_storage, - configuration, - format_settings, - sample_block, - context); -} - -String StorageObjectStorageImporterSink::getName() const -{ - return "StorageObjectStorageMergeTreePartImporterSink"; -} - -void StorageObjectStorageImporterSink::consume(Chunk & chunk) -{ - sink->consume(chunk); - stats.read_bytes += chunk.bytes(); - stats.read_rows += chunk.getNumRows(); -} - -void StorageObjectStorageImporterSink::onFinish() -{ - sink->onFinish(); - - if (const auto object_metadata = object_storage->tryGetObjectMetadata(stats.file_path)) - { - stats.bytes_on_disk = object_metadata->size_bytes; - } - - part_log(stats); -} - -void StorageObjectStorageImporterSink::onException(std::exception_ptr exception) -{ - sink->onException(exception); - - stats.status = ExecutionStatus(-1, "Error importing part"); - part_log(stats); -} - -} diff --git a/src/Storages/ObjectStorage/MergeTree/StorageObjectStorageImporterSink.h b/src/Storages/ObjectStorage/MergeTree/StorageObjectStorageImporterSink.h deleted file mode 100644 index 051f5196f964..000000000000 --- a/src/Storages/ObjectStorage/MergeTree/StorageObjectStorageImporterSink.h +++ /dev/null @@ -1,54 +0,0 @@ -#pragma once - -#include -#include -#include -#include -#include -#include -#include -#include - -namespace DB -{ - -using DataPartPtr = std::shared_ptr; - -/* - * Wrapper around `StorageObjectsStorageSink` that takes care of accounting & metrics for partition export - */ -class StorageObjectStorageImporterSink : public SinkToStorage -{ -public: - using ConfigurationPtr = StorageObjectStorage::ConfigurationPtr; - - StorageObjectStorageImporterSink( - const std::string & path_, - const ObjectStoragePtr & object_storage_, - const ConfigurationPtr & configuration_, - const std::optional & format_settings_, - const Block & sample_block_, - const std::function & part_log_, - const ContextPtr & context_); - - String getName() const override; - - void consume(Chunk & chunk) override; - - void onFinish() override; - - void 
onException(std::exception_ptr exception) override; - -private: - std::shared_ptr sink; - ObjectStoragePtr object_storage; - ConfigurationPtr configuration; - std::optional format_settings; - Block sample_block; - ContextPtr context; - std::function part_log; - - IStorage::ImportStats stats; -}; - -} diff --git a/src/Storages/ObjectStorage/StorageObjectStorage.cpp b/src/Storages/ObjectStorage/StorageObjectStorage.cpp index 1b0d322840fd..d256fa36f00e 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorage.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorage.cpp @@ -27,7 +27,6 @@ #include #include #include -#include #include #include #include @@ -50,6 +49,7 @@ namespace ErrorCodes extern const int NOT_IMPLEMENTED; extern const int LOGICAL_ERROR; extern const int INCORRECT_DATA; + extern const int FILE_ALREADY_EXISTS; } String StorageObjectStorage::getPathSample(ContextPtr context) @@ -454,8 +454,9 @@ bool StorageObjectStorage::supportsImport() const SinkToStoragePtr StorageObjectStorage::import( const std::string & file_name, Block & block_with_partition_values, - ContextPtr local_context, - std::function part_log) + std::string & destination_file_path, + bool overwrite_if_exists, + ContextPtr local_context) { std::string partition_key; @@ -469,23 +470,20 @@ SinkToStoragePtr StorageObjectStorage::import( } } - const auto file_path = configuration->getPathForWrite(partition_key, file_name).path; + destination_file_path = configuration->getPathForWrite(partition_key, file_name).path; - if (object_storage->exists(StoredObject(file_path))) + if (!overwrite_if_exists && object_storage->exists(StoredObject(destination_file_path))) { - LOG_INFO(getLogger("StorageObjectStorage"), "File {} already exists, skipping import", file_path); - return nullptr; + throw Exception(ErrorCodes::FILE_ALREADY_EXISTS, "File {} already exists", destination_file_path); } - - return std::make_shared( - file_path, + + return std::make_shared( + destination_file_path, object_storage, configuration, format_settings, getInMemoryMetadataPtr()->getSampleBlock(), - part_log, - local_context - ); + local_context); } void StorageObjectStorage::truncate( diff --git a/src/Storages/ObjectStorage/StorageObjectStorage.h b/src/Storages/ObjectStorage/StorageObjectStorage.h index 14c83e722948..f12c6bd6077f 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorage.h +++ b/src/Storages/ObjectStorage/StorageObjectStorage.h @@ -106,8 +106,9 @@ class StorageObjectStorage : public IStorage SinkToStoragePtr import( const std::string & /* file_name */, Block & /* block_with_partition_values */, - ContextPtr /* context */, - std::function /* part_log */) override; + std::string & /* destination_file_path */, + bool /* overwrite_if_exists */, + ContextPtr /* context */) override; void truncate( const ASTPtr & query, diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp index d5f0ce7e1330..03fcd9962c4a 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp @@ -612,13 +612,14 @@ bool StorageObjectStorageCluster::supportsImport() const SinkToStoragePtr StorageObjectStorageCluster::import( const std::string & file_name, Block & block_with_partition_values, - ContextPtr context, - std::function part_log) + std::string & destination_file_path, + bool overwrite_if_exists, + ContextPtr context) { if (pure_storage) - return pure_storage->import(file_name, block_with_partition_values, 
context, part_log); + return pure_storage->import(file_name, block_with_partition_values, destination_file_path, overwrite_if_exists, context); - return IStorageCluster::import(file_name, block_with_partition_values, context, part_log); + return IStorageCluster::import(file_name, block_with_partition_values, destination_file_path, overwrite_if_exists, context); } diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.h b/src/Storages/ObjectStorage/StorageObjectStorageCluster.h index 41d7e7e7d867..374d35877048 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.h +++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.h @@ -63,8 +63,9 @@ class StorageObjectStorageCluster : public IStorageCluster SinkToStoragePtr import( const std::string & /* file_name */, Block & /* block_with_partition_values */, - ContextPtr /* context */, - std::function /* part_log */) override; + std::string & /* destination_file_path */, + bool /* overwrite_if_exists */, + ContextPtr /* context */) override; private: void updateQueryToSendIfNeeded( diff --git a/src/Storages/System/StorageSystemExports.cpp b/src/Storages/System/StorageSystemExports.cpp index 8a25b4188426..979fa21708d6 100644 --- a/src/Storages/System/StorageSystemExports.cpp +++ b/src/Storages/System/StorageSystemExports.cpp @@ -1,11 +1,9 @@ +#include #include #include -#include -#include -#include "Columns/ColumnString.h" -#include "DataTypes/DataTypeString.h" -#include -#include "Storages/VirtualColumnUtils.h" +#include +#include +#include #include #include #include @@ -23,95 +21,45 @@ ColumnsDescription StorageSystemExports::getColumnsDescription() {"destination_database", std::make_shared(), "Name of the destination database."}, {"destination_table", std::make_shared(), "Name of the destination table."}, {"create_time", std::make_shared(), "Date and time when the export command was submitted for execution."}, - {"part_name", std::make_shared(), "Name of the part"} + {"part_name", std::make_shared(), "Name of the part"}, + {"destination_file_path", std::make_shared(), "File path where the part is being exported."}, + {"elapsed", std::make_shared(), "The time elapsed (in seconds) since the export started."}, + {"rows_read", std::make_shared(), "The number of rows read from the exported part."}, + {"total_rows_to_read", std::make_shared(), "The total number of rows to read from the exported part."}, + {"total_size_bytes_compressed", std::make_shared(), "The total size of the compressed data in the exported part."}, + {"total_size_bytes_uncompressed", std::make_shared(), "The total size of the uncompressed data in the exported part."}, + {"bytes_read_uncompressed", std::make_shared(), "The number of uncompressed bytes read from the exported part."}, + {"memory_usage", std::make_shared(), "Current memory usage in bytes for the export operation."}, + {"peak_memory_usage", std::make_shared(), "Peak memory usage in bytes during the export operation."}, }; } -void StorageSystemExports::fillData(MutableColumns & res_columns, ContextPtr context, const ActionsDAG::Node * predicate, std::vector) const +void StorageSystemExports::fillData(MutableColumns & res_columns, ContextPtr context, const ActionsDAG::Node *, std::vector) const { const auto access = context->getAccess(); - const bool check_access_for_databases = !access->isGranted(AccessType::SHOW_TABLES); - - /// Collect a set of *MergeTree tables. 
- std::map> merge_tree_tables; - for (const auto & db : DatabaseCatalog::instance().getDatabases()) - { - /// Check if database can contain MergeTree tables - if (!db.second->canContainMergeTreeTables()) - continue; - - const bool check_access_for_tables = check_access_for_databases && !access->isGranted(AccessType::SHOW_TABLES, db.first); - - for (auto iterator = db.second->getTablesIterator(context); iterator->isValid(); iterator->next()) - { - const auto & table = iterator->table(); - if (!table) - continue; + const bool check_access_for_tables = !access->isGranted(AccessType::SHOW_TABLES); - if (!dynamic_cast(table.get())) - continue; - - if (check_access_for_tables && !access->isGranted(AccessType::SHOW_TABLES, db.first, iterator->name())) - continue; - - merge_tree_tables[db.first][iterator->name()] = table; - } - } - - MutableColumnPtr col_source_database_export = ColumnString::create(); - MutableColumnPtr col_source_table_export = ColumnString::create(); - - for (auto & db : merge_tree_tables) + for (const auto & export_info : context->getExportsList().get()) { - for (auto & table : db.second) - { - col_source_database_export->insert(db.first); - col_source_table_export->insert(table.first); - } - } - - ColumnPtr col_source_database = std::move(col_source_database_export); - ColumnPtr col_source_table = std::move(col_source_table_export); - - /// Determine what tables are needed by the conditions in the query. - { - Block filtered_block - { - { col_source_database, std::make_shared(), "source_database" }, - { col_source_table, std::make_shared(), "source_table" }, - }; - - VirtualColumnUtils::filterBlockWithPredicate(predicate, filtered_block, context); - - if (!filtered_block.rows()) - return; - - col_source_database = filtered_block.getByName("source_database").column; - col_source_table = filtered_block.getByName("source_table").column; - } - - for (size_t i_storage = 0; i_storage < col_source_database->size(); ++i_storage) - { - auto database = (*col_source_database)[i_storage].safeGet(); - auto table = (*col_source_table)[i_storage].safeGet(); - - std::vector statuses; - { - const IStorage * storage = merge_tree_tables[database][table].get(); - if (const auto * merge_tree = dynamic_cast(storage)) - statuses = merge_tree->getExportsStatus(); - } + if (check_access_for_tables && !access->isGranted(AccessType::SHOW_TABLES, export_info.source_database, export_info.source_table)) + continue; - for (const MergeTreeExportStatus & status : statuses) - { - size_t col_num = 0; - res_columns[col_num++]->insert(status.source_database); - res_columns[col_num++]->insert(status.source_table); - res_columns[col_num++]->insert(status.destination_database); - res_columns[col_num++]->insert(status.destination_table); - res_columns[col_num++]->insert(status.create_time); - res_columns[col_num++]->insert(status.part_name); - } + size_t i = 0; + res_columns[i++]->insert(export_info.source_database); + res_columns[i++]->insert(export_info.source_table); + res_columns[i++]->insert(export_info.destination_database); + res_columns[i++]->insert(export_info.destination_table); + res_columns[i++]->insert(export_info.create_time); + res_columns[i++]->insert(export_info.part_name); + res_columns[i++]->insert(export_info.destination_file_path); + res_columns[i++]->insert(export_info.elapsed); + res_columns[i++]->insert(export_info.rows_read); + res_columns[i++]->insert(export_info.total_rows_to_read); + res_columns[i++]->insert(export_info.total_size_bytes_compressed); + 
res_columns[i++]->insert(export_info.total_size_bytes_uncompressed); + res_columns[i++]->insert(export_info.bytes_read_uncompressed); + res_columns[i++]->insert(export_info.memory_usage); + res_columns[i++]->insert(export_info.peak_memory_usage); } } diff --git a/src/Storages/System/StorageSystemMerges.cpp b/src/Storages/System/StorageSystemMerges.cpp index 0fca5dc84a2b..c8c569ff4696 100644 --- a/src/Storages/System/StorageSystemMerges.cpp +++ b/src/Storages/System/StorageSystemMerges.cpp @@ -1,5 +1,5 @@ -#include #include +#include #include #include diff --git a/src/Storages/System/attachSystemTables.cpp b/src/Storages/System/attachSystemTables.cpp index ad07a4231260..a0f707b70a90 100644 --- a/src/Storages/System/attachSystemTables.cpp +++ b/src/Storages/System/attachSystemTables.cpp @@ -29,6 +29,7 @@ #include #include #include +#include #include #include #include @@ -207,6 +208,7 @@ void attachSystemTablesServer(ContextPtr context, IDatabase & system_database, b attach(context, system_database, "histogram_metrics", "Contains histogram metrics which can be calculated instantly and exported in the Prometheus format. For example, the keeper response time. This table is always up to date."); attach(context, system_database, "merges", "Contains a list of merges currently executing merges of MergeTree tables and their progress. Each merge operation is represented by a single row."); attach(context, system_database, "moves", "Contains information about in-progress data part moves of MergeTree tables. Each data part movement is represented by a single row."); + attach(context, system_database, "exports", "Contains a list of currently executing exports of data parts of MergeTree tables and their progress. Each export operation is represented by a single row."); attach(context, system_database, "mutations", "Contains a list of mutations and their progress. Each mutation command is represented by a single row."); attachNoDescription(context, system_database, "replicas", "Contains information and status of all table replicas on current server.
Each replica is represented by a single row."); attach(context, system_database, "replication_queue", "Contains information about tasks from replication queues stored in ClickHouse Keeper, or ZooKeeper, for each table replica."); diff --git a/tests/queries/0_stateless/03572_export_merge_tree_part_to_object_storage.reference b/tests/queries/0_stateless/03572_export_merge_tree_part_to_object_storage.reference index 75c32111f7a3..d9089d37dd99 100644 --- a/tests/queries/0_stateless/03572_export_merge_tree_part_to_object_storage.reference +++ b/tests/queries/0_stateless/03572_export_merge_tree_part_to_object_storage.reference @@ -1,14 +1,14 @@ ---- Export 2020_1_1_0 and 2021_2_2_0 ---- Both data parts should appear -test/s3_table_NAME/year=2020/2020_1_1_0.parquet 1 -test/s3_table_NAME/year=2020/2020_1_1_0.parquet 2 -test/s3_table_NAME/year=2020/2020_1_1_0.parquet 3 -test/s3_table_NAME/year=2021/2021_2_2_0.parquet 4 +1 2020 +2 2020 +3 2020 +4 2021 ---- Export the same part again, it should be idempotent -test/s3_table_NAME/year=2020/2020_1_1_0.parquet 1 -test/s3_table_NAME/year=2020/2020_1_1_0.parquet 2 -test/s3_table_NAME/year=2020/2020_1_1_0.parquet 3 -test/s3_table_NAME/year=2021/2021_2_2_0.parquet 4 +1 2020 +2 2020 +3 2020 +4 2021 ---- Data in roundtrip MergeTree table (should match s3_table) 1 2020 2 2020 diff --git a/tests/queries/0_stateless/03572_export_merge_tree_part_to_object_storage.sh b/tests/queries/0_stateless/03572_export_merge_tree_part_to_object_storage.sh index 5e886eaf4755..7b3e9d3bb3d1 100755 --- a/tests/queries/0_stateless/03572_export_merge_tree_part_to_object_storage.sh +++ b/tests/queries/0_stateless/03572_export_merge_tree_part_to_object_storage.sh @@ -23,16 +23,16 @@ query "ALTER TABLE $mt_table EXPORT PART '2020_1_1_0' TO TABLE $s3_table SETTING query "ALTER TABLE $mt_table EXPORT PART '2021_2_2_0' TO TABLE $s3_table SETTINGS allow_experimental_export_merge_tree_part = 1" echo "---- Both data parts should appear" -query "SELECT DISTINCT ON (id) replaceRegexpAll(_path, '$s3_table', 's3_table_NAME'), id FROM $s3_table ORDER BY id" +query "SELECT * FROM $s3_table ORDER BY id" echo "---- Export the same part again, it should be idempotent" query "ALTER TABLE $mt_table EXPORT PART '2020_1_1_0' TO TABLE $s3_table SETTINGS allow_experimental_export_merge_tree_part = 1" -query "SELECT DISTINCT ON (id) replaceRegexpAll(_path, '$s3_table', 's3_table_NAME'), id FROM $s3_table ORDER BY id" +query "SELECT * FROM $s3_table ORDER BY id" query "CREATE TABLE $mt_table_roundtrip ENGINE = MergeTree() PARTITION BY year ORDER BY tuple() AS SELECT * FROM $s3_table" echo "---- Data in roundtrip MergeTree table (should match s3_table)" -query "SELECT DISTINCT ON (id) * FROM $mt_table_roundtrip ORDER BY id" +query "SELECT * FROM $mt_table_roundtrip ORDER BY id" query "DROP TABLE IF EXISTS $mt_table, $s3_table, $mt_table_roundtrip"
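
A minimal usage sketch of the behaviour this diff adds (table names are hypothetical; assumes a MergeTree source and an object-storage destination, as in the test above). Per the new code path, a repeated export of the same part is dropped and only recorded under PartsExportDuplicated/PartsExportFailures; the new setting requests an overwrite instead:

    -- Hypothetical tables: local_events (MergeTree), s3_events (object storage).
    ALTER TABLE local_events EXPORT PART '2020_1_1_0' TO TABLE s3_events
        SETTINGS allow_experimental_export_merge_tree_part = 1;

    -- Re-running the same export is a no-op that only shows up in the
    -- duplicate/failure counters, unless overwriting is explicitly allowed:
    ALTER TABLE local_events EXPORT PART '2020_1_1_0' TO TABLE s3_events
        SETTINGS allow_experimental_export_merge_tree_part = 1,
                 export_merge_tree_part_overwrite_file_if_exists = 1;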
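While an export runs, it is now visible through the rewritten system.exports table, which reads the global ExportsList directly instead of polling every MergeTree table, and the new ProfileEvents accumulate in system.events. A sketch using only names introduced in this diff:

    -- Progress of in-flight exports (columns added in StorageSystemExports).
    SELECT part_name, destination_file_path, elapsed,
           rows_read, total_rows_to_read, peak_memory_usage
    FROM system.exports;

    -- Cumulative outcome counters added in ProfileEvents.cpp.
    SELECT event, value
    FROM system.events
    WHERE event IN ('PartsExports', 'PartsExportFailures',
                    'PartsExportDuplicated', 'PartsExportTotalMilliseconds');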
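Finished and failed exports are also written to system.part_log via writePartLog, which now fills path_on_disk, rows read, uncompressed bytes read, and peak memory from the exports-list entry. A sketch for auditing outcomes; the string rendering of PartLogElement::Type::EXPORT_PART is an assumption here ('ExportPart'), not confirmed by this diff:

    -- Assumption: EXPORT_PART renders as 'ExportPart' in system.part_log.
    SELECT event_time, part_name, path_on_disk, peak_memory_usage, error
    FROM system.part_log
    WHERE event_type = 'ExportPart'
    ORDER BY event_time DESC
    LIMIT 10;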