diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/bin/start-seatunnel-flink-13-connector-v2.sh b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/bin/start-seatunnel-flink-13-connector-v2.sh index 4bd9905354e3..70e08d7defd6 100755 --- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/bin/start-seatunnel-flink-13-connector-v2.sh +++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/bin/start-seatunnel-flink-13-connector-v2.sh @@ -50,7 +50,7 @@ if [ ! -f "${APP_DIR}/runtime.tar.gz" ];then existing_dirs=() for dir in "${directories[@]}"; do - if [ -d "$dir" ]; then + if [ -d "${APP_DIR}/${dir}" ]; then existing_dirs+=("$dir") fi done @@ -58,7 +58,7 @@ if [ ! -f "${APP_DIR}/runtime.tar.gz" ];then if [ ${#existing_dirs[@]} -eq 0 ]; then echo "[connectors,lib,plugins] not existed, skip generate runtime.tar.gz" else - tar -zcvf runtime.tar.gz "${existing_dirs[@]}" + tar -zcvf "${APP_DIR}/runtime.tar.gz" -C "${APP_DIR}" "${existing_dirs[@]}" fi fi diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/bin/start-seatunnel-flink-15-connector-v2.sh b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/bin/start-seatunnel-flink-15-connector-v2.sh index 5698a340dab0..8ec36c20b5e9 100755 --- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/bin/start-seatunnel-flink-15-connector-v2.sh +++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/bin/start-seatunnel-flink-15-connector-v2.sh @@ -50,7 +50,7 @@ if [ ! -f "${APP_DIR}/runtime.tar.gz" ];then existing_dirs=() for dir in "${directories[@]}"; do - if [ -d "$dir" ]; then + if [ -d "${APP_DIR}/${dir}" ]; then existing_dirs+=("$dir") fi done @@ -58,7 +58,7 @@ if [ ! -f "${APP_DIR}/runtime.tar.gz" ];then if [ ${#existing_dirs[@]} -eq 0 ]; then echo "[connectors,lib,plugins] not existed, skip generate runtime.tar.gz" else - tar -zcvf runtime.tar.gz "${existing_dirs[@]}" + tar -zcvf "${APP_DIR}/runtime.tar.gz" -C "${APP_DIR}" "${existing_dirs[@]}" fi fi diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-20-starter/src/main/bin/start-seatunnel-flink-20-connector-v2.sh b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-20-starter/src/main/bin/start-seatunnel-flink-20-connector-v2.sh index 40b395bec774..a2ef35850cd7 100644 --- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-20-starter/src/main/bin/start-seatunnel-flink-20-connector-v2.sh +++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-20-starter/src/main/bin/start-seatunnel-flink-20-connector-v2.sh @@ -50,7 +50,7 @@ if [ ! -f "${APP_DIR}/runtime.tar.gz" ];then existing_dirs=() for dir in "${directories[@]}"; do - if [ -d "$dir" ]; then + if [ -d "${APP_DIR}/${dir}" ]; then existing_dirs+=("$dir") fi done @@ -58,7 +58,7 @@ if [ ! -f "${APP_DIR}/runtime.tar.gz" ];then if [ ${#existing_dirs[@]} -eq 0 ]; then echo "[connectors,lib,plugins] not existed, skip generate runtime.tar.gz" else - tar -zcvf runtime.tar.gz "${existing_dirs[@]}" + tar -zcvf "${APP_DIR}/runtime.tar.gz" -C "${APP_DIR}" "${existing_dirs[@]}" fi fi diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/AbstractFlinkStarter.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/AbstractFlinkStarter.java index bd2e29f026dc..803632ebc435 100644 --- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/AbstractFlinkStarter.java +++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/AbstractFlinkStarter.java @@ -24,6 +24,7 @@ import org.apache.seatunnel.core.starter.flink.args.FlinkCommandArgs; import org.apache.seatunnel.core.starter.utils.CommandLineUtils; +import java.nio.file.Paths; import java.util.ArrayList; import java.util.List; import java.util.Objects; @@ -65,7 +66,10 @@ public List buildCommands() { if (flinkCommandArgs.getMasterType() == MasterType.YARN_APPLICATION) { command.add( String.format("-Dyarn.ship-files=\"%s\"", flinkCommandArgs.getConfigFile())); - command.add(String.format("-Dyarn.ship-archives=%s", RUNTIME_FILE)); + command.add( + String.format( + "-Dyarn.ship-archives=\"%s\"", + Paths.get(Common.getSeaTunnelHome(), RUNTIME_FILE))); } // set yarn application name if (flinkCommandArgs.getMasterType() == MasterType.YARN_APPLICATION diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/test/java/org/apache/seatunnel/core/starter/flink/TestFlinkParameter.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/test/java/org/apache/seatunnel/core/starter/flink/TestFlinkParameter.java index 6a30b86a8ed4..55b464a0cbb0 100644 --- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/test/java/org/apache/seatunnel/core/starter/flink/TestFlinkParameter.java +++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/test/java/org/apache/seatunnel/core/starter/flink/TestFlinkParameter.java @@ -18,6 +18,7 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config; +import org.apache.seatunnel.common.config.Common; import org.apache.seatunnel.common.config.DeployMode; import org.apache.seatunnel.core.starter.flink.args.FlinkCommandArgs; import org.apache.seatunnel.core.starter.flink.utils.EnvironmentUtil; @@ -31,9 +32,12 @@ import org.junit.jupiter.api.Test; import java.nio.file.Path; +import java.nio.file.Paths; import java.util.ArrayList; import java.util.List; +import static org.apache.seatunnel.core.starter.flink.AbstractFlinkStarter.RUNTIME_FILE; + public class TestFlinkParameter { @Test @@ -83,4 +87,20 @@ public void testFlinkParameter() throws Exception { ExternalSettingLists.sort(null); Assertions.assertIterableEquals(checkList, ExternalSettingLists); } + + /** Test the relevant configurations associated with the yarn application mode */ + @Test + public void testBuildCommandsWithYarnApplicationMode() { + String[] args = {"--config", "/path/to/config.conf", "--target", "yarn-application"}; + FlinkStarter starter = new FlinkStarter(args); + + List commands = starter.buildCommands(); + + // Assert + Assertions.assertTrue( + commands.contains( + String.format( + "-Dyarn.ship-archives=\"%s\"", + Paths.get(Common.getSeaTunnelHome(), RUNTIME_FILE)))); + } }