Skip to content

Commit 562212d

Browse files
authored
Make sure that we prevent downstream to propagate null completion exception (#427)
1 parent 36643b2 commit 562212d

File tree

2 files changed

+32
-4
lines changed

2 files changed

+32
-4
lines changed
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
using System;
2+
3+
namespace Akka.Streams.Kafka.Stages;
4+
5+
public class DownstreamFinishedWithNoCauseException: Exception
6+
{
7+
public override string Message => "Downstream flow or stage completed with no cause";
8+
}

src/Akka.Streams.Kafka/Stages/Producers/DefaultProducerStage.cs

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -153,10 +153,30 @@ public DefaultProducerStageLogic(IProducerStage<K, V, P, TIn, TOut> stage, Attri
153153
CheckForCompletion();
154154
});
155155

156-
SetHandler(_stage.Out, onPull: () =>
157-
{
158-
TryPull(_stage.In);
159-
});
156+
SetHandler(
157+
outlet: _stage.Out,
158+
onPull: () =>
159+
{
160+
TryPull(_stage.In);
161+
},
162+
onDownstreamFinish: ex =>
163+
{
164+
if (ex is null)
165+
{
166+
try
167+
{
168+
throw new DownstreamFinishedWithNoCauseException();
169+
}
170+
catch(DownstreamFinishedWithNoCauseException e)
171+
{
172+
InternalOnDownstreamFinish(e);
173+
}
174+
}
175+
else
176+
{
177+
InternalOnDownstreamFinish(ex);
178+
}
179+
});
160180
}
161181

162182
public override void PreStart()

0 commit comments

Comments
 (0)