diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java index 846afec7c1db3..c69d8746d1913 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java @@ -82,9 +82,9 @@ public I combineOnCondition( */ public I deduplicateRecords( I records, HoodieTable table, int parallelism) { - return deduplicateRecords(records, table.getIndex(), parallelism); + return deduplicateRecords(records, table.getIndex(), parallelism, table.getConfig().getSchema()); } public abstract I deduplicateRecords( - I records, HoodieIndex index, int parallelism); + I records, HoodieIndex index, int parallelism, String schema); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java index b56d39b8e3679..1406213c4477b 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java @@ -19,6 +19,7 @@ package org.apache.hudi.table.action.commit; import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.config.SerializableSchema; import org.apache.hudi.common.data.HoodieData; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.model.HoodieAvroRecord; @@ -29,6 +30,8 @@ import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.table.HoodieTable; +import java.util.Properties; + public class HoodieWriteHelper extends BaseWriteHelper>, HoodieData, HoodieData, R> { @@ -51,8 +54,9 @@ protected HoodieData> tag(HoodieData> dedupedRec @Override public HoodieData> deduplicateRecords( - HoodieData> records, HoodieIndex index, int parallelism) { + HoodieData> records, HoodieIndex index, int parallelism, String schemaStr) { boolean isIndexingGlobal = index.isGlobal(); + final SerializableSchema schema = new SerializableSchema(schemaStr); return records.mapToPair(record -> { HoodieKey hoodieKey = record.getKey(); // If index used is global, then records are expected to differ in their partitionPath @@ -60,7 +64,7 @@ public HoodieData> deduplicateRecords( return Pair.of(key, record); }).reduceByKey((rec1, rec2) -> { @SuppressWarnings("unchecked") - T reducedData = (T) rec2.getData().preCombine(rec1.getData()); + T reducedData = (T) rec2.getData().preCombine(rec1.getData(), schema.get(), new Properties()); HoodieKey reducedKey = rec1.getData().equals(reducedData) ? 
rec1.getKey() : rec2.getKey(); return new HoodieAvroRecord<>(reducedKey, reducedData); diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java index 57e5aa9ad50c0..b7b6e60b1adbe 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java @@ -32,11 +32,14 @@ import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.action.HoodieWriteMetadata; +import org.apache.avro.Schema; + import java.time.Duration; import java.time.Instant; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Properties; import java.util.stream.Collectors; /** @@ -88,16 +91,18 @@ protected List> tag(List> dedupedRecords, Hoodie @Override public List> deduplicateRecords( - List> records, HoodieIndex index, int parallelism) { + List> records, HoodieIndex index, int parallelism, String schemaStr) { // If index used is global, then records are expected to differ in their partitionPath Map>> keyedRecords = records.stream() .collect(Collectors.groupingBy(record -> record.getKey().getRecordKey())); + // caution that the avro schema is not serializable + final Schema schema = new Schema.Parser().parse(schemaStr); return keyedRecords.values().stream().map(x -> x.stream().reduce((rec1, rec2) -> { final T data1 = rec1.getData(); final T data2 = rec2.getData(); - @SuppressWarnings("unchecked") final T reducedData = (T) data2.preCombine(data1); + @SuppressWarnings("unchecked") final T reducedData = (T) data2.preCombine(data1, schema, new Properties()); // we cannot allow the user to change the key or partitionPath, since that will affect // everything // so pick it from one of the records. 
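Note: org.apache.avro.Schema is not java.io.Serializable, so the Spark-path helper above wraps the writer schema string in SerializableSchema before the reduce closure captures it, while the Flink helper above and the Java helper below just re-parse the string eagerly. A minimal sketch of the Spark-side pattern, assuming schemaStr holds the writer schema from HoodieWriteConfig#getSchema:

    import org.apache.avro.Schema;
    import org.apache.hudi.common.config.SerializableSchema;

    // Wrap the schema string so the closure shipped to executors stays serializable.
    SerializableSchema serializableSchema = new SerializableSchema(schemaStr);
    // On the executor, get() hands back the parsed Avro schema for preCombine.
    Schema schema = serializableSchema.get();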
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java index 4504a9bdccddf..b1101b8fd1d5b 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java @@ -29,9 +29,12 @@ import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.table.HoodieTable; +import org.apache.avro.Schema; + import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Properties; import java.util.stream.Collectors; public class JavaWriteHelper extends BaseWriteHelper>, @@ -55,7 +58,7 @@ protected List> tag(List> dedupedRecords, Hoodie @Override public List> deduplicateRecords( - List> records, HoodieIndex index, int parallelism) { + List> records, HoodieIndex index, int parallelism, String schemaStr) { boolean isIndexingGlobal = index.isGlobal(); Map>>> keyedRecords = records.stream().map(record -> { HoodieKey hoodieKey = record.getKey(); @@ -64,9 +67,10 @@ public List> deduplicateRecords( return Pair.of(key, record); }).collect(Collectors.groupingBy(Pair::getLeft)); + final Schema schema = new Schema.Parser().parse(schemaStr); return keyedRecords.values().stream().map(x -> x.stream().map(Pair::getRight).reduce((rec1, rec2) -> { @SuppressWarnings("unchecked") - T reducedData = (T) rec1.getData().preCombine(rec2.getData()); + T reducedData = (T) rec1.getData().preCombine(rec2.getData(), schema, new Properties()); // we cannot allow the user to change the key or partitionPath, since that will affect // everything // so pick it from one of the records. 
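For orientation, a minimal, engine-agnostic sketch of the reduce step the three helpers above now share: parse the schema string once, then fold duplicate records through the new schema-aware preCombine. The class and method names and the plain-List shape are illustrative only, not part of this patch:

    import org.apache.avro.Schema;
    import org.apache.hudi.common.model.HoodieAvroRecord;
    import org.apache.hudi.common.model.HoodieKey;
    import org.apache.hudi.common.model.HoodieRecord;
    import org.apache.hudi.common.model.HoodieRecordPayload;

    import java.util.List;
    import java.util.Properties;
    import java.util.stream.Collectors;

    public class DedupSketch {
      // Groups records by record key and combines duplicates with the schema-aware preCombine.
      static <T extends HoodieRecordPayload<T>> List<HoodieRecord<T>> dedupByKey(
          List<HoodieRecord<T>> records, String schemaStr) {
        final Schema schema = new Schema.Parser().parse(schemaStr); // parse once; Schema itself is not serializable
        final Properties props = new Properties();
        return records.stream()
            .collect(Collectors.groupingBy(rec -> rec.getKey().getRecordKey()))
            .values().stream()
            .map(group -> group.stream().reduce((rec1, rec2) -> {
              T reduced = rec2.getData().preCombine(rec1.getData(), schema, props);
              // keep the key of whichever side the combined payload came from
              HoodieKey key = rec1.getData().equals(reduced) ? rec1.getKey() : rec2.getKey();
              return new HoodieAvroRecord<>(key, reduced);
            }).get())
            .collect(Collectors.toList());
      }
    }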
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java index 3f9bda49e8ffe..de89affbfc693 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java @@ -458,11 +458,15 @@ private void testDeduplication( HoodieData> records = HoodieJavaRDD.of( jsc.parallelize(Arrays.asList(recordOne, recordTwo, recordThree), 1)); + HoodieWriteConfig.Builder configBuilder = getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY) + .combineInput(true, true); + addConfigsForPopulateMetaFields(configBuilder, populateMetaFields); + HoodieWriteConfig writeConfig = configBuilder.build(); // Global dedup should be done based on recordKey only HoodieIndex index = mock(HoodieIndex.class); when(index.isGlobal()).thenReturn(true); - List> dedupedRecs = HoodieWriteHelper.newInstance().deduplicateRecords(records, index, 1).collectAsList(); + List> dedupedRecs = HoodieWriteHelper.newInstance().deduplicateRecords(records, index, 1, writeConfig.getSchema()).collectAsList(); assertEquals(1, dedupedRecs.size()); assertEquals(dedupedRecs.get(0).getPartitionPath(), recordThree.getPartitionPath()); assertNodupesWithinPartition(dedupedRecs); @@ -470,17 +474,14 @@ private void testDeduplication( // non-Global dedup should be done based on both recordKey and partitionPath index = mock(HoodieIndex.class); when(index.isGlobal()).thenReturn(false); - dedupedRecs = HoodieWriteHelper.newInstance().deduplicateRecords(records, index, 1).collectAsList(); + dedupedRecs = HoodieWriteHelper.newInstance().deduplicateRecords(records, index, 1, writeConfig.getSchema()).collectAsList(); assertEquals(2, dedupedRecs.size()); assertNodupesWithinPartition(dedupedRecs); // Perform write-action and check JavaRDD recordList = jsc.parallelize(Arrays.asList(recordOne, recordTwo, recordThree), 1); - HoodieWriteConfig.Builder configBuilder = getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY) - .combineInput(true, true); - addConfigsForPopulateMetaFields(configBuilder, populateMetaFields); - try (SparkRDDWriteClient client = getHoodieWriteClient(configBuilder.build());) { + try (SparkRDDWriteClient client = getHoodieWriteClient(writeConfig)) { client.startCommitWithTime(newCommitTime); List statuses = writeFn.apply(client, recordList, newCommitTime).collect(); assertNoWriteErrors(statuses); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java index f959a8f0d9526..6fa4facb56881 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java @@ -22,10 +22,12 @@ import org.apache.hudi.client.SparkRDDWriteClient; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.config.HoodieMetadataConfig; +import org.apache.hudi.common.model.DefaultHoodieRecordPayload; import org.apache.hudi.common.model.HoodieKey; import 
org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.HoodieWriteStat; +import org.apache.hudi.common.model.PartialUpdateAvroPayload; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.testutils.HoodieTestDataGenerator; @@ -45,7 +47,6 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Tag; -import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; @@ -75,6 +76,11 @@ private static Stream writeLogTest() { return Stream.of(data).map(Arguments::of); } + private static Stream writePayloadTest() { + // Payload class + return Stream.of(new Object[] {DefaultHoodieRecordPayload.class.getName(), PartialUpdateAvroPayload.class.getName()}).map(Arguments::of); + } + private HoodieTestDataGenerator dataGen; private SparkRDDWriteClient client; private HoodieTableMetaClient metaClient; @@ -84,14 +90,16 @@ public void setup() { dataGen = new HoodieTestDataGenerator(); } - @Test - public void testWriteDuringCompaction() throws IOException { + @ParameterizedTest + @MethodSource("writePayloadTest") + public void testWriteDuringCompaction(String payloadClass) throws IOException { HoodieWriteConfig config = HoodieWriteConfig.newBuilder() .forTable("test-trip-table") .withPath(basePath()) .withSchema(TRIP_EXAMPLE_SCHEMA) .withParallelism(2, 2) .withAutoCommit(false) + .withWritePayLoad(payloadClass) .withCompactionConfig(HoodieCompactionConfig.newBuilder() .withMaxNumDeltaCommitsBeforeCompaction(1).build()) .withStorageConfig(HoodieStorageConfig.newBuilder() diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/SerializableSchema.java b/hudi-common/src/main/java/org/apache/hudi/common/config/SerializableSchema.java index 4f6de8ba5f3c3..66dc9df1f92f2 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/config/SerializableSchema.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/config/SerializableSchema.java @@ -35,10 +35,14 @@ public class SerializableSchema implements Serializable { public SerializableSchema() { } + public SerializableSchema(String schemaStr) { + this.schema = new Schema.Parser().parse(schemaStr); + } + public SerializableSchema(Schema schema) { this.schema = newCopy(schema); } - + public SerializableSchema(SerializableSchema serializableSchema) { this(serializableSchema.schema); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordPayload.java index 6752607d2f48c..d4e61da9bbf63 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordPayload.java @@ -58,6 +58,20 @@ default T preCombine(T oldValue, Properties properties) { return preCombine(oldValue); } + /** + * When more than one HoodieRecord have the same HoodieKey in the incoming batch, this function combines them before attempting to insert/upsert by taking in a schema. + * Implementation can leverage the schema to decide their business logic to do preCombine. + * + * @param oldValue instance of the old {@link HoodieRecordPayload} to be combined with. + * @param schema Payload related schema. 
For example use schema to overwrite old instance for specified fields that doesn't equal to default value. + * @param properties Payload related properties. For example pass the ordering field(s) name to extract from value in storage. + * @return the combined value + */ + @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING) + default T preCombine(T oldValue, Schema schema, Properties properties) { + return preCombine(oldValue, properties); + } + /** * This methods is deprecated. Please refer to {@link #combineAndGetUpdateValue(IndexedRecord, Schema, Properties)} for java docs. */ diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteNonDefaultsWithLatestAvroPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteNonDefaultsWithLatestAvroPayload.java index 9ce241bc7822f..5ee2b58373c51 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteNonDefaultsWithLatestAvroPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteNonDefaultsWithLatestAvroPayload.java @@ -58,22 +58,43 @@ public Option combineAndGetUpdateValue(IndexedRecord currentValue GenericRecord insertRecord = (GenericRecord) recordOption.get(); GenericRecord currentRecord = (GenericRecord) currentValue; - if (isDeleteRecord(insertRecord)) { + return mergeRecords(schema, insertRecord, currentRecord); + } + + /** + * Merges the given records into one. + * The fields in {@code baseRecord} has higher priority: + * it is set up into the merged record if it is not null or equals to the default. + * + * @param schema The record schema + * @param baseRecord The base record to merge with + * @param mergedRecord The record to be merged + * + * @return the merged record option + */ + protected Option mergeRecords(Schema schema, GenericRecord baseRecord, GenericRecord mergedRecord) { + if (isDeleteRecord(baseRecord)) { return Option.empty(); } else { final GenericRecordBuilder builder = new GenericRecordBuilder(schema); List fields = schema.getFields(); - fields.forEach(field -> { - Object value = insertRecord.get(field.name()); - value = field.schema().getType().equals(Schema.Type.STRING) && value != null ? value.toString() : value; - Object defaultValue = field.defaultVal(); - if (!overwriteField(value, defaultValue)) { - builder.set(field, value); - } else { - builder.set(field, currentRecord.get(field.name())); - } - }); + fields.forEach(field -> setField(baseRecord, mergedRecord, builder, field)); return Option.of(builder.build()); } } + + protected void setField( + GenericRecord baseRecord, + GenericRecord mergedRecord, + GenericRecordBuilder builder, + Schema.Field field) { + Object value = baseRecord.get(field.name()); + value = field.schema().getType().equals(Schema.Type.STRING) && value != null ? value.toString() : value; + Object defaultValue = field.defaultVal(); + if (!overwriteField(value, defaultValue)) { + builder.set(field, value); + } else { + builder.set(field, mergedRecord.get(field.name())); + } + } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdateAvroPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdateAvroPayload.java new file mode 100644 index 0000000000000..daa40acc76404 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdateAvroPayload.java @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.model; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.GenericRecordBuilder; +import org.apache.avro.generic.IndexedRecord; + +import org.apache.hudi.avro.HoodieAvroUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ReflectionUtils; +import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.keygen.constant.KeyGeneratorOptions; + +import java.io.IOException; +import java.util.List; +import java.util.Properties; + +/** + * Payload clazz that is used for partial update Hudi Table. + * + *

Simplified partial update logic: + *

+ *  1. #preCombine
+ *  For records with the same record key in one batch,
+ *  or in the delta logs that belong to the same file group,
+ *  checks whether one record's ordering value is larger than the other's.
+ *  If yes, the record with the larger ordering value overwrites the other one
+ *  for all of its fields that are not null.
+ *
+ *  2. #combineAndGetUpdateValue
+ *  For every incoming record that has an existing record in storage (same record key),
+ *  checks whether the incoming record's ordering value is larger than the existing record's.
+ *  If yes, the incoming record overwrites the existing one for all of its fields that are not null;
+ *  otherwise, the existing record overwrites the incoming one for all of its fields that are not null.
+ *  Either way, a merged record is returned.
+ *
+ *  Illustration with simple data.
+ *  Let's say the ordering field is 'ts' and the schema is:
+ *  {
+ *    "fields": [
+ *      {"name":"id","type":"string"},
+ *      {"name":"ts","type":"long"},
+ *      {"name":"name","type":"string"},
+ *      {"name":"price","type":"string"}
+ *    ]
+ *  }
+ *
+ *  case 1
+ *  Current data:
+ *      id      ts      name    price
+ *      1       1       name_1  price_1
+ *  Insert data:
+ *      id      ts      name    price
+ *      1       2       null    price_2
+ *
+ *  Result data after #preCombine or #combineAndGetUpdateValue:
+ *      id      ts      name    price
+ *      1       2       name_1  price_2
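+ *  (ts and price are taken from the insert record because its ordering value 2 is larger;
+ *   name keeps the current record's value because the insert record's name is null.)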
+ *
+ *  case 2
+ *  Current data:
+ *      id      ts      name    price
+ *      1       2       name_1  null
+ *  Insert data:
+ *      id      ts      name    price
+ *      1       1       null    price_1
+ *
+ *  Result data after preCombine or combineAndGetUpdateValue:
+ *      id      ts      name    price
+ *      1       2       name_1  price_1
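+ *  (name and ts keep the current record's values because its ordering value 2 is larger;
+ *   price is filled from the insert record because the current record's price is null.)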
+ *
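+ *  A usage sketch (insertRecord, currentRecord, schema and props are hypothetical; mirrors case 1 above):
+ *
+ *    PartialUpdateAvroPayload insert = new PartialUpdateAvroPayload(insertRecord, 2L);
+ *    PartialUpdateAvroPayload current = new PartialUpdateAvroPayload(currentRecord, 1L);
+ *    // the payload with the larger ordering value wins; its null fields fall back to the other record
+ *    PartialUpdateAvroPayload merged = insert.preCombine(current, schema, props);
+ *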
+ */ +public class PartialUpdateAvroPayload extends OverwriteNonDefaultsWithLatestAvroPayload { + + public PartialUpdateAvroPayload(GenericRecord record, Comparable orderingVal) { + super(record, orderingVal); + } + + public PartialUpdateAvroPayload(Option record) { + super(record); // natural order + } + + @Override + public PartialUpdateAvroPayload preCombine(OverwriteWithLatestAvroPayload oldValue, Schema schema, Properties properties) { + if (oldValue.recordBytes.length == 0) { + // use natural order for delete record + return this; + } + // pick the payload with greater ordering value as insert record + final boolean shouldPickOldRecord = oldValue.orderingVal.compareTo(orderingVal) > 0 ? true : false; + try { + GenericRecord oldRecord = HoodieAvroUtils.bytesToAvro(oldValue.recordBytes, schema); + Option mergedRecord = mergeOldRecord(oldRecord, schema, shouldPickOldRecord); + if (mergedRecord.isPresent()) { + return new PartialUpdateAvroPayload((GenericRecord) mergedRecord.get(), + shouldPickOldRecord ? oldValue.orderingVal : this.orderingVal); + } + } catch (Exception ex) { + return this; + } + return this; + } + + @Override + public Option combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema) throws IOException { + return this.mergeOldRecord(currentValue, schema, false); + } + + @Override + public Option combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema, Properties prop) throws IOException { + return mergeOldRecord(currentValue, schema, isRecordNewer(orderingVal, currentValue, prop)); + } + + /** + * Return true if value equals defaultValue otherwise false. + */ + public Boolean overwriteField(Object value, Object defaultValue) { + return value == null; + } + + // ------------------------------------------------------------------------- + // Utilities + // ------------------------------------------------------------------------- + + private Option mergeOldRecord(IndexedRecord oldRecord, + Schema schema, + boolean isOldRecordNewer) throws IOException { + Option recordOption = getInsertValue(schema); + + if (!recordOption.isPresent()) { + // use natural order for delete record + return Option.empty(); + } + + if (isOldRecordNewer && schema.getField(HoodieRecord.COMMIT_TIME_METADATA_FIELD) != null) { + // handling disorder, should use the metadata fields of the updating record + return mergeDisorderRecordsWithMetadata(schema, (GenericRecord) oldRecord, (GenericRecord) recordOption.get()); + } else if (isOldRecordNewer) { + return mergeRecords(schema, (GenericRecord) oldRecord, (GenericRecord) recordOption.get()); + } else { + return mergeRecords(schema, (GenericRecord) recordOption.get(), (GenericRecord) oldRecord); + } + } + + /** + * Merges the given disorder records with metadata. 
+ * + * @param schema The record schema + * @param oldRecord The current record from file + * @param updatingRecord The incoming record + * + * @return the merged record option + */ + protected Option mergeDisorderRecordsWithMetadata( + Schema schema, + GenericRecord oldRecord, + GenericRecord updatingRecord) { + if (isDeleteRecord(oldRecord)) { + return Option.empty(); + } else { + final GenericRecordBuilder builder = new GenericRecordBuilder(schema); + List fields = schema.getFields(); + fields.forEach(field -> { + final GenericRecord baseRecord; + final GenericRecord mergedRecord; + if (HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.containsKey(field.name())) { + // this is a metadata field + baseRecord = updatingRecord; + mergedRecord = oldRecord; + } else { + baseRecord = oldRecord; + mergedRecord = updatingRecord; + } + setField(baseRecord, mergedRecord, builder, field); + }); + return Option.of(builder.build()); + } + } + + /** + * Returns whether the given record is newer than the record of this payload. + * + * @param orderingVal + * @param record The record + * @param prop The payload properties + * + * @return true if the given record is newer + */ + private static boolean isRecordNewer(Comparable orderingVal, IndexedRecord record, Properties prop) { + String orderingField = prop.getProperty(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY); + if (!StringUtils.isNullOrEmpty(orderingField)) { + boolean consistentLogicalTimestampEnabled = Boolean.parseBoolean(prop.getProperty( + KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(), + KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue())); + + Comparable oldOrderingVal = + (Comparable) HoodieAvroUtils.getNestedFieldVal( + (GenericRecord) record, + orderingField, + true, + consistentLogicalTimestampEnabled); + + // pick the payload with greater ordering value as insert record + return oldOrderingVal != null + && ReflectionUtils.isSameClass(oldOrderingVal, orderingVal) + && oldOrderingVal.compareTo(orderingVal) > 0; + } + return false; + } +} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java index 4566b1f5cd6b1..5bfb395dbc54c 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java @@ -22,6 +22,7 @@ import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.common.model.HoodieAvroRecord; import org.apache.hudi.common.model.HoodieLogFile; +import org.apache.hudi.common.model.HoodiePayloadProps; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.table.HoodieTableConfig; @@ -61,6 +62,7 @@ import java.util.Deque; import java.util.HashSet; import java.util.List; +import java.util.Properties; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; @@ -95,6 +97,7 @@ public abstract class AbstractHoodieLogRecordReader { private final String payloadClassFQN; // preCombine field private final String preCombineField; + private final Properties payloadProps = new Properties(); // simple key gen fields private Option> simpleKeyGenFields = Option.empty(); // Log File Paths @@ -159,6 +162,9 @@ protected AbstractHoodieLogRecordReader(FileSystem fs, String 
basePath, List keys; private final boolean fullKey; diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java index e3d8554d00fd8..679a0e6f7e312 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java @@ -150,7 +150,7 @@ protected void processNextRecord(HoodieRecord hoo HoodieRecord oldRecord = records.get(key); HoodieRecordPayload oldValue = oldRecord.getData(); - HoodieRecordPayload combinedValue = hoodieRecord.getData().preCombine(oldValue); + HoodieRecordPayload combinedValue = hoodieRecord.getData().preCombine(oldValue, readerSchema, this.getPayloadProps()); // If combinedValue is oldValue, no need rePut oldRecord if (combinedValue != oldValue) { HoodieOperation operation = hoodieRecord.getOperation(); diff --git a/hudi-common/src/test/java/org/apache/hudi/common/model/TestPartialUpdateAvroPayload.java b/hudi-common/src/test/java/org/apache/hudi/common/model/TestPartialUpdateAvroPayload.java new file mode 100644 index 0000000000000..217240666094d --- /dev/null +++ b/hudi-common/src/test/java/org/apache/hudi/common/model/TestPartialUpdateAvroPayload.java @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.model; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; + +import org.apache.hudi.common.util.Option; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.Properties; + +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; + + +/** + * Unit tests {@link TestPartialUpdateAvroPayload}. 
+ */ +public class TestPartialUpdateAvroPayload { + private Schema schema; + + String jsonSchema = "{\n" + + " \"type\": \"record\",\n" + + " \"name\": \"partialRecord\", \"namespace\":\"org.apache.hudi\",\n" + + " \"fields\": [\n" + + " {\"name\": \"_hoodie_commit_time\", \"type\": [\"null\", \"string\"]},\n" + + " {\"name\": \"_hoodie_commit_seqno\", \"type\": [\"null\", \"string\"]},\n" + + " {\"name\": \"_hoodie_record_key\", \"type\": [\"null\", \"string\"]},\n" + + " {\"name\": \"_hoodie_partition_path\", \"type\": [\"null\", \"string\"]},\n" + + " {\"name\": \"_hoodie_file_name\", \"type\": [\"null\", \"string\"]},\n" + + " {\"name\": \"id\", \"type\": [\"null\", \"string\"]},\n" + + " {\"name\": \"partition\", \"type\": [\"null\", \"string\"]},\n" + + " {\"name\": \"ts\", \"type\": [\"null\", \"long\"]},\n" + + " {\"name\": \"_hoodie_is_deleted\", \"type\": [\"null\", \"boolean\"], \"default\":false},\n" + + " {\"name\": \"city\", \"type\": [\"null\", \"string\"]},\n" + + " {\"name\": \"child\", \"type\": [\"null\", {\"type\": \"array\", \"items\": \"string\"}]}\n" + + " ]\n" + + "}"; + + @BeforeEach + public void setUp() throws Exception { + schema = new Schema.Parser().parse(jsonSchema); + } + + @Test + public void testActiveRecords() throws IOException { + Properties properties = new Properties(); + properties.put(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY, "ts"); + + GenericRecord record1 = new GenericData.Record(schema); + record1.put("id", "1"); + record1.put("partition", "partition1"); + record1.put("ts", 0L); + record1.put("_hoodie_is_deleted", false); + record1.put("city", "NY0"); + record1.put("child", Arrays.asList("A")); + + GenericRecord record2 = new GenericData.Record(schema); + record2.put("id", "1"); + record2.put("partition", "partition1"); + record2.put("ts", 1L); + record2.put("_hoodie_is_deleted", false); + record2.put("city", null); + record2.put("child", Arrays.asList("B")); + + GenericRecord record3 = new GenericData.Record(schema); + record3.put("id", "1"); + record3.put("partition", "partition1"); + record3.put("ts", 2L); + record3.put("_hoodie_is_deleted", false); + record3.put("city", "NY0"); + record3.put("child", Arrays.asList("A")); + + GenericRecord record4 = new GenericData.Record(schema); + record4.put("id", "1"); + record4.put("partition", "partition1"); + record4.put("ts", 1L); + record4.put("_hoodie_is_deleted", false); + record4.put("city", "NY0"); + record4.put("child", Arrays.asList("B")); + + // Test preCombine: since payload2's ordering val is larger, so payload2 will overwrite payload1 with its non-default field's value + PartialUpdateAvroPayload payload1 = new PartialUpdateAvroPayload(record1, 0L); + PartialUpdateAvroPayload payload2 = new PartialUpdateAvroPayload(record2, 1L); + assertArrayEquals(payload1.preCombine(payload2, schema, properties).recordBytes, new PartialUpdateAvroPayload(record4, 1L).recordBytes); + assertArrayEquals(payload2.preCombine(payload1, schema, properties).recordBytes, new PartialUpdateAvroPayload(record4, 1L).recordBytes); + + assertEquals(record1, payload1.getInsertValue(schema).get()); + assertEquals(record2, payload2.getInsertValue(schema).get()); + + // Test combineAndGetUpdateValue: let payload1's ordering val larger than payload2, then payload1 will overwrite payload2 with its non-default field's value + record1.put("ts", 2L); + payload1 = new PartialUpdateAvroPayload(record1, 2L); + assertEquals(payload1.combineAndGetUpdateValue(record2, schema, properties).get(), record3); + // Test 
combineAndGetUpdateValue: let payload1's ordering val equal to payload2, then payload2 will be considered to newer record + record1.put("ts", 1L); + assertEquals(payload2.combineAndGetUpdateValue(record1, schema, properties).get(), record4); + + // Test preCombine again: let payload1's ordering val larger than payload2 + record1.put("ts", 2L); + payload1 = new PartialUpdateAvroPayload(record1, 2L); + payload2 = new PartialUpdateAvroPayload(record2, 1L); + assertArrayEquals(payload1.preCombine(payload2, schema, properties).recordBytes, new PartialUpdateAvroPayload(record3, 2L).recordBytes); + assertArrayEquals(payload2.preCombine(payload1, schema, properties).recordBytes, new PartialUpdateAvroPayload(record3, 2L).recordBytes); + } + + @Test + public void testDeletedRecord() throws IOException { + GenericRecord record1 = new GenericData.Record(schema); + record1.put("id", "1"); + record1.put("partition", "partition0"); + record1.put("ts", 0L); + record1.put("_hoodie_is_deleted", false); + record1.put("city", "NY0"); + record1.put("child", Collections.emptyList()); + + GenericRecord delRecord1 = new GenericData.Record(schema); + delRecord1.put("id", "2"); + delRecord1.put("partition", "partition1"); + delRecord1.put("ts", 1L); + delRecord1.put("_hoodie_is_deleted", true); + delRecord1.put("city", "NY0"); + delRecord1.put("child", Collections.emptyList()); + + GenericRecord record2 = new GenericData.Record(schema); + record2.put("id", "1"); + record2.put("partition", "partition0"); + record2.put("ts", 0L); + record2.put("_hoodie_is_deleted", true); + record2.put("city", "NY0"); + record2.put("child", Collections.emptyList()); + + PartialUpdateAvroPayload payload1 = new PartialUpdateAvroPayload(record1, 0L); + PartialUpdateAvroPayload payload2 = new PartialUpdateAvroPayload(delRecord1, 1L); + + assertArrayEquals(payload1.preCombine(payload2).recordBytes, payload2.recordBytes); + assertArrayEquals(payload2.preCombine(payload1).recordBytes, payload2.recordBytes); + + assertEquals(record1, payload1.getInsertValue(schema).get()); + assertFalse(payload2.getInsertValue(schema).isPresent()); + + Properties properties = new Properties(); + properties.put(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY, "ts"); + assertEquals(payload1.combineAndGetUpdateValue(delRecord1, schema, properties), Option.empty()); + assertFalse(payload2.combineAndGetUpdateValue(record1, schema, properties).isPresent()); + } + + @Test + public void testUseLatestRecordMetaValue() throws IOException { + Properties properties = new Properties(); + properties.put(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY, "ts"); + + GenericRecord record1 = new GenericData.Record(schema); + record1.put("_hoodie_commit_time", "20220915000000000"); + record1.put("_hoodie_commit_seqno", "20220915000000000_1_000"); + record1.put("id", "1"); + record1.put("partition", "partition1"); + record1.put("ts", 0L); + record1.put("_hoodie_is_deleted", false); + record1.put("city", "NY0"); + record1.put("child", Arrays.asList("A")); + + GenericRecord record2 = new GenericData.Record(schema); + record1.put("_hoodie_commit_time", "20220915000000001"); + record1.put("_hoodie_commit_seqno", "20220915000000001_2_000"); + record2.put("id", "1"); + record2.put("partition", "partition1"); + record2.put("ts", 1L); + record2.put("_hoodie_is_deleted", false); + record2.put("city", null); + record2.put("child", Arrays.asList("B")); + + PartialUpdateAvroPayload payload1 = new PartialUpdateAvroPayload(record1, 0L); + PartialUpdateAvroPayload payload2 = new 
PartialUpdateAvroPayload(record2, 1L); + + // let payload1 as the latest one, then should use payload1's meta field's value as the result even its ordering val is smaller + GenericRecord mergedRecord1 = (GenericRecord) payload1.preCombine(payload2, schema, properties).getInsertValue(schema, properties).get(); + assertEquals(mergedRecord1.get("_hoodie_commit_time").toString(), record1.get("_hoodie_commit_time").toString()); + assertEquals(mergedRecord1.get("_hoodie_commit_seqno").toString(), record1.get("_hoodie_commit_seqno").toString()); + + // let payload2 as the latest one, then should use payload2's meta field's value as the result + GenericRecord mergedRecord2 = (GenericRecord) payload2.preCombine(payload1, schema, properties).getInsertValue(schema, properties).get(); + assertEquals(mergedRecord2.get("_hoodie_commit_time").toString(), "20220915000000001"); + assertEquals(mergedRecord2.get("_hoodie_commit_seqno").toString(), "20220915000000001_2_000"); + } +} diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java index 2748af5290646..64178a82fb91b 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java @@ -421,7 +421,7 @@ private boolean flushBucket(DataBucket bucket) { List records = bucket.writeBuffer(); ValidationUtils.checkState(records.size() > 0, "Data bucket to flush has no buffering records"); if (config.getBoolean(FlinkOptions.PRE_COMBINE)) { - records = FlinkWriteHelper.newInstance().deduplicateRecords(records, (HoodieIndex) null, -1); + records = FlinkWriteHelper.newInstance().deduplicateRecords(records, (HoodieIndex) null, -1, this.writeClient.getConfig().getSchema()); } bucket.preWrite(records); final List writeStatus = new ArrayList<>(writeFunction.apply(records, instant)); @@ -456,7 +456,8 @@ private void flushRemaining(boolean endInput) { List records = bucket.writeBuffer(); if (records.size() > 0) { if (config.getBoolean(FlinkOptions.PRE_COMBINE)) { - records = FlinkWriteHelper.newInstance().deduplicateRecords(records, (HoodieIndex) null, -1); + records = FlinkWriteHelper.newInstance().deduplicateRecords(records, (HoodieIndex) null, -1, + this.writeClient.getConfig().getSchema()); } bucket.preWrite(records); writeStatus.addAll(writeFunction.apply(records, currentInstant)); diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java index d94ff1477aa66..035ad9b1297a8 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java @@ -34,6 +34,7 @@ import org.apache.hudi.common.model.HoodieReplaceCommitMetadata; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload; +import org.apache.hudi.common.model.PartialUpdateAvroPayload; import org.apache.hudi.common.model.WriteConcurrencyMode; import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.HoodieTableConfig; @@ -1442,6 +1443,25 @@ public void testPayloadClassUpdate() throws Exception { assertEquals(new 
HoodieConfig(props).getString(HoodieTableConfig.PAYLOAD_CLASS_NAME), DummyAvroPayload.class.getName()); } + @Test + public void testPartialPayloadClass() throws Exception { + String dataSetBasePath = dfsBasePath + "/test_dataset_mor"; + HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(dataSetBasePath, WriteOperationType.BULK_INSERT, + Collections.singletonList(SqlQueryBasedTransformer.class.getName()), PROPS_FILENAME_TEST_SOURCE, false, + true, true, PartialUpdateAvroPayload.class.getName(), "MERGE_ON_READ"); + new HoodieDeltaStreamer(cfg, jsc, dfs, hiveServer.getHiveConf()).sync(); + TestHelpers.assertRecordCount(1000, dataSetBasePath, sqlContext); + + //now assert that hoodie.properties file now has updated payload class name + Properties props = new Properties(); + String metaPath = dataSetBasePath + "/.hoodie/hoodie.properties"; + FileSystem fs = FSUtils.getFs(cfg.targetBasePath, jsc.hadoopConfiguration()); + try (FSDataInputStream inputStream = fs.open(new Path(metaPath))) { + props.load(inputStream); + } + assertEquals(new HoodieConfig(props).getString(HoodieTableConfig.PAYLOAD_CLASS_NAME), PartialUpdateAvroPayload.class.getName()); + } + @Test public void testPayloadClassUpdateWithCOWTable() throws Exception { String dataSetBasePath = dfsBasePath + "/test_dataset_cow";
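For completeness, a sketch (not part of this patch) of how a writer might opt into the new payload through HoodieWriteConfig, mirroring the builder calls used in the compaction test above; the table name is a placeholder and "ts" is assumed to be the table's precombine/ordering field:

    import org.apache.hudi.common.model.PartialUpdateAvroPayload;
    import org.apache.hudi.config.HoodieWriteConfig;

    public class PartialUpdateConfigSketch {
      static HoodieWriteConfig buildConfig(String basePath, String avroSchema) {
        return HoodieWriteConfig.newBuilder()
            .forTable("partial_update_table")                        // placeholder table name
            .withPath(basePath)
            .withSchema(avroSchema)                                  // writer schema, also handed to preCombine
            .withPreCombineField("ts")                               // ordering field read by the payload
            .withWritePayLoad(PartialUpdateAvroPayload.class.getName())
            .build();
      }
    }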