Commit c256a88

added CommitCollectorStage benchmark (#477)
1 parent e5ddd61 commit c256a88

File tree: 7 files changed (+176, -6 lines)
Lines changed: 1 addition & 0 deletions

@@ -1,3 +1,4 @@
 <wpf:ResourceDictionary xml:space="preserve" xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml" xmlns:s="clr-namespace:System;assembly=mscorlib" xmlns:ss="urn:shemas-jetbrains-com:settings-storage-xaml" xmlns:wpf="http://schemas.microsoft.com/winfx/2006/xaml/presentation">
+	<s:Boolean x:Key="/Default/CodeInspection/NamespaceProvider/NamespaceFoldersToSkip/=benchmarks/@EntryIndexedValue">True</s:Boolean>
 	<s:Boolean x:Key="/Default/CodeInspection/NamespaceProvider/NamespaceFoldersToSkip/=benchmarks_005Cconsumer/@EntryIndexedValue">True</s:Boolean>
 	<s:Boolean x:Key="/Default/CodeInspection/NamespaceProvider/NamespaceFoldersToSkip/=benchmarks_005Cmicro/@EntryIndexedValue">True</s:Boolean></wpf:ResourceDictionary>

src/Akka.Streams.Kafka.Benchmark/Benchmarks/Consumer/ComittablePartitionedSourceBenchmark.cs

Lines changed: 1 addition & 2 deletions

@@ -1,4 +1,3 @@
-using System.Linq;
 using System.Threading.Tasks;
 using Akka.Streams.Dsl;
 using Akka.Streams.Kafka.Benchmark.Configs;
@@ -10,7 +9,7 @@
 using BenchmarkDotNet.Attributes;
 using Confluent.Kafka;
 
-namespace Akka.Streams.Kafka.Benchmark.Benchmarks;
+namespace Akka.Streams.Kafka.Benchmark;
 
 [Config(typeof(MacroBenchmarkConfig))]
 public class CommittablePartitionedSourceBenchmark : KafkaConsumerBenchmark<int>

src/Akka.Streams.Kafka.Benchmark/Benchmarks/Consumer/CommittableSourceBenchmark.cs

Lines changed: 1 addition & 2 deletions

@@ -1,4 +1,3 @@
-using System.Linq;
 using System.Threading.Tasks;
 using Akka.Streams.Dsl;
 using Akka.Streams.Kafka.Benchmark.Configs;
@@ -10,7 +9,7 @@
 using BenchmarkDotNet.Attributes;
 using Confluent.Kafka;
 
-namespace Akka.Streams.Kafka.Benchmark.Benchmarks
+namespace Akka.Streams.Kafka.Benchmark
 {
     [Config(typeof(MacroBenchmarkConfig))]
     public class CommittableSourceBenchmark : KafkaConsumerBenchmark<int>

src/Akka.Streams.Kafka.Benchmark/Benchmarks/Consumer/PlainConsumerBenchmark.cs

Lines changed: 1 addition & 1 deletion

@@ -8,7 +8,7 @@
 using BenchmarkDotNet.Attributes;
 using Confluent.Kafka;
 
-namespace Akka.Streams.Kafka.Benchmark.Benchmarks
+namespace Akka.Streams.Kafka.Benchmark
 {
     [Config(typeof(MacroBenchmarkConfig))]
     public class PlainSourceBenchmark : KafkaConsumerBenchmark<ConsumeResult<Null, string>>

src/Akka.Streams.Kafka.Benchmark/Benchmarks/Consumer/PlainPartitionedConsumerBenchmark.cs

Lines changed: 1 addition & 1 deletion

@@ -8,7 +8,7 @@
 using BenchmarkDotNet.Attributes;
 using Confluent.Kafka;
 
-namespace Akka.Streams.Kafka.Benchmark.Benchmarks;
+namespace Akka.Streams.Kafka.Benchmark;
 
 [Config(typeof(MacroBenchmarkConfig))]
 public class PlainPartitionedSourceBenchmark : KafkaConsumerBenchmark<ConsumeResult<Null, string>>
Lines changed: 94 additions & 0 deletions

@@ -0,0 +1,94 @@
+using System;
+using System.Threading.Tasks;
+using Akka.Actor;
+using Akka.Configuration;
+using Akka.Streams.Dsl;
+using Akka.Streams.Kafka.Benchmark.Infrastructure;
+using Akka.Streams.Kafka.Helpers;
+using Akka.Streams.Kafka.Messages;
+using Akka.Streams.Kafka.Settings;
+using Akka.Streams.Kafka.Stages.Consumers;
+using BenchmarkDotNet.Attributes;
+
+namespace Akka.Streams.Kafka.Benchmark
+{
+    [MemoryDiagnoser]
+    public class CommitCollectorStageBenchmark
+    {
+        private ActorSystem? _system;
+        private IMaterializer? _materializer;
+        private ICommittable[] _offsets = [];
+        private CommitterSettings? _settings;
+
+        [Params(1000, 10_000)] // Reduced from 100k to keep benchmark runtime reasonable
+        public int TotalMessages { get; set; }
+
+        [Params(1, 4, 16)]
+        public int PartitionCount { get; set; }
+
+        [Params(10, 100)]
+        public int BatchSize { get; set; }
+
+        [Params(100)]
+        public int MaxIntervalMs { get; set; }
+
+        [GlobalSetup]
+        public void Setup()
+        {
+            var config = ConfigurationFactory.ParseString("akka.loglevel=WARNING")
+                .WithFallback(KafkaExtensions.DefaultSettings);
+            _system = ActorSystem.Create("CommitCollectorBenchmark", config);
+            _materializer = ActorMaterializer.Create(_system);
+
+            _settings = CommitterSettings.Create(_system)
+                .WithMaxBatch(BatchSize)
+                .WithMaxInterval(TimeSpan.FromMilliseconds(MaxIntervalMs));
+
+            // Create test messages distributed across partitions
+            _offsets = new ICommittable[TotalMessages];
+            var messagesPerPartition = TotalMessages / PartitionCount;
+
+            for (var partitionId = 0; partitionId < PartitionCount; partitionId++)
+            {
+                var groupId = "benchmark-group";
+                var topic = "benchmark-topic";
+
+                for (var i = 0; i < messagesPerPartition; i++)
+                {
+                    // Distribute messages across partitions in sequence
+                    var position = (i * PartitionCount) + partitionId;
+                    if (position < TotalMessages)
+                    {
+                        var gtp = new GroupTopicPartition(groupId, topic, partitionId);
+                        var partitionOffset = new GroupTopicPartitionOffset(gtp, i);
+
+                        _offsets[position] = new CommittableOffset(
+                            new KafkaAsyncConsumerCommitter(() => ActorRefs.Nobody, TimeSpan.Zero),
+                            partitionOffset,
+                            string.Empty);
+                    }
+                }
+            }
+        }
+
+        [GlobalCleanup]
+        public async Task CleanupAsync()
+        {
+            if (_system != null)
+                await _system.Terminate();
+        }
+
+        [Benchmark]
+        [BenchmarkCategory(BenchmarkCategories.MicroBenchmark)]
+        public async Task ProcessThroughStageAsync()
+        {
+            if (_system == null || _settings == null || _offsets == null || _materializer == null)
+                throw new InvalidOperationException("Benchmark not properly initialized");
+
+            // Create and run the stream
+            await Source.From(_offsets)
+                .Via(Flow.FromGraph(new CommitCollectorStage(_settings)))
+                .RunWith(Sink.Ignore<ICommittableOffsetBatch>(), _materializer);
+        }
+    }
+}
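
For orientation only: a parameterized benchmark class like the one above is normally executed through a BenchmarkDotNet entry point in the same project. The sketch below is not part of this commit; it assumes a hypothetical Program class in the benchmark assembly and uses BenchmarkDotNet's standard BenchmarkSwitcher so that CommitCollectorStageBenchmark (and the other benchmarks in the assembly) can be selected from the command line.

    using BenchmarkDotNet.Running;

    namespace Akka.Streams.Kafka.Benchmark
    {
        // Hypothetical entry point (not part of this commit): BenchmarkSwitcher
        // discovers all [Benchmark] methods in the assembly and lets the caller
        // pick classes such as CommitCollectorStageBenchmark via command-line args.
        public static class Program
        {
            public static void Main(string[] args) =>
                BenchmarkSwitcher
                    .FromAssembly(typeof(CommitCollectorStageBenchmark).Assembly)
                    .Run(args);
        }
    }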
Lines changed: 77 additions & 0 deletions

@@ -0,0 +1,77 @@
+using System;
+using System.Collections.Generic;
+using System.Threading.Tasks;
+using Akka.Actor;
+using Akka.Streams;
+using Akka.Streams.Dsl;
+using Akka.Streams.Kafka.Messages;
+using Akka.Streams.Kafka.Settings;
+using Akka.Streams.Kafka.Stages.Consumers;
+using Akka.Streams.Kafka.Tests.TestKit.Internal;
+using Akka.Streams.TestKit;
+using BenchmarkDotNet.Attributes;
+using BenchmarkDotNet.Order;
+using Confluent.Kafka;
+
+namespace Akka.Streams.Kafka.Tests.Performance
+{
+    [MemoryDiagnoser]
+    [Orderer(SummaryOrderPolicy.FastestToSlowest)]
+    public class CommitCollectorStageBenchmark
+    {
+        private ActorSystem _system;
+        private ActorMaterializer _materializer;
+        private CommitterSettings _settings;
+        private List<ICommittable> _testData;
+        private const int DataSize = 1000;
+
+        [GlobalSetup]
+        public void Setup()
+        {
+            _system = ActorSystem.Create("CommitCollectorStageBenchmark");
+            _materializer = ActorMaterializer.Create(_system);
+            _testData = new List<ICommittable>();
+
+            // Create test data
+            for (int i = 0; i < DataSize; i++)
+            {
+                var offset = ConsumerResultFactory.CommittableOffset(
+                    "test-group",
+                    "test-topic",
+                    partition: i % 4, // Use 4 partitions
+                    offset: i,
+                    metadata: $"metadata-{i}");
+                _testData.Add(offset);
+            }
+        }
+
+        [GlobalCleanup]
+        public void Cleanup()
+        {
+            _materializer.Dispose();
+            _system.Terminate().Wait();
+        }
+
+        [Params(10, 50, 100)]
+        public int MaxBatchSize { get; set; }
+
+        [Params(100, 500, 1000)]
+        public int MaxIntervalMs { get; set; }
+
+        [Benchmark]
+        public async Task CommitCollectorStageFlow()
+        {
+            _settings = CommitterSettings.Create(
+                maxBatchSize: MaxBatchSize,
+                maxInterval: TimeSpan.FromMilliseconds(MaxIntervalMs));
+
+            var source = Source.From(_testData);
+            var flow = Flow.FromGraph(new CommitCollectorStage(_settings));
+            var sink = Sink.Seq<ICommittableOffsetBatch>();
+
+            var result = await source
+                .Via(flow)
+                .RunWith(sink, _materializer);
+        }
+    }
+}
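
Usage note, not part of this commit: this second CommitCollectorStageBenchmark lives in the Akka.Streams.Kafka.Tests.Performance namespace, so it is hosted by whatever BenchmarkDotNet entry point that test project provides. A minimal, hypothetical way to run only this class is BenchmarkDotNet's BenchmarkRunner, sketched here.

    using BenchmarkDotNet.Running;
    using Akka.Streams.Kafka.Tests.Performance;

    // Hypothetical one-off host (not part of this commit): run only the
    // test-project CommitCollectorStageBenchmark through BenchmarkRunner.
    BenchmarkRunner.Run<CommitCollectorStageBenchmark>();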
