|
8 | 8 | using System.Collections.Generic; |
9 | 9 | using System.Collections.Immutable; |
10 | 10 | using System.Linq; |
| 11 | +using System.Threading; |
11 | 12 | using System.Threading.Tasks; |
12 | 13 | using Akka.Actor; |
13 | 14 | using Akka.Streams.Dsl; |
@@ -140,6 +141,21 @@ private readonly Option<Func<IImmutableSet<TopicPartition>, Task<IImmutableSet<T |
140 | 141 | /// </summary> |
141 | 142 | private IImmutableSet<TopicPartition> _partitionsToRevoke = ImmutableHashSet<TopicPartition>.Empty; |
142 | 143 |
|
| 144 | + /// <summary> |
| 145 | + /// Cancellation token source for shutting down seek operations when stage is stopping |
| 146 | + /// </summary> |
| 147 | + private readonly CancellationTokenSource _shutdownTokenSource = new(); |
| 148 | + |
| 149 | + /// <summary> |
| 150 | + /// Track when the stage started to detect race conditions |
| 151 | + /// </summary> |
| 152 | + private readonly DateTime _startTime = DateTime.UtcNow; |
| 153 | + |
| 154 | + /// <summary> |
| 155 | + /// Track if seek operation is in progress to detect race conditions |
| 156 | + /// </summary> |
| 157 | + private volatile bool _seekInProgress = false; |
| 158 | + |
143 | 159 | protected StageActor SourceActor { get; private set; } = null!; |
144 | 160 | public IActorRef ConsumerActor { get; private set; } = null!; |
145 | 161 |
|
@@ -295,9 +311,15 @@ public override void PreStart() |
295 | 311 |
|
296 | 312 | public override void PostStop() |
297 | 313 | { |
| 314 | + // Cancel any ongoing seek operations |
| 315 | + _shutdownTokenSource.Cancel(); |
| 316 | + |
298 | 317 | ConsumerActor.Tell(new KafkaConsumerActorMetadata.Internal.StopFromStage(_shape.ToString()), SourceActor.Ref); |
299 | 318 |
|
300 | 319 | Control.OnShutdown(); |
| 320 | + |
| 321 | + // Clean up cancellation token |
| 322 | + _shutdownTokenSource.Dispose(); |
301 | 323 |
|
302 | 324 | base.PostStop(); |
303 | 325 | } |
@@ -379,17 +401,58 @@ private void SeekAndEmitSubSources(IImmutableSet<TopicPartition> formerlyUnknown |
379 | 401 |
|
380 | 402 | async Task AskToSeekOffsets() |
381 | 403 | { |
| 404 | + _seekInProgress = true; |
| 405 | + |
| 406 | + if (Log.IsDebugEnabled) |
| 407 | + Log.Debug("#{0} Starting seek operation for partitions: {1}", _actorNumber, offsets.JoinToString(", ")); |
| 408 | + |
382 | 409 | try |
383 | 410 | { |
384 | 411 | await ConsumerActor.Ask(new KafkaConsumerActorMetadata.Internal.Seek(offsets), |
385 | | - TimeSpan.FromSeconds(10)); |
| 412 | + TimeSpan.FromSeconds(10), _shutdownTokenSource.Token); |
| 413 | + |
| 414 | + if (Log.IsDebugEnabled) |
| 415 | + Log.Debug("#{0} Seek operation completed successfully", _actorNumber); |
| 416 | + |
386 | 417 | _updatePendingPartitionsAndEmitSubSourcesCallback(formerlyUnknown); |
387 | 418 | } |
388 | | - catch (Exception) |
| 419 | + catch (OperationCanceledException) when (_shutdownTokenSource.IsCancellationRequested) |
389 | 420 | { |
390 | | - // only exceptions that can be thrown here are related to TCS cancellation / timeout |
391 | | - _stageFailCallback(new ConsumerFailed( |
392 | | - $"{_actorNumber} Consumer failed during seek, Ask timed out. Partitions: {offsets.JoinToString(", ")}")); |
| 421 | + // Stage is shutting down, this is expected - don't treat as fatal error |
| 422 | + if (Log.IsDebugEnabled) |
| 423 | + Log.Debug("#{0} Seek operation cancelled due to stage shutdown (expected). Partitions: {1}", |
| 424 | + _actorNumber, offsets.JoinToString(", ")); |
| 425 | + } |
| 426 | + catch (Exception ex) |
| 427 | + { |
| 428 | + var timeSinceStart = DateTime.UtcNow.Subtract(_startTime); |
| 429 | + |
| 430 | + // Other exceptions (timeout, etc.) - check if we're shutting down before failing |
| 431 | + if (_shutdownTokenSource.IsCancellationRequested) |
| 432 | + { |
| 433 | + // Stage is shutting down, timeout is expected |
| 434 | + if (Log.IsDebugEnabled) |
| 435 | + Log.Debug("#{0} Seek operation timed out during stage shutdown (expected). Partitions: {1}", |
| 436 | + _actorNumber, offsets.JoinToString(", ")); |
| 437 | + } |
| 438 | + else |
| 439 | + { |
| 440 | + // Check for race condition pattern - very fast failure |
| 441 | + if (timeSinceStart.TotalMilliseconds < 100) |
| 442 | + { |
| 443 | + Log.Warning("#{0} Seek operation failed very quickly ({1}ms after start) - possible race condition. " + |
| 444 | + "Error: {2}, Partitions: {3}", |
| 445 | + _actorNumber, timeSinceStart.TotalMilliseconds, ex.Message, offsets.JoinToString(", ")); |
| 446 | + } |
| 447 | + |
| 448 | + // Unexpected timeout while stage is still active |
| 449 | + _stageFailCallback(new ConsumerFailed( |
| 450 | + $"{_actorNumber} Consumer failed during seek, Ask timed out. Partitions: {offsets.JoinToString(", ")}", ex)); |
| 451 | + } |
| 452 | + } |
| 453 | + finally |
| 454 | + { |
| 455 | + _seekInProgress = false; |
393 | 456 | } |
394 | 457 | } |
395 | 458 | } |
@@ -497,9 +560,28 @@ private void PerformStop() |
497 | 560 |
|
498 | 561 | private void PerformShutdown() |
499 | 562 | { |
| 563 | + var timeSinceStart = DateTime.UtcNow.Subtract(_startTime); |
| 564 | + |
| 565 | + // Only log if this looks like the race condition (very fast shutdown with seek in progress) |
| 566 | + if (timeSinceStart.TotalMilliseconds < 100 && _seekInProgress) |
| 567 | + { |
| 568 | + Log.Warning("#{0} Potential race condition detected: PerformShutdown called {1}ms after start " + |
| 569 | + "with seek operation still in progress. Pending: {2}, InStartup: {3}, SubSources: {4}", |
| 570 | + _actorNumber, timeSinceStart.TotalMilliseconds, _pendingPartitions.Count, |
| 571 | + _partitionsInStartup.Count, _subSources.Count); |
| 572 | + } |
| 573 | + else if (Log.IsDebugEnabled) |
| 574 | + { |
| 575 | + Log.Debug("#{0} PerformShutdown called normally after {1}ms. SeekInProgress: {2}", |
| 576 | + _actorNumber, timeSinceStart.TotalMilliseconds, _seekInProgress); |
| 577 | + } |
| 578 | + |
500 | 579 | Log.Info("Completing. Partitions [{0}], StageActor {1}", string.Join(", ", _subSources.Keys), SourceActor.Ref); |
501 | 580 | SetKeepGoing(true); |
502 | 581 |
|
| 582 | + // Cancel any ongoing seek operations immediately to avoid timeouts |
| 583 | + _shutdownTokenSource.Cancel(); |
| 584 | + |
503 | 585 | // TODO from alpakka: we should wait for subsources to be shutdown and next shutdown main stage |
504 | 586 | _subSources.Values.Select(c => c.ControlAndStageActor.Control).ForEach(control => control.Shutdown()); |
505 | 587 |
|
|
0 commit comments