Skip to content

Commit 39bd7b3

Browse files
committed
add assignEndingChunkFirst option to other connectors
1 parent 6a8ecb4 commit 39bd7b3

File tree

12 files changed

+63
-13
lines changed

12 files changed

+63
-13
lines changed

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/source/Db2SourceBuilder.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,15 @@ public Db2SourceBuilder<T> skipSnapshotBackfill(boolean skipSnapshotBackfill) {
232232
return this;
233233
}
234234

235+
/**
236+
* Whether the {@link Db2IncrementalSource} should assign the ending chunk first or not during
237+
* snapshot reading phase.
238+
*/
239+
public Db2SourceBuilder<T> assignEndingChunkFirst(boolean assignEndingChunkFirst) {
240+
this.configFactory.assignEndingChunkFirst(assignEndingChunkFirst);
241+
return this;
242+
}
243+
235244
/**
236245
* Build the {@link Db2IncrementalSource}.
237246
*

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/table/Db2TableSource.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ public class Db2TableSource implements ScanTableSource, SupportsReadingMetadata
7777
private final String chunkKeyColumn;
7878
private final boolean closeIdleReaders;
7979
private final boolean skipSnapshotBackfill;
80+
private final boolean assignEndingChunkFirst;
8081

8182
/** Metadata that is appended at the end of a physical source row. */
8283
protected List<String> metadataKeys;
@@ -103,7 +104,8 @@ public Db2TableSource(
103104
double distributionFactorLower,
104105
@Nullable String chunkKeyColumn,
105106
boolean closeIdleReaders,
106-
boolean skipSnapshotBackfill) {
107+
boolean skipSnapshotBackfill,
108+
boolean assignEndingChunkFirst) {
107109
this.physicalSchema = physicalSchema;
108110
this.port = port;
109111
this.hostname = hostname;
@@ -128,6 +130,7 @@ public Db2TableSource(
128130
this.chunkKeyColumn = chunkKeyColumn;
129131
this.closeIdleReaders = closeIdleReaders;
130132
this.skipSnapshotBackfill = skipSnapshotBackfill;
133+
this.assignEndingChunkFirst = assignEndingChunkFirst;
131134
}
132135

133136
@Override
@@ -174,6 +177,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
174177
.chunkKeyColumn(chunkKeyColumn)
175178
.closeIdleReaders(closeIdleReaders)
176179
.skipSnapshotBackfill(skipSnapshotBackfill)
180+
.assignEndingChunkFirst(assignEndingChunkFirst)
177181
.build();
178182
return SourceProvider.of(db2ChangeEventSource);
179183
} else {
@@ -234,7 +238,8 @@ public DynamicTableSource copy() {
234238
distributionFactorLower,
235239
chunkKeyColumn,
236240
closeIdleReaders,
237-
skipSnapshotBackfill);
241+
skipSnapshotBackfill,
242+
assignEndingChunkFirst);
238243
source.metadataKeys = metadataKeys;
239244
source.producedDataType = producedDataType;
240245
return source;

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/table/Db2TableSourceFactory.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import static org.apache.flink.cdc.connectors.base.options.JdbcSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN;
4141
import static org.apache.flink.cdc.connectors.base.options.SourceOptions.CHUNK_META_GROUP_SIZE;
4242
import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED;
43+
import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST;
4344
import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP;
4445
import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
4546
import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED;
@@ -138,6 +139,8 @@ public DynamicTableSource createDynamicTableSource(Context context) {
138139
config.getOptional(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN).orElse(null);
139140
boolean closeIdleReaders = config.get(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED);
140141
boolean skipSnapshotBackfill = config.get(SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP);
142+
boolean assignEndingChunkFirst =
143+
config.get(SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST);
141144

142145
if (enableParallelRead) {
143146
validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, splitSize, 1);
@@ -173,7 +176,8 @@ public DynamicTableSource createDynamicTableSource(Context context) {
173176
distributionFactorLower,
174177
chunkKeyColumn,
175178
closeIdleReaders,
176-
skipSnapshotBackfill);
179+
skipSnapshotBackfill,
180+
assignEndingChunkFirst);
177181
}
178182

179183
@Override

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/test/java/org/apache/flink/cdc/connectors/db2/table/Db2TableSourceFactoryTest.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,9 @@ public void testCommonProperties() {
118118
.defaultValue(),
119119
null,
120120
false,
121-
JdbcSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue());
121+
JdbcSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
122+
JdbcSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST
123+
.defaultValue());
122124
assertEquals(expectedSource, actualSource);
123125
}
124126

@@ -157,7 +159,8 @@ public void testOptionalProperties() {
157159
.defaultValue(),
158160
null,
159161
false,
160-
JdbcSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue());
162+
JdbcSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
163+
true);
161164
assertEquals(expectedSource, actualSource);
162165
}
163166

@@ -245,7 +248,9 @@ public void testMetadataColumns() {
245248
.defaultValue(),
246249
null,
247250
false,
248-
JdbcSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue());
251+
JdbcSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
252+
JdbcSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST
253+
.defaultValue());
249254
expectedSource.producedDataType = SCHEMA_WITH_METADATA.toSourceRowDataType();
250255
expectedSource.metadataKeys =
251256
Arrays.asList("op_ts", "database_name", "table_name", "schema_name");

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBSourceBuilder.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -268,6 +268,15 @@ public MongoDBSourceBuilder<T> scanNewlyAddedTableEnabled(boolean scanNewlyAdded
268268
return this;
269269
}
270270

271+
/**
272+
* Whether the {@link MongoDBSource} should assign the ending chunk first or not during snapshot
273+
* reading phase.
274+
*/
275+
public MongoDBSourceBuilder<T> assignEndingChunkFirst(boolean assignEndingChunkFirst) {
276+
this.configFactory.assignEndingChunkFirst(assignEndingChunkFirst);
277+
return this;
278+
}
279+
271280
/**
272281
* Build the {@link MongoDBSource}.
273282
*

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBTableSource.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
8686
private final boolean noCursorTimeout;
8787
private final boolean skipSnapshotBackfill;
8888
private final boolean scanNewlyAddedTableEnabled;
89+
private final boolean assignEndingChunkFirst;
8990

9091
// --------------------------------------------------------------------------------------------
9192
// Mutable attributes
@@ -121,7 +122,8 @@ public MongoDBTableSource(
121122
boolean enableFullDocPrePostImage,
122123
boolean noCursorTimeout,
123124
boolean skipSnapshotBackfill,
124-
boolean scanNewlyAddedTableEnabled) {
125+
boolean scanNewlyAddedTableEnabled,
126+
boolean assignEndingChunkFirst) {
125127
this.physicalSchema = physicalSchema;
126128
this.scheme = checkNotNull(scheme);
127129
this.hosts = checkNotNull(hosts);
@@ -148,6 +150,7 @@ public MongoDBTableSource(
148150
this.noCursorTimeout = noCursorTimeout;
149151
this.skipSnapshotBackfill = skipSnapshotBackfill;
150152
this.scanNewlyAddedTableEnabled = scanNewlyAddedTableEnabled;
153+
this.assignEndingChunkFirst = assignEndingChunkFirst;
151154
}
152155

153156
@Override
@@ -207,7 +210,8 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
207210
.skipSnapshotBackfill(skipSnapshotBackfill)
208211
.scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled)
209212
.deserializer(deserializer)
210-
.disableCursorTimeout(noCursorTimeout);
213+
.disableCursorTimeout(noCursorTimeout)
214+
.assignEndingChunkFirst(assignEndingChunkFirst);
211215

212216
Optional.ofNullable(databaseList).ifPresent(builder::databaseList);
213217
Optional.ofNullable(collectionList).ifPresent(builder::collectionList);
@@ -307,7 +311,8 @@ public DynamicTableSource copy() {
307311
enableFullDocPrePostImage,
308312
noCursorTimeout,
309313
skipSnapshotBackfill,
310-
scanNewlyAddedTableEnabled);
314+
scanNewlyAddedTableEnabled,
315+
assignEndingChunkFirst);
311316
source.metadataKeys = metadataKeys;
312317
source.producedDataType = producedDataType;
313318
return source;

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBTableSourceFactory.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636

3737
import static org.apache.flink.cdc.connectors.base.options.SourceOptions.CHUNK_META_GROUP_SIZE;
3838
import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED;
39+
import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST;
3940
import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP;
4041
import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED;
4142
import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_STARTUP_MODE;
@@ -106,6 +107,8 @@ public DynamicTableSource createDynamicTableSource(Context context) {
106107
boolean enableCloseIdleReaders = config.get(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED);
107108
boolean skipSnapshotBackfill = config.get(SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP);
108109
boolean scanNewlyAddedTableEnabled = config.get(SCAN_NEWLY_ADDED_TABLE_ENABLED);
110+
boolean assignEndingChunkFirst =
111+
config.get(SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST);
109112

110113
int splitSizeMB = config.get(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE_MB);
111114
int splitMetaGroupSize = config.get(CHUNK_META_GROUP_SIZE);
@@ -146,7 +149,8 @@ public DynamicTableSource createDynamicTableSource(Context context) {
146149
enableFullDocumentPrePostImage,
147150
noCursorTimeout,
148151
skipSnapshotBackfill,
149-
scanNewlyAddedTableEnabled);
152+
scanNewlyAddedTableEnabled,
153+
assignEndingChunkFirst);
150154
}
151155

152156
private void checkPrimaryKey(UniqueConstraint pk, String message) {
@@ -228,6 +232,7 @@ public Set<ConfigOption<?>> optionalOptions() {
228232
options.add(SCAN_NO_CURSOR_TIMEOUT);
229233
options.add(SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP);
230234
options.add(SCAN_NEWLY_ADDED_TABLE_ENABLED);
235+
options.add(SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST);
231236
return options;
232237
}
233238
}

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBTableFactoryTest.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848

4949
import static org.apache.flink.cdc.connectors.base.options.SourceOptions.CHUNK_META_GROUP_SIZE;
5050
import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED;
51+
import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST;
5152
import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP;
5253
import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED;
5354
import static org.apache.flink.cdc.connectors.mongodb.internal.MongoDBEnvelope.MONGODB_SRV_SCHEME;
@@ -156,7 +157,8 @@ public void testCommonProperties() {
156157
FULL_DOCUMENT_PRE_POST_IMAGE_ENABLED_DEFAULT,
157158
SCAN_NO_CURSOR_TIMEOUT_DEFAULT,
158159
SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP_DEFAULT,
159-
SCAN_NEWLY_ADDED_TABLE_ENABLED_DEFAULT);
160+
SCAN_NEWLY_ADDED_TABLE_ENABLED_DEFAULT,
161+
SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST.defaultValue());
160162
assertEquals(expectedSource, actualSource);
161163
}
162164

@@ -208,6 +210,7 @@ public void testOptionalProperties() {
208210
true,
209211
false,
210212
true,
213+
true,
211214
true);
212215
assertEquals(expectedSource, actualSource);
213216
}
@@ -249,7 +252,8 @@ public void testMetadataColumns() {
249252
FULL_DOCUMENT_PRE_POST_IMAGE_ENABLED_DEFAULT,
250253
SCAN_NO_CURSOR_TIMEOUT_DEFAULT,
251254
SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP_DEFAULT,
252-
SCAN_NEWLY_ADDED_TABLE_ENABLED_DEFAULT);
255+
SCAN_NEWLY_ADDED_TABLE_ENABLED_DEFAULT,
256+
SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST.defaultValue());
253257

254258
expectedSource.producedDataType = SCHEMA_WITH_METADATA.toSourceRowDataType();
255259
expectedSource.metadataKeys = Arrays.asList("op_ts", "database_name");

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceBuilder.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -284,7 +284,8 @@ public MySqlSourceBuilder<T> parseOnLineSchemaChanges(boolean parseOnLineSchemaC
284284
}
285285

286286
/**
287-
* Whether to assign the ending chunk first during snapshot reading phase. Defaults to false.
287+
* Whether the {@link MySqlSource} should assign the ending chunk first or not during snapshot
288+
* reading phase.
288289
*/
289290
public MySqlSourceBuilder<T> assignEndingChunkFirst(boolean assignEndingChunkFirst) {
290291
this.configFactory.assignEndingChunkFirst(assignEndingChunkFirst);

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/table/OracleTableSourceFactory.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,7 @@ public Set<ConfigOption<?>> optionalOptions() {
188188
options.add(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED);
189189
options.add(SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP);
190190
options.add(SCAN_NEWLY_ADDED_TABLE_ENABLED);
191+
options.add(SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST);
191192
return options;
192193
}
193194

0 commit comments

Comments (0)