Skip to content

Commit 3c510f1

Browse files
committed
[HUDI-4258] Fix when HoodieTable removes data file before the end of Flink job
1 parent fec49dc commit 3c510f1

File tree

4 files changed

+102
-19
lines changed

4 files changed

+102
-19
lines changed

hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -285,7 +285,8 @@ public void handleEventFromOperator(int i, OperatorEvent operatorEvent) {
285285

286286
if (event.isEndInput()) {
287287
// handle end input event synchronously
288-
handleEndInputEvent(event);
288+
// wrap handleEndInputEvent in executeSync to preserve the order of events
289+
executor.executeSync(() -> handleEndInputEvent(event), "handle end input event for instant %s", this.instant);
289290
} else {
290291
executor.execute(
291292
() -> {

hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/event/WriteMetadataEvent.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,18 @@ public boolean isReady(String currentInstant) {
163163
return lastBatch && this.instantTime.equals(currentInstant);
164164
}
165165

166+
@Override
167+
public String toString() {
168+
return "WriteMetadataEvent{"
169+
+ "writeStatusesSize=" + writeStatuses.size()
170+
+ ", taskID=" + taskID
171+
+ ", instantTime='" + instantTime + '\''
172+
+ ", lastBatch=" + lastBatch
173+
+ ", endInput=" + endInput
174+
+ ", bootstrap=" + bootstrap
175+
+ '}';
176+
}
177+
166178
// -------------------------------------------------------------------------
167179
// Utilities
168180
// -------------------------------------------------------------------------

hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/NonThrownExecutor.java

Lines changed: 50 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,11 @@
2626
import javax.annotation.Nullable;
2727

2828
import java.util.Objects;
29+
import java.util.concurrent.ExecutionException;
2930
import java.util.concurrent.ExecutorService;
3031
import java.util.concurrent.Executors;
3132
import java.util.concurrent.TimeUnit;
33+
import java.util.function.Supplier;
3234

3335
/**
3436
* An executor service that catches all throwables and logs them.
@@ -85,25 +87,21 @@ public void execute(
8587
final ExceptionHook hook,
8688
final String actionName,
8789
final Object... actionParams) {
90+
executor.execute(wrapAction(action, hook, actionName, actionParams));
91+
}
8892

89-
executor.execute(
90-
() -> {
91-
final String actionString = String.format(actionName, actionParams);
92-
try {
93-
action.run();
94-
logger.info("Executor executes action [{}] success!", actionString);
95-
} catch (Throwable t) {
96-
// if we have a JVM critical error, promote it immediately, there is a good
97-
// chance the
98-
// logging or job failing will not succeed any more
99-
ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
100-
final String errMsg = String.format("Executor executes action [%s] error", actionString);
101-
logger.error(errMsg, t);
102-
if (hook != null) {
103-
hook.apply(errMsg, t);
104-
}
105-
}
106-
});
93+
/**
94+
* Submits the action to the executor and blocks until it has completed.
95+
*/
96+
public void executeSync(ThrowingRunnable<Throwable> action, String actionName, Object... actionParams) {
97+
try {
98+
executor.submit(wrapAction(action, this.exceptionHook, actionName, actionParams)).get();
99+
} catch (InterruptedException e) {
100+
handleException(e, this.exceptionHook, getActionString(actionName, actionParams));
101+
} catch (ExecutionException e) {
102+
// nonfatal exceptions are handled by wrapAction
103+
ExceptionUtils.rethrowIfFatalErrorOrOOM(e.getCause());
104+
}
107105
}
108106

109107
@Override
@@ -120,6 +118,40 @@ public void close() throws Exception {
120118
}
121119
}
122120

121+
private <E extends Throwable> Runnable wrapAction(
122+
final ThrowingRunnable<E> action,
123+
final ExceptionHook hook,
124+
final String actionName,
125+
final Object... actionParams) {
126+
127+
return () -> {
128+
final Supplier<String> actionString = getActionString(actionName, actionParams);
129+
try {
130+
action.run();
131+
logger.info("Executor executes action [{}] success!", actionString.get());
132+
} catch (Throwable t) {
133+
handleException(t, hook, actionString);
134+
}
135+
};
136+
}
137+
138+
private void handleException(Throwable t, ExceptionHook hook, Supplier<String> actionString) {
139+
// if we have a JVM critical error, promote it immediately, there is a good
140+
// chance the
141+
// logging or job failing will not succeed any more
142+
ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
143+
final String errMsg = String.format("Executor executes action [%s] error", actionString.get());
144+
logger.error(errMsg, t);
145+
if (hook != null) {
146+
hook.apply(errMsg, t);
147+
}
148+
}
149+
150+
private Supplier<String> getActionString(String actionName, Object... actionParams) {
151+
// build the action string lazily so that String.format is not called before a fatal error or OOM is rethrown
152+
return () -> String.format(actionName, actionParams);
153+
}
154+
123155
// -------------------------------------------------------------------------
124156
// Inner Class
125157
// -------------------------------------------------------------------------

hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.apache.hudi.metadata.HoodieTableMetadata;
3131
import org.apache.hudi.sink.event.WriteMetadataEvent;
3232
import org.apache.hudi.sink.utils.MockCoordinatorExecutor;
33+
import org.apache.hudi.sink.utils.NonThrownExecutor;
3334
import org.apache.hudi.util.StreamerUtil;
3435
import org.apache.hudi.utils.TestConfigurations;
3536
import org.apache.hudi.utils.TestUtils;
@@ -46,11 +47,14 @@
4647
import org.junit.jupiter.api.BeforeEach;
4748
import org.junit.jupiter.api.Test;
4849
import org.junit.jupiter.api.io.TempDir;
50+
import org.mockito.Mockito;
51+
import org.slf4j.Logger;
4952

5053
import java.io.File;
5154
import java.io.IOException;
5255
import java.util.Collections;
5356
import java.util.concurrent.CompletableFuture;
57+
import java.util.concurrent.TimeUnit;
5458

5559
import static org.hamcrest.CoreMatchers.instanceOf;
5660
import static org.hamcrest.CoreMatchers.is;
@@ -298,6 +302,40 @@ void testSyncMetadataTableWithReusedInstant() throws Exception {
298302
assertThat(completedTimeline.lastInstant().get().getTimestamp(), is(instant));
299303
}
300304

305+
@Test
306+
public void testEndInputIsTheLastEvent() throws Exception {
307+
Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
308+
MockOperatorCoordinatorContext context = new MockOperatorCoordinatorContext(new OperatorID(), 1);
309+
Logger logger = Mockito.mock(Logger.class); // mock the logger to avoid excessive log output from the executor
310+
NonThrownExecutor executor = NonThrownExecutor.builder(logger).waitForTasksFinish(true).build();
311+
312+
try (StreamWriteOperatorCoordinator coordinator = new StreamWriteOperatorCoordinator(conf, context)) {
313+
coordinator.start();
314+
coordinator.setExecutor(executor);
315+
coordinator.handleEventFromOperator(0, WriteMetadataEvent.emptyBootstrap(0));
316+
TimeUnit.SECONDS.sleep(5); // wait for the bootstrap event to be handled
317+
318+
int eventCount = 20_000; // big enough to fill executor's queue
319+
for (int i = 0; i < eventCount; i++) {
320+
coordinator.handleEventFromOperator(0, createOperatorEvent(0, coordinator.getInstant(), "par1", true, 0.1));
321+
}
322+
323+
WriteMetadataEvent endInput = WriteMetadataEvent.builder()
324+
.taskID(0)
325+
.instantTime(coordinator.getInstant())
326+
.writeStatus(Collections.emptyList())
327+
.endInput(true)
328+
.build();
329+
coordinator.handleEventFromOperator(0, endInput);
330+
331+
// wait for the submitted events to complete
332+
executor.close();
333+
334+
// there should be no events after endInput
335+
assertNull(coordinator.getEventBuffer()[0]);
336+
}
337+
}
338+
301339
// -------------------------------------------------------------------------
302340
// Utilities
303341
// -------------------------------------------------------------------------

0 commit comments

Comments
 (0)