Skip to content

Commit 096cf9e

Browse files
author
public (bdcee5037027)
committed
fix comments
1 parent 17feef3 commit 096cf9e

6 files changed

Lines changed: 19 additions & 15 deletions

File tree

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -39,6 +39,7 @@
3939
import org.apache.hudi.client.transaction.TransactionManager;
4040
import org.apache.hudi.client.utils.TransactionUtils;
4141
import org.apache.hudi.common.HoodiePendingRollbackInfo;
42+
import org.apache.hudi.common.config.HoodieCommonConfig;
4243
import org.apache.hudi.common.engine.HoodieEngineContext;
4344
import org.apache.hudi.common.model.HoodieCommitMetadata;
4445
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
@@ -276,7 +277,7 @@ private void saveInternalSchema(HoodieTable table, String instantTime, HoodieCom
276277
TableSchemaResolver schemaUtil = new TableSchemaResolver(table.getMetaClient());
277278
String historySchemaStr = schemaUtil.getTableHistorySchemaStrFromCommitMetadata().orElse("");
278279
FileBasedInternalSchemaStorageManager schemasManager = new FileBasedInternalSchemaStorageManager(table.getMetaClient());
279-
if (!historySchemaStr.isEmpty() || Boolean.parseBoolean(config.getString("hoodie.datasource.write.reconcile.schema"))) {
280+
if (!historySchemaStr.isEmpty() || Boolean.parseBoolean(config.getString(HoodieCommonConfig.RECONCILE_SCHEMA.key()))) {
280281
InternalSchema internalSchema;
281282
Schema avroSchema = HoodieAvroUtils.createHoodieWriteSchema(new Schema.Parser().parse(config.getSchema()));
282283
if (historySchemaStr.isEmpty()) {

hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java

Lines changed: 6 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -745,15 +745,18 @@ public static GenericRecord rewriteRecordWithNewSchema(IndexedRecord oldRecord,
745745
* b) For GenericRecord, copy over the data from the old schema to the new schema or set default values for all fields of this transformed schema
746746
*
747747
* @param oldRecord oldRecord to be rewritten
748+
* @param oldAvroSchema old avro schema.
748749
* @param newSchema newSchema used to rewrite oldRecord
749750
* @param renameCols a map store all rename cols, (k, v)-> (colNameFromNewSchema, colNameFromOldSchema)
750751
* @param fieldNames track the full name of visited field when we travel new schema.
751752
* @return newRecord for new Schema
752753
*/
753-
private static Object rewriteRecordWithNewSchema(Object oldRecord, Schema oldSchema, Schema newSchema, Map<String, String> renameCols, Deque<String> fieldNames) {
754+
private static Object rewriteRecordWithNewSchema(Object oldRecord, Schema oldAvroSchema, Schema newSchema, Map<String, String> renameCols, Deque<String> fieldNames) {
754755
if (oldRecord == null) {
755756
return null;
756757
}
758+
// try to get real schema for union type
759+
Schema oldSchema = getActualSchemaFromUnion(oldAvroSchema, oldRecord);
757760
switch (newSchema.getType()) {
758761
case RECORD:
759762
if (!(oldRecord instanceof IndexedRecord)) {
@@ -797,7 +800,7 @@ private static Object rewriteRecordWithNewSchema(Object oldRecord, Schema oldSch
797800
List<Object> newArray = new ArrayList();
798801
fieldNames.push("element");
799802
for (Object element : array) {
800-
newArray.add(rewriteRecordWithNewSchema(element, getActualSchemaFromUnion(oldSchema.getElementType(), element), newSchema.getElementType(), renameCols, fieldNames));
803+
newArray.add(rewriteRecordWithNewSchema(element, oldSchema.getElementType(), newSchema.getElementType(), renameCols, fieldNames));
801804
}
802805
fieldNames.pop();
803806
return newArray;
@@ -809,8 +812,7 @@ private static Object rewriteRecordWithNewSchema(Object oldRecord, Schema oldSch
809812
Map<Object, Object> newMap = new HashMap<>();
810813
fieldNames.push("value");
811814
for (Map.Entry<Object, Object> entry : map.entrySet()) {
812-
newMap.put(entry.getKey(), rewriteRecordWithNewSchema(entry.getValue(),
813-
getActualSchemaFromUnion(oldSchema.getValueType(), entry.getValue()), newSchema.getValueType(), renameCols, fieldNames));
815+
newMap.put(entry.getKey(), rewriteRecordWithNewSchema(entry.getValue(), oldSchema.getValueType(), newSchema.getValueType(), renameCols, fieldNames));
814816
}
815817
fieldNames.pop();
816818
return newMap;

hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java

Lines changed: 7 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -36,6 +36,13 @@ public class HoodieCommonConfig extends HoodieConfig {
3636
.defaultValue(false)
3737
.withDocumentation("Enables support for Schema Evolution feature");
3838

39+
public static final ConfigProperty<Boolean> RECONCILE_SCHEMA = ConfigProperty
40+
.key("hoodie.datasource.write.reconcile.schema")
41+
.defaultValue(false)
42+
.withDocumentation("When a new batch of write has records with old schema, but latest table schema got "
43+
+ "evolved, this config will upgrade the records to leverage latest table schema(default values will be "
44+
+ "injected to missing fields). If not, the write batch would fail.");
45+
3946
public static final ConfigProperty<ExternalSpillableMap.DiskMapType> SPILLABLE_DISK_MAP_TYPE = ConfigProperty
4047
.key("hoodie.common.spillable.diskmap.type")
4148
.defaultValue(ExternalSpillableMap.DiskMapType.BITCASK)

hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala

Lines changed: 1 addition & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -399,12 +399,7 @@ object DataSourceWriteOptions {
399399
.defaultValue(classOf[HiveSyncTool].getName)
400400
.withDocumentation("Sync tool class name used to sync to metastore. Defaults to Hive.")
401401

402-
val RECONCILE_SCHEMA: ConfigProperty[Boolean] = ConfigProperty
403-
.key("hoodie.datasource.write.reconcile.schema")
404-
.defaultValue(false)
405-
.withDocumentation("When a new batch of write has records with old schema, but latest table schema got "
406-
+ "evolved, this config will upgrade the records to leverage latest table schema(default values will be "
407-
+ "injected to missing fields). If not, the write batch would fail.")
402+
val RECONCILE_SCHEMA: ConfigProperty[Boolean] = HoodieCommonConfig.RECONCILE_SCHEMA
408403

409404
// HIVE SYNC SPECIFIC CONFIGS
410405
// NOTE: DO NOT USE uppercase for the keys as they are internally lower-cased. Using upper-cases causes

hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -244,7 +244,8 @@ object HoodieSparkSqlWriter {
244244
var schema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, nameSpace)
245245
val lastestSchema = getLatestTableSchema(fs, basePath, sparkContext, schema)
246246
var internalSchemaOpt = getLatestTableInternalSchema(fs, basePath, sparkContext)
247-
if (reconcileSchema && parameters(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key()).toBoolean && internalSchemaOpt.isEmpty) {
247+
if (reconcileSchema && parameters.getOrDefault(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key(), "false").toBoolean
248+
&& internalSchemaOpt.isEmpty) {
248249
// force apply full schema evolution.
249250
internalSchemaOpt = Some(AvroInternalSchemaConverter.convert(schema))
250251
}

hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkUtils.scala

Lines changed: 1 addition & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -199,9 +199,7 @@ class TestHoodieSparkUtils {
199199
fail("createRdd should fail, because records don't have a column which is not nullable in the passed in schema")
200200
} catch {
201201
case e: Exception =>
202-
val cause = e.getCause
203-
assertTrue(cause.isInstanceOf[SchemaCompatibilityException])
204-
assertTrue(e.getMessage.contains("Unable to validate the rewritten record {\"innerKey\": \"innerKey1_2\", \"innerValue\": 2} against schema"))
202+
assertTrue(e.getMessage.contains("null of string in field new_nested_col of test_namespace.test_struct_name.nullableInnerStruct of union"))
205203
}
206204
spark.stop()
207205
}

0 commit comments

Comments (0)