Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.common.utils;

import org.apache.seatunnel.shade.org.apache.commons.lang3.StringUtils;

import org.apache.seatunnel.common.config.Common;

import java.io.File;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;

public class PathResolver {

private static final String SEATUNNEL_HOME_VAR = "$SEATUNNEL_HOME";

/**
* Replaces the absolute path of SEATUNNEL_HOME in the given URLs with a logical variable. The
* modification happens in-place on the provided collection.
*
* @param urls The collection of absolute URLs
*/
public static void replacePathWithEnv(Collection<URL> urls) {
if (urls == null || urls.isEmpty()) {
return;
}
List<URL> replaced =
urls.stream().map(PathResolver::replacePathWithEnv).collect(Collectors.toList());
urls.clear();
urls.addAll(replaced);
}

/**
* Replaces SEATUNNEL_HOME in a URL with a logical variable.
*
* @param url The absolute URL
* @return A URL with the logical variable, or the original URL if it's not within
* SEATUNNEL_HOME
*/
public static URL replacePathWithEnv(URL url) {
String path = url.getPath();
String home = Common.getSeaTunnelHome();
if (StringUtils.isBlank(home)) {
return url;
}
// Normalize paths for comparison (handle Windows backslashes)
String normalizedPath = new File(path).getAbsolutePath();
String normalizedHome = new File(home).getAbsolutePath();

if (normalizedPath.startsWith(normalizedHome)) {
String relativePath = normalizedPath.substring(normalizedHome.length());
// Ensure leading slash for URL construction
String newPath =
"/" + SEATUNNEL_HOME_VAR + relativePath.replace(File.separatorChar, '/');
// Remove double slashes if any (e.g. if relativePath started with /)
newPath = newPath.replace("//", "/");
try {
// Use URI constructor to build URL, ensure format consistency
return new URI(url.getProtocol(), url.getHost(), newPath, null).toURL();
} catch (MalformedURLException | URISyntaxException e) {
throw new RuntimeException("Failed to create logical URL for: " + url, e);
}
}
return url;
}

/**
* Resolves a collection of URLs containing the logical SEATUNNEL_HOME variable to absolute
* paths. The modification happens in-place on the provided collection.
*
* @param urls The collection of logical URLs to resolve
*/
public static void resolvePathEnv(Collection<URL> urls) {
if (urls == null || urls.isEmpty()) {
return;
}
List<URL> resolved =
urls.stream().map(PathResolver::resolvePathEnv).collect(Collectors.toList());
urls.clear();
urls.addAll(resolved);
}

/**
* Resolves a URL containing the logical SEATUNNEL_HOME variable to an absolute path.
*
* @param url The logical URL
* @return The resolved absolute URL
*/
public static URL resolvePathEnv(URL url) {
String path = url.getPath();
if (!path.contains(SEATUNNEL_HOME_VAR)) {
return url;
}

String home = Common.getSeaTunnelHome();
if (StringUtils.isBlank(home)) {
return url;
}

// Replace the variable with the actual home path
// We need to handle the case where path might start with / or not
String cleanPath = path.startsWith("/") ? path.substring(1) : path;
String relativePath = cleanPath.replace(SEATUNNEL_HOME_VAR, "");
// Remove leading slashes from relative path
relativePath = relativePath.replaceAll("^/+", "");

Path fullPath = Paths.get(home, relativePath);
try {
return fullPath.toUri().toURL();
} catch (MalformedURLException e) {
throw new RuntimeException("Failed to resolve logical URL for: " + url, e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.common.utils;

import org.apache.seatunnel.shade.org.apache.commons.lang3.StringUtils;

import org.apache.seatunnel.common.config.Common;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.io.File;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.file.Paths;

public class PathResolverTest {

@BeforeEach
public void setUp() {
System.clearProperty("SEATUNNEL_HOME");
Common.setSeaTunnelHome(null);
}

/**
* SEATUNNEL_HOME will be calculated, and it will be recalculated even if the SEATUNNEL_HOME has
* been set to null in the {@link #setUp()}
*/
@Test
public void testReplacePathWithEnvWithNoStHome() throws MalformedURLException {
// assert SEATUNNEL_HOME not blank
Assertions.assertTrue(StringUtils.isNotBlank(Common.getSeaTunnelHome()));

String jarPath = "/opt/seatunnel-client/connectors/seatunnel/connector-kafka.jar";
// Handle Windows path separator if needed for test robustness
if (File.separatorChar == '\\') {
jarPath = jarPath.replace('/', '\\');
}

URL absoluteUrl = new File(jarPath).toURI().toURL();
URL logicalUrl = PathResolver.replacePathWithEnv(absoluteUrl);

Assertions.assertEquals(absoluteUrl.getPath(), logicalUrl.getPath());
}

@Test
public void testReplacePathWithEnv() throws MalformedURLException {
// Simulate Client Side
String clientHome = "/opt/seatunnel-client";
System.setProperty("SEATUNNEL_HOME", clientHome);
Common.setSeaTunnelHome(clientHome);

// Test file inside SEATUNNEL_HOME
String jarPath = clientHome + "/connectors/seatunnel/connector-kafka.jar";
// Handle Windows path separator if needed for test robustness
if (File.separatorChar == '\\') {
jarPath = jarPath.replace('/', '\\');
clientHome = clientHome.replace('/', '\\');
System.setProperty("SEATUNNEL_HOME", clientHome);
Common.setSeaTunnelHome(clientHome);
}

URL absoluteUrl = new File(jarPath).toURI().toURL();
URL logicalUrl = PathResolver.replacePathWithEnv(absoluteUrl);

Assertions.assertEquals(
"$SEATUNNEL_HOME/connectors/seatunnel/connector-kafka.jar", logicalUrl.getPath());

// Test file OUTSIDE SEATUNNEL_HOME
String outsidePath = "/tmp/other/connector.jar";
if (File.separatorChar == '\\') {
outsidePath = "C:\\tmp\\other\\connector.jar";
}
URL outsideUrl = new File(outsidePath).toURI().toURL();
URL resultUrl = PathResolver.replacePathWithEnv(outsideUrl);

Assertions.assertEquals(outsideUrl, resultUrl);
}

@Test
public void testResolvePathEnv() throws MalformedURLException {
// Simulate Server Side
String serverHome = "/opt/seatunnel-server";
System.setProperty("SEATUNNEL_HOME", serverHome);
Common.setSeaTunnelHome(serverHome);

if (File.separatorChar == '\\') {
serverHome = serverHome.replace('/', '\\');
System.setProperty("SEATUNNEL_HOME", serverHome);
Common.setSeaTunnelHome(serverHome);
}

// Logical URL from client
URL logicalUrl = new URL("file:$SEATUNNEL_HOME/connectors/seatunnel/connector-kafka.jar");
URL resolvedUrl = PathResolver.resolvePathEnv(logicalUrl);

String expectedPath =
Paths.get(serverHome, "connectors/seatunnel/connector-kafka.jar")
.toUri()
.toURL()
.getPath();
Assertions.assertEquals(expectedPath, resolvedUrl.getPath());
}

@Test
public void testEndToEndFlow() throws MalformedURLException {
// 1. Client Environment
String clientHome = "/home/user/client";
if (File.separatorChar == '\\') {
clientHome = "C:\\home\\user\\client";
}
System.setProperty("SEATUNNEL_HOME", clientHome);
Common.setSeaTunnelHome(clientHome);

String jarPath = Paths.get(clientHome, "lib", "test.jar").toString();
URL clientUrl = new File(jarPath).toURI().toURL();

// 2. Client replaces path
URL logicalUrl = PathResolver.replacePathWithEnv(clientUrl);
Assertions.assertTrue(logicalUrl.getPath().contains("$SEATUNNEL_HOME"));

// 3. Server Environment (Different Path)
String serverHome = "/var/lib/server";
if (File.separatorChar == '\\') {
serverHome = "D:\\var\\lib\\server";
}
System.setProperty("SEATUNNEL_HOME", serverHome);
Common.setSeaTunnelHome(serverHome);

// 4. Server resolves path
URL resolvedUrl = PathResolver.resolvePathEnv(logicalUrl);

String expectedServerPath =
Paths.get(serverHome, "lib", "test.jar").toUri().toURL().getPath();
Assertions.assertEquals(expectedServerPath, resolvedUrl.getPath());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.seatunnel.shade.org.apache.commons.lang3.tuple.ImmutablePair;

import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.common.utils.PathResolver;
import org.apache.seatunnel.engine.client.SeaTunnelHazelcastClient;
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
Expand Down Expand Up @@ -145,10 +146,14 @@ public LogicalDag getLogicalDag() {
action, commonPluginJarUrls, commonJarIdentifiers);
});
} else {
// Replaces the absolute SEATUNNEL_HOME path in the given URL with a logical variable.
PathResolver.replacePathWithEnv(commonPluginJars);
PathResolver.replacePathWithEnv(immutablePair.getRight());
jarUrls.addAll(commonPluginJars);
jarUrls.addAll(immutablePair.getRight());
actions.forEach(
action -> {
replaceActionJarUrls(action);
addCommonPluginJarsToAction(
action, new HashSet<>(commonPluginJars), Collections.emptySet());
});
Expand Down Expand Up @@ -186,6 +191,20 @@ private void uploadActionPluginJar(List<Action> actions, Set<ConnectorJarIdentif
});
}

/**
* It will traverse the entire task graph (DAG) and ensure that the jar path of each node
* (whether it is Source, Transform, or Sink) is correctly replaced with the logical path
* starting with $SEATUNNEL_HOME.
*
* @param action action
*/
private void replaceActionJarUrls(Action action) {
PathResolver.replacePathWithEnv(action.getJarUrls());
if (!action.getUpstream().isEmpty()) {
action.getUpstream().forEach(this::replaceActionJarUrls);
}
}

public ClientJobProxy execute() throws ExecutionException, InterruptedException {
LogicalDag logicalDag = getLogicalDag();
JobImmutableInformation jobImmutableInformation =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
import org.apache.seatunnel.common.utils.PathResolver;
import org.apache.seatunnel.core.starter.utils.ConfigBuilder;
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.exception.JobDefineCheckException;
Expand Down Expand Up @@ -308,6 +309,8 @@ private List<URL> getConnectorJarList(List<? extends Config> configs, PluginType
jarPaths.addAll(
new SeaTunnelSinkPluginDiscovery().getPluginJarAndDependencyPaths(factoryIds));
jarPaths.addAll(commonPluginJars);
// Resolves URLs with the logical SEATUNNEL_HOME variable to absolute paths.
PathResolver.resolvePathEnv(jarPaths);
return jarPaths;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.seatunnel.api.tracing.MDCExecutorService;
import org.apache.seatunnel.api.tracing.MDCTracer;
import org.apache.seatunnel.common.utils.ExceptionUtils;
import org.apache.seatunnel.common.utils.PathResolver;
import org.apache.seatunnel.common.utils.StringFormatUtils;
import org.apache.seatunnel.engine.common.config.ConfigProvider;
import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
Expand Down Expand Up @@ -301,6 +302,8 @@ public TaskDeployState deployTask(@NonNull TaskGroupImmutableInformation taskImm
} else if (!CollectionUtils.isEmpty(taskImmutableInfo.getJars().get(i))) {
jars = taskImmutableInfo.getJars().get(i);
}
// Resolves URLs with the logical SEATUNNEL_HOME variable to absolute paths.
PathResolver.resolvePathEnv(jars);
ClassLoader classLoader =
classLoaderService.getClassLoader(
taskImmutableInfo.getJobId(), Lists.newArrayList(jars));
Expand Down
Loading