diff --git a/src/Storages/MergeTree/ExportPartTask.cpp b/src/Storages/MergeTree/ExportPartTask.cpp
index 654e803f035b..cdb61350e376 100644
--- a/src/Storages/MergeTree/ExportPartTask.cpp
+++ b/src/Storages/MergeTree/ExportPartTask.cpp
@@ -47,9 +47,9 @@ ExportPartTask::ExportPartTask(MergeTreeData & storage_, const MergeTreePartExpo
 
 bool ExportPartTask::executeStep()
 {
-    auto metadata_snapshot = storage.getInMemoryMetadataPtr();
+    const auto & metadata_snapshot = manifest.storage_snapshot->metadata;
+
     Names columns_to_read = metadata_snapshot->getColumns().getNamesOfPhysical();
-    StorageSnapshotPtr storage_snapshot = storage.getStorageSnapshot(metadata_snapshot, local_context);
 
     MergeTreeSequentialSourceType read_type = MergeTreeSequentialSourceType::Export;
 
@@ -142,13 +142,8 @@ bool ExportPartTask::executeStep()
     bool read_with_direct_io = local_context->getSettingsRef()[Setting::min_bytes_to_use_direct_io] > manifest.data_part->getBytesOnDisk();
     bool prefetch = false;
 
-    MergeTreeData::IMutationsSnapshot::Params params
-    {
-        .metadata_version = metadata_snapshot->getMetadataVersion(),
-        .min_part_metadata_version = manifest.data_part->getMetadataVersion(),
-    };
-
-    auto mutations_snapshot = storage.getMutationsSnapshot(params);
+    const auto & snapshot_data = assert_cast<const MergeTreeData::SnapshotData &>(*manifest.storage_snapshot->data);
+    auto mutations_snapshot = snapshot_data.mutations_snapshot;
 
     auto alter_conversions = MergeTreeData::getAlterConversionsForPart(
         manifest.data_part,
@@ -161,7 +156,7 @@ bool ExportPartTask::executeStep()
         read_type,
         plan_for_part,
         storage,
-        storage_snapshot,
+        manifest.storage_snapshot,
         RangesInDataPart(manifest.data_part),
         alter_conversions,
         nullptr,
diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp
index f889a4cc50d1..0bc0851746e6 100644
--- a/src/Storages/MergeTree/MergeTreeData.cpp
+++ b/src/Storages/MergeTree/MergeTreeData.cpp
@@ -6239,13 +6239,13 @@ void MergeTreeData::exportPartToTable(
         return ast ? ast->formatWithSecretsOneLine() : "";
     };
 
-    auto src_snapshot = getInMemoryMetadataPtr();
-    auto destination_snapshot = dest_storage->getInMemoryMetadataPtr();
+    auto source_metadata_ptr = getInMemoryMetadataPtr();
+    auto destination_metadata_ptr = dest_storage->getInMemoryMetadataPtr();
 
-    if (destination_snapshot->getColumns().getAllPhysical().sizeOfDifference(src_snapshot->getColumns().getAllPhysical()))
+    if (destination_metadata_ptr->getColumns().getAllPhysical().sizeOfDifference(source_metadata_ptr->getColumns().getAllPhysical()))
         throw Exception(ErrorCodes::INCOMPATIBLE_COLUMNS, "Tables have different structure");
 
-    if (query_to_string(src_snapshot->getPartitionKeyAST()) != query_to_string(destination_snapshot->getPartitionKeyAST()))
+    if (query_to_string(source_metadata_ptr->getPartitionKeyAST()) != query_to_string(destination_metadata_ptr->getPartitionKeyAST()))
         throw Exception(ErrorCodes::BAD_ARGUMENTS, "Tables have different partition key");
 
     auto part = getPartIfExists(part_name, {MergeTreeDataPartState::Active, MergeTreeDataPartState::Outdated});
@@ -6262,6 +6262,7 @@ void MergeTreeData::exportPartToTable(
         transaction_id,
         query_context->getSettingsRef()[Setting::export_merge_tree_part_file_already_exists_policy].value,
         format_settings,
+        getStorageSnapshot(source_metadata_ptr, query_context),
         completion_callback);
 
     std::lock_guard lock(export_manifests_mutex);
diff --git a/src/Storages/MergeTree/MergeTreePartExportManifest.h b/src/Storages/MergeTree/MergeTreePartExportManifest.h
index c89ddc0daf68..f7d1bf1f1623 100644
--- a/src/Storages/MergeTree/MergeTreePartExportManifest.h
+++ b/src/Storages/MergeTree/MergeTreePartExportManifest.h
@@ -2,6 +2,7 @@
 
 #include
 #include
+#include <Storages/StorageSnapshot.h>
 #include
 #include
 
@@ -46,12 +47,14 @@
         const String & transaction_id_,
         FileAlreadyExistsPolicy file_already_exists_policy_,
         const FormatSettings & format_settings_,
+        const StorageSnapshotPtr & storage_snapshot_,
         std::function completion_callback_ = {})
         : destination_storage_id(destination_storage_id_),
         data_part(data_part_),
         transaction_id(transaction_id_),
         file_already_exists_policy(file_already_exists_policy_),
         format_settings(format_settings_),
+        storage_snapshot(storage_snapshot_),
         completion_callback(completion_callback_),
         create_time(time(nullptr)) {}
 
@@ -62,6 +65,10 @@
     FileAlreadyExistsPolicy file_already_exists_policy;
     FormatSettings format_settings;
 
+    /// Storage snapshot captured at the time of query validation to prevent race conditions with mutations.
+    /// Otherwise the export could fail if the schema changes between validation and execution.
+    StorageSnapshotPtr storage_snapshot;
+
     std::function completion_callback;
 
     time_t create_time;
diff --git a/tests/integration/test_export_merge_tree_part_to_object_storage/__init__.py b/tests/integration/test_export_merge_tree_part_to_object_storage/__init__.py
new file mode 100644
index 000000000000..e69de29bb2d1
diff --git a/tests/integration/test_export_merge_tree_part_to_object_storage/configs/named_collections.xml b/tests/integration/test_export_merge_tree_part_to_object_storage/configs/named_collections.xml
new file mode 100644
index 000000000000..d46920b7ba88
--- /dev/null
+++ b/tests/integration/test_export_merge_tree_part_to_object_storage/configs/named_collections.xml
@@ -0,0 +1,9 @@
+<clickhouse>
+    <named_collections>
+        <s3_conn>
+            <url>http://minio1:9001/root/data</url>
+            <access_key_id>minio</access_key_id>
+            <secret_access_key>ClickHouse_Minio_P@ssw0rd</secret_access_key>
+        </s3_conn>
+    </named_collections>
+</clickhouse>
\ No newline at end of file
diff --git a/tests/integration/test_export_merge_tree_part_to_object_storage/test.py b/tests/integration/test_export_merge_tree_part_to_object_storage/test.py
new file mode 100644
index 000000000000..ce6b23bf4231
--- /dev/null
+++ b/tests/integration/test_export_merge_tree_part_to_object_storage/test.py
@@ -0,0 +1,131 @@
+import logging
+import pytest
+import random
+import string
+import time
+from typing import Optional
+import uuid
+
+from helpers.cluster import ClickHouseCluster
+from helpers.network import PartitionManager
+
+@pytest.fixture(scope="module")
+def cluster():
+    try:
+        cluster = ClickHouseCluster(__file__)
+        cluster.add_instance(
+            "node1",
+            main_configs=["configs/named_collections.xml"],
+            with_minio=True,
+        )
+        logging.info("Starting cluster...")
+        cluster.start()
+        yield cluster
+    finally:
+        cluster.shutdown()
+
+
+def create_s3_table(node, s3_table):
+    node.query(f"CREATE TABLE {s3_table} (id UInt64, year UInt16) ENGINE = S3(s3_conn, filename='{s3_table}', format=Parquet, partition_strategy='hive') PARTITION BY year")
+
+
+def create_tables_and_insert_data(node, mt_table, s3_table):
+    node.query(f"CREATE TABLE {mt_table} (id UInt64, year UInt16) ENGINE = MergeTree() PARTITION BY year ORDER BY tuple()")
+    node.query(f"INSERT INTO {mt_table} VALUES (1, 2020), (2, 2020), (3, 2020), (4, 2021)")
+
+    create_s3_table(node, s3_table)
+
+
+def test_drop_column_during_export_snapshot(cluster):
+    node = cluster.instances["node1"]
+
+    mt_table = "mutations_snapshot_mt_table"
+    s3_table = "mutations_snapshot_s3_table"
+
+    create_tables_and_insert_data(node, mt_table, s3_table)
+
+    # Block traffic to/from MinIO so the export stalls until the network is restored, following the existing S3 tests' style
+    minio_ip = cluster.minio_ip
+    minio_port = cluster.minio_port
+
+    # Ensure the export sees a consistent snapshot taken at start time even if we mutate the source later
+    with PartitionManager() as pm:
+        # Block responses from MinIO (source_port matches the MinIO service)
+        pm_rule_reject_responses = {
+            "destination": node.ip_address,
+            "source_port": minio_port,
+            "action": "REJECT --reject-with tcp-reset",
+        }
+        pm._add_rule(pm_rule_reject_responses)
+
+        # Block requests to MinIO (destination: MinIO, destination_port: minio_port)
+        pm_rule_reject_requests = {
+            "destination": minio_ip,
+            "destination_port": minio_port,
+            "action": "REJECT --reject-with tcp-reset",
+        }
+        pm._add_rule(pm_rule_reject_requests)
+
+        # Start the export of partition 2020
+        node.query(
+            f"ALTER TABLE {mt_table} EXPORT PART '2020_1_1_0' TO TABLE {s3_table};"
+        )
+
+        # Drop a column that is required for the export
+        node.query(f"ALTER TABLE {mt_table} DROP COLUMN id")
+
+        time.sleep(3)
+        # Assert the mutation has been applied while the export is still blocked
+        assert "Unknown expression identifier `id`" in node.query_and_get_error(f"SELECT id FROM {mt_table}"), "Column id is not removed"
+
+    # Wait for the export to finish, then verify the destination still reflects the snapshot taken at start time (3 rows)
+    time.sleep(5)
+    assert node.query(f"SELECT count() FROM {s3_table} WHERE id >= 0") == '3\n', "Export did not preserve snapshot at start time after source mutation"
+
+
+def test_add_column_during_export(cluster):
+    node = cluster.instances["node1"]
+
+    mt_table = "add_column_during_export_mt_table"
+    s3_table = "add_column_during_export_s3_table"
+
+    create_tables_and_insert_data(node, mt_table, s3_table)
+
+    # Block traffic to/from MinIO so the export stalls until the network is restored, following the existing S3 tests' style
+    minio_ip = cluster.minio_ip
+    minio_port = cluster.minio_port
+
+    # Ensure the export sees a consistent snapshot taken at start time even if we mutate the source later
+    with PartitionManager() as pm:
+        # Block responses from MinIO (source_port matches the MinIO service)
+        pm_rule_reject_responses = {
+            "destination": node.ip_address,
+            "source_port": minio_port,
+            "action": "REJECT --reject-with tcp-reset",
+        }
+        pm._add_rule(pm_rule_reject_responses)
+
+        # Block requests to MinIO (destination: MinIO, destination_port: minio_port)
+        pm_rule_reject_requests = {
+            "destination": minio_ip,
+            "destination_port": minio_port,
+            "action": "REJECT --reject-with tcp-reset",
+        }
+        pm._add_rule(pm_rule_reject_requests)
+
+        # Start the export of partition 2020
+        node.query(
+            f"ALTER TABLE {mt_table} EXPORT PART '2020_1_1_0' TO TABLE {s3_table};"
+        )
+
+        node.query(f"ALTER TABLE {mt_table} ADD COLUMN id2 UInt64")
+
+        time.sleep(3)
+
+        # Assert the mutation has been applied while the export is still blocked
+        assert node.query(f"SELECT count(id2) FROM {mt_table}") == '4\n', "Column id2 is not added"
+
+    # Wait for the export to finish, then verify the destination still reflects the snapshot taken at start time (3 rows)
+    time.sleep(5)
+    assert node.query(f"SELECT count() FROM {s3_table} WHERE id >= 0") == '3\n', "Export did not preserve snapshot at start time after source mutation"
+    assert "Unknown expression identifier `id2`" in node.query_and_get_error(f"SELECT id2 FROM {s3_table}"), "Column id2 should not be present in the exported data"