diff --git a/tests/integration/test_export_replicated_mt_partition_to_object_storage/test.py b/tests/integration/test_export_replicated_mt_partition_to_object_storage/test.py
index 3bf14409e82c..a4cb0807d6ee 100644
--- a/tests/integration/test_export_replicated_mt_partition_to_object_storage/test.py
+++ b/tests/integration/test_export_replicated_mt_partition_to_object_storage/test.py
@@ -9,6 +9,63 @@
 from helpers.cluster import ClickHouseCluster
 from helpers.network import PartitionManager
 
+
+def wait_for_export_status(
+    node,
+    mt_table: str,
+    s3_table: str,
+    partition_id: str,
+    expected_status: str = "COMPLETED",
+    timeout: int = 30,
+    poll_interval: float = 0.5,
+):
+    start_time = time.time()
+    while time.time() - start_time < timeout:
+        status = node.query(
+            f"""
+            SELECT status FROM system.replicated_partition_exports
+            WHERE source_table = '{mt_table}'
+            AND destination_table = '{s3_table}'
+            AND partition_id = '{partition_id}'
+            """
+        ).strip()
+
+        if status == expected_status:
+            return status
+
+        time.sleep(poll_interval)
+
+    raise TimeoutError(
+        f"Export status did not reach '{expected_status}' within {timeout}s.")
+
+
+def wait_for_export_to_start(
+    node,
+    mt_table: str,
+    s3_table: str,
+    partition_id: str,
+    timeout: int = 10,
+    poll_interval: float = 0.2,
+):
+    start_time = time.time()
+    while time.time() - start_time < timeout:
+        count = node.query(
+            f"""
+            SELECT count() FROM system.replicated_partition_exports
+            WHERE source_table = '{mt_table}'
+            AND destination_table = '{s3_table}'
+            AND partition_id = '{partition_id}'
+            """
+        ).strip()
+
+        if count != '0':
+            return True
+
+        time.sleep(poll_interval)
+
+    raise TimeoutError(f"Export did not start within {timeout}s.")
+
+
 @pytest.fixture(scope="module")
 def cluster():
     try:
@@ -115,7 +172,8 @@ def test_restart_nodes_during_export(cluster):
     node.query(export_queries)
 
     # wait for the exports to start
-    time.sleep(3)
+    wait_for_export_to_start(node, mt_table, s3_table, "2020")
+    wait_for_export_to_start(node, mt_table, s3_table, "2021")
 
     node.stop_clickhouse(kill=True)
     node2.stop_clickhouse(kill=True)
@@ -128,7 +186,8 @@
     node.start_clickhouse()
     node2.start_clickhouse()
 
-    time.sleep(5)
+    wait_for_export_status(node, mt_table, s3_table, "2020", "COMPLETED")
+    wait_for_export_status(node, mt_table, s3_table, "2021", "COMPLETED")
 
     assert node.query(f"SELECT count() FROM {s3_table} WHERE year = 2020") != f'0\n', "Export of partition 2020 did not resume after crash"
 
@@ -184,7 +243,7 @@ def test_kill_export(cluster):
     node.query(f"KILL EXPORT PARTITION WHERE partition_id = '2020' and source_table = '{mt_table}' and destination_table = '{s3_table}'")
 
     # wait for 2021 to finish
-    time.sleep(5)
+    wait_for_export_status(node, mt_table, s3_table, "2021", "COMPLETED")
 
     # checking for the commit file because maybe the data file was too fast?
     assert node.query(f"SELECT count() FROM s3(s3_conn, filename='{s3_table}/commit_2020_*', format=LineAsString)") == '0\n', "Partition 2020 was written to S3, it was not killed as expected"
@@ -268,7 +327,8 @@ def test_concurrent_exports_to_different_targets(cluster):
         f"ALTER TABLE {mt_table} EXPORT PARTITION ID '2020' TO TABLE {s3_table_b}"
     )
 
-    time.sleep(5)
+    wait_for_export_status(node, mt_table, s3_table_a, "2020", "COMPLETED")
+    wait_for_export_status(node, mt_table, s3_table_b, "2020", "COMPLETED")
 
     # Both targets should receive the same data independently
     assert node.query(f"SELECT count() FROM {s3_table_a} WHERE year = 2020") == '3\n', "First target did not receive expected rows"
@@ -317,7 +377,7 @@ def test_failure_is_logged_in_system_table(cluster):
     )
 
     # Wait so that the export fails
-    time.sleep(5)
+    wait_for_export_status(node, mt_table, s3_table, "2020", "FAILED", timeout=10)
 
     # Network restored; verify the export is marked as FAILED in the system table
     # Also verify we captured at least one exception and no commit file exists
@@ -386,7 +446,7 @@ def test_inject_short_living_failures(cluster):
     time.sleep(5)
 
     # wait for the export to finish
-    time.sleep(5)
+    wait_for_export_status(node, mt_table, s3_table, "2020", "COMPLETED")
 
     # Assert the export succeeded
     assert node.query(f"SELECT count() FROM {s3_table} WHERE year = 2020") == '3\n', "Export did not succeed"
@@ -419,7 +479,7 @@ def test_export_ttl(cluster):
     mt_table = "export_ttl_mt_table"
     s3_table = "export_ttl_s3_table"
 
-    expiration_time = 5
+    expiration_time = 3
 
     create_tables_and_insert_data(node, mt_table, s3_table, "replica1")
 
@@ -431,7 +491,8 @@
     assert "Export with key" in error, "Expected error about expired export"
 
     # wait for the export to finish and for the manifest to expire
-    time.sleep(expiration_time)
+    wait_for_export_status(node, mt_table, s3_table, "2020", "COMPLETED")
+    time.sleep(expiration_time * 2)
 
     # assert that the export succeeded, check the commit file
     assert node.query(f"SELECT count() FROM s3(s3_conn, filename='{s3_table}/commit_2020_*', format=LineAsString)") == '1\n', "Export did not succeed"
@@ -440,7 +501,7 @@
     node.query(f"ALTER TABLE {mt_table} EXPORT PARTITION ID '2020' TO TABLE {s3_table}")
 
     # wait for the export to finish
-    time.sleep(expiration_time)
+    wait_for_export_status(node, mt_table, s3_table, "2020", "COMPLETED")
 
     # assert that the export succeeded, check the commit file
     # there should be two commit files now, one for the first export and one for the second export
@@ -474,14 +535,14 @@ def test_export_partition_file_already_exists_policy(cluster):
     ) == "COMPLETED\n", "Export should be marked as COMPLETED"
 
     # wait for the exports to finish
-    time.sleep(3)
+    wait_for_export_status(node, mt_table, s3_table, "2020", "COMPLETED")
 
     # try to export the partition
     node.query(
         f"ALTER TABLE {mt_table} EXPORT PARTITION ID '2020' TO TABLE {s3_table} SETTINGS export_merge_tree_partition_force_export=1"
     )
 
-    time.sleep(3)
+    wait_for_export_status(node, mt_table, s3_table, "2020", "COMPLETED")
 
     assert node.query(
         f"""
@@ -499,7 +560,7 @@ def test_export_partition_file_already_exists_policy(cluster):
     )
 
     # wait for the export to finish
-    time.sleep(3)
+    wait_for_export_status(node, mt_table, s3_table, "2020", "COMPLETED")
 
     # check system.replicated_partition_exports for the export
     # ideally we would make sure the transaction id is different, but I do not have the time to do that now
@@ -520,7 +581,7 @@ def test_export_partition_file_already_exists_policy(cluster):
     )
 
     # wait for the export to finish
-    time.sleep(3)
+    wait_for_export_status(node, mt_table, s3_table, "2020", "FAILED")
 
     # check system.replicated_partition_exports for the export
     assert node.query(
@@ -603,7 +664,7 @@ def test_export_partition_permissions(cluster):
     )
 
     # Wait for export to complete
-    time.sleep(5)
+    wait_for_export_status(node, mt_table, s3_table, "2020", "COMPLETED")
 
     # Verify the export succeeded
     result = node.query(f"SELECT count() FROM {s3_table} WHERE year = 2020")
@@ -633,7 +694,8 @@ def test_multiple_exports_within_a_single_query(cluster):
     node.query(f"ALTER TABLE {mt_table} EXPORT PARTITION ID '2020' TO TABLE {s3_table}, EXPORT PARTITION ID '2021' TO TABLE {s3_table};")
 
-    time.sleep(5)
+    wait_for_export_status(node, mt_table, s3_table, "2020", "COMPLETED")
+    wait_for_export_status(node, mt_table, s3_table, "2021", "COMPLETED")
 
     # assert the exports have been executed
     assert node.query(f"SELECT count() FROM {s3_table} WHERE year = 2020") == '3\n', "Export did not succeed"
 
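Note: both helpers poll system.replicated_partition_exports until a condition holds, so the tests become deterministic instead of relying on fixed sleeps, and they raise TimeoutError with a clear message when the condition never holds. A minimal sketch of how the two waiters compose in a new test, reusing the cluster fixture and create_tables_and_insert_data helper already present in this file; the instance name "replica1" and the table names are placeholders, not names taken from the patch:

    def test_export_completes(cluster):
        node = cluster.instances["replica1"]  # placeholder instance name
        mt_table = "example_mt_table"  # placeholder table names
        s3_table = "example_s3_table"

        create_tables_and_insert_data(node, mt_table, s3_table, "replica1")
        node.query(f"ALTER TABLE {mt_table} EXPORT PARTITION ID '2020' TO TABLE {s3_table}")

        # Block until the export is registered in the system table,
        # then until it reaches COMPLETED; either call raises TimeoutError
        # on failure rather than letting the test pass by accident.
        wait_for_export_to_start(node, mt_table, s3_table, "2020")
        wait_for_export_status(node, mt_table, s3_table, "2020", "COMPLETED")

        assert node.query(f"SELECT count() FROM {s3_table} WHERE year = 2020") == "3\n"

The 3-row expectation mirrors the other tests in this file that assert '3\n' after create_tables_and_insert_data.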