[HUDI-4814] Schedules new clustering plan based on latest clustering instant #6574
Changes from 9 commits
```diff
@@ -48,21 +48,28 @@ public class ClusteringPlanActionExecutor<T extends HoodieRecordPayload, I, K, O
   private final Option<Map<String, String>> extraMetadata;

   public ClusteringPlanActionExecutor(HoodieEngineContext context,
                                       HoodieWriteConfig config,
                                       HoodieTable<T, I, K, O> table,
                                       String instantTime,
                                       Option<Map<String, String>> extraMetadata) {
     super(context, config, table, instantTime);
     this.extraMetadata = extraMetadata;
   }

   protected Option<HoodieClusteringPlan> createClusteringPlan() {
     LOG.info("Checking if clustering needs to be run on " + config.getBasePath());
-    Option<HoodieInstant> lastClusteringInstant = table.getActiveTimeline().getCompletedReplaceTimeline().lastInstant();
+    Option<HoodieInstant> lastClusteringInstant;
+    Option<HoodieInstant> pendingInstant = table.getActiveTimeline().filterPendingReplaceTimeline().lastInstant();
+    if (pendingInstant.isPresent()) {
+      lastClusteringInstant = pendingInstant;
+    } else {
+      lastClusteringInstant = table.getActiveTimeline().getCompletedReplaceTimeline().lastInstant();
+    }

     int commitsSinceLastClustering = table.getActiveTimeline().getCommitsTimeline().filterCompletedInstants()
         .findInstantsAfter(lastClusteringInstant.map(HoodieInstant::getTimestamp).orElse("0"), Integer.MAX_VALUE)
         .countInstants();

     if (config.inlineClusteringEnabled() && config.getInlineClusterMaxCommits() > commitsSinceLastClustering) {
```
Contributor: This and the condition below guarantee that clustering is only scheduled based on the max_commits config. @eric9204 could you double check the logic?

Contributor (author): @yihua Yes, this is indeed a redundant check; I'm testing whether this condition is needed. By adding these two conditions, it is guaranteed that only one clustering runs at a time, and if there is no completed clustering, no new clustering plan is generated. Configure only these three parameters: 'clustering.schedule.enabled'='true',

Contributor: OK, so the issue we are trying to solve is: there is a regular writer that just schedules clustering, and an async clustering job that executes it. If clustering is pending (it may be executed later by the async clustering job), every new successful commit by the regular writer keeps adding a new replacecommit.requested. If so, the fix makes sense to me.

Contributor: One thing I find hard to comprehend: with respect to clustering, either both planning and execution are inline, or both are async, at least for the Spark datasource writer. So I'm not sure how the user ended up in a state where clustering was only scheduled without ever completing.

Contributor: Yes, the fix makes sense to me too. The Flink writer schedules a clustering plan on each successful regular commit, and there is an async pipeline that executes the clustering continuously; this patch solves the problem of clustering plans being scheduled too frequently when there is a pending clustering. So, +1 from my side.
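The baseline-selection change under discussion can be sketched in isolation. This is a minimal sketch, not Hudi code: `latestClusteringBaseline` is a hypothetical helper, and `java.util.Optional<String>` stands in for Hudi's `Option<HoodieInstant>`.

```java
import java.util.Optional;

public class ClusteringBaselineSketch {

    // Mirrors the patched logic: prefer the latest *pending* replacecommit as
    // the baseline, so no new plan is scheduled while one still awaits execution.
    static Optional<String> latestClusteringBaseline(Optional<String> lastPending,
                                                     Optional<String> lastCompleted) {
        return lastPending.isPresent() ? lastPending : lastCompleted;
    }

    public static void main(String[] args) {
        // A pending plan at instant "105" outranks the completed one at "100":
        // commits are now counted from "105", throttling further plans.
        System.out.println(latestClusteringBaseline(Optional.of("105"), Optional.of("100"))); // Optional[105]
        // No pending plan: fall back to the last completed clustering, as before.
        System.out.println(latestClusteringBaseline(Optional.empty(), Optional.of("100"))); // Optional[100]
    }
}
```

Because the pending instant is always at least as recent as the last completed one, using it as the baseline shrinks `commitsSinceLastClustering` and keeps the max-commits gate closed until the pending plan finishes.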
```diff
       LOG.info("Not scheduling inline clustering as only " + commitsSinceLastClustering
           + " commits was found since last clustering " + lastClusteringInstant + ". Waiting for "
```
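The `findInstantsAfter(...).countInstants()` step in the diff can be approximated with plain strings. This is a sketch under one assumption: Hudi instant timestamps are ordered strings, so lexicographic comparison stands in for the timeline's timestamp comparison; `commitsSince` is a hypothetical helper.

```java
import java.util.List;
import java.util.Optional;

public class CommitCountSketch {

    // Approximates findInstantsAfter(baseline).countInstants(): count completed
    // commits whose timestamp sorts strictly after the baseline. "0" is the
    // fallback when no clustering instant exists yet, matching orElse("0").
    static long commitsSince(List<String> completedCommits, Optional<String> baseline) {
        String after = baseline.orElse("0");
        return completedCommits.stream()
            .filter(ts -> ts.compareTo(after) > 0)
            .count();
    }

    public static void main(String[] args) {
        List<String> commits = List.of("101", "102", "103", "104");
        System.out.println(commitsSince(commits, Optional.of("102"))); // 2: only "103" and "104"
        System.out.println(commitsSince(commits, Optional.empty()));   // 4: every commit counts
    }
}
```

With a pending replacecommit as the baseline, the count stays below `getInlineClusterMaxCommits` and the "Not scheduling inline clustering" branch above is taken.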
Contributor: Guess we can filter the instants directly by the action type here?

Contributor (author): @danny0405 Good idea, I'm working on it.
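The suggested refactor — finding the latest clustering instant by filtering on action type rather than building separate pending and completed replace timelines — might look roughly like this. `Instant` and `lastReplaceInstant` are hypothetical stand-ins for Hudi's timeline types; `replacecommit` is the action Hudi records for clustering.

```java
import java.util.List;
import java.util.Optional;

public class ActionFilterSketch {

    // Minimal stand-in for a timeline instant: (timestamp, action).
    record Instant(String timestamp, String action) {}

    // Filter the (ordered) timeline by action type and keep the last match,
    // covering pending and completed replacecommits in a single pass.
    static Optional<Instant> lastReplaceInstant(List<Instant> timeline) {
        return timeline.stream()
            .filter(i -> "replacecommit".equals(i.action()))
            .reduce((first, second) -> second); // keep the last matching instant
    }

    public static void main(String[] args) {
        List<Instant> timeline = List.of(
            new Instant("100", "commit"),
            new Instant("101", "replacecommit"),
            new Instant("102", "commit"));
        System.out.println(lastReplaceInstant(timeline).get().timestamp()); // 101
    }
}
```

The one-pass filter yields the same baseline as the pending-then-completed lookup in the diff, since the latest replacecommit on the timeline is pending if any pending one exists.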