6464import java .sql .Timestamp ;
6565import java .time .LocalDate ;
6666import java .util .ArrayList ;
67- import java .util .Arrays ;
6867import java .util .Collection ;
6968import java .util .Collections ;
7069import java .util .HashMap ;
7170import java .util .Iterator ;
7271import java .util .List ;
7372import java .util .Map ;
73+ import java .util .Deque ;
74+ import java .util .LinkedList ;
7475import java .util .TimeZone ;
7576import java .util .stream .Collectors ;
7677
@@ -736,11 +737,24 @@ public static Object getRecordColumnValues(HoodieRecord<? extends HoodieRecordPa
736737 * @return newRecord for new Schema
737738 */
738739 public static GenericRecord rewriteRecordWithNewSchema (IndexedRecord oldRecord , Schema newSchema , Map <String , String > renameCols ) {
739- Object newRecord = rewriteRecordWithNewSchema (oldRecord , oldRecord .getSchema (), newSchema , renameCols );
740+ Object newRecord = rewriteRecordWithNewSchema (oldRecord , oldRecord .getSchema (), newSchema , renameCols , new LinkedList <>() );
740741 return (GenericData .Record ) newRecord ;
741742 }
742743
743- private static Object rewriteRecordWithNewSchema (Object oldRecord , Schema oldSchema , Schema newSchema , Map <String , String > renameCols ) {
744+ /**
745+ * Given a avro record with a given schema, rewrites it into the new schema while setting fields only from the new schema.
746+ * support deep rewrite for nested record and adjust rename operation.
747+ * This particular method does the following things :
748+ * a) Create a new empty GenericRecord with the new schema.
749+ * b) For GenericRecord, copy over the data from the old schema to the new schema or set default values for all fields of this transformed schema
750+ *
751+ * @param oldRecord oldRecord to be rewritten
752+ * @param newSchema newSchema used to rewrite oldRecord
753+ * @param renameCols a map store all rename cols, (k, v)-> (colNameFromNewSchema, colNameFromOldSchema)
754+ * @param fieldNames track the full name of visited field when we travel new schema.
755+ * @return newRecord for new Schema
756+ */
757+ private static Object rewriteRecordWithNewSchema (Object oldRecord , Schema oldSchema , Schema newSchema , Map <String , String > renameCols , Deque <String > fieldNames ) {
744758 if (oldRecord == null ) {
745759 return null ;
746760 }
@@ -755,23 +769,23 @@ private static Object rewriteRecordWithNewSchema(Object oldRecord, Schema oldSch
755769
756770 for (int i = 0 ; i < fields .size (); i ++) {
757771 Schema .Field field = fields .get (i );
772+ String fieldName = field .name ();
773+ fieldNames .push (fieldName );
758774 if (oldSchema .getField (field .name ()) != null ) {
759775 Schema .Field oldField = oldSchema .getField (field .name ());
760- helper .put (i , rewriteRecordWithNewSchema (indexedRecord .get (oldField .pos ()), oldField .schema (), fields .get (i ).schema (), renameCols ));
761- }
762- // deal with rename
763- if (!renameCols .isEmpty () && oldSchema .getField (field .name ()) == null ) {
764- String fieldName = field .name ();
765- for (Map .Entry <String , String > entry : renameCols .entrySet ()) {
766- List <String > nameParts = Arrays .asList (entry .getKey ().split ("\\ ." ));
767- List <String > namePartsOld = Arrays .asList (entry .getValue ().split ("\\ ." ));
768- if (nameParts .get (nameParts .size () - 1 ).equals (fieldName ) && oldSchema .getField (namePartsOld .get (namePartsOld .size () - 1 )) != null ) {
769- // find rename
770- Schema .Field oldField = oldSchema .getField (namePartsOld .get (namePartsOld .size () - 1 ));
771- helper .put (i , rewriteRecordWithNewSchema (indexedRecord .get (oldField .pos ()), oldField .schema (), fields .get (i ).schema (), renameCols ));
772- }
776+ helper .put (i , rewriteRecordWithNewSchema (indexedRecord .get (oldField .pos ()), oldField .schema (), fields .get (i ).schema (), renameCols , fieldNames ));
777+ } else {
778+ String fieldFullName = createFullName (fieldNames );
779+ String [] colNamePartsFromOldSchema = renameCols .getOrDefault (fieldFullName , "" ).split ("\\ ." );
780+ String lastColNameFromOldSchema = colNamePartsFromOldSchema [colNamePartsFromOldSchema .length - 1 ];
781+ // deal with rename
782+ if (oldSchema .getField (field .name ()) == null && oldSchema .getField (lastColNameFromOldSchema ) != null ) {
783+ // find rename
784+ Schema .Field oldField = oldSchema .getField (lastColNameFromOldSchema );
785+ helper .put (i , rewriteRecordWithNewSchema (indexedRecord .get (oldField .pos ()), oldField .schema (), fields .get (i ).schema (), renameCols , fieldNames ));
773786 }
774787 }
788+ fieldNames .pop ();
775789 }
776790 GenericData .Record newRecord = new GenericData .Record (newSchema );
777791 for (int i = 0 ; i < fields .size (); i ++) {
@@ -792,27 +806,41 @@ private static Object rewriteRecordWithNewSchema(Object oldRecord, Schema oldSch
792806 }
793807 Collection array = (Collection )oldRecord ;
794808 List <Object > newArray = new ArrayList ();
809+ fieldNames .push ("element" );
795810 for (Object element : array ) {
796- newArray .add (rewriteRecordWithNewSchema (element , oldSchema .getElementType (), newSchema .getElementType (), renameCols ));
811+ newArray .add (rewriteRecordWithNewSchema (element , oldSchema .getElementType (), newSchema .getElementType (), renameCols , fieldNames ));
797812 }
813+ fieldNames .pop ();
798814 return newArray ;
799815 case MAP :
800816 if (!(oldRecord instanceof Map )) {
801817 throw new IllegalArgumentException ("cannot rewrite record with different type" );
802818 }
803819 Map <Object , Object > map = (Map <Object , Object >) oldRecord ;
804820 Map <Object , Object > newMap = new HashMap <>();
821+ fieldNames .push ("value" );
805822 for (Map .Entry <Object , Object > entry : map .entrySet ()) {
806- newMap .put (entry .getKey (), rewriteRecordWithNewSchema (entry .getValue (), oldSchema .getValueType (), newSchema .getValueType (), renameCols ));
823+ newMap .put (entry .getKey (), rewriteRecordWithNewSchema (entry .getValue (), oldSchema .getValueType (), newSchema .getValueType (), renameCols , fieldNames ));
807824 }
825+ fieldNames .pop ();
808826 return newMap ;
809827 case UNION :
810- return rewriteRecordWithNewSchema (oldRecord , getActualSchemaFromUnion (oldSchema , oldRecord ), getActualSchemaFromUnion (newSchema , oldRecord ), renameCols );
828+ return rewriteRecordWithNewSchema (oldRecord , getActualSchemaFromUnion (oldSchema , oldRecord ), getActualSchemaFromUnion (newSchema , oldRecord ), renameCols , fieldNames );
811829 default :
812830 return rewritePrimaryType (oldRecord , oldSchema , newSchema );
813831 }
814832 }
815833
834+ private static String createFullName (Deque <String > fieldNames ) {
835+ String result = "" ;
836+ if (!fieldNames .isEmpty ()) {
837+ List <String > parentNames = new ArrayList <>();
838+ fieldNames .descendingIterator ().forEachRemaining (parentNames ::add );
839+ result = parentNames .stream ().collect (Collectors .joining ("." ));
840+ }
841+ return result ;
842+ }
843+
816844 private static Object rewritePrimaryType (Object oldValue , Schema oldSchema , Schema newSchema ) {
817845 Schema realOldSchema = oldSchema ;
818846 if (realOldSchema .getType () == UNION ) {
0 commit comments