From 88c75fa8f5ad16221b3c572f667128a3d07e2b46 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Fri, 18 Nov 2022 12:05:48 -0800 Subject: [PATCH 1/7] Revisiting termination sequence in write handles to shutdown writing handles w/in the executor's consumers, as soon as writing is completed --- .../lock/metrics/HoodieLockMetrics.java | 4 +- .../client/utils/ClosableMergingIterator.java | 40 +++++++++++++++++ .../apache/hudi/io/HoodieAppendHandle.java | 28 +++++++----- .../apache/hudi/io/HoodieCreateHandle.java | 6 +++ .../org/apache/hudi/io/HoodieMergeHandle.java | 12 +++-- .../org/apache/hudi/io/HoodieWriteHandle.java | 10 +++++ .../bootstrap/BootstrapRecordConsumer.java | 1 + .../table/action/commit/BaseMergeHelper.java | 8 ++++ .../action/commit/HoodieMergeHelper.java | 26 +++++------ .../execution/FlinkLazyInsertIterable.java | 14 +++--- ...nkDeletePartitionCommitActionExecutor.java | 2 +- .../execution/JavaLazyInsertIterable.java | 13 +++--- .../OrcBootstrapMetadataHandler.java | 23 ++++++---- .../ParquetBootstrapMetadataHandler.java | 22 +++++----- .../log/HoodieMergedLogRecordScanner.java | 2 +- .../apache/hudi/common/util/FutureUtils.java | 44 +++++++++++++------ .../apache/hudi/common/util/HoodieTimer.java | 13 ++++-- .../collection/ClosableMappingIterator.java | 35 +++++++++++++++ .../queue/BaseHoodieQueueBasedExecutor.java | 29 ++++++++++-- 19 files changed, 245 insertions(+), 87 deletions(-) create mode 100644 hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/ClosableMergingIterator.java create mode 100644 hudi-common/src/main/java/org/apache/hudi/common/util/collection/ClosableMappingIterator.java diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/metrics/HoodieLockMetrics.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/metrics/HoodieLockMetrics.java index 0fb5fd1caa3df..b3780bedef22a 100644 --- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/metrics/HoodieLockMetrics.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/metrics/HoodieLockMetrics.java @@ -39,8 +39,8 @@ public class HoodieLockMetrics { private final HoodieWriteConfig writeConfig; private final boolean isMetricsEnabled; private final int keepLastNtimes = 100; - private final transient HoodieTimer lockDurationTimer = HoodieTimer.create(); - private final transient HoodieTimer lockApiRequestDurationTimer = HoodieTimer.create(); + private final transient HoodieTimer lockDurationTimer = HoodieTimer.delayed(); + private final transient HoodieTimer lockApiRequestDurationTimer = HoodieTimer.delayed(); private transient Counter lockAttempts; private transient Counter successfulLockAttempts; private transient Counter failedLockAttempts; diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/ClosableMergingIterator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/ClosableMergingIterator.java new file mode 100644 index 0000000000000..63d46de39bce6 --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/ClosableMergingIterator.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.client.utils; + +import org.apache.hudi.common.util.ClosableIterator; +import org.apache.hudi.common.util.collection.Pair; + +import java.util.function.Function; + +// TODO move to hudi-common +public class ClosableMergingIterator extends MergingIterator implements ClosableIterator { + + public ClosableMergingIterator(ClosableIterator leftIterator, + ClosableIterator rightIterator, + Function, T> mergeFunction) { + super(leftIterator, rightIterator, mergeFunction); + } + + @Override + public void close() { + ((ClosableIterator) leftIterator).close(); + ((ClosableIterator) rightIterator).close(); + } +} diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java index de16ea17c9118..f7df0816e3124 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java @@ -65,6 +65,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -498,21 +499,26 @@ protected void doWrite(HoodieRecord record, Schema schema, TypedProperties props @Override public List close() { try { + if (isClosed()) { + // Handle has already been closed + return Collections.emptyList(); + } + + markClosed(); // flush any remaining records to disk appendDataAndDeleteBlocks(header, 
true); recordItr = null; - if (writer != null) { - writer.close(); - writer = null; - - // update final size, once for all log files - // TODO we can actually deduce file size purely from AppendResult (based on offset and size - // of the appended block) - for (WriteStatus status : statuses) { - long logFileSize = FSUtils.getFileSize(fs, new Path(config.getBasePath(), status.getStat().getPath())); - status.getStat().setFileSizeInBytes(logFileSize); - } + + writer.close(); + + // update final size, once for all log files + // TODO we can actually deduce file size purely from AppendResult (based on offset and size + // of the appended block) + for (WriteStatus status : statuses) { + long logFileSize = FSUtils.getFileSize(fs, new Path(config.getBasePath(), status.getStat().getPath())); + status.getStat().setFileSizeInBytes(logFileSize); } + return statuses; } catch (IOException e) { throw new HoodieUpsertException("Failed to close UpdateHandle", e); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java index 8b20df3f1a5c7..97fd9878b4ffd 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java @@ -203,6 +203,12 @@ public IOType getIOType() { public List close() { LOG.info("Closing the file " + writeStatus.getFileId() + " as we are done with all the records " + recordsWritten); try { + if (isClosed()) { + // Handle has already been closed + return Collections.emptyList(); + } + + markClosed(); if (fileWriter != null) { fileWriter.close(); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java index d83237cb92536..416e3ebfe5e0c 100644 --- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java @@ -407,6 +407,12 @@ protected void writeIncomingRecords() throws IOException { @Override public List close() { try { + if (isClosed()) { + // Handle has already been closed + return Collections.emptyList(); + } + + markClosed(); writeIncomingRecords(); if (keyToNewRecords instanceof ExternalSpillableMap) { @@ -416,10 +422,8 @@ public List close() { keyToNewRecords = null; writtenRecordKeys = null; - if (fileWriter != null) { - fileWriter.close(); - fileWriter = null; - } + fileWriter.close(); + fileWriter = null; long fileSizeInBytes = FSUtils.getFileSize(fs, newFilePath); HoodieWriteStat stat = writeStatus.getStat(); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java index 3a053e6439e6c..676e407f2c1ae 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java @@ -76,6 +76,8 @@ public abstract class HoodieWriteHandle extends HoodieIOHandle hoodieTable, TaskContextSupplier taskContextSupplier) { this(config, instantTime, partitionPath, fileId, hoodieTable, @@ -175,6 +177,14 @@ public void write(HoodieRecord record, Schema schema, TypedProperties props) { doWrite(record, schema, props); } + protected boolean isClosed() { + return closed; + } + + protected void markClosed() { + this.closed = true; + } + public abstract List close(); public List writeStatuses() { diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/bootstrap/BootstrapRecordConsumer.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/bootstrap/BootstrapRecordConsumer.java index 
f9b85679fbbe1..b3496ad3dc033 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/bootstrap/BootstrapRecordConsumer.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/bootstrap/BootstrapRecordConsumer.java @@ -41,6 +41,7 @@ public void consume(HoodieRecord record) { @Override public Void finish() { + bootstrapHandle.close(); return null; } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseMergeHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseMergeHelper.java index 17b8620da63f6..922b077c71bb7 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseMergeHelper.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseMergeHelper.java @@ -19,10 +19,17 @@ package org.apache.hudi.table.action.commit; import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.avro.HoodieAvroUtils; +import org.apache.hudi.client.utils.ClosableMergingIterator; +import org.apache.hudi.common.util.ClosableIterator; import org.apache.hudi.common.util.queue.HoodieConsumer; import org.apache.hudi.io.HoodieMergeHandle; import org.apache.hudi.table.HoodieTable; +import org.apache.avro.generic.GenericRecord; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; + import java.io.IOException; /** @@ -56,6 +63,7 @@ public void consume(HoodieRecord record) { @Override public Void finish() { + upsertHandle.close(); return null; } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java index b7668a347969e..78c0407c6b1bd 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java +++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java @@ -26,6 +26,7 @@ import org.apache.hudi.common.util.collection.ClosableIterator; import org.apache.hudi.common.util.InternalSchemaCache; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.ClosableMappingIterator; import org.apache.hudi.common.util.queue.HoodieExecutor; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieException; @@ -49,7 +50,6 @@ import org.slf4j.LoggerFactory; import java.io.IOException; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Objects; @@ -105,10 +105,10 @@ public void runMerge(HoodieTable table, || !isPureProjection || baseFile.getBootstrapBaseFile().isPresent(); - HoodieExecutor wrapper = null; + HoodieExecutor executor = null; try { - Iterator recordIterator; + ClosableIterator recordIterator; // In case writer's schema is simply a projection of the reader's one we can read // the records in the projected schema directly @@ -148,21 +148,19 @@ public void runMerge(HoodieTable table, return isBufferingRecords ? newRecord.copy() : newRecord; }, table.getPreExecuteRunnable()); - wrapper.execute(); + executor.execute(); } catch (Exception e) { throw new HoodieException(e); } finally { - // HUDI-2875: mergeHandle is not thread safe, we should totally terminate record inputting - // and executor firstly and then close mergeHandle. 
- baseFileReader.close(); - if (bootstrapFileReader != null) { - bootstrapFileReader.close(); - } - if (null != wrapper) { - wrapper.shutdownNow(); - wrapper.awaitTermination(); + // NOTE: If executor is initialized it's responsible for gracefully shutting down + // both producer and consumer + if (executor != null) { + executor.shutdownNow(); + executor.awaitTermination(); + } else { + reader.close(); + mergeHandle.close(); } - mergeHandle.close(); } } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/FlinkLazyInsertIterable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/FlinkLazyInsertIterable.java index 6e573ec9432b6..3a088cc51d21e 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/FlinkLazyInsertIterable.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/FlinkLazyInsertIterable.java @@ -56,20 +56,20 @@ public FlinkLazyInsertIterable(Iterator> recordItr, @Override protected List computeNext() { // Executor service used for launching writer thread. 
- HoodieExecutor> bufferedIteratorExecutor = null; + HoodieExecutor> executor = null; try { - final Schema schema = new Schema.Parser().parse(hoodieConfig.getSchema()); - bufferedIteratorExecutor = ExecutorFactory.create(hoodieConfig, inputItr, getExplicitInsertHandler(), + Schema schema = new Schema.Parser().parse(hoodieConfig.getSchema()); + executor = ExecutorFactory.create(hoodieConfig, inputItr, getExplicitInsertHandler(), getTransformer(schema, hoodieConfig)); - final List result = bufferedIteratorExecutor.execute(); + final List result = executor.execute(); checkState(result != null && !result.isEmpty()); return result; } catch (Exception e) { throw new HoodieException(e); } finally { - if (null != bufferedIteratorExecutor) { - bufferedIteratorExecutor.shutdownNow(); - bufferedIteratorExecutor.awaitTermination(); + if (executor != null) { + executor.shutdownNow(); + executor.awaitTermination(); } } } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkDeletePartitionCommitActionExecutor.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkDeletePartitionCommitActionExecutor.java index 3f19534d08cdb..5fc6d8a807aa6 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkDeletePartitionCommitActionExecutor.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkDeletePartitionCommitActionExecutor.java @@ -66,7 +66,7 @@ public HoodieWriteMetadata> execute() { DeletePartitionUtils.checkForPendingTableServiceActions(table, partitions); try { - HoodieTimer timer = new HoodieTimer().startTimer(); + HoodieTimer timer = HoodieTimer.start(); context.setJobStatus(this.getClass().getSimpleName(), "Gather all file ids from all deleting partitions."); Map> partitionToReplaceFileIds = context.parallelize(partitions).distinct().collectAsList() diff --git 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/JavaLazyInsertIterable.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/JavaLazyInsertIterable.java index d2e813a506bc8..d3612fbf86861 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/JavaLazyInsertIterable.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/JavaLazyInsertIterable.java @@ -59,21 +59,20 @@ public JavaLazyInsertIterable(Iterator> recordItr, @Override protected List computeNext() { // Executor service used for launching writer thread. - HoodieExecutor> bufferedIteratorExecutor = + HoodieExecutor> executor = null; try { final Schema schema = new Schema.Parser().parse(hoodieConfig.getSchema()); - bufferedIteratorExecutor = - ExecutorFactory.create(hoodieConfig, inputItr, getInsertHandler(), getTransformer(schema, hoodieConfig)); - final List result = bufferedIteratorExecutor.execute(); + executor = ExecutorFactory.create(hoodieConfig, inputItr, getInsertHandler(), getTransformer(schema, hoodieConfig)); + final List result = executor.execute(); checkState(result != null && !result.isEmpty()); return result; } catch (Exception e) { throw new HoodieException(e); } finally { - if (null != bufferedIteratorExecutor) { - bufferedIteratorExecutor.shutdownNow(); - bufferedIteratorExecutor.awaitTermination(); + if (executor != null) { + executor.shutdownNow(); + executor.awaitTermination(); } } } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/OrcBootstrapMetadataHandler.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/OrcBootstrapMetadataHandler.java index 6c7f70cc58ea7..fa60148ea10bf 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/OrcBootstrapMetadataHandler.java +++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/OrcBootstrapMetadataHandler.java @@ -70,11 +70,12 @@ void executeBootstrap(HoodieBootstrapHandle bootstrapHandle, Path so if (config.getRecordMerger().getRecordType() == HoodieRecordType.SPARK) { throw new UnsupportedOperationException(); } - HoodieExecutor wrapper = null; Reader orcReader = OrcFile.createReader(sourceFilePath, OrcFile.readerOptions(table.getHadoopConf())); TypeDescription orcSchema = AvroOrcUtils.createOrcSchema(avroSchema); - try (RecordReader reader = orcReader.rows(new Reader.Options(table.getHadoopConf()).schema(orcSchema))) { - wrapper = ExecutorFactory.create(config, new OrcReaderIterator(reader, avroSchema, orcSchema), + HoodieExecutor executor = null; + RecordReader reader = orcReader.rows(new Reader.Options(table.getHadoopConf()).schema(orcSchema)); + try { + executor = ExecutorFactory.create(config, new OrcReaderIterator(reader, avroSchema, orcSchema), new BootstrapRecordConsumer(bootstrapHandle), inp -> { String recKey = keyGenerator.getKey(inp).getRecordKey(); GenericRecord gr = new GenericData.Record(METADATA_BOOTSTRAP_RECORD_SCHEMA); @@ -82,16 +83,20 @@ void executeBootstrap(HoodieBootstrapHandle bootstrapHandle, Path so BootstrapRecordPayload payload = new BootstrapRecordPayload(gr); HoodieRecord rec = new HoodieAvroRecord(new HoodieKey(recKey, partitionPath), payload); return rec; - }, table.getPreExecuteRunnable()); - wrapper.execute(); + }, table.getPreExecuteRunnable()); + executor.execute(); } catch (Exception e) { throw new HoodieException(e); } finally { - if (null != wrapper) { - wrapper.shutdownNow(); - wrapper.awaitTermination(); + // NOTE: If executor is initialized it's responsible for gracefully shutting down + // both producer and consumer + if (executor != null) { + executor.shutdownNow(); + executor.awaitTermination(); + } else { + reader.close(); + bootstrapHandle.close(); } - bootstrapHandle.close(); } } } diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/ParquetBootstrapMetadataHandler.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/ParquetBootstrapMetadataHandler.java index 8f412a39f3cdc..8ef6ab8f5cff4 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/ParquetBootstrapMetadataHandler.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/ParquetBootstrapMetadataHandler.java @@ -78,11 +78,12 @@ protected void executeBootstrap(HoodieBootstrapHandle bootstrapHandl KeyGeneratorInterface keyGenerator, String partitionPath, Schema schema) throws Exception { - HoodieExecutor wrapper = null; HoodieRecordMerger recordMerger = table.getConfig().getRecordMerger(); HoodieFileReader reader = HoodieFileReaderFactory.getReaderFactory(recordMerger.getRecordType()) .getFileReader(table.getHadoopConf(), sourceFilePath); + + HoodieExecutor executor = null; try { Function transformer = record -> { String recordKey = record.getRecordKey(schema, Option.of(keyGenerator)); @@ -92,21 +93,22 @@ protected void executeBootstrap(HoodieBootstrapHandle bootstrapHandl // it since these records will be inserted into the queue later. 
.copy(); }; - ClosableIterator recordIterator = reader.getRecordIterator(schema); - wrapper = ExecutorFactory.create(config, recordIterator, + executor = ExecutorFactory.create(config, recordIterator, new BootstrapRecordConsumer(bootstrapHandle), transformer, table.getPreExecuteRunnable()); - - wrapper.execute(); + executor.execute(); } catch (Exception e) { throw new HoodieException(e); } finally { - reader.close(); - if (null != wrapper) { - wrapper.shutdownNow(); - wrapper.awaitTermination(); + // NOTE: If executor is initialized it's responsible for gracefully shutting down + // both producer and consumer + if (executor != null) { + executor.shutdownNow(); + executor.awaitTermination(); + } else { + reader.close(); + bootstrapHandle.close(); } - bootstrapHandle.close(); } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java index 71a4d19fe474c..dfc58238da406 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java @@ -76,7 +76,7 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader private static final Logger LOG = LoggerFactory.getLogger(HoodieMergedLogRecordScanner.class); // A timer for calculating elapsed time in millis - public final HoodieTimer timer = new HoodieTimer(); + public final HoodieTimer timer = HoodieTimer.delayed(); // Map of compacted/merged records private final ExternalSpillableMap records; // Set of already scanned prefixes allowing us to avoid scanning same prefixes again diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/FutureUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/FutureUtils.java index d9c1dee6e8173..bb9819df8bf76 100644 --- 
a/hudi-common/src/main/java/org/apache/hudi/common/util/FutureUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/FutureUtils.java @@ -18,8 +18,6 @@ package org.apache.hudi.common.util; -import javax.annotation.Nonnull; - import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; @@ -30,18 +28,38 @@ public class FutureUtils { /** - * Parallel CompletableFutures + * Similar to {@link CompletableFuture#allOf(CompletableFuture[])} with a few important + * differences: + * + *
    + *
<ol> + * <li>Completes successfully as soon as *all* of the futures complete successfully</li> + * <li>Completes exceptionally as soon as *any* of the futures complete exceptionally</li> + * <li>In case it's completed exceptionally all the other futures not completed yet, will be + * cancelled</li> + * </ol>
* - * @param futures CompletableFuture list - * @return a new CompletableFuture which will completed when all of the given CompletableFutures complete. + * @param futures list of {@link CompletableFuture}s */ - public static CompletableFuture> allOf(@Nonnull List> futures) { - return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])) - .thenApply(aVoid -> - futures.stream() - // NOTE: This join wouldn't block, since all the - // futures are completed at this point. - .map(CompletableFuture::join) - .collect(Collectors.toList())); + public static CompletableFuture> allOf(List> futures) { + CompletableFuture union = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); + + futures.forEach(future -> { + // NOTE: We add a callback to every future, to cancel all the other not yet completed futures, + // which will be providing for an early termination semantic: whenever any of the futures + // fail other futures will be cancelled and the exception will be returned as a result + future.whenComplete((ignored, throwable) -> { + if (throwable != null) { + futures.forEach(f -> f.cancel(true)); + union.completeExceptionally(throwable); + } + }); + }); + + return union.thenApply(aVoid -> + futures.stream() + // NOTE: This join wouldn't block, since all the + // futures are completed at this point. 
+ .map(CompletableFuture::join) + .collect(Collectors.toList())); } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieTimer.java b/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieTimer.java index a0a8ca0867e93..aa49b8f8e48e0 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieTimer.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieTimer.java @@ -30,10 +30,10 @@ public class HoodieTimer { // Ordered stack of TimeInfo's to make sure stopping the timer returns the correct elapsed time - private final Deque timeInfoDeque = new ArrayDeque<>(); + private final Deque timeInfoDeque = new ArrayDeque<>(1); /** - * @deprecated please use either {@link HoodieTimer#start} or {@link HoodieTimer#create} APIs + * @deprecated please use either {@link HoodieTimer#start} or {@link HoodieTimer#delayed} APIs */ @Deprecated public HoodieTimer() { @@ -47,7 +47,6 @@ private HoodieTimer(boolean shouldStart) { } static class TimeInfo { - // captures the startTime of the code block long startTime; // is the timing still running for the last started timer @@ -84,11 +83,17 @@ public long endTimer() { return timeInfoDeque.pop().stop(); } + /** + * Creates an instance of {@link HoodieTimer} already started + */ public static HoodieTimer start() { return new HoodieTimer(true); } - public static HoodieTimer create() { + /** + * Creates an instance of {@link HoodieTimer} that is NOT started + */ + public static HoodieTimer delayed() { return new HoodieTimer(false); } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ClosableMappingIterator.java b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ClosableMappingIterator.java new file mode 100644 index 0000000000000..6e39ceef02f0c --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ClosableMappingIterator.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * 
or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.util.collection; + +import org.apache.hudi.common.util.ClosableIterator; + +import java.util.function.Function; + +public class ClosableMappingIterator extends MappingIterator implements ClosableIterator { + + public ClosableMappingIterator(ClosableIterator sourceIterator, Function transformer) { + super(sourceIterator, transformer); + } + + @Override + public void close() { + ((ClosableIterator) source).close(); + } +} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BaseHoodieQueueBasedExecutor.java b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BaseHoodieQueueBasedExecutor.java index 2bd01bdd33493..12901902f1c42 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BaseHoodieQueueBasedExecutor.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BaseHoodieQueueBasedExecutor.java @@ -25,6 +25,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Arrays; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; @@ -66,6 +67,9 @@ public abstract class BaseHoodieQueueBasedExecutor implements HoodieExe private final List> producers; // Consumer 
protected final Option> consumer; + // Futures corresponding to producing/consuming processes + private CompletableFuture consumingFuture; + private CompletableFuture producingFuture; public BaseHoodieQueueBasedExecutor(List> producers, Option> consumer, @@ -152,7 +156,24 @@ public final boolean awaitTermination() { } @Override - public void shutdownNow() { + public final void shutdownNow() { + // NOTE: PLEASE READ CAREFULLY + // Graceful shutdown sequence has been a source of multiple issues in the + // past (for ex HUDI-2875, HUDI-5238). To handle it appropriately in a graceful + // fashion we're consolidating shutdown sequence w/in the Executor itself (in + // this method) shutting down in the following order + // + // 1. We shut down producing/consuming pipeline (by interrupting + // corresponding futures), then + // 2. We shut down producer and consumer (if present), and after that + // 3. We shut down the executors + // + producingFuture.cancel(true); + consumingFuture.cancel(true); + // Clean up resources associated w/ producers/consumers + producers.forEach(HoodieProducer::close); + consumer.ifPresent(HoodieConsumer::finish); + // Shutdown executor-services producerExecutorService.shutdownNow(); consumerExecutorService.shutdownNow(); } @@ -170,12 +191,12 @@ public E execute() { checkState(this.consumer.isPresent()); setUp(); // Start consuming/producing asynchronously - CompletableFuture consuming = startConsumingAsync(); - CompletableFuture producing = startProducingAsync(); + this.consumingFuture = startConsumingAsync(); + this.producingFuture = startProducingAsync(); // NOTE: To properly support mode when there's no consumer, we have to fall back // to producing future as the trigger for us to shut down the queue - return producing.thenCombine(consuming, (aVoid, anotherVoid) -> null) + return allOf(Arrays.asList(producingFuture, consumingFuture)) .whenComplete((ignored, throwable) -> { // Close the queue to release the resources queue.close(); From
a22e8beeed8810ddd1caaeb55880e6fe7d2af4b5 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Fri, 17 Feb 2023 10:34:23 -0800 Subject: [PATCH 2/7] Tidying up --- .../client/utils/ClosableMergingIterator.java | 21 +++++------ .../hudi/client/utils/MergingIterator.java | 22 ++++++++---- .../util/collection/ClosableIterator.java | 2 -- .../collection/ClosableMappingIterator.java | 35 ------------------- 4 files changed, 26 insertions(+), 54 deletions(-) delete mode 100644 hudi-common/src/main/java/org/apache/hudi/common/util/collection/ClosableMappingIterator.java diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/ClosableMergingIterator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/ClosableMergingIterator.java index 63d46de39bce6..bbd51d8b95d49 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/ClosableMergingIterator.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/ClosableMergingIterator.java @@ -18,23 +18,24 @@ package org.apache.hudi.client.utils; -import org.apache.hudi.common.util.ClosableIterator; -import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.common.util.collection.ClosableIterator; -import java.util.function.Function; +import java.util.function.BiFunction; -// TODO move to hudi-common -public class ClosableMergingIterator extends MergingIterator implements ClosableIterator { +/** + * Closeable counterpart of {@link MergingIterator} + */ +public class ClosableMergingIterator extends MergingIterator implements ClosableIterator { - public ClosableMergingIterator(ClosableIterator leftIterator, - ClosableIterator rightIterator, - Function, T> mergeFunction) { + public ClosableMergingIterator(ClosableIterator leftIterator, + ClosableIterator rightIterator, + BiFunction mergeFunction) { super(leftIterator, rightIterator, mergeFunction); } @Override public void close() { - ((ClosableIterator) 
leftIterator).close(); - ((ClosableIterator) rightIterator).close(); + ((ClosableIterator) leftIterator).close(); + ((ClosableIterator) rightIterator).close(); } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/MergingIterator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/MergingIterator.java index 2dd7c44c4df10..28389100b377f 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/MergingIterator.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/MergingIterator.java @@ -18,19 +18,27 @@ package org.apache.hudi.client.utils; -import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.util.ValidationUtils; import java.util.Iterator; import java.util.function.BiFunction; -public class MergingIterator implements Iterator { +/** + * Iterator providing for the semantic of simultaneously iterating over 2 other iterators + * and combining their respective output + * + * @param type returned by the first iterator + * @param type returned by the second iterator + * @param type returned by this iterator + */ +public class MergingIterator implements Iterator { + + protected final Iterator leftIterator; + protected final Iterator rightIterator; - private final Iterator leftIterator; - private final Iterator rightIterator; - private final BiFunction mergeFunction; + private final BiFunction mergeFunction; - public MergingIterator(Iterator leftIterator, Iterator rightIterator, BiFunction mergeFunction) { + public MergingIterator(Iterator leftIterator, Iterator rightIterator, BiFunction mergeFunction) { this.leftIterator = leftIterator; this.rightIterator = rightIterator; this.mergeFunction = mergeFunction; @@ -45,7 +53,7 @@ public boolean hasNext() { } @Override - public T next() { + public R next() { return mergeFunction.apply(leftIterator.next(), rightIterator.next()); } } diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ClosableIterator.java b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ClosableIterator.java index ac0335cd3eb22..f91aede5f3e48 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ClosableIterator.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ClosableIterator.java @@ -24,8 +24,6 @@ * An iterator that give a chance to release resources. * * @param The return type - * - * TODO move under common.util.collection */ public interface ClosableIterator extends Iterator, AutoCloseable { @Override diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ClosableMappingIterator.java b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ClosableMappingIterator.java deleted file mode 100644 index 6e39ceef02f0c..0000000000000 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ClosableMappingIterator.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hudi.common.util.collection; - -import org.apache.hudi.common.util.ClosableIterator; - -import java.util.function.Function; - -public class ClosableMappingIterator extends MappingIterator implements ClosableIterator { - - public ClosableMappingIterator(ClosableIterator sourceIterator, Function transformer) { - super(sourceIterator, transformer); - } - - @Override - public void close() { - ((ClosableIterator) source).close(); - } -} From 00379ef4ec9ce0bab05465636e07c5cfd34c37f5 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Fri, 17 Feb 2023 10:34:38 -0800 Subject: [PATCH 3/7] Fixing compilation (after rebase) --- .../hudi/table/action/commit/BaseMergeHelper.java | 7 ------- .../table/action/commit/HoodieMergeHelper.java | 14 +++++++++----- 2 files changed, 9 insertions(+), 12 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseMergeHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseMergeHelper.java index 922b077c71bb7..138e6a840d4bf 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseMergeHelper.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseMergeHelper.java @@ -19,17 +19,10 @@ package org.apache.hudi.table.action.commit; import org.apache.hudi.common.model.HoodieRecord; -import org.apache.hudi.avro.HoodieAvroUtils; -import org.apache.hudi.client.utils.ClosableMergingIterator; -import org.apache.hudi.common.util.ClosableIterator; import org.apache.hudi.common.util.queue.HoodieConsumer; import org.apache.hudi.io.HoodieMergeHandle; import org.apache.hudi.table.HoodieTable; -import org.apache.avro.generic.GenericRecord; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; - import java.io.IOException; /** diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java index 78c0407c6b1bd..6161b5bf7c2de 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java @@ -18,15 +18,15 @@ package org.apache.hudi.table.action.commit; +import org.apache.hudi.client.utils.ClosableMergingIterator; import org.apache.hudi.common.config.HoodieCommonConfig; import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.TableSchemaResolver; -import org.apache.hudi.common.util.collection.ClosableIterator; import org.apache.hudi.common.util.InternalSchemaCache; import org.apache.hudi.common.util.Option; -import org.apache.hudi.common.util.collection.ClosableMappingIterator; +import org.apache.hudi.common.util.collection.ClosableIterator; import org.apache.hudi.common.util.queue.HoodieExecutor; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieException; @@ -123,7 +123,11 @@ public void runMerge(HoodieTable table, HoodieFileReaderFactory.getReaderFactory(recordType).getFileReader(bootstrapFileConfig, bootstrapFilePath), mergeHandle.getPartitionFields(), mergeHandle.getPartitionValues()); - recordIterator = bootstrapFileReader.getRecordIterator(mergeHandle.getWriterSchemaWithMetaFields()); + recordIterator = new ClosableMergingIterator<>( + baseFileRecordIterator, + (ClosableIterator) bootstrapFileReader.getRecordIterator(), + (left, right) -> + left.joinWith(right, mergeHandle.getWriterSchemaWithMetaFields())); recordSchema = mergeHandle.getWriterSchemaWithMetaFields(); } else { recordIterator = 
baseFileRecordIterator; @@ -132,7 +136,7 @@ public void runMerge(HoodieTable table, boolean isBufferingRecords = ExecutorFactory.isBufferingRecords(writeConfig); - wrapper = ExecutorFactory.create(writeConfig, recordIterator, new UpdateHandler(mergeHandle), record -> { + executor = ExecutorFactory.create(writeConfig, recordIterator, new UpdateHandler(mergeHandle), record -> { HoodieRecord newRecord; if (schemaEvolutionTransformerOpt.isPresent()) { newRecord = schemaEvolutionTransformerOpt.get().apply(record); @@ -158,7 +162,7 @@ public void runMerge(HoodieTable table, executor.shutdownNow(); executor.awaitTermination(); } else { - reader.close(); + baseFileReader.close(); mergeHandle.close(); } } From fc942ae945fad7271dd2c8fc817b4a92b7dc877a Mon Sep 17 00:00:00 2001 From: sivabalan Date: Sat, 18 Mar 2023 10:50:08 -0700 Subject: [PATCH 4/7] fixing test failures --- .../execution/TestBoundedInMemoryExecutorInSpark.java | 7 +++---- .../common/util/queue/BaseHoodieQueueBasedExecutor.java | 8 ++++++-- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutorInSpark.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutorInSpark.java index eb61cb433120d..bca0764b87603 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutorInSpark.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutorInSpark.java @@ -29,7 +29,6 @@ import org.apache.hudi.exception.HoodieException; import org.apache.hudi.testutils.HoodieClientTestHarness; -import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; import org.apache.spark.TaskContext; import org.apache.spark.TaskContext$; @@ -154,15 +153,15 @@ public Integer finish() { @Test public void testExecutorTermination() { - Iterator unboundedRecordIter = new 
Iterator() { + Iterator unboundedRecordIter = new Iterator() { @Override public boolean hasNext() { return true; } @Override - public GenericRecord next() { - return dataGen.generateGenericRecord(); + public HoodieRecord next() { + return dataGen.generateInserts(instantTime, 1).get(0); } }; diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BaseHoodieQueueBasedExecutor.java b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BaseHoodieQueueBasedExecutor.java index 12901902f1c42..f35d7977490f6 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BaseHoodieQueueBasedExecutor.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BaseHoodieQueueBasedExecutor.java @@ -168,8 +168,12 @@ public final void shutdownNow() { // 2. We shut down producer and consumer (if present), and after that // 3. We shut down the executors // - producingFuture.cancel(true); - consumingFuture.cancel(true); + if (producingFuture != null) { + producingFuture.cancel(true); + } + if (consumingFuture != null) { + consumingFuture.cancel(true); + } // Clean up resources associated w/ producers/consumers producers.forEach(HoodieProducer::close); consumer.ifPresent(HoodieConsumer::finish); From f5753cd6b902333e8009ec12d176e2b17f96f9a3 Mon Sep 17 00:00:00 2001 From: sivabalan Date: Mon, 20 Mar 2023 10:08:58 -0700 Subject: [PATCH 5/7] fixing exception flow in HoodieExecutor --- .../hudi/execution/TestDisruptorMessageQueue.java | 13 +------------ .../util/queue/BaseHoodieQueueBasedExecutor.java | 5 +++++ .../common/util/queue/BoundedInMemoryQueue.java | 5 +++++ .../common/util/queue/DisruptorMessageQueue.java | 8 ++++++++ .../hudi/common/util/queue/HoodieMessageQueue.java | 2 ++ 5 files changed, 21 insertions(+), 12 deletions(-) diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorMessageQueue.java 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorMessageQueue.java index 4c8e0dac27dbc..b2fd31fd3296a 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorMessageQueue.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorMessageQueue.java @@ -281,16 +281,6 @@ public Integer finish() { @Timeout(value = 60) public void testException() throws Exception { final int numRecords = 1000; - final int numProducers = 40; - - final DisruptorMessageQueue queue = - new DisruptorMessageQueue(1024, getTransformer(HoodieTestDataGenerator.AVRO_SCHEMA, writeConfig), - "BLOCKING_WAIT", numProducers, new Runnable() { - @Override - public void run() { - // do nothing. - } - }); List pRecs = dataGen.generateInserts(instantTime, numRecords); @@ -307,8 +297,7 @@ public void run() { })); } } - - + HoodieConsumer, Integer> consumer = new HoodieConsumer, Integer>() { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BaseHoodieQueueBasedExecutor.java b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BaseHoodieQueueBasedExecutor.java index f35d7977490f6..86011e865dc04 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BaseHoodieQueueBasedExecutor.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BaseHoodieQueueBasedExecutor.java @@ -215,6 +215,11 @@ public E execute() { // to be interrupted as well Thread.currentThread().interrupt(); } + // throw if we have any other exception seen already. There is a chance that cancellation/closing of producers with CompletableFuture wins before the actual exception + // is thrown.
+ if (this.queue.getThrowable() != null) { + throw new HoodieException(queue.getThrowable()); + } throw new HoodieException(e); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryQueue.java b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryQueue.java index fa22efec24107..e9d13b10dca25 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryQueue.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryQueue.java @@ -270,6 +270,11 @@ public void markAsFailed(Throwable e) { this.rateLimiter.release(RECORD_CACHING_LIMIT + 1); } + @Override + public Throwable getThrowable() { + return this.hasFailed.get(); + } + @Override public boolean isEmpty() { return this.queue.size() == 0; diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/DisruptorMessageQueue.java b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/DisruptorMessageQueue.java index 1c91b81239948..ea0efab5386cb 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/DisruptorMessageQueue.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/DisruptorMessageQueue.java @@ -32,6 +32,7 @@ import org.slf4j.LoggerFactory; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; /** @@ -47,6 +48,7 @@ public class DisruptorMessageQueue implements HoodieMessageQueue { private final Disruptor queue; private final Function transformFunction; private final RingBuffer ringBuffer; + private AtomicReference throwable = new AtomicReference<>(null); private boolean isShutdown = false; private boolean isStarted = false; @@ -89,9 +91,15 @@ public Option readNextRecord() { @Override public void markAsFailed(Throwable e) { + this.throwable.compareAndSet(null, e); // no-op } + @Override + public Throwable getThrowable() { + return this.throwable.get(); + } + @Override public 
boolean isEmpty() { return ringBuffer.getBufferSize() == ringBuffer.remainingCapacity(); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/HoodieMessageQueue.java b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/HoodieMessageQueue.java index 185cdea022e79..79baf23e97a56 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/HoodieMessageQueue.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/HoodieMessageQueue.java @@ -50,6 +50,8 @@ public interface HoodieMessageQueue extends Closeable { */ void markAsFailed(Throwable e); + Throwable getThrowable(); + boolean isEmpty(); /** From fed21ba52903ea9709fe3b69a5602a2c62f3f648 Mon Sep 17 00:00:00 2001 From: Raymond Xu <2701446+xushiyan@users.noreply.github.com> Date: Fri, 19 May 2023 19:11:51 +0800 Subject: [PATCH 6/7] revert api renaming --- .../client/transaction/lock/metrics/HoodieLockMetrics.java | 4 ++-- .../hudi/common/table/log/HoodieMergedLogRecordScanner.java | 2 +- .../main/java/org/apache/hudi/common/util/HoodieTimer.java | 4 ++-- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/metrics/HoodieLockMetrics.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/metrics/HoodieLockMetrics.java index b3780bedef22a..0fb5fd1caa3df 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/metrics/HoodieLockMetrics.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/metrics/HoodieLockMetrics.java @@ -39,8 +39,8 @@ public class HoodieLockMetrics { private final HoodieWriteConfig writeConfig; private final boolean isMetricsEnabled; private final int keepLastNtimes = 100; - private final transient HoodieTimer lockDurationTimer = HoodieTimer.delayed(); - private final transient HoodieTimer lockApiRequestDurationTimer = 
HoodieTimer.delayed(); + private final transient HoodieTimer lockDurationTimer = HoodieTimer.create(); + private final transient HoodieTimer lockApiRequestDurationTimer = HoodieTimer.create(); private transient Counter lockAttempts; private transient Counter successfulLockAttempts; private transient Counter failedLockAttempts; diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java index dfc58238da406..bf220ca784719 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java @@ -76,7 +76,7 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader private static final Logger LOG = LoggerFactory.getLogger(HoodieMergedLogRecordScanner.class); // A timer for calculating elapsed time in millis - public final HoodieTimer timer = HoodieTimer.delayed(); + public final HoodieTimer timer = HoodieTimer.create(); // Map of compacted/merged records private final ExternalSpillableMap records; // Set of already scanned prefixes allowing us to avoid scanning same prefixes again diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieTimer.java b/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieTimer.java index aa49b8f8e48e0..a35cd60933f8a 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieTimer.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieTimer.java @@ -33,7 +33,7 @@ public class HoodieTimer { private final Deque timeInfoDeque = new ArrayDeque<>(1); /** - * @deprecated please use either {@link HoodieTimer#start} or {@link HoodieTimer#delayed} APIs + * @deprecated please use either {@link HoodieTimer#start} or {@link HoodieTimer#create} APIs */ @Deprecated public HoodieTimer() { @@ -93,7 +93,7 @@ 
public static HoodieTimer start() { /** * Creates an instance of {@link HoodieTimer} that is NOT started */ - public static HoodieTimer delayed() { + public static HoodieTimer create() { return new HoodieTimer(false); } } From 10b74fc1a59fff6213287ccca82a84f7d69d1834 Mon Sep 17 00:00:00 2001 From: Raymond Xu <2701446+xushiyan@users.noreply.github.com> Date: Fri, 19 May 2023 19:12:09 +0800 Subject: [PATCH 7/7] fix UT --- .../apache/hudi/table/action/commit/HoodieMergeHelper.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java index 6161b5bf7c2de..893ee3fc03219 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java @@ -123,12 +123,11 @@ public void runMerge(HoodieTable table, HoodieFileReaderFactory.getReaderFactory(recordType).getFileReader(bootstrapFileConfig, bootstrapFilePath), mergeHandle.getPartitionFields(), mergeHandle.getPartitionValues()); + recordSchema = mergeHandle.getWriterSchemaWithMetaFields(); recordIterator = new ClosableMergingIterator<>( baseFileRecordIterator, - (ClosableIterator) bootstrapFileReader.getRecordIterator(), - (left, right) -> - left.joinWith(right, mergeHandle.getWriterSchemaWithMetaFields())); - recordSchema = mergeHandle.getWriterSchemaWithMetaFields(); + (ClosableIterator) bootstrapFileReader.getRecordIterator(recordSchema), + (left, right) -> left.joinWith(right, recordSchema)); } else { recordIterator = baseFileRecordIterator; recordSchema = isPureProjection ? writerSchema : readerSchema;