
Commit 6b73c81

[HUDI-5209] Fixing QueueBasedExecutor in Spark bundles (missing Disruptor as dep) (#7188)
1 parent: 57961c0

5 files changed: 25 additions & 18 deletions


hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java

Lines changed: 3 additions & 1 deletion
@@ -93,6 +93,7 @@
 import java.util.stream.Collectors;

 import static org.apache.hudi.common.util.queue.ExecutorType.BOUNDED_IN_MEMORY;
+import static org.apache.hudi.common.util.queue.ExecutorType.DISRUPTOR;
 import static org.apache.hudi.config.HoodieCleanConfig.CLEANER_POLICY;

 /**
@@ -138,6 +139,7 @@ public class HoodieWriteConfig extends HoodieConfig {
   public static final ConfigProperty<String> EXECUTOR_TYPE = ConfigProperty
       .key("hoodie.write.executor.type")
       .defaultValue(BOUNDED_IN_MEMORY.name())
+      .withValidValues(BOUNDED_IN_MEMORY.name(), DISRUPTOR.name())
       .withDocumentation("Set executor which orchestrates concurrent producers and consumers communicating through a message queue."
           + "default value is BOUNDED_IN_MEMORY which use a bounded in-memory queue using LinkedBlockingQueue."
           + "Also users could use DISRUPTOR, which use disruptor as a lock free message queue "
@@ -1000,7 +1002,7 @@ public String getKeyGeneratorClass() {
   }

   public ExecutorType getExecutorType() {
-    return ExecutorType.valueOf(getString(EXECUTOR_TYPE).toUpperCase(Locale.ROOT));
+    return ExecutorType.valueOf(getStringOrDefault(EXECUTOR_TYPE).toUpperCase(Locale.ROOT));
   }

   public boolean isCDCEnabled() {
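
Note: the two functional changes above work together. withValidValues(BOUNDED_IN_MEMORY.name(), DISRUPTOR.name()) makes the config layer reject unknown executor types up front, and swapping getString for getStringOrDefault fixes a latent NullPointerException: getString returns null when hoodie.write.executor.type was never set explicitly, so .toUpperCase(Locale.ROOT) would throw, while getStringOrDefault falls back to the declared default. A minimal sketch of opting in to the new executor; the base path is a placeholder and the bare builder usage assumes its defaults suffice:

import java.util.Properties;

import org.apache.hudi.common.util.queue.ExecutorType;
import org.apache.hudi.config.HoodieWriteConfig;

public class ExecutorTypeExample {
  public static void main(String[] args) {
    Properties props = new Properties();
    // DISRUPTOR is now one of the two declared valid values; anything else is
    // rejected by config validation instead of failing later in ExecutorType.valueOf(...).
    props.setProperty("hoodie.write.executor.type", "DISRUPTOR");

    HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hudi/example_table")  // placeholder base path
        .withProps(props)
        .build();

    // With the getStringOrDefault(...) fix this also works when the property is
    // left unset: it falls back to BOUNDED_IN_MEMORY.
    ExecutorType type = config.getExecutorType();
    System.out.println(type);
  }
}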

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java

Lines changed: 17 additions & 16 deletions
@@ -18,7 +18,14 @@

 package org.apache.hudi.table.action.commit;

+import org.apache.avro.Schema;
 import org.apache.avro.SchemaCompatibility;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.data.HoodieData;
@@ -27,33 +34,26 @@
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.InternalSchemaCache;
-import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.queue.HoodieExecutor;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.internal.schema.InternalSchema;
 import org.apache.hudi.internal.schema.action.InternalSchemaMerger;
 import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter;
 import org.apache.hudi.internal.schema.utils.AvroSchemaEvolutionUtils;
-import org.apache.hudi.internal.schema.utils.SerDeHelper;
 import org.apache.hudi.internal.schema.utils.InternalSchemaUtils;
+import org.apache.hudi.internal.schema.utils.SerDeHelper;
 import org.apache.hudi.io.HoodieMergeHandle;
 import org.apache.hudi.io.storage.HoodieFileReader;
 import org.apache.hudi.io.storage.HoodieFileReaderFactory;
 import org.apache.hudi.table.HoodieTable;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericDatumReader;
-import org.apache.avro.generic.GenericDatumWriter;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.io.BinaryDecoder;
-import org.apache.avro.io.BinaryEncoder;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.hudi.util.QueueBasedExecutorFactory;

 import java.io.IOException;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
-import java.util.HashMap;
 import java.util.Map;
 import java.util.stream.Collectors;

@@ -91,7 +91,7 @@ public void runMerge(HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<Hood
       readSchema = mergeHandle.getWriterSchemaWithMetaFields();
     }

-    BoundedInMemoryExecutor<GenericRecord, GenericRecord, Void> wrapper = null;
+    HoodieExecutor<GenericRecord, GenericRecord, Void> wrapper = null;
     HoodieFileReader<GenericRecord> reader = HoodieFileReaderFactory.getFileReader(cfgForHoodieFile, mergeHandle.getOldFilePath());

     Option<InternalSchema> querySchemaOpt = SerDeHelper.fromJson(table.getConfig().getInternalSchema());
@@ -137,13 +137,14 @@ public void runMerge(HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<Hood

       ThreadLocal<BinaryEncoder> encoderCache = new ThreadLocal<>();
       ThreadLocal<BinaryDecoder> decoderCache = new ThreadLocal<>();
-      wrapper = new BoundedInMemoryExecutor(table.getConfig().getWriteBufferLimitBytes(), readerIterator,
-          new UpdateHandler(mergeHandle), record -> {
+
+      wrapper = QueueBasedExecutorFactory.create(table.getConfig(), readerIterator, new UpdateHandler(mergeHandle), record -> {
         if (!externalSchemaTransformation) {
           return record;
         }
-        return transformRecordBasedOnNewSchema(gReader, gWriter, encoderCache, decoderCache, (GenericRecord) record);
+        return transformRecordBasedOnNewSchema(gReader, gWriter, encoderCache, decoderCache, record);
       }, table.getPreExecuteRunnable());
+
       wrapper.execute();
     } catch (Exception e) {
       throw new HoodieException(e);
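
HoodieMergeHelper no longer hard-codes BoundedInMemoryExecutor; QueueBasedExecutorFactory.create picks the executor implementation from hoodie.write.executor.type. The factory body is not part of this diff (the file is only renamed below), so the following is a hypothetical sketch of the dispatch it performs; the class name, generics, and constructor signatures here are illustrative, not the actual API:

import java.util.Iterator;
import java.util.function.Function;

import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
import org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer;
import org.apache.hudi.common.util.queue.DisruptorExecutor;
import org.apache.hudi.common.util.queue.HoodieExecutor;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;

public class QueueBasedExecutorFactorySketch {
  // Illustrative stand-in for org.apache.hudi.util.QueueBasedExecutorFactory.create(...).
  public static <I, O, E> HoodieExecutor<I, O, E> create(HoodieWriteConfig config,
      Iterator<I> inputItr, BoundedInMemoryQueueConsumer<O, E> consumer,
      Function<I, O> transformer, Runnable preExecuteRunnable) {
    switch (config.getExecutorType()) {
      case BOUNDED_IN_MEMORY:
        // Producer and consumer coupled through a bounded LinkedBlockingQueue,
        // exactly what HoodieMergeHelper instantiated directly before this commit.
        return new BoundedInMemoryExecutor<>(config.getWriteBufferLimitBytes(),
            inputItr, consumer, transformer, preExecuteRunnable);
      case DISRUPTOR:
        // Lock-free ring buffer from com.lmax:disruptor -- the artifact the
        // bundle poms below now shade in. The real constructor additionally
        // takes ring-buffer sizing and a wait strategy from the write config.
        return new DisruptorExecutor<>(inputItr, consumer, transformer, preExecuteRunnable);
      default:
        throw new HoodieException("Unsupported executor type " + config.getExecutorType());
    }
  }
}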

hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/util/QueueBasedExecutorFactory.java renamed to hudi-client/hudi-client-common/src/main/java/org/apache/hudi/util/QueueBasedExecutorFactory.java

File renamed without changes. (Moving the factory out of hudi-spark-client into hudi-client-common is what lets the engine-agnostic HoodieMergeHelper above import org.apache.hudi.util.QueueBasedExecutorFactory.)

packaging/hudi-spark-bundle/pom.xml

Lines changed: 4 additions & 1 deletion
@@ -91,13 +91,15 @@
                   <include>org.jetbrains.kotlin:*</include>
                   <include>org.rocksdb:rocksdbjni</include>
                   <include>org.antlr:stringtemplate</include>
-                  <include>org.apache.parquet:parquet-avro</include>

+                  <include>com.lmax:disruptor</include>
                   <include>com.github.davidmoten:guava-mini</include>
                   <include>com.github.davidmoten:hilbert-curve</include>
                   <include>com.github.ben-manes.caffeine:caffeine</include>
+                  <include>org.apache.parquet:parquet-avro</include>
                   <include>com.twitter:bijection-avro_${scala.binary.version}</include>
                   <include>com.twitter:bijection-core_${scala.binary.version}</include>
+
                   <include>io.dropwizard.metrics:metrics-core</include>
                   <include>io.dropwizard.metrics:metrics-graphite</include>
                   <include>io.dropwizard.metrics:metrics-jmx</include>
@@ -127,6 +129,7 @@
                   <include>org.apache.hbase.thirdparty:hbase-shaded-netty</include>
                   <include>org.apache.hbase.thirdparty:hbase-shaded-protobuf</include>
                   <include>org.apache.htrace:htrace-core4</include>
+
                   <include>org.apache.curator:curator-framework</include>
                   <include>org.apache.curator:curator-client</include>
                   <include>org.apache.curator:curator-recipes</include>
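
The bundle's shade configuration whitelists artifacts through these <include> entries, so a dependency that is not listed is silently dropped from the shaded jar. That is the bug in the commit title: selecting the DISRUPTOR executor against the old spark-bundle failed at runtime because com.lmax:disruptor was never packaged. A quick hypothetical probe for the class, not part of this commit (if the bundle relocates the package, probe the relocated name instead):

public class DisruptorPresenceCheck {
  public static void main(String[] args) {
    try {
      // com.lmax.disruptor.dsl.Disruptor is the library's main entry point.
      Class.forName("com.lmax.disruptor.dsl.Disruptor");
      System.out.println("com.lmax:disruptor is on the classpath");
    } catch (ClassNotFoundException e) {
      // This is the failure mode HUDI-5209 fixes for the DISRUPTOR executor.
      System.out.println("disruptor is missing: " + e);
    }
  }
}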

packaging/hudi-utilities-bundle/pom.xml

Lines changed: 1 addition & 0 deletions
@@ -116,6 +116,7 @@
                   <include>org.antlr:stringtemplate</include>
                   <include>org.apache.parquet:parquet-avro</include>

+                  <include>com.lmax:disruptor</include>
                   <include>com.github.davidmoten:guava-mini</include>
                   <include>com.github.davidmoten:hilbert-curve</include>
                   <include>com.github.ben-manes.caffeine:caffeine</include>
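
With com.lmax:disruptor shaded into both bundles, the new executor can be enabled end to end from Spark. A hedged sketch of such a write; the input path, table name, and record key field are placeholders, and the usual Hudi write options (precombine field, partitioning, and so on) are elided:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class DisruptorWriteExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("disruptor-write-example")
        .master("local[2]")
        .getOrCreate();

    // Placeholder input; any DataFrame with a record key field works here.
    Dataset<Row> df = spark.read().json("/tmp/input.json");

    df.write().format("hudi")
        .option("hoodie.table.name", "example_table")
        .option("hoodie.datasource.write.recordkey.field", "id")  // placeholder key field
        .option("hoodie.write.executor.type", "DISRUPTOR")  // works once disruptor is shaded in
        .mode(SaveMode.Append)
        .save("/tmp/hudi/example_table");

    spark.stop();
  }
}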
