Skip to content

Commit 92c4600

Browse files
authored
Refactor KafkaConsumerActor event handling from Mailbox to direct method call (#418)
* Refactor KafkaConsumerActor event handling from Mailbox to direct method call * Cleanup unused codes * short-circuit pause and resume * Make everything immutable * Add OnPartitionsLost handler * Revert "Add OnPartitionsLost handler" This reverts commit d926fe1. * Add OnPartitionsLost handler * Optimize ResumePartitions
1 parent fecaddb commit 92c4600

File tree

8 files changed

+119
-69
lines changed

8 files changed

+119
-69
lines changed

src/Akka.Streams.Kafka.Tests/Integration/PlainPartitionedSourceIntegrationTests.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,7 @@ public async Task PlainPartitionedSource_should_not_leave_gaps_when_subsource_is
216216
.TakeWhile(m => m < totalMessages, inclusive: true)
217217
.RunWith(Sink.Last<int>(), Materializer);
218218

219-
var consumedMessages = await consumedMessagesTask.ShouldCompleteWithin(10.Seconds());
219+
var consumedMessages = await consumedMessagesTask.ShouldCompleteWithin(60.Seconds());
220220
consumedMessages.Should().Be(totalMessages);
221221
}
222222

src/Akka.Streams.Kafka.Tests/Integration/PlainSourceIntegrationTests.cs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,13 @@ public void OnRevoke(IImmutableSet<TopicPartitionOffset> revokedTopicPartitions,
243243
RevokeEventsCounter.IncrementAndGet();
244244
}
245245

246+
/// <inheritdoc />
247+
public void OnLost(IImmutableSet<TopicPartitionOffset> revokedTopicPartitions,
248+
IRestrictedConsumer consumer)
249+
{
250+
RevokeEventsCounter.IncrementAndGet();
251+
}
252+
246253
/// <inheritdoc />
247254
public void OnAssign(IImmutableSet<TopicPartition> assignedTopicPartitions, IRestrictedConsumer consumer)
248255
{

src/Akka.Streams.Kafka/Helpers/PartitionEventHandler.cs

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,11 @@ public interface IPartitionEventHandler
2424
/// </summary>
2525
void OnRevoke(IImmutableSet<TopicPartitionOffset> revokedTopicPartitions, IRestrictedConsumer consumer);
2626

27+
/// <summary>
28+
/// Called when partitions are lost
29+
/// </summary>
30+
void OnLost(IImmutableSet<TopicPartitionOffset> revokedTopicPartitions, IRestrictedConsumer consumer);
31+
2732
/// <summary>
2833
/// Called when partitions are assigned
2934
/// </summary>
@@ -50,6 +55,11 @@ public void OnRevoke(IImmutableSet<TopicPartitionOffset> revokedTopicPartitions,
5055
{
5156
}
5257

58+
/// <inheritdoc />
59+
public void OnLost(IImmutableSet<TopicPartitionOffset> revokedTopicPartitions, IRestrictedConsumer consumer)
60+
{
61+
}
62+
5363
/// <inheritdoc />
5464
public void OnAssign(IImmutableSet<TopicPartition> assignedTopicPartitions, IRestrictedConsumer consumer)
5565
{
@@ -68,12 +78,15 @@ internal class AsyncCallbacks : IPartitionEventHandler
6878
{
6979
private readonly Action<IImmutableSet<TopicPartition>> _partitionAssignedCallback;
7080
private readonly Action<IImmutableSet<TopicPartitionOffset>> _partitionRevokedCallback;
81+
private readonly Action<IImmutableSet<TopicPartitionOffset>> _partitionLostCallback;
7182

7283
public AsyncCallbacks(Action<IImmutableSet<TopicPartition>> partitionAssignedCallback,
73-
Action<IImmutableSet<TopicPartitionOffset>> partitionRevokedCallback)
84+
Action<IImmutableSet<TopicPartitionOffset>> partitionRevokedCallback,
85+
Action<IImmutableSet<TopicPartitionOffset>> partitionLostCallback)
7486
{
7587
_partitionAssignedCallback = partitionAssignedCallback;
7688
_partitionRevokedCallback = partitionRevokedCallback;
89+
_partitionLostCallback = partitionLostCallback;
7790
}
7891

7992
/// <inheritdoc />
@@ -82,6 +95,12 @@ public void OnRevoke(IImmutableSet<TopicPartitionOffset> revokedTopicPartitions,
8295
_partitionRevokedCallback(revokedTopicPartitions);
8396
}
8497

98+
/// <inheritdoc />
99+
public void OnLost(IImmutableSet<TopicPartitionOffset> revokedTopicPartitions, IRestrictedConsumer consumer)
100+
{
101+
_partitionLostCallback(revokedTopicPartitions);
102+
}
103+
85104
/// <inheritdoc />
86105
public void OnAssign(IImmutableSet<TopicPartition> assignedTopicPartitions, IRestrictedConsumer consumer)
87106
{
@@ -115,6 +134,13 @@ public void OnRevoke(IImmutableSet<TopicPartitionOffset> revokedTopicPartitions,
115134
_handler2?.OnRevoke(revokedTopicPartitions, consumer);
116135
}
117136

137+
/// <inheritdoc />
138+
public void OnLost(IImmutableSet<TopicPartitionOffset> revokedTopicPartitions, IRestrictedConsumer consumer)
139+
{
140+
_handler1?.OnLost(revokedTopicPartitions, consumer);
141+
_handler2?.OnLost(revokedTopicPartitions, consumer);
142+
}
143+
118144
/// <inheritdoc />
119145
public void OnAssign(IImmutableSet<TopicPartition> assignedTopicPartitions, IRestrictedConsumer consumer)
120146
{

src/Akka.Streams.Kafka/Settings/ConsumerSettings.cs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -394,11 +394,13 @@ private ConsumerSettings<TKey, TValue> Copy(
394394
public Confluent.Kafka.IConsumer<TKey, TValue> CreateKafkaConsumer(Action<IConsumer<TKey, TValue>, Error> consumeErrorHandler = null,
395395
Action<IConsumer<TKey, TValue>, List<TopicPartition>> partitionAssignedHandler = null,
396396
Action<IConsumer<TKey, TValue>, List<TopicPartitionOffset>> partitionRevokedHandler = null,
397+
Action<IConsumer<TKey, TValue>, List<TopicPartitionOffset>> partitionLostHandler = null,
397398
Action<IConsumer<TKey, TValue>, string> statisticHandler = null)
398399
{
399400
RebalanceListener = new RebalanceListener<TKey, TValue>(
400401
onPartitionAssigned: partitionAssignedHandler,
401-
onPartitionRevoked: partitionRevokedHandler);
402+
onPartitionRevoked: partitionRevokedHandler,
403+
onPartitionLost: partitionLostHandler);
402404

403405
if (this.ConsumerFactory != null)
404406
return this.ConsumerFactory(this);
@@ -409,20 +411,26 @@ public Confluent.Kafka.IConsumer<TKey, TValue> CreateKafkaConsumer(Action<IConsu
409411
.SetErrorHandler((c, e) => consumeErrorHandler?.Invoke(c, e))
410412
.SetPartitionsAssignedHandler((c, partitions) => partitionAssignedHandler?.Invoke(c, partitions))
411413
.SetPartitionsRevokedHandler((c, partitions) => partitionRevokedHandler?.Invoke(c, partitions))
414+
.SetPartitionsLostHandler((c, partitions) => partitionLostHandler?.Invoke(c, partitions))
412415
.SetStatisticsHandler((c, json) => statisticHandler?.Invoke(c, json))
413416
.Build();
414417
}
415418
}
416419

417420
internal sealed class RebalanceListener<TKey, TValue>
418421
{
419-
public RebalanceListener(Action<IConsumer<TKey, TValue>, List<TopicPartition>> onPartitionAssigned, Action<IConsumer<TKey, TValue>, List<TopicPartitionOffset>> onPartitionRevoked)
422+
public RebalanceListener(
423+
Action<IConsumer<TKey, TValue>, List<TopicPartition>> onPartitionAssigned,
424+
Action<IConsumer<TKey, TValue>, List<TopicPartitionOffset>> onPartitionRevoked,
425+
Action<IConsumer<TKey, TValue>, List<TopicPartitionOffset>> onPartitionLost)
420426
{
421427
OnPartitionAssigned = onPartitionAssigned;
422428
OnPartitionRevoked = onPartitionRevoked;
429+
OnPartitionLost = onPartitionLost;
423430
}
424431

425432
public Action<IConsumer<TKey, TValue>, List<TopicPartition>> OnPartitionAssigned { get; }
426433
public Action<IConsumer<TKey, TValue>, List<TopicPartitionOffset>> OnPartitionRevoked { get; }
434+
public Action<IConsumer<TKey, TValue>, List<TopicPartitionOffset>> OnPartitionLost { get; }
427435
}
428436
}

src/Akka.Streams.Kafka/Stages/Consumers/Abstract/SingleSourceStageLogic.cs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,9 @@ protected override IActorRef CreateConsumerActor()
5858
{
5959
var partitionsAssignedHandler = GetAsyncCallback<IEnumerable<TopicPartition>>(PartitionsAssigned);
6060
var partitionsRevokedHandler = GetAsyncCallback<IEnumerable<TopicPartitionOffset>>(PartitionsRevoked);
61+
var partitionsLostHandler = GetAsyncCallback<IEnumerable<TopicPartitionOffset>>(PartitionsLost);
6162

62-
IPartitionEventHandler internalHandler = new PartitionEventHandlers.AsyncCallbacks(partitionsAssignedHandler, partitionsRevokedHandler);
63+
IPartitionEventHandler internalHandler = new PartitionEventHandlers.AsyncCallbacks(partitionsAssignedHandler, partitionsRevokedHandler, partitionsLostHandler);
6364

6465
// If custom partition events handler specified - add it to the chain
6566
var eventHandler = _subscription is IAutoSubscription autoSubscription && autoSubscription.PartitionEventsHandler.HasValue
@@ -145,5 +146,11 @@ private void PartitionsRevoked(IEnumerable<TopicPartitionOffset> partitions)
145146
TopicPartitions = TopicPartitions.Except(partitions.Select(tpo => tpo.TopicPartition));
146147
Log.Debug("Partitions were revoked");
147148
}
149+
150+
private void PartitionsLost(IEnumerable<TopicPartitionOffset> partitions)
151+
{
152+
TopicPartitions = TopicPartitions.Except(partitions.Select(tpo => tpo.TopicPartition));
153+
Log.Debug("Partitions were lost");
154+
}
148155
}
149156
}

src/Akka.Streams.Kafka/Stages/Consumers/Abstract/SubSourceLogic.cs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ private class CloseRevokedPartitions { }
6363
private readonly Action<IImmutableSet<TopicPartition>> _partitionAssignedCallback;
6464
private readonly Action<IImmutableSet<TopicPartition>> _updatePendingPartitionsAndEmitSubSourcesCallback;
6565
private readonly Action<IImmutableSet<TopicPartitionOffset>> _partitionRevokedCallback;
66+
private readonly Action<IImmutableSet<TopicPartitionOffset>> _partitionLostCallback;
6667
private readonly Action<(TopicPartition, ISubSourceCancellationStrategy)> _subsourceCancelledCallback;
6768
private readonly Action<(TopicPartition, IControl)> _subsourceStartedCallback;
6869
private readonly Action<(IImmutableSet<TopicPartition>, IImmutableSet<TopicPartitionOffset>)> _offsetsFromExternalResponseCb;
@@ -115,6 +116,7 @@ public SubSourceLogic(SourceShape<(TopicPartition, Source<TMessage, NotUsed>)> s
115116
_updatePendingPartitionsAndEmitSubSourcesCallback = GetAsyncCallback<IImmutableSet<TopicPartition>>(UpdatePendingPartitionsAndEmitSubSources);
116117
_partitionAssignedCallback = GetAsyncCallback<IImmutableSet<TopicPartition>>(HandlePartitionsAssigned);
117118
_partitionRevokedCallback = GetAsyncCallback<IImmutableSet<TopicPartitionOffset>>(HandlePartitionsRevoked);
119+
_partitionLostCallback = GetAsyncCallback<IImmutableSet<TopicPartitionOffset>>(HandlePartitionsLost);
118120
_stageFailCallback = GetAsyncCallback<ConsumerFailed>(FailStage);
119121
_subsourceCancelledCallback = GetAsyncCallback<(TopicPartition, ISubSourceCancellationStrategy)>(HandleSubsourceCancelled);
120122
_subsourceStartedCallback = GetAsyncCallback<(TopicPartition, IControl)>(HandleSubsourceStarted);
@@ -144,7 +146,7 @@ public override void PreStart()
144146
if (!(Materializer is ActorMaterializer actorMaterializer))
145147
throw new ArgumentException($"Expected {typeof(ActorMaterializer)} but got {Materializer.GetType()}");
146148

147-
var eventHandler = new PartitionEventHandlers.AsyncCallbacks(_partitionAssignedCallback, _partitionRevokedCallback);
149+
var eventHandler = new PartitionEventHandlers.AsyncCallbacks(_partitionAssignedCallback, _partitionRevokedCallback, _partitionLostCallback);
148150

149151
var statisticsHandler = _subscription.StatisticsHandler.HasValue
150152
? _subscription.StatisticsHandler.Value
@@ -275,6 +277,13 @@ private void HandlePartitionsRevoked(IImmutableSet<TopicPartitionOffset> revoked
275277
ScheduleOnce(new CloseRevokedPartitions(), _settings.WaitClosePartition);
276278
}
277279

280+
private void HandlePartitionsLost(IImmutableSet<TopicPartitionOffset> revoked)
281+
{
282+
_partitionsToRevoke = _partitionsToRevoke.Union(revoked.Select(r => r.TopicPartition));
283+
284+
ScheduleOnce(new CloseRevokedPartitions(), _settings.WaitClosePartition);
285+
}
286+
278287
private void HandleSubsourceCancelled((TopicPartition, ISubSourceCancellationStrategy) obj)
279288
{
280289
var (topicPartition, cancellationStrategy) = obj;

src/Akka.Streams.Kafka/Stages/Consumers/Abstract/TransactionalSourceLogic.cs

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -144,21 +144,24 @@ protected override IPartitionEventHandler AddToPartitionAssignmentHandler(IParti
144144
{
145145
var blockingRevokedCall = new PartitionEventHandlers.AsyncCallbacks(
146146
partitionAssignedCallback: _ => { },
147-
partitionRevokedCallback: revokedTopicPartitions =>
148-
{
149-
var topicPartitions = revokedTopicPartitions.Select(tp => tp.TopicPartition).ToImmutableHashSet();
150-
if (WaitForDraining(topicPartitions))
151-
{
152-
SourceActor.Ref.Tell(new KafkaConsumerActorMetadata.Internal.Revoked(topicPartitions));
153-
}
154-
else
155-
{
156-
SourceActor.Ref.Tell(new Status.Failure(new Exception("Timeout while drailing")));
157-
ConsumerActor.Tell(KafkaConsumerActorMetadata.Internal.Stop.Instance);
158-
}
159-
});
147+
partitionRevokedCallback: OnRevoke,
148+
partitionLostCallback: OnRevoke);
160149

161150
return new PartitionEventHandlers.Chain(handler, blockingRevokedCall);
151+
152+
void OnRevoke(IImmutableSet<TopicPartitionOffset> revokedTopicPartitions)
153+
{
154+
var topicPartitions = revokedTopicPartitions.Select(tp => tp.TopicPartition).ToImmutableHashSet();
155+
if (WaitForDraining(topicPartitions))
156+
{
157+
SourceActor.Ref.Tell(new KafkaConsumerActorMetadata.Internal.Revoked(topicPartitions));
158+
}
159+
else
160+
{
161+
SourceActor.Ref.Tell(new Status.Failure(new Exception("Timeout while drailing")));
162+
ConsumerActor.Tell(KafkaConsumerActorMetadata.Internal.Stop.Instance);
163+
}
164+
}
162165
}
163166

164167
private bool WaitForDraining(IImmutableSet<TopicPartition> partitions)

0 commit comments

Comments
 (0)