Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions docs/data/sql_functions.yml
Original file line number Diff line number Diff line change
Expand Up @@ -417,10 +417,10 @@ temporal:
description: Creates an interval of NUMERIC milliseconds.
- sql: LOCALTIME
table: localTime()
description: Returns the current SQL time in the local time zone. It is evaluated for each record in streaming mode. But in batch mode, it is evaluated once as the query starts and uses the same result for every row.
description: Returns the current SQL time in the local time zone, the return type is TIME(0). It is evaluated for each record in streaming mode. But in batch mode, it is evaluated once as the query starts and uses the same result for every row.
- sql: LOCALTIMESTAMP
table: localTimestamp()
description: Returns the current SQL timestamp in local time zone, the return type is TIMESTAMP WITHOUT ITME ZONE. It is evaluated for each record in streaming mode. But in batch mode, it is evaluated once as the query starts and uses the same result for every row.
description: Returns the current SQL timestamp in local time zone, the return type is TIMESTAMP(3). It is evaluated for each record in streaming mode. But in batch mode, it is evaluated once as the query starts and uses the same result for every row.
- sql: CURRENT_TIME
table: currentTime()
description: Returns the current SQL time in the local time zone, this is a synonym of LOCAL_TIME.
Expand All @@ -429,11 +429,11 @@ temporal:
description: Returns the current SQL date in the local time zone. It is evaluated for each record in streaming mode. But in batch mode, it is evaluated once as the query starts and uses the same result for every row.
- sql: CURRENT_TIMESTAMP
table: currentTimestamp()
description: Returns the current SQL timestamp in the local time zone, the return type is TIMESTAMP WITH LOCAL TIME ZONE. It is evaluated for each record in streaming mode. But in batch mode, it is evaluated once as the query starts and uses the same result for every row.
description: Returns the current SQL timestamp in the local time zone, the return type is TIMESTAMP_LTZ(3). It is evaluated for each record in streaming mode. But in batch mode, it is evaluated once as the query starts and uses the same result for every row.
- sql: NOW()
description: Returns the current SQL timestamp in the local time zone, this is a synonym of CURRENT_TIMESTAMP.
- sql: CURRENT_ROW_TIMESTAMP()
description: Returns the current SQL timestamp in the local time zone, the return type is TIMESTAMP WITH LOCAL TIME ZONE. It is evaluated for each record no matter in batch or streaming mode.
description: Returns the current SQL timestamp in the local time zone, the return type is TIMESTAMP_LTZ(3). It is evaluated for each record no matter in batch or streaming mode.
- sql: EXTRACT(timeinteravlunit FROM temporal)
table: TEMPORAL.extract(TIMEINTERVALUNIT)
description: Returns a long value extracted from the timeintervalunit part of temporal. E.g., EXTRACT(DAY FROM DATE '2006-06-05') returns 5.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.apache.flink.connectors.hive.FlinkHiveException;
import org.apache.flink.table.planner.delegation.hive.copy.HiveParserBetween;
import org.apache.flink.table.planner.delegation.hive.copy.HiveParserSqlFunctionConverter;
import org.apache.flink.table.planner.functions.sql.SqlCurrentTimestampFunction;
import org.apache.flink.table.planner.functions.sql.FlinkSqlTimestampFunction;
import org.apache.flink.util.Preconditions;

import org.apache.calcite.plan.RelOptCluster;
Expand Down Expand Up @@ -90,7 +90,7 @@ public RexNode visitCall(RexCall call) {
RelDataType type = call.getType();
return builder.makeCall(type, convertedOp, visitList(operands, update));
} else {
if (convertedOp instanceof SqlCurrentTimestampFunction) {
if (convertedOp instanceof FlinkSqlTimestampFunction) {
// flink's current_timestamp has different type from hive's, convert it to a literal
Timestamp currentTS =
((HiveParser.HiveParserSessionState) SessionState.get())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,10 +222,13 @@ private Column validateTimeColumn(String columnName, List<Column> columns) {
columns.stream().map(Column::getName).collect(Collectors.toList())));
}
final LogicalType timeFieldType = timeColumn.get().getDataType().getLogicalType();
if (!canBeTimeAttributeType(timeFieldType) || getPrecision(timeFieldType) != 3) {
if (!canBeTimeAttributeType(timeFieldType) || getPrecision(timeFieldType) > 3) {
throw new ValidationException(
"Invalid data type of time field for watermark definition. "
+ "The field must be of type TIMESTAMP(3) or TIMESTAMP_LTZ(3).");
String.format(
"Invalid data type of time field for watermark definition. "
+ "The field must be of type TIMESTAMP(p) or TIMESTAMP_LTZ(p),"
+ " the supported precision 'p' is from 0 to 3, but the time field type is %s",
timeFieldType));
}
if (isProctimeAttribute(timeFieldType)) {
throw new ValidationException(
Expand All @@ -235,10 +238,13 @@ private Column validateTimeColumn(String columnName, List<Column> columns) {
}

private void validateWatermarkExpression(LogicalType watermarkType) {
if (!canBeTimeAttributeType(watermarkType) || getPrecision(watermarkType) != 3) {
if (!canBeTimeAttributeType(watermarkType) || getPrecision(watermarkType) > 3) {
throw new ValidationException(
"Invalid data type of expression for watermark definition. "
+ "The field must be of type TIMESTAMP(3) or TIMESTAMP_LTZ(3).");
String.format(
"Invalid data type of expression for watermark definition. "
+ "The field must be of type TIMESTAMP(p) or TIMESTAMP_LTZ(p),"
+ " the supported precision 'p' is from 0 to 3, but the watermark expression type is %s",
watermarkType));
}
}

Expand Down Expand Up @@ -277,7 +283,8 @@ private Column adjustRowtimeAttribute(List<WatermarkSpec> watermarkSpecs, Column
default:
throw new ValidationException(
"Invalid data type of expression for rowtime definition. "
+ "The field must be of type TIMESTAMP(3) or TIMESTAMP_LTZ(3).");
+ "The field must be of type TIMESTAMP(p) or TIMESTAMP_LTZ(p),"
+ " the supported precision 'p' is from 0 to 3.");
}
}
return column;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,8 @@ public void testSchemaResolutionErrors() {
.column("ts", DataTypes.BOOLEAN())
.watermark("ts", callSql(WATERMARK_SQL))
.build(),
"Invalid data type of time field for watermark definition. The field must be of type TIMESTAMP(3) or TIMESTAMP_LTZ(3).");
"Invalid data type of time field for watermark definition."
+ " The field must be of type TIMESTAMP(p) or TIMESTAMP_LTZ(p), the supported precision 'p' is from 0 to 3, but the time field type is BOOLEAN");

testError(
Schema.newBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,11 @@
import javax.annotation.Nullable;

import java.io.PrintWriter;
import java.sql.Time;
import java.sql.Timestamp;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
Expand All @@ -59,7 +61,10 @@
import java.util.stream.Stream;

import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getPrecision;
import static org.apache.flink.table.utils.TimestampStringUtils.localTimeToUnixDate;
import static org.apache.flink.table.utils.TimestampStringUtils.timeToInternal;
import static org.apache.flink.table.utils.TimestampStringUtils.timestampToString;
import static org.apache.flink.table.utils.TimestampStringUtils.unixTimeToString;

/** Utilities for print formatting. */
@Internal
Expand Down Expand Up @@ -238,7 +243,7 @@ public static String[] rowToString(
}

/**
* Normalizes field that contains TIMESTAMP and TIMESTAMP_LTZ type data.
* Normalizes field that contains TIMESTAMP, TIMESTAMP_LTZ and TIME type data.
*
* <p>This method also supports nested type ARRAY, ROW, MAP.
*/
Expand All @@ -252,6 +257,8 @@ private static Object formattedTimestamp(
case TIMESTAMP_WITHOUT_TIME_ZONE:
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
return formatTimestampField(field, fieldType, sessionTimeZone);
case TIME_WITHOUT_TIME_ZONE:
return formatTimeField(field);
case ARRAY:
LogicalType elementType = ((ArrayType) fieldType).getElementType();
if (field instanceof List) {
Expand Down Expand Up @@ -453,6 +460,21 @@ private static Object formatTimestampField(
}
}

/** Formats the print content of TIME type data. */
private static Object formatTimeField(Object timeField) {
if (timeField.getClass().isAssignableFrom(int.class) || timeField instanceof Integer) {
return unixTimeToString((int) timeField);
} else if (timeField.getClass().isAssignableFrom(long.class) || timeField instanceof Long) {
return unixTimeToString(((Long) timeField).intValue());
} else if (timeField instanceof Time) {
return unixTimeToString(timeToInternal((Time) timeField));
} else if (timeField instanceof LocalTime) {
return unixTimeToString(localTimeToUnixDate((LocalTime) timeField));
} else {
return timeField;
}
}

public static String genBorderLine(int[] colWidths) {
StringBuilder sb = new StringBuilder();
sb.append("+");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,27 @@
import org.apache.flink.annotation.Internal;

import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.TimeZone;

/** Utils to represent a LocalDateTime to String, considered the precision. */
/**
* Utils to represent a LocalDateTime to String, considered the precision.
*
* <p>TODO://This class keep same SQL formats with {@code
* org.apache.flink.table.runtime.functions.SqlDateTimeUtils} which used in Flink SQL codegen, The
* two utils will be unified once FLINK-21456 finished.
*/
@Internal
public class TimestampStringUtils {

// TODO this method is copied from org.apache.flink.table.runtime.functions.SqlDateTimeUtils,
// we can refactor these utils in the future
private static final long MILLIS_PER_SECOND = 1000L;
private static final long MILLIS_PER_MINUTE = 60000L;
private static final long MILLIS_PER_HOUR = 3600000L;
private static final long MILLIS_PER_DAY = 86400000L;

/** The local time zone, used to deal {@link java.sql.Time} value. */
private static final TimeZone LOCAL_TZ = TimeZone.getDefault();

public static String timestampToString(LocalDateTime ldt, int precision) {
String fraction = pad(9, (long) ldt.getNano());
while (fraction.length() > precision && fraction.endsWith("0")) {
Expand Down Expand Up @@ -99,4 +113,61 @@ private static void int2(StringBuilder buf, int i) {
buf.append((char) ('0' + (i / 10) % 10));
buf.append((char) ('0' + i % 10));
}

/**
* Cast TIME type value to VARCHAR(N), we use same SQL format with codegen in
* org.apache.flink.table.runtime.functions.SqlDateTimeUtils.
*/
public static String unixTimeToString(int time) {
final StringBuilder buf = new StringBuilder(8);
unixTimeToString(buf, time, 0); // set milli second precision to 0
return buf.toString();
}

private static void unixTimeToString(StringBuilder buf, int time, int precision) {
// we copy this method from Calcite DateTimeUtils but add the following changes
// time may be negative which means time milli seconds before 00:00:00
// this maybe a bug in calcite avatica
while (time < 0) {
time += MILLIS_PER_DAY;
}
int h = time / 3600000;
int time2 = time % 3600000;
int m = time2 / 60000;
int time3 = time2 % 60000;
int s = time3 / 1000;
int ms = time3 % 1000;
int2(buf, h);
buf.append(':');
int2(buf, m);
buf.append(':');
int2(buf, s);
if (precision > 0) {
buf.append('.');
while (precision > 0) {
buf.append((char) ('0' + (ms / 100)));
ms = ms % 100;
ms = ms * 10;

// keep consistent with Timestamp.toString()
if (ms == 0) {
break;
}

--precision;
}
}
}

public static int timeToInternal(java.sql.Time time) {
long ts = time.getTime() + LOCAL_TZ.getOffset(time.getTime());
return (int) (ts % MILLIS_PER_DAY);
}

public static int localTimeToUnixDate(LocalTime time) {
return time.getHour() * (int) MILLIS_PER_HOUR
+ time.getMinute() * (int) MILLIS_PER_MINUTE
+ time.getSecond() * (int) MILLIS_PER_SECOND
+ time.getNano() / 1000_000;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public class PrintUtilsTest {

@Test
public void testArrayToString() {
Row row = new Row(6);
Row row = new Row(7);
row.setField(0, new int[] {1, 2});
row.setField(1, new Integer[] {3, 4});
row.setField(2, new Object[] {new int[] {5, 6}, new int[] {7, 8}});
Expand All @@ -67,6 +67,7 @@ public void testArrayToString() {
new Instant[] {Instant.ofEpochMilli(1), Instant.ofEpochMilli(10)},
new Instant[] {Instant.ofEpochSecond(1), Instant.ofEpochSecond(10)}
});
row.setField(6, new int[] {1123, 2123});

ResolvedSchema resolvedSchema =
ResolvedSchema.of(
Expand All @@ -81,18 +82,20 @@ public void testArrayToString() {
Column.physical(
"f5",
DataTypes.ARRAY(
DataTypes.ARRAY(DataTypes.TIMESTAMP_LTZ(3))))));
DataTypes.ARRAY(DataTypes.TIMESTAMP_LTZ(3)))),
Column.physical("f6", DataTypes.ARRAY(DataTypes.TIME()))));
assertEquals(
"[[1, 2], [3, 4], [[5, 6], [7, 8]], [[9, 10], [11, 12]],"
+ " [2021-04-18 18:00:00.123456, 2021-04-18 18:00:00.000001],"
+ " [[1970-01-01 00:00:00.001, 1970-01-01 00:00:00.010],"
+ " [1970-01-01 00:00:01.000, 1970-01-01 00:00:10.000]]]",
+ " [1970-01-01 00:00:01.000, 1970-01-01 00:00:10.000]],"
+ " [00:00:01, 00:00:02]]",
Arrays.toString(PrintUtils.rowToString(row, resolvedSchema, UTC_ZONE_ID)));
}

@Test
public void testNestedRowToString() {
Row row = new Row(3);
Row row = new Row(4);
row.setField(0, new int[] {1, 2});
Row row1 = new Row(4);
row1.setField(0, "hello");
Expand All @@ -111,6 +114,7 @@ public void testNestedRowToString() {
new int[] {1, 10},
new int[] {2, 20}
});
row.setField(3, new Integer[] {3000, 4000});

ResolvedSchema resolvedSchema =
ResolvedSchema.of(
Expand All @@ -124,11 +128,13 @@ public void testNestedRowToString() {
DataTypes.ARRAY(DataTypes.TIMESTAMP(6)),
DataTypes.ARRAY(DataTypes.TIMESTAMP_LTZ(6)))),
Column.physical(
"f2", DataTypes.ARRAY(DataTypes.ARRAY(DataTypes.INT())))));
"f2", DataTypes.ARRAY(DataTypes.ARRAY(DataTypes.INT()))),
Column.physical("f3", DataTypes.ARRAY(DataTypes.TIME()))));
assertEquals(
"[[1, 2], +I[hello, [true, false],"
+ " [2021-04-18 18:00:00.123456, 2021-04-18 18:00:00.000001],"
+ " [1970-01-01 00:00:00.100000, 1970-01-01 00:00:00.200000]], [[1, 10], [2, 20]]]",
+ " [1970-01-01 00:00:00.100000, 1970-01-01 00:00:00.200000]], [[1, 10], [2, 20]],"
+ " [00:00:03, 00:00:04]]",
Arrays.toString(PrintUtils.rowToString(row, resolvedSchema, UTC_ZONE_ID)));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -561,19 +561,24 @@ public void lookupOperatorOverloads(
OperandTypes.family(SqlTypeFamily.STRING, SqlTypeFamily.ANY),
SqlFunctionCategory.STRING);

// Flink timestamp functions
public static final SqlFunction LOCALTIMESTAMP =
new FlinkSqlTimestampFunction("LOCALTIMESTAMP", SqlTypeName.TIMESTAMP, 3);

public static final SqlFunction CURRENT_TIMESTAMP =
new FlinkSqlTimestampFunction(
"CURRENT_TIMESTAMP", SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE, 3);

public static final SqlFunction NOW =
new SqlCurrentTimestampFunction("NOW") {
new FlinkSqlTimestampFunction("NOW", SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE, 3) {
@Override
public SqlSyntax getSyntax() {
return SqlSyntax.FUNCTION;
}
};

public static final SqlFunction CURRENT_TIMESTAMP =
new SqlCurrentTimestampFunction("CURRENT_TIMESTAMP");

public static final SqlFunction CURRENT_ROW_TIMESTAMP =
new SqlCurrentTimestampFunction("CURRENT_ROW_TIMESTAMP") {
new FlinkSqlTimestampFunction(
"CURRENT_ROW_TIMESTAMP", SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE, 3) {

@Override
public SqlSyntax getSyntax() {
Expand Down Expand Up @@ -1103,7 +1108,6 @@ public List<SqlGroupedWindowFunction> getAuxiliaryFunctions() {
public static final SqlFunction FLOOR = SqlStdOperatorTable.FLOOR;
public static final SqlFunction CEIL = SqlStdOperatorTable.CEIL;
public static final SqlFunction LOCALTIME = SqlStdOperatorTable.LOCALTIME;
public static final SqlFunction LOCALTIMESTAMP = SqlStdOperatorTable.LOCALTIMESTAMP;
public static final SqlFunction CURRENT_TIME = SqlStdOperatorTable.CURRENT_TIME;
public static final SqlFunction CURRENT_DATE = SqlStdOperatorTable.CURRENT_DATE;
public static final SqlFunction CAST = SqlStdOperatorTable.CAST;
Expand Down
Loading