Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public static Object[][] getData() {
{ FunctionRuntimeType.THREAD }
};
}

@DataProvider(name = "FunctionRuntimes")
public static Object[][] functionRuntimes() {
return new Object[][] {
Expand All @@ -100,17 +100,30 @@ protected PulsarFunctionsTestBase(FunctionRuntimeType functionRuntimeType) {
this.functionRuntimeType = functionRuntimeType;
}

@BeforeClass(alwaysRun = true)
public void setupFunctionWorkers() {
@Override
public void setupCluster() throws Exception {
super.setupCluster();
setupFunctionWorkers();
}

@Override
public void tearDownCluster() throws Exception {
try {
teardownFunctionWorkers();
} finally {
super.tearDownCluster();
}
}

protected void setupFunctionWorkers() {
final int numFunctionWorkers = 2;
log.info("Setting up {} function workers : function runtime type = {}",
numFunctionWorkers, functionRuntimeType);
pulsarCluster.setupFunctionWorkers(randomName(5), functionRuntimeType, numFunctionWorkers);
log.info("{} function workers has started", numFunctionWorkers);
}

@AfterClass(alwaysRun = true)
public void teardownFunctionWorkers() {
protected void teardownFunctionWorkers() {
log.info("Tearing down function workers ...");
pulsarCluster.stopWorkers();
log.info("All functions workers are stopped.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,6 @@ private void testCustomSerdeFunction() throws Exception {
return;
}

if (pulsarCluster == null) {
super.setupCluster();
super.setupFunctionWorkers();
}

String inputTopicName = "persistent://public/default/test-serde-java-input-" + randomName(8);
String outputTopicName = "test-publish-serde-output-" + randomName(8);
try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(pulsarCluster.getHttpServiceUrl()).build()) {
Expand Down Expand Up @@ -98,7 +93,7 @@ private void testCustomSerdeFunction() throws Exception {
assertEquals(functionStatus.getNumInstances(), 1);
assertEquals(functionStatus.getInstances().get(0).getStatus().isRunning(), true);
}


@Test(groups = {"java_function", "function"})
public void testJavaExclamationFunction() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,6 @@ public abstract class PulsarIOTestBase extends PulsarFunctionsTestBase {

@SuppressWarnings({ "unchecked", "rawtypes" })
protected void testSink(SinkTester tester, boolean builtin) throws Exception {
if (pulsarCluster == null) {
super.setupCluster();
super.setupFunctionWorkers();
}
tester.startServiceContainer(pulsarCluster);
try {
PulsarIOSinkRunner runner = new PulsarIOSinkRunner(pulsarCluster, functionRuntimeType.toString());
Expand All @@ -41,17 +37,12 @@ protected void testSink(SinkTester tester, boolean builtin) throws Exception {
tester.stopServiceContainer(pulsarCluster);
}
}

@SuppressWarnings("rawtypes")
protected <ServiceContainerT extends GenericContainer> void testSink(SinkTester<ServiceContainerT> sinkTester,
boolean builtinSink,
SourceTester<ServiceContainerT> sourceTester) throws Exception {

if (pulsarCluster == null) {
super.setupCluster();
super.setupFunctionWorkers();
}

ServiceContainerT serviceContainer = sinkTester.startServiceContainer(pulsarCluster);

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
public class PulsarDebeziumSourcesTest extends PulsarIOTestBase {

protected final AtomicInteger testId = new AtomicInteger(0);

@Test(groups = "source")
public void testDebeziumMySqlSourceJson() throws Exception {
testDebeziumMySqlConnect("org.apache.kafka.connect.json.JsonConverter", true);
Expand Down Expand Up @@ -76,11 +76,6 @@ private void testDebeziumMySqlConnect(String converterClassName, boolean jsonWit
// This is the binlog count that contained in mysql container.
final int numMessages = 47;

if (pulsarCluster == null) {
super.setupCluster();
super.setupFunctionWorkers();
}

@Cleanup
PulsarClient client = PulsarClient.builder()
.serviceUrl(pulsarCluster.getPlainTextServiceUrl())
Expand Down Expand Up @@ -108,10 +103,10 @@ private void testDebeziumMySqlConnect(String converterClassName, boolean jsonWit
DebeziumMySQLContainer mySQLContainer = new DebeziumMySQLContainer(pulsarCluster.getClusterName());
sourceTester.setServiceContainer(mySQLContainer);

PulsarIODebeziumSourceRunner runner = new PulsarIODebeziumSourceRunner(pulsarCluster, functionRuntimeType.toString(),
converterClassName, tenant, namespace, sourceName, outputTopicName, numMessages, jsonWithEnvelope,
PulsarIODebeziumSourceRunner runner = new PulsarIODebeziumSourceRunner(pulsarCluster, functionRuntimeType.toString(),
converterClassName, tenant, namespace, sourceName, outputTopicName, numMessages, jsonWithEnvelope,
consumeTopicName, client);

runner.testSource(sourceTester);
}

Expand All @@ -127,11 +122,6 @@ private void testDebeziumPostgreSqlConnect(String converterClassName, boolean js
// This is the binlog count that contained in postgresql container.
final int numMessages = 26;

if (pulsarCluster == null) {
super.setupCluster();
super.setupFunctionWorkers();
}

@Cleanup
PulsarClient client = PulsarClient.builder()
.serviceUrl(pulsarCluster.getPlainTextServiceUrl())
Expand All @@ -152,10 +142,10 @@ private void testDebeziumPostgreSqlConnect(String converterClassName, boolean js
DebeziumPostgreSqlContainer postgreSqlContainer = new DebeziumPostgreSqlContainer(pulsarCluster.getClusterName());
sourceTester.setServiceContainer(postgreSqlContainer);

PulsarIODebeziumSourceRunner runner = new PulsarIODebeziumSourceRunner(pulsarCluster, functionRuntimeType.toString(),
converterClassName, tenant, namespace, sourceName, outputTopicName, numMessages, jsonWithEnvelope,
PulsarIODebeziumSourceRunner runner = new PulsarIODebeziumSourceRunner(pulsarCluster, functionRuntimeType.toString(),
converterClassName, tenant, namespace, sourceName, outputTopicName, numMessages, jsonWithEnvelope,
consumeTopicName, client);

runner.testSource(sourceTester);
}

Expand All @@ -170,12 +160,7 @@ private void testDebeziumMongoDbConnect(String converterClassName, boolean jsonW

// This is the binlog count that contained in mongodb container.
final int numMessages = 17;

if (pulsarCluster == null) {
super.setupCluster();
super.setupFunctionWorkers();
}


@Cleanup
PulsarClient client = PulsarClient.builder()
.serviceUrl(pulsarCluster.getPlainTextServiceUrl())
Expand All @@ -195,11 +180,11 @@ private void testDebeziumMongoDbConnect(String converterClassName, boolean jsonW
// setup debezium mongodb server
DebeziumMongoDbContainer mongoDbContainer = new DebeziumMongoDbContainer(pulsarCluster.getClusterName());
sourceTester.setServiceContainer(mongoDbContainer);
PulsarIODebeziumSourceRunner runner = new PulsarIODebeziumSourceRunner(pulsarCluster, functionRuntimeType.toString(),
converterClassName, tenant, namespace, sourceName, outputTopicName, numMessages, jsonWithEnvelope,

PulsarIODebeziumSourceRunner runner = new PulsarIODebeziumSourceRunner(pulsarCluster, functionRuntimeType.toString(),
converterClassName, tenant, namespace, sourceName, outputTopicName, numMessages, jsonWithEnvelope,
consumeTopicName, client);

runner.testSource(sourceTester);
}

Expand Down