Skip to content

Commit bea9d07

Browse files
leonardBangzhaoxing
authored andcommitted
[FLINK-22354][table] Fix TIME field display in sql-client
This closes apache#15689
1 parent bbccb96 commit bea9d07

3 files changed

Lines changed: 109 additions & 10 deletions

File tree

flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/PrintUtils.java

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,9 +46,11 @@
4646
import javax.annotation.Nullable;
4747

4848
import java.io.PrintWriter;
49+
import java.sql.Time;
4950
import java.sql.Timestamp;
5051
import java.time.Instant;
5152
import java.time.LocalDateTime;
53+
import java.time.LocalTime;
5254
import java.time.ZoneId;
5355
import java.util.ArrayList;
5456
import java.util.Arrays;
@@ -59,7 +61,10 @@
5961
import java.util.stream.Stream;
6062

6163
import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getPrecision;
64+
import static org.apache.flink.table.utils.TimestampStringUtils.localTimeToUnixDate;
65+
import static org.apache.flink.table.utils.TimestampStringUtils.timeToInternal;
6266
import static org.apache.flink.table.utils.TimestampStringUtils.timestampToString;
67+
import static org.apache.flink.table.utils.TimestampStringUtils.unixTimeToString;
6368

6469
/** Utilities for print formatting. */
6570
@Internal
@@ -238,7 +243,7 @@ public static String[] rowToString(
238243
}
239244

240245
/**
241-
* Normalizes field that contains TIMESTAMP and TIMESTAMP_LTZ type data.
246+
* Normalizes field that contains TIMESTAMP, TIMESTAMP_LTZ and TIME type data.
242247
*
243248
* <p>This method also supports nested type ARRAY, ROW, MAP.
244249
*/
@@ -252,6 +257,8 @@ private static Object formattedTimestamp(
252257
case TIMESTAMP_WITHOUT_TIME_ZONE:
253258
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
254259
return formatTimestampField(field, fieldType, sessionTimeZone);
260+
case TIME_WITHOUT_TIME_ZONE:
261+
return formatTimeField(field);
255262
case ARRAY:
256263
LogicalType elementType = ((ArrayType) fieldType).getElementType();
257264
if (field instanceof List) {
@@ -453,6 +460,21 @@ private static Object formatTimestampField(
453460
}
454461
}
455462

463+
/** Formats the print content of TIME type data. */
464+
private static Object formatTimeField(Object timeField) {
465+
if (timeField.getClass().isAssignableFrom(int.class) || timeField instanceof Integer) {
466+
return unixTimeToString((int) timeField);
467+
} else if (timeField.getClass().isAssignableFrom(long.class) || timeField instanceof Long) {
468+
return unixTimeToString(((Long) timeField).intValue());
469+
} else if (timeField instanceof Time) {
470+
return unixTimeToString(timeToInternal((Time) timeField));
471+
} else if (timeField instanceof LocalTime) {
472+
return unixTimeToString(localTimeToUnixDate((LocalTime) timeField));
473+
} else {
474+
return timeField;
475+
}
476+
}
477+
456478
public static String genBorderLine(int[] colWidths) {
457479
StringBuilder sb = new StringBuilder();
458480
sb.append("+");

flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TimestampStringUtils.java

Lines changed: 74 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,27 @@
2121
import org.apache.flink.annotation.Internal;
2222

2323
import java.time.LocalDateTime;
24+
import java.time.LocalTime;
25+
import java.util.TimeZone;
2426

25-
/** Utils to represent a LocalDateTime to String, considered the precision. */
27+
/**
28+
* Utils to represent a LocalDateTime to String, considered the precision.
29+
*
30+
* <p>TODO://This class keep same SQL formats with {@code
31+
* org.apache.flink.table.runtime.functions.SqlDateTimeUtils} which used in Flink SQL codegen, The
32+
* two utils will be unified once FLINK-21456 finished.
33+
*/
2634
@Internal
2735
public class TimestampStringUtils {
2836

29-
// TODO this method is copied from org.apache.flink.table.runtime.functions.SqlDateTimeUtils,
30-
// we can refactor these utils in the future
37+
private static final long MILLIS_PER_SECOND = 1000L;
38+
private static final long MILLIS_PER_MINUTE = 60000L;
39+
private static final long MILLIS_PER_HOUR = 3600000L;
40+
private static final long MILLIS_PER_DAY = 86400000L;
41+
42+
/** The local time zone, used to deal {@link java.sql.Time} value. */
43+
private static final TimeZone LOCAL_TZ = TimeZone.getDefault();
44+
3145
public static String timestampToString(LocalDateTime ldt, int precision) {
3246
String fraction = pad(9, (long) ldt.getNano());
3347
while (fraction.length() > precision && fraction.endsWith("0")) {
@@ -99,4 +113,61 @@ private static void int2(StringBuilder buf, int i) {
99113
buf.append((char) ('0' + (i / 10) % 10));
100114
buf.append((char) ('0' + i % 10));
101115
}
116+
117+
/**
118+
* Cast TIME type value to VARCHAR(N), we use same SQL format with codegen in
119+
* org.apache.flink.table.runtime.functions.SqlDateTimeUtils.
120+
*/
121+
public static String unixTimeToString(int time) {
122+
final StringBuilder buf = new StringBuilder(8);
123+
unixTimeToString(buf, time, 0); // set milli second precision to 0
124+
return buf.toString();
125+
}
126+
127+
private static void unixTimeToString(StringBuilder buf, int time, int precision) {
128+
// we copy this method from Calcite DateTimeUtils but add the following changes
129+
// time may be negative which means time milli seconds before 00:00:00
130+
// this maybe a bug in calcite avatica
131+
while (time < 0) {
132+
time += MILLIS_PER_DAY;
133+
}
134+
int h = time / 3600000;
135+
int time2 = time % 3600000;
136+
int m = time2 / 60000;
137+
int time3 = time2 % 60000;
138+
int s = time3 / 1000;
139+
int ms = time3 % 1000;
140+
int2(buf, h);
141+
buf.append(':');
142+
int2(buf, m);
143+
buf.append(':');
144+
int2(buf, s);
145+
if (precision > 0) {
146+
buf.append('.');
147+
while (precision > 0) {
148+
buf.append((char) ('0' + (ms / 100)));
149+
ms = ms % 100;
150+
ms = ms * 10;
151+
152+
// keep consistent with Timestamp.toString()
153+
if (ms == 0) {
154+
break;
155+
}
156+
157+
--precision;
158+
}
159+
}
160+
}
161+
162+
public static int timeToInternal(java.sql.Time time) {
163+
long ts = time.getTime() + LOCAL_TZ.getOffset(time.getTime());
164+
return (int) (ts % MILLIS_PER_DAY);
165+
}
166+
167+
public static int localTimeToUnixDate(LocalTime time) {
168+
return time.getHour() * (int) MILLIS_PER_HOUR
169+
+ time.getMinute() * (int) MILLIS_PER_MINUTE
170+
+ time.getSecond() * (int) MILLIS_PER_SECOND
171+
+ time.getNano() / 1000_000;
172+
}
102173
}

flink-table/flink-table-common/src/test/java/org/apache/flink/table/utils/PrintUtilsTest.java

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ public class PrintUtilsTest {
5050

5151
@Test
5252
public void testArrayToString() {
53-
Row row = new Row(6);
53+
Row row = new Row(7);
5454
row.setField(0, new int[] {1, 2});
5555
row.setField(1, new Integer[] {3, 4});
5656
row.setField(2, new Object[] {new int[] {5, 6}, new int[] {7, 8}});
@@ -67,6 +67,7 @@ public void testArrayToString() {
6767
new Instant[] {Instant.ofEpochMilli(1), Instant.ofEpochMilli(10)},
6868
new Instant[] {Instant.ofEpochSecond(1), Instant.ofEpochSecond(10)}
6969
});
70+
row.setField(6, new int[] {1123, 2123});
7071

7172
ResolvedSchema resolvedSchema =
7273
ResolvedSchema.of(
@@ -81,18 +82,20 @@ public void testArrayToString() {
8182
Column.physical(
8283
"f5",
8384
DataTypes.ARRAY(
84-
DataTypes.ARRAY(DataTypes.TIMESTAMP_LTZ(3))))));
85+
DataTypes.ARRAY(DataTypes.TIMESTAMP_LTZ(3)))),
86+
Column.physical("f6", DataTypes.ARRAY(DataTypes.TIME()))));
8587
assertEquals(
8688
"[[1, 2], [3, 4], [[5, 6], [7, 8]], [[9, 10], [11, 12]],"
8789
+ " [2021-04-18 18:00:00.123456, 2021-04-18 18:00:00.000001],"
8890
+ " [[1970-01-01 00:00:00.001, 1970-01-01 00:00:00.010],"
89-
+ " [1970-01-01 00:00:01.000, 1970-01-01 00:00:10.000]]]",
91+
+ " [1970-01-01 00:00:01.000, 1970-01-01 00:00:10.000]],"
92+
+ " [00:00:01, 00:00:02]]",
9093
Arrays.toString(PrintUtils.rowToString(row, resolvedSchema, UTC_ZONE_ID)));
9194
}
9295

9396
@Test
9497
public void testNestedRowToString() {
95-
Row row = new Row(3);
98+
Row row = new Row(4);
9699
row.setField(0, new int[] {1, 2});
97100
Row row1 = new Row(4);
98101
row1.setField(0, "hello");
@@ -111,6 +114,7 @@ public void testNestedRowToString() {
111114
new int[] {1, 10},
112115
new int[] {2, 20}
113116
});
117+
row.setField(3, new Integer[] {3000, 4000});
114118

115119
ResolvedSchema resolvedSchema =
116120
ResolvedSchema.of(
@@ -124,11 +128,13 @@ public void testNestedRowToString() {
124128
DataTypes.ARRAY(DataTypes.TIMESTAMP(6)),
125129
DataTypes.ARRAY(DataTypes.TIMESTAMP_LTZ(6)))),
126130
Column.physical(
127-
"f2", DataTypes.ARRAY(DataTypes.ARRAY(DataTypes.INT())))));
131+
"f2", DataTypes.ARRAY(DataTypes.ARRAY(DataTypes.INT()))),
132+
Column.physical("f3", DataTypes.ARRAY(DataTypes.TIME()))));
128133
assertEquals(
129134
"[[1, 2], +I[hello, [true, false],"
130135
+ " [2021-04-18 18:00:00.123456, 2021-04-18 18:00:00.000001],"
131-
+ " [1970-01-01 00:00:00.100000, 1970-01-01 00:00:00.200000]], [[1, 10], [2, 20]]]",
136+
+ " [1970-01-01 00:00:00.100000, 1970-01-01 00:00:00.200000]], [[1, 10], [2, 20]],"
137+
+ " [00:00:03, 00:00:04]]",
132138
Arrays.toString(PrintUtils.rowToString(row, resolvedSchema, UTC_ZONE_ID)));
133139
}
134140

0 commit comments

Comments
 (0)