Conversation

@xiarixiaoyao
Contributor

Tips

What is the purpose of the pull request

(For example: This pull request adds a quick-start document.)

Brief change log

(for example:)

  • Modify AnnotationLocation checkstyle rule in checkstyle.xml

Verify this pull request

(Please pick either of the following options)

This pull request is a trivial rework / code cleanup without any test coverage.

(or)

This pull request is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end.
  • Added HoodieClientWriteTest to verify the change.
  • Manually verified the change by running a job locally.

Committer checklist

  • Has a corresponding JIRA in PR title & commit

  • Commit message is descriptive of the change

  • CI is green

  • Necessary doc changes done or have another open PR

  • For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.

// TODO Unify the logic of rewriteRecordWithMetadata and rewriteEvolutionRecordWithMetadata, and delete this function.
public static GenericRecord rewriteEvolutionRecordWithMetadata(GenericRecord genericRecord, Schema newSchema, String fileName) {
  GenericRecord newRecord = HoodieAvroUtils.rewriteRecordWithNewSchema(genericRecord, newSchema, new HashMap<>());
  // do not preserve FILENAME_METADATA_FIELD
Contributor Author

This is just copied from rewriteRecordWithMetadata,
but I don't know why we need to rewrite genericRecord; it costs some time.
Can we modify genericRecord directly, e.g. genericRecord.put(HoodieRecord.FILENAME_METADATA_FIELD_POS, fileName)?

Contributor

Records should be immutable by default, with only limited scopes where treating them as mutable is acceptable

Contributor Author

Thanks. After this operation we write the parquet files directly, so the life cycle of genericRecord has come to an end. I think we can try treating these records as mutable at this point. Of course, let me try this in the next version.

Contributor

These records might actually be used upstream in some follow-up operations, hence it's preferred to keep them immutable since at this level we don't control their lifecycle

Contributor

Yes, let's revisit after 0.11 to see if we can avoid the full rewrite in some cases. I understand the intent to recreate a new record to avoid mutations, but it does incur perf hits. I should have thought about this when we fixed HoodieMergeHandle for the commit-time fix in an earlier patch; I missed bringing it up.
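
A minimal sketch of the two options discussed above; the variables (genericRecord, newSchema, fileName) are taken from the hunk, and this is not verified against the final code:

  // (a) Immutable style (what the code does today): allocate a rewritten copy,
  // then stamp the filename metadata field on the copy only.
  GenericRecord newRecord = HoodieAvroUtils.rewriteRecordWithNewSchema(genericRecord, newSchema, new HashMap<>());
  newRecord.put(HoodieRecord.FILENAME_METADATA_FIELD_POS, fileName);

  // (b) Mutable style (the author's suggestion): patch the record in place.
  // Cheaper, but only safe if no downstream consumer still holds a reference to it.
  genericRecord.put(HoodieRecord.FILENAME_METADATA_FIELD_POS, fileName);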

xiarixiaoyao added the priority:critical (Production degraded; pipelines stalled) label Apr 20, 2022
@xiarixiaoyao
Contributor Author

@alexeykudinkin @bvaradar @xushiyan
Could you please help me review this PR? Thanks.

xushiyan added the priority:blocker (Production down; release blocker) label and removed the priority:critical (Production degraded; pipelines stalled) label Apr 20, 2022
@alexeykudinkin
Contributor

@xiarixiaoyao left some comments.

Can you please add a description for this PR? There's very little context in the PR itself, and not a lot in the Jira issue either, so it's hard to understand how exactly HUDI-3855 is related to it.

protected final String writeToken;
protected final TaskContextSupplier taskContextSupplier;
// For full schema evolution
protected final boolean schemaOnReadEnable;
Contributor

nit: better to suffix it w/ Enabled

Contributor Author

fixed

if (!renameCols.isEmpty() && oldSchema.getField(field.name()) == null) {
  String fieldName = field.name();
  for (Map.Entry<String, String> entry : renameCols.entrySet()) {
    List<String> nameParts = Arrays.asList(entry.getKey().split("\\."));
Contributor

If we're accepting the dot-path specification, why are we looking at the last element of the chain?
Since we're doing a top-down traversal, we should look at the first element, and we also need to either keep an index of what level we're at or trim the column names as we traverse.
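
A minimal sketch of the level-aware matching described here; matchesAtLevel is a hypothetical helper, not code from this PR:

  // Match one level of a dot-path rename spec (e.g. "outer.inner.col") against the
  // field currently being visited, given how deep the top-down traversal is.
  static boolean matchesAtLevel(String renamePath, int level, String fieldName) {
    String[] parts = renamePath.split("\\.");
    return level < parts.length && parts[level].equals(fieldName);
  }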

@xiarixiaoyao
Contributor Author

@alexeykudinkin
Thank you very much for your review. Addressed all comments and added more tests for the nested rename operation.

About HUDI-3855: we now rewrite the old record before writing it to the parquet file.
In the schema-evolution rename scenario, the old parquet file carries the old column name, so when we rewrite the old record against the new schema, the value belonging to the old name is lost, which leads to a serious problem.
For example:
1) A COW Hudi table has an old parquet file whose schema is: a int, b string.
2) We rename a -> aa; the new table schema is: aa int, b string.
3) We insert new data into the table; during the insert we need to read the old records from the old parquet file.
Before HUDI-3855: we could read an old record and write it to the new parquet file directly; the rename had no influence on it.
After HUDI-3855: before writing an old record we must rewrite it with the new schema. The old record's schema is a int, b string, but the new schema is aa int, b string; if we rewrite the old record blindly, we lose the value of column a, since that name does not exist in the new schema.
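
To make the failure mode concrete, here is a minimal sketch in plain Avro (hypothetical values; the rename-map direction of new full name -> old name follows how this PR appears to thread renameCols through):

  import org.apache.avro.Schema;
  import org.apache.avro.generic.GenericData;
  import org.apache.avro.generic.GenericRecord;

  Schema oldSchema = new Schema.Parser().parse(
      "{\"type\":\"record\",\"name\":\"t\",\"fields\":["
          + "{\"name\":\"a\",\"type\":\"int\"},{\"name\":\"b\",\"type\":\"string\"}]}");
  Schema newSchema = new Schema.Parser().parse(
      "{\"type\":\"record\",\"name\":\"t\",\"fields\":["
          + "{\"name\":\"aa\",\"type\":\"int\"},{\"name\":\"b\",\"type\":\"string\"}]}");

  GenericRecord oldRecord = new GenericData.Record(oldSchema);
  oldRecord.put("a", 1);
  oldRecord.put("b", "x");

  // Naive rewrite: copy fields by name into the new schema. "aa" does not exist
  // in the old schema, so the value of "a" is silently dropped.
  GenericRecord naive = new GenericData.Record(newSchema);
  for (Schema.Field f : newSchema.getFields()) {
    Schema.Field old = oldSchema.getField(f.name()); // null for "aa"
    if (old != null) {
      naive.put(f.pos(), oldRecord.get(old.pos()));
    }
  }
  // naive is now {"aa": null, "b": "x"} -- column a's value is lost

  // Rename-aware rewrite: fall back to the old name when the new name is missing.
  java.util.Map<String, String> renameCols = java.util.Collections.singletonMap("aa", "a");
  GenericRecord fixed = new GenericData.Record(newSchema);
  for (Schema.Field f : newSchema.getFields()) {
    String sourceName = oldSchema.getField(f.name()) != null
        ? f.name()
        : renameCols.getOrDefault(f.name(), f.name());
    Schema.Field old = oldSchema.getField(sourceName);
    if (old != null) {
      fixed.put(f.pos(), oldRecord.get(old.pos()));
    }
  }
  // fixed is now {"aa": 1, "b": "x"} -- the renamed column keeps its value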

@xiarixiaoyao
Contributor Author

@nsivabalan @alexeykudinkin could you please review again?

@hudi-bot
Collaborator

CI report:

Contributor

@alexeykudinkin left a comment

LGTM

@xiarixiaoyao I left a bunch of comments; it would be great if you could follow up on them in a separate PR.


// TODO Unify the logic of rewriteRecordWithMetadata and rewriteEvolutionRecordWithMetadata, and delete this function.
public static GenericRecord rewriteEvolutionRecordWithMetadata(GenericRecord genericRecord, Schema newSchema, String fileName) {
  GenericRecord newRecord = HoodieAvroUtils.rewriteRecordWithNewSchema(genericRecord, newSchema, new HashMap<>());
Contributor

@xiarixiaoyao in general, instead of doing new HashMap<>() let's do Collections.emptyMap() to avoid allocating unnecessary objects on the hot path.
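
A minimal sketch of the suggested change (assuming java.util.Collections is imported):

  // Before: allocates a fresh, never-mutated map for every record on the write hot path.
  GenericRecord r1 = HoodieAvroUtils.rewriteRecordWithNewSchema(genericRecord, newSchema, new HashMap<>());
  // After: reuses the shared immutable empty map; safe because the rename map is only read here.
  GenericRecord r2 = HoodieAvroUtils.rewriteRecordWithNewSchema(genericRecord, newSchema, Collections.emptyMap());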

Contributor Author

fixed


- public static GenericRecord rewriteRecordWithNewSchema(IndexedRecord oldRecord, Schema newSchema) {
-   Object newRecord = rewriteRecordWithNewSchema(oldRecord, oldRecord.getSchema(), newSchema);
+ public static GenericRecord rewriteRecordWithNewSchema(IndexedRecord oldRecord, Schema newSchema, Map<String, String> renameCols) {
+   Object newRecord = rewriteRecordWithNewSchema(oldRecord, oldRecord.getSchema(), newSchema, renameCols, new LinkedList<>());
Contributor

Would suggest using ArrayDeque instead (it's more performant than LinkedList under most loads).
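
A minimal sketch of the swap, assuming the fieldNames parameter is (or can be) typed as java.util.Deque<String>; not verified against the final signature:

  // ArrayDeque is array-backed: no per-element node allocation and better cache
  // locality than LinkedList, which allocates a node object per push/pop.
  Deque<String> fieldNames = new ArrayDeque<>();
  Object newRecord = rewriteRecordWithNewSchema(oldRecord, oldRecord.getSchema(), newSchema, renameCols, fieldNames);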

Contributor Author

fixed

  if (oldSchema.getField(field.name()) != null) {
    Schema.Field oldField = oldSchema.getField(field.name());
-   helper.put(i, rewriteRecordWithNewSchema(indexedRecord.get(oldField.pos()), oldField.schema(), fields.get(i).schema()));
+   helper.put(i, rewriteRecordWithNewSchema(indexedRecord.get(oldField.pos()), oldField.schema(), fields.get(i).schema(), renameCols, fieldNames));
Contributor

Why do we need helper? We can just insert into the target record right away, right?
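
A sketch of that restructuring, assuming newRecord is the GenericData.Record being built (hypothetical, not the PR's final code):

  for (int i = 0; i < fields.size(); i++) {
    Schema.Field oldField = oldSchema.getField(fields.get(i).name());
    if (oldField != null) {
      // write straight into the target record instead of staging values in `helper`
      newRecord.put(i, rewriteRecordWithNewSchema(
          indexedRecord.get(oldField.pos()), oldField.schema(), fields.get(i).schema(), renameCols, fieldNames));
    }
  }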

    helper.put(i, rewriteRecordWithNewSchema(indexedRecord.get(oldField.pos()), oldField.schema(), fields.get(i).schema(), renameCols, fieldNames));
  } else {
    String fieldFullName = createFullName(fieldNames);
    String[] colNamePartsFromOldSchema = renameCols.getOrDefault(fieldFullName, "").split("\\.");
Contributor

We don't actually need to split; we just need to find the part after the last "." (this will reduce the amount of memory churn).
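
A minimal sketch, reusing the names from the hunk above:

  String renamed = renameCols.getOrDefault(fieldFullName, "");
  // lastIndexOf returns -1 when there is no '.', so +1 falls back to the whole string
  String lastColNameFromOldSchema = renamed.substring(renamed.lastIndexOf('.') + 1);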

  } else {
    String fieldFullName = createFullName(fieldNames);
    String[] colNamePartsFromOldSchema = renameCols.getOrDefault(fieldFullName, "").split("\\.");
    String lastColNameFromOldSchema = colNamePartsFromOldSchema[colNamePartsFromOldSchema.length - 1];
Contributor

nit: fieldNameFromOldSchema

  if (newType.isNestedType()) {
-   return Types.Field.get(oldField.fieldId(), oldField.isOptional(), nameFromFileSchema, newType, oldField.doc());
+   return Types.Field.get(oldField.fieldId(), oldField.isOptional(),
+       useColNameFromFileSchema ? nameFromFileSchema : nameFromQuerySchema, newType, oldField.doc());
Contributor

Please inline as a var and reuse
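
I.e., something along these lines (sketch only):

  String name = useColNameFromFileSchema ? nameFromFileSchema : nameFromQuerySchema;
  return Types.Field.get(oldField.fieldId(), oldField.isOptional(), name, newType, oldField.doc());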

Contributor Author

fixed

return colNamesFromWriteSchema.stream().filter(f -> {
  int filedIdFromWriteSchema = oldSchema.findIdByName(f);
  // try to find the cols which have the same id but a different colName
  return newSchema.getAllIds().contains(filedIdFromWriteSchema) && !newSchema.findfullName(filedIdFromWriteSchema).equalsIgnoreCase(f);
Contributor

Instead of duplicating the code, just do a map first, where you map the name if it's a rename and otherwise return null, then filter out the nulls.
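
A sketch of that shape, reusing the names from the hunk above (assumes java.util.Objects and the usual stream collectors; mapping each renamed column to its new full name is an assumption about what the caller wants):

  return colNamesFromWriteSchema.stream()
      .map(f -> {
        int fieldId = oldSchema.findIdByName(f);
        // a rename: the same field id resolves to a different full name in the new schema
        if (newSchema.getAllIds().contains(fieldId) && !newSchema.findfullName(fieldId).equalsIgnoreCase(f)) {
          return newSchema.findfullName(fieldId); // map straight to the new name
        }
        return null; // not a rename
      })
      .filter(Objects::nonNull)
      .collect(Collectors.toList());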

  InternalSchema newSchema = SchemaChangeUtils.applyTableChanges2Schema(internalSchema, updateChange);
  Schema newAvroSchema = AvroInternalSchemaConverter.convert(newSchema, avroSchema.getName());
- GenericRecord newRecord = HoodieAvroUtils.rewriteRecordWithNewSchema(avroRecord, newAvroSchema);
+ GenericRecord newRecord = HoodieAvroUtils.rewriteRecordWithNewSchema(avroRecord, newAvroSchema, new HashMap<>());
Contributor

Same comment as above

Contributor Author

fixed


  Schema newAvroSchema = AvroInternalSchemaConverter.convert(newRecord, schema.getName());
- GenericRecord newAvroRecord = HoodieAvroUtils.rewriteRecordWithNewSchema(avroRecord, newAvroSchema);
+ GenericRecord newAvroRecord = HoodieAvroUtils.rewriteRecordWithNewSchema(avroRecord, newAvroSchema, new HashMap<>());
Contributor

Here as well

Contributor Author

fixed

  Seq(null),
  Seq(Map("t1" -> 10.0d))
)
spark.sql(s"alter table ${tableName} rename column members to mem")
Contributor

In addition to these, let's also add tests for the record-rewriting utils in HoodieAvroUtils.

Contributor Author

Thanks, already added.

@nsivabalan
Contributor

@xiarixiaoyao: please address the feedback in a follow-up PR. I am going ahead and landing this.

nsivabalan merged commit 037f89e into apache:master Apr 21, 2022
nsivabalan pushed a commit that referenced this pull request Apr 22, 2022
- When column names are renamed (schema evolution enabled), renamed columns weren't handled well while copying records from the old data file with HoodieMergeHandle.
@xiarixiaoyao
Contributor Author

@nsivabalan @alexeykudinkin thanks for your review; let me put up another PR to optimize the code.

xushiyan pushed a commit that referenced this pull request Apr 22, 2022
- When column names are renamed (schema evolution enabled), renamed columns weren't handled well while copying records from the old data file with HoodieMergeHandle.
xushiyan changed the title from "[HUDI-3921] Fixed schema evolution cannot work with HUDI-3855" to "[HUDI-3921] Reconcile schema evolution logic with base file re-writing" May 23, 2022