[Improve][seatunnel core][seatunnel-flink-start] Improve the packaging of the Flink Yarn application and upload the 'runtime.tar.gz' file #10419
...ink-starter/seatunnel-flink-13-starter/src/main/bin/start-seatunnel-flink-13-connector-v2.sh
...ink-starter/seatunnel-flink-15-starter/src/main/bin/start-seatunnel-flink-15-connector-v2.sh
...ink-starter/seatunnel-flink-20-starter/src/main/bin/start-seatunnel-flink-20-connector-v2.sh
...arter-common/src/main/java/org/apache/seatunnel/core/starter/flink/AbstractFlinkStarter.java
5d18e37 to
cd893aa
Issue 1: Deleting public constants causes compilation failure
Location:
Related context:
// FlinkStarter.java (FLINK13/FLINK20) - After modification
public class FlinkStarter extends AbstractFlinkStarter {
// public static final String APP_JAR_NAME = EngineType.FLINK13.getStarterJarName(); // Deleted
FlinkStarter(String[] args) {
super(args, EngineType.FLINK13);
}
}
// FlinkStarter.java (FLINK15) - Unmodified
public class FlinkStarter extends AbstractFlinkStarter {
public static final String APP_JAR_NAME = EngineType.FLINK15.getStarterJarName();
// ...
}
// FlinkExecution.java - Caller
new File(
Common.appStarterDir()
.resolve(FlinkStarter.APP_JAR_NAME) // ← References the deleted constant
.toString())
Problem description:
Potential risks:
Impact scope:
Severity: CRITICAL
Improvement suggestions:
Approach A: Restore the deleted constants (recommended)
// FlinkStarter.java (FLINK13/FLINK20) - Restored
public class FlinkStarter extends AbstractFlinkStarter {
public static final String APP_JAR_NAME = EngineType.FLINK13.getStarterJarName();
FlinkStarter(String[] args) {
super(args, EngineType.FLINK13);
}
}
Approach B: Refactor FlinkExecution.java to receive the engine type through its constructor
// FlinkExecution.java - Modify constructor
public FlinkExecution(Config config, EngineType engineType) {
try {
jarPaths = new ArrayList<>(
Collections.singletonList(
new File(
Common.appStarterDir()
.resolve(engineType.getStarterJarName()) // Use engineType parameter
.toString())
.toURI()
.toURL()));
} catch (MalformedURLException e) {
throw new SeaTunnelException("load flink starter error.", e);
}
// ...
}
// Also update every FlinkExecution call site to pass in the engineType parameter
Rationale:
Confidence: High - This is a definitive compilation error

Issue 2: Incomplete error handling in shell scripts
Location:
Related context:
if [ ! -f "${APP_DIR}/runtime.tar.gz" ];then
cd "${APP_DIR}" # ← If this fails, the script terminates because of set -e
directories=("connectors" "lib" "plugins")
# ...
tar -zcvf runtime.tar.gz "${existing_dirs[@]}"
cd - # ← If this fails, the working directory is left changed
fi
Problem description:
Potential risks:
Impact scope:
Severity: MINOR
Improvement suggestions:
if [ ! -f "${APP_DIR}/runtime.tar.gz" ];then
cd "${APP_DIR}" || {
echo "ERROR: Failed to change directory to ${APP_DIR}" >&2
exit 1
}
directories=("connectors" "lib" "plugins")
existing_dirs=()
for dir in "${directories[@]}"; do
if [ -d "$dir" ]; then
existing_dirs+=("$dir")
fi
done
if [ ${#existing_dirs[@]} -eq 0 ]; then
echo "[${directories[@]}] not existed, skip generate runtime.tar.gz"
else
tar -zcvf runtime.tar.gz "${existing_dirs[@]}" || {
echo "ERROR: Failed to create runtime.tar.gz" >&2
cd - >/dev/null 2>&1
exit 1
}
fi
cd - >/dev/null 2>&1 || echo "WARNING: Failed to restore working directory" >&2
fi
Rationale:
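One alternative worth weighing against the suggestion above is to run the packaging inside a subshell, so the caller's working directory never changes and no `cd -` is needed. The following is a minimal sketch, not the PR's actual code: the function name `package_runtime` and its single argument are hypothetical, and it collects the existing directories in the positional parameters rather than a bash array so that it also runs under plain `sh`.

```shell
#!/bin/sh
# Hypothetical helper: build runtime.tar.gz inside the given SeaTunnel home
# without changing the caller's working directory.
package_runtime() {
    app_dir=$1

    # Nothing to do if the archive already exists.
    if [ -f "${app_dir}/runtime.tar.gz" ]; then
        return 0
    fi

    (
        # The cd only affects this subshell; a failure ends the subshell
        # with a non-zero status that the caller can check.
        cd "${app_dir}" || {
            echo "ERROR: Failed to change directory to ${app_dir}" >&2
            exit 1
        }

        # Collect the directories that actually exist.
        set --
        for dir in connectors lib plugins; do
            if [ -d "${dir}" ]; then
                set -- "$@" "${dir}"
            fi
        done

        if [ "$#" -eq 0 ]; then
            echo "[connectors,lib,plugins] not existed, skip generate runtime.tar.gz"
            exit 0
        fi

        tar -zcf runtime.tar.gz "$@" || {
            echo "ERROR: Failed to create runtime.tar.gz" >&2
            # Remove any partial archive so a later run retries.
            rm -f runtime.tar.gz
            exit 1
        }
    )
}
```

Called as `package_runtime "${APP_DIR}"`, the function returns the subshell's exit status, so under `set -e` a failed `cd` or `tar` still stops the script while the caller's directory stays untouched.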
Confidence: Medium - This is an edge case, but the risk does exist

Issue 3: Inconsistent path concatenation methods
Location:
Related context:
// Modified code
command.add(
String.format(
"-Dyarn.ship-archives=\"%s/%s\"",
Common.getSeaTunnelHome(), Common.FLINK_YARN_APPLICATION_PATH));
// Compare path concatenation methods in other places (Common.java)
public static Path pluginRootDir() {
return Paths.get(getSeaTunnelHome(), "plugins");
}
public static Path connectorDir() {
return Paths.get(getSeaTunnelHome(), "connectors");
}
Problem description:
Potential risks:
Impact scope:
Severity: MAJOR
Improvement suggestions:
// Option 1: Use Paths.get() (Recommended)
command.add(
String.format(
"-Dyarn.ship-archives=\"%s\"",
Paths.get(Common.getSeaTunnelHome(), Common.FLINK_YARN_APPLICATION_PATH)));
// Option 2: Define a helper method
public abstract class AbstractFlinkStarter implements Starter {
private static String buildArchivePath() {
return Paths.get(Common.getSeaTunnelHome(), Common.FLINK_YARN_APPLICATION_PATH)
.toString();
}
// Then use in buildCommands()
command.add(
String.format(
"-Dyarn.ship-archives=\"%s\"",
buildArchivePath()));
}
Rationale:
Confidence: High - Cross-platform compatibility issues do exist

Issue 4: Constant access modifier change not documented
Location:
Related context:
// Before modification
private static final String FLINK_YARN_APPLICATION_PATH = "runtime.tar.gz";
// After modification
public static final String FLINK_YARN_APPLICATION_PATH = "runtime.tar.gz";
Problem description:
Potential risks:
Impact scope:
Severity: MINOR
Improvement suggestions:
Approach A: If exposure is necessary, add documentation
/**
* The relative path of the runtime tarball for Flink YARN Application mode.
* This file contains connectors, lib, and plugins directories.
*
* @since 2.3.5
*/
public static final String FLINK_YARN_APPLICATION_PATH = "runtime.tar.gz";
Approach B: Don't expose the constant; provide an accessor method instead (recommended)
// Keep private
private static final String FLINK_YARN_APPLICATION_PATH = "runtime.tar.gz";
// Add accessor method
public static Path getFlinkYarnApplicationPath() {
return Paths.get(getSeaTunnelHome(), FLINK_YARN_APPLICATION_PATH);
}
// Use in AbstractFlinkStarter
command.add(
String.format(
"-Dyarn.ship-archives=\"%s\"",
Common.getFlinkYarnApplicationPath()));
Rationale:
Confidence: Medium - This is an API design issue, but its impact is limited

Issue 5: Missing unit tests
Location: N/A (missing files)
Problem description:
Potential risks:
Impact scope:
Severity: MAJOR
Improvement suggestions:
Test 1: Java unit tests
// AbstractFlinkStarterTest.java
@Test
public void testBuildCommandsWithYarnApplicationMode() {
// Arrange
String[] args = {"--config", "/path/to/config.conf", "--target", "yarn-application"};
FlinkStarter starter = new FlinkStarter(args);
// Act
List<String> commands = starter.buildCommands();
// Assert
assertTrue(commands.contains("-Dyarn.ship-archives=\"" +
Common.getSeaTunnelHome() + "/runtime.tar.gz\""));
}
Test 2: Shell script tests (BATS)
@test "runtime.tar.gz created in APP_DIR" {
cd /tmp
run /opt/soft/seatunnel/bin/start-seatunnel-flink-13-connector-v2.sh --config /opt/soft/seatunnel/config/test.conf
[ -f "/opt/soft/seatunnel/runtime.tar.gz" ]
[ ! -f "/tmp/runtime.tar.gz" ]
}
Rationale:
Confidence: High - The lack of tests is unacceptable

Issue 6: Inconsistent shell script output
Location:
Problem description:
# Before
echo "[connectors,lib,plugins] not existed, skip generate runtime.tar.gz"
# After
echo "[${directories[@]}] not existed, skip generate runtime.tar.gz"
# Expanded becomes:
echo "[connectors lib plugins] not existed, skip generate runtime.tar.gz"
Issue:
Potential risks:
Impact scope:
Severity: MINOR
Improvement suggestions:
# Keep comma-separated
if [ ${#existing_dirs[@]} -eq 0 ]; then
echo "[connectors, lib, plugins] not existed, skip generate runtime.tar.gz"
fi
# Or use IFS to temporarily modify
if [ ${#existing_dirs[@]} -eq 0 ]; then
(IFS=','; echo "[${directories[*]}] not existed, skip generate runtime.tar.gz")
fi
Rationale:
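The IFS variant can also be wrapped in a small helper so the joined list is reusable in other messages. This is only a sketch: `join_by_comma` is a hypothetical name, not something found in the SeaTunnel scripts. It relies on the standard shell rule that `"$*"` joins the positional parameters with the first character of `IFS`, and the subshell function body keeps the `IFS` override from leaking into the rest of the script.

```shell
#!/bin/sh
# Hypothetical helper: join all arguments with commas.
# The function body is a subshell, so the IFS override stays local to it.
join_by_comma() (
    IFS=','
    printf '%s\n' "$*"
)

# Example message in the original comma-separated format.
echo "[$(join_by_comma connectors lib plugins)] not existed, skip generate runtime.tar.gz"
```

In a bash script that already holds the names in an array, the call would be `join_by_comma "${directories[@]}"`.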
Confidence: Low - This is just a formatting issue and does not affect functionality
…g of the Flink Yarn application and upload the 'runtime.tar.gz' file
cd893aa to
1acac0e
Purpose of this pull request
In Flink Yarn application mode, packaging of the runtime.tar.gz file is broken. When /opt/soft/seatunnel/bin/start-seatunnel-flink-13-connector-v2.sh --target local --deploy-mode run --config /opt/soft/seatunnel/config/aa.conf is executed from outside the SeaTunnel home directory, the runtime.tar.gz file cannot be packaged correctly, so the task fails to execute.
Does this PR introduce any user-facing change?
How was this patch tested?
Submit the task from any directory using an absolute path for the st task, such as /opt/soft/seatunnel/bin/start-seatunnel-flink-13-connector-v2.sh --target local --deploy-mode run --config /opt/soft/seatunnel/config/aa.conf. Then check that runtime.tar.gz exists under the SeaTunnel home directory and that the task was submitted successfully.
Check list
New License Guide
incompatible-changes.md to describe the incompatibility caused by this PR.
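The general technique behind a fix like the one described under "Purpose of this pull request" is to derive paths from the script's own location rather than from the caller's current directory. A minimal sketch, assuming the launcher lives in `<SeaTunnel home>/bin`: `resolve_app_dir` is a hypothetical helper, and how the real start scripts compute `APP_DIR` is not shown in this PR page. The sketch does not follow a symlinked script.

```shell
#!/bin/sh
# Hypothetical helper: print the absolute home directory for a script that
# is assumed to live in <home>/bin, regardless of the caller's directory.
resolve_app_dir() {
    script_path=$1
    # The cd happens in a subshell, so the caller's directory is unchanged;
    # pwd -P prints the physical absolute path of <home>.
    ( cd "$(dirname "${script_path}")/.." >/dev/null && pwd -P )
}

# Typical use at the top of a launcher script:
#   APP_DIR=$(resolve_app_dir "$0")
```

With `APP_DIR` resolved this way, an archive written to `"${APP_DIR}/runtime.tar.gz"` lands in the same place whether the script is started from the home directory or from anywhere else.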