Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 52 additions & 0 deletions seatunnel-connectors-v2/connector-socket/MULTI_TABLE_SINK.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# Multi-Table Sink Support - Socket Connector

## Implementation Status

The Socket connector implements `SupportMultiTableSink` to prevent `ClassCastException` when used in multi-table scenarios (e.g., CDC pipelines).

## Current Behavior

- ✅ **Supports multi-table routing**: Multiple source tables can write to the same Socket sink instance without data shuffling
- ⚠️ **Uses shared schema**: All incoming rows are serialized using the initial table's schema
- ✅ **100% backward compatible**: Single-table jobs work exactly as before

## Suitable Use Cases

This implementation works correctly for:
1. **Single-table scenarios** (standard usage)
2. **Multi-table scenarios where all tables share the same schema**
3. **Debug/development scenarios** where it is acceptable for rows from tables with different schemas to be serialized using the initial table's schema

## Technical Details

### What Changed
- `SocketSink` implements `SupportMultiTableSink`
- `SocketSinkWriter` implements `SupportMultiTableSinkWriter<Void>`
- No changes to `SocketClient` or serialization logic

### Why This Approach
The Socket connector is primarily used for debugging and development. The current implementation:
- Prevents runtime `ClassCastException` in multi-table scenarios
- Maintains simplicity and performance
- Avoids over-engineering for a debug-oriented connector

## Future Enhancements

For production multi-table scenarios with different schemas, future work could include:
- Refactoring `SocketClient` to accept per-row serializer selection
- Adding configuration options for strict schema validation
- Implementing table-aware serialization strategies

**These enhancements should be proposed as separate issues after this minimal implementation is merged.**

## References

- Issue: #10426 - Implement multi-table sink support for connectors
- Parent Issue: #5652 - Multi-table sink feature tracking
- Reference: `ElasticsearchSink`, `JdbcSink` (similar minimal implementations)

## Implementation

- **Author:** @AshharAhmadKhan
- **Date:** 2026-02-03
- **Reviewers:** @davidzollo, @DanielCarter-stack
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.sink.SupportMultiTableSink;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
Expand All @@ -29,7 +30,22 @@
import java.io.IOException;
import java.util.Optional;

public class SocketSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
/**
* Socket Sink for writing data to a network socket.
*
* <p>This sink implements {@link SupportMultiTableSink} to support multi-table routing scenarios.
* In multi-table mode, multiple source tables can write to the same socket instance without data
* shuffling, which is useful for CDC and debugging scenarios.
*
* <p><b>Current Implementation:</b> Uses a shared schema for all incoming rows (the schema of the
* first table). This works correctly for single-table jobs and multi-table jobs where all tables
* share the same schema.
*
* @see org.apache.seatunnel.api.sink.SupportMultiTableSink
* @since 2.3.13
*/
public class SocketSink extends AbstractSimpleSink<SeaTunnelRow, Void>
implements SupportMultiTableSink {

private final SocketConfig socketConfig;
private final CatalogTable catalogTable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.seatunnel.connectors.seatunnel.socket.sink;

import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
Expand All @@ -25,7 +26,19 @@

import java.io.IOException;

public class SocketSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
/**
* Socket sink writer that writes data to network sockets.
*
* <p>Implements {@link SupportMultiTableSinkWriter} to prevent ClassCastException when the
* framework expects multi-table support. All rows are serialized using the schema provided at
* construction time.
*
* @see SupportMultiTableSinkWriter
* @since 2.3.13
*/
public class SocketSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void>
implements SupportMultiTableSinkWriter<Void> {

private final SocketClient socketClient;

SocketSinkWriter(SocketConfig socketConfig, SeaTunnelRowType seaTunnelRowType)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.socket.sink;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.socket.config.SocketCommonOptions;
import org.apache.seatunnel.connectors.seatunnel.socket.config.SocketConfig;
import org.apache.seatunnel.connectors.seatunnel.socket.config.SocketSinkOptions;

import org.junit.jupiter.api.Test;

import java.util.HashMap;
import java.util.Map;

import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

/**
 * Contract test verifying that {@link SocketSinkWriter} implements {@link
 * SupportMultiTableSinkWriter}, so that casting a writer to that interface in multi-table
 * scenarios does not fail with a ClassCastException.
 *
 * <p>The contract is asserted at the type level first, which needs no socket and therefore always
 * runs. An instance-level cast check is then attempted on a best-effort basis: constructing the
 * writer may fail when no server is listening on the configured port, and that failure is
 * tolerated.
 */
public class SocketSinkWriterContractTest {

    @Test
    public void testWriterImplementsMultiTableInterface() throws Exception {
        // Type-level check: independent of any network state, so the contract is verified even
        // when the writer below cannot be constructed.
        assertTrue(
                SupportMultiTableSinkWriter.class.isAssignableFrom(SocketSinkWriter.class),
                "SocketSinkWriter must implement SupportMultiTableSinkWriter");

        // Given: Socket configuration (ReadonlyConfig-based, as required)
        Map<String, Object> configMap = new HashMap<>();
        configMap.put(SocketCommonOptions.HOST.key(), "localhost");
        configMap.put(SocketCommonOptions.PORT.key(), 9999);
        configMap.put(SocketSinkOptions.MAX_RETRIES.key(), 0);

        ReadonlyConfig readonlyConfig = ReadonlyConfig.fromMap(configMap);
        SocketConfig config = new SocketConfig(readonlyConfig);

        SeaTunnelRowType rowType =
                new SeaTunnelRowType(
                        new String[] {"id", "name"},
                        new SeaTunnelDataType[] {BasicType.INT_TYPE, BasicType.STRING_TYPE});

        // When: Creating writer (best effort; requires a reachable socket server)
        SocketSinkWriter writer = null;
        try {
            writer = new SocketSinkWriter(config, rowType);

            // Then: Verify the same cast the framework performs succeeds on a live instance
            SupportMultiTableSinkWriter<?> multiTableWriter =
                    (SupportMultiTableSinkWriter<?>) writer;
            assertNotNull(multiTableWriter);

        } catch (Exception e) {
            // Socket connection failure is acceptable (no server running).
            // getMessage() may be null; String.valueOf keeps the check from throwing an NPE
            // that would hide the original exception.
            String message = String.valueOf(e.getMessage());
            assertTrue(
                    message.contains("Connection refused")
                            || message.contains("connect")
                            || message.contains("ConnectException"),
                    "Unexpected exception: " + e);
        } finally {
            if (writer != null) {
                try {
                    writer.close();
                } catch (Exception ignored) {
                    // ignore cleanup errors
                }
            }
        }
    }
}