Skip to content

Commit ad3fa03

Browse files
committed
[HUDI-4273] Support inline schedule clustering for Flink stream
1 parent: 25bbff6 · commit: ad3fa03

24 files changed: +712 additions, −121 deletions

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java

Lines changed: 18 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -553,6 +553,24 @@ public void rollbackInflightCompaction(HoodieInstant inflightInstant, Function<S
553553
getActiveTimeline().revertCompactionInflightToRequested(inflightInstant);
554554
}
555555

556+
public void rollbackInflightClustering(HoodieInstant inflightInstant) {
557+
rollbackInflightClustering(inflightInstant, s -> Option.empty());
558+
}
559+
560+
/**
561+
* Rollback failed clustering. Inflight rollbacks for clustering revert the .inflight file
562+
* to the .requested file.
563+
*
564+
* @param inflightInstant Inflight Clustering Instant
565+
*/
566+
public void rollbackInflightClustering(HoodieInstant inflightInstant, Function<String, Option<HoodiePendingRollbackInfo>> getPendingRollbackInstantFunc) {
567+
final String commitTime = getPendingRollbackInstantFunc.apply(inflightInstant.getTimestamp()).map(entry
568+
-> entry.getRollbackInstant().getTimestamp()).orElse(HoodieActiveTimeline.createNewInstantTime());
569+
scheduleRollback(context, commitTime, inflightInstant, false, config.shouldRollbackUsingMarkers());
570+
rollback(context, commitTime, inflightInstant, false, false);
571+
getActiveTimeline().revertCompactionInflightToRequested(inflightInstant);
572+
}
573+
556574
/**
557575
* Finalize the written data onto storage. Perform any final cleanups.
558576
*

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -71,7 +71,7 @@ public Option<HoodieCompactionPlan> execute() {
7171
if (!config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()
7272
&& !config.getFailedWritesCleanPolicy().isLazy()) {
7373
// TODO(yihua): this validation is removed for Java client used by kafka-connect. Need to revisit this.
74-
if (config.getEngineType() != EngineType.JAVA) {
74+
if (config.getEngineType() == EngineType.SPARK) {
7575
// if there are inflight writes, their instantTime must not be less than that of compaction instant time
7676
table.getActiveTimeline().getCommitsTimeline().filterPendingExcludingCompaction().firstInstant()
7777
.ifPresent(earliestInflight -> ValidationUtils.checkArgument(

hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java

Lines changed: 7 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -594,6 +594,12 @@ private FlinkOptions() {
594594
.defaultValue(false) // default false for pipeline
595595
.withDescription("Schedule the cluster plan, default false");
596596

597+
public static final ConfigOption<Boolean> CLUSTERING_ASYNC_ENABLED = ConfigOptions
598+
.key("clustering.async.enabled")
599+
.booleanType()
600+
.defaultValue(false) // default false for pipeline
601+
.withDescription("Async Clustering, default false");
602+
597603
public static final ConfigOption<Integer> CLUSTERING_DELTA_COMMITS = ConfigOptions
598604
.key("clustering.delta_commits")
599605
.intType()
@@ -641,7 +647,7 @@ private FlinkOptions() {
641647
public static final ConfigOption<String> CLUSTERING_SORT_COLUMNS = ConfigOptions
642648
.key("clustering.plan.strategy.sort.columns")
643649
.stringType()
644-
.noDefaultValue()
650+
.defaultValue("")
645651
.withDescription("Columns to sort the data by when clustering");
646652

647653
public static final ConfigOption<Integer> CLUSTERING_MAX_NUM_GROUPS = ConfigOptions

hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java

Lines changed: 50 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -20,6 +20,7 @@
2020

2121
import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
2222
import org.apache.hudi.common.model.WriteOperationType;
23+
import org.apache.hudi.common.util.StringUtils;
2324
import org.apache.hudi.index.HoodieIndex;
2425
import org.apache.hudi.table.format.FilePathUtils;
2526

@@ -42,7 +43,10 @@ public static boolean insertClustering(Configuration conf) {
4243
* Returns whether the insert is clustering disabled with given configuration {@code conf}.
4344
*/
4445
public static boolean isAppendMode(Configuration conf) {
45-
return isCowTable(conf) && isInsertOperation(conf) && !conf.getBoolean(FlinkOptions.INSERT_CLUSTER);
46+
// 1. inline clustering is supported for COW table;
47+
// 2. async clustering is supported for both COW and MOR table
48+
return isCowTable(conf) && isInsertOperation(conf) && !conf.getBoolean(FlinkOptions.INSERT_CLUSTER)
49+
|| needsScheduleClustering(conf);
4650
}
4751

4852
/**
@@ -115,4 +119,49 @@ public static boolean emitChangelog(Configuration conf) {
115119
return conf.getBoolean(FlinkOptions.READ_AS_STREAMING)
116120
&& conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED);
117121
}
122+
123+
/**
124+
* Returns whether there is need to schedule the async compaction.
125+
*
126+
* @param conf The flink configuration.
127+
*/
128+
public static boolean needsAsyncCompaction(Configuration conf) {
129+
return OptionsResolver.isMorTable(conf)
130+
&& conf.getBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED);
131+
}
132+
133+
/**
134+
* Returns whether there is need to schedule the compaction plan.
135+
*
136+
* @param conf The flink configuration.
137+
*/
138+
public static boolean needsScheduleCompaction(Configuration conf) {
139+
return OptionsResolver.isMorTable(conf)
140+
&& conf.getBoolean(FlinkOptions.COMPACTION_SCHEDULE_ENABLED);
141+
}
142+
143+
/**
144+
* Returns whether there is need to schedule the async clustering.
145+
*
146+
* @param conf The flink configuration.
147+
*/
148+
public static boolean needsAsyncClustering(Configuration conf) {
149+
return isInsertOperation(conf) && conf.getBoolean(FlinkOptions.CLUSTERING_ASYNC_ENABLED);
150+
}
151+
152+
/**
153+
* Returns whether there is need to schedule the clustering plan.
154+
*
155+
* @param conf The flink configuration.
156+
*/
157+
public static boolean needsScheduleClustering(Configuration conf) {
158+
return isInsertOperation(conf) && conf.getBoolean(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED);
159+
}
160+
161+
/**
162+
* Returns whether the clustering sort is enabled.
163+
*/
164+
public static boolean sortClusteringEnabled(Configuration conf) {
165+
return !StringUtils.isNullOrEmpty(conf.getString(FlinkOptions.CLUSTERING_SORT_COLUMNS));
166+
}
118167
}

hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java

Lines changed: 9 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -37,6 +37,7 @@
3737
import org.apache.hudi.sink.meta.CkpMetadata;
3838
import org.apache.hudi.sink.utils.HiveSyncContext;
3939
import org.apache.hudi.sink.utils.NonThrownExecutor;
40+
import org.apache.hudi.util.ClusteringUtil;
4041
import org.apache.hudi.util.CompactionUtil;
4142
import org.apache.hudi.util.StreamerUtil;
4243

@@ -253,6 +254,11 @@ public void notifyCheckpointComplete(long checkpointId) {
253254
CompactionUtil.scheduleCompaction(metaClient, writeClient, tableState.isDeltaTimeCompaction, committed);
254255
}
255256

257+
if (tableState.scheduleClustering) {
258+
// if async clustering is on, schedule the clustering
259+
ClusteringUtil.scheduleClustering(conf, writeClient, committed);
260+
}
261+
256262
if (committed) {
257263
// start new instant.
258264
startInstant();
@@ -606,6 +612,7 @@ private static class TableState implements Serializable {
606612
final String commitAction;
607613
final boolean isOverwrite;
608614
final boolean scheduleCompaction;
615+
final boolean scheduleClustering;
609616
final boolean syncHive;
610617
final boolean syncMetadata;
611618
final boolean isDeltaTimeCompaction;
@@ -615,7 +622,8 @@ private TableState(Configuration conf) {
615622
this.commitAction = CommitUtils.getCommitActionType(this.operationType,
616623
HoodieTableType.valueOf(conf.getString(FlinkOptions.TABLE_TYPE).toUpperCase(Locale.ROOT)));
617624
this.isOverwrite = WriteOperationType.isOverwrite(this.operationType);
618-
this.scheduleCompaction = StreamerUtil.needsScheduleCompaction(conf);
625+
this.scheduleCompaction = OptionsResolver.needsScheduleCompaction(conf);
626+
this.scheduleClustering = OptionsResolver.needsScheduleClustering(conf);
619627
this.syncHive = conf.getBoolean(FlinkOptions.HIVE_SYNC_ENABLED);
620628
this.syncMetadata = conf.getBoolean(FlinkOptions.METADATA_ENABLED);
621629
this.isDeltaTimeCompaction = OptionsResolver.isDeltaTimeCompaction(conf);

hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitEvent.java

Lines changed: 9 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -24,7 +24,7 @@
2424
import java.util.List;
2525

2626
/**
27-
* Represents a commit event from the clustering task {@link ClusteringFunction}.
27+
* Represents a commit event from the clustering task {@link ClusteringOperator}.
2828
*/
2929
public class ClusteringCommitEvent implements Serializable {
3030
private static final long serialVersionUID = 1L;
@@ -51,6 +51,10 @@ public ClusteringCommitEvent(String instant, List<WriteStatus> writeStatuses, in
5151
this.taskID = taskID;
5252
}
5353

54+
public ClusteringCommitEvent(String instant, int taskID) {
55+
this(instant, null, taskID);
56+
}
57+
5458
public void setInstant(String instant) {
5559
this.instant = instant;
5660
}
@@ -74,4 +78,8 @@ public List<WriteStatus> getWriteStatuses() {
7478
public int getTaskID() {
7579
return taskID;
7680
}
81+
82+
public boolean isFailed() {
83+
return this.writeStatuses == null;
84+
}
7785
}

hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitSink.java

Lines changed: 25 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -35,6 +35,7 @@
3535
import org.apache.hudi.sink.CleanFunction;
3636
import org.apache.hudi.table.HoodieFlinkTable;
3737
import org.apache.hudi.table.action.HoodieWriteMetadata;
38+
import org.apache.hudi.util.CompactionUtil;
3839
import org.apache.hudi.util.StreamerUtil;
3940

4041
import org.apache.flink.configuration.Configuration;
@@ -115,6 +116,30 @@ private void commitIfNecessary(String instant, List<ClusteringCommitEvent> event
115116
if (!isReady) {
116117
return;
117118
}
119+
120+
if (events.stream().anyMatch(ClusteringCommitEvent::isFailed)) {
121+
try {
122+
// handle failure case
123+
CompactionUtil.rollbackCompaction(table, instant);
124+
} finally {
125+
// remove commitBuffer to avoid obsolete metadata commit
126+
reset(instant);
127+
}
128+
return;
129+
}
130+
131+
try {
132+
doCommit(instant, clusteringPlan, events);
133+
} catch (Throwable throwable) {
134+
// make it fail-safe
135+
LOG.error("Error while committing clustering instant: " + instant, throwable);
136+
} finally {
137+
// reset the status
138+
reset(instant);
139+
}
140+
}
141+
142+
private void doCommit(String instant, HoodieClusteringPlan clusteringPlan, List<ClusteringCommitEvent> events) {
118143
List<WriteStatus> statuses = events.stream()
119144
.map(ClusteringCommitEvent::getWriteStatuses)
120145
.flatMap(Collection::stream)
@@ -139,9 +164,6 @@ private void commitIfNecessary(String instant, List<ClusteringCommitEvent> event
139164
this.table.getMetaClient().reloadActiveTimeline();
140165
this.writeClient.completeTableService(
141166
TableServiceType.CLUSTER, writeMetadata.getCommitMetadata().get(), table, instant);
142-
143-
// reset the status
144-
reset(instant);
145167
}
146168

147169
private void reset(String instant) {

0 commit comments

Comments (0)