[Feature][Connector-V2] Add multi-table sink support for AmazonDynamo…#10497

Best2Two wants to merge 9 commits into apache:dev
Conversation
**Issue 1: Missing null validation leads to NPE risk** (Severity: MAJOR)

Modified code:

```java
public void write(SeaTunnelRow element) throws IOException {
    String tableName = element.getTableId();
    dynamoDbSinkClient.write(serializer.serialize(element), tableName);
}
```

Problem description: `element.getTableId()` can be null or empty (e.g. for single-table jobs), in which case the write targets a missing table name.

Improvement suggestion:

```java
public void write(SeaTunnelRow element) throws IOException {
    String tableName = element.getTableId();
    // Fallback to the configured table name (single-table compatibility)
    if (StringUtils.isEmpty(tableName)) {
        tableName = catalogTable.getTableId().toTablePath().getTableName();
    }
    dynamoDbSinkClient.write(serializer.serialize(element), tableName);
}
```

Import to add: `import org.apache.seatunnel.shade.org.apache.commons.lang3.StringUtils;`

**Issue 2: Batch size counted per table, logic has flaws** (Severity: MINOR)

Modified code:

```java
if (amazondynamodbConfig.getBatchSize() > 0
        && batchListByTable.get(tableName).size() >= amazondynamodbConfig.getBatchSize()) {
    flush();
}
```

Original code (dev branch):

```java
if (amazondynamodbConfig.getBatchSize() > 0
        && batchList.size() >= amazondynamodbConfig.getBatchSize()) {
    flush();
}
```

Problem description: when one table's batch fills up, `flush()` flushes *every* table's buffer, penalizing low-frequency tables.

Improvement suggestion:

```java
public synchronized void write(PutItemRequest putItemRequest, String tableName) {
    tryInit();
    batchListByTable.computeIfAbsent(tableName, k -> new ArrayList<>());
    batchListByTable.get(tableName).add(...);
    // Only flush the current table
    if (amazondynamodbConfig.getBatchSize() > 0
            && batchListByTable.get(tableName).size() >= amazondynamodbConfig.getBatchSize()) {
        flushTable(tableName); // New method
    }
}

private void flushTable(String tableName) {
    List<WriteRequest> requests = batchListByTable.get(tableName);
    if (requests != null && !requests.isEmpty()) {
        Map<String, List<WriteRequest>> requestItems = new HashMap<>(1);
        requestItems.put(tableName, requests);
        dynamoDbClient.batchWriteItem(
                BatchWriteItemRequest.builder().requestItems(requestItems).build());
        batchListByTable.remove(tableName); // Only remove the flushed table
    }
}
```

**Issue 3: Concurrency safety issues with synchronized methods** (Severity: MAJOR)

Modified code:

```java
public synchronized void write(PutItemRequest putItemRequest, String tableName) {
    tryInit();
    batchListByTable.computeIfAbsent(tableName, k -> new ArrayList<>());
    batchListByTable.get(tableName).add(...);
    if (...)
        flush(); // Network I/O inside lock
}

synchronized void flush() {
    for (Map.Entry<String, List<WriteRequest>> entry : batchListByTable.entrySet()) {
        // ...
        dynamoDbClient.batchWriteItem(...); // AWS API call
    }
    batchListByTable.clear();
}
```

Problem description: the AWS API call executes while the lock is held, so slow network I/O blocks every other writer.

Improvement suggestion:

```java
private final Object lock = new Object();
private final Map<String, List<WriteRequest>> batchListByTable;

public void write(PutItemRequest putItemRequest, String tableName) {
    List<WriteRequest> toFlush = null;
    synchronized (lock) {
        tryInit();
        batchListByTable.computeIfAbsent(tableName, k -> new ArrayList<>());
        batchListByTable.get(tableName).add(...);
        if (amazondynamodbConfig.getBatchSize() > 0
                && batchListByTable.get(tableName).size() >= amazondynamodbConfig.getBatchSize()) {
            // Copy the current table's batch while holding the lock
            toFlush = new ArrayList<>(batchListByTable.get(tableName));
            batchListByTable.get(tableName).clear();
        }
    }
    if (toFlush != null) {
        // Execute network I/O outside the lock
        flushAsync(tableName, toFlush);
    }
}

private void flushAsync(String tableName, List<WriteRequest> requests) {
    try {
        Map<String, List<WriteRequest>> requestItems = new HashMap<>(1);
        requestItems.put(tableName, requests);
        dynamoDbClient.batchWriteItem(
                BatchWriteItemRequest.builder().requestItems(requestItems).build());
    } catch (Exception e) {
        // Handle the exception and retry
        log.error("Failed to flush table: {}", tableName, e);
    }
}
```

**Issue 4: Unprocessed items returned by AWS API not handled** (Severity: CRITICAL)

Modified code:

```java
for (Map.Entry<String, List<WriteRequest>> entry : batchListByTable.entrySet()) {
    String tableName = entry.getKey();
    List<WriteRequest> requests = entry.getValue();
    if (!requests.isEmpty()) {
        Map<String, List<WriteRequest>> requestItems = new HashMap<>(1);
        requestItems.put(tableName, requests);
        dynamoDbClient.batchWriteItem(
                BatchWriteItemRequest.builder().requestItems(requestItems).build());
        // Missing handling of the return value
    }
}
batchListByTable.clear(); // Cleared directly, assuming all writes succeeded
```

Problem description: `BatchWriteItem` is not atomic. Items that are throttled or exceed limits are returned in the response's `unprocessedItems` map; discarding that map silently drops data.

Improvement suggestion:

```java
synchronized void flush() {
    if (batchListByTable.isEmpty()) {
        return;
    }
    for (Map.Entry<String, List<WriteRequest>> entry : batchListByTable.entrySet()) {
        String tableName = entry.getKey();
        List<WriteRequest> requests = entry.getValue();
        if (!requests.isEmpty()) {
            flushWithRetry(tableName, requests);
        }
    }
    batchListByTable.clear();
}

private void flushWithRetry(String tableName, List<WriteRequest> requests) {
    List<WriteRequest> pendingRequests = new ArrayList<>(requests);
    int maxRetries = 3;
    int retryCount = 0;
    while (!pendingRequests.isEmpty() && retryCount < maxRetries) {
        Map<String, List<WriteRequest>> requestItems = new HashMap<>(1);
        requestItems.put(tableName, pendingRequests);
        BatchWriteItemResponse response =
                dynamoDbClient.batchWriteItem(
                        BatchWriteItemRequest.builder().requestItems(requestItems).build());
        Map<String, List<WriteRequest>> unprocessedItems = response.unprocessedItems();
        pendingRequests = unprocessedItems.getOrDefault(tableName, Collections.emptyList());
        if (!pendingRequests.isEmpty()) {
            retryCount++;
            log.warn(
                    "Table {} has {} unprocessed items, retry {}/{}",
                    tableName, pendingRequests.size(), retryCount, maxRetries);
            try {
                Thread.sleep(100L * retryCount); // Simple linear backoff between retries
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException("Interrupted during retry", e);
            }
        }
    }
    if (!pendingRequests.isEmpty()) {
        throw new RuntimeException(
                String.format(
                        "Failed to write %d items to table %s after %d retries",
                        pendingRequests.size(), tableName, maxRetries));
    }
}
```

Note: the correct accessor on `BatchWriteItemResponse` is `unprocessedItems()` (`unprocessedKeys()` belongs to `BatchGetItem`).

**Issue 5: Missing multi-table feature tests** (Severity: MAJOR)

Location: test file directory.

Improvement suggestion:

```java
public class AmazonDynamoDBMultiTableSinkTest {

    @Test
    public void testMultiTableWrite() {
        // Simulate a multi-table write scenario
        SeaTunnelRow row1 = createRow("table1", ...);
        SeaTunnelRow row2 = createRow("table2", ...);
        SeaTunnelRow row3 = createRow("table1", ...);
        writer.write(row1);
        writer.write(row2);
        writer.write(row3);
        writer.prepareCommit();
        // Verify both tables are written
        verify(dynamoDbClient, times(1)).batchWriteItem(argThat(req ->
                req.containsKey("table1") && req.containsKey("table2")));
    }

    @Test
    public void testEmptyTableIdFallback() {
        SeaTunnelRow row = new SeaTunnelRow(new Object[0]);
        row.setTableId(""); // Empty table name
        writer.write(row);
        // Should fall back to the configured table name
        verify(dynamoDbClient).write(any(), eq("configTable"));
    }
}
```

**Issue 6: Typo** (Severity: MINOR)

Modified code:

```java
.optional(BATCH_SIZE, SinkConnectorCommonOptions.MULTI_TABLE_SINK_REPLICA)
```
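The per-table buffering discussed in Issues 2 and 3 boils down to a map of per-table batches filled via `computeIfAbsent`. Since `computeIfAbsent` returns the (possibly freshly created) list, the lookup-then-add can be a single chained call. A minimal self-contained sketch of that pattern (class and method names are illustrative, not the PR's actual code; `String` stands in for `WriteRequest`):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PerTableBuffer {
    private final Map<String, List<String>> batchListByTable = new HashMap<>();
    private final int batchSize;

    public PerTableBuffer(int batchSize) {
        this.batchSize = batchSize;
    }

    /** Buffers a request and reports whether this table's batch is now full. */
    public boolean add(String tableName, String request) {
        // computeIfAbsent returns the live list, so add() chains directly
        List<String> batch = batchListByTable.computeIfAbsent(tableName, k -> new ArrayList<>());
        batch.add(request);
        return batchSize > 0 && batch.size() >= batchSize;
    }

    /** Removes and returns the buffered batch for one table (empty if none). */
    public List<String> drain(String tableName) {
        List<String> batch = batchListByTable.remove(tableName);
        return batch != null ? batch : new ArrayList<>();
    }

    public static void main(String[] args) {
        PerTableBuffer buffer = new PerTableBuffer(2);
        System.out.println(buffer.add("table1", "r1")); // false: table1 holds 1 item
        System.out.println(buffer.add("table2", "r2")); // false: table2 holds 1 item
        System.out.println(buffer.add("table1", "r3")); // true: table1 reached batch size
        System.out.println(buffer.drain("table1").size()); // 2
        System.out.println(buffer.drain("table2").size()); // 1
    }
}
```

Draining only the full table is what keeps a hot table's flushes from also flushing (and shrinking the batches of) low-frequency tables.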
@DanielCarter-stack Thank you for the thorough and detailed review! I've addressed all the issues you raised:

- Issue 1 - Null validation (MAJOR): ✅ Fixed
- Issue 2 - Batch size logic (MINOR): ✅ Fixed
- Issue 3 - Concurrency safety (MAJOR): ✅ Fixed
- Issue 4 - Unprocessed items (CRITICAL): ✅ Fixed
- Issue 5 - Missing tests (MAJOR): ✅ Fixed
- Issue 6 - Typo (MINOR): ✅ Acknowledged

All tests pass locally. Ready for re-review. Thank you again for the detailed feedback!
Good job. The overall design follows the standard SeaTunnel multi-table sink pattern.

**1. Concurrency Bug: Mismatched Locks causing Crash**

`write()` and `flush()` synchronize on different monitors:

```java
// Uses the 'lock' object
public void write(PutItemRequest putItemRequest, String tableName) {
    synchronized (lock) {
        // ... modifies batchListByTable (HashMap)
    }
}

// Uses the 'this' instance
synchronized void flush() {
    // ... iterates over batchListByTable
}
```

Impact: the two methods can run concurrently against the same unsynchronized `HashMap`, risking lost updates or a `ConcurrentModificationException`.

Fix: Ensure both methods synchronize on the same object:

```java
// Remove the 'synchronized' keyword from the method signature and use a block
public void flush() {
    synchronized (lock) {
        // implementation
    }
}
```

**2. Weak Retry Strategy for Throttling**

The current retry logic:

```java
int maxRetries = 3;
// ...
Thread.sleep(100 * retryCount);
```

**Logic Implementation Correctness**

**1. NPE Handling in Writer (Verified)**

The fallback logic:

```java
String tableName = element.getTableId();
if (StringUtils.isEmpty(tableName)) {
    tableName = amazondynamodbConfig.getTable();
}
```

This is robust and safely falls back to the default configured table, ensuring backward compatibility for single-table jobs.

**2. Batch Flush Logic (Verified)**

The refactored write path:

```java
synchronized (lock) {
    // ... adds to buffer ...
    if (batchSizeReached) {
        toFlush = new ArrayList<>(batchListByTable.get(tableName));
        batchListByTable.remove(tableName);
    }
}
if (toFlush != null) {
    // Correctly executed outside the lock
    flushTableAsync(tableName, toFlush);
}
```

This reduces lock contention significantly.

By the way, please pay attention to the CI status; the CI is currently failing.
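Point 2 above flags the linear `Thread.sleep(100 * retryCount)`: it grows slowly under sustained throttling, and identical delays synchronize competing writers. A common hardening is capped exponential backoff with random jitter. The sketch below only computes the delay schedule; the base and cap constants are assumptions for illustration (the PR later settles on a 5s upper bound), not SeaTunnel or AWS defaults:

```java
import java.util.Random;

public class Backoff {
    static final long BASE_DELAY_MS = 100;  // assumed base delay
    static final long MAX_DELAY_MS = 5_000; // assumed cap on the exponential term

    /** Capped exponential delay with up to 50% random jitter added on top. */
    static long delayMs(int retryCount, Random random) {
        long exp = Math.min(MAX_DELAY_MS, BASE_DELAY_MS << Math.min(retryCount, 20));
        long jitter = (long) (exp * random.nextDouble() * 0.5);
        return exp + jitter;
    }

    public static void main(String[] args) {
        Random random = new Random();
        for (int retry = 0; retry < 8; retry++) {
            long exp = Math.min(MAX_DELAY_MS, BASE_DELAY_MS << Math.min(retry, 20));
            long d = delayMs(retry, random);
            // the computed delay always lands in [exp, 1.5 * exp]
            System.out.println("retry " + retry + " within bounds: "
                    + (d >= exp && d <= exp + exp / 2));
        }
    }
}
```

The jitter term is what prevents a "thundering herd": writers throttled at the same moment retry at slightly different times instead of hammering DynamoDB in lockstep.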
@davidzollo Thank you for the thorough review and catching those critical issues! 🙏 I've addressed all the concerns you raised:

1. Concurrency Bug (Critical) ✅
2. Weak Retry Strategy ✅

Additional improvements:

The implementation now properly handles DynamoDB throttling scenarios and is ready for another review. Please let me know if you spot anything else that needs attention! Thank you again!

hi @davidzollo quick ping, is there anything I need to do?
Hi there! 👋 Thank you for contributing to Apache SeaTunnel. First of all, this PR adds real value, and both directions are useful in production. Since this is a non-trivial area, I'm sharing detailed feedback to help align the implementation with SeaTunnel's runtime behavior and improve maintainability.

**1. Multi-Table Routing Semantics (Important Clarification)**

**2. Synchronization Scope in flush()**
@davidzollo Thank you for the detailed and constructive feedback! I really appreciate the time you took to review this thoroughly. I'll address all points systematically:

1. Multi-Table Routing Semantics: ✅ Will add a clarifying comment
2. Synchronization Scope in flush(): ✅ Will fix immediately
3. Retry Policy Hardcoded: ✅ Will make configurable
4. Test Focus: ✅ Acknowledged

I'll push these changes within the next few hours. Thanks again for the thorough review!
Hi @davidzollo, thank you for the detailed feedback again, really appreciate it :) I have addressed all the points as follows:

- **Multi-Table Routing Semantics:** Kept the per-table buffering map to ensure correctness in low-parallelism or dynamic routing scenarios. Added a comment in AmazonDynamoDBWriter to clarify this runtime routing logic.
- **Synchronization Scope in flush():** Refactored the flush() method in DynamoDbSinkClient to use a snapshot pattern. The network I/O and retry sleep now execute outside the synchronized block to prevent writer contention.
- **Configurable Retry Policy:** Replaced hardcoded values with new optional configuration settings: max_retries, retry_base_delay_ms, and retry_max_delay_ms.
- **Test Fidelity:** Refactored AmazonDynamoDBMultiTableSinkTest to remove brittle reflection. Used protected constructors for dependency injection to improve maintainability and better simulate runtime behavior.

Please let me know if any further adjustments are needed.
hey @davidzollo waiting for your review or merge :) |
The CI failure is in seatunnel-engine-server and is unrelated to my changes. I will try to rerun it.
We're fixing it |
Diff context:

```java
flush();
synchronized (lock) {
    if (dynamoDbClient != null) {
        dynamoDbClient.close();
```

Should the flush() call be placed inside if (dynamoDbClient != null)?
In flush(), if no write() has happened, batchListByTable will be empty and the method returns early, so an NPE won't occur.
Putting flush() inside the null guard would be safer, but it would also move I/O inside the lock, which your earlier feedback already flagged as a risk!
So what do you think about that? Thank you for your review!
I would recommend adding a null check inside flush() itself, so this way it will be safe:

```java
if (dynamoDbClient == null || batchListByTable.isEmpty()) {
    return;
}
```
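The guard suggested above can be exercised without any AWS dependency by substituting a trivial stand-in for the client. Everything below (class and field names, the boolean standing in for `dynamoDbClient != null`) is a hypothetical sketch of the pattern, not the PR's actual code:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class GuardedClient {
    private final Object lock = new Object();
    private final Map<String, List<String>> batchListByTable = new HashMap<>();
    private final List<String> sentBatches = new ArrayList<>(); // stands in for DynamoDB writes
    private boolean initialized = false; // stands in for dynamoDbClient != null

    public void write(String tableName, String request) {
        synchronized (lock) {
            initialized = true; // tryInit()
            batchListByTable.computeIfAbsent(tableName, k -> new ArrayList<>()).add(request);
        }
    }

    /** Safe even if nothing was ever written: both guards short-circuit. */
    public void flush() {
        synchronized (lock) {
            if (!initialized || batchListByTable.isEmpty()) {
                return;
            }
            batchListByTable.forEach((t, reqs) -> sentBatches.add(t + ":" + reqs.size()));
            batchListByTable.clear();
        }
    }

    public void close() {
        flush(); // no NPE even when close() is called on an unused writer
        synchronized (lock) {
            initialized = false;
        }
    }

    public static void main(String[] args) {
        GuardedClient unused = new GuardedClient();
        unused.close(); // nothing written: flush() returns early
        System.out.println("unused closed ok");

        GuardedClient used = new GuardedClient();
        used.write("table1", "r1");
        used.close();
        System.out.println(used.sentBatches);
    }
}
```

Keeping the guard inside flush() means close() stays simple and the "I/O outside locks" concern and the null-safety concern are both satisfied in one place.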
Diff context:

```java
long jitter = (long) (delay * Math.random() * 0.5);
delay += jitter;
```
Please add log info during retries.
Recommendation: Log retry count, table name, delay, and remaining unprocessed items.
Please add docs for new options
@davidzollo I have committed some improvements based on your reviews — could you kindly check them?
I found a retry semantics issue in the retry handling.

Suggestion:

Hello @davidzollo, I have submitted the latest changes based on your review :) Kindly check them.
…able sink
- Add null/empty tableId fallback to config table for backward compatibility
- Optimize per-table flush to avoid affecting low-frequency tables
- Move network I/O outside synchronized block for better concurrency
- Add retry logic with exponential backoff for unprocessed items
- Add comprehensive unit tests for multi-table functionality

…etry strategy
- Fix critical concurrency issue by using consistent lock object in flush() and close()
- Improve retry strategy with exponential backoff (10 retries, up to 5s delay)
- Add jitter to prevent thundering herd problem
- Rename flushTableAsync to flushTable for clarity

…urable retry policy

Force-pushed from 86c0b35 to 27de843.
…DB connector
[Feature][Connector-V2] Add multi-table sink support for AmazonDynamoDB connector
Purpose of this pull request

Implements multi-table sink support for the AmazonDynamoDB connector as requested in issue #10426.

Changes:
- Add the `SupportMultiTableSink` interface to `AmazonDynamoDBSink`
- Add the `SupportMultiTableSinkWriter<Void>` interface to `AmazonDynamoDBWriter`
- Update `AmazonDynamoDBSinkFactory` to include the `MULTI_TABLE_SINK_REPLICA` option
- Update the `AmazonDynamoDBWriter` constructor to accept a `CatalogTable`
- Refactor `DynamoDbSinkClient` to batch and flush writes per table

Does this PR introduce any user-facing change?

Yes. The AmazonDynamoDB sink now supports multi-table scenarios such as CDC replication.

Example configuration:

How was this patch tested?

- `./mvnw spotless:apply`
- `./mvnw verify -DskipTests`