From f9b48f7552b93a7b5e6c12a62f95140ab1d400a7 Mon Sep 17 00:00:00 2001 From: windwheel Date: Mon, 7 Nov 2022 19:35:31 +0800 Subject: [PATCH 1/5] [HUDI-5161] add TIMESTAMP_LTZ --- .../catalog/TypeInfoLogicalTypeVisitor.java | 29 +++++++------------ 1 file changed, 11 insertions(+), 18 deletions(-) diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/TypeInfoLogicalTypeVisitor.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/TypeInfoLogicalTypeVisitor.java index e6b15788fe79..9d61e0e7c83f 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/TypeInfoLogicalTypeVisitor.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/TypeInfoLogicalTypeVisitor.java @@ -19,24 +19,7 @@ package org.apache.hudi.table.catalog; import org.apache.flink.table.types.DataType; -import org.apache.flink.table.types.logical.ArrayType; -import org.apache.flink.table.types.logical.BigIntType; -import org.apache.flink.table.types.logical.BooleanType; -import org.apache.flink.table.types.logical.CharType; -import org.apache.flink.table.types.logical.DateType; -import org.apache.flink.table.types.logical.DecimalType; -import org.apache.flink.table.types.logical.DoubleType; -import org.apache.flink.table.types.logical.FloatType; -import org.apache.flink.table.types.logical.IntType; -import org.apache.flink.table.types.logical.LogicalType; -import org.apache.flink.table.types.logical.MapType; -import org.apache.flink.table.types.logical.NullType; -import org.apache.flink.table.types.logical.RowType; -import org.apache.flink.table.types.logical.SmallIntType; -import org.apache.flink.table.types.logical.TimestampType; -import org.apache.flink.table.types.logical.TinyIntType; -import org.apache.flink.table.types.logical.VarBinaryType; -import org.apache.flink.table.types.logical.VarCharType; +import org.apache.flink.table.types.logical.*; import org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; @@ -144,6 +127,16 @@ public TypeInfo visit(TimestampType timestampType) { } } + @Override + public TypeInfo visit(LocalZonedTimestampType localZonedTimestampType) { + int precision = localZonedTimestampType.getPrecision(); + if(precision >= 0 && precision <= 9) { + return TypeInfoFactory.timestampTypeInfo; + } else { + return TypeInfoFactory.longTypeInfo; + } + } + @Override public TypeInfo visit(ArrayType arrayType) { LogicalType elementType = arrayType.getElementType(); From b47f92ee87bf65245208c12eacc2ac49c10d9f1c Mon Sep 17 00:00:00 2001 From: windwheel Date: Mon, 7 Nov 2022 21:05:17 +0800 Subject: [PATCH 2/5] recover imports --- .../catalog/TypeInfoLogicalTypeVisitor.java | 20 ++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/TypeInfoLogicalTypeVisitor.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/TypeInfoLogicalTypeVisitor.java index 9d61e0e7c83f..89ff12646dff 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/TypeInfoLogicalTypeVisitor.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/TypeInfoLogicalTypeVisitor.java @@ -19,7 +19,25 @@ package org.apache.hudi.table.catalog; import org.apache.flink.table.types.DataType; -import org.apache.flink.table.types.logical.*; +import org.apache.flink.table.types.logical.ArrayType; +import org.apache.flink.table.types.logical.BigIntType; +import org.apache.flink.table.types.logical.BooleanType; +import org.apache.flink.table.types.logical.CharType; +import org.apache.flink.table.types.logical.DateType; +import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.types.logical.DoubleType; +import org.apache.flink.table.types.logical.FloatType; +import org.apache.flink.table.types.logical.IntType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.MapType; +import org.apache.flink.table.types.logical.NullType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.SmallIntType; +import org.apache.flink.table.types.logical.TimestampType; +import org.apache.flink.table.types.logical.TinyIntType; +import org.apache.flink.table.types.logical.VarBinaryType; +import org.apache.flink.table.types.logical.VarCharType; +import org.apache.flink.table.types.logical.LocalZonedTimestampType; import org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; From 5452f62be22a3f1b88f42d2273e0addefccbfe5e Mon Sep 17 00:00:00 2001 From: windwheel Date: Mon, 7 Nov 2022 22:27:19 +0800 Subject: [PATCH 3/5] fix checkstyle --- .../apache/hudi/table/catalog/TypeInfoLogicalTypeVisitor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/TypeInfoLogicalTypeVisitor.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/TypeInfoLogicalTypeVisitor.java index 89ff12646dff..2e0f87528adc 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/TypeInfoLogicalTypeVisitor.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/TypeInfoLogicalTypeVisitor.java @@ -148,7 +148,7 @@ public TypeInfo visit(TimestampType timestampType) { @Override public TypeInfo visit(LocalZonedTimestampType localZonedTimestampType) { int precision = localZonedTimestampType.getPrecision(); - if(precision >= 0 && precision <= 9) { + if (precision >= 0 && precision <= 9) { return TypeInfoFactory.timestampTypeInfo; } else { return TypeInfoFactory.longTypeInfo; From 600112af7c0dd260120f9b7ce27f15e3a7ed15f9 Mon Sep 17 00:00:00 2001 From: chengxy Date: Fri, 10 Feb 2023 10:30:38 +0800 Subject: [PATCH 4/5] add TIMESTAMP_LTZ test --- .../apache/hudi/table/catalog/TypeInfoLogicalTypeVisitor.java | 3 ++- .../org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java | 4 +++- .../main/java/org/apache/hudi/hive/util/HiveSchemaUtil.java | 2 ++ 3 files changed, 7 insertions(+), 2 deletions(-) diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/TypeInfoLogicalTypeVisitor.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/TypeInfoLogicalTypeVisitor.java index 2e0f87528adc..9b61ecb44efe 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/TypeInfoLogicalTypeVisitor.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/TypeInfoLogicalTypeVisitor.java @@ -145,10 +145,11 @@ public TypeInfo visit(TimestampType timestampType) { } } + @Override public TypeInfo visit(LocalZonedTimestampType localZonedTimestampType) { int precision = localZonedTimestampType.getPrecision(); - if (precision >= 0 && precision <= 9) { + if (precision >=0 && precision <= 9){ return TypeInfoFactory.timestampTypeInfo; } else { return TypeInfoFactory.longTypeInfo; diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java index 5d27cdadbbb3..4772076247e2 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java @@ -83,6 +83,7 @@ public class TestHoodieHiveCatalog { .field("age", DataTypes.INT()) .field("par1", DataTypes.STRING()) .field("ts", DataTypes.BIGINT()) + .field("update_time", DataTypes.TIMESTAMP_LTZ()) .primaryKey("uuid") .build(); List partitions = Collections.singletonList("par1"); @@ -132,7 +133,8 @@ public void testCreateAndGetHoodieTable(HoodieTableType tableType) throws Except + "uuid:int," + "name:string," + "age:int," - + "ts:bigint"; + + "ts:bigint," + + "update_time:timestamp"; assertEquals(expectedFieldSchema, fieldSchema); String partitionSchema = hiveTable.getPartitionKeys().stream() .map(f -> f.getName() + ":" + f.getType()) diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HiveSchemaUtil.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HiveSchemaUtil.java index 50a6aabd2bbd..141a71f5bcc9 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HiveSchemaUtil.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HiveSchemaUtil.java @@ -232,6 +232,8 @@ private static String convertField(final Type parquetType, boolean supportTimest return field.append("DATE").toString(); } else if (supportTimestamp && originalType == OriginalType.TIMESTAMP_MICROS) { return field.append("TIMESTAMP").toString(); + } else if (supportTimestamp && originalType == OriginalType.TIMESTAMP_MILLIS) { + return field.append("TIMESTAMP_LTZ").toString(); } // TODO - fix the method naming here From 9c853bcc9d850a1d19c891824d170c7d535cc92f Mon Sep 17 00:00:00 2001 From: chengxy Date: Fri, 10 Feb 2023 10:36:28 +0800 Subject: [PATCH 5/5] add TIMESTAMP_LTZ test --- .../org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java index 4772076247e2..0421bd0b069f 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java @@ -150,7 +150,7 @@ public void testCreateAndGetHoodieTable(HoodieTableType tableType) throws Except String tableSchema = table1.getUnresolvedSchema().getColumns().stream() .map(Schema.UnresolvedColumn::toString) .collect(Collectors.joining(",")); - String expectedTableSchema = "`uuid` INT NOT NULL,`name` STRING,`age` INT,`par1` STRING,`ts` BIGINT"; + String expectedTableSchema = "`uuid` INT NOT NULL,`name` STRING,`age` INT,`par1` STRING,`ts` BIGINT,`update_time` TIMESTAMP_LTZ(6)"; assertEquals(expectedTableSchema, tableSchema); assertEquals(Collections.singletonList("uuid"), table1.getUnresolvedSchema().getPrimaryKey().get().getColumnNames()); assertEquals(Collections.singletonList("par1"), ((CatalogTable) table1).getPartitionKeys());