@@ -9,6 +9,63 @@
 from helpers.cluster import ClickHouseCluster
 from helpers.network import PartitionManager
 
+
+def wait_for_export_status(
+    node,
+    mt_table: str,
+    s3_table: str,
+    partition_id: str,
+    expected_status: str = "COMPLETED",
+    timeout: int = 30,
+    poll_interval: float = 0.5,
+):
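+    """Poll system.replicated_partition_exports until the export of
+    partition_id from mt_table to s3_table reaches expected_status;
+    raise TimeoutError if it does not happen within timeout seconds."""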
+    status = ""
+    start_time = time.time()
+    while time.time() - start_time < timeout:
+        status = node.query(
+            f"""
+            SELECT status FROM system.replicated_partition_exports
+            WHERE source_table = '{mt_table}'
+            AND destination_table = '{s3_table}'
+            AND partition_id = '{partition_id}'
+            """
+        ).strip()
+
+        if status == expected_status:
+            return status
+
+        time.sleep(poll_interval)
+
+    raise TimeoutError(
+        f"Export status did not reach '{expected_status}' within {timeout}s; "
+        f"last observed status: '{status}'."
+    )
+
+
+def wait_for_export_to_start(
+    node,
+    mt_table: str,
+    s3_table: str,
+    partition_id: str,
+    timeout: int = 10,
+    poll_interval: float = 0.2,
+):
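+    """Poll system.replicated_partition_exports until a row for this
+    export appears (i.e. the export has been scheduled); raise
+    TimeoutError if nothing shows up within timeout seconds."""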
+    start_time = time.time()
+    while time.time() - start_time < timeout:
+        count = node.query(
+            f"""
+            SELECT count() FROM system.replicated_partition_exports
+            WHERE source_table = '{mt_table}'
+            AND destination_table = '{s3_table}'
+            AND partition_id = '{partition_id}'
+            """
+        ).strip()
+
+        if count != "0":
+            return True
+
+        time.sleep(poll_interval)
+
+    raise TimeoutError(f"Export did not start within {timeout}s.")
+
+
 @pytest.fixture(scope="module")
 def cluster():
     try:
@@ -115,7 +172,8 @@ def test_restart_nodes_during_export(cluster):
     node.query(export_queries)
 
     # wait for the exports to start
-    time.sleep(3)
+    wait_for_export_to_start(node, mt_table, s3_table, "2020")
+    wait_for_export_to_start(node, mt_table, s3_table, "2021")
 
     node.stop_clickhouse(kill=True)
     node2.stop_clickhouse(kill=True)
@@ -128,7 +186,8 @@ def test_restart_nodes_during_export(cluster):
     node.start_clickhouse()
     node2.start_clickhouse()
 
-    time.sleep(5)
+    wait_for_export_status(node, mt_table, s3_table, "2020", "COMPLETED")
+    wait_for_export_status(node, mt_table, s3_table, "2021", "COMPLETED")
 
     assert node.query(f"SELECT count() FROM {s3_table} WHERE year = 2020") != f'0\n', "Export of partition 2020 did not resume after crash"
 
@@ -184,7 +243,7 @@ def test_kill_export(cluster):
     node.query(f"KILL EXPORT PARTITION WHERE partition_id = '2020' and source_table = '{mt_table}' and destination_table = '{s3_table}'")
 
     # wait for 2021 to finish
-    time.sleep(5)
+    wait_for_export_status(node, mt_table, s3_table, "2021", "COMPLETED")
 
     # check for the commit file rather than the data file, since the data file may have been written before the kill took effect
     assert node.query(f"SELECT count() FROM s3(s3_conn, filename='{s3_table}/commit_2020_*', format=LineAsString)") == '0\n', "Partition 2020 was written to S3, it was not killed as expected"
@@ -268,7 +327,8 @@ def test_concurrent_exports_to_different_targets(cluster):
         f"ALTER TABLE {mt_table} EXPORT PARTITION ID '2020' TO TABLE {s3_table_b}"
     )
 
-    time.sleep(5)
+    wait_for_export_status(node, mt_table, s3_table_a, "2020", "COMPLETED")
+    wait_for_export_status(node, mt_table, s3_table_b, "2020", "COMPLETED")
 
     # Both targets should receive the same data independently
     assert node.query(f"SELECT count() FROM {s3_table_a} WHERE year = 2020") == '3\n', "First target did not receive expected rows"
@@ -317,7 +377,7 @@ def test_failure_is_logged_in_system_table(cluster):
     )
 
    # Wait so that the export fails
-    time.sleep(5)
+    wait_for_export_status(node, mt_table, s3_table, "2020", "FAILED", timeout=10)
 
     # Network restored; verify the export is marked as FAILED in the system table
     # Also verify we captured at least one exception and no commit file exists
@@ -386,7 +446,7 @@ def test_inject_short_living_failures(cluster):
     time.sleep(5)
 
     # wait for the export to finish
-    time.sleep(5)
+    wait_for_export_status(node, mt_table, s3_table, "2020", "COMPLETED")
 
     # Assert the export succeeded
     assert node.query(f"SELECT count() FROM {s3_table} WHERE year = 2020") == '3\n', "Export did not succeed"
@@ -419,7 +479,7 @@ def test_export_ttl(cluster):
     mt_table = "export_ttl_mt_table"
     s3_table = "export_ttl_s3_table"
 
-    expiration_time = 5
+    expiration_time = 3
 
     create_tables_and_insert_data(node, mt_table, s3_table, "replica1")
@@ -431,7 +491,8 @@
     assert "Export with key" in error, "Expected error about expired export"
 
     # wait for the export to finish and for the manifest to expire
-    time.sleep(expiration_time)
+    wait_for_export_status(node, mt_table, s3_table, "2020", "COMPLETED")
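+    # sleep for twice the TTL so the export manifest has comfortably expired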
+    time.sleep(expiration_time * 2)
 
     # assert that the export succeeded, check the commit file
     assert node.query(f"SELECT count() FROM s3(s3_conn, filename='{s3_table}/commit_2020_*', format=LineAsString)") == '1\n', "Export did not succeed"
@@ -440,7 +501,7 @@
     node.query(f"ALTER TABLE {mt_table} EXPORT PARTITION ID '2020' TO TABLE {s3_table}")
 
     # wait for the export to finish
-    time.sleep(expiration_time)
+    wait_for_export_status(node, mt_table, s3_table, "2020", "COMPLETED")
 
     # assert that the export succeeded, check the commit file
     # there should be two commit files now, one for the first export and one for the second export
@@ -474,14 +535,14 @@ def test_export_partition_file_already_exists_policy(cluster):
     ) == "COMPLETED\n", "Export should be marked as COMPLETED"
 
     # wait for the exports to finish
-    time.sleep(3)
+    wait_for_export_status(node, mt_table, s3_table, "2020", "COMPLETED")
 
     # try to export the partition
     node.query(
         f"ALTER TABLE {mt_table} EXPORT PARTITION ID '2020' TO TABLE {s3_table} SETTINGS export_merge_tree_partition_force_export=1"
     )
 
-    time.sleep(3)
+    wait_for_export_status(node, mt_table, s3_table, "2020", "COMPLETED")
 
     assert node.query(
         f"""
@@ -499,7 +560,7 @@ def test_export_partition_file_already_exists_policy(cluster):
     )
 
     # wait for the export to finish
-    time.sleep(3)
+    wait_for_export_status(node, mt_table, s3_table, "2020", "COMPLETED")
 
     # check system.replicated_partition_exports for the export
     # ideally we would make sure the transaction id is different, but I do not have the time to do that now
@@ -520,7 +581,7 @@ def test_export_partition_file_already_exists_policy(cluster):
     )
 
     # wait for the export to finish
-    time.sleep(3)
+    wait_for_export_status(node, mt_table, s3_table, "2020", "FAILED")
 
     # check system.replicated_partition_exports for the export
     assert node.query(
@@ -603,7 +664,7 @@ def test_export_partition_permissions(cluster):
     )
 
     # Wait for export to complete
-    time.sleep(5)
+    wait_for_export_status(node, mt_table, s3_table, "2020", "COMPLETED")
 
     # Verify the export succeeded
     result = node.query(f"SELECT count() FROM {s3_table} WHERE year = 2020")
@@ -633,7 +694,8 @@ def test_multiple_exports_within_a_single_query(cluster):
 
     node.query(f"ALTER TABLE {mt_table} EXPORT PARTITION ID '2020' TO TABLE {s3_table}, EXPORT PARTITION ID '2021' TO TABLE {s3_table};")
 
-    time.sleep(5)
+    wait_for_export_status(node, mt_table, s3_table, "2020", "COMPLETED")
+    wait_for_export_status(node, mt_table, s3_table, "2021", "COMPLETED")
 
     # assert the exports have been executed
     assert node.query(f"SELECT count() FROM {s3_table} WHERE year = 2020") == '3\n', "Export did not succeed"