From ffe4f023fc44b091ef42352cfe7df2c87805a645 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Tue, 16 Aug 2022 21:26:17 -0700 Subject: [PATCH] Fixing `DebeziumSource` to properly commit offsets; Tidying up --- .../apache/hudi/utilities/deltastreamer/DeltaSync.java | 1 + .../utilities/deltastreamer/HoodieDeltaStreamer.java | 2 +- .../utilities/sources/debezium/DebeziumSource.java | 8 ++++++++ .../hudi/utilities/sources/helpers/KafkaOffsetGen.java | 10 +++++++--- 4 files changed, 17 insertions(+), 4 deletions(-) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java index c108a1d7a1d41..10b3f2ef7ea56 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java @@ -339,6 +339,7 @@ public Pair, JavaRDD> syncOnce() throws IOException metrics.updateDeltaStreamerSyncMetrics(System.currentTimeMillis()); + // TODO revisit (too early to unpersist) // Clear persistent RDDs jssc.getPersistentRDDs().values().forEach(JavaRDD::unpersist); return result; diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java index fe7576cc803c4..0f403cd266028 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java @@ -832,7 +832,7 @@ protected Boolean onInitializingWriteClient(SparkRDDWriteClient writeClient) { * Close all resources. */ public void close() { - if (null != deltaSync) { + if (deltaSync != null) { deltaSync.close(); } } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/debezium/DebeziumSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/debezium/DebeziumSource.java index d9be692b5bc57..f05cd12d99d47 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/debezium/DebeziumSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/debezium/DebeziumSource.java @@ -239,5 +239,13 @@ private static Dataset convertArrayColumnsToString(Dataset dataset) { return dataset; } + + @Override + public void onCommit(String lastCkptStr) { + if (this.props.getBoolean(KafkaOffsetGen.Config.ENABLE_KAFKA_COMMIT_OFFSET.key(), + KafkaOffsetGen.Config.ENABLE_KAFKA_COMMIT_OFFSET.defaultValue())) { + offsetGen.commitOffsetToKafka(lastCkptStr); + } + } } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java index cc577621fbf7c..1e78610ced0f2 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java @@ -110,11 +110,13 @@ public static OffsetRange[] computeOffsetRanges(Map fromOf Comparator byPartition = Comparator.comparing(OffsetRange::partition); // Create initial offset ranges for each 'to' partition, with from = to offsets. - OffsetRange[] ranges = new OffsetRange[toOffsetMap.size()]; - toOffsetMap.keySet().stream().map(tp -> { + OffsetRange[] ranges = toOffsetMap.keySet().stream().map(tp -> { long fromOffset = fromOffsetMap.getOrDefault(tp, 0L); return OffsetRange.create(tp, fromOffset, fromOffset); - }).sorted(byPartition).collect(Collectors.toList()).toArray(ranges); + }) + .sorted(byPartition) + .collect(Collectors.toList()) + .toArray(new OffsetRange[toOffsetMap.size()]); long allocedEvents = 0; Set exhaustedPartitions = new HashSet<>(); @@ -290,6 +292,7 @@ public OffsetRange[] getNextOffsetRanges(Option lastCheckpointStr, long numEvents = sourceLimit; } + // TODO(HUDI-4625) remove if (numEvents < toOffsets.size()) { throw new HoodieException("sourceLimit should not be less than the number of kafka partitions"); } @@ -309,6 +312,7 @@ private List fetchPartitionInfos(KafkaConsumer consumer, String t List partitionInfos; do { + // TODO(HUDI-4625) cleanup, introduce retrying client partitionInfos = consumer.partitionsFor(topicName); try { TimeUnit.SECONDS.sleep(10);