Skip to content

Commit e46df7c

Browse files
Alexey Kudinkin and fengjian
authored and committed
[Stacked on 6386] Fixing DebeziumSource to properly commit offsets; (apache#6416)
1 parent 047b33a commit e46df7c

4 files changed

Lines changed: 17 additions & 4 deletions

File tree

hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -340,6 +340,7 @@ public Pair<Option<String>, JavaRDD<WriteStatus>> syncOnce() throws IOException
340340

341341
metrics.updateDeltaStreamerSyncMetrics(System.currentTimeMillis());
342342

343+
// TODO revisit (too early to unpersist)
343344
// Clear persistent RDDs
344345
jssc.getPersistentRDDs().values().forEach(JavaRDD::unpersist);
345346
return result;

hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -832,7 +832,7 @@ protected Boolean onInitializingWriteClient(SparkRDDWriteClient writeClient) {
832832
* Close all resources.
833833
*/
834834
public void close() {
835-
if (null != deltaSync) {
835+
if (deltaSync != null) {
836836
deltaSync.close();
837837
}
838838
}

hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/debezium/DebeziumSource.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -239,5 +239,13 @@ private static Dataset<Row> convertArrayColumnsToString(Dataset<Row> dataset) {
239239

240240
return dataset;
241241
}
242+
243+
@Override
244+
public void onCommit(String lastCkptStr) {
245+
if (this.props.getBoolean(KafkaOffsetGen.Config.ENABLE_KAFKA_COMMIT_OFFSET.key(),
246+
KafkaOffsetGen.Config.ENABLE_KAFKA_COMMIT_OFFSET.defaultValue())) {
247+
offsetGen.commitOffsetToKafka(lastCkptStr);
248+
}
249+
}
242250
}
243251

hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -110,11 +110,13 @@ public static OffsetRange[] computeOffsetRanges(Map<TopicPartition, Long> fromOf
110110
Comparator<OffsetRange> byPartition = Comparator.comparing(OffsetRange::partition);
111111

112112
// Create initial offset ranges for each 'to' partition, with from = to offsets.
113-
OffsetRange[] ranges = new OffsetRange[toOffsetMap.size()];
114-
toOffsetMap.keySet().stream().map(tp -> {
113+
OffsetRange[] ranges = toOffsetMap.keySet().stream().map(tp -> {
115114
long fromOffset = fromOffsetMap.getOrDefault(tp, 0L);
116115
return OffsetRange.create(tp, fromOffset, fromOffset);
117-
}).sorted(byPartition).collect(Collectors.toList()).toArray(ranges);
116+
})
117+
.sorted(byPartition)
118+
.collect(Collectors.toList())
119+
.toArray(new OffsetRange[toOffsetMap.size()]);
118120

119121
long allocedEvents = 0;
120122
Set<Integer> exhaustedPartitions = new HashSet<>();
@@ -290,6 +292,7 @@ public OffsetRange[] getNextOffsetRanges(Option<String> lastCheckpointStr, long
290292
numEvents = sourceLimit;
291293
}
292294

295+
// TODO(HUDI-4625) remove
293296
if (numEvents < toOffsets.size()) {
294297
throw new HoodieException("sourceLimit should not be less than the number of kafka partitions");
295298
}
@@ -309,6 +312,7 @@ private List<PartitionInfo> fetchPartitionInfos(KafkaConsumer consumer, String t
309312

310313
List<PartitionInfo> partitionInfos;
311314
do {
315+
// TODO(HUDI-4625) cleanup, introduce retrying client
312316
partitionInfos = consumer.partitionsFor(topicName);
313317
try {
314318
TimeUnit.SECONDS.sleep(10);

0 commit comments

Comments (0)