[Feature][Zeta]Add datasource spi#10586

Open
chl-wxp wants to merge 10 commits into apache:dev from chl-wxp:add-datasource-spi

Conversation

@chl-wxp
Contributor

@chl-wxp chl-wxp commented Mar 10, 2026

Related: #10554

Summary

This PR adds a DataSource SPI and runtime mapping so connectors can reference external data sources by datasource_id. The implementation includes SPI interfaces, a mapper mechanism, a provider implementation, and the runtime wiring to resolve and merge external data-source configuration into connector configs. This SPI may evolve into a more general metadata abstraction in the future.

What this PR actually adds

  • New SPI interfaces:
    • DataSourceProvider — provider entry point for external metadata services
    • DataSourceMapper — per-connector mapper that converts external metadata into connector config
  • Provider implementation (reference implementation) to fetch metadata from an external system
  • Runtime integration:
    • Load configured provider at startup
    • Detect datasource_id in connector configs during job parsing
    • Resolve mapped connector configuration and merge with job-level overrides
  • Configuration:
    • seatunnel.yaml entries to select and configure the DataSource provider (example included)
  • Tests and examples:
  • unit/e2e tests and an example config demonstrating jdbc datasource resolution
  • Minor docs/packaging updates and related registration to plug the SPI into the build/runtime
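Based on the description above, the two SPI interfaces might look roughly like the following. This is a hypothetical sketch with a toy in-memory mapper; the actual method names and signatures in the PR may differ:

```java
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the SPI contract described above; real signatures may differ.
interface DataSourceMapper {
    // Connector this mapper serves, e.g. "Jdbc"
    String connectorIdentifier();

    // Convert external metadata for the given datasource id into connector options
    Map<String, Object> map(String datasourceId);
}

interface DataSourceProvider {
    // Provider kind as referenced in seatunnel.yaml, e.g. "gravitino"
    String kind();

    // Mappers contributed by this provider
    List<DataSourceMapper> dataSourceMappers();
}

public class SpiSketch {
    public static void main(String[] args) {
        // Toy mapper illustrating the contract: datasource id in, connector options out
        DataSourceMapper jdbcMapper = new DataSourceMapper() {
            public String connectorIdentifier() { return "Jdbc"; }
            public Map<String, Object> map(String datasourceId) {
                return Map.of("url", "jdbc:mysql://db.example/" + datasourceId);
            }
        };
        System.out.println(jdbcMapper.map("orders").get("url"));
    }
}
```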

Usage (example)

datasource:
  enabled: true
  kind: gravitino
  gravitino:
    uri: http://127.0.0.1:8090
    metalake: test_metalake
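A connector would then reference the provider by id from its job config. A hypothetical HOCON job config (option names are illustrative; only datasource_id is defined by this PR):

```hocon
source {
  Jdbc {
    # Resolved at parse time via the configured DataSource provider
    datasource_id = "my_mysql_source"
    # Job-level options are merged with the resolved configuration
    query = "SELECT * FROM users"
  }
}
```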

@chl-wxp chl-wxp marked this pull request as ready for review March 10, 2026 10:25
@DanielCarter-stack

Issue 1: Provider Repeated Initialization Causes Resource Leak

Location: seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/DataSourceConfigUtil.java:76-107

Related Context:

  • Provider factory: DataSourceProviderFactory.java:50-51 (caches Provider singleton)
  • Gravitino Provider implementation: GravitinoDataSourceProvider.java:96 (creates new client on each init)
  • Caller: MultipleTableJobConfigParser.java:855-864 (called during job parsing)

Problem Description:
The resolveDataSourceConfigs() method repeatedly calls init() and close() on the globally cached Provider singleton, leading to:

  1. HTTP clients are created multiple times
  2. Provider is closed after the first job execution
  3. The second job execution uses the closed Provider, which may cause exceptions

Potential Risks:

  • Risk 1: HTTP client is already closed during the second job execution, metadata retrieval fails
  • Risk 2: Resource leak, old clients created by each init are not properly released
  • Risk 3: In SeaTunnel Server long-running scenarios, multiple job submissions will cause instability

Impact Scope:

  • Direct impact: All jobs using DataSource SPI
  • Indirect impact: Stability of SeaTunnel Client/Server processes
  • Impact surface: Core framework

Severity: BLOCKER

Improvement Suggestions:

// DataSourceConfigUtil.java
public static Config resolveDataSourceConfigs(
        Config seaTunnelJobConfig, DataSourceConfig dataSourceConfig) {
    if (!dataSourceConfig.isEnabled()) {
        log.debug("DataSource Center is disabled, returning original config");
        return seaTunnelJobConfig;
    }

    String providerKind = dataSourceConfig.getKind();
    log.info("Starting datasource config resolution with provider: {}", providerKind);

    // Get Provider singleton (no duplicate init)
    DataSourceProvider provider = getOrCreateProvider(dataSourceConfig);
    
    // Only on first initialization
    if (!isProviderInitialized(provider)) {
        provider.init(ConfigFactory.parseMap(dataSourceConfig.getProperties()));
        markProviderInitialized(provider);
    }
    
    // ... existing parsing logic ...
    
    // ❌ Remove provider.close()
    // provider.close();
    
    return ConfigFactory.parseMap(resultMap);
}

// Add initialization state management in DataSourceProviderFactory
private static final ConcurrentMap<String, Boolean> INITIALIZED_PROVIDERS = new ConcurrentHashMap<>();

Rationale:

  • Provider should be a process-level singleton, managed by the framework lifecycle
  • Initialize at process startup, clean up at process shutdown
  • Each job submission only uses the already initialized Provider, should not affect its state
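The once-only initialization keyed by provider kind can be expressed with ConcurrentHashMap.computeIfAbsent, which runs the factory atomically at most once per key. A minimal standalone sketch (the DataSourceProvider here is a toy stand-in, not the real SPI type):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical sketch: process-level provider singletons, initialized exactly once per kind.
public class ProviderInitSketch {
    interface DataSourceProvider { void init(); }

    private static final Map<String, DataSourceProvider> PROVIDERS = new ConcurrentHashMap<>();

    static DataSourceProvider getOrCreateProvider(String kind, Supplier<DataSourceProvider> factory) {
        // computeIfAbsent runs the factory (and init) at most once per kind,
        // even under concurrent job submissions
        return PROVIDERS.computeIfAbsent(kind, k -> {
            DataSourceProvider p = factory.get();
            p.init();
            return p;
        });
    }

    public static void main(String[] args) {
        AtomicInteger initCount = new AtomicInteger();
        // Three "job submissions" share one initialized provider
        for (int i = 0; i < 3; i++) {
            getOrCreateProvider("gravitino", () -> initCount::incrementAndGet);
        }
        System.out.println("init calls: " + initCount.get());
    }
}
```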

Issue 2: Mapper Cache Key Does Not Include Provider Type

Location: seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/DataSourceConfigUtil.java:268-282

Related Context:

  • Cache definition: DataSourceConfigUtil.java:56-57
  • Provider interface: DataSourceProvider.java:98 (returns Mapper collection)

Problem Description:
MAPPER_CACHE only uses connectorIdentifier as the cache key. When multiple Providers implement the same Connector's Mapper, the cache will be mixed.

Potential Risks:

  • Risk 1: When switching Provider kind, the old Provider's Mapper is still used
  • Risk 2: If two Providers' Mapper implementations have different logic, it will cause configuration parsing errors

Impact Scope:

  • Direct impact: Scenarios where multiple Providers coexist
  • Indirect impact: Correctness when switching Providers
  • Impact surface: Multiple Connectors

Severity: CRITICAL

Improvement Suggestions:

private static DataSourceMapper findMapper(
        DataSourceProvider provider, String connectorIdentifier) {

    String cacheKey = provider.kind() + "_" + connectorIdentifier;  // ✅ Composite key

    return MAPPER_CACHE.computeIfAbsent(
            cacheKey,
            k -> {
                for (DataSourceMapper mapper : provider.dataSourceMappers()) {
                    if (mapper.connectorIdentifier().equalsIgnoreCase(connectorIdentifier)) {
                        return mapper;
                    }
                }
                return null;
            });
}

Rationale:

  • Mapper implementation is bound to Provider, same-name Mappers from different Providers may have different logic
  • Cache key must include Provider type to ensure uniqueness

Issue 3: Configuration Merge Strategy Does Not Match Documentation

Location: seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/DataSourceConfigUtil.java:319-342

Related Context:

  • Documentation: docs/en/introduction/concepts/datasource-spi.md:51-52
  • Connector configuration: All Connectors use datasource_id

Problem Description:
The code implementation is "external configuration overrides job configuration", but the documentation states "job configuration takes priority". This prevents users from overriding incorrect external system configurations in job configuration.

Potential Risks:

  • Risk 1: When external metadata system configuration is incorrect, users cannot temporarily override and fix it
  • Risk 2: Violates user intuition, increases configuration debugging difficulty

Impact Scope:

  • Direct impact: All Connectors using datasource_id
  • Indirect impact: Configuration predictability
  • Impact surface: All Connectors

Severity: MAJOR

Improvement Suggestions:

Solution 1: Modify code to make job configuration take priority

for (Map.Entry<String, Object> entry : datasourceConfig.entrySet()) {
    // ✅ Original config takes priority, external config only supplements missing fields
    if (!mergedMap.containsKey(entry.getKey())) {
        mergedMap.put(entry.getKey(), entry.getValue());
    }
}

Solution 2: Modify documentation to state external configuration takes priority

When `datasource_id` is specified, the connector will:
1. Use the `datasource_id` to fetch connection details from the external metadata service
2. Merge the fetched configuration with any additional parameters in the job config
3. **Fetched configuration takes precedence over job-level parameters**

Rationale:

  • Solution 1 (job configuration takes priority) is recommended because users need an "escape hatch"
  • When external system configuration is incorrect, users can temporarily override in job configuration
  • Follows the principle of least surprise
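The job-config-takes-priority merge in Solution 1 boils down to putIfAbsent semantics. A minimal standalone illustration with toy maps (not the actual SeaTunnel types):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class MergePrioritySketch {
    public static void main(String[] args) {
        // Options written by the user in the job config
        Map<String, Object> jobConfig = new LinkedHashMap<>();
        jobConfig.put("url", "jdbc:mysql://override-host:3306/db");  // user override

        // Options resolved from the external metadata service
        Map<String, Object> externalConfig = new LinkedHashMap<>();
        externalConfig.put("url", "jdbc:mysql://stale-host:3306/db");
        externalConfig.put("driver", "com.mysql.cj.jdbc.Driver");

        // Job config wins; external config only fills in missing keys
        Map<String, Object> merged = new LinkedHashMap<>(jobConfig);
        externalConfig.forEach(merged::putIfAbsent);

        System.out.println(merged.get("url"));     // user's override survives
        System.out.println(merged.get("driver"));  // supplemented from external
    }
}
```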

Issue 4: HTTP Client Not Configured With Timeout

Location: seatunnel-api/src/main/java/org/apache/seatunnel/api/metalake/gravitino/GravitinoClient.java:62-64

Related Context:

  • HTTP request: GravitinoClient.java:117-163 (executeGetRequest)
  • Provider resource management: GravitinoDataSourceProvider.java:56 (client field)

Problem Description:
Using the default HttpClients.createDefault() without configuring connection timeout and read timeout may cause jobs to hang indefinitely.

Potential Risks:

  • Risk 1: When Gravitino service is unresponsive, job execution hangs indefinitely
  • Risk 2: Cannot set reasonable timeout strategy, affecting fault recovery

Impact Scope:

  • Direct impact: All jobs using Gravitino
  • Indirect impact: Timeout control of job execution
  • Impact surface: Core framework

Severity: MAJOR

Improvement Suggestions:

public GravitinoClient() {
    RequestConfig config = RequestConfig.custom()
        .setConnectTimeout(5000)      // 5 second connection timeout
        .setConnectionRequestTimeout(5000)  // 5 second request timeout
        .setSocketTimeout(30000)      // 30 second read timeout
        .build();
    
    this.httpClient = HttpClients.custom()
        .setDefaultRequestConfig(config)
        .setMaxConnTotal(50)          // Maximum number of connections
        .setMaxConnPerRoute(20)       // Maximum number of connections per route
        .build();
}

Rationale:

  • Must set timeout to avoid jobs hanging indefinitely
  • Connection pool configuration can improve concurrent performance
  • Timeout value should be configurable, but default value needs to be reasonable

Issue 5: Thread Safety of Mapper in Concurrent Scenarios

Location: seatunnel-api/src/main/java/org/apache/seatunnel/api/datasource/gravitino/GravitinoJdbcDataSourceMapper.java:78-81

Related Context:

  • Mapper interface documentation: DataSourceMapper.java:39-42 (requires thread safety)
  • Mapper cache: DataSourceConfigUtil.java:56-57

Problem Description:
GravitinoJdbcDataSourceMapper holds a shared GravitinoClient instance. Although CloseableHttpClient's execute method is thread-safe, this design does not meet the interface documentation's thread safety requirements.

Potential Risks:

  • Risk 1: If GravitinoClient adds state later, concurrency issues may arise
  • Risk 2: Violates interface contract, may introduce bugs during future maintenance

Impact Scope:

  • Direct impact: Concurrent job submission scenarios
  • Indirect impact: System concurrent stability
  • Impact surface: Core framework

Severity: MAJOR

Improvement Suggestions:

Solution 1: Make Mapper stateless

public class GravitinoJdbcDataSourceMapper implements DataSourceMapper {
    private final String catalogBaseUrl;
    // ❌ Remove client field
    
    public GravitinoJdbcDataSourceMapper(String catalogBaseUrl) {
        this.catalogBaseUrl = catalogBaseUrl;
    }
    
    @Override
    public Map<String, Object> map(String datasourceId) {
        // ✅ Create temporary client on each call
        try (GravitinoClient client = new GravitinoClient()) {
            JsonNode propertiesNode = client.getMetaInfo(datasourceId, catalogBaseUrl);
            return convertToJdbcConfig(propertiesNode);
        } catch (IOException e) {
            throw new RuntimeException("Failed to fetch metadata for datasource: " + datasourceId, e);
        }
    }
}

Solution 2: Ensure GravitinoClient is thread-safe

// GravitinoClient.java
@Override
public JsonNode getMetaInfo(String sourceId, String metalakeUrl) throws IOException {
    // ✅ Use synchronized to ensure thread safety
    synchronized (httpClient) {
        return executeGetRequest(metalakeUrl + sourceId);
    }
}

Rationale:

  • Solution 1 better aligns with interface contract, but has poorer performance
  • Solution 2 requires clear documentation of GravitinoClient's thread safety
  • In actual use, Solution 1 is recommended because HTTP calls are not frequent operations

Issue 6: Improper Handling of Null Mapper Return Values

Location: seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/DataSourceConfigUtil.java:165-183

Related Context:

  • Mapper interface: DataSourceMapper.java:65 (return may be null)
  • Error handling: DataSourceConfigUtil.java:188-196

Problem Description:
When Mapper is not found or returns empty configuration, it silently returns the original configuration, which may lead to users using incorrect connection configuration without noticing.

Potential Risks:

  • Risk 1: User configured datasource_id, but actually used connection information from job configuration
  • Risk 2: May connect to the wrong database, causing data security issues

Impact Scope:

  • Direct impact: Troubleshooting configuration errors
  • Indirect impact: Data security
  • Impact surface: All Connectors

Severity: MAJOR

Improvement Suggestions:

DataSourceMapper mapper = findMapper(provider, connectorIdentifier);

if (mapper == null) {
    throw new DataSourceProviderException(
        String.format("No DataSourceMapper found for connector '%s' in provider '%s'. " +
                      "Please check if the connector is supported by the provider.",
                      connectorIdentifier, providerKind));
}

Map<String, Object> datasourceConfig = mapper.map(datasourceId);

if (datasourceConfig == null || datasourceConfig.isEmpty()) {
    throw new DataSourceProviderException(
        String.format("DataSourceMapper returned empty config for datasource_id: '%s'. " +
                      "Please check if the datasource exists in the external system.",
                      datasourceId));
}

Rationale:

  • Fail-fast principle: Configuration errors should fail fast
  • Avoid data security issues caused by silent failures
  • Clear error messages for easier troubleshooting

Issue 7: Logs May Leak Sensitive Information

Location: seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/DataSourceConfigUtil.java:324-328

Related Context:

  • Log level: Uses log.debug
  • Sensitive fields: password, secret, token, etc.

Problem Description:
DEBUG logs record all configuration values, including passwords. If DEBUG logging is enabled, it may cause sensitive information leakage.

Potential Risks:

  • Risk 1: After enabling DEBUG logs in production environment, passwords are recorded to log files
  • Risk 2: Log collection and analysis systems may store sensitive information

Impact Scope:

  • Direct impact: Log security
  • Indirect impact: Compliance
  • Impact surface: Core framework

Severity: MINOR

Improvement Suggestions:

log.debug("Merging datasource config: key={}, datasource_id={}",
        key, datasourceId);  // ❌ Remove value

// Or add sensitive field filtering
String safeValue = isSensitiveField(key) ? "***" : String.valueOf(value);
log.debug("Merging datasource config: key={}, value={}, datasource_id={}",
        key, safeValue, datasourceId);

private boolean isSensitiveField(String key) {
    return key.toLowerCase().contains("password") ||
           key.toLowerCase().contains("secret") ||
           key.toLowerCase().contains("token");
}

Rationale:

  • Security best practice: Do not record sensitive information in logs
  • Even at DEBUG level, security principles should be followed
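A self-contained version of the masking helper, using a case-insensitive regex so mixed-case keys like "dbPassword" are also caught (the field-name list is the common set; extend it as needed):

```java
import java.util.regex.Pattern;

public class LogMaskSketch {
    // Case-insensitive match on common credential-bearing key names
    private static final Pattern SENSITIVE =
            Pattern.compile("password|passwd|secret|token|credential", Pattern.CASE_INSENSITIVE);

    static String safeValue(String key, Object value) {
        // Mask the value entirely when the key looks credential-bearing
        return SENSITIVE.matcher(key).find() ? "***" : String.valueOf(value);
    }

    public static void main(String[] args) {
        System.out.println(safeValue("jdbc.password", "s3cr3t"));
        System.out.println(safeValue("url", "jdbc:mysql://host/db"));
    }
}
```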

@chl-wxp
Contributor Author

chl-wxp commented Mar 11, 2026

