Skip to content

Commit 833889f

Browse files
vbureninVolodymyr Bureninyihua
authored andcommitted
[HUDI-4873] Report number of messages to be processed via metrics (apache#6271)
Co-authored-by: Volodymyr Burenin <volodymyr.burenin@cloudkitchens.com> Co-authored-by: Y Ethan Guo <ethan.guoyihua@gmail.com> (cherry picked from commit 5cbd1d8)
1 parent 4b8c161 commit 833889f

2 files changed

Lines changed: 8 additions & 0 deletions

File tree

hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerMetrics.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,12 @@ public void updateDeltaStreamerSyncMetrics(long syncEpochTimeInMs) {
101101
}
102102
}
103103

104+
public void updateDeltaStreamerKafkaMessageInCount(long totalNewMsgCount) {
105+
if (config.isMetricsOn()) {
106+
Metrics.registerGauge(getMetricsName("deltastreamer", "kafkaMessageInCount"), totalNewMsgCount);
107+
}
108+
}
109+
104110
public long getDurationInMs(long ctxDuration) {
105111
return ctxDuration / 1000000;
106112
}

hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KafkaSource.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,8 +57,10 @@ protected InputBatch<JavaRDD<T>> fetchNewData(Option<String> lastCheckpointStr,
5757
long totalNewMsgs = KafkaOffsetGen.CheckpointUtils.totalNewMessages(offsetRanges);
5858
LOG.info("About to read " + totalNewMsgs + " from Kafka for topic :" + offsetGen.getTopicName());
5959
if (totalNewMsgs <= 0) {
60+
metrics.updateDeltaStreamerKafkaMessageInCount(0);
6061
return new InputBatch<>(Option.empty(), KafkaOffsetGen.CheckpointUtils.offsetsToStr(offsetRanges));
6162
}
63+
metrics.updateDeltaStreamerKafkaMessageInCount(totalNewMsgs);
6264
JavaRDD<T> newDataRDD = toRDD(offsetRanges);
6365
return new InputBatch<>(Option.of(newDataRDD), KafkaOffsetGen.CheckpointUtils.offsetsToStr(offsetRanges));
6466
} catch (org.apache.kafka.common.errors.TimeoutException e) {

0 commit comments

Comments
 (0)