Skip to content

Commit be53e07

Browse files
author
yuezhang
committed
address comments
1 parent db06b33 commit be53e07

4 files changed

Lines changed: 13 additions & 32 deletions

File tree

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/util/ExecutorFactory.java

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -55,7 +55,7 @@ public static <I, O, E> HoodieExecutor<E> create(HoodieWriteConfig hoodieConfig,
5555
return new DisruptorExecutor<>(hoodieConfig.getDisruptorWriteBufferSize(), inputItr, consumer,
5656
transformFunction, hoodieConfig.getWriteExecutorWaitStrategy(), preExecuteRunnable);
5757
case SIMPLE:
58-
return new SimpleHoodieExecutor<>(inputItr, consumer, transformFunction, preExecuteRunnable);
58+
return new SimpleHoodieExecutor<>(inputItr, consumer, transformFunction);
5959
default:
6060
throw new HoodieException("Unsupported Executor Type " + executorType);
6161
}

hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestSimpleExecutionInSpark.java

Lines changed: 4 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -47,7 +47,6 @@
4747
import scala.Tuple2;
4848

4949
import static org.junit.jupiter.api.Assertions.assertEquals;
50-
import static org.junit.jupiter.api.Assertions.assertFalse;
5150
import static org.junit.jupiter.api.Assertions.assertThrows;
5251
import static org.junit.jupiter.api.Assertions.assertTrue;
5352
import static org.mockito.Mockito.mock;
@@ -99,14 +98,11 @@ public Integer finish() {
9998
SimpleHoodieExecutor<HoodieRecord, Tuple2<HoodieRecord, Option<IndexedRecord>>, Integer> exec = null;
10099

101100
try {
102-
exec = new SimpleHoodieExecutor(hoodieRecords.iterator(), consumer, getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA), getPreExecuteRunnable());
103-
101+
exec = new SimpleHoodieExecutor(hoodieRecords.iterator(), consumer, getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA));
104102

105103
int result = exec.execute();
106-
// It should buffer and write 100 records
104+
// It should buffer and write 128 records
107105
assertEquals(128, result);
108-
// There should be no remaining records in the buffer
109-
assertFalse(exec.isRunning());
110106

111107
// collect all records and assert that consumed records are identical to produced ones
112108
// assert there's no tampering, and that the ordering is preserved
@@ -175,12 +171,9 @@ public Integer finish() {
175171
SimpleHoodieExecutor<HoodieRecord, Tuple2<HoodieRecord, Option<IndexedRecord>>, Integer> exec = null;
176172

177173
try {
178-
exec = new SimpleHoodieExecutor(hoodieRecords.iterator(), consumer, getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA), getPreExecuteRunnable());
174+
exec = new SimpleHoodieExecutor(hoodieRecords.iterator(), consumer, getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA));
179175
int result = exec.execute();
180-
// It should buffer and write 100 records
181176
assertEquals(100, result);
182-
// There should be no remaining records in the buffer
183-
assertFalse(exec.isRunning());
184177

185178
assertEquals(beforeRecord, afterRecord);
186179
assertEquals(beforeIndexedRecord, afterIndexedRecord);
@@ -223,7 +216,7 @@ public Integer finish() {
223216
};
224217

225218
SimpleHoodieExecutor<HoodieRecord, Tuple2<HoodieRecord, Option<IndexedRecord>>, Integer> exec =
226-
new SimpleHoodieExecutor(iterator, consumer, getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA), getPreExecuteRunnable());
219+
new SimpleHoodieExecutor(iterator, consumer, getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA));
227220

228221
final Throwable thrown = assertThrows(HoodieException.class, exec::execute,
229222
"exception is expected");

hudi-common/src/main/java/org/apache/hudi/common/util/queue/SimpleHoodieExecutor.java

Lines changed: 6 additions & 20 deletions
Original file line number | Diff line number | Diff line change
@@ -46,21 +46,17 @@ public class SimpleHoodieExecutor<I, O, E> implements HoodieExecutor<E> {
4646
// records iterator
4747
protected final Iterator<I> it;
4848
private final Function<I, O> transformFunction;
49-
private final Runnable preExecuteRunnable;
50-
private final AtomicBoolean isWriteDone = new AtomicBoolean(false);
51-
private final AtomicBoolean isShutdown = new AtomicBoolean(false);
5249

5350
public SimpleHoodieExecutor(final Iterator<I> inputItr, HoodieConsumer<O, E> consumer,
54-
Function<I, O> transformFunction, Runnable preExecuteRunnable) {
55-
this(inputItr, Option.of(consumer), transformFunction, preExecuteRunnable);
51+
Function<I, O> transformFunction) {
52+
this(inputItr, Option.of(consumer), transformFunction);
5653
}
5754

5855
public SimpleHoodieExecutor(final Iterator<I> inputItr, Option<HoodieConsumer<O, E>> consumer,
59-
Function<I, O> transformFunction, Runnable preExecuteRunnable) {
56+
Function<I, O> transformFunction) {
6057
this.it = inputItr;
6158
this.consumer = consumer;
6259
this.transformFunction = transformFunction;
63-
this.preExecuteRunnable = preExecuteRunnable;
6460
}
6561

6662
/**
@@ -72,12 +68,7 @@ public E execute() {
7268

7369
try {
7470
LOG.info("Starting consumer, consuming records from the records iterator directly");
75-
preExecuteRunnable.run();
7671
while (it.hasNext()) {
77-
if (isShutdown.get()) {
78-
LOG.warn("Call shutdown while getting new entries, stop consuming.");
79-
break;
80-
}
8172
O payload = transformFunction.apply(it.next());
8273
consumer.get().consume(payload);
8374
}
@@ -86,22 +77,17 @@ public E execute() {
8677
} catch (Exception e) {
8778
LOG.error("Error consuming records in SimpleHoodieExecutor", e);
8879
throw new HoodieException(e);
89-
} finally {
90-
isWriteDone.set(true);
9180
}
9281
}
9382

9483
@Override
9584
public void shutdownNow() {
96-
isShutdown.set(true);
85+
// no-op
9786
}
9887

9988
@Override
10089
public boolean awaitTermination() {
101-
return isWriteDone.get();
102-
}
103-
104-
public boolean isRunning() {
105-
return it.hasNext();
90+
// no-op
91+
return true;
10692
}
10793
}

hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/BoundInMemoryExecutorBenchmark.scala

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -66,6 +66,8 @@ object BoundInMemoryExecutorBenchmark extends HoodieBenchmarkBase {
6666
}
6767

6868
/**
69+
* This benchmark was run with unconstrained parallelism, which benefits the Disruptor executor more than it benefits the Simple executor
70+
*
6971
* OpenJDK 64-Bit Server VM 1.8.0_342-b07 on Linux 5.10.62-55.141.amzn2.x86_64
7072
* Intel(R) Xeon(R) Platinum 8259CL CPU @ 2.50GHz
7173
* COW Ingestion: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative

0 commit comments

Comments
 (0)