diff --git a/src/Storages/System/StorageSystemDDLWorkerQueue.cpp b/src/Storages/System/StorageSystemDDLWorkerQueue.cpp
index 75661b50aa39..1042339e1abd 100644
--- a/src/Storages/System/StorageSystemDDLWorkerQueue.cpp
+++ b/src/Storages/System/StorageSystemDDLWorkerQueue.cpp
@@ -29,6 +29,11 @@ namespace Setting
     extern const SettingsUInt64 max_query_size;
 }
 
+namespace ErrorCodes
+{
+    extern const int SYNTAX_ERROR;
+}
+
 enum class Status : uint8_t
 {
     INACTIVE,
@@ -62,7 +67,7 @@ ColumnsDescription StorageSystemDDLWorkerQueue::getColumnsDescription()
         {"entry_version", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeUInt8>()), "Version of the entry."},
         {"initiator_host", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>()), "Host that initiated the DDL operation."},
         {"initiator_port", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeUInt16>()), "Port used by the initiator."},
-        {"cluster", std::make_shared<DataTypeString>(), "Cluster name."},
+        {"cluster", std::make_shared<DataTypeString>(), "Cluster name, empty if it could not be determined."},
         {"query", std::make_shared<DataTypeString>(), "Query executed."},
         {"settings", std::make_shared<DataTypeMap>(std::make_shared<DataTypeString>(), std::make_shared<DataTypeString>()), "Settings used in the DDL operation."},
         {"query_create_time", std::make_shared<DataTypeDateTime>(), "Query created time."},
@@ -85,8 +90,23 @@ static String clusterNameFromDDLQuery(ContextPtr context, const DDLTask & task)
     String description = fmt::format("from {}", task.entry_path);
     ParserQuery parser_query(end, settings[Setting::allow_settings_after_format_in_insert]);
-    ASTPtr query = parseQuery(
-        parser_query, begin, end, description, settings[Setting::max_query_size], settings[Setting::max_parser_depth], settings[Setting::max_parser_backtracks]);
+    ASTPtr query;
+
+    try
+    {
+        query = parseQuery(
+            parser_query, begin, end, description, settings[Setting::max_query_size], settings[Setting::max_parser_depth], settings[Setting::max_parser_backtracks]);
+    }
+    catch (const Exception & e)
+    {
+        LOG_INFO(getLogger("StorageSystemDDLWorkerQueue"), "Failed to determine cluster");
+        if (e.code() == ErrorCodes::SYNTAX_ERROR)
+        {
+            /// Ignore the parse error and present the available information.
+            return "";
+        }
+        throw;
+    }
 
     String cluster_name;
     if (const auto * query_on_cluster = dynamic_cast<ASTQueryWithOnCluster *>(query.get()))
diff --git a/tests/integration/test_system_ddl_worker_queue/configs/remote_servers.xml b/tests/integration/test_system_ddl_worker_queue/configs/remote_servers.xml
index 791af83a2d6d..f6392caf5e51 100644
--- a/tests/integration/test_system_ddl_worker_queue/configs/remote_servers.xml
+++ b/tests/integration/test_system_ddl_worker_queue/configs/remote_servers.xml
@@ -25,4 +25,5 @@
             </shard>
         </test_cluster>
     </remote_servers>
+    <allow_zookeeper_write>1</allow_zookeeper_write>
 </clickhouse>
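
Taken together, the two changes above make clusterNameFromDDLQuery() tolerant of unparseable queue entries (a SYNTAX_ERROR during parsing now yields an empty cluster name instead of failing the whole read of system.distributed_ddl_queue), and let the integration test write artificial entries into ZooKeeper through the system.zookeeper table. A rough sketch of the observable effect, assuming a node1 client handle as in the tests below; before the patch, this query would throw as soon as the queue held a corrupted entry:

    # After the patch, corrupted entries surface as rows with an empty
    # cluster column instead of aborting the SELECT with SYNTAX_ERROR.
    rows = node1.query(
        "SELECT entry, cluster, query FROM system.distributed_ddl_queue WHERE cluster = ''"
    )
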
diff --git a/tests/integration/test_system_ddl_worker_queue/test.py b/tests/integration/test_system_ddl_worker_queue/test.py
index 4659e5b92e84..1bebf709a821 100644
--- a/tests/integration/test_system_ddl_worker_queue/test.py
+++ b/tests/integration/test_system_ddl_worker_queue/test.py
@@ -1,4 +1,5 @@
 import pytest
+import time
 
 from helpers.cluster import ClickHouseCluster
 
@@ -25,46 +26,131 @@ def started_cluster():
     try:
         cluster.start()
 
-        for i, node in enumerate([node1, node2]):
-            node.query("CREATE DATABASE testdb")
-            node.query(
-                """CREATE TABLE testdb.test_table(id UInt32, val String) ENGINE = ReplicatedMergeTree('/clickhouse/test/test_table1', '{}') ORDER BY id;""".format(
-                    i
-                )
-            )
-        for i, node in enumerate([node3, node4]):
-            node.query("CREATE DATABASE testdb")
-            node.query(
-                """CREATE TABLE testdb.test_table(id UInt32, val String) ENGINE = ReplicatedMergeTree('/clickhouse/test/test_table2', '{}') ORDER BY id;""".format(
-                    i
-                )
-            )
         yield cluster
 
     finally:
         cluster.shutdown()
 
 
+def maintain_test_table(test_table):
+    tmark = time.time()  # to guarantee ZK path uniqueness
+
+    for i, node in enumerate([node1, node2]):
+        node.query(f"DROP TABLE IF EXISTS testdb.{test_table} SYNC")
+        node.query("DROP DATABASE IF EXISTS testdb")
+
+        node.query("CREATE DATABASE testdb")
+        node.query(
+            f"CREATE TABLE testdb.{test_table}(id UInt32, val String) ENGINE = ReplicatedMergeTree('/clickhouse/test/{test_table}1-{tmark}', '{i}') ORDER BY id;"
+        )
+    for i, node in enumerate([node3, node4]):
+        node.query(f"DROP TABLE IF EXISTS testdb.{test_table} SYNC")
+        node.query("DROP DATABASE IF EXISTS testdb")
+
+        node.query("CREATE DATABASE testdb")
+        node.query(
+            f"CREATE TABLE testdb.{test_table}(id UInt32, val String) ENGINE = ReplicatedMergeTree('/clickhouse/test/{test_table}2-{tmark}', '{i}') ORDER BY id;"
+        )
+
+
 def test_distributed_ddl_queue(started_cluster):
+    test_table = "test_table"
+    maintain_test_table(test_table)
     node1.query(
-        "INSERT INTO testdb.test_table SELECT number, toString(number) FROM numbers(100)"
+        f"INSERT INTO testdb.{test_table} SELECT number, toString(number) FROM numbers(100)"
     )
     node3.query(
-        "INSERT INTO testdb.test_table SELECT number, toString(number) FROM numbers(100)"
+        f"INSERT INTO testdb.{test_table} SELECT number, toString(number) FROM numbers(100)"
     )
-    node2.query("SYSTEM SYNC REPLICA testdb.test_table")
-    node4.query("SYSTEM SYNC REPLICA testdb.test_table")
+    node2.query(f"SYSTEM SYNC REPLICA testdb.{test_table}")
+    node4.query(f"SYSTEM SYNC REPLICA testdb.{test_table}")
 
     node1.query(
-        "ALTER TABLE testdb.test_table ON CLUSTER test_cluster ADD COLUMN somecolumn UInt8 AFTER val",
+        f"ALTER TABLE testdb.{test_table} ON CLUSTER test_cluster ADD COLUMN somecolumn UInt8 AFTER val",
         settings={"replication_alter_partitions_sync": "2"},
     )
     for node in nodes:
-        node.query("SYSTEM SYNC REPLICA testdb.test_table")
-        assert node.query("SELECT somecolumn FROM testdb.test_table LIMIT 1") == "0\n"
+        node.query(f"SYSTEM SYNC REPLICA testdb.{test_table}")
+        assert (
+            node.query(f"SELECT somecolumn FROM testdb.{test_table} LIMIT 1") == "0\n"
+        )
         assert (
             node.query(
                 "SELECT If((SELECT count(*) FROM system.distributed_ddl_queue WHERE cluster='test_cluster' AND entry='query-0000000000') > 0, 'ok', 'fail')"
            )
            == "ok\n"
        )
+
+    node1.query(
+        f"ALTER TABLE testdb.{test_table} ON CLUSTER test_cluster DROP COLUMN somecolumn",
+        settings={"replication_alter_partitions_sync": "2"},
+    )
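
The new test below plants a deliberately corrupted copy of a real DDL queue entry by inserting rows into the system.zookeeper table, which is why the server config above enables ZooKeeper writes. A minimal, hedged sketch of that write path; the entry name and value here are placeholders, not what the test actually inserts:

    # INSERT INTO system.zookeeper creates the ZooKeeper node <path>/<name>
    # with the given value; the test uses this to plant a queue entry whose
    # query text says TUBLE instead of TABLE.
    node1.query(
        "INSERT INTO system.zookeeper (name, path, value) "
        "VALUES ('query-artificial-example', '/clickhouse/task_queue/ddl', 'rubbish')"
    )
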
+
+
+def test_distributed_ddl_rubbish(started_cluster):
+    test_table = "test_table_rubbish"
+    maintain_test_table(test_table)
+    node1.query(
+        f"ALTER TABLE testdb.{test_table} ON CLUSTER test_cluster ADD COLUMN somenewcolumn UInt8 AFTER val",
+        settings={"replication_alter_partitions_sync": "2"},
+    )
+
+    zk_content = node1.query(
+        "SELECT name, value, path FROM system.zookeeper WHERE path LIKE '/clickhouse/task_queue/ddl%' SETTINGS allow_unrestricted_reads_from_keeper=true",
+        parse=True,
+    ).to_dict("records")
+
+    original_query = ""
+    new_query = "query-artificial-" + str(time.monotonic_ns())
+
+    # Find the entry for the query that added 'somenewcolumn'; it will be
+    # copied under a new query ID with deliberately broken query text
+    # (TABLE => TUBLE).
+    for row in zk_content:
+        if "somenewcolumn" in row["value"]:
+            original_query = row["name"]
+            break
+
+    rows_to_insert = []
+
+    for row in zk_content:
+        if row["name"] == original_query:
+            rows_to_insert.append(
+                {
+                    "name": new_query,
+                    "path": row["path"],
+                    "value": row["value"].replace("TABLE", "TUBLE"),
+                }
+            )
+            continue
+        if original_query in row["path"]:
+            rows_to_insert.append(
+                {
+                    "name": row["name"],
+                    "path": row["path"].replace(original_query, new_query),
+                    "value": row["value"],
+                }
+            )
+
+    # Insert the copied rows into ZooKeeper
+    for row in rows_to_insert:
+        node1.query(
+            "INSERT INTO system.zookeeper (name, path, value) VALUES ('{}', '{}', '{}')".format(
+                row["name"], row["path"], row["value"]
+            )
+        )
+
+    # Ensure the artificial entry is visible via system.distributed_ddl_queue
+    # on all four hosts, with an empty cluster name instead of an error
+    assert (
+        int(
+            node1.query(
+                f"SELECT count(1) FROM system.distributed_ddl_queue WHERE entry='{new_query}' AND cluster=''"
+            )
+        )
+        == 4
+    )
+
+    node1.query(
+        f"ALTER TABLE testdb.{test_table} ON CLUSTER test_cluster DROP COLUMN somenewcolumn",
+        settings={"replication_alter_partitions_sync": "2"},
+    )
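
For intuition, the fallback the C++ patch implements reduces to: try to parse the stored query and read its ON CLUSTER clause; on a syntax error, return an empty cluster name so the rest of the row can still be shown. A toy Python illustration, where a regex stands in for ClickHouse's real SQL parser; nothing here is code from the patch itself:

    import re

    def cluster_name_from_ddl_query(query_text: str) -> str:
        # Stand-in for parseQuery() + ASTQueryWithOnCluster: extract the name
        # from an "ON CLUSTER <name>" clause, falling back to "" when the
        # query text cannot be understood (the SYNTAX_ERROR branch above).
        match = re.search(r"\bON CLUSTER\s+(\w+)", query_text, re.IGNORECASE)
        return match.group(1) if match else ""

    assert (
        cluster_name_from_ddl_query(
            "ALTER TABLE testdb.t ON CLUSTER test_cluster DROP COLUMN c"
        )
        == "test_cluster"
    )
    assert cluster_name_from_ddl_query("ALTER TUBLE rubbish") == ""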