[Fix][Connector-V2][Postgres-CDC] Skip createReplicationSlot when slotInfo is present#10416
[Fix][Connector-V2][Postgres-CDC] Skip createReplicationSlot when slotInfo is present#10416CNF96 wants to merge 8 commits intoapache:devfrom
Conversation
Issue 1: PR mixes two unrelated changesLocation: PR root directory (overall structure issue) Related context: Issue description:
Although they both involve fixes/improvements, they:
Potential risks:
Impact scope:
Severity: MAJOR Improvement suggestions:
Rationale:
Issue 2: PostgreSQL CDC fix lacks proper InterruptedException handlingLocation: Modified code: } catch (SQLException | InterruptedException ex) {
String message = "ReplicationConnection init failed";
throw new DebeziumException(message, ex);
}Related context:
Issue description:
Potential risks:
Impact scope:
Severity: MAJOR Improvement suggestions: try {
// initialize replication connection and create slot if needed
replicationConnection.initConnection();
} catch (SQLException ex) {
String message = "ReplicationConnection init failed";
if (ex.getMessage() != null && ex.getMessage().contains("already exists")) {
message += "; when setting up multiple connectors for the same database host, " +
"please make sure to use a distinct replication slot name for each.";
}
throw new DebeziumException(message, ex);
} catch (InterruptedException ex) {
// Restore the interrupted status
Thread.currentThread().interrupt();
throw new DebeziumException("ReplicationConnection init interrupted", ex);
}Rationale:
Issue 3: PostgreSQL CDC fix loses user-friendly error messagesLocation: Modified code: try {
// initialize replication connection and create slot if needed
replicationConnection.initConnection();
} catch (SQLException | InterruptedException ex) {
String message = "ReplicationConnection init failed";
throw new DebeziumException(message, ex);
}Original code: try {
replicationConnection.createReplicationSlot().orElse(null);
} catch (SQLException ex) {
String message = "Creation of replication slot failed";
if (ex.getMessage().contains("already exists")) {
message +=
"; when setting up multiple connectors for the same database host, please make sure to use a distinct replication slot name for each.";
log.warn(message);
} else {
throw new DebeziumException(message, ex);
}
}Related context:
Issue description:
This means:
Potential risks:
Impact scope:
Severity: MAJOR Improvement suggestions: try {
// initialize replication connection and create slot if needed
replicationConnection.initConnection();
} catch (SQLException ex) {
String message = "ReplicationConnection init failed";
if (ex.getMessage() != null && ex.getMessage().contains("already exists")) {
message += "; when setting up multiple connectors for the same database host, " +
"please make sure to use a distinct replication slot name for each.";
log.warn(message);
} else {
throw new DebeziumException(message, ex);
}
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
throw new DebeziumException("ReplicationConnection init interrupted", ex);
}Rationale:
Issue 4: PostgreSQL CDC fix lacks detailed code commentsLocation: Modified code: // initialize replication connection and create slot if needed
replicationConnection.initConnection();Issue description:
Potential risks:
Impact scope:
Severity: MINOR Improvement suggestions: // Initialize the replication connection. This method is preferred over createReplicationSlot()
// because it handles the case where the replication slot already exists (e.g., after job
// restart from checkpoint or manual slot creation). The initConnection() method is idempotent
// and will skip slot creation if the slot already exists, avoiding the
// "replication slot already exists" error.
// See: https://issues.apache.org/jira/browse/SEA-XXX (Issue #10322)
replicationConnection.initConnection();Rationale:
Issue 5: Oracle CLOB fix lacks test coverageLocation:
Issue description:
Potential risks:
Impact scope:
Severity: MAJOR Improvement suggestions: Unit test 1: @Test
public void testSetClobValue() throws SQLException {
// Mock PreparedStatement
PreparedStatement mockStmt = mock(PreparedStatement.class);
// Create converter
OracleJdbcRowConverter converter = new OracleJdbcRowConverter();
// Test CLOB
SeaTunnelDataType<String> clobType = SeaTunnelDataType.of(SqlType.STRING);
String testValue = "This is a test CLOB value";
converter.setValueToStatementByDataType(
testValue, mockStmt, clobType, 1, OracleTypeConverter.ORACLE_CLOB);
// Verify setClob was called with StringReader
verify(mockStmt).setClob(eq(1), argThat(reader -> reader instanceof StringReader));
}
@Test
public void testSetNClobValue() throws SQLException {
// Similar test for NCLOB
}Unit test 2: @Test
public void testSetClobWithMultipleIndexes() throws SQLException, IOException {
// Mock connection and statement
Connection mockConn = mock(Connection.class);
PreparedStatement mockStmt = mock(PreparedStatement.class);
when(mockConn.prepareStatement(any())).thenReturn(mockStmt);
// Create prepared statement with named parameters
// where :name appears multiple times in the SQL
String sql = "INSERT INTO test (id, name, name_copy) VALUES (:id, :name, :name)";
String[] fieldNames = {"id", "name"};
FieldNamedPreparedStatement pstmt =
FieldNamedPreparedStatement.prepareStatement(mockConn, sql, fieldNames);
// Set CLOB value
String clobValue = "Test CLOB data";
pstmt.setClob(2, new StringReader(clobValue));
// Verify that setClob was called for each mapped index
// and that each call received a new StringReader
verify(mockStmt, times(2)).setClob(anyInt(), argThat(reader -> reader instanceof StringReader));
}Integration test: @Test
public void testWriteClobData() throws Exception {
// Create table with CLOB column
String createTableSql = "CREATE TABLE test_clob (id INT, clob_col CLOB, nclob_col NCLOB)";
// ...
// Prepare test data
// ...
// Execute SeaTunnel job
// ...
// Verify data is correctly written
String querySql = "SELECT clob_col, nclob_col FROM test_clob WHERE id = 1";
// ...
}Rationale:
Issue 6: Oracle CLOB fix uses Chinese comments, not compliant with project standardsLocation: Modified code: /**
* Set CLOB parameter value, supporting both regular CLOB and NCLOB types. When only one index position needs to be set, use the corresponding setClob or setNClob method directly
* When multiple index positions need to be set with the same value, to ensure all streams can correctly read data, the Reader needs to be converted to a string first, and then a new Reader is created for each index position for setting
*
* @param parameterIndex Parameter index position (starting from 1)
* @param reader Reader object for reading CLOB data
* @param isNClob Whether it is NCLOB type
* @throws SQLException SQL execution exception
*/Issue description: Potential risks:
Impact scope:
Severity: MINOR Improvement suggestions: /**
* Sets the CLOB parameter value, supporting both regular CLOB and NCLOB types.
* When there is only one index position to set, the corresponding setClob or setNClob
* method is called directly. When there are multiple index positions that need to be
* set with the same value, the Reader must first be converted to a String, and then
* a new Reader is created for each index position to ensure all streams can read the data correctly.
*
* @param parameterIndex the parameter index position (starting from 1)
* @param reader the Reader object for reading CLOB data
* @param isNClob whether it is an NCLOB type
* @throws SQLException if a SQL execution exception occurs
*/Rationale:
Issue 7: Oracle CLOB fix lacks logging and observabilityLocation:
Issue description:
Potential risks:
Impact scope:
Severity: MINOR Improvement suggestions (optional): private void setClob(int parameterIndex, Reader reader, boolean isNClob) throws SQLException {
int[] indexes = indexMapping[parameterIndex - 1];
if (indexes.length == 1) {
if (isNClob) {
statement.setNClob(indexes[0], reader);
} else {
statement.setClob(indexes[0], reader);
}
} else {
try {
String value = IOUtils.toString(reader);
if (log.isDebugEnabled()) {
log.debug("Setting CLOB/NCLOB value for parameter {} across {} indexes, value length: {}",
parameterIndex, indexes.length, value.length());
}
for (int index : indexes) {
if (isNClob) {
statement.setNClob(index, new StringReader(value));
} else {
statement.setClob(index, new StringReader(value));
}
}
} catch (IOException e) {
throw new SQLException(e.getLocalizedMessage(), e);
}
}
}Rationale:
Issue 8: PR description is inaccurateLocation: PR description (overall) Issue description: 1. PR title does not match content:
2. "Does this PR introduce any user-facing change?" answer is inaccurate:
3. "How was this patch tested?" answer is incomplete:
Potential risks:
Impact scope:
Severity: MAJOR Improvement suggestions: 1. Split PR (recommended):
2. If not splitting, update PR description: Rationale:
|
|
@DanielCarter-stack I am a new contributor. Could you please help me comply with the regulations so I can better understand them and continue contributing in the future? |
This reverts commit 65409fd. 原因:拆分到后续计划中
…"ore_slot_test_20260112" already exists
|
LiJie20190102
left a comment
There was a problem hiding this comment.
Thank you for your contribution. Can you add relevant unit tests
|
@LiJie20190102 Hi, the relevant logic has been fully covered by existing tests, see: PostgresCDCIT so no duplicate test cases are written. The original code logic has two issues: first, it will throw exceptions in Chinese OS because PostgreSQL's exception messages are automatically adapted to the system language, causing the relevant judgment to fail; second, the original logic itself is not rigorous and has potential risks. |
I have reviewed the relevant code of Flink CDC at |
Thanks a lot for your professional review and valuable suggestion! I fully accept this optimization plan. The core reason is that your approach follows the principle of minimal code modification — directly adding the null judgment if (slotInfo == null) avoids unnecessary changes to the original replicationConnection.createReplicationSlot().orElse(null) method call, which keeps the current code structure intact, reduces the potential risk of introducing new bugs due to method modification, and can still accurately achieve the null check effect required for the business logic. |
|
LGTM, but CI still needs to pass |
All CI checks for this PR have been completed successfully with no failures. |


Purpose of this pull request
#10322
Fix the critical issue where PostgreSQL CDC tasks fail to start:
Core issue fix:
replicationConnection.createReplicationSlot().orElse(null), and this method:Improvements to exception handling mechanism:
Solution:
replicationConnection.initConnection(), which:Does this PR introduce any user-facing change?
No — Only fixes the underlying connection initialization logic, without affecting user configurations or output formats
How was this patch tested?
Check list
i. Update plugin-mapping.properties and add new connector information in it
ii. Update the pom file of
seatunnel-distiii. Add ci label in
label-scope-confiv. Add e2e testcase in
seatunnel-e2ev. Update connector plugin_config
release-note.