7272import java .util .List ;
7373import java .util .Map ;
7474import java .util .Deque ;
75- import java .util .LinkedList ;
75+ import java .util .ArrayDeque ;
76+ import java .util .Spliterator ;
77+ import java .util .Spliterators ;
78+ import java .util .stream .StreamSupport ;
7679import java .util .TimeZone ;
7780import java .util .stream .Collectors ;
7881
@@ -94,6 +97,10 @@ public class HoodieAvroUtils {
9497 //Export for test
9598 public static final Conversions .DecimalConversion DECIMAL_CONVERSION = new Conversions .DecimalConversion ();
9699
100+ // Path segments pushed onto the field-name stack when descending into an Avro array's elements or a map's values
101+ private static final String ARRAY_TYPE_ELEMENT_NAME = "element" ;
102+ private static final String MAP_TYPE_VALUE_NAME = "value" ;
103+
97104 // As per https://avro.apache.org/docs/current/spec.html#names
98105 private static final String INVALID_AVRO_CHARS_IN_NAMES = "[^A-Za-z0-9_]" ;
99106 private static final String INVALID_AVRO_FIRST_CHAR_IN_NAMES = "[^A-Za-z_]" ;
@@ -410,7 +417,7 @@ public static GenericRecord rewriteRecordWithMetadata(GenericRecord genericRecor
410417
411418 // TODO: Unify the logic of rewriteRecordWithMetadata and rewriteEvolutionRecordWithMetadata, then delete this function.
412419 public static GenericRecord rewriteEvolutionRecordWithMetadata (GenericRecord genericRecord , Schema newSchema , String fileName ) {
413- GenericRecord newRecord = HoodieAvroUtils .rewriteRecordWithNewSchema (genericRecord , newSchema , new HashMap <> ());
420+ GenericRecord newRecord = HoodieAvroUtils .rewriteRecordWithNewSchema (genericRecord , newSchema , Collections . emptyMap ());
414421 // do not preserve FILENAME_METADATA_FIELD
415422 newRecord .put (HoodieRecord .FILENAME_METADATA_FIELD_POS , fileName );
416423 return newRecord ;
@@ -745,7 +752,7 @@ public static Object getRecordColumnValues(HoodieRecord<? extends HoodieRecordPa
745752 * @return newRecord for new Schema
746753 */
747754 public static GenericRecord rewriteRecordWithNewSchema (IndexedRecord oldRecord , Schema newSchema , Map <String , String > renameCols ) {
748- Object newRecord = rewriteRecordWithNewSchema (oldRecord , oldRecord .getSchema (), newSchema , renameCols , new LinkedList <>());
755+ Object newRecord = rewriteRecordWithNewSchema (oldRecord , oldRecord .getSchema (), newSchema , renameCols , new ArrayDeque <>());
749756 return (GenericData .Record ) newRecord ;
750757 }
751758
@@ -773,39 +780,32 @@ private static Object rewriteRecordWithNewSchema(Object oldRecord, Schema oldSch
773780 }
774781 IndexedRecord indexedRecord = (IndexedRecord ) oldRecord ;
775782 List <Schema .Field > fields = newSchema .getFields ();
776- Map <Integer , Object > helper = new HashMap <>();
777-
783+ GenericData .Record newRecord = new GenericData .Record (newSchema );
778784 for (int i = 0 ; i < fields .size (); i ++) {
779785 Schema .Field field = fields .get (i );
780786 String fieldName = field .name ();
781787 fieldNames .push (fieldName );
782788 if (oldSchema .getField (field .name ()) != null ) {
783789 Schema .Field oldField = oldSchema .getField (field .name ());
784- helper .put (i , rewriteRecordWithNewSchema (indexedRecord .get (oldField .pos ()), oldField .schema (), fields .get (i ).schema (), renameCols , fieldNames ));
790+ newRecord .put (i , rewriteRecordWithNewSchema (indexedRecord .get (oldField .pos ()), oldField .schema (), fields .get (i ).schema (), renameCols , fieldNames ));
785791 } else {
786792 String fieldFullName = createFullName (fieldNames );
787- String [] colNamePartsFromOldSchema = renameCols .getOrDefault (fieldFullName , "" ).split ("\\ ." );
788- String lastColNameFromOldSchema = colNamePartsFromOldSchema [colNamePartsFromOldSchema .length - 1 ];
793+ String fieldNameFromOldSchema = renameCols .getOrDefault (fieldFullName , "" );
789794 // deal with rename
790- if (oldSchema .getField (field .name ()) == null && oldSchema .getField (lastColNameFromOldSchema ) != null ) {
795+ if (oldSchema .getField (field .name ()) == null && oldSchema .getField (fieldNameFromOldSchema ) != null ) {
791796 // find rename
792- Schema .Field oldField = oldSchema .getField (lastColNameFromOldSchema );
793- helper .put (i , rewriteRecordWithNewSchema (indexedRecord .get (oldField .pos ()), oldField .schema (), fields .get (i ).schema (), renameCols , fieldNames ));
794- }
795- }
796- fieldNames .pop ();
797- }
798- GenericData .Record newRecord = new GenericData .Record (newSchema );
799- for (int i = 0 ; i < fields .size (); i ++) {
800- if (helper .containsKey (i )) {
801- newRecord .put (i , helper .get (i ));
802- } else {
803- if (fields .get (i ).defaultVal () instanceof JsonProperties .Null ) {
804- newRecord .put (i , null );
797+ Schema .Field oldField = oldSchema .getField (fieldNameFromOldSchema );
798+ newRecord .put (i , rewriteRecordWithNewSchema (indexedRecord .get (oldField .pos ()), oldField .schema (), fields .get (i ).schema (), renameCols , fieldNames ));
805799 } else {
806- newRecord .put (i , fields .get (i ).defaultVal ());
800+ // deal with default value
801+ if (fields .get (i ).defaultVal () instanceof JsonProperties .Null ) {
802+ newRecord .put (i , null );
803+ } else {
804+ newRecord .put (i , fields .get (i ).defaultVal ());
805+ }
807806 }
808807 }
808+ fieldNames .pop ();
809809 }
810810 return newRecord ;
811811 case ARRAY :
@@ -814,7 +814,7 @@ private static Object rewriteRecordWithNewSchema(Object oldRecord, Schema oldSch
814814 }
815815 Collection array = (Collection )oldRecord ;
816816 List <Object > newArray = new ArrayList ();
817- fieldNames .push ("element" );
817+ fieldNames .push (ARRAY_TYPE_ELEMENT_NAME );
818818 for (Object element : array ) {
819819 newArray .add (rewriteRecordWithNewSchema (element , oldSchema .getElementType (), newSchema .getElementType (), renameCols , fieldNames ));
820820 }
@@ -826,7 +826,7 @@ private static Object rewriteRecordWithNewSchema(Object oldRecord, Schema oldSch
826826 }
827827 Map <Object , Object > map = (Map <Object , Object >) oldRecord ;
828828 Map <Object , Object > newMap = new HashMap <>();
829- fieldNames .push ("value" );
829+ fieldNames .push (MAP_TYPE_VALUE_NAME );
830830 for (Map .Entry <Object , Object > entry : map .entrySet ()) {
831831 newMap .put (entry .getKey (), rewriteRecordWithNewSchema (entry .getValue (), oldSchema .getValueType (), newSchema .getValueType (), renameCols , fieldNames ));
832832 }
@@ -840,13 +840,9 @@ private static Object rewriteRecordWithNewSchema(Object oldRecord, Schema oldSch
840840 }
841841
842842 private static String createFullName (Deque <String > fieldNames ) {
843- String result = "" ;
844- if (!fieldNames .isEmpty ()) {
845- List <String > parentNames = new ArrayList <>();
846- fieldNames .descendingIterator ().forEachRemaining (parentNames ::add );
847- result = parentNames .stream ().collect (Collectors .joining ("." ));
848- }
849- return result ;
843+ return StreamSupport
844+ .stream (Spliterators .spliteratorUnknownSize (fieldNames .descendingIterator (), Spliterator .ORDERED ), false )
845+ .collect (Collectors .joining ("." ));
850846 }
851847
852848 private static Object rewritePrimaryType (Object oldValue , Schema oldSchema , Schema newSchema ) {
0 commit comments