From 8855481c959a20fce97c65da178a0f4804022335 Mon Sep 17 00:00:00 2001 From: yuzhaojing Date: Fri, 5 Nov 2021 13:34:06 +0800 Subject: [PATCH] [HUDI-2686] Proccess record after all bootstrap operator ready --- .../sink/bootstrap/BootstrapOperator.java | 29 +++++++++- .../aggregate/BootstrapAccumulator.java | 53 +++++++++++++++++++ .../aggregate/BootstrapAggFunction.java | 50 +++++++++++++++++ .../batch/BatchBootstrapOperator.java | 4 +- 4 files changed, 133 insertions(+), 3 deletions(-) create mode 100644 hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/aggregate/BootstrapAccumulator.java create mode 100644 hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/aggregate/BootstrapAggFunction.java diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java index 1a7be7e30e0c1..f6055ba11d2fc 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java @@ -34,6 +34,7 @@ import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.sink.bootstrap.aggregate.BootstrapAggFunction; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.format.FormatUtils; import org.apache.hudi.util.FlinkTables; @@ -48,6 +49,7 @@ import org.apache.flink.runtime.state.KeyGroupRangeAssignment; import org.apache.flink.runtime.state.StateInitializationContext; import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.runtime.taskexecutor.GlobalAggregateManager; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; @@ -58,6 +60,7 @@ import java.util.Collections; import java.util.Iterator; import java.util.List; +import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; import static java.util.stream.Collectors.toList; @@ -71,7 +74,7 @@ * *

The output records should then shuffle by the recordKey and thus do scalable write. */ -public class BootstrapOperator +public class BootstrapOperator> extends AbstractStreamOperator implements OneInputStreamOperator { private static final Logger LOG = LoggerFactory.getLogger(BootstrapOperator.class); @@ -83,6 +86,8 @@ public class BootstrapOperator protected transient org.apache.hadoop.conf.Configuration hadoopConf; protected transient HoodieWriteConfig writeConfig; + private transient GlobalAggregateManager aggregateManager; + private transient ListState instantState; private final Pattern pattern; private String lastInstantTime; @@ -117,6 +122,7 @@ public void initializeState(StateInitializationContext context) throws Exception this.hadoopConf = StreamerUtil.getHadoopConf(); this.writeConfig = StreamerUtil.getHoodieClientConfig(this.conf, true); this.hoodieTable = FlinkTables.createTable(writeConfig, hadoopConf, getRuntimeContext()); + this.aggregateManager = getRuntimeContext().getGlobalAggregateManager(); preLoadIndexRecords(); } @@ -135,6 +141,27 @@ protected void preLoadIndexRecords() throws Exception { } LOG.info("Finish sending index records, taskId = {}.", getRuntimeContext().getIndexOfThisSubtask()); + + // wait for the other bootstrap tasks finish bootstrapping. + waitForBootstrapReady(getRuntimeContext().getIndexOfThisSubtask()); + } + + /** + * Wait for other bootstrap tasks to finish the index bootstrap. + */ + private void waitForBootstrapReady(int taskID) { + int taskNum = getRuntimeContext().getNumberOfParallelSubtasks(); + int readyTaskNum = 1; + while (taskNum != readyTaskNum) { + try { + readyTaskNum = aggregateManager.updateGlobalAggregate(BootstrapAggFunction.NAME, taskID, new BootstrapAggFunction()); + LOG.info("Waiting for other bootstrap tasks to complete, taskId = {}.", taskID); + + TimeUnit.SECONDS.sleep(5); + } catch (Exception e) { + LOG.warn("Update global task bootstrap summary error", e); + } + } } @Override diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/aggregate/BootstrapAccumulator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/aggregate/BootstrapAccumulator.java new file mode 100644 index 0000000000000..14630a1f89b72 --- /dev/null +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/aggregate/BootstrapAccumulator.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.sink.bootstrap.aggregate; + +import java.io.Serializable; +import java.util.HashSet; +import java.util.Set; + +/** + * Bootstrap ready task id accumulator. + */ +public class BootstrapAccumulator implements Serializable { + private static final long serialVersionUID = 1L; + + private final Set readyTaskSet; + + public BootstrapAccumulator() { + this.readyTaskSet = new HashSet<>(); + } + + public void update(int taskId) { + readyTaskSet.add(taskId); + } + + public int readyTaskNum() { + return readyTaskSet.size(); + } + + public BootstrapAccumulator merge(BootstrapAccumulator acc) { + if (acc == null) { + return this; + } + + readyTaskSet.addAll(acc.readyTaskSet); + return this; + } +} diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/aggregate/BootstrapAggFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/aggregate/BootstrapAggFunction.java new file mode 100644 index 0000000000000..8c42fe903ad3c --- /dev/null +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/aggregate/BootstrapAggFunction.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.sink.bootstrap.aggregate; + +import org.apache.flink.api.common.functions.AggregateFunction; + +/** + * Aggregate function that accumulates the loaded task number of + * function {@link org.apache.hudi.sink.bootstrap.BootstrapOperator}. + */ +public class BootstrapAggFunction implements AggregateFunction { + public static final String NAME = BootstrapAggFunction.class.getSimpleName(); + + @Override + public BootstrapAccumulator createAccumulator() { + return new BootstrapAccumulator(); + } + + @Override + public BootstrapAccumulator add(Integer taskId, BootstrapAccumulator bootstrapAccumulator) { + bootstrapAccumulator.update(taskId); + return bootstrapAccumulator; + } + + @Override + public Integer getResult(BootstrapAccumulator bootstrapAccumulator) { + return bootstrapAccumulator.readyTaskNum(); + } + + @Override + public BootstrapAccumulator merge(BootstrapAccumulator bootstrapAccumulator, BootstrapAccumulator acc) { + return bootstrapAccumulator.merge(acc); + } +} \ No newline at end of file diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/batch/BatchBootstrapOperator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/batch/BatchBootstrapOperator.java index 1fde4593707b6..ead00d40a936d 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/batch/BatchBootstrapOperator.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/batch/BatchBootstrapOperator.java @@ -39,7 +39,7 @@ * *

The input records should shuffle by the partition path to avoid repeated loading. */ -public class BatchBootstrapOperator +public class BatchBootstrapOperator> extends BootstrapOperator { private Set partitionPathSet; @@ -64,7 +64,7 @@ protected void preLoadIndexRecords() { @Override @SuppressWarnings("unchecked") public void processElement(StreamRecord element) throws Exception { - final HoodieRecord record = (HoodieRecord) element.getValue(); + final HoodieRecord record = (HoodieRecord) element.getValue(); final String partitionPath = record.getKey().getPartitionPath(); if (haveSuccessfulCommits && !partitionPathSet.contains(partitionPath)) {