Skip to content

Commit d926fe1

Browse files
committed
Add OnPartitionsLost handler
1 parent 27c9b10 commit d926fe1

File tree

4 files changed

+48
-16
lines changed

4 files changed

+48
-16
lines changed

src/Akka.Streams.Kafka.Tests/Integration/PlainSourceIntegrationTests.cs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,13 @@ public void OnRevoke(IImmutableSet<TopicPartitionOffset> revokedTopicPartitions,
243243
RevokeEventsCounter.IncrementAndGet();
244244
}
245245

246+
/// <inheritdoc />
247+
public void OnLost(IImmutableSet<TopicPartitionOffset> revokedTopicPartitions,
248+
IRestrictedConsumer consumer)
249+
{
250+
RevokeEventsCounter.IncrementAndGet();
251+
}
252+
246253
/// <inheritdoc />
247254
public void OnAssign(IImmutableSet<TopicPartition> assignedTopicPartitions, IRestrictedConsumer consumer)
248255
{

src/Akka.Streams.Kafka/Helpers/PartitionEventHandler.cs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,11 @@ public interface IPartitionEventHandler
2424
/// </summary>
2525
void OnRevoke(IImmutableSet<TopicPartitionOffset> revokedTopicPartitions, IRestrictedConsumer consumer);
2626

27+
/// <summary>
28+
/// Called when partitions are lost
29+
/// </summary>
30+
void OnLost(IImmutableSet<TopicPartitionOffset> revokedTopicPartitions, IRestrictedConsumer consumer);
31+
2732
/// <summary>
2833
/// Called when partitions are assigned
2934
/// </summary>
@@ -50,6 +55,11 @@ public void OnRevoke(IImmutableSet<TopicPartitionOffset> revokedTopicPartitions,
5055
{
5156
}
5257

58+
/// <inheritdoc />
59+
public void OnLost(IImmutableSet<TopicPartitionOffset> revokedTopicPartitions, IRestrictedConsumer consumer)
60+
{
61+
}
62+
5363
/// <inheritdoc />
5464
public void OnAssign(IImmutableSet<TopicPartition> assignedTopicPartitions, IRestrictedConsumer consumer)
5565
{
@@ -82,6 +92,12 @@ public void OnRevoke(IImmutableSet<TopicPartitionOffset> revokedTopicPartitions,
8292
_partitionRevokedCallback(revokedTopicPartitions);
8393
}
8494

95+
/// <inheritdoc />
96+
public void OnLost(IImmutableSet<TopicPartitionOffset> revokedTopicPartitions, IRestrictedConsumer consumer)
97+
{
98+
OnRevoke(revokedTopicPartitions, consumer);
99+
}
100+
85101
/// <inheritdoc />
86102
public void OnAssign(IImmutableSet<TopicPartition> assignedTopicPartitions, IRestrictedConsumer consumer)
87103
{
@@ -115,6 +131,13 @@ public void OnRevoke(IImmutableSet<TopicPartitionOffset> revokedTopicPartitions,
115131
_handler2?.OnRevoke(revokedTopicPartitions, consumer);
116132
}
117133

134+
/// <inheritdoc />
135+
public void OnLost(IImmutableSet<TopicPartitionOffset> revokedTopicPartitions, IRestrictedConsumer consumer)
136+
{
137+
_handler1?.OnLost(revokedTopicPartitions, consumer);
138+
_handler2?.OnLost(revokedTopicPartitions, consumer);
139+
}
140+
118141
/// <inheritdoc />
119142
public void OnAssign(IImmutableSet<TopicPartition> assignedTopicPartitions, IRestrictedConsumer consumer)
120143
{

src/Akka.Streams.Kafka/Settings/ConsumerSettings.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -394,6 +394,7 @@ private ConsumerSettings<TKey, TValue> Copy(
394394
public Confluent.Kafka.IConsumer<TKey, TValue> CreateKafkaConsumer(Action<IConsumer<TKey, TValue>, Error> consumeErrorHandler = null,
395395
Action<IConsumer<TKey, TValue>, List<TopicPartition>> partitionAssignedHandler = null,
396396
Action<IConsumer<TKey, TValue>, List<TopicPartitionOffset>> partitionRevokedHandler = null,
397+
Action<IConsumer<TKey, TValue>, List<TopicPartitionOffset>> partitionLostHandler = null,
397398
Action<IConsumer<TKey, TValue>, string> statisticHandler = null)
398399
{
399400
RebalanceListener = new RebalanceListener<TKey, TValue>(
@@ -409,6 +410,7 @@ public Confluent.Kafka.IConsumer<TKey, TValue> CreateKafkaConsumer(Action<IConsu
409410
.SetErrorHandler((c, e) => consumeErrorHandler?.Invoke(c, e))
410411
.SetPartitionsAssignedHandler((c, partitions) => partitionAssignedHandler?.Invoke(c, partitions))
411412
.SetPartitionsRevokedHandler((c, partitions) => partitionRevokedHandler?.Invoke(c, partitions))
413+
.SetPartitionsLostHandler((c, partitions) => partitionLostHandler?.Invoke(c, partitions))
412414
.SetStatisticsHandler((c, json) => statisticHandler?.Invoke(c, json))
413415
.Build();
414416
}

src/Akka.Streams.Kafka/Stages/Consumers/Actors/KafkaConsumerActor.cs

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,17 @@ private void PartitionsRevokedHandler(IImmutableSet<TopicPartitionOffset> partit
147147
_rebalanceInProgress.GetAndSet(true);
148148
}
149149

150+
// This is RebalanceListener.OnPartitionsLost on JVM
151+
private void PartitionsLostHandler(IImmutableSet<TopicPartitionOffset> partitions)
152+
{
153+
var watch = Stopwatch.StartNew();
154+
_partitionEventHandler.OnLost(partitions, _restrictedConsumer);
155+
watch.Stop();
156+
CheckDuration(watch, "onLost");
157+
158+
_commitRefreshing.Revoke(partitions.Select(tp => tp.TopicPartition).ToImmutableHashSet());
159+
}
160+
150161
private void RebalancePostStop()
151162
{
152163
var currentTopicPartitions = _consumer.Assignment.ToImmutableList();
@@ -490,17 +501,15 @@ private void Poll()
490501

491502
using (var cts = new CancellationTokenSource(_settings.PollTimeout))
492503
{
493-
var (polled, exception) = PollKafka(cts.Token);
494504
try
495505
{
506+
var polled = PollKafka(cts.Token);
496507
ProcessResult(partitionsToFetch, polled);
497508
}
498509
catch (Exception e)
499510
{
500511
ProcessExceptions(e);
501512
}
502-
503-
ProcessExceptions(exception);
504513
}
505514
}
506515

@@ -529,29 +538,20 @@ private void ProcessExceptions(Exception exception)
529538
Context.Stop(Self);
530539
}
531540

532-
private (List<ConsumeResult<K, V>>, Exception) PollKafka(CancellationToken token)
541+
private List<ConsumeResult<K, V>> PollKafka(CancellationToken token)
533542
{
534543
ConsumeResult<K, V> consumed = null;
535544
var i = 10; // 10 poll attempts
536545
var timeout = Math.Max((int) _pollTimeout.TotalMilliseconds / i, 1);
537546
var polled = new List<ConsumeResult<K, V>>();
538547
do
539548
{
540-
try
541-
{
542-
// this would return immediately if there are messages waiting inside the client queue buffer
543-
consumed = _consumer.Consume(timeout);
544-
}
545-
catch (Exception e)
546-
{
547-
return (polled, e);
548-
}
549-
if (consumed != null)
550-
polled.Add(consumed);
549+
// this would return immediately if there are messages waiting inside the client queue buffer
550+
consumed = _consumer.Consume(timeout);
551551
i--;
552552
} while (i > 0 && consumed != null && !token.IsCancellationRequested);
553553

554-
return (polled, null);
554+
return polled;
555555
}
556556

557557
private void ProcessResult(IImmutableSet<TopicPartition> partitionsToFetch, List<ConsumeResult<K,V>> rawResult)

0 commit comments

Comments
 (0)