[Feature][Connector-V2] Add Airtable source and sink#10469
[Feature][Connector-V2] Add Airtable source and sink#10469kuleat wants to merge 1 commit intoapache:devfrom
Conversation
Issue 1: Offset Initial Value Handling UnclearLocation: if (pluginConfig.getOptional(AirtableSourceOptions.OFFSET).isPresent()) {
body.put("offset", pluginConfig.get(AirtableSourceOptions.OFFSET));
} else {
body.putIfAbsent("offset", null);
}Related Context:
Problem Description: Potential Risks:
Impact Scope:
Severity: MINOR Improvement Suggestions: // Suggested code
if (pluginConfig.getOptional(AirtableSourceOptions.OFFSET).isPresent()) {
body.put("offset", pluginConfig.get(AirtableSourceOptions.OFFSET));
} else {
// Only add offset if not already present in user-provided body
// Do not add null offset for initial request
body.putIfAbsent("offset", null);
// Alternative: Remove offset if it's null
// body.remove("offset");
}Rationale:
Issue 2: Concurrency Safety Lacks DocumentationLocation:
private long lastRequestTimeMillis = 0L;Related Context:
Problem Description: Potential Risks:
Impact Scope:
Severity: MINOR Improvement Suggestions: // Suggested code
/**
* Last request timestamp in milliseconds.
* Note: This field is accessed by a single thread (HttpSourceReader is single-threaded).
* Volatile is not required due to the happens-before relationship in single-threaded execution.
*/
private long lastRequestTimeMillis = 0L;Rationale:
Issue 3: Missing Input Upper Bound ValidationLocation:
this.requestIntervalMs = Math.max(0, requestIntervalMs);
this.rateLimitBackoffMs = Math.max(0, rateLimitBackoffMs);
this.rateLimitMaxRetries = Math.max(0, rateLimitMaxRetries);Related Context:
Problem Description: Potential Risks:
Impact Scope:
Severity: MINOR Improvement Suggestions: // Suggested code
private static final int MAX_REQUEST_INTERVAL_MS = 600000; // 10 minutes
private static final int MAX_BACKOFF_MS = 600000; // 10 minutes
private static final int MAX_RETRIES = 100;
this.requestIntervalMs =
Math.min(Math.max(0, requestIntervalMs), MAX_REQUEST_INTERVAL_MS);
this.rateLimitBackoffMs =
Math.min(Math.max(0, rateLimitBackoffMs), MAX_BACKOFF_MS);
this.rateLimitMaxRetries =
Math.min(Math.max(0, rateLimitMaxRetries), MAX_RETRIES);Rationale:
Issue 4: E2E Tests Lack Pagination ScenariosLocation:
Related Context:
Problem Description: Potential Risks:
Impact Scope:
Severity: MAJOR Improvement Suggestions: // Suggest adding pagination test scenarios to mockserver-config.json
{
"httpRequest": {
"path": "/v0/appTEST123/TestTable/listRecords"
},
"httpResponse": {
"statusCode": 200,
"body": {
"records": [...], // 100 records
"offset": "itrXXXXXXXXXXXXX/recXXXXXXXXXXXXX"
}
},
"httpResponseTemplates": {
"templateType": "RANDOM"
}
}Rationale:
Issue 5: Incomplete Changelog ContentLocation: Related Context:
Problem Description: Potential Risks:
Impact Scope:
Severity: MINOR Improvement Suggestions: # Airtable Connector Changelog
## 2.3.5 (Unreleased)
### Added
- Add Airtable source connector to read data from Airtable tables (#10443)
- Add Airtable sink connector to write data to Airtable tables
- Support offset-based pagination for large table reads
- Support configurable rate limiting with exponential backoff (429 handling)
- Support batch writes (up to 10 records per request)
- Support field projection and filtering via Airtable API options
### Changed
- Extract `HttpSourceReader#executeRequest()` to protected method to allow subclass override
### Known Limitations
- Does not support streaming mode (batch only)
- Does not support exactly-once semantics
- Batch size is limited to 10 records by Airtable APIRationale:
Issue 6: Missing Observability MetricsLocation:
Related Context:
Problem Description: Potential Risks:
Impact Scope:
Severity: MINOR (because logging is already in place) Improvement Suggestions: // Suggest adding Metrics to AirtableSourceReader
private Counter retryCounter;
private Histogram backoffTimeHistogram;
@Override
public void open() {
super.open();
MetricContext context = this.context.getMetricContext();
retryCounter = context.counter("airtableApiRetryCount");
backoffTimeHistogram = context.histogram("airtableApiBackoffTime");
}
private HttpResponse executeRequest() throws Exception {
int retryCount = 0;
while (true) {
waitForRequestSlot();
HttpResponse response = doExecuteRequest();
if (response.getCode() == STATUS_TOO_MANY_REQUESTS
&& retryCount < rateLimitMaxRetries) {
retryCount += 1;
retryCounter.increment(); // Record retry count
long backoffMillis = calculateBackoffMillis(retryCount);
backoffTimeHistogram.update(backoffMillis); // Record backoff time
...
}
return response;
}
}Rationale:
Issue 7: Missing Success Write LogsLocation: Related Context:
Problem Description: Potential Risks:
Impact Scope:
Severity: MINOR Improvement Suggestions: // Suggested code
private void sendWithRateLimitRetry(String body) throws IOException {
int retryCount = 0;
while (true) {
waitForRequestSlot();
try {
HttpResponse response = httpClient.doPost(url, headers, body);
if (HttpResponse.STATUS_OK == response.getCode()) {
log.info("Successfully wrote {} records to Airtable", batchBuffer.size());
return;
}
...
}
}
}Rationale:
|
Purpose of this pull request
#10443
Add Airtable source and sink connector.
Does this PR introduce any user-facing change?
No
How was this patch tested?
Add UT and e2e tests.
Check list
New License Guide
incompatible-changes.mdto describe the incompatibility caused by this PR.