Skip to content

Commit 247df23

Browse files
committed
[HUDI-3350][HUDI-3351] fix comments
1 parent 073ca5c commit 247df23

11 files changed

Lines changed: 51 additions & 88 deletions

File tree

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,8 @@ public HoodieData<HoodieRecord<T>> deduplicateRecords(
5959
}).reduceByKey((rec1, rec2) -> {
6060
@SuppressWarnings("unchecked")
6161
HoodieRecord<T> reducedRecord = hoodieMerge.preCombine(rec1, rec2);
62-
return reducedRecord.newInstance();
62+
HoodieKey reducedKey = rec1.getData().equals(reducedRecord.getData()) ? rec1.getKey() : rec2.getKey();
63+
return reducedRecord.newInstance(reducedKey);
6364
}, parallelism).map(Pair::getRight);
6465
}
6566

hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,11 +67,10 @@ public List<HoodieRecord<T>> deduplicateRecords(
6767
return keyedRecords.values().stream().map(x -> x.stream().map(Pair::getRight).reduce((rec1, rec2) -> {
6868
@SuppressWarnings("unchecked")
6969
HoodieRecord<T> reducedRecord = hoodieMerge.preCombine(rec1,rec2);
70-
7170
// we cannot allow the user to change the key or partitionPath, since that will affect
7271
// everything
7372
// so pick it from one of the records.
74-
return reducedRecord.newInstance();
73+
return reducedRecord.newInstance(rec1.getKey());
7574
}).orElse(null)).filter(Objects::nonNull).collect(Collectors.toList());
7675
}
7776
}

hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField;
2626
import org.apache.hudi.common.util.Option;
2727
import org.apache.hudi.common.util.StringUtils;
28+
import org.apache.hudi.common.util.ValidationUtils;
2829
import org.apache.hudi.common.util.collection.Pair;
2930
import org.apache.hudi.exception.HoodieException;
3031
import org.apache.hudi.exception.HoodieIOException;
@@ -765,9 +766,7 @@ private static Object rewriteRecordWithNewSchema(Object oldRecord, Schema oldSch
765766
}
766767
switch (newSchema.getType()) {
767768
case RECORD:
768-
if (!(oldRecord instanceof IndexedRecord)) {
769-
throw new IllegalArgumentException("cannot rewrite record with different type");
770-
}
769+
ValidationUtils.checkArgument(oldRecord instanceof IndexedRecord, "cannot rewrite record with different type");
771770
IndexedRecord indexedRecord = (IndexedRecord) oldRecord;
772771
List<Schema.Field> fields = newSchema.getFields();
773772
Map<Integer, Object> helper = new HashMap<>();
@@ -806,9 +805,7 @@ private static Object rewriteRecordWithNewSchema(Object oldRecord, Schema oldSch
806805
}
807806
return newRecord;
808807
case ARRAY:
809-
if (!(oldRecord instanceof Collection)) {
810-
throw new IllegalArgumentException("cannot rewrite record with different type");
811-
}
808+
ValidationUtils.checkArgument(oldRecord instanceof Collection, "cannot rewrite record with different type");
812809
Collection array = (Collection)oldRecord;
813810
List<Object> newArray = new ArrayList();
814811
fieldNames.push("element");
@@ -818,9 +815,7 @@ private static Object rewriteRecordWithNewSchema(Object oldRecord, Schema oldSch
818815
fieldNames.pop();
819816
return newArray;
820817
case MAP:
821-
if (!(oldRecord instanceof Map)) {
822-
throw new IllegalArgumentException("cannot rewrite record with different type");
823-
}
818+
ValidationUtils.checkArgument(oldRecord instanceof Map, "cannot rewrite record with different type");
824819
Map<Object, Object> map = (Map<Object, Object>) oldRecord;
825820
Map<Object, Object> newMap = new HashMap<>();
826821
fieldNames.push("value");

hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecordMerge.java

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ public HoodieRecord preCombine(HoodieRecord older, HoodieRecord newer) {
3434
HoodieRecordPayload picked = unsafeCast(((HoodieAvroRecord) newer).getData().preCombine(((HoodieAvroRecord) older).getData()));
3535
if (picked instanceof HoodieMetadataPayload) {
3636
// NOTE: HoodieMetadataPayload#preCombine returns a new payload instance
37-
return new HoodieAvroRecord(newer.getKey(), ((HoodieMetadataPayload) picked), newer.getOperation());
37+
return new HoodieAvroRecord(newer.getKey(), picked, newer.getOperation());
3838
}
3939
return picked.equals(((HoodieAvroRecord) newer).getData()) ? newer : older;
4040
}
@@ -43,13 +43,9 @@ public HoodieRecord preCombine(HoodieRecord older, HoodieRecord newer) {
4343
public Option<HoodieRecord> combineAndGetUpdateValue(HoodieRecord older, HoodieRecord newer, Schema schema, Properties props) throws IOException {
4444
Option<IndexedRecord> previousRecordAvroPayload;
4545
if (older instanceof HoodieAvroIndexedRecord) {
46-
previousRecordAvroPayload = Option.of(((HoodieAvroIndexedRecord) older).getData());
46+
previousRecordAvroPayload = Option.ofNullable(((HoodieAvroIndexedRecord) older).getData());
4747
} else {
48-
if (null == props) {
49-
previousRecordAvroPayload = ((HoodieRecordPayload)older.getData()).getInsertValue(schema);
50-
} else {
51-
previousRecordAvroPayload = ((HoodieRecordPayload)older.getData()).getInsertValue(schema, props);
52-
}
48+
previousRecordAvroPayload = ((HoodieRecordPayload)older.getData()).getInsertValue(schema, props);
5349
}
5450
if (!previousRecordAvroPayload.isPresent()) {
5551
return Option.empty();

hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -151,16 +151,15 @@ public HoodieRecord(HoodieKey key, T data, HoodieOperation operation, Comparable
151151
this.newLocation = null;
152152
this.sealed = false;
153153
this.operation = operation;
154-
this.orderingVal = orderingVal;
154+
// default natural order is 0
155+
this.orderingVal = orderingVal == null ? 0 : orderingVal;
155156
}
156157

157158
public HoodieRecord(HoodieRecord<T> record) {
158-
this(record.key, record.data, record.orderingVal);
159+
this(record.key, record.data, record.operation, record.orderingVal);
159160
this.currentLocation = record.currentLocation;
160161
this.newLocation = record.newLocation;
161162
this.sealed = record.sealed;
162-
this.operation = record.operation;
163-
this.orderingVal = record.orderingVal;
164163
}
165164

166165
public HoodieRecord() {
@@ -181,10 +180,6 @@ public HoodieOperation getOperation() {
181180
}
182181

183182
public Comparable getOrderingValue() {
184-
if (null == orderingVal) {
185-
// default natural order is 0
186-
return 0;
187-
}
188183
return orderingVal;
189184
}
190185

hudi-common/src/main/java/org/apache/hudi/common/util/ReflectionUtils.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -63,10 +63,6 @@ public static Class<?> getClass(String clazzName) {
6363
return CLAZZ_CACHE.get(clazzName);
6464
}
6565

66-
private static Object getInstance(String clazzName) {
67-
return INSTANCE_CACHE.get(clazzName);
68-
}
69-
7066
public static <T> T loadClass(String fqcn) {
7167
try {
7268
return (T) getClass(fqcn).newInstance();
@@ -92,10 +88,10 @@ public static <T extends HoodieRecordPayload> T loadPayload(String recordPayload
9288
*/
9389
public static HoodieMerge loadHoodieMerge(String mergeClass) {
9490
try {
95-
HoodieMerge hoodieMerge = (HoodieMerge) getInstance(mergeClass);
91+
HoodieMerge hoodieMerge = (HoodieMerge) INSTANCE_CACHE.get(mergeClass);
9692
if (null == hoodieMerge) {
9793
synchronized (HoodieMerge.class) {
98-
hoodieMerge = (HoodieMerge) getInstance(mergeClass);
94+
hoodieMerge = (HoodieMerge) INSTANCE_CACHE.get(mergeClass);
9995
if (null == hoodieMerge) {
10096
hoodieMerge = (HoodieMerge)loadClass(mergeClass, new Object[]{});
10197
INSTANCE_CACHE.put(mergeClass, hoodieMerge);

hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@
6565
import java.util.Iterator;
6666
import java.util.LinkedHashMap;
6767
import java.util.List;
68+
import java.util.Properties;
6869
import java.util.Set;
6970
import java.util.stream.IntStream;
7071

@@ -761,8 +762,7 @@ private Option<IndexedRecord> mergeRowWithLog(
761762
String curKey) throws IOException {
762763
final HoodieAvroRecord<?> record = (HoodieAvroRecord) scanner.getRecords().get(curKey);
763764
GenericRecord historyAvroRecord = (GenericRecord) rowDataToAvroConverter.convert(tableSchema, curRow);
764-
// TODO IndexedRecord to HoodieRecord
765-
Option<HoodieRecord> resultRecord = hoodieMerge.combineAndGetUpdateValue(new HoodieAvroIndexedRecord(historyAvroRecord), record, tableSchema, null);
765+
Option<HoodieRecord> resultRecord = hoodieMerge.combineAndGetUpdateValue(new HoodieAvroIndexedRecord(historyAvroRecord), record, tableSchema, new Properties());
766766
return ((HoodieAvroIndexedRecord) resultRecord.get()).toIndexedRecord();
767767
}
768768
}

hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -305,7 +305,6 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
305305
private def merge(curAvroRecord: GenericRecord, newRecord: HoodieRecord[_ <: HoodieRecordPayload[_]]): Option[IndexedRecord] = {
306306
// NOTE: We have to pass in Avro Schema used to read from Delta Log file since we invoke combining API
307307
// on the record from the Delta Log
308-
// TODO IndexedRecord to HoodieRecord
309308
if (hoodieMerge.combineAndGetUpdateValue(new HoodieAvroIndexedRecord(curAvroRecord), newRecord, logFileReaderAvroSchema, payloadProps).isPresent) {
310309
toScalaOption(hoodieMerge.combineAndGetUpdateValue(new HoodieAvroIndexedRecord(curAvroRecord), newRecord, logFileReaderAvroSchema, payloadProps)
311310
.get.asInstanceOf[HoodieAvroIndexedRecord].toIndexedRecord)

hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieInternalRowUtils.scala

Lines changed: 16 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -23,40 +23,40 @@ import java.util
2323
import java.util.concurrent.ConcurrentHashMap
2424
import org.apache.avro.Schema
2525
import org.apache.hudi.AvroConversionUtils
26+
import org.apache.hudi.avro.HoodieAvroUtils
2627
import org.apache.hudi.avro.HoodieAvroUtils.{createFullName, fromJavaDate, toJavaDate}
2728
import org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField
29+
import org.apache.hudi.common.util.ValidationUtils
2830
import org.apache.hudi.exception.HoodieException
2931
import org.apache.spark.sql.catalyst.InternalRow
3032
import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, JoinedRow, MutableProjection, Projection}
3133
import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, GenericArrayData, MapData}
3234
import org.apache.spark.sql.hudi.ColumnStatsExpressionUtils.AllowedTransformationExpression.exprUtils.generateMutableProjection
3335
import org.apache.spark.sql.types._
34-
import scala.collection.mutable
3536

37+
import scala.collection.mutable
3638

39+
/**
40+
* Helper object with common utilities for working with Spark InternalRow.
41+
* Provides common methods similar to {@link HoodieAvroUtils}.
42+
*/
3743
object HoodieInternalRowUtils {
3844

3945
val projectionMap = new ConcurrentHashMap[(StructType, StructType), MutableProjection]
4046
val schemaMap = new ConcurrentHashMap[Schema, StructType]
4147
val SchemaPosMap = new ConcurrentHashMap[StructType, Map[String, (StructField, Int)]]
4248

43-
/**
44-
* @see org.apache.hudi.avro.HoodieAvroUtils#stitchRecords(org.apache.avro.generic.GenericRecord, org.apache.avro.generic.GenericRecord, org.apache.avro.Schema)
45-
*/
4649
def stitchRecords(left: InternalRow, leftSchema: StructType, right: InternalRow, rightSchema: StructType, stitchedSchema: StructType): InternalRow = {
4750
val mergeSchema = StructType(leftSchema.fields ++ rightSchema.fields)
4851
val row = new JoinedRow(left, right)
49-
val projection = getCacheProjection(mergeSchema, stitchedSchema)
52+
val projection = getCachedProjection(mergeSchema, stitchedSchema)
5053
projection(row)
5154
}
5255

53-
/**
54-
* @see org.apache.hudi.avro.HoodieAvroUtils#rewriteRecord(org.apache.avro.generic.GenericRecord, org.apache.avro.Schema)
55-
*/
5656
def rewriteRecord(oldRecord: InternalRow, oldSchema: StructType, newSchema: StructType): InternalRow = {
5757
val newRow = new GenericInternalRow(Array.fill(newSchema.fields.length)(null).asInstanceOf[Array[Any]])
5858

59-
val oldFieldMap = getCacheSchemaPosMap(oldSchema)
59+
val oldFieldMap = getCachedSchemaPosMap(oldSchema)
6060
for ((field, pos) <- newSchema.fields.zipWithIndex) {
6161
var oldValue: AnyRef = null
6262
if (oldFieldMap.contains(field.name)) {
@@ -87,29 +87,21 @@ object HoodieInternalRowUtils {
8787
newRow
8888
}
8989

90-
/**
91-
* @see org.apache.hudi.avro.HoodieAvroUtils#rewriteRecordWithNewSchema(org.apache.avro.generic.IndexedRecord, org.apache.avro.Schema, java.util.Map)
92-
*/
9390
def rewriteRecordWithNewSchema(oldRecord: InternalRow, oldSchema: StructType, newSchema: StructType, renameCols: util.Map[String, String]): InternalRow = {
9491
rewriteRecordWithNewSchema(oldRecord, oldSchema, newSchema, renameCols, new util.LinkedList[String]).asInstanceOf[InternalRow]
9592
}
9693

97-
/**
98-
* @see org.apache.hudi.avro.HoodieAvroUtils#rewriteRecordWithNewSchema(java.lang.Object, org.apache.avro.Schema, org.apache.avro.Schema, java.util.Map, java.util.Deque)
99-
*/
10094
private def rewriteRecordWithNewSchema(oldRecord: Any, oldSchema: DataType, newSchema: DataType, renameCols: util.Map[String, String], fieldNames: util.Deque[String]): Any = {
10195
if (oldRecord == null) {
10296
null
10397
} else {
10498
newSchema match {
10599
case targetSchema: StructType =>
106-
if (!oldRecord.isInstanceOf[InternalRow]) {
107-
throw new IllegalArgumentException("cannot rewrite record with different type")
108-
}
100+
ValidationUtils.checkArgument(oldRecord.isInstanceOf[InternalRow], "cannot rewrite record with different type")
109101
val oldRow = oldRecord.asInstanceOf[InternalRow]
110102
val helper = mutable.Map[Integer, Any]()
111103

112-
val oldSchemaPos = getCacheSchemaPosMap(oldSchema.asInstanceOf[StructType])
104+
val oldSchemaPos = getCachedSchemaPosMap(oldSchema.asInstanceOf[StructType])
113105
targetSchema.fields.zipWithIndex.foreach { case (field, i) =>
114106
fieldNames.push(field.name)
115107
if (oldSchemaPos.contains(field.name)) {
@@ -140,9 +132,7 @@ object HoodieInternalRowUtils {
140132

141133
newRow
142134
case targetSchema: ArrayType =>
143-
if (!oldRecord.isInstanceOf[ArrayData]) {
144-
throw new IllegalArgumentException("cannot rewrite record with different type")
145-
}
135+
ValidationUtils.checkArgument(oldRecord.isInstanceOf[ArrayData], "cannot rewrite record with different type")
146136
val oldElementType = oldSchema.asInstanceOf[ArrayType].elementType
147137
val oldArray = oldRecord.asInstanceOf[ArrayData]
148138
val newElementType = targetSchema.elementType
@@ -153,9 +143,7 @@ object HoodieInternalRowUtils {
153143

154144
newArray
155145
case targetSchema: MapType =>
156-
if (!oldRecord.isInstanceOf[MapData]) {
157-
throw new IllegalArgumentException("cannot rewrite record with different type")
158-
}
146+
ValidationUtils.checkArgument(oldRecord.isInstanceOf[MapData], "cannot rewrite record with different type")
159147
val oldValueType = oldSchema.asInstanceOf[MapType].valueType
160148
val oldKeyType = oldSchema.asInstanceOf[MapType].keyType
161149
val oldMap = oldRecord.asInstanceOf[MapData]
@@ -174,27 +162,21 @@ object HoodieInternalRowUtils {
174162
}
175163
}
176164

177-
/**
178-
* @see org.apache.hudi.avro.HoodieAvroUtils#rewriteRecordWithMetadata(org.apache.avro.generic.GenericRecord, org.apache.avro.Schema, java.lang.String)
179-
*/
180165
def rewriteRecordWithMetadata(record: InternalRow, oldSchema: StructType, newSchema: StructType, fileName: String): InternalRow = {
181166
val newRecord = rewriteRecord(record, oldSchema, newSchema)
182167
newRecord.update(HoodieMetadataField.FILENAME_METADATA_FIELD.ordinal, fileName)
183168

184169
newRecord
185170
}
186171

187-
/**
188-
* @see org.apache.hudi.avro.HoodieAvroUtils#rewriteEvolutionRecordWithMetadata(org.apache.avro.generic.GenericRecord, org.apache.avro.Schema, java.lang.String)
189-
*/
190172
def rewriteEvolutionRecordWithMetadata(record: InternalRow, oldSchema: StructType, newSchema: StructType, fileName: String): InternalRow = {
191173
val newRecord = rewriteRecordWithNewSchema(record, oldSchema, newSchema, new util.HashMap[String, String]())
192174
newRecord.update(HoodieMetadataField.FILENAME_METADATA_FIELD.ordinal, fileName)
193175

194176
newRecord
195177
}
196178

197-
def getCacheSchema(schema: Schema): StructType = {
179+
def getCachedSchema(schema: Schema): StructType = {
198180
if (!schemaMap.contains(schema)) {
199181
schemaMap.synchronized {
200182
if (!schemaMap.contains(schema)) {
@@ -206,7 +188,7 @@ object HoodieInternalRowUtils {
206188
schemaMap.get(schema)
207189
}
208190

209-
private def getCacheProjection(from: StructType, to: StructType): Projection = {
191+
private def getCachedProjection(from: StructType, to: StructType): Projection = {
210192
val schemaPair = (from, to)
211193
if (!projectionMap.contains(schemaPair)) {
212194
projectionMap.synchronized {
@@ -219,7 +201,7 @@ object HoodieInternalRowUtils {
219201
projectionMap.get(schemaPair)
220202
}
221203

222-
def getCacheSchemaPosMap(schema: StructType): Map[String, (StructField, Int)] = {
204+
def getCachedSchemaPosMap(schema: StructType): Map[String, (StructField, Int)] = {
223205
if (!SchemaPosMap.contains(schema)) {
224206
SchemaPosMap.synchronized {
225207
if (!SchemaPosMap.contains(schema)) {

0 commit comments

Comments
 (0)