[Feature][Connectors-v2] Oracle sqlserver support cdc timestamp #10428
LeonYoah wants to merge 7 commits into apache:dev
…ng from a specified timestamp
…ng from a specified timestamp - add e2e
…ng from a specified timestamp - fix specific
…ng from a specified timestamp - fix ci
…ng from a specified timestamp - fix e2e ci
Hi @LeonYoah, thanks for this contribution! The ability to start from a specific timestamp is a very valuable feature for CDC tasks. After reviewing the changes, I have a few concerns that need to be addressed, particularly regarding timezone handling in Oracle: 1. Critical Timezone Issue in
…ng from a specified timestamp - Fix timezone issue
Thank you for your review. I think your suggestions are completely fine. I will try to make the changes.
Issue 1: Inconsistent timezone handling causes timestamp conversion errors
Location:
Related Context:
Problem Description:
// Current implementation
java.sql.Timestamp timestamp = new java.sql.Timestamp(timestampMs);
statement.setTimestamp(1, timestamp); // Uses the JVM default timezone
The
Potential Risks:
Impact Scope:
Severity: MAJOR (may cause data loss and user confusion)
Improvement Suggestions:
// Oracle implementation
public static RedoLogOffset timestampToScn(JdbcConnection jdbc, long timestampMs) {
    try {
        LOG.info("Converting timestamp {} to SCN", timestampMs);
        String sql = "SELECT TIMESTAMP_TO_SCN(?) AS SCN FROM DUAL";
        return jdbc.prepareQueryAndMap(
                sql,
                statement -> {
                    // Solution 1: Use Calendar to explicitly specify UTC timezone
                    java.util.Calendar utcCalendar = java.util.Calendar.getInstance(java.util.TimeZone.getTimeZone("UTC"));
                    java.sql.Timestamp timestamp = new java.sql.Timestamp(timestampMs);
                    statement.setTimestamp(1, timestamp, utcCalendar);
                },
                rs -> {
                    if (rs.next()) {
                        final String scn = rs.getString(1);
                        LOG.info("Converted timestamp {} to SCN: {}", timestampMs, scn);
                        return new RedoLogOffset(Scn.valueOf(scn).longValue());
                    } else {
                        throw new SeaTunnelException(
                                "Cannot convert timestamp to SCN. Make sure the specified timestamp is valid.");
                    }
                });
    } catch (SQLException e) {
        LOG.error("Failed to convert timestamp to SCN", e);
        throw new SeaTunnelException("Failed to convert timestamp to SCN", e);
    }
}
// SQL Server implementation: similar modification
Rationale:
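To make the timezone concern concrete: per the JDBC documentation, `setTimestamp` without a `Calendar` builds the SQL `TIMESTAMP` using the JVM default timezone, so the zone chosen at bind time decides which wall-clock value the database receives. The following is a minimal, database-free sketch that renders one epoch-millisecond value in two zones. The class and method names are illustrative and do not come from the PR.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;

public class TimestampZoneDemo {

    /** Wall-clock fields of the given epoch value as seen in the given zone. */
    public static String wallClock(long epochMs, String zoneId) {
        return LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMs), ZoneId.of(zoneId)).toString();
    }

    public static void main(String[] args) {
        long epochMs = 1_700_000_000_000L;
        System.out.println(wallClock(epochMs, "UTC"));           // 2023-11-14T22:13:20
        System.out.println(wallClock(epochMs, "Asia/Shanghai")); // 2023-11-15T06:13:20
    }
}
```

The two strings are eight hours apart although they denote the same instant, which is why the zone used when binding has to match the zone in which the database interprets the value.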
Issue 2: SQL Server CDC function parameter semantics may cause data loss
Location:
Related Context:
Problem Description:
String sql = "SELECT sys.fn_cdc_map_time_to_lsn('smallest greater than or equal', ?) AS lsn";
Potential Risks:
Impact Scope:
Severity: MAJOR (may cause data loss)
Improvement Suggestions:
// Solution 1: Use 'largest less than or equal' (recommended)
String sql = "SELECT sys.fn_cdc_map_time_to_lsn('largest less than or equal', ?) AS lsn";
// Solution 2: Clearly document behavioral differences
// Add explanation in SqlServer-CDC.md:
// "Note: The timestamp startup mode uses the 'smallest greater than or equal'
// strategy, which may start from a time slightly later than the specified
// timestamp to ensure data integrity."
Rationale:
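The difference between the two relational operators can be illustrated without a database. The sketch below uses a `TreeMap` as a hypothetical in-memory stand-in for the commit-time-to-LSN mapping. The class, method names, and LSN strings are invented for illustration, and the sketch says nothing about which operator the connector should finally use.

```java
import java.util.Map;
import java.util.TreeMap;

public class LsnMappingSketch {

    // Hypothetical stand-in for the commit time -> LSN mapping kept by SQL Server CDC.
    private final TreeMap<Long, String> commitTimeToLsn = new TreeMap<>();

    public void record(long commitTimeMs, String lsn) {
        commitTimeToLsn.put(commitTimeMs, lsn);
    }

    /** Mirrors 'smallest greater than or equal': first commit at or after the given time. */
    public String smallestGreaterThanOrEqual(long timeMs) {
        Map.Entry<Long, String> entry = commitTimeToLsn.ceilingEntry(timeMs);
        return entry == null ? null : entry.getValue();
    }

    /** Mirrors 'largest less than or equal': last commit at or before the given time. */
    public String largestLessThanOrEqual(long timeMs) {
        Map.Entry<Long, String> entry = commitTimeToLsn.floorEntry(timeMs);
        return entry == null ? null : entry.getValue();
    }
}
```

With commits recorded at 1000 ms and 2000 ms, a start time of 1500 ms maps to the 2000 ms commit under the first operator and to the 1000 ms commit under the second, so the choice decides whether reading begins just after or just before the requested time.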
Issue 3: Exception handling uses RuntimeException instead of SeaTunnelException
Location:
Related Context:
Problem Description:
// OffsetFactory layer
catch (Exception e) {
    throw new RuntimeException("Convert timestamp to redoLog offset error", e);
}
// Utility class layer
catch (SQLException e) {
    throw new SeaTunnelException("Failed to convert timestamp to SCN", e);
}
Inconsistent exception handling hierarchy:
Potential Risks:
Impact Scope:
Severity: MINOR (does not affect functionality, but affects code consistency and maintainability)
Improvement Suggestions:
// Oracle implementation
@Override
public Offset timestamp(long timestamp) {
    try (JdbcConnection jdbcConnection = dialect.openJdbcConnection(sourceConfig)) {
        return OracleConnectionUtils.timestampToScn(jdbcConnection, timestamp);
    } catch (Exception e) {
        throw new SeaTunnelException("Convert timestamp to redoLog offset error", e);
    }
}
// SQL Server implementation
@Override
public Offset timestamp(long timestamp) {
    try (JdbcConnection jdbcConnection = dialect.openJdbcConnection(sourceConfig)) {
        return SqlServerUtils.timestampToLsn((SqlServerConnection) jdbcConnection, timestamp);
    } catch (Exception e) {
        throw new SeaTunnelException("Convert timestamp to LSN offset error", e);
    }
}
Rationale:
Issue 4: Documentation description inconsistent with actually supported startup modes
Location:
Related Context:
Problem Description:
.withDescription(
        "Optional startup mode for CDC source, valid enumerations are "
                + "\"initial\", \"latest\" or \"timestamp\"");
The description text only lists
This may lead users to mistakenly believe these modes are not supported.
Potential Risks:
Impact Scope:
Severity: MINOR (does not affect functionality, but affects user experience)
Improvement Suggestions:
// Restore complete enum list (consistent with dev branch)
.withDescription(
        "Optional startup mode for CDC source, valid enumerations are "
                + "\"initial\", \"earliest\", \"latest\", \"timestamp\" or \"specific\"");
Rationale:
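One way to keep such a description from drifting away from the supported modes is to derive the text from the enum itself. The following is only a sketch under assumptions: the `StartupMode` enum here is a local stand-in that lists the five modes named in the suggestion above, not the connector's actual class, and `describe()` is a hypothetical helper.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.stream.Collectors;

public class StartupModeDescription {

    // Local stand-in for the connector's startup modes (assumed, for illustration only).
    public enum StartupMode { INITIAL, EARLIEST, LATEST, TIMESTAMP, SPECIFIC }

    /** Builds the option description from the enum constants so the list cannot go stale. */
    public static String describe() {
        List<String> quoted = Arrays.stream(StartupMode.values())
                .map(mode -> "\"" + mode.name().toLowerCase(Locale.ROOT) + "\"")
                .collect(Collectors.toList());
        String head = String.join(", ", quoted.subList(0, quoted.size() - 1));
        String last = quoted.get(quoted.size() - 1);
        return "Optional startup mode for CDC source, valid enumerations are " + head + " or " + last;
    }

    public static void main(String[] args) {
        System.out.println(describe());
    }
}
```

A connector that supports only a subset of modes would filter the constants before joining them.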
Issue 5: Insufficient time granularity in the E2E test may cause instability
Location:
Related Context:
Problem Description:
insertRow(1, SCEHMA_NAME, SOURCE_TABLE1); // Insert ID=1
TimeUnit.SECONDS.sleep(5);
long startTimestamp = System.currentTimeMillis(); // Record timestamp
TimeUnit.SECONDS.sleep(5);
insertRow(2, SCEHMA_NAME, SOURCE_TABLE1); // Insert ID=2
Test design has the following issues:
Potential Risks:
Impact Scope:
Severity: MINOR (test quality issue, does not affect production code functionality)
Improvement Suggestions:
@TestTemplate
public void testTimestampStartupMode(TestContainer container) throws Exception {
    clearTable(SCEHMA_NAME, SINK_TABLE1);
    clearTable(SCEHMA_NAME, SOURCE_TABLE1);
    // Insert first batch of data
    insertRow(1, SCEHMA_NAME, SOURCE_TABLE1);
    // Ensure timestamp distinction
    TimeUnit.SECONDS.sleep(10);
    // Use database time instead of JVM time
    long startTimestamp = getCurrentDatabaseTimestamp(); // Add helper method
    TimeUnit.SECONDS.sleep(10);
    // Insert second batch of data
    insertRow(2, SCEHMA_NAME, SOURCE_TABLE1);
    // Start CDC task...
    await().atMost(300000, TimeUnit.MILLISECONDS)
            .untilAsserted(() -> {
                // Verify logic...
            });
}
// Helper method: Get current database timestamp
private long getCurrentDatabaseTimestamp() throws Exception {
    // Execute SQL: SELECT CURRENT_TIMESTAMP FROM DUAL (Oracle)
    // Or: SELECT SYSDATETIME() (SQL Server)
    // Return millisecond timestamp
}
Rationale:
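The helper above is left as pseudocode. One possible way to fill it in is sketched below, with several assumptions that are not part of the PR: the class and method names are hypothetical, the query is passed in by the caller, and the driver is assumed to support `getObject(..., OffsetDateTime.class)`. The sketch also swaps in zone-aware clock functions (`SYSTIMESTAMP` on Oracle, `SYSDATETIMEOFFSET()` on SQL Server) so the conversion to epoch milliseconds does not depend on the JVM or session timezone.

```java
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.OffsetDateTime;

public class DatabaseClock {

    /** Converts a zone-aware database time to epoch milliseconds. */
    public static long toEpochMillis(OffsetDateTime dbTime) {
        return dbTime.toInstant().toEpochMilli();
    }

    /**
     * Reads the database clock. The query must return one zone-aware value, for example
     * "SELECT SYSTIMESTAMP FROM DUAL" (Oracle) or "SELECT SYSDATETIMEOFFSET()" (SQL Server).
     * Assumption: the JDBC driver can map that column to OffsetDateTime.
     */
    public static long currentDatabaseTimestamp(Connection connection, String query)
            throws SQLException {
        try (Statement statement = connection.createStatement();
                ResultSet rs = statement.executeQuery(query)) {
            if (!rs.next()) {
                throw new SQLException("Clock query returned no rows: " + query);
            }
            return toEpochMillis(rs.getObject(1, OffsetDateTime.class));
        }
    }
}
```

In the test, `getCurrentDatabaseTimestamp()` could then delegate to `currentDatabaseTimestamp` with the container's connection and the query for the database under test.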
Issue 6: Oracle Chinese documentation contains large unrelated changes
Location:
Related Context:
Problem Description: Oracle Chinese documentation changes (383 lines) include extensive content unrelated to timestamp functionality:
Although these changes improve documentation quality:
Potential Risks:
Impact Scope:
Severity: MINOR (process issue, does not affect functionality)
Improvement Suggestions:
Rationale:
VI. Overall Assessment Conclusion
Can it be Merged
Recommendation: Conditionally Approved
Rationale:
Recommended Pre-merge Conditions:
Must Fix (BLOCKER):
Strongly Recommended to Fix (MAJOR):
Recommended to Fix (MINOR):
Improvement Priority
High Priority:
Medium Priority:
Low Priority:
Architecture Rationality Assessment
Overall Rating: ⭐⭐⭐⭐ (4/5)
Strengths:
Room for Improvement:
Long-term Recommendations
Response to Issue 2: SQL Server CDC
java.sql.Timestamp timestamp = new java.sql.Timestamp(timestampMs);
statement.setTimestamp(1, timestamp);
@LeonYoah @davidzollo
The setTimestamp() method uses the JVM's default time zone instead of the database time zone or UTC. Could modifying the time zone like this cause problems?
I noticed that the MySQL CDC has a "server-time-zone" parameter. We should allow users to specify or debug how to handle it. After all, it's quite challenging to ensure compatibility with both time zones.
// Pass in serverTimeZone when calling
Calendar calendar = Calendar.getInstance(TimeZone.getTimeZone(serverTimeZone));
statement.setTimestamp(1, timestamp, calendar);
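If a user-supplied time zone option is introduced for this purpose, one detail is worth guarding against: `TimeZone.getTimeZone(String)` silently falls back to GMT for an ID it does not recognise. The sketch below validates the ID through `ZoneId.of` first, so a typo fails fast instead of shifting the start position. The class and method names are hypothetical and the option itself is an assumption taken from the suggestion above.

```java
import java.time.ZoneId;
import java.util.Calendar;
import java.util.TimeZone;

public class ServerTimeZoneCalendar {

    /**
     * Builds the Calendar to pass to setTimestamp(index, timestamp, calendar).
     * ZoneId.of throws a DateTimeException for an unknown or malformed ID,
     * whereas TimeZone.getTimeZone(String) would quietly return GMT.
     */
    public static Calendar forZone(String serverTimeZone) {
        ZoneId zone = ZoneId.of(serverTimeZone);
        return Calendar.getInstance(TimeZone.getTimeZone(zone));
    }
}
```

For example, `forZone("Asia/Shanghai")` yields a calendar in that zone, while `forZone("Not/AZone")` throws rather than defaulting to GMT.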
Could you try the following code?
/**
 * Convert timestamp (milliseconds since epoch) to Oracle SCN.
 *
 * @param jdbc JDBC connection
 * @param timestampMs timestamp in milliseconds since epoch
 * @return RedoLogOffset with the corresponding SCN
 */
public static RedoLogOffset timestampToScn(JdbcConnection jdbc, long timestampMs) {
    try {
        // Use UTC base time + offset to completely avoid JVM and Session time zone interference
        // TO_TIMESTAMP_TZ('1970-01-01 00:00:00 +00:00', 'YYYY-MM-DD HH24:MI:SS TZH:TZM') establishes an absolute UTC origin
        // NUMTODSINTERVAL(?, 'SECOND') converts milliseconds to a second interval (Oracle division retains decimals by default, with sufficient precision)
        String sql = "SELECT TIMESTAMP_TO_SCN(" +
                "TO_TIMESTAMP_TZ('1970-01-01 00:00:00 +00:00', 'YYYY-MM-DD HH24:MI:SS TZH:TZM') " +
                "+ NUMTODSINTERVAL(? / 1000, 'SECOND')" +
                ") AS SCN FROM DUAL";
        return jdbc.prepareQueryAndMap(
                sql,
                statement -> {
                    // Pass Long directly; there's no need for setTimestamp, and thus no need for Calendar or time zone conversion
                    statement.setObject(1, timestampMs);
                },
                rs -> {
                    if (rs.next()) {
                        final String scn = rs.getString(1);
                        return new RedoLogOffset(Long.parseLong(scn));
                    } else {
                        throw new SeaTunnelException(
                                "Cannot convert timestamp to SCN. Make sure the specified timestamp is valid.");
                    }
                });
    } catch (SQLException e) {
        String msg = "Failed to convert timestamp to SCN";
        if (e.getMessage() != null && e.getMessage().contains("ORA-08180")) {
            msg += ". The timestamp is too old and the redo log may have been purged (undo_retention exceeded).";
        }
        throw new SeaTunnelException(msg, e);
    }
}
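The error-message branch in the snippet above can be pulled into a small function and exercised without a database. The sketch below is one hedged way to do that. It additionally checks the vendor error code, on the assumption that the Oracle driver reports the ORA number through `SQLException.getErrorCode()`; the class name and constant are hypothetical.

```java
import java.sql.SQLException;

public class ScnConversionErrors {

    // Assumed vendor code for ORA-08180.
    private static final int ORA_08180 = 8180;

    /** Builds the user-facing message for a failed timestamp-to-SCN conversion. */
    public static String describe(SQLException e) {
        String msg = "Failed to convert timestamp to SCN";
        boolean noSnapshot = e.getErrorCode() == ORA_08180
                || (e.getMessage() != null && e.getMessage().contains("ORA-08180"));
        if (noSnapshot) {
            msg += ". The timestamp is too old and the redo log may have been purged"
                    + " (undo_retention exceeded).";
        }
        return msg;
    }
}
```

Keeping the classification separate from the JDBC call makes the hint text easy to unit test and to reuse in other conversion paths.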
…mp conversion and update docs
Purpose of this pull request
Does this PR introduce any user-facing change?
This PR aims to extend the support for Oracle CDC and SQL Server CDC to allow users to specify a start timestamp for data capture. This enhancement will facilitate data replay after task interruption by resuming from a specific point in time. Currently, there is no clear solution for PostgreSQL CDC.
How was this patch tested?
Check list
New License Guide
incompatible-changes.md to describe the incompatibility caused by this PR.