Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
Expand Down Expand Up @@ -321,6 +324,141 @@ public void pendingJobCancel() {
}
}

@SneakyThrows
@Test
public void pendingQueueRescheduleAllowsLaterJobRunMultiNode() {
HazelcastInstanceImpl masterNode = null;
HazelcastInstanceImpl workerNode1 = null;
HazelcastInstanceImpl workerNode2 = null;
String testClusterName = "Test_pendingQueueRescheduleAllowsLaterJobRun";
SeaTunnelClient seaTunnelClient = null;

SeaTunnelConfig masterConfig = ConfigProvider.locateAndGetSeaTunnelConfig();
EngineConfig masterEngineConfig = masterConfig.getEngineConfig();
masterEngineConfig.setScheduleStrategy(ScheduleStrategy.WAIT_RESCHEDULE);
masterEngineConfig.getSlotServiceConfig().setDynamicSlot(false);
masterEngineConfig.getSlotServiceConfig().setSlotNum(1);
masterConfig
.getHazelcastConfig()
.setClusterName(ContentFormatUtilTest.getClusterName(testClusterName));

SeaTunnelConfig workerConfig1 = ConfigProvider.locateAndGetSeaTunnelConfig();
EngineConfig workerEngineConfig1 = workerConfig1.getEngineConfig();
workerEngineConfig1.setScheduleStrategy(ScheduleStrategy.WAIT_RESCHEDULE);
workerEngineConfig1.getSlotServiceConfig().setDynamicSlot(false);
workerEngineConfig1.getSlotServiceConfig().setSlotNum(1);
workerConfig1
.getHazelcastConfig()
.setClusterName(ContentFormatUtilTest.getClusterName(testClusterName));

SeaTunnelConfig workerConfig2 = ConfigProvider.locateAndGetSeaTunnelConfig();
EngineConfig workerEngineConfig2 = workerConfig2.getEngineConfig();
workerEngineConfig2.setScheduleStrategy(ScheduleStrategy.WAIT_RESCHEDULE);
workerEngineConfig2.getSlotServiceConfig().setDynamicSlot(false);
workerEngineConfig2.getSlotServiceConfig().setSlotNum(1);
workerConfig2
.getHazelcastConfig()
.setClusterName(ContentFormatUtilTest.getClusterName(testClusterName));

Common.setDeployMode(DeployMode.CLIENT);
String pendingJobFile = createTempJobConfig(3);
String smallJobFile1 = createTempJobConfig(1);
String smallJobFile2 = createTempJobConfig(1);
JobConfig pendingJobConfig = new JobConfig();
pendingJobConfig.setName("Test_pendingQueueRescheduleAllowsLaterJobRun_pending");
JobConfig smallJobConfig1 = new JobConfig();
smallJobConfig1.setName("Test_pendingQueueRescheduleAllowsLaterJobRun_small1");
JobConfig smallJobConfig2 = new JobConfig();
smallJobConfig2.setName("Test_pendingQueueRescheduleAllowsLaterJobRun_small2");
try {
masterNode = SeaTunnelServerStarter.createMasterHazelcastInstance(masterConfig);

HazelcastInstanceImpl finalMasterNode = masterNode;
Awaitility.await()
.atMost(10000, TimeUnit.MILLISECONDS)
.untilAsserted(
() ->
Assertions.assertEquals(
1, finalMasterNode.getCluster().getMembers().size()));

workerNode1 = SeaTunnelServerStarter.createWorkerHazelcastInstance(workerConfig1);
workerNode2 = SeaTunnelServerStarter.createWorkerHazelcastInstance(workerConfig2);
HazelcastInstanceImpl finalWorkerNode = workerNode1;
Awaitility.await()
.atMost(10000, TimeUnit.MILLISECONDS)
.untilAsserted(
() ->
Assertions.assertEquals(
3, finalWorkerNode.getCluster().getMembers().size()));

seaTunnelClient = createSeaTunnelClient(testClusterName);
ClientJobProxy pendingJobProxy =
seaTunnelClient
.createExecutionContext(pendingJobFile, pendingJobConfig, masterConfig)
.execute();
ClientJobProxy smallJobProxy1 =
seaTunnelClient
.createExecutionContext(smallJobFile1, smallJobConfig1, masterConfig)
.execute();
Awaitility.await()
.atMost(10000, TimeUnit.MILLISECONDS)
.untilAsserted(
() ->
Assertions.assertEquals(
JobStatus.PENDING, pendingJobProxy.getJobStatus()));

Awaitility.await()
.atMost(90000, TimeUnit.MILLISECONDS)
.untilAsserted(
() ->
Assertions.assertEquals(
JobStatus.FINISHED, smallJobProxy1.getJobStatus()));

Awaitility.await()
.atMost(10000, TimeUnit.MILLISECONDS)
.untilAsserted(
() ->
Assertions.assertEquals(
JobStatus.PENDING, pendingJobProxy.getJobStatus()));

ClientJobProxy smallJobProxy2 =
seaTunnelClient
.createExecutionContext(smallJobFile2, smallJobConfig2, masterConfig)
.execute();

Awaitility.await()
.atMost(90000, TimeUnit.MILLISECONDS)
.untilAsserted(
() ->
Assertions.assertEquals(
JobStatus.FINISHED, smallJobProxy2.getJobStatus()));

seaTunnelClient.getJobClient().cancelJob(pendingJobProxy.getJobId());
Awaitility.await()
.atMost(60000, TimeUnit.MILLISECONDS)
.untilAsserted(
() ->
Assertions.assertEquals(
JobStatus.CANCELED, pendingJobProxy.getJobStatus()));

} catch (ExecutionException | InterruptedException e) {
throw new RuntimeException(e);
} finally {
if (seaTunnelClient != null) {
seaTunnelClient.close();
}
if (workerNode1 != null) {
workerNode1.shutdown();
}
if (workerNode2 != null) {
workerNode2.shutdown();
}
if (masterNode != null) {
masterNode.shutdown();
}
}
}

@Test
public void testStartMasterNodeWithTcpIp() {
SeaTunnelConfig seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig();
Expand Down Expand Up @@ -501,4 +639,41 @@ private SeaTunnelClient createSeaTunnelClient(String clusterName) {
clientConfig.setClusterName(ContentFormatUtilTest.getClusterName(clusterName));
return new SeaTunnelClient(clientConfig);
}

private String createTempJobConfig(int parallelism) throws Exception {
String content =
"env {\n"
+ " parallelism = "
+ parallelism
+ "\n"
+ " job.mode = \"BATCH\"\n"
+ "}\n"
+ "\n"
+ "source {\n"
+ " FakeSource {\n"
+ " plugin_output = \"fake\"\n"
+ " parallelism = "
+ parallelism
+ "\n"
+ " schema = {\n"
+ " fields {\n"
+ " name = \"string\"\n"
+ " age = \"int\"\n"
+ " }\n"
+ " }\n"
+ " }\n"
+ "}\n"
+ "\n"
+ "transform {\n"
+ "}\n"
+ "\n"
+ "sink {\n"
+ " console {\n"
+ " plugin_input=\"fake\"\n"
+ " }\n"
+ "}\n";
Path tempFile = Files.createTempFile("seatunnel_pending_job_", ".conf");
Files.write(tempFile, content.getBytes(StandardCharsets.UTF_8));
return tempFile.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,16 @@
import org.apache.seatunnel.engine.common.config.server.SlotServiceConfig;
import org.apache.seatunnel.engine.common.config.server.TelemetryConfig;
import org.apache.seatunnel.engine.common.config.server.ThreadShareMode;
import org.apache.seatunnel.engine.common.config.server.scheduler.ScheduleStrategyConfig;
import org.apache.seatunnel.engine.common.config.server.scheduler.WaitConfig;
import org.apache.seatunnel.engine.common.config.server.scheduler.WaitRescheduleConfig;
import org.apache.seatunnel.engine.common.config.server.scheduler.WindowScanAgingPriorityConfig;
import org.apache.seatunnel.engine.common.runtime.ExecutionMode;

import lombok.Data;

import java.util.Collections;
import java.util.EnumMap;
import java.util.Map;

import static com.hazelcast.internal.util.Preconditions.checkBackupCount;
Expand Down Expand Up @@ -96,9 +101,40 @@ public class EngineConfig {
private ScheduleStrategy scheduleStrategy =
ServerConfigOptions.MasterServerConfigOptions.JOB_SCHEDULE_STRATEGY.defaultValue();

private Map<ScheduleStrategy, ScheduleStrategyConfig> scheduleStrategyConfigs =
defaultScheduleStrategyConfigs();

private HttpConfig httpConfig =
ServerConfigOptions.MasterServerConfigOptions.HTTP.defaultValue();

private static Map<ScheduleStrategy, ScheduleStrategyConfig> defaultScheduleStrategyConfigs() {
Map<ScheduleStrategy, ScheduleStrategyConfig> configs =
new EnumMap<>(ScheduleStrategy.class);
configs.put(ScheduleStrategy.WAIT, new WaitConfig());
configs.put(ScheduleStrategy.WAIT_RESCHEDULE, new WaitRescheduleConfig());
configs.put(
ScheduleStrategy.WINDOW_SCAN_AGING_PRIORITY, new WindowScanAgingPriorityConfig());
return configs;
}

public void putScheduleStrategyConfig(
ScheduleStrategy scheduleStrategy, ScheduleStrategyConfig scheduleStrategyConfig) {
scheduleStrategyConfigs.put(scheduleStrategy, scheduleStrategyConfig);
}

public <T extends ScheduleStrategyConfig> T getScheduleStrategyConfig(
ScheduleStrategy scheduleStrategy, Class<T> configType) {
ScheduleStrategyConfig config = scheduleStrategyConfigs.get(scheduleStrategy);
if (config == null) {
return null;
}
if (!configType.isInstance(config)) {
throw new IllegalStateException(
"Config type mismatch for schedule strategy " + scheduleStrategy);
}
return configType.cast(config);
}

public void setBackupCount(int newBackupCount) {
checkBackupCount(newBackupCount, 0);
this.backupCount = newBackupCount;
Expand Down
Loading