Skip to content

Commit fe1a539

Browse files
author
Alexey Kudinkin
committed
Revisited HoodieMergeHelper shutdown seq to defer to the Executor to appropriatly clean up the resources
1 parent 2c37321 commit fe1a539

2 files changed

Lines changed: 15 additions & 18 deletions

File tree

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseMergeHelper.java

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,17 +18,19 @@
1818

1919
package org.apache.hudi.table.action.commit;
2020

21-
import org.apache.avro.generic.GenericRecord;
22-
import org.apache.hadoop.conf.Configuration;
23-
import org.apache.hadoop.fs.Path;
2421
import org.apache.hudi.avro.HoodieAvroUtils;
25-
import org.apache.hudi.client.utils.MergingIterator;
22+
import org.apache.hudi.client.utils.ClosableMergingIterator;
23+
import org.apache.hudi.common.util.ClosableIterator;
2624
import org.apache.hudi.common.util.queue.HoodieConsumer;
2725
import org.apache.hudi.io.HoodieMergeHandle;
2826
import org.apache.hudi.io.storage.HoodieFileReader;
2927
import org.apache.hudi.io.storage.HoodieFileReaderFactory;
3028
import org.apache.hudi.table.HoodieTable;
3129

30+
import org.apache.avro.generic.GenericRecord;
31+
import org.apache.hadoop.conf.Configuration;
32+
import org.apache.hadoop.fs.Path;
33+
3234
import java.io.IOException;
3335
import java.util.Iterator;
3436

@@ -51,14 +53,14 @@ public abstract class BaseMergeHelper {
5153
* for indexing, writing and other functionality.
5254
*
5355
*/
54-
protected Iterator<GenericRecord> getMergingIterator(HoodieTable<?, ?, ?, ?> table,
55-
HoodieMergeHandle<?, ?, ?, ?> mergeHandle,
56-
Path bootstrapFilePath,
57-
Iterator<GenericRecord> recordIterator) throws IOException {
56+
protected ClosableIterator<GenericRecord> getMergingIterator(HoodieTable<?, ?, ?, ?> table,
57+
HoodieMergeHandle<?, ?, ?, ?> mergeHandle,
58+
Path bootstrapFilePath,
59+
Iterator<GenericRecord> recordIterator) throws IOException {
5860
Configuration bootstrapFileConfig = new Configuration(table.getHadoopConf());
5961
HoodieFileReader<GenericRecord> bootstrapReader =
6062
HoodieFileReaderFactory.getFileReader(bootstrapFileConfig, bootstrapFilePath);
61-
return new MergingIterator<>(recordIterator, bootstrapReader.getRecordIterator(),
63+
return new ClosableMergingIterator<>(recordIterator, bootstrapReader.getRecordIterator(),
6264
(inputRecordPair) -> HoodieAvroUtils.stitchRecords(inputRecordPair.getLeft(), inputRecordPair.getRight(), mergeHandle.getWriterSchemaWithMetaFields()));
6365
}
6466

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
import org.apache.hudi.common.util.ClosableIterator;
3232
import org.apache.hudi.common.util.InternalSchemaCache;
3333
import org.apache.hudi.common.util.Option;
34-
import org.apache.hudi.common.util.collection.MappingIterator;
34+
import org.apache.hudi.common.util.collection.ClosableMappingIterator;
3535
import org.apache.hudi.common.util.queue.HoodieExecutor;
3636
import org.apache.hudi.config.HoodieWriteConfig;
3737
import org.apache.hudi.exception.HoodieException;
@@ -48,7 +48,6 @@
4848
import org.apache.hudi.util.QueueBasedExecutorFactory;
4949

5050
import java.io.IOException;
51-
import java.util.Iterator;
5251
import java.util.List;
5352
import java.util.Map;
5453
import java.util.Objects;
@@ -102,7 +101,7 @@ public void runMerge(HoodieTable<?, ?, ?, ?> table,
102101
HoodieExecutor<GenericRecord, GenericRecord, Void> wrapper = null;
103102

104103
try {
105-
Iterator<GenericRecord> recordIterator;
104+
ClosableIterator<GenericRecord> recordIterator;
106105

107106
// In case writer's schema is simply a projection of the reader's one we can read
108107
// the records in the projected schema directly
@@ -112,7 +111,7 @@ public void runMerge(HoodieTable<?, ?, ?, ?> table,
112111
Path bootstrapFilePath = new Path(baseFile.getBootstrapBaseFile().get().getPath());
113112
recordIterator = getMergingIterator(table, mergeHandle, bootstrapFilePath, baseFileRecordIterator);
114113
} else if (schemaEvolutionTransformerOpt.isPresent()) {
115-
recordIterator = new MappingIterator<>(baseFileRecordIterator,
114+
recordIterator = new ClosableMappingIterator<>(baseFileRecordIterator,
116115
schemaEvolutionTransformerOpt.get());
117116
} else {
118117
recordIterator = baseFileRecordIterator;
@@ -130,14 +129,10 @@ public void runMerge(HoodieTable<?, ?, ?, ?> table,
130129
} catch (Exception e) {
131130
throw new HoodieException(e);
132131
} finally {
133-
// HUDI-2875: mergeHandle is not thread safe, we should totally terminate record inputting
134-
// and executor firstly and then close mergeHandle.
135-
reader.close();
136-
if (null != wrapper) {
132+
if (wrapper != null) {
137133
wrapper.shutdownNow();
138134
wrapper.awaitTermination();
139135
}
140-
mergeHandle.close();
141136
}
142137
}
143138

0 commit comments

Comments
 (0)