Skip to content

Commit 36d5eaf

Browse files
[WIP] rebalance integration specs (#443)
* Make partitions-per-topic configurable * WIP: rebalance integration specs * Fixed partition assignments * finished first pass of spec * increase number of restart attempts * hardened specs * improving traceability * Trying to add better name prefixes * Harden spec * shorten test * Fix unexpected records bug --------- Co-authored-by: Gregorius Soedharmo <[email protected]>
1 parent 7fcdeb3 commit 36d5eaf

File tree

7 files changed

+175
-44
lines changed

7 files changed

+175
-44
lines changed

src/Akka.Streams.Kafka.Tests/Integration/CommittableSourceIntegrationTests.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ await Source
4242

4343
var probe = KafkaConsumer
4444
.CommittableSource(consumerSettings, Subscriptions.Assignment(topicPartition1))
45-
.Select(c => c.Record.Value)
45+
.Select(c => c.Record.Message.Value)
4646
.RunWith(this.SinkProbe<string>(), Materializer);
4747

4848
probe.Request(elementsCount);

src/Akka.Streams.Kafka.Tests/Integration/PlainSinkIntegrationTests.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ public async Task PlainSink_should_fail_stage_if_broker_unavailable()
7575
{
7676
var topic1 = CreateTopic(1);
7777

78-
await GivenInitializedTopic(topic1);
78+
await GivenInitializedTopicAsync(topic1);
7979

8080
var config = ProducerSettings<Null, string>.Create(Sys, null, null)
8181
.WithBootstrapServers("localhost:10092");
Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
using System;
2+
using System.Linq;
3+
using System.Runtime.ExceptionServices;
4+
using System.Threading.Tasks;
5+
using Akka.Actor;
6+
using Akka.Streams.Dsl;
7+
using Akka.Streams.Kafka.Dsl;
8+
using Akka.Streams.Kafka.Helpers;
9+
using Akka.Streams.Kafka.Messages;
10+
using Akka.Streams.Kafka.Settings;
11+
using Confluent.Kafka;
12+
using FluentAssertions.Extensions;
13+
using Xunit;
14+
using Xunit.Abstractions;
15+
16+
namespace Akka.Streams.Kafka.Tests.Integration;
17+
18+
/// <summary>
/// Integration specs that repeatedly stop and relaunch one member of a consumer group
/// while messages are still being produced, in order to provoke Kafka rebalances.
/// </summary>
public class RebalanceIntegrationTests : KafkaIntegrationTests
{
    // Pass this spec's own name to the base class. The original passed
    // nameof(AtMostOnceSourceIntegrationTests), which looks like a copy-paste slip.
    // NOTE(review): presumably the base class uses this as the actor system / spec name - confirm.
    public RebalanceIntegrationTests(ITestOutputHelper output, KafkaFixture fixture)
        : base(nameof(RebalanceIntegrationTests), output, fixture)
    {
    }

    /// <summary>
    /// Creates a committable source subscribed to <paramref name="topic"/> via a topic
    /// subscription (as opposed to a manual partition assignment).
    /// </summary>
    private static Source<CommittableMessage<Null, string>, IControl> GetConsumer(
        ConsumerSettings<Null, string> settings,
        string topic)
    {
        return KafkaConsumer.CommittableSource(settings, Subscriptions.Topics(topic));
    }

    /// <summary>
    /// Marker message delivered to the sink actor when a stream completes normally.
    /// </summary>
    private sealed class StreamCompleted
    {
        public static StreamCompleted Instance { get; } = new();

        private StreamCompleted()
        {
        }
    }

    /// <summary>
    /// Message delivered to the sink actor when a stream fails with <paramref name="Ex"/>.
    /// </summary>
    private sealed record StreamFailed(Exception Ex);

    /// <summary>
    /// Rethrows the failure carried by a <see cref="StreamFailed"/> message, keeping the
    /// original stack trace; any other message is ignored.
    /// </summary>
    private static void ThrowIfStreamFailed(object message)
    {
        if (message is StreamFailed failed)
        {
            ExceptionDispatchInfo.Throw(failed.Ex);
        }
    }

    /// <summary>
    /// Runs a consumer stream that commits every message and forwards its record to
    /// <paramref name="sinkRef"/>. Completion is reported to the actor as
    /// <see cref="StreamCompleted"/> and failure as <see cref="StreamFailed"/>.
    /// </summary>
    /// <param name="topic">Topic to subscribe to.</param>
    /// <param name="settings">Consumer settings used to build the source.</param>
    /// <param name="sinkRef">Actor that receives records and the terminal message.</param>
    /// <param name="consumerName">Name applied to the stage attributes and the materializer prefix.</param>
    /// <returns>The kill switch that stops this stream.</returns>
    private IKillSwitch CreateKillableStream(string topic, ConsumerSettings<Null, string> settings, IActorRef sinkRef, string consumerName)
    {
        var source = GetConsumer(settings, topic);
        var killSwitch = source
            .WithAttributes(Attributes.CreateName(consumerName))
            .ViaMaterialized(KillSwitches.Single<CommittableMessage<Null, string>>(), Keep.Right)
            .SelectAsync(1, async msg =>
            {
                // commit the offset before handing the record downstream
                await msg.CommitableOffset.Commit();
                return msg.Record;
            })
            .WithAttributes(Attributes.CreateName($"selectAsync-{consumerName}"))
            .ToMaterialized(
                Sink.ActorRef<ConsumeResult<Null, string>>(sinkRef, StreamCompleted.Instance,
                    exception => new StreamFailed(exception)), Keep.Left)
            .Run(Materializer.WithNamePrefix(consumerName));

        return killSwitch;
    }

    /// <summary>
    /// Reproduction spec for https://github.com/akkadotnet/Akka.Streams.Kafka/issues/415
    /// </summary>
    [Fact]
    public async Task ShouldReBalanceWithoutArgumentExceptions()
    {
        // arrange
        var topic = CreateTopic(1);
        var group = CreateGroup(1);
        const int partitions = 10;
        const int initialMessages = 10;
        const int messagesPerRestart = 30;

        // per https://github.com/akkadotnet/Akka.Streams.Kafka/issues/415 - it might take many restart attempts to reproduce
        const int restartAttempts = 100;

        // initialize the topic with 10 partitions
        await GivenInitializedTopicAsync(topic, partitions);

        var settings = CreateConsumerSettings<Null, string>(group);

        // Produce some messages
        await ProduceStrings(topic, Enumerable.Range(0, initialMessages), ProducerSettings);

        // Spin up 3 consumers that all report to the same probe
        var probe1 = CreateTestProbe();

        var killSwitch1 = CreateKillableStream(topic, settings, probe1.Ref, "consumer1");
        var killSwitch2 = CreateKillableStream(topic, settings, probe1.Ref, "consumer2");
        var killSwitch3 = CreateKillableStream(topic, settings, probe1.Ref, "consumer3");

        try
        {
            // make sure all initial messages got processed
            await foreach (var msg in probe1.ReceiveNAsync(initialMessages))
            {
                ThrowIfStreamFailed(msg);
            }

            // act
            for (var i = 0; i < restartAttempts; i++)
            {
                killSwitch1 = await KillAndRelaunchFirstConsumer(killSwitch1, i);
            }
        }
        finally
        {
            // Stop every consumer still running so no stream outlives the spec.
            killSwitch1.Shutdown();
            killSwitch2.Shutdown();
            killSwitch3.Shutdown();
        }

        return;

        async Task<IKillSwitch> KillAndRelaunchFirstConsumer(IKillSwitch ks, int attemptCount)
        {
            Sys.Log.Info("Restarting consumer, attempt {0}", attemptCount);

            // kill the first consumer and wait until its stream reports completion
            ks.Shutdown();
            await probe1.FishForMessageAsync(o => o is StreamCompleted);

            // produce more messages; started without awaiting so that production overlaps
            // with the relaunch, but the task is kept so that its failure is observed below
            var producing = ProduceStrings(topic, Enumerable.Range(initialMessages, messagesPerRestart), ProducerSettings);

            // relaunch the first consumer
            var newKs = CreateKillableStream(topic, settings, probe1.Ref, $"consumer1-{attemptCount}");

            try
            {
                await foreach (var msg in probe1.ReceiveNAsync(messagesPerRestart, 30.Seconds()))
                {
                    ThrowIfStreamFailed(msg);
                }

                // surface any producer failure instead of leaving the task unobserved
                await producing;
            }
            catch
            {
                // the caller never receives this switch on failure, so stop the stream here
                newKs.Shutdown();
                throw;
            }

            return newKs;
        }
    }
}

src/Akka.Streams.Kafka.Tests/KafkaIntegrationTests.cs

Lines changed: 8 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -118,24 +118,7 @@ protected TResult AssertTaskCompletesWithin<TResult>(TimeSpan timeout, Task<TRes
118118
return task.Result;
119119
}
120120

121-
protected async Task GivenInitializedTopic(string topic)
122-
{
123-
var builder = new AdminClientBuilder(new AdminClientConfig
124-
{
125-
BootstrapServers = Fixture.KafkaServer
126-
});
127-
using (var client = builder.Build())
128-
{
129-
await client.CreateTopicsAsync(new[] {new TopicSpecification
130-
{
131-
Name = topic,
132-
NumPartitions = KafkaFixture.KafkaPartitions,
133-
ReplicationFactor = KafkaFixture.KafkaReplicationFactor
134-
}});
135-
}
136-
}
137-
138-
protected async Task GivenInitializedTopicAsync(TopicPartition topicPartition, int partitions = KafkaFixture.KafkaPartitions)
121+
protected async Task GivenInitializedTopicAsync(string topic, int partitions = KafkaFixture.KafkaPartitions)
139122
{
140123
var builder = new AdminClientBuilder(new AdminClientConfig
141124
{
@@ -146,19 +129,24 @@ protected async Task GivenInitializedTopicAsync(TopicPartition topicPartition, i
146129
await client.CreateTopicsAsync([
147130
new TopicSpecification
148131
{
149-
Name = topicPartition.Topic,
132+
Name = topic,
150133
NumPartitions = partitions,
151134
ReplicationFactor = KafkaFixture.KafkaReplicationFactor
152135
}
153136
]);
154137
}
155138
}
156139

140+
protected Task GivenInitializedTopicAsync(TopicPartition topicPartition, int partitions = KafkaFixture.KafkaPartitions)
141+
{
142+
return GivenInitializedTopicAsync(topicPartition.Topic, partitions);
143+
}
144+
157145
protected (IControl, TestSubscriber.Probe<TValue>) CreateExternalPlainSourceProbe<TValue>(IActorRef consumer, IManualSubscription sub)
158146
{
159147
return KafkaConsumer
160148
.PlainExternalSource<Null, TValue>(consumer, sub, true)
161-
.Select(c => c.Value)
149+
.Select(c => c.Message.Value)
162150
.ToMaterialized(this.SinkProbe<TValue>(), Keep.Both)
163151
.Run(Materializer);
164152
}

src/Akka.Streams.Kafka/Stages/Consumers/Abstract/BaseSingleSourceLogic.cs

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,18 +2,11 @@
22
using System.Collections.Concurrent;
33
using System.Collections.Immutable;
44
using System.Linq;
5-
using System.Runtime.Serialization;
6-
using System.Threading.Tasks;
75
using Akka.Actor;
8-
using Akka.Dispatch;
96
using Akka.Streams.Kafka.Helpers;
107
using Akka.Streams.Kafka.Settings;
118
using Akka.Streams.Kafka.Stages.Consumers.Actors;
12-
using Akka.Streams.Kafka.Stages.Consumers.Exceptions;
13-
using Akka.Streams.Kafka.Supervision;
149
using Akka.Streams.Stage;
15-
using Akka.Streams.Supervision;
16-
using Akka.Streams.Util;
1710
using Confluent.Kafka;
1811
using Decider = Akka.Streams.Supervision.Decider;
1912
using Directive = Akka.Streams.Supervision.Directive;
@@ -190,7 +183,7 @@ protected void RequestMessages()
190183
_requested = true;
191184
_requestId += 1;
192185
if (Log.IsDebugEnabled)
193-
Log.Debug("Requesting messages, requestId: {0}, partitions: {1}", _requestId, string.Join(", ", TopicPartitions));
186+
Log.Debug("[{0}] Requesting messages, requestId: {1}, partitions: {2}", ConsumerActor.Path.Name, _requestId, string.Join(", ", TopicPartitions));
194187
ConsumerActor.Tell(new KafkaConsumerActorMetadata.Internal.RequestMessages(_requestId, TopicPartitions.ToImmutableHashSet()), SourceActor.Ref);
195188
}
196189

src/Akka.Streams.Kafka/Stages/Consumers/Abstract/SingleSourceStageLogic.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -137,20 +137,20 @@ protected virtual void StopConsumerActor()
137137
private void PartitionsAssigned(IEnumerable<TopicPartition> partitions)
138138
{
139139
TopicPartitions = TopicPartitions.Union(partitions);
140-
Log.Debug($"Partitions were assigned: {string.Join(", ", TopicPartitions)}");
140+
Log.Debug("[{0}] Partitions were assigned: {1}", ConsumerActor.Path.Name, string.Join(", ", partitions));
141141
RequestMessages();
142142
}
143143

144144
private void PartitionsRevoked(IEnumerable<TopicPartitionOffset> partitions)
145145
{
146146
TopicPartitions = TopicPartitions.Except(partitions.Select(tpo => tpo.TopicPartition));
147-
Log.Debug("Partitions were revoked");
147+
Log.Debug("[{0}] Partitions were revoked: {1}", ConsumerActor.Path.Name, string.Join(", ", partitions));
148148
}
149149

150150
private void PartitionsLost(IEnumerable<TopicPartitionOffset> partitions)
151151
{
152152
TopicPartitions = TopicPartitions.Except(partitions.Select(tpo => tpo.TopicPartition));
153-
Log.Debug("Partitions were lost");
153+
Log.Debug("[{0}] Partitions were lost: {1}", ConsumerActor.Path.Name, string.Join(", ", partitions));
154154
}
155155
}
156156
}

src/Akka.Streams.Kafka/Stages/Consumers/Actors/KafkaConsumerActor.cs

Lines changed: 29 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,8 @@ internal class KafkaConsumerActor<K, V> : ActorBase, ILogReceive, IWithTimers
9292
/// </summary>
9393
private IImmutableList<IActorRef> _rebalanceCommitSenders = ImmutableArray<IActorRef>.Empty;
9494

95+
private ImmutableList<TopicPartition> _pausedPartitions = ImmutableList<TopicPartition>.Empty;
96+
9597
/// <summary>
9698
/// KafkaConsumerActor
9799
/// </summary>
@@ -121,9 +123,8 @@ public KafkaConsumerActor(IActorRef owner, ConsumerSettings<K, V> settings, Deci
121123
// This is RebalanceListener.OnPartitionAssigned on JVM
122124
private void PartitionsAssignedHandler(IImmutableSet<TopicPartition> partitions)
123125
{
124-
var assignment = _consumer.Assignment;
125-
var partitionsToPause = partitions.Where(p => assignment.Contains(p)).ToImmutableList();
126-
PausePartitions(partitionsToPause);
126+
_log.Debug($"Partitions were assigned: {string.Join(", ", partitions)}");
127+
_pausedPartitions = partitions.ToImmutableList();
127128

128129
_commitRefreshing.AssignedPositions(partitions, _consumer, _settings.PositionTimeout);
129130

@@ -138,6 +139,8 @@ private void PartitionsAssignedHandler(IImmutableSet<TopicPartition> partitions)
138139
// This is RebalanceListener.OnPartitionRevoked on JVM
139140
private void PartitionsRevokedHandler(IImmutableSet<TopicPartitionOffset> partitions)
140141
{
142+
_log.Debug($"Partitions were revoked: {string.Join(", ", partitions)}");
143+
141144
var watch = Stopwatch.StartNew();
142145
_partitionEventHandler.OnRevoke(partitions, _restrictedConsumer);
143146
watch.Stop();
@@ -150,6 +153,8 @@ private void PartitionsRevokedHandler(IImmutableSet<TopicPartitionOffset> partit
150153
// This is RebalanceListener.OnPartitionLost on JVM
151154
private void PartitionsLostHandler(IImmutableSet<TopicPartitionOffset> partitions)
152155
{
156+
_log.Debug($"Partitions were lost: {string.Join(", ", partitions)}");
157+
153158
var watch = Stopwatch.StartNew();
154159
_partitionEventHandler.OnLost(partitions, _restrictedConsumer);
155160
watch.Stop();
@@ -225,6 +230,8 @@ protected override bool Receive(object message)
225230
return true;
226231

227232
case KafkaConsumerActorMetadata.Internal.RequestMessages requestMessages:
233+
_log.Debug("Messages was requested, RequestId: {0}, Partitions: {1}", requestMessages.RequestId, string.Join(", ", requestMessages.Topics));
234+
228235
Context.Watch(Sender);
229236
CheckOverlappingRequests("RequestMessages", Sender, requestMessages.Topics);
230237
_requests = _requests.SetItem(Sender, requestMessages);
@@ -460,7 +467,11 @@ private void Poll()
460467
var currentAssignment = _consumer.Assignment.ToImmutableList();
461468
var initialRebalanceInProcess = _rebalanceInProgress.Value;
462469

463-
if (_requests.IsEmpty())
470+
var partitionsToFetch = _requests.Values.SelectMany(v => v.Topics)
471+
.Where(p => currentAssignment.Contains(p))
472+
.ToImmutableHashSet();
473+
474+
if (partitionsToFetch.IsEmpty || _requests.IsEmpty())
464475
{
465476
if(_log.IsDebugEnabled)
466477
_log.Debug("Requests are empty - attempting to consume.");
@@ -470,6 +481,8 @@ private void Poll()
470481
var consumed = _consumer.Consume(0);
471482
if (consumed != null)
472483
throw new IllegalActorStateException("Consumed message should be null");
484+
PausePartitions(_pausedPartitions);
485+
_pausedPartitions = ImmutableList<TopicPartition>.Empty;
473486
}
474487
catch (Exception e)
475488
{
@@ -497,7 +510,6 @@ private void Poll()
497510
}
498511

499512
// resume partitions to fetch
500-
IImmutableSet<TopicPartition> partitionsToFetch = _requests.Values.SelectMany(v => v.Topics).ToImmutableHashSet();
501513
var (resumeThese, pauseThese) = currentAssignment.Partition(partitionsToFetch.Contains);
502514
PausePartitions(pauseThese); // SHOULD PAUSE ANY PARTITIONS THAT HAVE BEEN ASSIGNED BUT ARE NOT REQUESTED
503515
ResumePartitions(resumeThese);
@@ -555,25 +567,31 @@ private void ProcessExceptions(Exception exception)
555567
{
556568
// this would return immediately if there are messages waiting inside the client queue buffer
557569
consumed = _consumer.Consume(timeout);
570+
if (consumed is null)
571+
{
572+
PausePartitions(_pausedPartitions);
573+
_pausedPartitions = ImmutableList<TopicPartition>.Empty;
574+
return (polled, null);
575+
}
576+
polled.Add(consumed);
577+
i--;
558578
}
559579
catch (Exception e)
560580
{
561581
return (polled, e);
562582
}
563-
if (consumed != null)
564-
polled.Add(consumed);
565-
i--;
566-
} while (i > 0 && consumed != null && !token.IsCancellationRequested);
583+
} while (i > 0 && !token.IsCancellationRequested);
567584

568585
return (polled, null);
569586
}
570587

571588
private void ProcessResult(IImmutableSet<TopicPartition> partitionsToFetch, List<ConsumeResult<K,V>> rawResult)
572589
{
573-
if(_log.IsDebugEnabled)
574-
_log.Debug("Processing poll result with {0} records", rawResult.Count);
575590
if(rawResult.IsEmpty())
576591
return;
592+
593+
if(_log.IsDebugEnabled)
594+
_log.Debug("Processing poll result with {0} records", rawResult.Count);
577595

578596
var fetchedTps = rawResult.Select(m => m.TopicPartition).ToImmutableSet();
579597
if (!fetchedTps.Except(partitionsToFetch).IsEmpty())

0 commit comments

Comments
 (0)