Skip to content
7 changes: 6 additions & 1 deletion src/Storages/MergeTree/MergeTreeData.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ namespace Setting
extern const SettingsUInt64 min_bytes_to_use_direct_io;
extern const SettingsBool export_merge_tree_part_overwrite_file_if_exists;
extern const SettingsBool output_format_parallel_formatting;
extern const SettingsBool output_format_parquet_parallel_encoding;
}

namespace MergeTreeSetting
Expand Down Expand Up @@ -6244,7 +6245,9 @@ void MergeTreeData::exportPartToTable(const PartitionCommand & command, ContextP
dest_storage->getStorageID(),
part,
query_context->getSettingsRef()[Setting::export_merge_tree_part_overwrite_file_if_exists],
query_context->getSettingsRef()[Setting::output_format_parallel_formatting]);
query_context->getSettingsRef()[Setting::output_format_parallel_formatting],
query_context->getSettingsRef()[Setting::output_format_parquet_parallel_encoding],
query_context->getSettingsRef()[Setting::max_threads]);

std::lock_guard lock(export_manifests_mutex);

Expand Down Expand Up @@ -6292,6 +6295,8 @@ void MergeTreeData::exportPartToTableImpl(
{
auto context_copy = Context::createCopy(local_context);
context_copy->setSetting("output_format_parallel_formatting", manifest.parallel_formatting);
context_copy->setSetting("output_format_parquet_parallel_encoding", manifest.parquet_parallel_encoding);
context_copy->setSetting("max_threads", manifest.max_threads);

sink = destination_storage->import(
manifest.data_part->name + "_" + manifest.data_part->checksums.getTotalChecksumHex(),
Expand Down
9 changes: 8 additions & 1 deletion src/Storages/MergeTree/MergeTreeExportManifest.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,24 @@ struct MergeTreeExportManifest
const StorageID & destination_storage_id_,
const DataPartPtr & data_part_,
bool overwrite_file_if_exists_,
bool parallel_formatting_)
bool parallel_formatting_,
bool parallel_formatting_parquet_,
std::size_t max_threads_)
: destination_storage_id(destination_storage_id_),
data_part(data_part_),
overwrite_file_if_exists(overwrite_file_if_exists_),
parallel_formatting(parallel_formatting_),
parquet_parallel_encoding(parallel_formatting_parquet_),
max_threads(max_threads_),
create_time(time(nullptr)) {}

StorageID destination_storage_id;
DataPartPtr data_part;
bool overwrite_file_if_exists;
bool parallel_formatting;
/// parquet has a different setting for parallel formatting
bool parquet_parallel_encoding;
std::size_t max_threads;

time_t create_time;
mutable bool in_progress = false;
Expand Down
3 changes: 2 additions & 1 deletion src/Storages/MergeTree/MergeTreeSequentialSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,8 @@ MergeTreeSequentialSource::MergeTreeSequentialSource(
addThrottler(read_settings.local_throttler, context->getMergesThrottler());
break;
case Export:
read_settings.local_throttler = context->getExportsThrottler();
addThrottler(read_settings.local_throttler, context->getExportsThrottler());
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Merge glitch, only found it now

addThrottler(read_settings.remote_throttler, context->getExportsThrottler());
break;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ namespace DB

result += raw_path;

if (raw_path.back() != '/')
if (!raw_path.empty() && raw_path.back() != '/')
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unrelated but required

{
result += "/";
}
Expand Down
3 changes: 2 additions & 1 deletion src/Storages/ObjectStorage/StorageObjectStorage.cpp
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#include <optional>
#include <thread>
#include <Core/ColumnWithTypeAndName.h>
#include <Storages/ObjectStorage/StorageObjectStorage.h>
Expand Down Expand Up @@ -507,7 +508,7 @@ SinkToStoragePtr StorageObjectStorage::import(
destination_file_path,
object_storage,
configuration,
format_settings,
std::nullopt, /// passing nullopt to force rebuild for format_settings based on query context
std::make_shared<const Block>(getInMemoryMetadataPtr()->getSampleBlock()),
local_context);
}
Expand Down
Loading