Skip to content

Commit 2eb59c5

Browse files
authored
[Streams] Allow GroupBy with infinite output substreams (#7607)
* [Streams] Allow GroupBy with infinite output substreams * Add public API overloads
1 parent 4e30b9d commit 2eb59c5

File tree

7 files changed

+171
-17
lines changed

7 files changed

+171
-17
lines changed

src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.DotNet.verified.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1388,7 +1388,9 @@ namespace Akka.Streams.Dsl
13881388
public static Akka.Streams.Dsl.Flow<TIn, TOut, TMat3> DivertToMaterialized<TIn, TOut, TMat, TMat2, TMat3>(this Akka.Streams.Dsl.Flow<TIn, TOut, TMat> flow, Akka.Streams.IGraph<Akka.Streams.SinkShape<TOut>, TMat2> that, System.Func<TOut, bool> when, System.Func<TMat, TMat2, TMat3> materializerFunction) { }
13891389
public static Akka.Streams.Dsl.Flow<TIn, TOut2, TMat> Expand<TIn, TOut1, TOut2, TMat>(this Akka.Streams.Dsl.Flow<TIn, TOut1, TMat> flow, System.Func<TOut1, System.Collections.Generic.IEnumerator<TOut2>> extrapolate) { }
13901390
public static Akka.Streams.Dsl.SubFlow<TOut, TMat, Akka.Streams.Dsl.Sink<TIn, TMat>> GroupBy<TIn, TOut, TMat, TKey>(this Akka.Streams.Dsl.Flow<TIn, TOut, TMat> flow, int maxSubstreams, System.Func<TOut, TKey> groupingFunc, bool allowClosedSubstreamRecreation) { }
1391+
public static Akka.Streams.Dsl.SubFlow<TOut, TMat, Akka.Streams.Dsl.Sink<TIn, TMat>> GroupBy<TIn, TOut, TMat, TKey>(this Akka.Streams.Dsl.Flow<TIn, TOut, TMat> flow, System.Func<TOut, TKey> groupingFunc, bool allowClosedSubstreamRecreation) { }
13911392
public static Akka.Streams.Dsl.SubFlow<TOut, TMat, Akka.Streams.Dsl.Sink<TIn, TMat>> GroupBy<TIn, TOut, TMat, TKey>(this Akka.Streams.Dsl.Flow<TIn, TOut, TMat> flow, int maxSubstreams, System.Func<TOut, TKey> groupingFunc) { }
1393+
public static Akka.Streams.Dsl.SubFlow<TOut, TMat, Akka.Streams.Dsl.Sink<TIn, TMat>> GroupBy<TIn, TOut, TMat, TKey>(this Akka.Streams.Dsl.Flow<TIn, TOut, TMat> flow, System.Func<TOut, TKey> groupingFunc) { }
13921394
public static Akka.Streams.Dsl.Flow<TIn, System.Collections.Generic.IEnumerable<TOut>, TMat> Grouped<TIn, TOut, TMat>(this Akka.Streams.Dsl.Flow<TIn, TOut, TMat> flow, int n) { }
13931395
public static Akka.Streams.Dsl.Flow<TIn, System.Collections.Generic.IEnumerable<TOut>, TMat> GroupedWeightedWithin<TIn, TOut, TMat>(this Akka.Streams.Dsl.Flow<TIn, TOut, TMat> flow, long maxWeight, int maxNumber, System.TimeSpan interval, System.Func<TOut, long> costFn) { }
13941396
public static Akka.Streams.Dsl.Flow<TIn, System.Collections.Generic.IEnumerable<TOut>, TMat> GroupedWithin<TIn, TOut, TMat>(this Akka.Streams.Dsl.Flow<TIn, TOut, TMat> flow, int n, System.TimeSpan timeout) { }
@@ -2089,6 +2091,7 @@ namespace Akka.Streams.Dsl
20892091
public static Akka.Streams.Dsl.Source<TOut, TMat3> DivertToMaterialized<TOut, TMat, TMat2, TMat3>(this Akka.Streams.Dsl.Source<TOut, TMat> flow, Akka.Streams.IGraph<Akka.Streams.SinkShape<TOut>, TMat2> that, System.Func<TOut, bool> when, System.Func<TMat, TMat2, TMat3> materializerFunction) { }
20902092
public static Akka.Streams.Dsl.Source<TOut2, TMat> Expand<TOut1, TOut2, TMat>(this Akka.Streams.Dsl.Source<TOut1, TMat> flow, System.Func<TOut1, System.Collections.Generic.IEnumerator<TOut2>> extrapolate) { }
20912093
public static Akka.Streams.Dsl.SubFlow<TOut, TMat, Akka.Streams.Dsl.IRunnableGraph<TMat>> GroupBy<TOut, TMat, TKey>(this Akka.Streams.Dsl.Source<TOut, TMat> flow, int maxSubstreams, System.Func<TOut, TKey> groupingFunc) { }
2094+
public static Akka.Streams.Dsl.SubFlow<TOut, TMat, Akka.Streams.Dsl.IRunnableGraph<TMat>> GroupBy<TOut, TMat, TKey>(this Akka.Streams.Dsl.Source<TOut, TMat> flow, System.Func<TOut, TKey> groupingFunc) { }
20922095
public static Akka.Streams.Dsl.Source<System.Collections.Generic.IEnumerable<TOut>, TMat> Grouped<TOut, TMat>(this Akka.Streams.Dsl.Source<TOut, TMat> flow, int n) { }
20932096
public static Akka.Streams.Dsl.Source<System.Collections.Generic.IEnumerable<TOut>, TMat> GroupedWeightedWithin<TOut, TMat>(this Akka.Streams.Dsl.Source<TOut, TMat> flow, long maxWeight, System.TimeSpan interval, System.Func<TOut, long> costFn) { }
20942097
public static Akka.Streams.Dsl.Source<System.Collections.Generic.IEnumerable<TOut>, TMat> GroupedWeightedWithin<TOut, TMat>(this Akka.Streams.Dsl.Source<TOut, TMat> flow, long maxWeight, int maxNumber, System.TimeSpan interval, System.Func<TOut, long> costFn) { }

src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.Net.verified.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1386,7 +1386,9 @@ namespace Akka.Streams.Dsl
13861386
public static Akka.Streams.Dsl.Flow<TIn, TOut, TMat3> DivertToMaterialized<TIn, TOut, TMat, TMat2, TMat3>(this Akka.Streams.Dsl.Flow<TIn, TOut, TMat> flow, Akka.Streams.IGraph<Akka.Streams.SinkShape<TOut>, TMat2> that, System.Func<TOut, bool> when, System.Func<TMat, TMat2, TMat3> materializerFunction) { }
13871387
public static Akka.Streams.Dsl.Flow<TIn, TOut2, TMat> Expand<TIn, TOut1, TOut2, TMat>(this Akka.Streams.Dsl.Flow<TIn, TOut1, TMat> flow, System.Func<TOut1, System.Collections.Generic.IEnumerator<TOut2>> extrapolate) { }
13881388
public static Akka.Streams.Dsl.SubFlow<TOut, TMat, Akka.Streams.Dsl.Sink<TIn, TMat>> GroupBy<TIn, TOut, TMat, TKey>(this Akka.Streams.Dsl.Flow<TIn, TOut, TMat> flow, int maxSubstreams, System.Func<TOut, TKey> groupingFunc, bool allowClosedSubstreamRecreation) { }
1389+
public static Akka.Streams.Dsl.SubFlow<TOut, TMat, Akka.Streams.Dsl.Sink<TIn, TMat>> GroupBy<TIn, TOut, TMat, TKey>(this Akka.Streams.Dsl.Flow<TIn, TOut, TMat> flow, System.Func<TOut, TKey> groupingFunc, bool allowClosedSubstreamRecreation) { }
13891390
public static Akka.Streams.Dsl.SubFlow<TOut, TMat, Akka.Streams.Dsl.Sink<TIn, TMat>> GroupBy<TIn, TOut, TMat, TKey>(this Akka.Streams.Dsl.Flow<TIn, TOut, TMat> flow, int maxSubstreams, System.Func<TOut, TKey> groupingFunc) { }
1391+
public static Akka.Streams.Dsl.SubFlow<TOut, TMat, Akka.Streams.Dsl.Sink<TIn, TMat>> GroupBy<TIn, TOut, TMat, TKey>(this Akka.Streams.Dsl.Flow<TIn, TOut, TMat> flow, System.Func<TOut, TKey> groupingFunc) { }
13901392
public static Akka.Streams.Dsl.Flow<TIn, System.Collections.Generic.IEnumerable<TOut>, TMat> Grouped<TIn, TOut, TMat>(this Akka.Streams.Dsl.Flow<TIn, TOut, TMat> flow, int n) { }
13911393
public static Akka.Streams.Dsl.Flow<TIn, System.Collections.Generic.IEnumerable<TOut>, TMat> GroupedWeightedWithin<TIn, TOut, TMat>(this Akka.Streams.Dsl.Flow<TIn, TOut, TMat> flow, long maxWeight, int maxNumber, System.TimeSpan interval, System.Func<TOut, long> costFn) { }
13921394
public static Akka.Streams.Dsl.Flow<TIn, System.Collections.Generic.IEnumerable<TOut>, TMat> GroupedWithin<TIn, TOut, TMat>(this Akka.Streams.Dsl.Flow<TIn, TOut, TMat> flow, int n, System.TimeSpan timeout) { }
@@ -2087,6 +2089,7 @@ namespace Akka.Streams.Dsl
20872089
public static Akka.Streams.Dsl.Source<TOut, TMat3> DivertToMaterialized<TOut, TMat, TMat2, TMat3>(this Akka.Streams.Dsl.Source<TOut, TMat> flow, Akka.Streams.IGraph<Akka.Streams.SinkShape<TOut>, TMat2> that, System.Func<TOut, bool> when, System.Func<TMat, TMat2, TMat3> materializerFunction) { }
20882090
public static Akka.Streams.Dsl.Source<TOut2, TMat> Expand<TOut1, TOut2, TMat>(this Akka.Streams.Dsl.Source<TOut1, TMat> flow, System.Func<TOut1, System.Collections.Generic.IEnumerator<TOut2>> extrapolate) { }
20892091
public static Akka.Streams.Dsl.SubFlow<TOut, TMat, Akka.Streams.Dsl.IRunnableGraph<TMat>> GroupBy<TOut, TMat, TKey>(this Akka.Streams.Dsl.Source<TOut, TMat> flow, int maxSubstreams, System.Func<TOut, TKey> groupingFunc) { }
2092+
public static Akka.Streams.Dsl.SubFlow<TOut, TMat, Akka.Streams.Dsl.IRunnableGraph<TMat>> GroupBy<TOut, TMat, TKey>(this Akka.Streams.Dsl.Source<TOut, TMat> flow, System.Func<TOut, TKey> groupingFunc) { }
20902093
public static Akka.Streams.Dsl.Source<System.Collections.Generic.IEnumerable<TOut>, TMat> Grouped<TOut, TMat>(this Akka.Streams.Dsl.Source<TOut, TMat> flow, int n) { }
20912094
public static Akka.Streams.Dsl.Source<System.Collections.Generic.IEnumerable<TOut>, TMat> GroupedWeightedWithin<TOut, TMat>(this Akka.Streams.Dsl.Source<TOut, TMat> flow, long maxWeight, System.TimeSpan interval, System.Func<TOut, long> costFn) { }
20922095
public static Akka.Streams.Dsl.Source<System.Collections.Generic.IEnumerable<TOut>, TMat> GroupedWeightedWithin<TOut, TMat>(this Akka.Streams.Dsl.Source<TOut, TMat> flow, long maxWeight, int maxNumber, System.TimeSpan interval, System.Func<TOut, long> costFn) { }

src/core/Akka.Streams.Tests/Dsl/FlowGroupBySpec.cs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -377,6 +377,29 @@ await this.AssertAllStagesStoppedAsync(async () =>
377377
}, Materializer);
378378
}
379379

380+
// Sends 100 distinct keys through GroupBy(-1, ...) and checks that each one shows up
// downstream as its own substream carrying exactly that element.
[Fact(DisplayName = "GroupBy must not have substream limit when maxSubStream is set to negative numbers")]
381+
public async Task GroupBy_UnlimitedSubstreamTest()
382+
{
383+
await this.AssertAllStagesStoppedAsync(async () =>
384+
{
385+
// maxSubstreams = -1 (no limit); the identity key function puts every distinct int in its own group.
// After MergeSubstreams the elements are (IImmutableList<int>, Source<int, NotUsed>) pairs, per the cast below.
var f = Flow.Create<int>().GroupBy(-1, x => x).PrefixAndTail(0).MergeSubstreams();
386+
var (up, down) = ((Flow<int, (IImmutableList<int>, Source<int, NotUsed>), NotUsed>)f)
387+
.RunWith(this.SourceProbe<int>(), this.SinkProbe<(IImmutableList<int>, Source<int, NotUsed>)>(), Materializer);
388+
389+
// Signal demand for 100 downstream elements up front: one per key sent in the loop below.
await down.RequestAsync(100);
390+
391+
foreach (var i in Enumerable.Range(0, 100))
392+
{
393+
// Every value is a new key, so each send is expected to produce one new downstream element.
await up.SendNextAsync(i);
394+
var (_, source) = await down.ExpectNextAsync();
395+
// Run the Source half of the pair and verify its first element is the key that was just sent.
var (sub, probe) = await StreamPuppet(source.RunWith(Sink.AsPublisher<int>(false), Materializer), this);
396+
397+
sub.Request(1);
398+
await probe.ExpectNextAsync(i);
399+
}
400+
}, Materializer);
401+
}
402+
380403
[Fact]
381404
public async Task GroupBy_must_resume_when_exceeding_maxSubStreams()
382405
{

src/core/Akka.Streams/Dsl/FlowOperations.cs

Lines changed: 87 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1346,13 +1346,70 @@ public static Flow<TIn, TOut2, TMat> Transform<TIn, TOut1, TOut2, TMat>(this Flo
13461346
/// <typeparam name="TMat">TBD</typeparam>
13471347
/// <typeparam name="TKey">TBD</typeparam>
13481348
/// <param name="flow">TBD</param>
1349-
/// <param name="maxSubstreams">Configures the maximum number of substreams (keys) that are supported; if more distinct keys are encountered then the stream fails</param>
1349+
/// <param name="maxSubstreams">Configures the maximum number of substreams (keys) that are supported; if more distinct keys are encountered then the stream fails. Set to -1 for infinite substreams.</param>
13501350
/// <param name="groupingFunc">Computes the key for each element</param>
13511351
/// <param name="allowClosedSubstreamRecreation">Enables recreation of already closed substreams if elements with their corresponding keys arrive after completion</param>
13521352
/// <returns>TBD</returns>
13531353
public static SubFlow<TOut, TMat, Sink<TIn, TMat>> GroupBy<TIn, TOut, TMat, TKey>(this Flow<TIn, TOut, TMat> flow, int maxSubstreams, Func<TOut, TKey> groupingFunc, bool allowClosedSubstreamRecreation) =>
13541354
flow.GroupBy(maxSubstreams, groupingFunc, (f, s) => ((Flow<TIn, Source<TOut, NotUsed>, TMat>)f).To(s), allowClosedSubstreamRecreation);
13551355

1356+
/// <summary>
1357+
/// This operation demultiplexes the incoming stream into separate output
1358+
/// streams, one for each element key. The key is computed for each element
1359+
/// using the given function. When a new key is encountered for the first time
1360+
/// a new substream is opened and subsequently fed with all elements belonging to
1361+
/// that key. This overload passes <c>-1</c> as the maximum number of substreams, i.e. the number of substreams is not limited.
1362+
/// <para>
1363+
/// WARNING: If <paramref name="allowClosedSubstreamRecreation"/> is set to false (default behavior) the operator
1364+
/// keeps track of all keys of streams that have already been closed. If you expect an infinite number of keys this
1365+
/// can cause memory issues. Elements belonging to those keys are drained directly and not sent to the substream.
1366+
/// </para>
1367+
/// <para>
1368+
/// Note: If <paramref name="allowClosedSubstreamRecreation"/> is set to true substream completion and incoming
1369+
/// elements are subject to race-conditions. If elements arrive for a stream that is in the process of closing
1370+
/// these elements might get lost.
1371+
/// </para>
1372+
/// <para>
1373+
/// The object returned from this method is not a normal <see cref="Flow"/>, it is a
1374+
/// <see cref="SubFlow{TOut, TMat, TClosed}"/>. This means that after this operator
1375+
/// all transformations are applied to all encountered substreams in the same fashion.
1376+
/// Substream mode is exited either by closing the substream (i.e. connecting it to a <see cref="Sink"/>)
1377+
/// or by merging the substreams back together; see the <c>To</c> and <c>MergeBack</c> methods
1378+
/// on <see cref="SubFlow{TOut, TMat, TClosed}"/> for more information.
1379+
/// </para>
1380+
/// <para>
1381+
/// It is important to note that the substreams also propagate back-pressure as any other stream, which means
1382+
/// that blocking one substream will block the <c>GroupBy</c> operator itself —and thereby all substreams— once all
1383+
/// internal or explicit buffers are filled.
1384+
/// </para>
1385+
/// <para>
1386+
/// If the group by function <paramref name="groupingFunc"/> throws an exception and the supervision decision
1387+
/// is <see cref="Supervision.Directive.Stop"/> the stream and substreams will be completed with failure.
1388+
/// </para>
1389+
/// <para>
1390+
/// If the group by <paramref name="groupingFunc"/> throws an exception and the supervision decision
1391+
/// is <see cref="Supervision.Directive.Resume"/> or <see cref="Supervision.Directive.Restart"/>
1392+
/// the element is dropped and the stream and substreams continue.
1393+
/// </para>
1394+
/// <para>
1395+
/// Function <paramref name="groupingFunc"/> MUST NOT return <c>null</c>. This will throw exception and trigger supervision decision mechanism.
1396+
/// </para>
1397+
/// <para>**Emits when** an element for which the grouping function returns a group that has not yet been created. Emits the new group.</para>
1398+
/// <para>**Backpressures when** there is an element pending for a group whose substream backpressures</para>
1399+
/// <para>**Completes when** upstream completes</para>
1400+
/// <para>**Cancels when** downstream cancels and all substreams cancel</para>
1401+
/// </summary>
1402+
/// <typeparam name="TIn">TBD</typeparam>
1403+
/// <typeparam name="TOut">Type of the elements handed to <paramref name="groupingFunc"/></typeparam>
1404+
/// <typeparam name="TMat">TBD</typeparam>
1405+
/// <typeparam name="TKey">Type of the key returned by <paramref name="groupingFunc"/></typeparam>
1406+
/// <param name="flow">The flow whose output elements are grouped</param>
1407+
/// <param name="groupingFunc">Computes the key for each element</param>
1408+
/// <param name="allowClosedSubstreamRecreation">Enables recreation of already closed substreams if elements with their corresponding keys arrive after completion</param>
1409+
/// <returns>A <see cref="SubFlow{TOut, TMat, TClosed}"/> with <c>TClosed</c> = <c>Sink&lt;TIn, TMat&gt;</c></returns>
1410+
public static SubFlow<TOut, TMat, Sink<TIn, TMat>> GroupBy<TIn, TOut, TMat, TKey>(this Flow<TIn, TOut, TMat> flow, Func<TOut, TKey> groupingFunc, bool allowClosedSubstreamRecreation) =>
1411+
flow.GroupBy(-1, groupingFunc, (f, s) => ((Flow<TIn, Source<TOut, NotUsed>, TMat>)f).To(s), allowClosedSubstreamRecreation);
1412+
13561413
/// <summary>
13571414
/// This operation demultiplexes the incoming stream into separate output
13581415
/// streams, one for each element key. The key is computed for each element
@@ -1366,9 +1423,38 @@ public static SubFlow<TOut, TMat, Sink<TIn, TMat>> GroupBy<TIn, TOut, TMat, TKey
13661423
/// </para>
13671424
/// See <seealso cref="GroupBy{TIn, TOut, TMat, TKey}(Flow{TIn, TOut, TMat}, int, Func{TOut, TKey}, bool)"/>
13681425
/// </summary>
1426+
/// <typeparam name="TIn">TBD</typeparam>
1427+
/// <typeparam name="TOut">TBD</typeparam>
1428+
/// <typeparam name="TMat">TBD</typeparam>
1429+
/// <typeparam name="TKey">TBD</typeparam>
1430+
/// <param name="flow">TBD</param>
1431+
/// <param name="maxSubstreams">Configures the maximum number of substreams (keys) that are supported; if more distinct keys are encountered then the stream fails. Set to -1 for infinite substreams.</param>
1432+
/// <param name="groupingFunc">Computes the key for each element</param>
13691433
public static SubFlow<TOut, TMat, Sink<TIn, TMat>> GroupBy<TIn, TOut, TMat, TKey>(this Flow<TIn, TOut, TMat> flow, int maxSubstreams, Func<TOut, TKey> groupingFunc) =>
13701434
flow.GroupBy(maxSubstreams, groupingFunc, (f, s) => ((Flow<TIn, Source<TOut, NotUsed>, TMat>)f).To(s), false);
13711435

1436+
/// <summary>
1437+
/// This operation demultiplexes the incoming stream into separate output
1438+
/// streams, one for each element key. The key is computed for each element
1439+
/// using the given function. When a new key is encountered for the first time
1440+
/// a new substream is opened and subsequently fed with all elements belonging to
1441+
/// that key. This overload passes <c>-1</c> as the maximum number of substreams and <c>false</c> for closed-substream recreation, i.e. the number of substreams is not limited.
1442+
/// <para>
1443+
/// WARNING: The stage keeps track of all keys of streams that have already been closed.
1444+
/// If you expect an infinite number of keys this can cause memory issues. Elements belonging
1445+
/// to those keys are drained directly and not sent to the substream.
1446+
/// </para>
1447+
/// See <seealso cref="GroupBy{TIn, TOut, TMat, TKey}(Flow{TIn, TOut, TMat}, int, Func{TOut, TKey}, bool)"/>
1448+
/// </summary>
1449+
/// <typeparam name="TIn">TBD</typeparam>
1450+
/// <typeparam name="TOut">Type of the elements handed to <paramref name="groupingFunc"/></typeparam>
1451+
/// <typeparam name="TMat">TBD</typeparam>
1452+
/// <typeparam name="TKey">Type of the key returned by <paramref name="groupingFunc"/></typeparam>
1453+
/// <param name="flow">The flow whose output elements are grouped</param>
1454+
/// <param name="groupingFunc">Computes the key for each element</param>
1455+
public static SubFlow<TOut, TMat, Sink<TIn, TMat>> GroupBy<TIn, TOut, TMat, TKey>(this Flow<TIn, TOut, TMat> flow, Func<TOut, TKey> groupingFunc) =>
1456+
flow.GroupBy(-1, groupingFunc, (f, s) => ((Flow<TIn, Source<TOut, NotUsed>, TMat>)f).To(s), false);
1457+
13721458
/// <summary>
13731459
/// This operation applies the given predicate to all incoming elements and
13741460
/// emits them to a stream of output streams, always beginning a new one with

0 commit comments

Comments
 (0)