60 changes: 28 additions & 32 deletions hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -72,7 +72,10 @@
import java.util.List;
import java.util.Map;
import java.util.Deque;
import java.util.LinkedList;
import java.util.ArrayDeque;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.StreamSupport;
import java.util.TimeZone;
import java.util.stream.Collectors;

@@ -94,6 +97,10 @@ public class HoodieAvroUtils {
// Exposed for tests
public static final Conversions.DecimalConversion DECIMAL_CONVERSION = new Conversions.DecimalConversion();

// Field names used for Avro array elements and map values when building full field paths
private static final String ARRAY_TYPE_ELEMENT_NAME = "element";
private static final String MAP_TYPE_VALUE_NAME = "value";

// As per https://avro.apache.org/docs/current/spec.html#names
private static final String INVALID_AVRO_CHARS_IN_NAMES = "[^A-Za-z0-9_]";
private static final String INVALID_AVRO_FIRST_CHAR_IN_NAMES = "[^A-Za-z_]";
@@ -410,7 +417,7 @@ public static GenericRecord rewriteRecordWithMetadata(GenericRecord genericRecor

// TODO Unify the logic of rewriteRecordWithMetadata and rewriteEvolutionRecordWithMetadata, and delete this function.
public static GenericRecord rewriteEvolutionRecordWithMetadata(GenericRecord genericRecord, Schema newSchema, String fileName) {
GenericRecord newRecord = HoodieAvroUtils.rewriteRecordWithNewSchema(genericRecord, newSchema, new HashMap<>());
GenericRecord newRecord = HoodieAvroUtils.rewriteRecordWithNewSchema(genericRecord, newSchema, Collections.emptyMap());
// do not preserve FILENAME_METADATA_FIELD
newRecord.put(HoodieRecord.FILENAME_METADATA_FIELD_POS, fileName);
return newRecord;
@@ -745,7 +752,7 @@ public static Object getRecordColumnValues(HoodieRecord<? extends HoodieRecordPa
* @return newRecord for new Schema
*/
public static GenericRecord rewriteRecordWithNewSchema(IndexedRecord oldRecord, Schema newSchema, Map<String, String> renameCols) {
Object newRecord = rewriteRecordWithNewSchema(oldRecord, oldRecord.getSchema(), newSchema, renameCols, new LinkedList<>());
Object newRecord = rewriteRecordWithNewSchema(oldRecord, oldRecord.getSchema(), newSchema, renameCols, new ArrayDeque<>());
return (GenericData.Record) newRecord;
}

@@ -773,39 +780,32 @@ private static Object rewriteRecordWithNewSchema(Object oldRecord, Schema oldSch
}
IndexedRecord indexedRecord = (IndexedRecord) oldRecord;
List<Schema.Field> fields = newSchema.getFields();
Map<Integer, Object> helper = new HashMap<>();

GenericData.Record newRecord = new GenericData.Record(newSchema);
for (int i = 0; i < fields.size(); i++) {
Schema.Field field = fields.get(i);
String fieldName = field.name();
fieldNames.push(fieldName);
if (oldSchema.getField(field.name()) != null) {
Schema.Field oldField = oldSchema.getField(field.name());
helper.put(i, rewriteRecordWithNewSchema(indexedRecord.get(oldField.pos()), oldField.schema(), fields.get(i).schema(), renameCols, fieldNames));
newRecord.put(i, rewriteRecordWithNewSchema(indexedRecord.get(oldField.pos()), oldField.schema(), fields.get(i).schema(), renameCols, fieldNames));
} else {
String fieldFullName = createFullName(fieldNames);
String[] colNamePartsFromOldSchema = renameCols.getOrDefault(fieldFullName, "").split("\\.");
String lastColNameFromOldSchema = colNamePartsFromOldSchema[colNamePartsFromOldSchema.length - 1];
String fieldNameFromOldSchema = renameCols.getOrDefault(fieldFullName, "");
// deal with rename
if (oldSchema.getField(field.name()) == null && oldSchema.getField(lastColNameFromOldSchema) != null) {
if (oldSchema.getField(field.name()) == null && oldSchema.getField(fieldNameFromOldSchema) != null) {
// find rename
Schema.Field oldField = oldSchema.getField(lastColNameFromOldSchema);
helper.put(i, rewriteRecordWithNewSchema(indexedRecord.get(oldField.pos()), oldField.schema(), fields.get(i).schema(), renameCols, fieldNames));
}
}
fieldNames.pop();
}
GenericData.Record newRecord = new GenericData.Record(newSchema);
for (int i = 0; i < fields.size(); i++) {
if (helper.containsKey(i)) {
newRecord.put(i, helper.get(i));
} else {
if (fields.get(i).defaultVal() instanceof JsonProperties.Null) {
newRecord.put(i, null);
Schema.Field oldField = oldSchema.getField(fieldNameFromOldSchema);
newRecord.put(i, rewriteRecordWithNewSchema(indexedRecord.get(oldField.pos()), oldField.schema(), fields.get(i).schema(), renameCols, fieldNames));
} else {
newRecord.put(i, fields.get(i).defaultVal());
// deal with default value
if (fields.get(i).defaultVal() instanceof JsonProperties.Null) {
newRecord.put(i, null);
} else {
newRecord.put(i, fields.get(i).defaultVal());
}
}
}
fieldNames.pop();
}
return newRecord;
case ARRAY:
@@ -814,7 +814,7 @@ private static Object rewriteRecordWithNewSchema(Object oldRecord, Schema oldSch
}
Collection<?> array = (Collection<?>) oldRecord;
List<Object> newArray = new ArrayList<>();
fieldNames.push("element");
fieldNames.push(ARRAY_TYPE_ELEMENT_NAME);
for (Object element : array) {
newArray.add(rewriteRecordWithNewSchema(element, oldSchema.getElementType(), newSchema.getElementType(), renameCols, fieldNames));
}
@@ -826,7 +826,7 @@ private static Object rewriteRecordWithNewSchema(Object oldRecord, Schema oldSch
}
Map<Object, Object> map = (Map<Object, Object>) oldRecord;
Map<Object, Object> newMap = new HashMap<>();
fieldNames.push("value");
fieldNames.push(MAP_TYPE_VALUE_NAME);
Contributor:
If the variable is only used by this method, there is no need to make it a static constant; the JVM interns identical string literals anyway (see the sketch below).

for (Map.Entry<Object, Object> entry : map.entrySet()) {
newMap.put(entry.getKey(), rewriteRecordWithNewSchema(entry.getValue(), oldSchema.getValueType(), newSchema.getValueType(), renameCols, fieldNames));
}
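
A minimal sketch of the inlined alternative the comment suggests (behavior is identical, since identical string literals are interned by the JVM):

fieldNames.push("element"); // ARRAY case: the literal is interned, no named constant needed
fieldNames.push("value");   // MAP case: every call reuses the same interned instance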
@@ -840,13 +840,9 @@ private static Object rewriteRecordWithNewSchema(Object oldRecord, Schema oldSch
}
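
For orientation, a hedged sketch of how a caller drives this rewrite with a rename map (the record and schema variables are hypothetical; keys are full field names in the new schema, values the last segment of the old column name, as the rename test further down also shows):

Map<String, String> renameCols = new HashMap<>();
renameCols.put("ss.fn", "firstname"); // new nested field "ss.fn" was "student.firstname" in the old schema
GenericRecord rewritten = HoodieAvroUtils.rewriteRecordWithNewSchema(oldRecord, newSchema, renameCols);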

private static String createFullName(Deque<String> fieldNames) {
String result = "";
if (!fieldNames.isEmpty()) {
List<String> parentNames = new ArrayList<>();
fieldNames.descendingIterator().forEachRemaining(parentNames::add);
result = parentNames.stream().collect(Collectors.joining("."));
}
return result;
return StreamSupport
Contributor:
No need for a Stream here; you can just use String.join (see the sketch after this method).

.stream(Spliterators.spliteratorUnknownSize(fieldNames.descendingIterator(), Spliterator.ORDERED), false)
.collect(Collectors.joining("."));
}
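
A hedged sketch of the String.join variant the comment proposes (assuming the deque is pushed innermost-field-last, so the iteration order must be reversed before joining):

private static String createFullName(Deque<String> fieldNames) {
  List<String> parts = new ArrayList<>(fieldNames); // head of the deque is the innermost field
  Collections.reverse(parts);                       // outermost field first
  return String.join(".", parts);
}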

private static Object rewritePrimaryType(Object oldValue, Schema oldSchema, Schema newSchema) {
@@ -57,8 +57,8 @@
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -380,7 +380,7 @@ private void processDataBlock(HoodieDataBlock dataBlock, Option<KeySpec> keySpec
Option<Schema> schemaOption = getMergedSchema(dataBlock);
while (recordIterator.hasNext()) {
IndexedRecord currentRecord = recordIterator.next();
IndexedRecord record = schemaOption.isPresent() ? HoodieAvroUtils.rewriteRecordWithNewSchema(currentRecord, schemaOption.get(), new HashMap<>()) : currentRecord;
IndexedRecord record = schemaOption.isPresent() ? HoodieAvroUtils.rewriteRecordWithNewSchema(currentRecord, schemaOption.get(), Collections.emptyMap()) : currentRecord;
processNextRecord(createHoodieRecord(record, this.hoodieTableMetaClient.getTableConfig(), this.payloadClassFQN,
this.preCombineField, this.withOperationField, this.simpleKeyGenFields, this.partitionName));
totalLogRecords.incrementAndGet();
@@ -68,10 +68,7 @@ public InternalSchemaMerger(InternalSchema fileSchema, InternalSchema querySchem
}

public InternalSchemaMerger(InternalSchema fileSchema, InternalSchema querySchema, boolean ignoreRequiredAttribute, boolean useColumnTypeFromFileSchema) {
this.fileSchema = fileSchema;
this.querySchema = querySchema;
this.ignoreRequiredAttribute = ignoreRequiredAttribute;
this.useColumnTypeFromFileSchema = useColumnTypeFromFileSchema;
this(fileSchema, querySchema, ignoreRequiredAttribute, useColumnTypeFromFileSchema, true);
}

/**
@@ -151,14 +148,15 @@ private Types.Field dealWithRename(int fieldId, Type newType, Types.Field oldFie
Types.Field fieldFromFileSchema = fileSchema.findField(fieldId);
String nameFromFileSchema = fieldFromFileSchema.name();
String nameFromQuerySchema = querySchema.findField(fieldId).name();
String finalFieldName = useColNameFromFileSchema ? nameFromFileSchema : nameFromQuerySchema;
Type typeFromFileSchema = fieldFromFileSchema.type();
// The current design guarantees that nested-type changes are not allowed, so they need not be handled here.
if (newType.isNestedType()) {
return Types.Field.get(oldField.fieldId(), oldField.isOptional(),
useColNameFromFileSchema ? nameFromFileSchema : nameFromQuerySchema, newType, oldField.doc());
finalFieldName, newType, oldField.doc());
} else {
return Types.Field.get(oldField.fieldId(), oldField.isOptional(),
useColNameFromFileSchema ? nameFromFileSchema : nameFromQuerySchema, useColumnTypeFromFileSchema ? typeFromFileSchema : newType, oldField.doc());
finalFieldName, useColumnTypeFromFileSchema ? typeFromFileSchema : newType, oldField.doc());
}
}

@@ -273,14 +273,17 @@ public static String createFullName(String name, Deque<String> fieldNames) {
*
* @param oldSchema oldSchema
* @param newSchema newSchema which modified from oldSchema
* @return renameCols Map. (k, v) -> (colNameFromNewSchema, colNameFromOldSchema)
* @return renameCols Map. (k, v) -> (colNameFromNewSchema, colNameLastPartFromOldSchema)
*/
public static Map<String, String> collectRenameCols(InternalSchema oldSchema, InternalSchema newSchema) {
List<String> colNamesFromWriteSchema = oldSchema.getAllColsFullName();
return colNamesFromWriteSchema.stream().filter(f -> {
int fieldIdFromWriteSchema = oldSchema.findIdByName(f);
// find the columns that keep the same id but have a different name
return newSchema.getAllIds().contains(fieldIdFromWriteSchema) && !newSchema.findfullName(fieldIdFromWriteSchema).equalsIgnoreCase(f);
}).collect(Collectors.toMap(e -> newSchema.findfullName(oldSchema.findIdByName(e)), e -> e));
}).collect(Collectors.toMap(e -> newSchema.findfullName(oldSchema.findIdByName(e)), e -> {
int lastDotIndex = e.lastIndexOf(".");
return e.substring(lastDotIndex == -1 ? 0 : lastDotIndex + 1);
}));
}
}
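
For illustration, a hedged sketch of the map this produces for a single nested rename (the schemas and field id are hypothetical, mirroring the rename test below):

// Old schema has "preferences.feature1" (field id 5); the new schema renames it to "preferences.f1".
Map<String, String> renameCols = InternalSchemaUtils.collectRenameCols(oldSchema, newSchema);
// key   = newSchema.findfullName(5)  -> "preferences.f1"
// value = last part of old full name -> "feature1"
assert "feature1".equals(renameCols.get("preferences.f1"));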
@@ -27,6 +27,7 @@
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Assertions;

import java.io.IOException;
import java.math.BigDecimal;
@@ -35,6 +36,7 @@
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.HashMap;

import static org.apache.hudi.avro.HoodieAvroUtils.getNestedFieldSchemaFromWriteSchema;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -98,6 +100,12 @@ public class TestHoodieAvroUtils {
+ "{\"name\":\"student\",\"type\":{\"name\":\"student\",\"type\":\"record\",\"fields\":["
+ "{\"name\":\"firstname\",\"type\":[\"null\" ,\"string\"],\"default\": null},{\"name\":\"lastname\",\"type\":[\"null\" ,\"string\"],\"default\": null}]}}]}";

private static String SCHEMA_WITH_NESTED_FIELD_RENAMED = "{\"name\":\"MyClass\",\"type\":\"record\",\"namespace\":\"com.acme.avro\",\"fields\":["
+ "{\"name\":\"fn\",\"type\":\"string\"},"
+ "{\"name\":\"ln\",\"type\":\"string\"},"
+ "{\"name\":\"ss\",\"type\":{\"name\":\"ss\",\"type\":\"record\",\"fields\":["
+ "{\"name\":\"fn\",\"type\":[\"null\" ,\"string\"],\"default\": null},{\"name\":\"ln\",\"type\":[\"null\" ,\"string\"],\"default\": null}]}}]}";

@Test
public void testPropsPresent() {
Schema schema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(EXAMPLE_SCHEMA));
@@ -342,4 +350,26 @@ public void testGetNestedFieldSchema() throws IOException {
assertEquals(Schema.create(Schema.Type.STRING), getNestedFieldSchemaFromWriteSchema(rec3.getSchema(), "student.firstname"));
assertEquals(Schema.create(Schema.Type.STRING), getNestedFieldSchemaFromWriteSchema(nestedSchema, "student.firstname"));
}

@Test
public void testReWriteAvroRecordWithNewSchema() {
Schema nestedSchema = new Schema.Parser().parse(SCHEMA_WITH_NESTED_FIELD);
GenericRecord rec3 = new GenericData.Record(nestedSchema);
rec3.put("firstname", "person1");
rec3.put("lastname", "person2");
GenericRecord studentRecord = new GenericData.Record(rec3.getSchema().getField("student").schema());
studentRecord.put("firstname", "person1");
studentRecord.put("lastname", "person2");
rec3.put("student", studentRecord);

Schema nestedSchemaRename = new Schema.Parser().parse(SCHEMA_WITH_NESTED_FIELD_RENAMED);
Map<String, String> colRenames = new HashMap<>();
colRenames.put("fn", "firstname");
colRenames.put("ln", "lastname");
colRenames.put("ss", "student");
colRenames.put("ss.fn", "firstname");
colRenames.put("ss.ln", "lastname");
GenericRecord studentRecordRename = HoodieAvroUtils.rewriteRecordWithNewSchema(rec3, nestedSchemaRename, colRenames);
Assertions.assertTrue(GenericData.get().validate(nestedSchemaRename, studentRecordRename));
}
}
@@ -38,6 +38,7 @@
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -284,7 +285,7 @@ public void testReWriteRecordWithTypeChanged() {
.updateColumnType("col6", Types.StringType.get());
InternalSchema newSchema = SchemaChangeUtils.applyTableChanges2Schema(internalSchema, updateChange);
Schema newAvroSchema = AvroInternalSchemaConverter.convert(newSchema, avroSchema.getName());
GenericRecord newRecord = HoodieAvroUtils.rewriteRecordWithNewSchema(avroRecord, newAvroSchema, new HashMap<>());
GenericRecord newRecord = HoodieAvroUtils.rewriteRecordWithNewSchema(avroRecord, newAvroSchema, Collections.emptyMap());

Assertions.assertTrue(GenericData.get().validate(newAvroSchema, newRecord));
}
@@ -349,9 +350,26 @@ public void testReWriteNestRecord() {
);

Schema newAvroSchema = AvroInternalSchemaConverter.convert(newRecord, schema.getName());
GenericRecord newAvroRecord = HoodieAvroUtils.rewriteRecordWithNewSchema(avroRecord, newAvroSchema, new HashMap<>());
GenericRecord newAvroRecord = HoodieAvroUtils.rewriteRecordWithNewSchema(avroRecord, newAvroSchema, Collections.emptyMap());
// verify the rewrite produced a record valid under the new schema
Assertions.assertTrue(GenericData.get().validate(newAvroSchema, newAvroRecord));

// test rewrite with rename
InternalSchema internalSchema = AvroInternalSchemaConverter.convert(schema);
// do change rename operation
TableChanges.ColumnUpdateChange updateChange = TableChanges.ColumnUpdateChange.get(internalSchema);
updateChange
.renameColumn("id", "idx")
.renameColumn("data", "datax")
.renameColumn("preferences.feature1", "f1")
.renameColumn("preferences.feature2", "f2")
.renameColumn("locations.value.lat", "lt");
InternalSchema internalSchemaRename = SchemaChangeUtils.applyTableChanges2Schema(internalSchema, updateChange);
Schema avroSchemaRename = AvroInternalSchemaConverter.convert(internalSchemaRename, schema.getName());
Map<String, String> renameCols = InternalSchemaUtils.collectRenameCols(internalSchema, internalSchemaRename);
GenericRecord avroRecordRename = HoodieAvroUtils.rewriteRecordWithNewSchema(avroRecord, avroSchemaRename, renameCols);
// verify the rewrite with renames produced a record valid under the renamed schema
Assertions.assertTrue(GenericData.get().validate(avroSchemaRename, avroRecordRename));
}

@Test