diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java index 3b184e8e78305..7549d1bc9da0c 100644 --- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java @@ -266,7 +266,7 @@ public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { writer.keyword("IF NOT EXISTS"); } tableName.unparse(writer, leftPrec, rightPrec); - if (columnList.size() > 0) { + if (columnList.size() > 0 || tableConstraints.size() > 0 || watermark != null) { SqlWriter.Frame frame = writer.startList(SqlWriter.FrameTypeEnum.create("sds"), "(", ")"); for (SqlNode column : columnList) { diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java index 53dc861a5efc3..ef2cb0cc5679c 100644 --- a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java +++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java @@ -816,6 +816,35 @@ public void testCreateTableWithLikeClause() { sql(sql).ok(expected); } + @Test + public void testCreateTableWithLikeClauseWithoutColumns() { + final String sql = + "" + + "create TEMPORARY table source_table (\n" + + " WATERMARK FOR ts AS ts - INTERVAL '5' SECOND\n" + + ") with (\n" + + " 'scan.startup.mode' = 'specific-offsets',\n" + + " 'scan.startup.specific-offsets' = 'partition:0,offset:1169129'\n" + + ") like t_order_course (\n" + + " OVERWRITING WATERMARKS\n" + + " OVERWRITING OPTIONS\n" + + " EXCLUDING CONSTRAINTS\n" + + ")"; + final String expected = + "CREATE TEMPORARY TABLE `SOURCE_TABLE` (\n" + + " WATERMARK FOR `TS` AS (`TS` - INTERVAL '5' SECOND)\n" + + ") WITH (\n" + + " 'scan.startup.mode' = 'specific-offsets',\n" + + " 'scan.startup.specific-offsets' = 'partition:0,offset:1169129'\n" + + ")\n" + + "LIKE `T_ORDER_COURSE` (\n" + + " OVERWRITING WATERMARKS\n" + + " OVERWRITING OPTIONS\n" + + " EXCLUDING CONSTRAINTS\n" + + ")"; + sql(sql).ok(expected); + } + @Test public void testCreateTemporaryTable() { final String sql = @@ -851,6 +880,25 @@ public void testCreateTableWithNoColumns() { sql(sql).ok(expected); } + @Test + public void testCreateTableWithOnlyWaterMark() { + final String sql = + "create table source_table (\n" + + " watermark FOR ts AS ts - interval '3' second\n" + + ") with (\n" + + " 'x' = 'y',\n" + + " 'abc' = 'def'\n" + + ")"; + final String expected = + "CREATE TABLE `SOURCE_TABLE` (\n" + + " WATERMARK FOR `TS` AS (`TS` - INTERVAL '3' SECOND)\n" + + ") WITH (\n" + + " 'x' = 'y',\n" + + " 'abc' = 'def'\n" + + ")"; + sql(sql).ok(expected); + } + @Test public void testDropTable() { final String sql = "DROP table catalog1.db1.tbl1";