Skip to content

Commit aead3e4

Browse files
Refactor SubSourceStageLogic; filter messages from revoked partitions in partitioned stream sources (#452)
* added `RequestDelayedPoll` method * moved `CloseRevokedPartitions` out of generic class * move `ISubSourceCancellationStrategy` inside `SubSourceLogic` * refactoring `SubSourceLogic` * more refactoring * make `CollectionExtensions` `internal` * finished refactoring `SubSourceLogic` * updated `KafkaConsumerActor` to support registering sub stages and check for overlaps * improve Kafka `Consumer` error handling reporting to StageActorRefs * fixed `CommittableSubSourceStage` * fixed all compilation issues * fixed logic error around `_stageActorsMap` * Ensured that flushing messages from revoked partitions works in sub-stages too * removed dumb non-optimization * fixed `AssignWithOffset` handling * added more detailed error logging * use the stored offset for this partition * attempt to fix `Seek`-handling behavior * removed unneeded partition-seeking code * made `SubSourceLogic.AddToPartitionAssignmentHandler` private
1 parent 215a232 commit aead3e4

File tree

7 files changed

+688
-432
lines changed

7 files changed

+688
-432
lines changed

src/Akka.Streams.Kafka.Tests/Integration/RebalanceIntegrationTests.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ public async Task FetchedRecords_must_be_removed_from_source_stage_buffer_when_p
118118
// Assert.True(control2.IsShutdown.IsCompleted);
119119
}
120120

121-
[Fact(Skip = "Filtering is not enabled on sub-sources just yet")]
121+
[Fact]
122122
public async Task FetchedRecords_must_be_removed_from_the_partitioned_source_stage_when_a_partition_is_revoked()
123123
{
124124
const int count = 20;

src/Akka.Streams.Kafka/Extensions/CollectionExtensions.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
namespace Akka.Streams.Kafka.Extensions
88
{
9-
public static class CollectionExtensions
9+
internal static class CollectionExtensions
1010
{
1111
/// <summary>
1212
/// Joins elements of the collection with given separator to single string

src/Akka.Streams.Kafka/Stages/Consumers/Abstract/SubSourceLogic.cs

Lines changed: 415 additions & 271 deletions
Large diffs are not rendered by default.

src/Akka.Streams.Kafka/Stages/Consumers/Actors/KafkaConsumerActor.cs

Lines changed: 178 additions & 137 deletions
Large diffs are not rendered by default.

src/Akka.Streams.Kafka/Stages/Consumers/Actors/KafkaConsumerActorMetadata.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,9 @@ public class Stop
7575

7676
private Stop() { }
7777
}
78+
79+
public sealed record RegisterSubStage(IImmutableSet<TopicPartition> TopicPartitions)
80+
: INoSerializationVerificationNeeded;
7881

7982
public sealed record Seek(IImmutableSet<TopicPartitionOffset> Offsets) : INoSerializationVerificationNeeded;
8083
/// <summary>
Lines changed: 57 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
using System;
22
using System.Collections.Immutable;
33
using System.Threading.Tasks;
4+
using Akka.Actor;
45
using Akka.Streams.Dsl;
56
using Akka.Streams.Kafka.Helpers;
67
using Akka.Streams.Kafka.Messages;
@@ -13,44 +14,85 @@
1314

1415
namespace Akka.Streams.Kafka.Stages.Consumers.Concrete
1516
{
16-
public class CommittableSubSourceStage<K, V> : KafkaSourceStage<K, V, (TopicPartition, Source<CommittableMessage<K, V>, NotUsed>)>
17+
public class
18+
CommittableSubSourceStage<K, V> : KafkaSourceStage<K, V, (TopicPartition,
19+
Source<CommittableMessage<K, V>, NotUsed>)>
1720
{
1821
private readonly Func<ConsumeResult<K, V>, string> _metadataFromRecord;
19-
22+
2023
/// <summary>
2124
/// Consumer settings
2225
/// </summary>
2326
public ConsumerSettings<K, V> Settings { get; }
27+
2428
/// <summary>
2529
/// Subscription
2630
/// </summary>
2731
public IAutoSubscription Subscription { get; }
2832

29-
public CommittableSubSourceStage(ConsumerSettings<K, V> settings, IAutoSubscription subscription, Func<ConsumeResult<K, V>, string>? metadataFromRecord = null)
33+
public CommittableSubSourceStage(ConsumerSettings<K, V> settings, IAutoSubscription subscription,
34+
Func<ConsumeResult<K, V>, string>? metadataFromRecord = null)
3035
: base("CommittableSubSourceStage")
3136
{
3237
Settings = settings;
3338
Subscription = subscription;
3439
_metadataFromRecord = metadataFromRecord ?? (_ => string.Empty);
40+
_subSourceStageLogicFactory = new CommittableSubSourceStageLogicFactory(Settings, _metadataFromRecord);
3541
}
3642

37-
protected override (GraphStageLogic, IControl) Logic(SourceShape<(TopicPartition, Source<CommittableMessage<K, V>, NotUsed>)> shape, Attributes inheritedAttributes)
43+
private readonly SubSourceLogic.ISubSourceStageLogicFactory<K, V, CommittableMessage<K, V>> _subSourceStageLogicFactory;
44+
45+
protected override (GraphStageLogic, IControl) Logic(
46+
SourceShape<(TopicPartition, Source<CommittableMessage<K, V>, NotUsed>)> shape,
47+
Attributes inheritedAttributes)
3848
{
39-
var logic = new SubSourceLogic<K, V, CommittableMessage<K, V>>(shape, Settings, Subscription,
40-
messageBuilderFactory: GetMessageBuilder,
41-
getOffsetsOnAssign: Option<Func<IImmutableSet<TopicPartition>, Task<IImmutableSet<TopicPartitionOffset>>>>.None,
42-
onRevoke: _ => { },
43-
attributes: inheritedAttributes);
49+
var logic = new SubSourceLogic<K, V, CommittableMessage<K, V>>(shape, Settings, Subscription,
50+
getOffsetsOnAssign:
51+
Option<Func<IImmutableSet<TopicPartition>, Task<IImmutableSet<TopicPartitionOffset>>>>.None,
52+
onRevoke: _ => { },
53+
_subSourceStageLogicFactory,
54+
attributes: inheritedAttributes);
4455
return (logic, logic.Control);
4556
}
4657

47-
/// <summary>
48-
/// Creates message builder for sub-source logic
49-
/// </summary>
50-
private CommittableSourceMessageBuilder<K, V> GetMessageBuilder(SubSourceLogic<K, V, CommittableMessage<K, V>> logic)
58+
private class
59+
CommittableSubSourceStageLogicFactory(ConsumerSettings<K, V> settings, Func<ConsumeResult<K, V>, string> metadataFromRecord)
60+
: SubSourceLogic.ISubSourceStageLogicFactory<K, V,
61+
CommittableMessage<K, V>>
62+
{
63+
64+
65+
public SubSourceStageLogic<K, V, CommittableMessage<K, V>> Create(SourceShape<CommittableMessage<K, V>> shape,
66+
TopicPartition tp, IActorRef consumerActor,
67+
Action<SubSourceLogic.SubSourceStageLogicControl> subSourceStartedCb,
68+
Action<(TopicPartition partition, SubSourceLogic.ISubSourceCancellationStrategy cancellationStrategy)>
69+
subSourceCancelledCb, int actorNumber) =>
70+
new CommittableSubSourceStageLogic<K, V>(shape, tp, consumerActor, actorNumber, GetMessageBuilder(consumerActor, settings, metadataFromRecord),
71+
subSourceStartedCb, subSourceCancelledCb);
72+
73+
/// <summary>
74+
/// Creates message builder for sub-source logic
75+
/// </summary>
76+
private CommittableSourceMessageBuilder<K, V> GetMessageBuilder(IActorRef consumerActor, ConsumerSettings<K, V> consumerSettings, Func<ConsumeResult<K, V>, string> metadataFromRecord)
77+
{
78+
var committer = new KafkaAsyncConsumerCommitter(() => consumerActor, consumerSettings.CommitTimeout);
79+
return new CommittableSourceMessageBuilder<K, V>(committer, consumerSettings.GroupId,metadataFromRecord);
80+
}
81+
}
82+
}
83+
84+
internal sealed class CommittableSubSourceStageLogic<K, V> : SubSourceStageLogic<K, V, CommittableMessage<K, V>>
85+
{
86+
public CommittableSubSourceStageLogic(
87+
SourceShape<CommittableMessage<K, V>> shape,
88+
TopicPartition topicPartition,
89+
IActorRef consumerActor, int actorNumber,
90+
IMessageBuilder<K, V, CommittableMessage<K, V>> messageBuilder,
91+
Action<SubSourceLogic.SubSourceStageLogicControl> subSourceStartedCallback,
92+
Action<(TopicPartition, SubSourceLogic.ISubSourceCancellationStrategy)> subSourceCancelledCallback)
93+
: base(shape, topicPartition, consumerActor, actorNumber, messageBuilder, subSourceStartedCallback,
94+
subSourceCancelledCallback)
5195
{
52-
var committer = new KafkaAsyncConsumerCommitter(() => logic.ConsumerActor, Settings.CommitTimeout);
53-
return new CommittableSourceMessageBuilder<K, V>(committer, Settings.GroupId, _metadataFromRecord);
5496
}
5597
}
5698
}

src/Akka.Streams.Kafka/Stages/Consumers/Concrete/PlainSubSourceStage.cs

Lines changed: 33 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,13 @@
11
using System;
22
using System.Collections.Immutable;
33
using System.Threading.Tasks;
4-
using Akka.Event;
4+
using Akka.Actor;
55
using Akka.Streams.Dsl;
66
using Akka.Streams.Kafka.Dsl;
77
using Akka.Streams.Kafka.Helpers;
88
using Akka.Streams.Kafka.Settings;
99
using Akka.Streams.Kafka.Stages.Consumers.Abstract;
1010
using Akka.Streams.Stage;
11-
using Akka.Streams.Util;
1211
using Akka.Util;
1312
using Confluent.Kafka;
1413

@@ -38,9 +37,9 @@ public class PlainSubSourceStage<K, V> : KafkaSourceStage<K, V, (TopicPartition,
3837
/// </summary>
3938
public Action<IImmutableSet<TopicPartition>> OnRevoke { get; }
4039

41-
/// <summary>
42-
/// PlainSubSourceStage
43-
/// </summary>
40+
private readonly SubSourceLogic.ISubSourceStageLogicFactory<K, V, ConsumeResult<K, V>>
41+
_subSourceStageLogicFactory;
42+
4443
public PlainSubSourceStage(ConsumerSettings<K, V> settings, IAutoSubscription subscription,
4544
Option<Func<IImmutableSet<TopicPartition>, Task<IImmutableSet<TopicPartitionOffset>>>> getOffsetsOnAssign,
4645
Action<IImmutableSet<TopicPartition>> onRevoke)
@@ -50,19 +49,46 @@ public PlainSubSourceStage(ConsumerSettings<K, V> settings, IAutoSubscription su
5049
Subscription = subscription;
5150
GetOffsetsOnAssign = getOffsetsOnAssign;
5251
OnRevoke = onRevoke;
52+
_subSourceStageLogicFactory = new PlainSubSourceStageLogicFactory();
5353
}
5454

55-
/// <inheritdoc />
55+
private class
56+
PlainSubSourceStageLogicFactory : SubSourceLogic.ISubSourceStageLogicFactory<K, V, ConsumeResult<K, V>>
57+
{
58+
public SubSourceStageLogic<K, V, ConsumeResult<K, V>> Create(SourceShape<ConsumeResult<K, V>> shape,
59+
TopicPartition tp, IActorRef consumerActor,
60+
Action<SubSourceLogic.SubSourceStageLogicControl> subSourceStartedCb,
61+
Action<(TopicPartition partition, SubSourceLogic.ISubSourceCancellationStrategy cancellationStrategy)>
62+
subSourceCancelledCb, int actorNumber) =>
63+
new PlainSubSourceStageLogic<K, V>(shape, tp, consumerActor, actorNumber,
64+
new PlainMessageBuilder<K, V>(), subSourceStartedCb, subSourceCancelledCb);
65+
}
66+
5667
protected override (GraphStageLogic, IControl) Logic(SourceShape<(TopicPartition, Source<ConsumeResult<K, V>, NotUsed>)> shape,
5768
Attributes inheritedAttributes)
5869
{
5970
var logic = new SubSourceLogic<K, V, ConsumeResult<K, V>>(shape, Settings, Subscription,
60-
messageBuilderFactory: _ => new PlainMessageBuilder<K, V>(),
6171
getOffsetsOnAssign: GetOffsetsOnAssign,
6272
onRevoke: OnRevoke,
73+
_subSourceStageLogicFactory,
6374
attributes: inheritedAttributes);
6475

6576
return (logic, logic.Control);
6677
}
6778
}
79+
80+
internal sealed class PlainSubSourceStageLogic<K, V> : SubSourceStageLogic<K, V, ConsumeResult<K, V>>
81+
{
82+
public PlainSubSourceStageLogic(
83+
SourceShape<ConsumeResult<K, V>> shape,
84+
TopicPartition topicPartition,
85+
IActorRef consumerActor, int actorNumber,
86+
IMessageBuilder<K, V, ConsumeResult<K, V>> messageBuilder,
87+
Action<SubSourceLogic.SubSourceStageLogicControl> subSourceStartedCallback,
88+
Action<(TopicPartition, SubSourceLogic.ISubSourceCancellationStrategy)> subSourceCancelledCallback)
89+
: base(shape, topicPartition, consumerActor, actorNumber, messageBuilder, subSourceStartedCallback,
90+
subSourceCancelledCallback)
91+
{
92+
}
93+
}
6894
}

0 commit comments

Comments
 (0)