@@ -157,7 +157,7 @@ public boolean advance() throws IOException {
        */
       while (true) {
         if (curBatch.hasNext()) {
-          // Initalize metrics container.
+          // Initialize metrics container.
           kafkaResults = KafkaSinkMetrics.kafkaMetrics();
 
           PartitionState<K, V> pState = curBatch.next();
@@ -374,6 +374,7 @@ public boolean offsetBasedDeduplicationSupported() {
   private static final Duration RECORDS_DEQUEUE_POLL_TIMEOUT_MIN = Duration.millis(1);
   private static final Duration RECORDS_DEQUEUE_POLL_TIMEOUT_MAX = Duration.millis(20);
   private static final Duration RECORDS_ENQUEUE_POLL_TIMEOUT = Duration.millis(100);
+  private static final Duration MIN_COMMIT_FAIL_LOG_INTERVAL = Duration.standardMinutes(10);
 
   // Use a separate thread to read Kafka messages. Kafka Consumer does all its work including
   // network I/O inside poll(). Polling only inside #advance(), especially with a small timeout
@@ -392,6 +393,7 @@ public boolean offsetBasedDeduplicationSupported() {
   private AtomicReference<@Nullable KafkaCheckpointMark> finalizedCheckpointMark =
       new AtomicReference<>();
   private AtomicBoolean closed = new AtomicBoolean(false);
+  private Instant nextAllowedCommitFailLogTime = Instant.ofEpochMilli(0);
 
   // Backlog support :
   // Kafka consumer does not have an API to fetch latest offset for topic. We need to seekToEnd()
@@ -612,6 +614,7 @@ private void commitCheckpointMark() {
     if (checkpointMark != null) {
       LOG.debug("{}: Committing finalized checkpoint {}", this, checkpointMark);
       Consumer<byte[], byte[]> consumer = Preconditions.checkStateNotNull(this.consumer);
+      Instant now = Instant.now();
 
       try {
         consumer.commitSync(
@@ -621,11 +624,24 @@
                 Collectors.toMap(
                     p -> new TopicPartition(p.getTopic(), p.getPartition()),
                     p -> new OffsetAndMetadata(p.getNextOffset()))));
+        nextAllowedCommitFailLogTime = now.plus(MIN_COMMIT_FAIL_LOG_INTERVAL);
       } catch (Exception e) {
         // Log but ignore the exception. Committing consumer offsets to Kafka is not critical for
         // KafkaIO because it relies on the offsets stored in KafkaCheckpointMark.
-        LOG.warn(
-            String.format("%s: Could not commit finalized checkpoint %s", this, checkpointMark), e);
+        if (now.isAfter(nextAllowedCommitFailLogTime)) {
+          LOG.warn(
+              String.format(
+                  "%s: Did not successfully commit finalized checkpoint for > %s. Current checkpoint: %s",
+                  this, MIN_COMMIT_FAIL_LOG_INTERVAL, checkpointMark),
+              e);
+          nextAllowedCommitFailLogTime = now.plus(MIN_COMMIT_FAIL_LOG_INTERVAL);
+        } else {
+          LOG.info(
+              String.format(
+                  "%s: Could not commit finalized checkpoint. Commit will be retried with subsequent reads. Current checkpoint: %s",
+                  this, checkpointMark),
+              e);
+        }
       }
     }
   }
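
The net effect of the change: a commit failure is logged at WARN (with the stack trace) at most once per MIN_COMMIT_FAIL_LOG_INTERVAL, further failures inside that window are downgraded to INFO, and every successful commit pushes the next allowed WARN a full interval into the future. Below is a minimal, self-contained sketch of the same throttling pattern. The names (ThrottledFailureLog, onSuccess, onFailure) are hypothetical, java.time stands in for the Joda-Time types KafkaIO uses, and plain printf stands in for SLF4J; it illustrates the behavior rather than reproducing KafkaIO's implementation.

import java.time.Duration;
import java.time.Instant;

// Standalone sketch of the rate-limited failure logging introduced above.
final class ThrottledFailureLog {
  private final Duration minWarnInterval;
  // Starting at the epoch means the very first failure is logged at WARN level.
  private Instant nextAllowedWarnTime = Instant.EPOCH;

  ThrottledFailureLog(Duration minWarnInterval) {
    this.minWarnInterval = minWarnInterval;
  }

  // After a success, suppress WARNs for a full interval: a failure only
  // escalates to WARN once commits have been failing for minWarnInterval.
  void onSuccess(Instant now) {
    nextAllowedWarnTime = now.plus(minWarnInterval);
  }

  void onFailure(Instant now, String detail, Exception cause) {
    if (now.isAfter(nextAllowedWarnTime)) {
      System.err.printf(
          "WARN: no successful commit for > %s. %s (%s)%n", minWarnInterval, detail, cause);
      nextAllowedWarnTime = now.plus(minWarnInterval);
    } else {
      // Between WARNs, keep the noise at INFO level.
      System.out.printf("INFO: commit failed, will retry. %s (%s)%n", detail, cause);
    }
  }

  public static void main(String[] args) {
    ThrottledFailureLog log = new ThrottledFailureLog(Duration.ofMinutes(10));
    Instant t0 = Instant.now();
    // First failure: WARN (deadline still at the epoch).
    log.onFailure(t0, "checkpoint {tp-0: 42}", new RuntimeException("broker unreachable"));
    // One minute later, still inside the window: INFO.
    log.onFailure(t0.plusSeconds(60), "checkpoint {tp-0: 43}", new RuntimeException("still down"));
    // Eleven minutes later, window elapsed: WARN again.
    log.onFailure(t0.plus(Duration.ofMinutes(11)), "checkpoint {tp-0: 44}", new RuntimeException("still down"));
  }
}

Advancing the deadline on success as well as on each WARN is what makes the warning text accurate: any success inside the window re-arms the throttle, so by the time a WARN fires, commits really have been failing for longer than MIN_COMMIT_FAIL_LOG_INTERVAL (the only exception being the very first attempt, since the field starts at the epoch).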