[HUDI-3085] improve bulk insert partitioner abstraction #4441
Conversation
Force-pushed from eba225e to 10b433d.
@YuweiXiao Could you rebase your PR on master to resolve the conflicts?

Thanks for the reminder. The conflict has been resolved.
@hudi-bot run azure

@hudi-bot run azure

@hudi-bot run azure

@hudi-bot run azure
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/BulkInsertPartitioner.java (outdated)
   * partitions should be almost equal to (#inputRecords / #outputSparkPartitions) to avoid possible skews.
   */
- public interface BulkInsertPartitioner<I> {
+ public abstract class BulkInsertPartitioner<I> implements Serializable {
This interface is public and users may implement their own bulk insert partitioner as a plugin. The change from interface to abstract class is not backward compatible. Could you keep it as an interface and use default methods for new logic?
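A minimal sketch of that backward-compatible option, assuming the Option-returning shape seen later in this thread (package paths and the exact method set are illustrative):

import java.io.Serializable;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.io.WriteHandleFactory;

public interface BulkInsertPartitioner<I> extends Serializable {

  // Existing contract, unchanged.
  I repartitionRecords(I records, int outputSparkPartitions);

  boolean arePartitionRecordsSorted();

  // New logic as a default method: existing implementations inherit it and
  // keep compiling. Option.empty() means "no partition-specific factory;
  // let the caller fall back to its default".
  default Option<WriteHandleFactory> getWriteHandleFactory(int partitionId) {
    return Option.empty();
  }
}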
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/BulkInsertPartitioner.java (outdated)
    return fileIdPfx;
  }

+ public void setDefaultWriteHandleFactory(WriteHandleFactory defaultWriteHandleFactory) {
Should the setDefaultWriteHandleFactory() functionality be implemented through the constructor, with the defaultWriteHandleFactory passed in? e.g.,
public GlobalSortPartitioner(WriteHandleFactory defaultWriteHandleFactory);
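Roughly the following, which also lets the field be final (a sketch only; the sort shown is an illustrative stand-in, not the exact existing logic):

import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.io.WriteHandleFactory;
import org.apache.spark.api.java.JavaRDD;

public class GlobalSortPartitioner<T> implements BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>> {

  // Immutable after construction; no setter needed.
  private final WriteHandleFactory defaultWriteHandleFactory;

  public GlobalSortPartitioner(WriteHandleFactory defaultWriteHandleFactory) {
    this.defaultWriteHandleFactory = defaultWriteHandleFactory;
  }

  @Override
  public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>> records, int outputSparkPartitions) {
    // Illustrative stand-in for the existing global-sort logic.
    return records.sortBy(record -> record.getPartitionPath() + "+" + record.getRecordKey(), true, outputSparkPartitions);
  }

  @Override
  public boolean arePartitionRecordsSorted() {
    return true;
  }
}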
      config, instantTime, table,
      fileIdPrefixProvider.createFilePrefix(""), table.getTaskContextSupplier(),
-     new CreateHandleFactory<>()).forEachRemaining(writeStatuses::addAll);
+     partitioner.getWriteHandleFactory(0)).forEachRemaining(writeStatuses::addAll);
This looks hacky
True... It is only for the Java engine, where bulk_insert routes all records to a single file group. Let me check if there is a better way to do the abstraction for the Java engine.
+ private WriteHandleFactory defaultWriteHandleFactory;
+ private List<String> fileIdPfx;
After looking at this PR as a whole, I'm thinking it may be better to store generating functions, partitionId -> fileIdPrefix and partitionId -> writeHandleFactory, and have those functions passed in from the constructor.
Do you have any PoC of a BulkInsertPartitioner implementation that provides partition-specific file IDs and write handle factories? I'd like to understand how these are coupled with the repartition logic and how the interface design can accommodate the use case.
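To make the idea concrete, a rough sketch of that shape (field and parameter names like fileIdPfxFunc/writeHandleFactoryFunc are hypothetical):

import java.io.Serializable;
import java.util.function.Function;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.io.WriteHandleFactory;

public abstract class BulkInsertPartitioner<I> implements Serializable {

  // Generating functions supplied at construction time, instead of mutable
  // fields populated by setters. In practice these would need to be
  // serializable for Spark.
  private final Function<Integer, String> fileIdPfxFunc;
  private final Function<Integer, Option<WriteHandleFactory>> writeHandleFactoryFunc;

  protected BulkInsertPartitioner(Function<Integer, String> fileIdPfxFunc,
                                  Function<Integer, Option<WriteHandleFactory>> writeHandleFactoryFunc) {
    this.fileIdPfxFunc = fileIdPfxFunc;
    this.writeHandleFactoryFunc = writeHandleFactoryFunc;
  }

  public String getFileIdPfx(int partitionId) {
    return fileIdPfxFunc.apply(partitionId);
  }

  public Option<WriteHandleFactory> getWriteHandleFactory(int partitionId) {
    return writeHandleFactoryFunc.apply(partitionId);
  }

  public abstract I repartitionRecords(I records, int outputSparkPartitions);
}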
Yes. In order to enable concurrent clustering and upsert to the same file group, we have to control how records are routed to file groups in the clustering (which uses bulk_insert to write records). So in my case, a customized ClusteringExecutionStrategy and BulkInsertPartitioner are implemented.
The overall design is indeed partitionId -> fileIdPrefix (fileIdPfxList) and partitionId -> writeHandleFactory (the getWriteHandleFactory interface). I will go with having those organized in the constructor, which should make the design clearer.
  HoodieWriteConfig newConfig = HoodieWriteConfig.newBuilder().withProps(props).build();

+ BulkInsertPartitioner partitioner = getPartitioner(strategyParams, schema);
+ partitioner.setDefaultWriteHandleFactory(new CreateHandleFactory(preserveHoodieMetadata));
This can be achieved through the constructor.
  @Override
  public List<HoodieRecord<T>> repartitionRecords(
      List<HoodieRecord<T>> records, int outputSparkPartitions) {
+   generateFileIdPfx(outputSparkPartitions);
Wondering if this can be achieved by a function (func) passed to the constructor, with logic something like IntStream.range(0, outputSparkPartitions).mapToObj(i -> func.apply(i))?
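A sketch of that idea (the helper and func are hypothetical, not code from this PR):

import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical: fileId prefixes generated from a partitionId -> prefix function
// passed to the constructor, instead of an internal generateFileIdPfx() method.
static List<String> generateFileIdPfx(int outputSparkPartitions, Function<Integer, String> func) {
  return IntStream.range(0, outputSparkPartitions)
      .mapToObj(func::apply)
      .collect(Collectors.toList());
}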
  return new SparkLazyInsertIterable<>(recordItr, areRecordsSorted, config, instantTime, hoodieTable,
-     fileIDPrefixes.get(partition), hoodieTable.getTaskContextSupplier(), useWriterSchema,
+     (String) partitioner.getFileIdPfx().get(partition), hoodieTable.getTaskContextSupplier(), useWriterSchema,
      writeHandleFactory);
It's better to apply the partitionId -> fileId function here if the partitioner just stores the function.
Thanks for the review! @yihua I will try to use the passed-in functions to manage the partition-fileId-writeHandle mapping; I believe that will give a better, clearer interface.
Force-pushed from 2243884 to 58a02eb.
Hey Yihua @yihua, the PR is ready for another round of review~ :)
…e handling of fileIdPfx & write handle factory into partitioner
@hudi-bot run azure
+ /**
+  * Return write handle factory for the given partition.
+  * By default, return CreateHandleFactory which will always write to a new file group
The description is not correct since it returns empty?
Thanks! Fixed.
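Presumably the fixed doc ends up along these lines (a sketch; the committed wording may differ), matching the Option-returning default used elsewhere in this PR:

/**
 * Returns the write handle factory for the given partition.
 * By default, returns Option.empty(), in which case the caller falls back to
 * its own default factory (e.g., CreateHandleFactory, which always writes to
 * a new file group).
 */
public Option<WriteHandleFactory> getWriteHandleFactory(int partitionId) {
  return Option.empty();
}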
  // write new files
- List<WriteStatus> writeStatuses = bulkInsert(inputRecords, instantTime, table, config, performDedupe, userDefinedBulkInsertPartitioner, false, config.getBulkInsertShuffleParallelism(), new CreateHandleFactory(false));
+ List<WriteStatus> writeStatuses = bulkInsert(inputRecords, instantTime, table, config, performDedupe, partitioner, false, config.getBulkInsertShuffleParallelism(), new CreateHandleFactory(false));
This line can be split into two lines.
Got it, fixed.
      config, instantTime, table,
      fileIdPrefixProvider.createFilePrefix(""), table.getTaskContextSupplier(),
-     new CreateHandleFactory<>()).forEachRemaining(writeStatuses::addAll);
+     (WriteHandleFactory) partitioner.getWriteHandleFactory(0).orElse(writeHandleFactory)).forEachRemaining(writeStatuses::addAll);
What's the meaning of passing 0 as the partitionId here?
0 means getting the write handle factory for partition 0. The code is consistent with the previous behavior, as the Java engine always has only one data partition.
We can add some comments here.
Sure.
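The added comment could look something like this (a sketch of what was agreed above, not the exact committed text):

// The Java engine writes through a single data partition, so 0 is the only
// partition id here; fall back to the caller-supplied default factory when
// the partitioner does not define one for this partition.
WriteHandleFactory factory =
    (WriteHandleFactory) partitioner.getWriteHandleFactory(0).orElse(writeHandleFactory);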
  @Override
  public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>> records, int outputSparkPartitions) {
+   SerializableSchema serializableSchema = new SerializableSchema(schema);
Nice improvement.
  executor.getCommitActionType(), instantTime), Option.empty(),
  config.shouldAllowMultiWriteOnSameInstant());

+ BulkInsertPartitioner partitioner = userDefinedBulkInsertPartitioner.isPresent()
I see duplicate code in JavaBulkInsertHelper; can we unify it?
The BulkInsertPartitionerFactory is different for Spark and Java. I could extract an interface (e.g., GetBulkInsertPartitionerFactory) to the base class if we want to unify the code. But as yihua said, changing the public interface may break existing users' code, requiring them to update their code too.
P.S. I have rewritten this part of the code to make it clearer.
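For the record, one hypothetical way to unify it without touching the public BulkInsertPartitioner interface would be to push only the default-partitioner lookup into the shared helper (class and method names below are illustrative):

import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;

public abstract class AbstractBulkInsertHelper {

  // Each engine (Spark, Java) supplies its own default partitioner.
  protected abstract BulkInsertPartitioner getDefaultPartitioner(HoodieWriteConfig config);

  // The userDefined-vs-default resolution lives once in the shared base class.
  protected BulkInsertPartitioner resolvePartitioner(Option<BulkInsertPartitioner> userDefinedBulkInsertPartitioner,
                                                     HoodieWriteConfig config) {
    return userDefinedBulkInsertPartitioner.isPresent()
        ? userDefinedBulkInsertPartitioner.get()
        : getDefaultPartitioner(config);
  }
}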
leesf left a comment:
LGTM. @yihua, do you have any other concerns?
@hudi-bot run azure

@hudi-bot run azure
What is the purpose of the pull request
Restructure the bulk insert partitioner interface to include the handling of fileIdPfx and the write handle factory.
With this improvement, one can implement a new bulk_insert partitioner that is capable of routing records to pre-defined fileIds using customized write handle factories (e.g., different write factories for different partitions).
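As an illustration, a hypothetical partitioner that pins records to pre-assigned file IDs might look like the following (assuming the interface form of BulkInsertPartitioner and the getFileIdPfx()/getWriteHandleFactory(int) shapes visible in the diffs above; the routing logic itself is application-specific and elided):

import java.util.List;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.io.WriteHandleFactory;
import org.apache.spark.api.java.JavaRDD;

// Hypothetical partitioner enabled by this PR: each output Spark partition is
// bound to a pre-defined fileId prefix and a caller-chosen write handle factory.
public class FixedFileIdPartitioner<T> implements BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>> {

  private final List<String> fileIdPfx;          // partitionId -> pre-assigned fileId prefix
  private final WriteHandleFactory writeHandleFactory;

  public FixedFileIdPartitioner(List<String> fileIdPfx, WriteHandleFactory writeHandleFactory) {
    this.fileIdPfx = fileIdPfx;
    this.writeHandleFactory = writeHandleFactory;
  }

  @Override
  public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>> records, int outputSparkPartitions) {
    // Route records so that partition i holds exactly the records destined for
    // fileIdPfx.get(i); the real routing key is application-specific.
    return records.coalesce(outputSparkPartitions);
  }

  @Override
  public boolean arePartitionRecordsSorted() {
    return false;
  }

  public List<String> getFileIdPfx() {
    return fileIdPfx;
  }

  public Option<WriteHandleFactory> getWriteHandleFactory(int partitionId) {
    return Option.of(writeHandleFactory);
  }
}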
JIRA link: HUDI-3085
Brief change log

- Restructured BulkInsertPartitioner to include the logic of handling fileIdPrefix and writeHandleFactory. With this update, the partitioner of the bulk_insert path now has the ability to control records' final file group location (similar to the partitioner of the upsert path).
- Reworked the bulk insert helpers (AbstractBulkInsertHelper and its subclasses) to make use of the new partitioner interface. The partitioner is now mandatory (not optional anymore), similar to the standard upsert/insert path.
- Removed FileIdPrefixProvider, as fileId prefixes are now handled by the partitioner.

Verify this pull request
Added a fileId generation check to existing tests; other parts are already covered by existing tests, such as TestBulkInsertInternalPartitioner.

Committer checklist
Has a corresponding JIRA in PR title & commit
Commit message is descriptive of the change
CI is green
Necessary doc changes done or have another open PR
For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.