diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index e9cf056b1135..6e7dcb2cec6f 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -213,6 +213,7 @@ namespace Setting extern const SettingsUInt64 min_bytes_to_use_direct_io; extern const SettingsBool export_merge_tree_part_overwrite_file_if_exists; extern const SettingsBool output_format_parallel_formatting; + extern const SettingsBool output_format_parquet_parallel_encoding; } namespace MergeTreeSetting @@ -6244,7 +6245,9 @@ void MergeTreeData::exportPartToTable(const PartitionCommand & command, ContextP dest_storage->getStorageID(), part, query_context->getSettingsRef()[Setting::export_merge_tree_part_overwrite_file_if_exists], - query_context->getSettingsRef()[Setting::output_format_parallel_formatting]); + query_context->getSettingsRef()[Setting::output_format_parallel_formatting], + query_context->getSettingsRef()[Setting::output_format_parquet_parallel_encoding], + query_context->getSettingsRef()[Setting::max_threads]); std::lock_guard lock(export_manifests_mutex); @@ -6292,6 +6295,8 @@ void MergeTreeData::exportPartToTableImpl( { auto context_copy = Context::createCopy(local_context); context_copy->setSetting("output_format_parallel_formatting", manifest.parallel_formatting); + context_copy->setSetting("output_format_parquet_parallel_encoding", manifest.parquet_parallel_encoding); + context_copy->setSetting("max_threads", manifest.max_threads); sink = destination_storage->import( manifest.data_part->name + "_" + manifest.data_part->checksums.getTotalChecksumHex(), diff --git a/src/Storages/MergeTree/MergeTreeExportManifest.h b/src/Storages/MergeTree/MergeTreeExportManifest.h index 36831fd132ba..5e3d264f47eb 100644 --- a/src/Storages/MergeTree/MergeTreeExportManifest.h +++ b/src/Storages/MergeTree/MergeTreeExportManifest.h @@ -13,17 +13,24 @@ struct MergeTreeExportManifest const StorageID & destination_storage_id_, const DataPartPtr & data_part_, bool overwrite_file_if_exists_, - bool parallel_formatting_) + bool parallel_formatting_, + bool parallel_formatting_parquet_, + std::size_t max_threads_) : destination_storage_id(destination_storage_id_), data_part(data_part_), overwrite_file_if_exists(overwrite_file_if_exists_), parallel_formatting(parallel_formatting_), + parquet_parallel_encoding(parallel_formatting_parquet_), + max_threads(max_threads_), create_time(time(nullptr)) {} StorageID destination_storage_id; DataPartPtr data_part; bool overwrite_file_if_exists; bool parallel_formatting; + /// parquet has a different setting for parallel formatting + bool parquet_parallel_encoding; + std::size_t max_threads; time_t create_time; mutable bool in_progress = false; diff --git a/src/Storages/MergeTree/MergeTreeSequentialSource.cpp b/src/Storages/MergeTree/MergeTreeSequentialSource.cpp index cc43895f0c76..56613657e68a 100644 --- a/src/Storages/MergeTree/MergeTreeSequentialSource.cpp +++ b/src/Storages/MergeTree/MergeTreeSequentialSource.cpp @@ -169,7 +169,8 @@ MergeTreeSequentialSource::MergeTreeSequentialSource( addThrottler(read_settings.local_throttler, context->getMergesThrottler()); break; case Export: - read_settings.local_throttler = context->getExportsThrottler(); + addThrottler(read_settings.local_throttler, context->getExportsThrottler()); + addThrottler(read_settings.remote_throttler, context->getExportsThrottler()); break; } diff --git a/src/Storages/ObjectStorage/ObjectStorageFilePathGenerator.h b/src/Storages/ObjectStorage/ObjectStorageFilePathGenerator.h index a7e3b102e3e7..a66965984bb0 100644 --- a/src/Storages/ObjectStorage/ObjectStorageFilePathGenerator.h +++ b/src/Storages/ObjectStorage/ObjectStorageFilePathGenerator.h @@ -55,7 +55,7 @@ namespace DB result += raw_path; - if (raw_path.back() != '/') + if (!raw_path.empty() && raw_path.back() != '/') { result += "/"; } diff --git a/src/Storages/ObjectStorage/StorageObjectStorage.cpp b/src/Storages/ObjectStorage/StorageObjectStorage.cpp index 377a1f5e9beb..711ef86664f0 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorage.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorage.cpp @@ -1,3 +1,4 @@ +#include #include #include #include @@ -507,7 +508,7 @@ SinkToStoragePtr StorageObjectStorage::import( destination_file_path, object_storage, configuration, - format_settings, + std::nullopt, /// passing nullopt to force rebuild for format_settings based on query context std::make_shared(getInMemoryMetadataPtr()->getSampleBlock()), local_context); }