From 880a38e2720d154d328adea296f562027bfdf7af Mon Sep 17 00:00:00 2001 From: jian yonghua Date: Fri, 29 Jul 2022 16:34:57 +0800 Subject: [PATCH 1/3] be able to disable precombine field when table schema contains a field named ts --- .../apache/hudi/table/HoodieTableFactory.java | 2 +- .../hudi/table/ITTestHoodieDataSource.java | 24 +++++++++++++++++++ .../hudi/table/TestHoodieTableFactory.java | 14 +++++++++++ 3 files changed, 39 insertions(+), 1 deletion(-) diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java index 987ae10fe75c..cc73229027ba 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java @@ -147,7 +147,7 @@ private void sanityCheck(Configuration conf, ResolvedSchema schema) { throw new HoodieValidationException("Option '" + FlinkOptions.PRECOMBINE_FIELD.key() + "' is required for payload class: " + DefaultHoodieRecordPayload.class.getName()); } - if (preCombineField.equals(FlinkOptions.PRECOMBINE_FIELD.defaultValue())) { + if (preCombineField.equals(FlinkOptions.PRECOMBINE_FIELD.defaultValue()) || preCombineField.equals(FlinkOptions.NO_PRE_COMBINE)){ conf.setString(FlinkOptions.PRECOMBINE_FIELD, FlinkOptions.NO_PRE_COMBINE); } else { throw new HoodieValidationException("Field " + preCombineField + " does not exist in the table schema." diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java index 0c423df6b7bd..95bafb95b833 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java @@ -453,6 +453,30 @@ void testWriteAndReadWithProctimeSequence(HoodieTableType tableType, boolean hiv assertRowsEquals(result1, "[+I[id1, Danny, 23, 1970-01-01T00:00:01, par1]]"); } + @ParameterizedTest + @MethodSource("tableTypeAndPartitioningParams") + void testWriteAndReadWithProctimeSequenceWithTsColumnExisting(HoodieTableType tableType, boolean hiveStylePartitioning) { + TableEnvironment tableEnv = batchTableEnv; + String hoodieTableDDL = sql("t1") + .field("uuid varchar(20)") + .field("name varchar(10)") + .field("age int") + .field("ts timestamp(3)") // use the default precombine field 'ts' + .field("`partition` varchar(10)") + .option(FlinkOptions.PATH, tempFile.getAbsolutePath()) + .option(FlinkOptions.TABLE_TYPE, tableType) + .option(FlinkOptions.HIVE_STYLE_PARTITIONING, hiveStylePartitioning) + .option(FlinkOptions.PRECOMBINE_FIELD, FlinkOptions.NO_PRE_COMBINE) + .end(); + tableEnv.executeSql(hoodieTableDDL); + + execInsertSql(tableEnv, TestSQL.INSERT_SAME_KEY_T1); + + List result1 = CollectionUtil.iterableToList( + () -> tableEnv.sqlQuery("select * from t1").execute().collect()); + assertRowsEquals(result1, "[+I[id1, Danny, 23, 1970-01-01T00:00:01, par1]]"); + } + @ParameterizedTest @EnumSource(value = HoodieTableType.class) void testBatchModeUpsertWithoutPartition(HoodieTableType tableType) { diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java index efd365064454..30cf82cf40af 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java @@ -152,6 +152,20 @@ void testRequiredOptionsForSource() { HoodieTableSink tableSink5 = (HoodieTableSink) new HoodieTableFactory().createDynamicTableSink(sourceContext5); assertThat(tableSource5.getConf().getString(FlinkOptions.PAYLOAD_CLASS_NAME), is(EventTimeAvroPayload.class.getName())); assertThat(tableSink5.getConf().getString(FlinkOptions.PAYLOAD_CLASS_NAME), is(EventTimeAvroPayload.class.getName())); + + // given pk and set pre combine key to no_precombine will be ok + ResolvedSchema schema5 = SchemaBuilder.instance() + .field("f0", DataTypes.INT().notNull()) + .field("f1", DataTypes.VARCHAR(20)) + .field("f2", DataTypes.TIMESTAMP(3)) + .field("ts", DataTypes.TIMESTAMP(3)) + .primaryKey("f0") + .build(); + this.conf.setString(FlinkOptions.PRECOMBINE_FIELD, FlinkOptions.NO_PRE_COMBINE); + final MockContext sourceContext6 = MockContext.getInstance(this.conf, schema5, "f2"); + + assertDoesNotThrow(() -> new HoodieTableFactory().createDynamicTableSource(sourceContext6)); + assertDoesNotThrow(() -> new HoodieTableFactory().createDynamicTableSink(sourceContext6)); } @Test From 39773f8cf8f7a8441963080fd43d8ea04f1a74c9 Mon Sep 17 00:00:00 2001 From: flashJd <47289660@qq.com> Date: Thu, 4 Aug 2022 18:41:40 +0800 Subject: [PATCH 2/3] make the judgment more elegant --- .../main/java/org/apache/hudi/table/HoodieTableFactory.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java index cc73229027ba..582ef6e9acfb 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java @@ -147,9 +147,9 @@ private void sanityCheck(Configuration conf, ResolvedSchema schema) { throw new HoodieValidationException("Option '" + FlinkOptions.PRECOMBINE_FIELD.key() + "' is required for payload class: " + DefaultHoodieRecordPayload.class.getName()); } - if (preCombineField.equals(FlinkOptions.PRECOMBINE_FIELD.defaultValue()) || preCombineField.equals(FlinkOptions.NO_PRE_COMBINE)){ + if (preCombineField.equals(FlinkOptions.PRECOMBINE_FIELD.defaultValue())){ conf.setString(FlinkOptions.PRECOMBINE_FIELD, FlinkOptions.NO_PRE_COMBINE); - } else { + } else if (!preCombineField.equals(FlinkOptions.NO_PRE_COMBINE)) { throw new HoodieValidationException("Field " + preCombineField + " does not exist in the table schema." + "Please check '" + FlinkOptions.PRECOMBINE_FIELD.key() + "' option."); } From 7b04e73fecb574e199a3aad9e74dd6c9ae45d123 Mon Sep 17 00:00:00 2001 From: flashJd <47289660@qq.com> Date: Fri, 5 Aug 2022 09:32:10 +0800 Subject: [PATCH 3/3] fix checkStyle conflict --- .../apache/hudi/table/HoodieTableFactory.java | 2 +- .../hudi/table/ITTestHoodieDataSource.java | 22 +++++++++---------- .../hudi/table/TestHoodieTableFactory.java | 12 +++++----- 3 files changed, 18 insertions(+), 18 deletions(-) diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java index 582ef6e9acfb..41e0abd6baf1 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java @@ -147,7 +147,7 @@ private void sanityCheck(Configuration conf, ResolvedSchema schema) { throw new HoodieValidationException("Option '" + FlinkOptions.PRECOMBINE_FIELD.key() + "' is required for payload class: " + DefaultHoodieRecordPayload.class.getName()); } - if (preCombineField.equals(FlinkOptions.PRECOMBINE_FIELD.defaultValue())){ + if (preCombineField.equals(FlinkOptions.PRECOMBINE_FIELD.defaultValue())) { conf.setString(FlinkOptions.PRECOMBINE_FIELD, FlinkOptions.NO_PRE_COMBINE); } else if (!preCombineField.equals(FlinkOptions.NO_PRE_COMBINE)) { throw new HoodieValidationException("Field " + preCombineField + " does not exist in the table schema." diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java index 48a67681ee79..e7c38cc7df0f 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java @@ -466,22 +466,22 @@ void testWriteAndReadWithProctimeSequence(HoodieTableType tableType, boolean hiv void testWriteAndReadWithProctimeSequenceWithTsColumnExisting(HoodieTableType tableType, boolean hiveStylePartitioning) { TableEnvironment tableEnv = batchTableEnv; String hoodieTableDDL = sql("t1") - .field("uuid varchar(20)") - .field("name varchar(10)") - .field("age int") - .field("ts timestamp(3)") // use the default precombine field 'ts' - .field("`partition` varchar(10)") - .option(FlinkOptions.PATH, tempFile.getAbsolutePath()) - .option(FlinkOptions.TABLE_TYPE, tableType) - .option(FlinkOptions.HIVE_STYLE_PARTITIONING, hiveStylePartitioning) - .option(FlinkOptions.PRECOMBINE_FIELD, FlinkOptions.NO_PRE_COMBINE) - .end(); + .field("uuid varchar(20)") + .field("name varchar(10)") + .field("age int") + .field("ts timestamp(3)") // use the default precombine field 'ts' + .field("`partition` varchar(10)") + .option(FlinkOptions.PATH, tempFile.getAbsolutePath()) + .option(FlinkOptions.TABLE_TYPE, tableType) + .option(FlinkOptions.HIVE_STYLE_PARTITIONING, hiveStylePartitioning) + .option(FlinkOptions.PRECOMBINE_FIELD, FlinkOptions.NO_PRE_COMBINE) + .end(); tableEnv.executeSql(hoodieTableDDL); execInsertSql(tableEnv, TestSQL.INSERT_SAME_KEY_T1); List result1 = CollectionUtil.iterableToList( - () -> tableEnv.sqlQuery("select * from t1").execute().collect()); + () -> tableEnv.sqlQuery("select * from t1").execute().collect()); assertRowsEquals(result1, "[+I[id1, Danny, 23, 1970-01-01T00:00:01, par1]]"); } diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java index 3e1f34745f94..1fca92567a55 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java @@ -155,12 +155,12 @@ void testRequiredOptionsForSource() { // given pk and set pre combine key to no_precombine will be ok ResolvedSchema schema5 = SchemaBuilder.instance() - .field("f0", DataTypes.INT().notNull()) - .field("f1", DataTypes.VARCHAR(20)) - .field("f2", DataTypes.TIMESTAMP(3)) - .field("ts", DataTypes.TIMESTAMP(3)) - .primaryKey("f0") - .build(); + .field("f0", DataTypes.INT().notNull()) + .field("f1", DataTypes.VARCHAR(20)) + .field("f2", DataTypes.TIMESTAMP(3)) + .field("ts", DataTypes.TIMESTAMP(3)) + .primaryKey("f0") + .build(); this.conf.setString(FlinkOptions.PRECOMBINE_FIELD, FlinkOptions.NO_PRE_COMBINE); final MockContext sourceContext6 = MockContext.getInstance(this.conf, schema5, "f2");