Merged
Changes from all commits
134 commits
412c4a9
Revert "[MINOR] retain avro's namespace (#6783)"
Sep 26, 2022
adfee8c
Fixing `HoodieParquetReader` to properly specify projected schema whe…
Aug 10, 2022
fc0c9ef
Converted `DataTypeUtils` to Scala;
Aug 15, 2022
cc2cb49
Fixed passing of the schema into Spark 3 `DefaultSource` impl to rely…
Aug 15, 2022
0b45836
Missing license
Aug 15, 2022
aa806af
Cleaned up test for basic Schema Evolution, added cases being tested
Aug 15, 2022
c45473c
Added `AvroSchemaUtils.isNullable` check to validate whether Avro sch…
Aug 16, 2022
1216aed
Fixed `isSchemaCompatible` sequence to treat "dropped column" as legi…
Aug 16, 2022
ac5c218
Fixing tests
Aug 16, 2022
9a77d29
Fixed basic Schema Evolution test
Aug 16, 2022
dd5dd78
Run basic schema-evolution test for all operations
Aug 16, 2022
3938134
Fixed `HoodieSparkSqlWriter` to fix schema handling for row-writing p…
Aug 16, 2022
0cbd9d2
Throw exception in case table and incoming schemas could not be recon…
Aug 17, 2022
ee41dc0
Extracted `TestBasicSchemaEvolution` as standalone test
Aug 17, 2022
bb935fc
Fixing handling of incompatible schemas
Aug 17, 2022
dcfa147
Extended tests to MOR
Aug 17, 2022
0c5871f
Fixing compilation for Spark 2.x
Aug 17, 2022
a664fb4
Tidying up
Sep 2, 2022
6d91d84
Fixing handling of partition-column being dropped
Sep 2, 2022
0b6d2c1
Fixed `testSchemaEvolutionForTableType` and expanded
Sep 3, 2022
2c60179
Fixed more tests (inserting w/ incompatible schema)
Sep 3, 2022
305b93d
Fixed latest table's schema derivation seq to
Sep 3, 2022
1c157d9
Added `RecordType.name` to be able to encode Avro Record's names for …
Sep 3, 2022
bbe1f68
Fixed error message
Sep 3, 2022
266fd5b
Cleaned up schema bespoke compatibility validation seq, to instead re…
Sep 3, 2022
d85b03d
Fixing invalid test
Sep 15, 2022
986e6a3
Tidying up
Sep 15, 2022
880cb05
Fixing `InternalSchema` <> Avro conversion for Avro 1.8.2
Sep 16, 2022
55476a5
Tidying up
Sep 16, 2022
e2881d2
Rebased `InternalSchema` ctors to accept `RecordType` in lieu of list…
Sep 16, 2022
dfef2a4
Make sure `RecordType`s name is appropriately propagated during modif…
Sep 16, 2022
4d97cd0
Use full-name when converting back to Avro schema
Sep 16, 2022
0b90a0f
Tidying up
Sep 16, 2022
ac6d2cf
Fixing tests (after rebase)
Sep 16, 2022
0437b75
Tidying up
Sep 16, 2022
edc702d
Make sure all Avro schemas are properly registered w/ Kryo
Sep 16, 2022
5efb6c2
`WRITE_SCHEMA` > `WRITE_SCHEMA_OVERRIDE`;
Sep 17, 2022
3e484ad
Make schema compatibility validation configurable w/in `HoodieSparkSq…
Sep 17, 2022
08b3655
Disambiguated writing schema to be just a single field w/in `WriteHan…
Sep 17, 2022
5eff6ee
Fixing compilation
Sep 17, 2022
e78760e
Make sure original source schema is used to parse original record's A…
Sep 17, 2022
b229090
Cleaned up `RECONCILE_SCHEMA` config description;
Sep 21, 2022
1e057c4
Disable schema canonicalization, reconciliation and validation for `M…
Sep 21, 2022
93fa568
Fixing compilation
Sep 21, 2022
0b15651
Added test w/ data-type changing to basic SE test-case
Sep 21, 2022
06486cb
Fixing tests
Sep 24, 2022
472719a
Fixing compilation
Sep 27, 2022
1a07651
Fixed schema evolution to be handled appropriately for the first commit;
Sep 27, 2022
0c7e12a
Tidying up
Sep 27, 2022
4908462
Fixing typo
Sep 27, 2022
909e161
Fixed invalid API usage
Sep 29, 2022
261bc5e
Avoid file-reader re-initialization, abstracting common vars;
Sep 30, 2022
c5beb69
Fixed `shouldRewriteInWriterSchema` flag handling to properly handle …
Oct 1, 2022
e1018fb
Extracted (advanced) schema evolution handling into a standalone method
Oct 1, 2022
d71e76d
Disambiguate writer/reader schemas in `HoodieMergeHelper`
Oct 1, 2022
55dae7e
Tidying up
Oct 1, 2022
95b5206
Rebased Schema Evolution seq to produce opaque record transformer;
Oct 1, 2022
39b1a6e
Rebased record transformation to bypass Avro encoding/decoding (using…
Oct 1, 2022
db0d494
Pass in record-iterator to bootstrap handler
Oct 1, 2022
2322871
Added ability to `HoodieMergeHelper` to rewrite records in the writer…
Oct 1, 2022
f7afb21
Fixing compilation
Oct 1, 2022
73f2567
Missing license
Oct 1, 2022
9c35a5d
Rebase onto proper record re-writing utility
Oct 1, 2022
692c6d0
Cleaned up unnecessary generics
Oct 1, 2022
dad1c63
Cleaned up `MergeHelper` impls
Oct 4, 2022
19a417b
Fixed Parquet-to-Avro converter to avoid treating "list" element wra…
Oct 4, 2022
69e0333
Fixing compilation
Oct 4, 2022
afce319
Tidying up
Oct 5, 2022
4bb9519
Cleaning up erroneous assertion
Oct 5, 2022
2a069c4
Removed superfluous impls of `MergeHelper`s for Flink/Java
Oct 5, 2022
7d16aa9
Enable table schema validation by default
Oct 5, 2022
ac873ad
Make sure that schema of the incoming batch is properly canonicalized…
Oct 5, 2022
6a508ab
Make sure Avro's record-name/-namespace are appropriately preserved
Oct 5, 2022
539ad42
Tidying up
Oct 5, 2022
797e6f5
Relocating test methods
Oct 6, 2022
21b3bec
Fixed `AvroInternalSchemaConverter` to generate compatible schemas
Oct 6, 2022
479fce0
Disable schema validation in test that is relying on malformed commit…
Oct 6, 2022
1906ed7
Cleaned up `AvroInternalSchemaConverter` to be aligned w/ Catalyst's …
Oct 6, 2022
f154048
Cleaned up MOR `DataBlock`s impls to avoid spilling of Schema Evoluti…
Oct 6, 2022
7585467
Revisited Schema Evolution handling in AbstractHoodieLogRecordReader …
Oct 6, 2022
dd2b7b0
Missing license
Oct 6, 2022
aa6c423
Fixing compilation
Oct 6, 2022
e57473b
Fix `Fixed` schema handling when rewriting the record into the new Sc…
Oct 6, 2022
8815638
Fixing NPEs
Oct 6, 2022
033ebd3
Fixing tests
Oct 6, 2022
f7315f5
Read in projected schema in case when writer's schema is simply a pro…
Oct 7, 2022
4a0576d
Fixed `InternalSchema` conversion to generate schemas compatible w/ p…
Oct 7, 2022
282c869
Fixing typo
Oct 7, 2022
0cfcf8b
Reverting inadvertent change
Oct 7, 2022
1b2738b
Make use of Avro's schema full-name when converting from `InternalSch…
Oct 7, 2022
1e14d26
Extracted writer schema derivation seq into a standalone method;
Oct 7, 2022
6e55b0c
Revisited `DeltaSync` to leverage `HoodieSparkSqlWriter.deduceWriterS…
Oct 7, 2022
3afb9c7
Fixing compilation
Oct 7, 2022
071a8a6
`lint`
Oct 7, 2022
dc12728
Fixed converted schema to bear table's fully-qualified Avro record-name
Oct 8, 2022
a1f2f2f
Localized `AvroSchemaCompatibility` from Avro 1.10;
Oct 8, 2022
f40ad2f
Added appropriate current location tracking
Oct 8, 2022
1a79e83
Fixed Avro schema name-checking to only occur in 2 cases:
Oct 8, 2022
bce6364
Fixing tests
Oct 8, 2022
63ad6d0
Fixing tests
Oct 8, 2022
0274f46
Abstracted reflection-based handling of the default value for Avro sc…
Oct 8, 2022
1c4f8f9
Fixed `SparkAvroPostProcessor` to preserve Avro schema's qualified name
Oct 8, 2022
384b7a1
Fixed fixtures for post-processed schema test
Oct 8, 2022
dbd4eb9
Fixed NPE in default value extraction
Oct 10, 2022
79b7ebd
Updating tests fixtures
Oct 10, 2022
626ef5b
Relocated default config overriding to `HoodieParquetReader`;
Oct 21, 2022
58ce679
Fixed Flink's `ParquetSchemaConverter` to properly carryover nullabil…
Oct 21, 2022
d1289b6
Tidying up
Oct 21, 2022
43eb094
`lint`
Nov 11, 2022
c835245
Fixing compilation
Nov 11, 2022
31dada7
Fixing new tests
Nov 11, 2022
d549ba0
Tidying up docs
Nov 12, 2022
d7bf9b1
Fixed schema evolution handling in `HoodieMergeHelper`
Nov 12, 2022
7fa57e3
Fixing more tests
Nov 12, 2022
e243707
Tidying up
Nov 17, 2022
acdb4eb
Make CANONICALIZE_SCHEMA config package-private
Nov 17, 2022
49fb34b
Fixing compilation
Nov 17, 2022
12760b5
Fixing compilation
Nov 17, 2022
a433f57
Fixed `isProjection` to properly account for nested projections;
Nov 17, 2022
17ae7ad
Fixing invalid test
Nov 21, 2022
829cee3
(WIP) Disable flaky `TestCleaner` test
Nov 21, 2022
ad261b7
Moved `isSchemaCompatible` from `TableSchemaResolver` to `AvroSchemaU…
Nov 22, 2022
9f52f1f
Fixed `AvroSchemaUtils.isProjection` for Array, Map and Union types
Nov 23, 2022
9e3a35e
Revisited schema reconciliation sequence to implement legacy semantic
Nov 23, 2022
bf93139
Fixing compilation
Nov 23, 2022
5b3c71b
Fixed schema projection sequence to properly match atomic-types
Nov 23, 2022
35c0f3b
Added tests
Nov 23, 2022
996a06b
Updated tests
Nov 23, 2022
f36fdf8
Updated RECONCILE_SCHEMA config description
Nov 23, 2022
03385a3
Fixed invalid schema fetching in bootstrap tests
Nov 23, 2022
3fde95c
Fixed `HiveAvroSerializer` to properly resolve nullable array element…
Nov 23, 2022
b9fce65
Fixed invalid compaction test
Nov 23, 2022
5357f6f
Fixing compilation
Nov 23, 2022
94c53ba
Fixing `isCompatibleProjection` check to ignore schema names when mat…
Nov 23, 2022
@@ -18,6 +18,10 @@

package org.apache.hudi.client;

import com.codahale.metrics.Timer;
import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.async.AsyncArchiveService;
import org.apache.hudi.async.AsyncCleanerService;
import org.apache.hudi.avro.HoodieAvroUtils;
@@ -94,11 +98,6 @@
import org.apache.hudi.table.marker.WriteMarkersFactory;
import org.apache.hudi.table.upgrade.SupportsUpgradeDowngrade;
import org.apache.hudi.table.upgrade.UpgradeDowngrade;

import com.codahale.metrics.Timer;
import org.apache.avro.Schema;
Contributor: Does the import reordering follow the correct checkstyle rules?

Contributor (Author): Yeah, seems like I need to fix my setup.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

@@ -114,6 +113,7 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.apache.hudi.avro.AvroSchemaUtils.getAvroRecordQualifiedName;
import static org.apache.hudi.common.model.HoodieCommitMetadata.SCHEMA_KEY;

/**
@@ -300,7 +300,7 @@ private void saveInternalSchema(HoodieTable table, String instantTime, HoodieCom
schemasManager.persistHistorySchemaStr(instantTime, SerDeHelper.inheritSchemas(evolvedSchema, historySchemaStr));
}
// update SCHEMA_KEY
metadata.addMetadata(SCHEMA_KEY, AvroInternalSchemaConverter.convert(evolvedSchema, avroSchema.getName()).toString());
metadata.addMetadata(SCHEMA_KEY, AvroInternalSchemaConverter.convert(evolvedSchema, avroSchema.getFullName()).toString());
}
}
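The switch from `getName()` to `getFullName()` above (and to `getAvroRecordQualifiedName` in the next hunk) matters because Avro distinguishes the simple record name from the namespace-qualified one; passing the full name keeps the namespace when the `InternalSchema` is converted back to an Avro schema. A minimal sketch of the difference (record and namespace names are illustrative):

```java
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

public class AvroNameDemo {
  public static void main(String[] args) {
    // Illustrative record and namespace names.
    Schema schema = SchemaBuilder.record("trip_record")
        .namespace("hoodie.source")
        .fields().requiredString("uuid").endRecord();

    System.out.println(schema.getName());     // trip_record
    System.out.println(schema.getFullName()); // hoodie.source.trip_record
  }
}
```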

@@ -1769,7 +1769,7 @@ private void commitTableChange(InternalSchema newSchema, HoodieTableMetaClient m
TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient);
String historySchemaStr = schemaUtil.getTableHistorySchemaStrFromCommitMetadata().orElseGet(
() -> SerDeHelper.inheritSchemas(getInternalSchema(schemaUtil), ""));
Schema schema = AvroInternalSchemaConverter.convert(newSchema, config.getTableName());
Schema schema = AvroInternalSchemaConverter.convert(newSchema, getAvroRecordQualifiedName(config.getTableName()));
String commitActionType = CommitUtils.getCommitActionType(WriteOperationType.ALTER_SCHEMA, metaClient.getTableType());
String instantTime = HoodieActiveTimeline.createNewInstantTime();
startCommitWithTime(instantTime, commitActionType, metaClient);
@@ -198,7 +198,7 @@ public class HoodieWriteConfig extends HoodieConfig {

public static final ConfigProperty<String> AVRO_SCHEMA_VALIDATE_ENABLE = ConfigProperty
.key("hoodie.avro.schema.validate")
.defaultValue("false")
.defaultValue("true")
Contributor (Author): The default is flipped to make sure proper schema validation is run for every operation on the table.

.withDocumentation("Validate the schema used for the write against the latest schema, for backwards compatibility.");

public static final ConfigProperty<String> INSERT_PARALLELISM_VALUE = ConfigProperty
@@ -438,15 +438,15 @@ public class HoodieWriteConfig extends HoodieConfig {
+ "OPTIMISTIC_CONCURRENCY_CONTROL: Multiple writers can operate on the table and exactly one of them succeed "
+ "if a conflict (writes affect the same file group) is detected.");

/**
* Currently the use this to specify the write schema.
*/
public static final ConfigProperty<String> WRITE_SCHEMA = ConfigProperty
public static final ConfigProperty<String> WRITE_SCHEMA_OVERRIDE = ConfigProperty
.key("hoodie.write.schema")
.noDefaultValue()
Contributor: WRITE_SCHEMA seems more in line with the option key; not a big deal, it's just a preference.

Contributor (Author): Yeah, this config naming is confusing -- I've updated the field name but can't change the config key since it's publicly exposed. "Override" is the more appropriate name for it, given that the nominal writer schema (which this config overrides) is passed through a different config property.

.withDocumentation("The specified write schema. In most case, we do not need set this parameter,"
+ " but for the case the write schema is not equal to the specified table schema, we can"
+ " specify the write schema by this parameter. Used by MergeIntoHoodieTableCommand");
.withDocumentation("Config allowing to override writer's schema. This might be necessary in "
+ "cases when writer's schema derived from the incoming dataset might actually be different from "
+ "the schema we actually want to use when writing. This, for ex, could be the case for"
+ "'partial-update' use-cases (like `MERGE INTO` Spark SQL statement for ex) where only "
+ "a projection of the incoming dataset might be used to update the records in the existing table, "
+ "prompting us to override the writer's schema");

/**
* HUDI-858 : There are users who had been directly using RDD APIs and have relied on a behavior in 0.4.x to allow
@@ -938,6 +938,19 @@ public void setSchema(String schemaStr) {
setValue(AVRO_SCHEMA_STRING, schemaStr);
}

/**
* Returns schema used for writing records
*
* NOTE: This method respects {@link HoodieWriteConfig#WRITE_SCHEMA_OVERRIDE} being
* specified overriding original writing schema
*/
public String getWriteSchema() {
if (props.containsKey(WRITE_SCHEMA_OVERRIDE.key())) {
return getString(WRITE_SCHEMA_OVERRIDE);
}
return getSchema();
}
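A minimal sketch of the precedence `getWriteSchema()` implements, for a partial-update style write. The builder calls (`newBuilder`, `withPath`, `withSchema`, `withProps`) are assumed to match the existing `HoodieWriteConfig` API; only `hoodie.write.schema` is taken verbatim from `WRITE_SCHEMA_OVERRIDE` above, and record/field names are illustrative:

```java
import java.util.Properties;

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.hudi.config.HoodieWriteConfig;

public class WriteSchemaOverrideDemo {
  public static void main(String[] args) {
    // Full table schema (illustrative).
    Schema tableSchema = SchemaBuilder.record("trip").fields()
        .requiredString("uuid").requiredString("rider").optionalString("tip")
        .endRecord();
    // Projection actually written, e.g. a MERGE INTO that only touches `tip`.
    Schema updateProjection = SchemaBuilder.record("trip").fields()
        .requiredString("uuid").optionalString("tip")
        .endRecord();

    Properties overrides = new Properties();
    overrides.setProperty("hoodie.write.schema", updateProjection.toString()); // WRITE_SCHEMA_OVERRIDE key

    HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hoodie/trips")          // illustrative base path
        .withSchema(tableSchema.toString())     // nominal writer schema
        .withProps(overrides)
        .build();

    // The override wins when present; otherwise getWriteSchema() falls back to getSchema().
    System.out.println(config.getWriteSchema().equals(updateProjection.toString())); // true
  }
}
```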

public String getInternalSchema() {
return getString(INTERNAL_SCHEMA_STRING);
}
@@ -962,21 +975,7 @@ public void setSchemaEvolutionEnable(boolean enable) {
setValue(HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE, String.valueOf(enable));
}

/**
* Get the write schema for written records.
*
* If the WRITE_SCHEMA has specified, we use the WRITE_SCHEMA.
* Or else we use the AVRO_SCHEMA as the write schema.
* @return
*/
public String getWriteSchema() {
if (props.containsKey(WRITE_SCHEMA.key())) {
return getString(WRITE_SCHEMA);
}
return getSchema();
}

public boolean getAvroSchemaValidate() {
public boolean shouldValidateAvroSchema() {
return getBoolean(AVRO_SCHEMA_VALIDATE_ENABLE);
}

@@ -279,9 +279,9 @@ private Option<IndexedRecord> getIndexedRecord(HoodieRecord<T> hoodieRecord) {

private Option<IndexedRecord> getInsertValue(HoodieRecord<T> hoodieRecord) throws IOException {
if (useWriterSchema) {
return hoodieRecord.getData().getInsertValue(tableSchemaWithMetaFields, recordProperties);
return hoodieRecord.getData().getInsertValue(writeSchemaWithMetaFields, recordProperties);
} else {
return hoodieRecord.getData().getInsertValue(tableSchema, recordProperties);
return hoodieRecord.getData().getInsertValue(writeSchema, recordProperties);
}
}

@@ -185,9 +185,9 @@ public void write() {
final String key = keyIterator.next();
HoodieRecord<T> record = recordMap.get(key);
if (useWriterSchema) {
write(record, record.getData().getInsertValue(tableSchemaWithMetaFields, config.getProps()));
write(record, record.getData().getInsertValue(writeSchemaWithMetaFields, config.getProps()));
} else {
write(record, record.getData().getInsertValue(tableSchema, config.getProps()));
write(record, record.getData().getInsertValue(writeSchema, config.getProps()));
}
}
} catch (IOException io) {
@@ -43,6 +43,7 @@
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;
import org.apache.hudi.io.storage.HoodieFileWriter;
import org.apache.hudi.io.storage.HoodieFileWriterFactory;
import org.apache.hudi.keygen.BaseKeyGenerator;
import org.apache.hudi.keygen.KeyGenUtils;
import org.apache.hudi.table.HoodieTable;
@@ -201,8 +202,8 @@ private void init(String fileId, String partitionPath, HoodieBaseFile baseFileTo
createMarkerFile(partitionPath, newFilePath.getName());

// Create the writer for writing the new version file
fileWriter = createNewFileWriter(instantTime, newFilePath, hoodieTable, config,
writeSchemaWithMetaFields, taskContextSupplier);
fileWriter = HoodieFileWriterFactory.getFileWriter(instantTime, newFilePath, hoodieTable,
config, writeSchemaWithMetaFields, taskContextSupplier);
} catch (IOException io) {
LOG.error("Error in update task at commit " + instantTime, io);
writeStatus.setGlobalError(io);
@@ -229,7 +230,7 @@ protected void initializeIncomingRecordsMap() {
long memoryForMerge = IOUtils.getMaxMemoryPerPartitionMerge(taskContextSupplier, config);
LOG.info("MaxMemoryPerPartitionMerge => " + memoryForMerge);
this.keyToNewRecords = new ExternalSpillableMap<>(memoryForMerge, config.getSpillableMapBasePath(),
new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(tableSchema),
new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(writeSchema),
config.getCommonConfig().getSpillableDiskMapType(),
config.getCommonConfig().isBitCaskDiskMapCompressionEnabled());
} catch (IOException io) {
@@ -285,7 +286,7 @@ protected boolean writeUpdateRecord(HoodieRecord<T> hoodieRecord, GenericRecord
}

protected void writeInsertRecord(HoodieRecord<T> hoodieRecord) throws IOException {
Schema schema = useWriterSchemaForCompaction ? tableSchemaWithMetaFields : tableSchema;
Schema schema = useWriterSchemaForCompaction ? writeSchemaWithMetaFields : writeSchema;
Option<IndexedRecord> insertRecord = hoodieRecord.getData().getInsertValue(schema, config.getProps());
// just skip the ignored record
if (insertRecord.isPresent() && insertRecord.get().equals(IGNORE_RECORD)) {
@@ -345,7 +346,7 @@ public void write(GenericRecord oldRecord) {
try {
Option<IndexedRecord> combinedAvroRecord =
hoodieRecord.getData().combineAndGetUpdateValue(oldRecord,
useWriterSchemaForCompaction ? tableSchemaWithMetaFields : tableSchema,
useWriterSchemaForCompaction ? writeSchemaWithMetaFields : writeSchema,
config.getPayloadConfig().getProps());

if (combinedAvroRecord.isPresent() && combinedAvroRecord.get().equals(IGNORE_RECORD)) {
@@ -54,7 +54,7 @@ public HoodieMergeHandleWithChangeLog(HoodieWriteConfig config, String instantTi
hoodieTable.getMetaClient().getTableConfig(),
partitionPath,
fs,
tableSchema,
getWriterSchema(),
createLogWriter(instantTime, HoodieCDCUtils.CDC_LOGFILE_SUFFIX),
IOUtils.getMaxMemoryPerPartitionMerge(taskContextSupplier, config));
}
@@ -72,7 +72,7 @@ public HoodieMergeHandleWithChangeLog(HoodieWriteConfig config, String instantTi
hoodieTable.getMetaClient().getTableConfig(),
partitionPath,
fs,
tableSchema,
getWriterSchema(),
createLogWriter(instantTime, HoodieCDCUtils.CDC_LOGFILE_SUFFIX),
IOUtils.getMaxMemoryPerPartitionMerge(taskContextSupplier, config));
}
@@ -94,9 +94,9 @@ public void write(GenericRecord oldRecord) {
}
try {
if (useWriterSchemaForCompaction) {
writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(tableSchemaWithMetaFields, config.getProps()));
writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(writeSchemaWithMetaFields, config.getProps()));
} else {
writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(tableSchema, config.getProps()));
writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(writeSchema, config.getProps()));
}
insertRecordsWritten++;
writtenRecordKeys.add(keyToPreWrite);
@@ -117,9 +117,9 @@ public List<WriteStatus> close() {
HoodieRecord<T> hoodieRecord = keyToNewRecords.get(key);
if (!writtenRecordKeys.contains(hoodieRecord.getRecordKey())) {
if (useWriterSchemaForCompaction) {
writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(tableSchemaWithMetaFields, config.getProps()));
writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(writeSchemaWithMetaFields, config.getProps()));
} else {
writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(tableSchema, config.getProps()));
writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(writeSchema, config.getProps()));
}
insertRecordsWritten++;
}
@@ -34,8 +34,6 @@
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.io.storage.HoodieFileWriter;
import org.apache.hudi.io.storage.HoodieFileWriterFactory;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.marker.WriteMarkersFactory;

@@ -81,20 +79,7 @@ public abstract class HoodieWriteHandle<T extends HoodieRecordPayload, I, K, O>
public static IgnoreRecord IGNORE_RECORD = new IgnoreRecord();

/**
* The specified schema of the table. ("specified" denotes that this is configured by the client,
* as opposed to being implicitly fetched out of the commit metadata)
*/
protected final Schema tableSchema;
protected final Schema tableSchemaWithMetaFields;

/**
* The write schema. In most case the write schema is the same to the
* input schema. But if HoodieWriteConfig#WRITE_SCHEMA is specified,
* we use the WRITE_SCHEMA as the write schema.
*
* This is useful for the case of custom HoodieRecordPayload which do some conversion
* to the incoming record in it. e.g. the ExpressionPayload do the sql expression conversion
* to the input.
* Schema used to write records into data files
Contributor: Based on my understanding of the latest master, the write schema (with or without meta fields) is only used for bootstrap base files. Make sure we have unit/functional tests around this, so it is not affected.

*/
protected final Schema writeSchema;
protected final Schema writeSchemaWithMetaFields;
@@ -120,8 +105,6 @@ protected HoodieWriteHandle(HoodieWriteConfig config, String instantTime, String
super(config, Option.of(instantTime), hoodieTable);
this.partitionPath = partitionPath;
this.fileId = fileId;
this.tableSchema = overriddenSchema.orElseGet(() -> getSpecifiedTableSchema(config));
this.tableSchemaWithMetaFields = HoodieAvroUtils.addMetadataFields(tableSchema, config.allowOperationMetadataField());
this.writeSchema = overriddenSchema.orElseGet(() -> getWriteSchema(config));
this.writeSchemaWithMetaFields = HoodieAvroUtils.addMetadataFields(writeSchema, config.allowOperationMetadataField());
this.timer = HoodieTimer.start();
@@ -132,25 +115,6 @@ protected HoodieWriteHandle(HoodieWriteConfig config, String instantTime, String
schemaOnReadEnabled = !isNullOrEmpty(hoodieTable.getConfig().getInternalSchema());
}

/**
* Get the specified table schema.
* @param config
* @return
*/
private static Schema getSpecifiedTableSchema(HoodieWriteConfig config) {
return new Schema.Parser().parse(config.getSchema());
}

/**
* Get the schema, of the actual write.
*
* @param config
* @return
*/
private static Schema getWriteSchema(HoodieWriteConfig config) {
return new Schema.Parser().parse(config.getWriteSchema());
}

/**
* Generate a write token based on the currently running spark task and its place in the spark dag.
*/
@@ -272,9 +236,8 @@ protected long getAttemptId() {
return taskContextSupplier.getAttemptIdSupplier().get();
}

protected HoodieFileWriter createNewFileWriter(String instantTime, Path path, HoodieTable<T, I, K, O> hoodieTable,
HoodieWriteConfig config, Schema schema, TaskContextSupplier taskContextSupplier) throws IOException {
return HoodieFileWriterFactory.getFileWriter(instantTime, path, hoodieTable, config, schema, taskContextSupplier);
private static Schema getWriteSchema(HoodieWriteConfig config) {
return new Schema.Parser().parse(config.getWriteSchema());
}

protected HoodieLogFormat.Writer createLogWriter(
@@ -98,6 +98,7 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.apache.hudi.avro.AvroSchemaUtils.isSchemaCompatible;
import static org.apache.hudi.common.table.HoodieTableConfig.TABLE_METADATA_PARTITIONS;
import static org.apache.hudi.common.util.StringUtils.EMPTY_STRING;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.deleteMetadataPartition;
@@ -781,7 +782,7 @@ public TaskContextSupplier getTaskContextSupplier() {
*/
private void validateSchema() throws HoodieUpsertException, HoodieInsertException {

if (!config.getAvroSchemaValidate() || getActiveTimeline().getCommitsTimeline().filterCompletedInstants().empty()) {
if (!config.shouldValidateAvroSchema() || getActiveTimeline().getCommitsTimeline().filterCompletedInstants().empty()) {
// Check not required
return;
}
@@ -793,7 +794,7 @@ private void validateSchema() throws HoodieUpsertException, HoodieInsertExceptio
TableSchemaResolver schemaResolver = new TableSchemaResolver(getMetaClient());
writerSchema = HoodieAvroUtils.createHoodieWriteSchema(config.getSchema());
tableSchema = HoodieAvroUtils.createHoodieWriteSchema(schemaResolver.getTableAvroSchemaWithoutMetadataFields());
isValid = TableSchemaResolver.isSchemaCompatible(tableSchema, writerSchema);
isValid = isSchemaCompatible(tableSchema, writerSchema);
} catch (Exception e) {
throw new HoodieException("Failed to read schema/check compatibility for base path " + metaClient.getBasePath(), e);
}
Expand All @@ -808,15 +809,15 @@ public void validateUpsertSchema() throws HoodieUpsertException {
try {
validateSchema();
} catch (HoodieException e) {
throw new HoodieUpsertException("Failed upsert schema compatibility check.", e);
throw new HoodieUpsertException("Failed upsert schema compatibility check", e);
}
}

public void validateInsertSchema() throws HoodieInsertException {
try {
validateSchema();
} catch (HoodieException e) {
throw new HoodieInsertException("Failed insert schema compability check.", e);
throw new HoodieInsertException("Failed insert schema compatibility check", e);
}
}
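Both validate methods above funnel into validateSchema(), which now delegates to `AvroSchemaUtils.isSchemaCompatible` (moved out of `TableSchemaResolver` in this PR). A minimal sketch of the check, using illustrative schemas where the writer simply drops a nullable column; the two-argument call is taken from the hunk above, and the expected compatibility semantics are inferred from the commit messages:

```java
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

import static org.apache.hudi.avro.AvroSchemaUtils.isSchemaCompatible;

public class SchemaCompatibilityDemo {
  public static void main(String[] args) {
    // Latest table schema (illustrative).
    Schema tableSchema = SchemaBuilder.record("trip").fields()
        .requiredString("uuid")
        .requiredString("rider")
        .optionalString("tip")
        .endRecord();

    // Incoming writer schema that no longer carries the nullable `tip` column.
    Schema writerSchema = SchemaBuilder.record("trip").fields()
        .requiredString("uuid")
        .requiredString("rider")
        .endRecord();

    // Per the commits above, dropping a column is meant to be treated as a
    // legitimate projection rather than an incompatible change.
    System.out.println(isSchemaCompatible(tableSchema, writerSchema));
  }
}
```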
