Skip to content

Commit 5525944

Browse files
committed
[pulsar-io] feat: implement pip-297 for jdbc sinks
1 parent 52a4d5e commit 5525944

2 files changed

Lines changed: 71 additions & 1 deletion

File tree

pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import java.util.concurrent.ScheduledExecutorService;
3636
import java.util.concurrent.TimeUnit;
3737
import java.util.concurrent.atomic.AtomicBoolean;
38+
import java.util.concurrent.atomic.AtomicReference;
3839
import java.util.function.Function;
3940
import java.util.stream.Collectors;
4041
import lombok.AllArgsConstructor;
@@ -50,6 +51,11 @@
5051
*/
5152
@Slf4j
5253
public abstract class JdbcAbstractSink<T> implements Sink<T> {
54+
55+
private enum State {
56+
OPEN, FAILED, CLOSED
57+
}
58+
5359
// ----- Runtime fields
5460
protected JdbcSinkConfig jdbcSinkConfig;
5561
@Getter
@@ -73,9 +79,12 @@ public abstract class JdbcAbstractSink<T> implements Sink<T> {
7379
private AtomicBoolean isFlushing;
7480
private int batchSize;
7581
private ScheduledExecutorService flushExecutor;
82+
private SinkContext sinkContext;
83+
private final AtomicReference<State> state = new AtomicReference<>(State.OPEN);
7684

7785
@Override
7886
public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
87+
this.sinkContext = sinkContext;
7988
jdbcSinkConfig = JdbcSinkConfig.load(config, sinkContext);
8089
jdbcSinkConfig.validate();
8190

@@ -148,6 +157,7 @@ private static List<String> getListFromConfig(String jdbcSinkConfig) {
148157

149158
@Override
150159
public void close() throws Exception {
160+
state.set(State.CLOSED);
151161
if (flushExecutor != null) {
152162
int timeoutMs = jdbcSinkConfig.getTimeoutMs() * 2;
153163
flushExecutor.shutdown();
@@ -310,8 +320,9 @@ private void flush() {
310320
connection.rollback();
311321
}
312322
} catch (Exception ex) {
313-
throw new RuntimeException(ex);
323+
log.error("Failed to rollback transaction", ex);
314324
}
325+
fatal(e);
315326
}
316327

317328
isFlushing.set(false);
@@ -385,4 +396,16 @@ private static boolean isBatchItemFailed(int returnCode) {
385396
return true;
386397
}
387398

399+
/**
400+
* Signal a fatal exception to the framework.
401+
* This will cause the function instance to terminate properly.
402+
*
403+
* @param e the fatal exception
404+
*/
405+
private void fatal(Exception e) {
406+
if (sinkContext != null && state.compareAndSet(State.OPEN, State.FAILED)) {
407+
sinkContext.fatal(e);
408+
}
409+
}
410+
388411
}

pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,10 @@
1818
*/
1919
package org.apache.pulsar.io.jdbc;
2020

21+
import static org.mockito.ArgumentMatchers.any;
2122
import static org.mockito.Mockito.doAnswer;
2223
import static org.mockito.Mockito.mock;
24+
import static org.mockito.Mockito.verify;
2325
import static org.mockito.Mockito.when;
2426
import com.google.common.collect.ImmutableMap;
2527
import com.google.common.collect.Maps;
@@ -56,6 +58,7 @@
5658
import org.apache.pulsar.common.schema.SchemaType;
5759
import org.apache.pulsar.functions.api.Record;
5860
import org.apache.pulsar.functions.source.PulsarRecord;
61+
import org.apache.pulsar.io.core.SinkContext;
5962
import org.awaitility.Awaitility;
6063
import org.testng.Assert;
6164
import org.testng.annotations.AfterMethod;
@@ -860,6 +863,50 @@ public void testNullValueAction(NullValueActionTestConfig config) throws Excepti
860863
}
861864
}
862865

866+
/**
867+
* Test that fatal() is called when an unrecoverable exception occurs during flush.
868+
* This verifies the PIP-297 implementation for proper termination of the sink.
869+
*/
870+
@Test
871+
public void testFatalCalledOnFlushException() throws Exception {
872+
jdbcSink.close();
873+
jdbcSink = null;
874+
875+
String jdbcUrl = sqliteUtils.sqliteUri();
876+
Map<String, Object> conf = Maps.newHashMap();
877+
conf.put("jdbcUrl", jdbcUrl);
878+
conf.put("tableName", "nonexistent_table"); // This will cause an exception on flush
879+
conf.put("key", "field3");
880+
conf.put("nonKey", "field1,field2");
881+
conf.put("batchSize", 1);
882+
883+
SinkContext mockSinkContext = mock(SinkContext.class);
884+
AtomicReference<Throwable> fatalException = new AtomicReference<>();
885+
doAnswer(invocation -> {
886+
fatalException.set(invocation.getArgument(0));
887+
return null;
888+
}).when(mockSinkContext).fatal(any(Throwable.class));
889+
890+
SqliteJdbcAutoSchemaSink sinkWithContext = new SqliteJdbcAutoSchemaSink();
891+
try {
892+
sinkWithContext.open(conf, mockSinkContext);
893+
894+
Foo insertObj = new Foo("f1", "f2", 1);
895+
Map<String, String> props = Maps.newHashMap();
896+
props.put("ACTION", "INSERT");
897+
CompletableFuture<Boolean> future = new CompletableFuture<>();
898+
sinkWithContext.write(createMockFooRecord(insertObj, props, future));
899+
900+
// Wait for the flush to complete and fail
901+
Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> {
902+
verify(mockSinkContext).fatal(any(Throwable.class));
903+
Assert.assertNotNull(fatalException.get());
904+
});
905+
} finally {
906+
sinkWithContext.close();
907+
}
908+
}
909+
863910
@SuppressWarnings("unchecked")
864911
private Record<GenericObject> createMockFooRecord(Foo record, Map<String, String> actionProperties,
865912
CompletableFuture<Boolean> future) {

0 commit comments

Comments
 (0)