Skip to content

Commit 9a7ed71

Browse files
beryllwwangjunbo
authored and committed
support MongoDBChunkSplitter add ending chunk first
1 parent 13e511a commit 9a7ed71

File tree

22 files changed

+170
-25
lines changed

22 files changed

+170
-25
lines changed

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@
7676
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PORT;
7777
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED;
7878
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED;
79+
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_FIRST;
7980
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN;
8081
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
8182
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED;
@@ -150,6 +151,8 @@ public DataSource createDataSource(Context context) {
150151
config.get(SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED);
151152
boolean isParsingOnLineSchemaChanges = config.get(PARSE_ONLINE_SCHEMA_CHANGES);
152153
boolean useLegacyJsonFormat = config.get(USE_LEGACY_JSON_FORMAT);
154+
boolean isAssignEndingChunkFirst =
155+
config.get(SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_FIRST);
153156

154157
validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, splitSize, 1);
155158
validateIntegerOption(CHUNK_META_GROUP_SIZE, splitMetaGroupSize, 1);
@@ -201,7 +204,8 @@ public DataSource createDataSource(Context context) {
201204
.scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled)
202205
.parseOnLineSchemaChanges(isParsingOnLineSchemaChanges)
203206
.treatTinyInt1AsBoolean(treatTinyInt1AsBoolean)
204-
.useLegacyJsonFormat(useLegacyJsonFormat);
207+
.useLegacyJsonFormat(useLegacyJsonFormat)
208+
.assignEndingChunkFirst(isAssignEndingChunkFirst);
205209

206210
List<TableId> tableIds = MySqlSchemaUtils.listTables(configFactory.createConfig(0), null);
207211

@@ -336,6 +340,8 @@ public Set<ConfigOption<?>> optionalOptions() {
336340
options.add(INCLUDE_COMMENTS_ENABLED);
337341
options.add(USE_LEGACY_JSON_FORMAT);
338342
options.add(TREAT_TINYINT1_AS_BOOLEAN_ENABLED);
343+
options.add(PARSE_ONLINE_SCHEMA_CHANGES);
344+
options.add(SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_FIRST);
339345
return options;
340346
}
341347

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -313,4 +313,12 @@ public class MySqlDataSourceOptions {
313313
.defaultValue(true)
314314
.withDescription(
315315
"Whether to use legacy json format. The default value is true, which means there is no whitespace before value and after comma in json format.");
316+
317+
@Experimental
318+
public static final ConfigOption<Boolean> SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_FIRST =
319+
ConfigOptions.key("scan.incremental.snapshot.assign.ending.first")
320+
.booleanType()
321+
.defaultValue(false)
322+
.withDescription(
323+
"Whether to assign the ending chunk first during snapshot reading phase. This might help reduce the risk of the TaskManager experiencing an out-of-memory (OOM) error when taking a snapshot of the largest unbounded chunk. Defaults to false.");
316324
}

flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/config/BaseSourceConfig.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ public abstract class BaseSourceConfig implements SourceConfig {
3838
protected final boolean closeIdleReaders;
3939
protected final boolean skipSnapshotBackfill;
4040
protected final boolean isScanNewlyAddedTableEnabled;
41+
protected final boolean assignEndingChunkFirst;
4142

4243
// --------------------------------------------------------------------------------------------
4344
// Debezium Configurations
@@ -56,7 +57,8 @@ public BaseSourceConfig(
5657
boolean skipSnapshotBackfill,
5758
boolean isScanNewlyAddedTableEnabled,
5859
Properties dbzProperties,
59-
Configuration dbzConfiguration) {
60+
Configuration dbzConfiguration,
61+
boolean assignEndingChunkFirst) {
6062
this.startupOptions = startupOptions;
6163
this.splitSize = splitSize;
6264
this.splitMetaGroupSize = splitMetaGroupSize;
@@ -68,6 +70,7 @@ public BaseSourceConfig(
6870
this.isScanNewlyAddedTableEnabled = isScanNewlyAddedTableEnabled;
6971
this.dbzProperties = dbzProperties;
7072
this.dbzConfiguration = dbzConfiguration;
73+
this.assignEndingChunkFirst = assignEndingChunkFirst;
7174
}
7275

7376
@Override
@@ -115,4 +118,9 @@ public Configuration getDbzConfiguration() {
115118
public boolean isSkipSnapshotBackfill() {
116119
return skipSnapshotBackfill;
117120
}
121+
122+
@Override
123+
public boolean isAssignEndingChunkFirst() {
124+
return assignEndingChunkFirst;
125+
}
118126
}

flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/config/JdbcSourceConfig.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,8 @@ public JdbcSourceConfig(
7373
int connectionPoolSize,
7474
String chunkKeyColumn,
7575
boolean skipSnapshotBackfill,
76-
boolean isScanNewlyAddedTableEnabled) {
76+
boolean isScanNewlyAddedTableEnabled,
77+
boolean assignEndingChunkFirst) {
7778
super(
7879
startupOptions,
7980
splitSize,
@@ -85,7 +86,8 @@ public JdbcSourceConfig(
8586
skipSnapshotBackfill,
8687
isScanNewlyAddedTableEnabled,
8788
dbzProperties,
88-
dbzConfiguration);
89+
dbzConfiguration,
90+
assignEndingChunkFirst);
8991
this.driverClassName = driverClassName;
9092
this.hostname = hostname;
9193
this.port = port;

flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/config/JdbcSourceConfigFactory.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,8 @@ public abstract class JdbcSourceConfigFactory implements Factory<JdbcSourceConfi
6060
JdbcSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue();
6161
protected boolean scanNewlyAddedTableEnabled =
6262
JdbcSourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED.defaultValue();
63+
protected boolean assignEndingChunkFirst =
64+
JdbcSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_FIRST.defaultValue();
6365

6466
/** Integer port number of the database server. */
6567
public JdbcSourceConfigFactory hostname(String hostname) {
@@ -252,6 +254,14 @@ public JdbcSourceConfigFactory scanNewlyAddedTableEnabled(boolean scanNewlyAdded
252254
return this;
253255
}
254256

257+
/**
258+
* Whether to assign the ending chunk first during snapshot reading phase. Defaults to false.
259+
*/
260+
public JdbcSourceConfigFactory assignEndingChunkFirst(boolean assignEndingChunkFirst) {
261+
this.assignEndingChunkFirst = assignEndingChunkFirst;
262+
return this;
263+
}
264+
255265
@Override
256266
public abstract JdbcSourceConfig create(int subtask);
257267
}

flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/config/SourceConfig.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ public interface SourceConfig extends Serializable {
4040

4141
boolean isScanNewlyAddedTableEnabled();
4242

43+
boolean isAssignEndingChunkFirst();
44+
4345
/** Factory for the {@code SourceConfig}. */
4446
@FunctionalInterface
4547
interface Factory<C extends SourceConfig> extends Serializable {

flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/options/SourceOptions.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,4 +137,12 @@ public class SourceOptions {
137137
.defaultValue(false)
138138
.withDescription(
139139
"Whether capture the newly added tables when restoring from a savepoint/checkpoint or not, by default is false.");
140+
141+
@Experimental
142+
public static final ConfigOption<Boolean> SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_FIRST =
143+
ConfigOptions.key("scan.incremental.snapshot.assign.ending.first")
144+
.booleanType()
145+
.defaultValue(false)
146+
.withDescription(
147+
"Whether to assign the ending chunk first during snapshot reading phase. This might help reduce the risk of the TaskManager experiencing an out-of-memory (OOM) error when taking a snapshot of the largest unbounded chunk. Defaults to false.");
140148
}

flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/splitter/JdbcSourceChunkSplitter.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -470,7 +470,13 @@ private List<ChunkRange> splitEvenlySizedChunks(
470470
}
471471
}
472472
// add the ending split
473-
splits.add(0, ChunkRange.of(chunkStart, null));
473+
// assign ending split first, both the largest and smallest unbounded chunks are completed
474+
// in the first two splits
475+
if (sourceConfig.isAssignEndingChunkFirst()) {
476+
splits.add(0, ChunkRange.of(chunkStart, null));
477+
} else {
478+
splits.add(ChunkRange.of(chunkStart, null));
479+
}
474480
return splits;
475481
}
476482

flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/test/java/org/apache/flink/cdc/connectors/base/mocked/MockedSourceConfig.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,8 @@ public MockedSourceConfig(
8585
connectionPoolSize,
8686
null,
8787
true,
88-
isScanNewlyAddedTableEnabled);
88+
isScanNewlyAddedTableEnabled,
89+
false);
8990
}
9091

9192
@Override

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/source/config/Db2SourceConfig.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,8 @@ public Db2SourceConfig(
5656
int connectMaxRetries,
5757
int connectionPoolSize,
5858
String chunkKeyColumn,
59-
boolean skipSnapshotBackfill) {
59+
boolean skipSnapshotBackfill,
60+
boolean assignEndingChunkFirst) {
6061
super(
6162
startupOptions,
6263
databaseList,
@@ -82,7 +83,8 @@ public Db2SourceConfig(
8283
connectionPoolSize,
8384
chunkKeyColumn,
8485
skipSnapshotBackfill,
85-
false);
86+
false,
87+
assignEndingChunkFirst);
8688
}
8789

8890
@Override

0 commit comments

Comments (0)