diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java
index 1a7be7e30e0c1..f6055ba11d2fc 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java
@@ -34,6 +34,7 @@
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.sink.bootstrap.aggregate.BootstrapAggFunction;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.format.FormatUtils;
import org.apache.hudi.util.FlinkTables;
@@ -48,6 +49,7 @@
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.runtime.taskexecutor.GlobalAggregateManager;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -58,6 +60,7 @@
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import static java.util.stream.Collectors.toList;
@@ -71,7 +74,7 @@
*
*
The output records should then shuffle by the recordKey and thus do scalable write.
*/
-public class BootstrapOperator
+public class BootstrapOperator>
extends AbstractStreamOperator implements OneInputStreamOperator {
private static final Logger LOG = LoggerFactory.getLogger(BootstrapOperator.class);
@@ -83,6 +86,8 @@ public class BootstrapOperator
protected transient org.apache.hadoop.conf.Configuration hadoopConf;
protected transient HoodieWriteConfig writeConfig;
+ private transient GlobalAggregateManager aggregateManager;
+
private transient ListState instantState;
private final Pattern pattern;
private String lastInstantTime;
@@ -117,6 +122,7 @@ public void initializeState(StateInitializationContext context) throws Exception
this.hadoopConf = StreamerUtil.getHadoopConf();
this.writeConfig = StreamerUtil.getHoodieClientConfig(this.conf, true);
this.hoodieTable = FlinkTables.createTable(writeConfig, hadoopConf, getRuntimeContext());
+ this.aggregateManager = getRuntimeContext().getGlobalAggregateManager();
preLoadIndexRecords();
}
@@ -135,6 +141,27 @@ protected void preLoadIndexRecords() throws Exception {
}
LOG.info("Finish sending index records, taskId = {}.", getRuntimeContext().getIndexOfThisSubtask());
+
+ // wait for the other bootstrap tasks finish bootstrapping.
+ waitForBootstrapReady(getRuntimeContext().getIndexOfThisSubtask());
+ }
+
+ /**
+ * Wait for other bootstrap tasks to finish the index bootstrap.
+ */
+ private void waitForBootstrapReady(int taskID) {
+ int taskNum = getRuntimeContext().getNumberOfParallelSubtasks();
+ int readyTaskNum = 1;
+ while (taskNum != readyTaskNum) {
+ try {
+ readyTaskNum = aggregateManager.updateGlobalAggregate(BootstrapAggFunction.NAME, taskID, new BootstrapAggFunction());
+ LOG.info("Waiting for other bootstrap tasks to complete, taskId = {}.", taskID);
+
+ TimeUnit.SECONDS.sleep(5);
+ } catch (Exception e) {
+ LOG.warn("Update global task bootstrap summary error", e);
+ }
+ }
}
@Override
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/aggregate/BootstrapAccumulator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/aggregate/BootstrapAccumulator.java
new file mode 100644
index 0000000000000..14630a1f89b72
--- /dev/null
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/aggregate/BootstrapAccumulator.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.bootstrap.aggregate;
+
+import java.io.Serializable;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Bootstrap ready task id accumulator.
+ */
+public class BootstrapAccumulator implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final Set readyTaskSet;
+
+ public BootstrapAccumulator() {
+ this.readyTaskSet = new HashSet<>();
+ }
+
+ public void update(int taskId) {
+ readyTaskSet.add(taskId);
+ }
+
+ public int readyTaskNum() {
+ return readyTaskSet.size();
+ }
+
+ public BootstrapAccumulator merge(BootstrapAccumulator acc) {
+ if (acc == null) {
+ return this;
+ }
+
+ readyTaskSet.addAll(acc.readyTaskSet);
+ return this;
+ }
+}
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/aggregate/BootstrapAggFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/aggregate/BootstrapAggFunction.java
new file mode 100644
index 0000000000000..8c42fe903ad3c
--- /dev/null
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/aggregate/BootstrapAggFunction.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.bootstrap.aggregate;
+
+import org.apache.flink.api.common.functions.AggregateFunction;
+
+/**
+ * Aggregate function that accumulates the loaded task number of
+ * function {@link org.apache.hudi.sink.bootstrap.BootstrapOperator}.
+ */
+public class BootstrapAggFunction implements AggregateFunction {
+ public static final String NAME = BootstrapAggFunction.class.getSimpleName();
+
+ @Override
+ public BootstrapAccumulator createAccumulator() {
+ return new BootstrapAccumulator();
+ }
+
+ @Override
+ public BootstrapAccumulator add(Integer taskId, BootstrapAccumulator bootstrapAccumulator) {
+ bootstrapAccumulator.update(taskId);
+ return bootstrapAccumulator;
+ }
+
+ @Override
+ public Integer getResult(BootstrapAccumulator bootstrapAccumulator) {
+ return bootstrapAccumulator.readyTaskNum();
+ }
+
+ @Override
+ public BootstrapAccumulator merge(BootstrapAccumulator bootstrapAccumulator, BootstrapAccumulator acc) {
+ return bootstrapAccumulator.merge(acc);
+ }
+}
\ No newline at end of file
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/batch/BatchBootstrapOperator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/batch/BatchBootstrapOperator.java
index 1fde4593707b6..ead00d40a936d 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/batch/BatchBootstrapOperator.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/batch/BatchBootstrapOperator.java
@@ -39,7 +39,7 @@
*
* The input records should shuffle by the partition path to avoid repeated loading.
*/
-public class BatchBootstrapOperator
+public class BatchBootstrapOperator>
extends BootstrapOperator {
private Set partitionPathSet;
@@ -64,7 +64,7 @@ protected void preLoadIndexRecords() {
@Override
@SuppressWarnings("unchecked")
public void processElement(StreamRecord element) throws Exception {
- final HoodieRecord record = (HoodieRecord>) element.getValue();
+ final HoodieRecord> record = (HoodieRecord>) element.getValue();
final String partitionPath = record.getKey().getPartitionPath();
if (haveSuccessfulCommits && !partitionPathSet.contains(partitionPath)) {