diff --git a/docs/data/sql_functions.yml b/docs/data/sql_functions.yml index 30b6eb6f83eb5..6b7caa159c205 100644 --- a/docs/data/sql_functions.yml +++ b/docs/data/sql_functions.yml @@ -417,10 +417,10 @@ temporal: description: Creates an interval of NUMERIC milliseconds. - sql: LOCALTIME table: localTime() - description: Returns the current SQL time in the local time zone. It is evaluated for each record in streaming mode. But in batch mode, it is evaluated once as the query starts and uses the same result for every row. + description: Returns the current SQL time in the local time zone, the return type is TIME(0). It is evaluated for each record in streaming mode. But in batch mode, it is evaluated once as the query starts and uses the same result for every row. - sql: LOCALTIMESTAMP table: localTimestamp() - description: Returns the current SQL timestamp in local time zone, the return type is TIMESTAMP WITHOUT ITME ZONE. It is evaluated for each record in streaming mode. But in batch mode, it is evaluated once as the query starts and uses the same result for every row. + description: Returns the current SQL timestamp in local time zone, the return type is TIMESTAMP(3). It is evaluated for each record in streaming mode. But in batch mode, it is evaluated once as the query starts and uses the same result for every row. - sql: CURRENT_TIME table: currentTime() description: Returns the current SQL time in the local time zone, this is a synonym of LOCAL_TIME. @@ -429,11 +429,11 @@ temporal: description: Returns the current SQL date in the local time zone. It is evaluated for each record in streaming mode. But in batch mode, it is evaluated once as the query starts and uses the same result for every row. - sql: CURRENT_TIMESTAMP table: currentTimestamp() - description: Returns the current SQL timestamp in the local time zone, the return type is TIMESTAMP WITH LOCAL TIME ZONE. It is evaluated for each record in streaming mode. But in batch mode, it is evaluated once as the query starts and uses the same result for every row. + description: Returns the current SQL timestamp in the local time zone, the return type is TIMESTAMP_LTZ(3). It is evaluated for each record in streaming mode. But in batch mode, it is evaluated once as the query starts and uses the same result for every row. - sql: NOW() description: Returns the current SQL timestamp in the local time zone, this is a synonym of CURRENT_TIMESTAMP. - sql: CURRENT_ROW_TIMESTAMP() - description: Returns the current SQL timestamp in the local time zone, the return type is TIMESTAMP WITH LOCAL TIME ZONE. It is evaluated for each record no matter in batch or streaming mode. + description: Returns the current SQL timestamp in the local time zone, the return type is TIMESTAMP_LTZ(3). It is evaluated for each record no matter in batch or streaming mode. - sql: EXTRACT(timeinteravlunit FROM temporal) table: TEMPORAL.extract(TIMEINTERVALUNIT) description: Returns a long value extracted from the timeintervalunit part of temporal. E.g., EXTRACT(DAY FROM DATE '2006-06-05') returns 5. diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/SqlFunctionConverter.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/SqlFunctionConverter.java index 0f2066294025a..5465ebfa5914c 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/SqlFunctionConverter.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/SqlFunctionConverter.java @@ -21,7 +21,7 @@ import org.apache.flink.connectors.hive.FlinkHiveException; import org.apache.flink.table.planner.delegation.hive.copy.HiveParserBetween; import org.apache.flink.table.planner.delegation.hive.copy.HiveParserSqlFunctionConverter; -import org.apache.flink.table.planner.functions.sql.SqlCurrentTimestampFunction; +import org.apache.flink.table.planner.functions.sql.FlinkSqlTimestampFunction; import org.apache.flink.util.Preconditions; import org.apache.calcite.plan.RelOptCluster; @@ -90,7 +90,7 @@ public RexNode visitCall(RexCall call) { RelDataType type = call.getType(); return builder.makeCall(type, convertedOp, visitList(operands, update)); } else { - if (convertedOp instanceof SqlCurrentTimestampFunction) { + if (convertedOp instanceof FlinkSqlTimestampFunction) { // flink's current_timestamp has different type from hive's, convert it to a literal Timestamp currentTS = ((HiveParser.HiveParserSessionState) SessionState.get()) diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/DefaultSchemaResolver.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/DefaultSchemaResolver.java index 5aa6015ff62ef..5b008ccd9608d 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/DefaultSchemaResolver.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/DefaultSchemaResolver.java @@ -222,10 +222,13 @@ private Column validateTimeColumn(String columnName, List columns) { columns.stream().map(Column::getName).collect(Collectors.toList()))); } final LogicalType timeFieldType = timeColumn.get().getDataType().getLogicalType(); - if (!canBeTimeAttributeType(timeFieldType) || getPrecision(timeFieldType) != 3) { + if (!canBeTimeAttributeType(timeFieldType) || getPrecision(timeFieldType) > 3) { throw new ValidationException( - "Invalid data type of time field for watermark definition. " - + "The field must be of type TIMESTAMP(3) or TIMESTAMP_LTZ(3)."); + String.format( + "Invalid data type of time field for watermark definition. " + + "The field must be of type TIMESTAMP(p) or TIMESTAMP_LTZ(p)," + + " the supported precision 'p' is from 0 to 3, but the time field type is %s", + timeFieldType)); } if (isProctimeAttribute(timeFieldType)) { throw new ValidationException( @@ -235,10 +238,13 @@ private Column validateTimeColumn(String columnName, List columns) { } private void validateWatermarkExpression(LogicalType watermarkType) { - if (!canBeTimeAttributeType(watermarkType) || getPrecision(watermarkType) != 3) { + if (!canBeTimeAttributeType(watermarkType) || getPrecision(watermarkType) > 3) { throw new ValidationException( - "Invalid data type of expression for watermark definition. " - + "The field must be of type TIMESTAMP(3) or TIMESTAMP_LTZ(3)."); + String.format( + "Invalid data type of expression for watermark definition. " + + "The field must be of type TIMESTAMP(p) or TIMESTAMP_LTZ(p)," + + " the supported precision 'p' is from 0 to 3, but the watermark expression type is %s", + watermarkType)); } } @@ -277,7 +283,8 @@ private Column adjustRowtimeAttribute(List watermarkSpecs, Column default: throw new ValidationException( "Invalid data type of expression for rowtime definition. " - + "The field must be of type TIMESTAMP(3) or TIMESTAMP_LTZ(3)."); + + "The field must be of type TIMESTAMP(p) or TIMESTAMP_LTZ(p)," + + " the supported precision 'p' is from 0 to 3."); } } return column; diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/SchemaResolutionTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/SchemaResolutionTest.java index 0dfb68017fbd3..aeb5bda6a242e 100644 --- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/SchemaResolutionTest.java +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/SchemaResolutionTest.java @@ -202,7 +202,8 @@ public void testSchemaResolutionErrors() { .column("ts", DataTypes.BOOLEAN()) .watermark("ts", callSql(WATERMARK_SQL)) .build(), - "Invalid data type of time field for watermark definition. The field must be of type TIMESTAMP(3) or TIMESTAMP_LTZ(3)."); + "Invalid data type of time field for watermark definition." + + " The field must be of type TIMESTAMP(p) or TIMESTAMP_LTZ(p), the supported precision 'p' is from 0 to 3, but the time field type is BOOLEAN"); testError( Schema.newBuilder() diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/PrintUtils.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/PrintUtils.java index 48ff9e617c2d8..891bfa0757de7 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/PrintUtils.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/PrintUtils.java @@ -46,9 +46,11 @@ import javax.annotation.Nullable; import java.io.PrintWriter; +import java.sql.Time; import java.sql.Timestamp; import java.time.Instant; import java.time.LocalDateTime; +import java.time.LocalTime; import java.time.ZoneId; import java.util.ArrayList; import java.util.Arrays; @@ -59,7 +61,10 @@ import java.util.stream.Stream; import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getPrecision; +import static org.apache.flink.table.utils.TimestampStringUtils.localTimeToUnixDate; +import static org.apache.flink.table.utils.TimestampStringUtils.timeToInternal; import static org.apache.flink.table.utils.TimestampStringUtils.timestampToString; +import static org.apache.flink.table.utils.TimestampStringUtils.unixTimeToString; /** Utilities for print formatting. */ @Internal @@ -238,7 +243,7 @@ public static String[] rowToString( } /** - * Normalizes field that contains TIMESTAMP and TIMESTAMP_LTZ type data. + * Normalizes field that contains TIMESTAMP, TIMESTAMP_LTZ and TIME type data. * *

This method also supports nested type ARRAY, ROW, MAP. */ @@ -252,6 +257,8 @@ private static Object formattedTimestamp( case TIMESTAMP_WITHOUT_TIME_ZONE: case TIMESTAMP_WITH_LOCAL_TIME_ZONE: return formatTimestampField(field, fieldType, sessionTimeZone); + case TIME_WITHOUT_TIME_ZONE: + return formatTimeField(field); case ARRAY: LogicalType elementType = ((ArrayType) fieldType).getElementType(); if (field instanceof List) { @@ -453,6 +460,21 @@ private static Object formatTimestampField( } } + /** Formats the print content of TIME type data. */ + private static Object formatTimeField(Object timeField) { + if (timeField.getClass().isAssignableFrom(int.class) || timeField instanceof Integer) { + return unixTimeToString((int) timeField); + } else if (timeField.getClass().isAssignableFrom(long.class) || timeField instanceof Long) { + return unixTimeToString(((Long) timeField).intValue()); + } else if (timeField instanceof Time) { + return unixTimeToString(timeToInternal((Time) timeField)); + } else if (timeField instanceof LocalTime) { + return unixTimeToString(localTimeToUnixDate((LocalTime) timeField)); + } else { + return timeField; + } + } + public static String genBorderLine(int[] colWidths) { StringBuilder sb = new StringBuilder(); sb.append("+"); diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TimestampStringUtils.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TimestampStringUtils.java index ce411105c2af9..abde5b836358a 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TimestampStringUtils.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TimestampStringUtils.java @@ -21,13 +21,27 @@ import org.apache.flink.annotation.Internal; import java.time.LocalDateTime; +import java.time.LocalTime; +import java.util.TimeZone; -/** Utils to represent a LocalDateTime to String, considered the precision. */ +/** + * Utils to represent a LocalDateTime to String, considered the precision. + * + *

TODO://This class keep same SQL formats with {@code + * org.apache.flink.table.runtime.functions.SqlDateTimeUtils} which used in Flink SQL codegen, The + * two utils will be unified once FLINK-21456 finished. + */ @Internal public class TimestampStringUtils { - // TODO this method is copied from org.apache.flink.table.runtime.functions.SqlDateTimeUtils, - // we can refactor these utils in the future + private static final long MILLIS_PER_SECOND = 1000L; + private static final long MILLIS_PER_MINUTE = 60000L; + private static final long MILLIS_PER_HOUR = 3600000L; + private static final long MILLIS_PER_DAY = 86400000L; + + /** The local time zone, used to deal {@link java.sql.Time} value. */ + private static final TimeZone LOCAL_TZ = TimeZone.getDefault(); + public static String timestampToString(LocalDateTime ldt, int precision) { String fraction = pad(9, (long) ldt.getNano()); while (fraction.length() > precision && fraction.endsWith("0")) { @@ -99,4 +113,61 @@ private static void int2(StringBuilder buf, int i) { buf.append((char) ('0' + (i / 10) % 10)); buf.append((char) ('0' + i % 10)); } + + /** + * Cast TIME type value to VARCHAR(N), we use same SQL format with codegen in + * org.apache.flink.table.runtime.functions.SqlDateTimeUtils. + */ + public static String unixTimeToString(int time) { + final StringBuilder buf = new StringBuilder(8); + unixTimeToString(buf, time, 0); // set milli second precision to 0 + return buf.toString(); + } + + private static void unixTimeToString(StringBuilder buf, int time, int precision) { + // we copy this method from Calcite DateTimeUtils but add the following changes + // time may be negative which means time milli seconds before 00:00:00 + // this maybe a bug in calcite avatica + while (time < 0) { + time += MILLIS_PER_DAY; + } + int h = time / 3600000; + int time2 = time % 3600000; + int m = time2 / 60000; + int time3 = time2 % 60000; + int s = time3 / 1000; + int ms = time3 % 1000; + int2(buf, h); + buf.append(':'); + int2(buf, m); + buf.append(':'); + int2(buf, s); + if (precision > 0) { + buf.append('.'); + while (precision > 0) { + buf.append((char) ('0' + (ms / 100))); + ms = ms % 100; + ms = ms * 10; + + // keep consistent with Timestamp.toString() + if (ms == 0) { + break; + } + + --precision; + } + } + } + + public static int timeToInternal(java.sql.Time time) { + long ts = time.getTime() + LOCAL_TZ.getOffset(time.getTime()); + return (int) (ts % MILLIS_PER_DAY); + } + + public static int localTimeToUnixDate(LocalTime time) { + return time.getHour() * (int) MILLIS_PER_HOUR + + time.getMinute() * (int) MILLIS_PER_MINUTE + + time.getSecond() * (int) MILLIS_PER_SECOND + + time.getNano() / 1000_000; + } } diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/utils/PrintUtilsTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/utils/PrintUtilsTest.java index 0fe823fb390b8..e883944f71e91 100644 --- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/utils/PrintUtilsTest.java +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/utils/PrintUtilsTest.java @@ -50,7 +50,7 @@ public class PrintUtilsTest { @Test public void testArrayToString() { - Row row = new Row(6); + Row row = new Row(7); row.setField(0, new int[] {1, 2}); row.setField(1, new Integer[] {3, 4}); row.setField(2, new Object[] {new int[] {5, 6}, new int[] {7, 8}}); @@ -67,6 +67,7 @@ public void testArrayToString() { new Instant[] {Instant.ofEpochMilli(1), Instant.ofEpochMilli(10)}, new Instant[] {Instant.ofEpochSecond(1), Instant.ofEpochSecond(10)} }); + row.setField(6, new int[] {1123, 2123}); ResolvedSchema resolvedSchema = ResolvedSchema.of( @@ -81,18 +82,20 @@ public void testArrayToString() { Column.physical( "f5", DataTypes.ARRAY( - DataTypes.ARRAY(DataTypes.TIMESTAMP_LTZ(3)))))); + DataTypes.ARRAY(DataTypes.TIMESTAMP_LTZ(3)))), + Column.physical("f6", DataTypes.ARRAY(DataTypes.TIME())))); assertEquals( "[[1, 2], [3, 4], [[5, 6], [7, 8]], [[9, 10], [11, 12]]," + " [2021-04-18 18:00:00.123456, 2021-04-18 18:00:00.000001]," + " [[1970-01-01 00:00:00.001, 1970-01-01 00:00:00.010]," - + " [1970-01-01 00:00:01.000, 1970-01-01 00:00:10.000]]]", + + " [1970-01-01 00:00:01.000, 1970-01-01 00:00:10.000]]," + + " [00:00:01, 00:00:02]]", Arrays.toString(PrintUtils.rowToString(row, resolvedSchema, UTC_ZONE_ID))); } @Test public void testNestedRowToString() { - Row row = new Row(3); + Row row = new Row(4); row.setField(0, new int[] {1, 2}); Row row1 = new Row(4); row1.setField(0, "hello"); @@ -111,6 +114,7 @@ public void testNestedRowToString() { new int[] {1, 10}, new int[] {2, 20} }); + row.setField(3, new Integer[] {3000, 4000}); ResolvedSchema resolvedSchema = ResolvedSchema.of( @@ -124,11 +128,13 @@ public void testNestedRowToString() { DataTypes.ARRAY(DataTypes.TIMESTAMP(6)), DataTypes.ARRAY(DataTypes.TIMESTAMP_LTZ(6)))), Column.physical( - "f2", DataTypes.ARRAY(DataTypes.ARRAY(DataTypes.INT()))))); + "f2", DataTypes.ARRAY(DataTypes.ARRAY(DataTypes.INT()))), + Column.physical("f3", DataTypes.ARRAY(DataTypes.TIME())))); assertEquals( "[[1, 2], +I[hello, [true, false]," + " [2021-04-18 18:00:00.123456, 2021-04-18 18:00:00.000001]," - + " [1970-01-01 00:00:00.100000, 1970-01-01 00:00:00.200000]], [[1, 10], [2, 20]]]", + + " [1970-01-01 00:00:00.100000, 1970-01-01 00:00:00.200000]], [[1, 10], [2, 20]]," + + " [00:00:03, 00:00:04]]", Arrays.toString(PrintUtils.rowToString(row, resolvedSchema, UTC_ZONE_ID))); } diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java index 60c7bb32b5048..5cb3e8746a19d 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java @@ -561,19 +561,24 @@ public void lookupOperatorOverloads( OperandTypes.family(SqlTypeFamily.STRING, SqlTypeFamily.ANY), SqlFunctionCategory.STRING); + // Flink timestamp functions + public static final SqlFunction LOCALTIMESTAMP = + new FlinkSqlTimestampFunction("LOCALTIMESTAMP", SqlTypeName.TIMESTAMP, 3); + + public static final SqlFunction CURRENT_TIMESTAMP = + new FlinkSqlTimestampFunction( + "CURRENT_TIMESTAMP", SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE, 3); + public static final SqlFunction NOW = - new SqlCurrentTimestampFunction("NOW") { + new FlinkSqlTimestampFunction("NOW", SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE, 3) { @Override public SqlSyntax getSyntax() { return SqlSyntax.FUNCTION; } }; - - public static final SqlFunction CURRENT_TIMESTAMP = - new SqlCurrentTimestampFunction("CURRENT_TIMESTAMP"); - public static final SqlFunction CURRENT_ROW_TIMESTAMP = - new SqlCurrentTimestampFunction("CURRENT_ROW_TIMESTAMP") { + new FlinkSqlTimestampFunction( + "CURRENT_ROW_TIMESTAMP", SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE, 3) { @Override public SqlSyntax getSyntax() { @@ -1103,7 +1108,6 @@ public List getAuxiliaryFunctions() { public static final SqlFunction FLOOR = SqlStdOperatorTable.FLOOR; public static final SqlFunction CEIL = SqlStdOperatorTable.CEIL; public static final SqlFunction LOCALTIME = SqlStdOperatorTable.LOCALTIME; - public static final SqlFunction LOCALTIMESTAMP = SqlStdOperatorTable.LOCALTIMESTAMP; public static final SqlFunction CURRENT_TIME = SqlStdOperatorTable.CURRENT_TIME; public static final SqlFunction CURRENT_DATE = SqlStdOperatorTable.CURRENT_DATE; public static final SqlFunction CAST = SqlStdOperatorTable.CAST; diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlCurrentTimestampFunction.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlTimestampFunction.java similarity index 54% rename from flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlCurrentTimestampFunction.java rename to flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlTimestampFunction.java index 17e552d3c66ce..f938ee23e5abf 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlCurrentTimestampFunction.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlTimestampFunction.java @@ -18,18 +18,35 @@ package org.apache.flink.table.planner.functions.sql; +import org.apache.flink.annotation.Internal; + +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.sql.SqlOperatorBinding; import org.apache.calcite.sql.fun.SqlAbstractTimeFunction; import org.apache.calcite.sql.type.SqlTypeName; /** - * Function that returns current timestamp, the function return type is {@link - * SqlTypeName#TIMESTAMP_WITH_LOCAL_TIME_ZONE}. + * Function that used to define SQL time function like LOCALTIMESTAMP, CURRENT_TIMESTAMP, + * CURRENT_ROW_TIMESTAMP(), NOW() in Flink, the function support configuring the return type and the + * precision of return type. */ -public class SqlCurrentTimestampFunction extends SqlAbstractTimeFunction { +@Internal +public class FlinkSqlTimestampFunction extends SqlAbstractTimeFunction { + + private final SqlTypeName returnTypeName; + private final int precision; - public SqlCurrentTimestampFunction(String name) { + public FlinkSqlTimestampFunction( + String functionName, SqlTypeName returnTypeName, int precision) { // access protected constructor - super(name, SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE); + super(functionName, returnTypeName); + this.returnTypeName = returnTypeName; + this.precision = precision; + } + + @Override + public RelDataType inferReturnType(SqlOperatorBinding opBinding) { + return opBinding.getTypeFactory().createSqlType(returnTypeName, precision); } @Override diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SourceWatermarkTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SourceWatermarkTest.xml index f56b37b736217..beff39de08300 100644 --- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SourceWatermarkTest.xml +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SourceWatermarkTest.xml @@ -88,6 +88,25 @@ LogicalProject(a=[$0], b=[$1]) 10)]) +- TableSourceScan(table=[[default_catalog, default_database, VirtualTable, watermark=[-(+($2, 5000:INTERVAL SECOND), 5000:INTERVAL SECOND)], filter=[]]], fields=[a, b, c]) +]]> + + + + + + + + + + + diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala index c0bf232ccf13e..230930fbfdc05 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala @@ -1311,6 +1311,11 @@ class TableEnvironmentTest { | f24 int not null, | f25 varchar not null, | f26 row not null, + | f27 AS LOCALTIME, + | f28 AS CURRENT_TIME, + | f29 AS LOCALTIMESTAMP, + | f30 AS CURRENT_TIMESTAMP, + | f31 AS CURRENT_ROW_TIMESTAMP(), | ts AS to_timestamp(f25), | PRIMARY KEY(f24, f26) NOT ENFORCED, | WATERMARK FOR ts AS ts - INTERVAL '1' SECOND @@ -1355,6 +1360,13 @@ class TableEnvironmentTest { Row.of("f25", "STRING", Boolean.box(false), null, null, null), Row.of("f26", "ROW<`f0` INT NOT NULL, `f1` INT>", Boolean.box(false), "PRI(f24, f26)", null, null), + Row.of("f27", "TIME(0)", Boolean.box(false), null, "AS LOCALTIME", null), + Row.of("f28", "TIME(0)", Boolean.box(false), null, "AS CURRENT_TIME", null), + Row.of("f29", "TIMESTAMP(3)", Boolean.box(false), null, "AS LOCALTIMESTAMP", null), + Row.of("f30", "TIMESTAMP_LTZ(3)", Boolean.box(false), null, + "AS CURRENT_TIMESTAMP", null), + Row.of("f31", "TIMESTAMP_LTZ(3)", Boolean.box(false), null, + "AS CURRENT_ROW_TIMESTAMP()", null), Row.of("ts", "TIMESTAMP(3) *ROWTIME*", Boolean.box(true), null, "AS TO_TIMESTAMP(`f25`)", "`ts` - INTERVAL '1' SECOND")) val tableResult1 = tableEnv.executeSql("describe T1") diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/NonDeterministicTests.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/NonDeterministicTests.scala index a88135b14b350..ed87e33925377 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/NonDeterministicTests.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/NonDeterministicTests.scala @@ -81,9 +81,9 @@ class NonDeterministicTests extends ExpressionTestBase { config.setLocalTimeZone(zoneId) config.getConfiguration.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH) - config.getConfiguration.setLong(InternalConfigOptions.TABLE_QUERY_START_EPOCH_TIME, 1000L) + config.getConfiguration.setLong(InternalConfigOptions.TABLE_QUERY_START_EPOCH_TIME, 1123L) config.getConfiguration.setLong(InternalConfigOptions.TABLE_QUERY_START_LOCAL_TIME, - 1000L + TimeZone.getTimeZone(zoneId).getOffset(1000L)) + 1123L + TimeZone.getTimeZone(zoneId).getOffset(1123L)) val temporalFunctions = getCodeGenFunctions(List( "CURRENT_DATE", @@ -96,10 +96,10 @@ class NonDeterministicTests extends ExpressionTestBase { val expected = mutable.MutableList[String]( "1970-01-01", "08:00:01", - "1970-01-01 08:00:01", - "1970-01-01 08:00:01", + "1970-01-01 08:00:01.123", + "1970-01-01 08:00:01.123", "08:00:01", - "1970-01-01 08:00:01") + "1970-01-01 08:00:01.123") val result = evaluateFunctionResult(temporalFunctions) assertEquals(expected.toList.sorted, result.sorted) diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala index 859c869a73daa..3da9ef2267601 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala @@ -3027,39 +3027,50 @@ class ScalarFunctionsTest extends ScalarTypesTestBase { // we just test the format of the output // manual test can be found in NonDeterministicTests + // e.g: 2021-04-19 testAllApis( - currentDate().cast(DataTypes.STRING).charLength() >= 5, - "currentDate().cast(STRING).charLength() >= 5", - "CHAR_LENGTH(CAST(CURRENT_DATE AS VARCHAR)) >= 5", - "true") + currentDate().cast(DataTypes.STRING).charLength(), + "currentDate().cast(STRING).charLength()", + "CHAR_LENGTH(CAST(CURRENT_DATE AS VARCHAR))", + "10") + // e.g: 12:13:43 testAllApis( - currentTime().cast(DataTypes.STRING).charLength() >= 5, - "currentTime().cast(STRING).charLength() >= 5", - "CHAR_LENGTH(CAST(CURRENT_TIME AS VARCHAR)) >= 5", - "true") + localTime().cast(DataTypes.STRING).charLength(), + "localTime().cast(STRING).charLength()", + "CHAR_LENGTH(CAST(LOCALTIME AS VARCHAR))", + "8") + // e.g: 12:13:43 testAllApis( - currentTimestamp().cast(DataTypes.STRING).charLength() >= 12, - "currentTimestamp().cast(STRING).charLength() >= 12", - "CHAR_LENGTH(CAST(CURRENT_TIMESTAMP AS VARCHAR)) >= 12", - "true") + currentTime().cast(DataTypes.STRING).charLength(), + "currentTime().cast(STRING).charLength()", + "CHAR_LENGTH(CAST(CURRENT_TIME AS VARCHAR))", + "8") + // e.g: 2021-04-19 12:13:43.678 testAllApis( - localTimestamp().cast(DataTypes.STRING).charLength() >= 12, - "localTimestamp().cast(STRING).charLength() >= 12", - "CHAR_LENGTH(CAST(LOCALTIMESTAMP AS VARCHAR)) >= 12", - "true") + localTimestamp().cast(DataTypes.STRING).charLength(), + "localTimestamp().cast(STRING).charLength()", + "CHAR_LENGTH(CAST(LOCALTIMESTAMP AS VARCHAR))", + "23") + // e.g: 2021-04-19 12:13:43.678 testAllApis( - localTime().cast(DataTypes.STRING).charLength() >= 5, - "localTime().cast(STRING).charLength() >= 5", - "CHAR_LENGTH(CAST(LOCALTIME AS VARCHAR)) >= 5", - "true") + currentTimestamp().cast(DataTypes.STRING).charLength(), + "currentTimestamp().cast(STRING).charLength()", + "CHAR_LENGTH(CAST(CURRENT_TIMESTAMP AS VARCHAR))", + "23") + // e.g: 2021-04-19 12:13:43.678 testSqlApi( - "CHAR_LENGTH(CAST(NOW() AS VARCHAR)) >= 12", - "true") + "CHAR_LENGTH(CAST(NOW() AS VARCHAR))", + "23") + + // e.g: 2021-04-19 12:13:43.678 + testSqlApi( + "CHAR_LENGTH(CAST(CURRENT_ROW_TIMESTAMP() AS VARCHAR))", + "23") // comparisons are deterministic testAllApis( diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/SourceWatermarkTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/SourceWatermarkTest.scala index 3c8994356e32e..8b756f33cb27d 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/SourceWatermarkTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/SourceWatermarkTest.scala @@ -119,6 +119,21 @@ class SourceWatermarkTest extends TableTestBase { | 'readable-metadata' = 'originTime:BIGINT' | ) """.stripMargin) + + util.tableEnv.executeSql( + s""" + | CREATE TABLE timeTestTable( + | a INT, + | b BIGINT, + | rowtime AS TO_TIMESTAMP_LTZ(b, 0), + | WATERMARK FOR rowtime AS rowtime + | ) WITH ( + | 'connector' = 'values', + | 'enable-watermark-push-down' = 'true', + | 'bounded' = 'false', + | 'disable-lookup' = 'true' + | ) + """.stripMargin) } @Test @@ -156,6 +171,11 @@ class SourceWatermarkTest extends TableTestBase { util.verifyExecPlan("SELECT a, b FROM MyLtzTable") } + @Test + def testWatermarkOnCurrentRowTimestampFunction(): Unit = { + util.verifyExecPlan("SELECT * FROM timeTestTable") + } + @Test def testProjectTransposeWatermarkAssigner(): Unit = { val sourceDDL = diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/WindowTableFunctionTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/WindowTableFunctionTest.scala index 82bcc62110aa2..d3f93c8be8742 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/WindowTableFunctionTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/WindowTableFunctionTest.scala @@ -99,7 +99,7 @@ class WindowTableFunctionTest extends TableTestBase { |FROM TABLE( | TUMBLE(TABLE v1, DESCRIPTOR(cur_time), INTERVAL '15' MINUTE)) |""".stripMargin - thrown.expectMessage("requires the timecol is a time attribute type, but is TIMESTAMP(0)") + thrown.expectMessage("requires the timecol is a time attribute type, but is TIMESTAMP(3)") thrown.expect(classOf[ValidationException]) util.verifyRelPlan(sql) }