Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ public async Task PlainPartitionedSource_should_not_leave_gaps_when_subsource_is
.TakeWhile(m => m < totalMessages, inclusive: true)
.RunWith(Sink.Last<int>(), Materializer);

var consumedMessages = await consumedMessagesTask.ShouldCompleteWithin(10.Seconds());
var consumedMessages = await consumedMessagesTask.ShouldCompleteWithin(60.Seconds());
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

consumedMessages.Should().Be(totalMessages);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,13 @@ public void OnRevoke(IImmutableSet<TopicPartitionOffset> revokedTopicPartitions,
RevokeEventsCounter.IncrementAndGet();
}

/// <inheritdoc />
public void OnLost(IImmutableSet<TopicPartitionOffset> revokedTopicPartitions,
IRestrictedConsumer consumer)
{
RevokeEventsCounter.IncrementAndGet();
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

}

/// <inheritdoc />
public void OnAssign(IImmutableSet<TopicPartition> assignedTopicPartitions, IRestrictedConsumer consumer)
{
Expand Down
28 changes: 27 additions & 1 deletion src/Akka.Streams.Kafka/Helpers/PartitionEventHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ public interface IPartitionEventHandler
/// </summary>
void OnRevoke(IImmutableSet<TopicPartitionOffset> revokedTopicPartitions, IRestrictedConsumer consumer);

/// <summary>
/// Called when partitions are lost
/// </summary>
void OnLost(IImmutableSet<TopicPartitionOffset> revokedTopicPartitions, IRestrictedConsumer consumer);

/// <summary>
/// Called when partitions are assigned
/// </summary>
Expand All @@ -50,6 +55,11 @@ public void OnRevoke(IImmutableSet<TopicPartitionOffset> revokedTopicPartitions,
{
}

/// <inheritdoc />
public void OnLost(IImmutableSet<TopicPartitionOffset> revokedTopicPartitions, IRestrictedConsumer consumer)
{
}

/// <inheritdoc />
public void OnAssign(IImmutableSet<TopicPartition> assignedTopicPartitions, IRestrictedConsumer consumer)
{
Expand All @@ -68,12 +78,15 @@ internal class AsyncCallbacks : IPartitionEventHandler
{
private readonly Action<IImmutableSet<TopicPartition>> _partitionAssignedCallback;
private readonly Action<IImmutableSet<TopicPartitionOffset>> _partitionRevokedCallback;
private readonly Action<IImmutableSet<TopicPartitionOffset>> _partitionLostCallback;

public AsyncCallbacks(Action<IImmutableSet<TopicPartition>> partitionAssignedCallback,
Action<IImmutableSet<TopicPartitionOffset>> partitionRevokedCallback)
Action<IImmutableSet<TopicPartitionOffset>> partitionRevokedCallback,
Action<IImmutableSet<TopicPartitionOffset>> partitionLostCallback)
{
_partitionAssignedCallback = partitionAssignedCallback;
_partitionRevokedCallback = partitionRevokedCallback;
_partitionLostCallback = partitionLostCallback;
}

/// <inheritdoc />
Expand All @@ -82,6 +95,12 @@ public void OnRevoke(IImmutableSet<TopicPartitionOffset> revokedTopicPartitions,
_partitionRevokedCallback(revokedTopicPartitions);
}

/// <inheritdoc />
public void OnLost(IImmutableSet<TopicPartitionOffset> revokedTopicPartitions, IRestrictedConsumer consumer)
{
_partitionLostCallback(revokedTopicPartitions);
}

/// <inheritdoc />
public void OnAssign(IImmutableSet<TopicPartition> assignedTopicPartitions, IRestrictedConsumer consumer)
{
Expand Down Expand Up @@ -115,6 +134,13 @@ public void OnRevoke(IImmutableSet<TopicPartitionOffset> revokedTopicPartitions,
_handler2?.OnRevoke(revokedTopicPartitions, consumer);
}

/// <inheritdoc />
public void OnLost(IImmutableSet<TopicPartitionOffset> revokedTopicPartitions, IRestrictedConsumer consumer)
{
_handler1?.OnLost(revokedTopicPartitions, consumer);
_handler2?.OnLost(revokedTopicPartitions, consumer);
}

/// <inheritdoc />
public void OnAssign(IImmutableSet<TopicPartition> assignedTopicPartitions, IRestrictedConsumer consumer)
{
Expand Down
12 changes: 10 additions & 2 deletions src/Akka.Streams.Kafka/Settings/ConsumerSettings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -394,11 +394,13 @@ private ConsumerSettings<TKey, TValue> Copy(
public Confluent.Kafka.IConsumer<TKey, TValue> CreateKafkaConsumer(Action<IConsumer<TKey, TValue>, Error> consumeErrorHandler = null,
Action<IConsumer<TKey, TValue>, List<TopicPartition>> partitionAssignedHandler = null,
Action<IConsumer<TKey, TValue>, List<TopicPartitionOffset>> partitionRevokedHandler = null,
Action<IConsumer<TKey, TValue>, List<TopicPartitionOffset>> partitionLostHandler = null,
Action<IConsumer<TKey, TValue>, string> statisticHandler = null)
{
RebalanceListener = new RebalanceListener<TKey, TValue>(
onPartitionAssigned: partitionAssignedHandler,
onPartitionRevoked: partitionRevokedHandler);
onPartitionRevoked: partitionRevokedHandler,
onPartitionLost: partitionLostHandler);

if (this.ConsumerFactory != null)
return this.ConsumerFactory(this);
Expand All @@ -409,20 +411,26 @@ public Confluent.Kafka.IConsumer<TKey, TValue> CreateKafkaConsumer(Action<IConsu
.SetErrorHandler((c, e) => consumeErrorHandler?.Invoke(c, e))
.SetPartitionsAssignedHandler((c, partitions) => partitionAssignedHandler?.Invoke(c, partitions))
.SetPartitionsRevokedHandler((c, partitions) => partitionRevokedHandler?.Invoke(c, partitions))
.SetPartitionsLostHandler((c, partitions) => partitionLostHandler?.Invoke(c, partitions))
.SetStatisticsHandler((c, json) => statisticHandler?.Invoke(c, json))
.Build();
}
}

internal sealed class RebalanceListener<TKey, TValue>
{
public RebalanceListener(Action<IConsumer<TKey, TValue>, List<TopicPartition>> onPartitionAssigned, Action<IConsumer<TKey, TValue>, List<TopicPartitionOffset>> onPartitionRevoked)
public RebalanceListener(
Action<IConsumer<TKey, TValue>, List<TopicPartition>> onPartitionAssigned,
Action<IConsumer<TKey, TValue>, List<TopicPartitionOffset>> onPartitionRevoked,
Action<IConsumer<TKey, TValue>, List<TopicPartitionOffset>> onPartitionLost)
{
OnPartitionAssigned = onPartitionAssigned;
OnPartitionRevoked = onPartitionRevoked;
OnPartitionLost = onPartitionLost;
}

public Action<IConsumer<TKey, TValue>, List<TopicPartition>> OnPartitionAssigned { get; }
public Action<IConsumer<TKey, TValue>, List<TopicPartitionOffset>> OnPartitionRevoked { get; }
public Action<IConsumer<TKey, TValue>, List<TopicPartitionOffset>> OnPartitionLost { get; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,9 @@ protected override IActorRef CreateConsumerActor()
{
var partitionsAssignedHandler = GetAsyncCallback<IEnumerable<TopicPartition>>(PartitionsAssigned);
var partitionsRevokedHandler = GetAsyncCallback<IEnumerable<TopicPartitionOffset>>(PartitionsRevoked);
var partitionsLostHandler = GetAsyncCallback<IEnumerable<TopicPartitionOffset>>(PartitionsLost);

IPartitionEventHandler internalHandler = new PartitionEventHandlers.AsyncCallbacks(partitionsAssignedHandler, partitionsRevokedHandler);
IPartitionEventHandler internalHandler = new PartitionEventHandlers.AsyncCallbacks(partitionsAssignedHandler, partitionsRevokedHandler, partitionsLostHandler);

// If custom partition events handler specified - add it to the chain
var eventHandler = _subscription is IAutoSubscription autoSubscription && autoSubscription.PartitionEventsHandler.HasValue
Expand Down Expand Up @@ -145,5 +146,11 @@ private void PartitionsRevoked(IEnumerable<TopicPartitionOffset> partitions)
TopicPartitions = TopicPartitions.Except(partitions.Select(tpo => tpo.TopicPartition));
Log.Debug("Partitions were revoked");
}

private void PartitionsLost(IEnumerable<TopicPartitionOffset> partitions)
{
TopicPartitions = TopicPartitions.Except(partitions.Select(tpo => tpo.TopicPartition));
Log.Debug("Partitions were lost");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ private class CloseRevokedPartitions { }
private readonly Action<IImmutableSet<TopicPartition>> _partitionAssignedCallback;
private readonly Action<IImmutableSet<TopicPartition>> _updatePendingPartitionsAndEmitSubSourcesCallback;
private readonly Action<IImmutableSet<TopicPartitionOffset>> _partitionRevokedCallback;
private readonly Action<IImmutableSet<TopicPartitionOffset>> _partitionLostCallback;
private readonly Action<(TopicPartition, ISubSourceCancellationStrategy)> _subsourceCancelledCallback;
private readonly Action<(TopicPartition, IControl)> _subsourceStartedCallback;
private readonly Action<(IImmutableSet<TopicPartition>, IImmutableSet<TopicPartitionOffset>)> _offsetsFromExternalResponseCb;
Expand Down Expand Up @@ -115,6 +116,7 @@ public SubSourceLogic(SourceShape<(TopicPartition, Source<TMessage, NotUsed>)> s
_updatePendingPartitionsAndEmitSubSourcesCallback = GetAsyncCallback<IImmutableSet<TopicPartition>>(UpdatePendingPartitionsAndEmitSubSources);
_partitionAssignedCallback = GetAsyncCallback<IImmutableSet<TopicPartition>>(HandlePartitionsAssigned);
_partitionRevokedCallback = GetAsyncCallback<IImmutableSet<TopicPartitionOffset>>(HandlePartitionsRevoked);
_partitionLostCallback = GetAsyncCallback<IImmutableSet<TopicPartitionOffset>>(HandlePartitionsLost);
_stageFailCallback = GetAsyncCallback<ConsumerFailed>(FailStage);
_subsourceCancelledCallback = GetAsyncCallback<(TopicPartition, ISubSourceCancellationStrategy)>(HandleSubsourceCancelled);
_subsourceStartedCallback = GetAsyncCallback<(TopicPartition, IControl)>(HandleSubsourceStarted);
Expand Down Expand Up @@ -144,7 +146,7 @@ public override void PreStart()
if (!(Materializer is ActorMaterializer actorMaterializer))
throw new ArgumentException($"Expected {typeof(ActorMaterializer)} but got {Materializer.GetType()}");

var eventHandler = new PartitionEventHandlers.AsyncCallbacks(_partitionAssignedCallback, _partitionRevokedCallback);
var eventHandler = new PartitionEventHandlers.AsyncCallbacks(_partitionAssignedCallback, _partitionRevokedCallback, _partitionLostCallback);

var statisticsHandler = _subscription.StatisticsHandler.HasValue
? _subscription.StatisticsHandler.Value
Expand Down Expand Up @@ -275,6 +277,13 @@ private void HandlePartitionsRevoked(IImmutableSet<TopicPartitionOffset> revoked
ScheduleOnce(new CloseRevokedPartitions(), _settings.WaitClosePartition);
}

private void HandlePartitionsLost(IImmutableSet<TopicPartitionOffset> revoked)
{
_partitionsToRevoke = _partitionsToRevoke.Union(revoked.Select(r => r.TopicPartition));

ScheduleOnce(new CloseRevokedPartitions(), _settings.WaitClosePartition);
}

private void HandleSubsourceCancelled((TopicPartition, ISubSourceCancellationStrategy) obj)
{
var (topicPartition, cancellationStrategy) = obj;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,21 +144,24 @@ protected override IPartitionEventHandler AddToPartitionAssignmentHandler(IParti
{
var blockingRevokedCall = new PartitionEventHandlers.AsyncCallbacks(
partitionAssignedCallback: _ => { },
partitionRevokedCallback: revokedTopicPartitions =>
{
var topicPartitions = revokedTopicPartitions.Select(tp => tp.TopicPartition).ToImmutableHashSet();
if (WaitForDraining(topicPartitions))
{
SourceActor.Ref.Tell(new KafkaConsumerActorMetadata.Internal.Revoked(topicPartitions));
}
else
{
SourceActor.Ref.Tell(new Status.Failure(new Exception("Timeout while drailing")));
ConsumerActor.Tell(KafkaConsumerActorMetadata.Internal.Stop.Instance);
}
});
partitionRevokedCallback: OnRevoke,
partitionLostCallback: OnRevoke);

return new PartitionEventHandlers.Chain(handler, blockingRevokedCall);

void OnRevoke(IImmutableSet<TopicPartitionOffset> revokedTopicPartitions)
{
var topicPartitions = revokedTopicPartitions.Select(tp => tp.TopicPartition).ToImmutableHashSet();
if (WaitForDraining(topicPartitions))
{
SourceActor.Ref.Tell(new KafkaConsumerActorMetadata.Internal.Revoked(topicPartitions));
}
else
{
SourceActor.Ref.Tell(new Status.Failure(new Exception("Timeout while drailing")));
ConsumerActor.Tell(KafkaConsumerActorMetadata.Internal.Stop.Instance);
}
}
}

private bool WaitForDraining(IImmutableSet<TopicPartition> partitions)
Expand Down
Loading