Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 5 additions & 10 deletions src/Storages/MergeTree/ExportPartTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,9 @@ ExportPartTask::ExportPartTask(MergeTreeData & storage_, const MergeTreePartExpo

bool ExportPartTask::executeStep()
{
auto metadata_snapshot = storage.getInMemoryMetadataPtr();
const auto & metadata_snapshot = manifest.storage_snapshot->metadata;

Names columns_to_read = metadata_snapshot->getColumns().getNamesOfPhysical();
StorageSnapshotPtr storage_snapshot = storage.getStorageSnapshot(metadata_snapshot, local_context);

MergeTreeSequentialSourceType read_type = MergeTreeSequentialSourceType::Export;

Expand Down Expand Up @@ -142,13 +142,8 @@ bool ExportPartTask::executeStep()
bool read_with_direct_io = local_context->getSettingsRef()[Setting::min_bytes_to_use_direct_io] > manifest.data_part->getBytesOnDisk();
bool prefetch = false;

MergeTreeData::IMutationsSnapshot::Params params
{
.metadata_version = metadata_snapshot->getMetadataVersion(),
.min_part_metadata_version = manifest.data_part->getMetadataVersion(),
};

auto mutations_snapshot = storage.getMutationsSnapshot(params);
const auto & snapshot_data = assert_cast<const MergeTreeData::SnapshotData &>(*manifest.storage_snapshot->data);
auto mutations_snapshot = snapshot_data.mutations_snapshot;

auto alter_conversions = MergeTreeData::getAlterConversionsForPart(
manifest.data_part,
Expand All @@ -161,7 +156,7 @@ bool ExportPartTask::executeStep()
read_type,
plan_for_part,
storage,
storage_snapshot,
manifest.storage_snapshot,
RangesInDataPart(manifest.data_part),
alter_conversions,
nullptr,
Expand Down
9 changes: 5 additions & 4 deletions src/Storages/MergeTree/MergeTreeData.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6239,13 +6239,13 @@ void MergeTreeData::exportPartToTable(
return ast ? ast->formatWithSecretsOneLine() : "";
};

auto src_snapshot = getInMemoryMetadataPtr();
auto destination_snapshot = dest_storage->getInMemoryMetadataPtr();
auto source_metadata_ptr = getInMemoryMetadataPtr();
auto destination_metadata_ptr = dest_storage->getInMemoryMetadataPtr();

if (destination_snapshot->getColumns().getAllPhysical().sizeOfDifference(src_snapshot->getColumns().getAllPhysical()))
if (destination_metadata_ptr->getColumns().getAllPhysical().sizeOfDifference(source_metadata_ptr->getColumns().getAllPhysical()))
throw Exception(ErrorCodes::INCOMPATIBLE_COLUMNS, "Tables have different structure");

if (query_to_string(src_snapshot->getPartitionKeyAST()) != query_to_string(destination_snapshot->getPartitionKeyAST()))
if (query_to_string(source_metadata_ptr->getPartitionKeyAST()) != query_to_string(destination_metadata_ptr->getPartitionKeyAST()))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Tables have different partition key");

auto part = getPartIfExists(part_name, {MergeTreeDataPartState::Active, MergeTreeDataPartState::Outdated});
Expand All @@ -6262,6 +6262,7 @@ void MergeTreeData::exportPartToTable(
transaction_id,
query_context->getSettingsRef()[Setting::export_merge_tree_part_file_already_exists_policy].value,
format_settings,
getStorageSnapshot(source_metadata_ptr, query_context),
completion_callback);

std::lock_guard lock(export_manifests_mutex);
Expand Down
7 changes: 7 additions & 0 deletions src/Storages/MergeTree/MergeTreePartExportManifest.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <Interpreters/StorageID.h>
#include <Storages/MergeTree/IMergeTreeDataPart.h>
#include <Storages/StorageSnapshot.h>
#include <QueryPipeline/QueryPipeline.h>
#include <optional>

Expand Down Expand Up @@ -46,12 +47,14 @@ struct MergeTreePartExportManifest
const String & transaction_id_,
FileAlreadyExistsPolicy file_already_exists_policy_,
const FormatSettings & format_settings_,
const StorageSnapshotPtr & storage_snapshot_,
std::function<void(CompletionCallbackResult)> completion_callback_ = {})
: destination_storage_id(destination_storage_id_),
data_part(data_part_),
transaction_id(transaction_id_),
file_already_exists_policy(file_already_exists_policy_),
format_settings(format_settings_),
storage_snapshot(storage_snapshot_),
completion_callback(completion_callback_),
create_time(time(nullptr)) {}

Expand All @@ -62,6 +65,10 @@ struct MergeTreePartExportManifest
FileAlreadyExistsPolicy file_already_exists_policy;
FormatSettings format_settings;

/// Storage snapshot captured at the time of query validation to prevent race conditions with mutations
/// Otherwise the export could fail if the schema changes between validation and execution
StorageSnapshotPtr storage_snapshot;

std::function<void(CompletionCallbackResult)> completion_callback;

time_t create_time;
Expand Down
Loading