@@ -339,6 +339,7 @@ public Pair<Option<String>, JavaRDD<WriteStatus>> syncOnce() throws IOException
 
     metrics.updateDeltaStreamerSyncMetrics(System.currentTimeMillis());
 
+    // TODO revisit (too early to unpersist)
     // Clear persistent RDDs
     jssc.getPersistentRDDs().values().forEach(JavaRDD::unpersist);
     return result;
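
The TODO above flags a timing hazard: syncOnce() unpersists every RDD still cached in the JavaSparkContext at the end of a sync round, but the JavaRDD<WriteStatus> in the returned pair may still be consumed by the caller, so a blanket unpersist can silently force recomputation. A minimal standalone sketch of that effect (demo code, not from Hudi; class and app names are illustrative):

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import java.util.Arrays;

public class UnpersistTooEarly {
  public static void main(String[] args) {
    JavaSparkContext jssc = new JavaSparkContext("local[2]", "unpersist-demo");
    JavaRDD<Integer> cached = jssc.parallelize(Arrays.asList(1, 2, 3)).cache();
    cached.count(); // materializes the cache

    // Mirrors the line in syncOnce(): clears every persisted RDD in the context.
    jssc.getPersistentRDDs().values().forEach(JavaRDD::unpersist);

    // Still returns the right data, but the lineage is recomputed from scratch:
    // the cache is gone, so any expensive upstream work would be redone.
    System.out.println(cached.collect());
    jssc.close();
  }
}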
Original file line number Diff line number Diff line change
@@ -832,7 +832,7 @@ protected Boolean onInitializingWriteClient(SparkRDDWriteClient writeClient) {
    * Close all resources.
    */
   public void close() {
-    if (null != deltaSync) {
+    if (deltaSync != null) {
       deltaSync.close();
     }
   }
@@ -239,5 +239,13 @@ private static Dataset<Row> convertArrayColumnsToString(Dataset<Row> dataset) {
 
     return dataset;
   }
+
+  @Override
+  public void onCommit(String lastCkptStr) {
+    if (this.props.getBoolean(KafkaOffsetGen.Config.ENABLE_KAFKA_COMMIT_OFFSET.key(),
+        KafkaOffsetGen.Config.ENABLE_KAFKA_COMMIT_OFFSET.defaultValue())) {
+      offsetGen.commitOffsetToKafka(lastCkptStr);
+    }
+  }
 }
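
The new onCommit hook reports the last written checkpoint back to Kafka as a consumer-group commit, guarded by the KafkaOffsetGen.Config.ENABLE_KAFKA_COMMIT_OFFSET property, so external lag-monitoring tools can follow DeltaStreamer progress. As a rough sketch of what the commit side involves, assuming a checkpoint string of the form "topic,partition:offset,..." (an assumption; the actual parsing and commit live in Hudi's KafkaOffsetGen.commitOffsetToKafka):

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class CommitCheckpointSketch {

  // consumerProps must carry bootstrap.servers, group.id and deserializer settings.
  static void commit(String lastCkptStr, Properties consumerProps) {
    String[] splits = lastCkptStr.split(",");
    String topic = splits[0];
    Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
    for (int i = 1; i < splits.length; i++) {
      String[] partitionOffset = splits[i].split(":");
      offsets.put(new TopicPartition(topic, Integer.parseInt(partitionOffset[0])),
          new OffsetAndMetadata(Long.parseLong(partitionOffset[1])));
    }
    try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps)) {
      consumer.commitSync(offsets); // records progress under the configured group.id
    }
  }
}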

@@ -110,11 +110,13 @@ public static OffsetRange[] computeOffsetRanges(Map<TopicPartition, Long> fromOf
     Comparator<OffsetRange> byPartition = Comparator.comparing(OffsetRange::partition);
 
     // Create initial offset ranges for each 'to' partition, with from = to offsets.
-    OffsetRange[] ranges = new OffsetRange[toOffsetMap.size()];
-    toOffsetMap.keySet().stream().map(tp -> {
+    OffsetRange[] ranges = toOffsetMap.keySet().stream().map(tp -> {
       long fromOffset = fromOffsetMap.getOrDefault(tp, 0L);
       return OffsetRange.create(tp, fromOffset, fromOffset);
-    }).sorted(byPartition).collect(Collectors.toList()).toArray(ranges);
+    })
+        .sorted(byPartition)
+        .collect(Collectors.toList())
+        .toArray(new OffsetRange[toOffsetMap.size()]);
 
     long allocedEvents = 0;
     Set<Integer> exhaustedPartitions = new HashSet<>();
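
This refactor removes an obscure idiom: the old code pre-allocated ranges, then discarded the stream expression's result, and worked only because List.toArray(T[]) fills the passed array in place when the list size equals the array length. The new form assigns the stream's result directly, so correctness no longer hinges on the pre-sized array. A standalone sketch of the same map -> sorted list -> array pattern (illustrative Range type, not Hudi's OffsetRange):

import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class RangeArraySketch {

  // Stand-in for OffsetRange: one partition plus a from/until offset pair.
  static class Range {
    final int partition;
    final long from;
    final long until;

    Range(int partition, long from, long until) {
      this.partition = partition;
      this.from = from;
      this.until = until;
    }

    int partition() {
      return partition;
    }

    @Override
    public String toString() {
      return "Range(partition=" + partition + ", from=" + from + ", until=" + until + ")";
    }
  }

  public static void main(String[] args) {
    Map<Integer, Long> toOffsetMap = new HashMap<>();
    toOffsetMap.put(2, 50L);
    toOffsetMap.put(0, 10L);
    toOffsetMap.put(1, 30L);

    // Same shape as the new code: stream -> sorted -> list -> array, assigned directly.
    Range[] ranges = toOffsetMap.keySet().stream()
        .map(p -> new Range(p, 0L, 0L)) // zero-length initial range, as in the diff
        .sorted(Comparator.comparingInt(Range::partition))
        .collect(Collectors.toList())
        .toArray(new Range[toOffsetMap.size()]);

    for (Range r : ranges) {
      System.out.println(r); // printed in partition order 0, 1, 2
    }
  }
}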
@@ -290,6 +292,7 @@ public OffsetRange[] getNextOffsetRanges(Option<String> lastCheckpointStr, long
       numEvents = sourceLimit;
     }
 
+    // TODO(HUDI-4625) remove
     if (numEvents < toOffsets.size()) {
       throw new HoodieException("sourceLimit should not be less than the number of kafka partitions");
     }
@@ -309,6 +312,7 @@ private List<PartitionInfo> fetchPartitionInfos(KafkaConsumer consumer, String t
 
     List<PartitionInfo> partitionInfos;
     do {
+      // TODO(HUDI-4625) cleanup, introduce retrying client
      partitionInfos = consumer.partitionsFor(topicName);
      try {
        TimeUnit.SECONDS.sleep(10);
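
The TODO here points at the surrounding do/while, which polls consumer.partitionsFor(topicName) with a fixed 10-second sleep and no bound on attempts. A hedged sketch of the kind of retrying helper the TODO might mean, with bounded attempts and linear backoff; the class and parameter names are illustrative, not an existing Hudi API:

import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;

public final class RetrySupport {

  private RetrySupport() {
  }

  // Retries the call until it yields a non-null result, up to maxAttempts,
  // sleeping attempt * backoffSeconds between tries (linear backoff).
  public static <T> T retry(Callable<T> call, int maxAttempts, long backoffSeconds)
      throws InterruptedException {
    Exception last = null;
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      try {
        T result = call.call();
        if (result != null) {
          return result;
        }
      } catch (InterruptedException ie) {
        throw ie;
      } catch (Exception e) {
        last = e; // remember the failure and retry
      }
      TimeUnit.SECONDS.sleep(backoffSeconds * attempt);
    }
    throw new IllegalStateException("gave up after " + maxAttempts + " attempts", last);
  }
}

With such a helper, the polling loop could collapse to something like partitionInfos = RetrySupport.retry(() -> consumer.partitionsFor(topicName), 5, 10); instead of an open-ended sleep-and-poll cycle.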