[Feature] [Connector-V2] [JDBC] Add PostgreSQL COPY Command Support to JDBC Source - #10406 #10468
Ruiii-w wants to merge 8 commits into apache:dev from
Conversation
Issue 1: Feature not integrated into data reading flow
Location:
Issue Description:
Potential Risks:
Impact Scope:
Severity: BLOCKER
Improvement Suggestions:
// JdbcInputFormat.java
public void open(JdbcSourceSplit inputSplit) throws IOException {
try {
splitTableSchema = tables.get(inputSplit.getTablePath()).getTableSchema();
splitTableId = inputSplit.getTablePath().toString();
// Added: Choose reading method based on configuration
if (config.isUseCopyStatement() && isPostgreSQLDialect()) {
// Using COPY method
PgCopyInput copyInput = new PgCopyInput(config, jdbcDialect, chunkSplitter, splitTableSchema, splitTableId);
copyInput.open(inputSplit);
// Using PgCopyInput to read data...
} else {
// Traditional method
statement = chunkSplitter.generateSplitStatement(inputSplit, splitTableSchema);
resultSet = statement.executeQuery();
hasNext = resultSet.next();
}
} catch (SQLException se) {
throw new JdbcConnectorException(...);
}
}
private boolean isPostgreSQLDialect() {
return jdbcDialect instanceof PostgresDialect ||
jdbcDialect.dialectName().toLowerCase().contains("postgres");
}
Rationale: Only by integrating the COPY path into the actual data reading flow will this feature take effect.
Issue 2: Thread safety issues caused by static variables
Location:
private static int BUFFER_SIZE;
private static int MAX_BUFFER_SIZE;
Related Context:
Issue Description:
Potential Risks:
Impact Scope:
Severity: CRITICAL
Improvement Suggestions:
// PgCopyBinaryReader.java
public final class PgCopyBinaryReader implements PgCopyReader {
// Remove static modifier
private final int bufferSize;
private final int maxBufferSize;
private ByteBuffer buffer;
public PgCopyBinaryReader(InputStream stream, TableSchema schema, Integer pgCopyBufferSize) {
this.stream = stream;
this.rowType = schema.toPhysicalRowDataType();
this.fieldTypes = rowType.getFieldTypes();
// Calculate and assign to instance variable
this.bufferSize = pgCopyBufferSize == null
? DEFAULT_BUFFER_SIZE
: 1 << (32 - Integer.numberOfLeadingZeros(pgCopyBufferSize - 1));
this.maxBufferSize = bufferSize * 1024;
this.buffer = ByteBuffer.allocate(bufferSize).order(ByteOrder.BIG_ENDIAN);
}
private void ensureCapacityFor(int required) {
if (required <= buffer.capacity()) return;
if (required > maxBufferSize) { // Use instance variable
throw new JdbcConnectorException(...);
}
// ...
}
}
Rationale: Each instance should have independent buffer configuration. Static variables will cause serious concurrency issues. This is a blocking issue that must be fixed.
Issue 3: Documentation and code default values are inconsistent
Location:
// Default value in code
public static final Option<Integer> PG_COPY_BUFFER_SIZE =
Options.key("pg_copy_buffer_size")
.intType()
.defaultValue(65536) // 64KB
.withDescription("Postgres copy buffer size");Issue Description: Potential Risks:
Impact Scope:
Severity: MAJOR
Improvement Suggestions:
// JdbcSourceOptions.java
public static final Option<Integer> PG_COPY_BUFFER_SIZE =
Options.key("pg_copy_buffer_size")
.intType()
.defaultValue(1048576) // Change to 1MB, consistent with documentation
.withDescription("Postgres copy buffer size (bytes). Only takes effect when use_copy_statement=true.");Or, if 64KB is the more appropriate default value, update the documentation: | pg_copy_buffer_size | Int | No | 65536 | Buffer size for COPY reading (bytes). |Rationale: Inconsistency between documentation and code will cause user confusion and configuration errors. Must be unified before merging. Issue 4: SQL injection riskLocation: String selectSql = chunkSplitter.generateSplitQuerySQL(split, tableSchema);
String copySql = String.format(
"COPY (%s) TO STDOUT WITH %s", selectSql, useBinary ? "BINARY" : "CSV");Related Context:
Issue Description: The split SQL is concatenated directly into the COPY statement; if it ever contains untrusted content, then SQL injection may occur.
Potential Risks:
Impact Scope:
Severity: CRITICAL
Improvement Suggestions:
private void validateSqlSafety(String sql) {
String upperSql = sql.toUpperCase();
// Check for dangerous keyword combinations
if (upperSql.contains(";") ||
upperSql.contains("--") ||
upperSql.contains("/*") ||
upperSql.matches(".*\\b(DROP|DELETE|INSERT|UPDATE|ALTER|CREATE|TRUNCATE)\\b.*")) {
throw new JdbcConnectorException(
CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
"Potentially unsafe SQL detected: " + sql);
}
}
public void open(JdbcSourceSplit split) {
String selectSql = chunkSplitter.generateSplitQuerySQL(split, tableSchema);
validateSqlSafety(selectSql); // Add validation
String copySql = String.format(
"COPY (%s) TO STDOUT WITH %s", selectSql, useBinary ? "BINARY" : "CSV");
// ...
}
Or, as a lighter-weight mitigation, strip semicolons when building the COPY statement:
String copySql = String.format(
"COPY (%s) TO STDOUT WITH %s",
selectSql.replaceAll(";", ""), // Remove semicolon
useBinary ? "BINARY" : "CSV");Rationale: SQL injection is a serious security issue. Although the risk is low in the current scenario (SQL is generated by SeaTunnel), security programming best practices should be followed. Issue 5: ChunkSplitter interface changes affect all subclassesLocation: public String generateSplitQuerySQL(JdbcSourceSplit split, TableSchema tableSchema) {
throw new UnsupportedOperationException("Not supported by this splitter");
}
Related Context:
Issue Description:
Potential Risks:
Impact Scope:
Severity: MAJOR
Improvement Suggestions:
Solution A: Implement the method for DynamicChunkSplitter:
// DynamicChunkSplitter.java
@Override
public String generateSplitQuerySQL(JdbcSourceSplit split, TableSchema tableSchema) {
// Dynamic sharding is typically used for tables without primary keys or uneven shard keys
// Use split.query directly as SELECT SQL
if (StringUtils.isNotBlank(split.getSplitQuery())) {
return split.getSplitQuery();
}
// Fall back to full table scan
return String.format("SELECT * FROM %s",
jdbcDialect.tableIdentifier(split.getTablePath()));
}
Solution B: Restrict COPY functionality in configuration to only support fixed sharding:
// JdbcSourceConfig.java
public static JdbcSourceConfig of(ReadonlyConfig config) {
// ...
boolean useCopy = config.get(JdbcSourceOptions.USE_COPY_STATEMENT);
boolean useDynamic = config.getOptional(JdbcSourceOptions.PARTITION_COLUMN).isEmpty();
if (useCopy && useDynamic) {
throw new IllegalArgumentException(
"use_copy_statement is only supported with fixed partition mode. " +
"Please configure partition_column or set use_copy_statement=false.");
}
// ...
}
Rationale: Interface changes need to consider all implementation classes, otherwise runtime exceptions will occur. It is recommended to implement at least Solution B (configuration validation) before merging.
Issue 6: Lack of integration tests and E2E tests
Location:
Issue Description:
Potential Risks:
Impact Scope:
Severity: MAJOR
Improvement Suggestions:
// PgCopyBinaryReaderTest.java
@Test
public void testBinaryHeaderParsing() {
byte[] header = "PGCOPY\n\377\r\n\0\0\0\0\0\0\0\0\0".getBytes(StandardCharsets.ISO_8859_1); // ISO-8859-1 keeps \377 as a single 0xFF byte
InputStream stream = new ByteArrayInputStream(header);
PgCopyBinaryReader reader = new PgCopyBinaryReader(stream, schema, 65536);
// Verify headerParsed is true
}
@Test
public void testRowParsing() {
// Construct test data: row with 2 fields
byte[] rowData = {...};
InputStream stream = new ByteArrayInputStream(rowData);
PgCopyBinaryReader reader = new PgCopyBinaryReader(stream, schema, 65536);
assertTrue(reader.hasNext());
SeaTunnelRow row = reader.next();
// Verify field values
}
// JdbcPostgresCopyIT.java
@ExtendWith(PostgresContainerExtension.class)
public class JdbcPostgresCopyIT {
@Test
public void testCopyRead() throws Exception {
// 1. Prepare test data
// 2. Configure Source: use_copy_statement=true, binary=true
// 3. Execute sync task
// 4. Verify correctness of read data
}
}
# Test steps
# 1. Start PostgreSQL
docker run -d -p 5432:5432 -e POSTGRES_PASSWORD=test postgres:14
# 2. Prepare test data
psql -h localhost -U postgres -d test -c "CREATE TABLE test_copy AS SELECT g.id, 'test_' || g.id AS name FROM generate_series(1, 10000) AS g(id)"
# 3. Configure SeaTunnel task
# cat test_copy.conf
source {
Jdbc {
url = "jdbc:postgresql://localhost:5432/test"
driver = "org.postgresql.Driver"
user = "postgres"
password = "test"
query = "SELECT * FROM test_copy"
use_copy_statement = true
binary = true
}
}
sink {
Console {}
}
# 4. Run task
./bin/seatunnel.sh -c test_copy.conf
Rationale: Insufficient test coverage is a code quality issue. Especially for core functionality such as a high-performance data transfer path, sufficient test coverage is essential.
Issue 7: Improper log level may leak sensitive information
Location:
LOG.info("Open PG COPY split={}, sql={}", split.splitId(), copySql);
Related Context:
Issue Description:
In production environments, INFO-level logs are persisted to log systems, which poses an information leakage risk.
Potential Risks:
Impact Scope:
Severity: MINOR
Improvement Suggestions:
// PgCopyInput.java
public void open(JdbcSourceSplit split) {
try {
String selectSql = chunkSplitter.generateSplitQuerySQL(split, tableSchema);
String copySql = String.format(
"COPY (%s) TO STDOUT WITH %s", selectSql, useBinary ? "BINARY" : "CSV");
Connection conn = getConnection();
LOG.debug("Open PG COPY split={}, sql={}", split.splitId(), copySql); // Change to DEBUG
copyManagerProxy = new CopyManagerProxy(conn);
copyStream = copyManagerProxy.copyOutAsStream(copySql);
// ...
}
}
Additionally, a configuration switch can be added to control whether detailed SQL is logged:
// JdbcSourceOptions.java
public static final Option<Boolean> LOG_SQL =
Options.key("log_sql")
.booleanType()
.defaultValue(false)
.withDescription("Whether to log SQL statements (DEBUG level). Only for debugging.");
// PgCopyInput.java
if (config.isLogSql()) {
LOG.debug("Open PG COPY split={}, sql={}", split.splitId(), copySql);
} else {
LOG.info("Open PG COPY split={}", split.splitId()); // Do not log SQL
}
Rationale: Protecting sensitive information is a security best practice. Recommend changing the log level to DEBUG before merging, or adding a configuration switch.
Issue 8: Unreasonable buffer size configuration may cause memory overflow
Location:
MAX_BUFFER_SIZE = BUFFER_SIZE * 1024;
Related Context:
Issue Description:
Although the code has an upper limit (MAX_BUFFER_SIZE = BUFFER_SIZE * 1024), that limit scales with the configured buffer size, so a large pg_copy_buffer_size can still let the buffer grow excessively.
Potential Risks:
Impact Scope:
Severity: MAJOR
Improvement Suggestions:
// PgCopyBinaryReader.java
private static final int MAX_ALLOWED_BUFFER_SIZE = 256 * 1024 * 1024; // 256MB hard limit
public PgCopyBinaryReader(InputStream stream, TableSchema schema, Integer pgCopyBufferSize) {
// ...
MAX_BUFFER_SIZE = Math.min(bufferSize * 1024, MAX_ALLOWED_BUFFER_SIZE);
this.buffer = ByteBuffer.allocate(bufferSize).order(ByteOrder.BIG_ENDIAN);
}
// JdbcSourceOptions.java
public static final Option<Integer> PG_COPY_BUFFER_SIZE =
Options.key("pg_copy_buffer_size")
.intType()
.defaultValue(65536)
.withDescription("Postgres copy buffer size (bytes). Must be between 65536 and 10485760 (10MB).");
// JdbcSourceConfig.java
public static JdbcSourceConfig of(ReadonlyConfig config) {
int bufferSize = config.get(JdbcSourceOptions.PG_COPY_BUFFER_SIZE);
if (bufferSize < 65536 || bufferSize > 10 * 1024 * 1024) {
throw new IllegalArgumentException(
"pg_copy_buffer_size must be between 64KB and 10MB, got: " + bufferSize);
}
// ...
}
| pg_copy_buffer_size | Int | No | 65536 | Buffer size for COPY reading (bytes). Recommended range: 65536-1048576 (64KB-1MB). Larger values may improve performance but increase memory usage. Maximum allowed: 10485760 (10MB). |
Rationale: Memory management is a critical issue in production environments. Reasonable upper limits must be set to prevent memory overflow.
Issue 9: Improper exception handling may lead to resource leaks
Location:
public static void closeQuietly(Object obj) {
if (obj instanceof Closeable) {
Closeable c = (Closeable) obj;
try {
c.close();
} catch (Exception ignored) {
// Exception ignored
}
}
}
Related Context:
Issue Description:
Potential Risks:
Impact Scope:
Severity: MAJOR
Improvement Suggestions:
public static void closeQuietly(Object obj, String resourceName) {
if (obj instanceof Closeable) {
Closeable c = (Closeable) obj;
try {
c.close();
} catch (Exception e) {
// Log exception but do not throw
LOG.warn("Failed to close resource: {}", resourceName, e);
// Optional: Add exception to monitoring metrics
}
}
}
// PgCopyInput.java
@Override
public void close() {
PgCopyUtils.closeQuietly(reader, "PgCopyReader");
PgCopyUtils.closeQuietly(copyStream, "PgCopyInputStream");
PgCopyUtils.closeQuietly(copyManagerProxy, "CopyManagerProxy");
}
public void open(JdbcSourceSplit split) {
try {
copyManagerProxy = new CopyManagerProxy(getConnection());
copyStream = copyManagerProxy.copyOutAsStream(copySql);
reader = createReader(copyStream);
hasNext = reader.hasNext();
} catch (Exception e) {
// Ensure resource cleanup on exception
close();
throw e;
}
}
private boolean closed = false;
@Override
public void close() {
if (closed) {
LOG.warn("PgCopyInput already closed");
return;
}
try {
// Close resource
} finally {
closed = true;
}
}
@Override
protected void finalize() throws Throwable {
try {
if (!closed) {
LOG.error("PgCopyInput leaked, not closed properly");
}
} finally {
super.finalize();
}
}
Rationale: Proper resource management is the foundation of stability. Close exceptions should not be silently ignored; recommend at least logging a warning.
Issue 10: Non-standard configuration namespace
Location:
public static final Option<Boolean> BINARY =
Options.key("binary")
.booleanType()
.defaultValue(false)
.withDescription("Use binary copy mode");
public static final Option<Integer> PG_COPY_BUFFER_SIZE =
Options.key("pg_copy_buffer_size")
.intType()
.defaultValue(65536)
.withDescription("Postgres copy buffer size");Issue Description:
Potential Risks:
Impact Scope:
Severity: MINOR
Improvement Suggestions:
// JdbcSourceOptions.java
public static final Option<Boolean> COPY_ENABLED =
Options.key("copy.enabled")
.booleanType()
.defaultValue(false)
.withDescription("Whether to use COPY method for reading (PostgreSQL only).");
public static final Option<Boolean> COPY_BINARY =
Options.key("copy.binary")
.booleanType()
.defaultValue(false)
.withDescription("Whether to use binary format for COPY reading. Only takes effect when copy.enabled=true.");
public static final Option<Integer> COPY_BUFFER_SIZE =
Options.key("copy.buffer_size")
.intType()
.defaultValue(65536)
.withDescription("Buffer size for COPY reading (bytes). Only takes effect when copy.enabled=true.");Corresponding configuration example: source {
Jdbc {
url = "jdbc:postgresql://localhost:5432/test"
# ...
copy {
enabled = true
binary = true
buffer_size = 1048576
}
}
}
Rationale: Using namespaces can improve configuration readability and extensibility, avoiding future naming conflicts. Although this is a minor issue, it is recommended to optimize it before merging.
Issue 11: Exactly-Once semantic incompatibility
Location:
Issue Description:
Potential Risks:
Impact Scope:
Severity: MAJOR
Improvement Suggestions:
// JdbcSourceConfig.java
public static JdbcSourceConfig of(ReadonlyConfig config) {
boolean useCopy = config.get(JdbcSourceOptions.COPY_ENABLED);
boolean isExactlyOnce = config.get(JdbcCommonOptions.IS_EXACTLY_ONCE);
if (useCopy && isExactlyOnce) {
throw new IllegalArgumentException(
"use_copy_statement is incompatible with is_exactly_once=true. " +
"COPY protocol does not support offset-based recovery. " +
"Please either set use_copy_statement=false or is_exactly_once=false.");
}
// ...
}
### Limitations
- **Exactly-Once**: COPY mode does NOT support exactly-once semantics. When `use_copy_statement=true`,
the connector operates in at-least-once mode. If you require exactly-once guarantees, please keep
`use_copy_statement=false` (use standard SELECT queries).
- **Fault Tolerance**: COPY mode cannot recover from intermediate positions. If a task fails, it will
re-read the entire split from the beginning, which may result in duplicate records.
### When to use COPY mode?
**Recommended scenarios**:
- Full data loads (not incremental sync)
- One-time batch migrations
- Scenarios where duplicate records can be tolerated or deduplicated downstream
**NOT recommended for**:
- Incremental CDC scenarios
- Exactly-once required workloads
- Frequent fault recovery environments
Rationale: Feature limitations should be clearly stated in the design phase to avoid user misuse causing data consistency issues. This is a problem that must be resolved before merging.
Issue 12: Lack of quantitative data for performance improvements
Location:
Issue Description:
Potential Risks:
Impact Scope:
Severity: MINOR
Improvement Suggestions:
### Performance Benchmarks
We tested COPY mode against standard SELECT queries on a PostgreSQL 14 instance with different data volumes:
| Rows | Data Size | SELECT (time) | COPY CSV (time) | COPY Binary (time) | Speedup (CSV) | Speedup (Binary) |
|------|-----------|---------------|-----------------|-------------------|---------------|------------------|
| 10K | 1 MB | 0.5s | 0.3s | 0.2s | 1.7x | 2.5x |
| 100K | 10 MB | 5.2s | 2.1s | 1.4s | 2.5x | 3.7x |
| 1M | 100 MB | 58s | 18s | 12s | 3.2x | 4.8x |
| 10M | 1 GB | 650s | 185s | 120s | 3.5x | 5.4x |
**Test Environment**:
- PostgreSQL 14.5 on Ubuntu 20.04
- 4 vCPU, 16GB RAM
- Network: 1Gbps
- SeaTunnel 2.3.8
**Conclusion**: COPY mode provides significant performance improvements, especially for large data volumes (>100K rows). Binary format is consistently faster than CSV format.
### Performance Tuning
**Buffer Size**:
- Default (64KB): Good for most workloads
- Large data volumes (>1 GB): Increase to 1MB or larger
- Limited memory: Decrease to 32KB
**Binary vs CSV**:
- Binary: Faster, but requires PostgreSQL-specific parsing
- CSV: Slightly slower, but more portable
- Recommendation: Use Binary for production, CSV for debugging
**Partition Strategy**:
- COPY mode works best with parallel partitioning
- Ensure `partition_column` is properly configured for optimal performance
### Monitoring
When COPY mode is enabled, the following metrics are logged at task completion:
- `rows_parsed`: Total number of rows read
- `bytes_read`: Total bytes transferred
- `buffer_expansions`: Number of times the buffer was expanded
- `elapsed_ms`: Total time spent reading
- `rows_per_second`: Read throughput
- `bytes_per_second`: Data transfer rate
Example log:
PG COPY summary: rows=1000000 rows, bytes=104857600 B, expansions=2 times, ...
Rationale: Although this is not a technical issue, providing performance data can help users make informed decisions and also demonstrates the value of the feature. Recommend adding at least a simple performance comparison before merging.
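For illustration only, a minimal sketch of how such a summary line could be produced; the class name CopyReadMetrics and its wiring into the reader are assumptions, not code from this PR:
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical helper: accumulates the counters listed above and logs one summary line.
final class CopyReadMetrics {
    private static final Logger LOG = LoggerFactory.getLogger(CopyReadMetrics.class);

    private final long startNanos = System.nanoTime();
    private long rowsParsed;
    private long bytesRead;
    private long bufferExpansions;

    void onRow() { rowsParsed++; }
    void onBytes(long n) { bytesRead += n; }
    void onBufferExpansion() { bufferExpansions++; }

    // Intended to be called once per split, e.g. from PgCopyInput#close().
    void logSummary(String splitId) {
        long elapsedMs = (System.nanoTime() - startNanos) / 1_000_000;
        double seconds = Math.max(elapsedMs, 1) / 1000.0;
        LOG.info(
                "PG COPY summary: split={} rows={} bytes={} expansions={} elapsed_ms={} rows_per_second={} bytes_per_second={}",
                splitId, rowsParsed, bytesRead, bufferExpansions, elapsedMs,
                Math.round(rowsParsed / seconds), Math.round(bytesRead / seconds));
    }
}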
Purpose of this pull request
This pull request introduces support for the PostgreSQL COPY protocol in the JDBC Source connector. The COPY command is significantly faster than standard SELECT queries for bulk data retrieval. This feature allows users to enable COPY mode for PostgreSQL sources to improve read performance.
Note: This feature was developed against SeaTunnel 2.3.8, but it has been merged with dev and the functionality remains normal.
Key features added:
- COPY (SELECT ...) TO STDOUT statement generation.
- Binary and CSV readers for parsing COPY output.
- New configuration options: use_copy_statement, binary, and pg_copy_buffer_size.
Does this PR introduce any user-facing change?
Yes, this PR adds new configuration options for JDBC Source (specifically for PostgreSQL).
New Options:
| Option | Default | Description |
|--------|---------|-------------|
| use_copy_statement | false | Whether to use the COPY method for reading. |
| binary | false | Whether to use binary format for COPY reading. Only takes effect when use_copy_statement=true. |
| pg_copy_buffer_size | 1048576 | Buffer size for COPY reading (bytes). Only takes effect when use_copy_statement=true. |
Documentation Details (per new specifications):
- Uses the PostgreSQL COPY protocol for high-performance bulk data extraction.
- Driver: org.postgresql:postgresql (standard JDBC driver).
Example Configuration:
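A minimal example configuration (adapted from the test setup shown in the review above; connection values are placeholders):
source {
  Jdbc {
    url = "jdbc:postgresql://localhost:5432/test"
    driver = "org.postgresql.Driver"
    user = "postgres"
    password = "test"
    query = "SELECT * FROM test_copy"
    use_copy_statement = true
    binary = true
    pg_copy_buffer_size = 1048576
  }
}

sink {
  Console {}
}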
How was this patch tested?
- Ran the job locally via org.apache.seatunnel.example.engine.SeaTunnelEngineLocalExample.
- Enabled use_copy_statement = true and verified that data was correctly extracted and processed.
- Verified that the COPY SQL was generated correctly and executed against a local PostgreSQL instance.
Check list
- License guide: https://github.com/apache/seatunnel/blob/dev/docs/en/contribution/new-license.md
- Update incompatible-changes.md to describe the incompatibility caused by this PR.
- Update https://github.com/apache/seatunnel/blob/dev/plugin-mapping.properties and add new connector information in it.
- Update https://github.com/apache/seatunnel/blob/dev/seatunnel-dist/pom.xml
- Update https://github.com/apache/seatunnel/blob/dev/.github/workflows/labeler/label-scope-conf.yml
- Add E2E test cases under https://github.com/apache/seatunnel/tree/dev/seatunnel-e2e/seatunnel-connector-v2-e2e/
- Update https://github.com/apache/seatunnel/blob/dev/config/plugin_config