diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.DotNet.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.DotNet.verified.txt index ad1f2a6e37b..46575636788 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.DotNet.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.DotNet.verified.txt @@ -4128,6 +4128,10 @@ namespace Akka.Streams.Implementation.Fusing protected override Akka.Streams.Stage.GraphStageLogic CreateLogic(Akka.Streams.Attributes inheritedAttributes) { } public override string ToString() { } } + public class DownstreamCompletedWithNoCauseException : System.Exception + { + public DownstreamCompletedWithNoCauseException() { } + } [Akka.Annotations.InternalApiAttribute()] public sealed class Expand : Akka.Streams.Stage.GraphStage> { diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.Net.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.Net.verified.txt index 13d5294d711..8f533b59faa 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.Net.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.Net.verified.txt @@ -4102,6 +4102,10 @@ namespace Akka.Streams.Implementation.Fusing protected override Akka.Streams.Stage.GraphStageLogic CreateLogic(Akka.Streams.Attributes inheritedAttributes) { } public override string ToString() { } } + public class DownstreamCompletedWithNoCauseException : System.Exception + { + public DownstreamCompletedWithNoCauseException() { } + } [Akka.Annotations.InternalApiAttribute()] public sealed class Expand : Akka.Streams.Stage.GraphStage> { diff --git a/src/core/Akka.Streams/Implementation/Fusing/DownstreamCompletedWithNoCauseException.cs b/src/core/Akka.Streams/Implementation/Fusing/DownstreamCompletedWithNoCauseException.cs new file mode 100644 index 00000000000..d0cab76e1a9 --- /dev/null +++ b/src/core/Akka.Streams/Implementation/Fusing/DownstreamCompletedWithNoCauseException.cs @@ -0,0 +1,17 @@ +// ----------------------------------------------------------------------- +// +// Copyright (C) 2009-2025 Lightbend Inc. +// Copyright (C) 2013-2025 .NET Foundation +// +// ----------------------------------------------------------------------- + +using System; + +namespace Akka.Streams.Implementation.Fusing; + +public class DownstreamCompletedWithNoCauseException: Exception +{ + public DownstreamCompletedWithNoCauseException() : base("Downstream stage/flow completed with no cause") + { + } +} \ No newline at end of file diff --git a/src/core/Akka.Streams/Stage/GraphStage.cs b/src/core/Akka.Streams/Stage/GraphStage.cs index 4325a07dc8f..a193658a3e0 100644 --- a/src/core/Akka.Streams/Stage/GraphStage.cs +++ b/src/core/Akka.Streams/Stage/GraphStage.cs @@ -1297,10 +1297,25 @@ public void InternalOnDownstreamFinish(Exception cause) { try { - if (cause == null) - throw new ArgumentException("Cancellation cause must not be null", nameof(cause)); if (_lastCancellationCause != null) throw new ArgumentException("OnDownstreamFinish must not be called recursively", nameof(cause)); + + // Some stages might propagate null exceptions due to improper Task continuation handling + // (see https://github.com/akkadotnet/Akka.Persistence.Sql/issues/498) + // This is a stop gap solution to make sure that Akka.Streams doesn't behave improperly + // until we can fix all of those + if (cause is null) + { + try + { + throw new DownstreamCompletedWithNoCauseException(); + } + catch (DownstreamCompletedWithNoCauseException e) + { + cause = e; + } + } + _lastCancellationCause = cause; CancelStage(_lastCancellationCause); }