Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 5 additions & 10 deletions src/Storages/MergeTree/ExportPartTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,9 @@ ExportPartTask::ExportPartTask(MergeTreeData & storage_, const MergeTreePartExpo

bool ExportPartTask::executeStep()
{
auto metadata_snapshot = storage.getInMemoryMetadataPtr();
const auto & metadata_snapshot = manifest.storage_snapshot->metadata;

Names columns_to_read = metadata_snapshot->getColumns().getNamesOfPhysical();
StorageSnapshotPtr storage_snapshot = storage.getStorageSnapshot(metadata_snapshot, local_context);

MergeTreeSequentialSourceType read_type = MergeTreeSequentialSourceType::Export;

Expand Down Expand Up @@ -142,13 +142,8 @@ bool ExportPartTask::executeStep()
bool read_with_direct_io = local_context->getSettingsRef()[Setting::min_bytes_to_use_direct_io] > manifest.data_part->getBytesOnDisk();
bool prefetch = false;

MergeTreeData::IMutationsSnapshot::Params params
{
.metadata_version = metadata_snapshot->getMetadataVersion(),
.min_part_metadata_version = manifest.data_part->getMetadataVersion(),
};

auto mutations_snapshot = storage.getMutationsSnapshot(params);
const auto & snapshot_data = assert_cast<const MergeTreeData::SnapshotData &>(*manifest.storage_snapshot->data);
auto mutations_snapshot = snapshot_data.mutations_snapshot;

auto alter_conversions = MergeTreeData::getAlterConversionsForPart(
manifest.data_part,
Expand All @@ -161,7 +156,7 @@ bool ExportPartTask::executeStep()
read_type,
plan_for_part,
storage,
storage_snapshot,
manifest.storage_snapshot,
RangesInDataPart(manifest.data_part),
alter_conversions,
nullptr,
Expand Down
9 changes: 5 additions & 4 deletions src/Storages/MergeTree/MergeTreeData.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6239,13 +6239,13 @@ void MergeTreeData::exportPartToTable(
return ast ? ast->formatWithSecretsOneLine() : "";
};

auto src_snapshot = getInMemoryMetadataPtr();
auto destination_snapshot = dest_storage->getInMemoryMetadataPtr();
auto source_metadata_ptr = getInMemoryMetadataPtr();
auto destination_metadata_ptr = dest_storage->getInMemoryMetadataPtr();

if (destination_snapshot->getColumns().getAllPhysical().sizeOfDifference(src_snapshot->getColumns().getAllPhysical()))
if (destination_metadata_ptr->getColumns().getAllPhysical().sizeOfDifference(source_metadata_ptr->getColumns().getAllPhysical()))
throw Exception(ErrorCodes::INCOMPATIBLE_COLUMNS, "Tables have different structure");

if (query_to_string(src_snapshot->getPartitionKeyAST()) != query_to_string(destination_snapshot->getPartitionKeyAST()))
if (query_to_string(source_metadata_ptr->getPartitionKeyAST()) != query_to_string(destination_metadata_ptr->getPartitionKeyAST()))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Tables have different partition key");

auto part = getPartIfExists(part_name, {MergeTreeDataPartState::Active, MergeTreeDataPartState::Outdated});
Expand All @@ -6262,6 +6262,7 @@ void MergeTreeData::exportPartToTable(
transaction_id,
query_context->getSettingsRef()[Setting::export_merge_tree_part_file_already_exists_policy].value,
format_settings,
getStorageSnapshot(source_metadata_ptr, query_context),
completion_callback);

std::lock_guard lock(export_manifests_mutex);
Expand Down
7 changes: 7 additions & 0 deletions src/Storages/MergeTree/MergeTreePartExportManifest.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <Interpreters/StorageID.h>
#include <Storages/MergeTree/IMergeTreeDataPart.h>
#include <Storages/StorageSnapshot.h>
#include <QueryPipeline/QueryPipeline.h>
#include <optional>

Expand Down Expand Up @@ -46,12 +47,14 @@ struct MergeTreePartExportManifest
const String & transaction_id_,
FileAlreadyExistsPolicy file_already_exists_policy_,
const FormatSettings & format_settings_,
const StorageSnapshotPtr & storage_snapshot_,
std::function<void(CompletionCallbackResult)> completion_callback_ = {})
: destination_storage_id(destination_storage_id_),
data_part(data_part_),
transaction_id(transaction_id_),
file_already_exists_policy(file_already_exists_policy_),
format_settings(format_settings_),
storage_snapshot(storage_snapshot_),
completion_callback(completion_callback_),
create_time(time(nullptr)) {}

Expand All @@ -62,6 +65,10 @@ struct MergeTreePartExportManifest
FileAlreadyExistsPolicy file_already_exists_policy;
FormatSettings format_settings;

/// Storage snapshot captured at the time of query validation to prevent race conditions with mutations
/// Otherwise the export could fail if the schema changes between validation and execution
StorageSnapshotPtr storage_snapshot;

std::function<void(CompletionCallbackResult)> completion_callback;

time_t create_time;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
<clickhouse>
<named_collections>
<s3_conn>
<url>http://minio1:9001/root/data</url>
<access_key_id>minio</access_key_id>
<secret_access_key>ClickHouse_Minio_P@ssw0rd</secret_access_key>
</s3_conn>
</named_collections>
</clickhouse>
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
import logging
import pytest
import random
import string
import time
from typing import Optional
import uuid

from helpers.cluster import ClickHouseCluster
from helpers.network import PartitionManager

@pytest.fixture(scope="module")
def cluster():
try:
cluster = ClickHouseCluster(__file__)
cluster.add_instance(
"node1",
main_configs=["configs/named_collections.xml"],
with_minio=True,
)
logging.info("Starting cluster...")
cluster.start()
yield cluster
finally:
cluster.shutdown()


def create_s3_table(node, s3_table):
node.query(f"CREATE TABLE {s3_table} (id UInt64, year UInt16) ENGINE = S3(s3_conn, filename='{s3_table}', format=Parquet, partition_strategy='hive') PARTITION BY year")


def create_tables_and_insert_data(node, mt_table, s3_table):
node.query(f"CREATE TABLE {mt_table} (id UInt64, year UInt16) ENGINE = MergeTree() PARTITION BY year ORDER BY tuple()")
node.query(f"INSERT INTO {mt_table} VALUES (1, 2020), (2, 2020), (3, 2020), (4, 2021)")

create_s3_table(node, s3_table)


def test_drop_column_during_export_snapshot(cluster):
node = cluster.instances["node1"]

mt_table = "mutations_snapshot_mt_table"
s3_table = "mutations_snapshot_s3_table"

create_tables_and_insert_data(node, mt_table, s3_table)

# Block traffic to/from MinIO to force upload errors and retries, following existing S3 tests style
minio_ip = cluster.minio_ip
minio_port = cluster.minio_port

# Ensure export sees a consistent snapshot at start time even if we mutate the source later
with PartitionManager() as pm:
# Block responses from MinIO (source_port matches MinIO service)
pm_rule_reject_responses = {
"destination": node.ip_address,
"source_port": minio_port,
"action": "REJECT --reject-with tcp-reset",
}
pm._add_rule(pm_rule_reject_responses)

# Block requests to MinIO (destination: MinIO, destination_port: minio_port)
pm_rule_reject_requests = {
"destination": minio_ip,
"destination_port": minio_port,
"action": "REJECT --reject-with tcp-reset",
}
pm._add_rule(pm_rule_reject_requests)

# Start export of 2020
node.query(
f"ALTER TABLE {mt_table} EXPORT PART '2020_1_1_0' TO TABLE {s3_table};"
)

# Drop a column that is required for the export
node.query(f"ALTER TABLE {mt_table} DROP COLUMN id")

time.sleep(3)
# assert the mutation has been applied AND the data has not been exported yet
assert "Unknown expression identifier `id`" in node.query_and_get_error(f"SELECT id FROM {mt_table}"), "Column id is not removed"

# Wait for export to finish and then verify destination still reflects the original snapshot (3 rows)
time.sleep(5)
assert node.query(f"SELECT count() FROM {s3_table} WHERE id >= 0") == '3\n', "Export did not preserve snapshot at start time after source mutation"


def test_add_column_during_export(cluster):
node = cluster.instances["node1"]

mt_table = "add_column_during_export_mt_table"
s3_table = "add_column_during_export_s3_table"

create_tables_and_insert_data(node, mt_table, s3_table)

# Block traffic to/from MinIO to force upload errors and retries, following existing S3 tests style
minio_ip = cluster.minio_ip
minio_port = cluster.minio_port

# Ensure export sees a consistent snapshot at start time even if we mutate the source later
with PartitionManager() as pm:
# Block responses from MinIO (source_port matches MinIO service)
pm_rule_reject_responses = {
"destination": node.ip_address,
"source_port": minio_port,
"action": "REJECT --reject-with tcp-reset",
}
pm._add_rule(pm_rule_reject_responses)

# Block requests to MinIO (destination: MinIO, destination_port: minio_port)
pm_rule_reject_requests = {
"destination": minio_ip,
"destination_port": minio_port,
"action": "REJECT --reject-with tcp-reset",
}
pm._add_rule(pm_rule_reject_requests)

# Start export of 2020
node.query(
f"ALTER TABLE {mt_table} EXPORT PART '2020_1_1_0' TO TABLE {s3_table};"
)

node.query(f"ALTER TABLE {mt_table} ADD COLUMN id2 UInt64")

time.sleep(3)

# assert the mutation has been applied AND the data has not been exported yet
assert node.query(f"SELECT count(id2) FROM {mt_table}") == '4\n', "Column id2 is not added"

# Wait for export to finish and then verify destination still reflects the original snapshot (3 rows)
time.sleep(5)
assert node.query(f"SELECT count() FROM {s3_table} WHERE id >= 0") == '3\n', "Export did not preserve snapshot at start time after source mutation"
assert "Unknown expression identifier `id2`" in node.query_and_get_error(f"SELECT id2 FROM {s3_table}"), "Column id2 is present in the exported data"
Loading