Skip to content

Commit 73f788a

Browse files
Revert #418 (#431)
* Implement OnPartitionsLost event handler from #418 * Bump AkkaVersion to 1.5.37 (#422) (cherry picked from commit d2aa70f) * Implement dual targeting for netstandard2.0 and net6.0 (#424) (cherry picked from commit 6c895c5) * Cleanup linq by introducing Partition (#425) * Make sure that we prevent downstream to propagate null completion exception (#427) (cherry picked from commit 562212d) * Update RELEASE_NOTES.md for 1.5.37 release (#423) * Update RELEASE_NOTES.md for 1.5.37 release * Update RELEASE_NOTES.md (cherry picked from commit d6b0a42) * Bump Confluent.Kafka to 2.8.0 (#428) (cherry picked from commit 02c002e) * Make sure that restricted consumer is populated properly (#429) (cherry picked from commit 8e41f71) * Add CancellationToken support to MockCustomer (#430) (cherry picked from commit 2d11235) * Fix missing OnPartitionLost handler * Simplify exception handling --------- Co-authored-by: Aaron Stannard <aaron@petabridge.com>
1 parent d623aba commit 73f788a

3 files changed

Lines changed: 108 additions & 68 deletions

File tree

src/Akka.Streams.Kafka.Tests/Integration/PlainPartitionedSourceIntegrationTests.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,7 @@ public async Task PlainPartitionedSource_should_not_leave_gaps_when_subsource_is
216216
.TakeWhile(m => m < totalMessages, inclusive: true)
217217
.RunWith(Sink.Last<int>(), Materializer);
218218

219-
var consumedMessages = await consumedMessagesTask.ShouldCompleteWithin(60.Seconds());
219+
var consumedMessages = await consumedMessagesTask.ShouldCompleteWithin(10.Seconds());
220220
consumedMessages.Should().Be(totalMessages);
221221
}
222222

src/Akka.Streams.Kafka/Extensions/CollectionExtensions.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ public static IImmutableSet<T> ToImmutableSet<T>(this IEnumerable<T> collection)
4343
/// <summary>
4444
/// Split a list into two list depending on the boolean return value of the predicate.
4545
/// </summary>
46-
public static (IImmutableList<T> True, IImmutableList<T> False) Partition<T>(this IImmutableList<T> list, Predicate<T> predicate)
46+
public static (List<T> True, List<T> False) Partition<T>(this List<T> list, Predicate<T> predicate)
4747
{
4848
var left = new List<T>();
4949
var right = new List<T>();
@@ -55,7 +55,7 @@ public static (IImmutableList<T> True, IImmutableList<T> False) Partition<T>(thi
5555
right.Add(t);
5656
}
5757

58-
return (left.ToImmutableList(), right.ToImmutableList());
58+
return (left, right);
5959
}
6060
}
6161
}

src/Akka.Streams.Kafka/Stages/Consumers/Actors/KafkaConsumerActor.cs

Lines changed: 105 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ internal class KafkaConsumerActor<K, V> : ActorBase, ILogReceive
8282
/// While `true`, committing is delayed.
8383
/// Changed by `onPartitionsRevoked` and `onPartitionsAssigned` callbacks
8484
/// </summary>
85-
private AtomicBoolean _rebalanceInProgress = new();
85+
private bool _rebalanceInProgress = false;
8686
/// <summary>
8787
/// Keeps commit offsets during rebalances for later commit.
8888
/// </summary>
@@ -116,11 +116,42 @@ public KafkaConsumerActor(IActorRef owner, ConsumerSettings<K, V> settings, Deci
116116
}
117117

118118
#region Rebalance listener
119+
120+
internal sealed class PartitionAssigned
121+
{
122+
public PartitionAssigned(IImmutableSet<TopicPartition> partitions)
123+
{
124+
Partitions = partitions;
125+
}
126+
127+
public IImmutableSet<TopicPartition> Partitions { get; }
128+
}
129+
130+
internal sealed class PartitionRevoked
131+
{
132+
public PartitionRevoked(IImmutableSet<TopicPartitionOffset> partitions)
133+
{
134+
Partitions = partitions;
135+
}
136+
137+
public IImmutableSet<TopicPartitionOffset> Partitions { get; }
138+
}
139+
140+
internal sealed class PartitionLost
141+
{
142+
public PartitionLost(IImmutableSet<TopicPartitionOffset> partitions)
143+
{
144+
Partitions = partitions;
145+
}
146+
147+
public IImmutableSet<TopicPartitionOffset> Partitions { get; }
148+
}
149+
119150
// This is RebalanceListener.OnPartitionAssigned on JVM
120151
private void PartitionsAssignedHandler(IImmutableSet<TopicPartition> partitions)
121152
{
122153
var assignment = _consumer.Assignment;
123-
var partitionsToPause = partitions.Where(p => assignment.Contains(p)).ToImmutableList();
154+
var partitionsToPause = partitions.Where(p => assignment.Contains(p)).ToList();
124155
PausePartitions(partitionsToPause);
125156

126157
_commitRefreshing.AssignedPositions(partitions, _consumer, _settings.PositionTimeout);
@@ -130,7 +161,7 @@ private void PartitionsAssignedHandler(IImmutableSet<TopicPartition> partitions)
130161
watch.Stop();
131162
CheckDuration(watch, "onAssign");
132163

133-
_rebalanceInProgress.GetAndSet(false);
164+
_rebalanceInProgress = false;
134165
}
135166

136167
// This is RebalanceListener.OnPartitionRevoked on JVM
@@ -142,10 +173,10 @@ private void PartitionsRevokedHandler(IImmutableSet<TopicPartitionOffset> partit
142173
CheckDuration(watch, "onRevoke");
143174

144175
_commitRefreshing.Revoke(partitions.Select(tp => tp.TopicPartition).ToImmutableHashSet());
145-
_rebalanceInProgress.GetAndSet(true);
176+
_rebalanceInProgress = true;
146177
}
147178

148-
// This is RebalanceListener.OnPartitionLost on JVM
179+
// This is RebalanceListener.OnPartitionRevoked on JVM
149180
private void PartitionsLostHandler(IImmutableSet<TopicPartitionOffset> partitions)
150181
{
151182
var watch = Stopwatch.StartNew();
@@ -154,12 +185,12 @@ private void PartitionsLostHandler(IImmutableSet<TopicPartitionOffset> partition
154185
CheckDuration(watch, "onLost");
155186

156187
_commitRefreshing.Revoke(partitions.Select(tp => tp.TopicPartition).ToImmutableHashSet());
157-
_rebalanceInProgress.GetAndSet(true);
188+
_rebalanceInProgress = true;
158189
}
159-
190+
160191
private void RebalancePostStop()
161192
{
162-
var currentTopicPartitions = _consumer.Assignment.ToImmutableList();
193+
var currentTopicPartitions = _consumer.Assignment;
163194
PausePartitions(currentTopicPartitions);
164195

165196
var watch = Stopwatch.StartNew();
@@ -274,6 +305,23 @@ protected override bool Receive(object message)
274305
Sender.Tell(HandleMetadataRequest(req));
275306
return true;
276307

308+
// Rebalance callbacks
309+
case PartitionAssigned evt:
310+
PartitionsAssignedHandler(evt.Partitions);
311+
return true;
312+
313+
case PartitionRevoked evt:
314+
PartitionsRevokedHandler(evt.Partitions);
315+
return true;
316+
317+
case PartitionLost evt:
318+
PartitionsLostHandler(evt.Partitions);
319+
return true;
320+
321+
case Status.Failure fail:
322+
ProcessExceptions(fail.Cause);
323+
return true;
324+
277325
default:
278326
return false;
279327
}
@@ -305,11 +353,12 @@ private void ApplySettings(ConsumerSettings<K, V> updatedSettings)
305353
if (_log.IsDebugEnabled)
306354
_log.Debug($"Creating Kafka consumer with settings: {JsonConvert.SerializeObject(_settings)}");
307355

356+
var localSelf = Self;
308357
_consumer = _settings.CreateKafkaConsumer(
309-
consumeErrorHandler: (c, e) => ProcessExceptions(new KafkaException(e)),
310-
partitionAssignedHandler: (c, tp) => PartitionsAssignedHandler(tp.ToImmutableHashSet()),
311-
partitionRevokedHandler: (c, tp) => PartitionsRevokedHandler(tp.ToImmutableHashSet()),
312-
partitionLostHandler: (c, tp) => PartitionsLostHandler(tp.ToImmutableHashSet()),
358+
consumeErrorHandler: (c, e) => localSelf.Tell(new Status.Failure(new KafkaException(e))),
359+
partitionAssignedHandler: (c, tp) => localSelf.Tell(new PartitionAssigned(tp.ToImmutableHashSet())),
360+
partitionRevokedHandler: (c, tp) => localSelf.Tell(new PartitionRevoked(tp.ToImmutableHashSet())),
361+
partitionLostHandler: (c, tp) => localSelf.Tell(new PartitionLost(tp.ToImmutableHashSet())),
313362
statisticHandler: (c, json) => _statisticsHandler.OnStatistics(c, json));
314363

315364
var restrictedConsumerTimeoutMs = Math.Round(_settings.PartitionHandlerWarning.TotalMilliseconds * 0.95);
@@ -455,68 +504,62 @@ private void ReceivePoll(Internal.Poll<K, V> poll)
455504

456505
private void Poll()
457506
{
458-
var currentAssignment = _consumer.Assignment.ToImmutableList();
459-
var initialRebalanceInProcess = _rebalanceInProgress.Value;
460-
461-
if (_requests.IsEmpty())
507+
try
462508
{
463-
if(_log.IsDebugEnabled)
464-
_log.Debug("Requests are empty - attempting to consume.");
465-
PausePartitions(currentAssignment);
466-
try
509+
var currentAssignment = _consumer.Assignment;
510+
var initialRebalanceInProcess = _rebalanceInProgress;
511+
512+
if (_requests.IsEmpty())
467513
{
514+
if (_log.IsDebugEnabled)
515+
_log.Debug("Requests are empty - attempting to consume.");
516+
PausePartitions(currentAssignment);
468517
var consumed = _consumer.Consume(0);
469518
if (consumed != null)
470519
throw new IllegalActorStateException("Consumed message should be null");
471520
}
472-
catch (Exception e)
473-
{
474-
ProcessExceptions(e);
475-
}
476-
}
477-
else
478-
{
479-
// Seek has to be done here because they can somehow fail.
480-
// Would need to see if we can move this somewhere else
481-
// because a seek can take up to 200ms to complete
482-
foreach (var tpo in _seekedOffset.Select(kvp => kvp.Value))
521+
else
483522
{
484-
try
485-
{
486-
if(_log.IsDebugEnabled)
487-
_log.Debug("Seeking offset {0} in partition {1} for topic {2}", tpo.Offset, tpo.Partition, tpo.Topic);
488-
_consumer.Seek(tpo);
489-
}
490-
catch (Exception ex)
523+
// Seek has to be done here because they can somehow fail.
524+
// Would need to see if we can move this somewhere else
525+
// because a seek can take up to 200ms to complete
526+
foreach (var tpo in _seekedOffset.Select(kvp => kvp.Value))
491527
{
492-
_log.Error(ex, $"{tpo.TopicPartition} Failed to seek to {tpo.Offset}: {ex}");
493-
throw;
528+
try
529+
{
530+
if (_log.IsDebugEnabled)
531+
_log.Debug("Seeking offset {0} in partition {1} for topic {2}", tpo.Offset,
532+
tpo.Partition, tpo.Topic);
533+
_consumer.Seek(tpo);
534+
}
535+
catch (Exception ex)
536+
{
537+
_log.Error(ex, $"{tpo.TopicPartition} Failed to seek to {tpo.Offset}: {ex}");
538+
throw;
539+
}
494540
}
495-
}
496-
497-
// resume partitions to fetch
498-
IImmutableSet<TopicPartition> partitionsToFetch = _requests.Values.SelectMany(v => v.Topics).ToImmutableHashSet();
499-
var (resumeThese, pauseThese) = currentAssignment.Partition(partitionsToFetch.Contains);
500-
PausePartitions(pauseThese);
501-
ResumePartitions(resumeThese);
502541

503-
using (var cts = new CancellationTokenSource(_settings.PollTimeout))
504-
{
505-
var (polled, exception) = PollKafka(cts.Token);
506-
try
542+
// resume partitions to fetch
543+
IImmutableSet<TopicPartition> partitionsToFetch =
544+
_requests.Values.SelectMany(v => v.Topics).ToImmutableHashSet();
545+
var (resumeThese, pauseThese) = currentAssignment.Partition(partitionsToFetch.Contains);
546+
PausePartitions(pauseThese);
547+
ResumePartitions(resumeThese);
548+
549+
using (var cts = new CancellationTokenSource(_settings.PollTimeout))
507550
{
551+
var (polled, exception) = PollKafka(cts.Token);
508552
ProcessResult(partitionsToFetch, polled);
509-
}
510-
catch (Exception e)
511-
{
512-
ProcessExceptions(e);
553+
ProcessExceptions(exception);
513554
}
514555

515-
ProcessExceptions(exception);
556+
CheckRebalanceState(initialRebalanceInProcess);
516557
}
517558
}
518-
519-
CheckRebalanceState(initialRebalanceInProcess);
559+
catch (Exception e)
560+
{
561+
ProcessExceptions(e);
562+
}
520563

521564
if (_stopInProgress)
522565
{
@@ -661,7 +704,7 @@ private void Commit(IImmutableSet<TopicPartitionOffset> commitMap, Action<object
661704
/// </summary>
662705
private void CheckRebalanceState(bool initialRebalanceInProgress)
663706
{
664-
if (initialRebalanceInProgress && !_rebalanceInProgress.Value && _rebalanceCommitSenders.Any())
707+
if (initialRebalanceInProgress && !_rebalanceInProgress && _rebalanceCommitSenders.Any())
665708
{
666709
_log.Debug($"Comitting stash {string.Join(", ", _rebalanceCommitStash)} replying to {string.Join(", ", _rebalanceCommitSenders)}");
667710
var replyTo = _rebalanceCommitSenders;
@@ -671,7 +714,7 @@ private void CheckRebalanceState(bool initialRebalanceInProgress)
671714
}
672715
}
673716

674-
private void PausePartitions(IImmutableList<TopicPartition> partitions)
717+
private void PausePartitions(List<TopicPartition> partitions)
675718
{
676719
if (partitions.Count == 0)
677720
return;
@@ -682,17 +725,14 @@ private void PausePartitions(IImmutableList<TopicPartition> partitions)
682725
_resumedPartitions = _resumedPartitions.Except(partitions);
683726
}
684727

685-
private void ResumePartitions(IImmutableList<TopicPartition> partitions)
728+
private void ResumePartitions(List<TopicPartition> partitions)
686729
{
687730
if (partitions.Count == 0)
688731
return;
689732

690733
var partitionsToResume = partitions.Except(_resumedPartitions).ToList();
691-
if(partitionsToResume.Count == 0 && _log.IsDebugEnabled)
692-
{
693-
_log.Debug("Requested partitions already resumed. Resume request: [{0}], already resumed: [{1}]", string.Join(",", partitions), string.Join(",", _resumedPartitions));
734+
if(partitionsToResume.Count == 0)
694735
return;
695-
}
696736

697737
if(_log.IsDebugEnabled)
698738
_log.Debug("Resuming partitions [{0}]", string.Join(",", partitionsToResume));

0 commit comments

Comments
 (0)