A producer publishes messages to Kafka topics. The message itself contains the information about which topic and partition to publish to, so you can publish to different topics with the same producer.
This flow accepts implementations of `Akka.Streams.Kafka.Messages.IEnvelope` and returns `Akka.Streams.Kafka.Messages.IResults` elements.
`IEnvelope` elements contain an extra field to pass through data, the so-called `passThrough`.
Its value is passed through the flow and becomes available in the `ProducerMessage.Results`'s `PassThrough`.
For example, it can hold an `Akka.Streams.Kafka.Messages.CommittableOffset` or `Akka.Streams.Kafka.Messages.CommittableOffsetBatch` (from a `KafkaConsumer.CommittableSource`).
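As a rough sketch of the pass-through mechanism, a value can be attached to a message and read back from the result. The overloads used below (`ProducerSettings.Create`, `ProducerMessage.Single`, `KafkaProducer.FlexiFlow`) follow the description above but should be treated as assumptions rather than the definitive API:

```
using System;
using Akka.Actor;
using Akka.Streams;
using Akka.Streams.Dsl;
using Akka.Streams.Kafka.Dsl;
using Akka.Streams.Kafka.Messages;
using Akka.Streams.Kafka.Settings;
using Confluent.Kafka;

var system = ActorSystem.Create("producer-sample");
var materializer = ActorMaterializer.Create(system);

var producerSettings = ProducerSettings<Null, string>.Create(system, null, Serializers.Utf8)
    .WithBootstrapServers("localhost:9092");

// The second argument is the passThrough; it comes back untouched on the corresponding result.
IEnvelope<Null, string, int> envelope =
    ProducerMessage.Single(new ProducerRecord<Null, string>("target-topic", "hello"), 42);

await Source.Single(envelope)
    .Via(KafkaProducer.FlexiFlow<Null, string, int>(producerSettings))
    .RunForeach(result => Console.WriteLine($"passThrough = {result.PassThrough}"), materializer);
```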
Consumer behaviour is configured through the `akka.kafka.consumer` section:

```
# Properties for akka.kafka.ConsumerSettings can be
# defined in this section or a configuration section with
# the same layout.
akka.kafka.consumer {
  # Tuning property of scheduled polls.
  # Controls the interval from one scheduled poll to the next.
  poll-interval = 50ms

  # Tuning property of the `KafkaConsumer.poll` parameter.
  # Note that a non-zero value means that the thread that
  # is executing the stage will be blocked. See also the `wakeup-timeout` setting below.
  poll-timeout = 50ms

  # The stage will delay stopping the internal actor to allow processing of
  # messages already in the stream (required for successful committing).
  # This can be set to 0 for streams using `DrainingControl`.
  stop-timeout = 30s

  # If offset commit requests are not completed within this timeout
  # the returned Task is completed with a `CommitTimeoutException`.
  # The `Transactional.source` waits this amount of time for the producer to mark messages as not
  # being in flight anymore, as well as waiting for messages to drain, when a rebalance is triggered.
  commit-timeout = 15s

  # If commits take longer than this time a warning is logged
  commit-time-warning = 1s

  # Not relevant for Kafka after version 2.1.0.
  # If set to a finite duration, the consumer will re-send the last committed offsets periodically
  # for all assigned partitions. See https://issues.apache.org/jira/browse/KAFKA-4682.
  commit-refresh-interval = infinite

  buffer-size = 128

  # Fully qualified config path which holds the dispatcher configuration
  # to be used by the KafkaConsumerActor. Some blocking may occur.
  use-dispatcher = "akka.kafka.default-dispatcher"

  # Properties defined by Confluent.Kafka.ConsumerConfig
  # can be defined in this configuration section.
  kafka-clients {
    # Disable auto-commit by default
    enable.auto.commit = false
  }

  # Time to wait for pending requests when a partition is closed
  wait-close-partition = 500ms

  # Limits the query to Kafka for a topic's position
  position-timeout = 5s

  # When using `AssignmentOffsetsForTimes` subscriptions: timeout for the
  # call to Kafka's API
  offset-for-times-timeout = 5s

  # Timeout for akka.kafka.Metadata requests
  # This value is used instead of Kafka's default from `default.api.timeout.ms`
  # which is 1 minute.
  metadata-request-timeout = 5s

  # Interval for checking that a transaction was completed before closing the consumer.
  # Used in the transactional flow for exactly-once-semantics processing.
  eos-draining-check-interval = 30ms

  # Issue warnings when a call to a partition assignment handler method takes
  # longer than this.
  partition-handler-warning = 5s
}
```
See [`reference.conf`](https://github.com/akkadotnet/Akka.Streams.Kafka/blob/dev/src/Akka.Streams.Kafka/reference.conf) for the latest on settings.
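For illustration, a minimal sketch of creating consumer settings on top of this configuration section; the `Create` overload shown (with Confluent.Kafka deserializers) is assumed and may differ slightly between versions:

```
using Akka.Actor;
using Akka.Streams.Kafka.Settings;
using Confluent.Kafka;

var system = ActorSystem.Create("consumer-sample");

// Anything not overridden here falls back to the akka.kafka.consumer section above.
var consumerSettings = ConsumerSettings<Null, string>.Create(system, null, Deserializers.Utf8)
    .WithBootstrapServers("localhost:9092")
    .WithGroupId("group1");
```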
### PlainSource
### CommittableSource

The `KafkaConsumer.CommittableSource` makes it possible to commit offset positions to Kafka.
If you need to store offsets in anything other than Kafka, `PlainSource` should be used instead of this API.
This is useful when "at-least once delivery" is desired, as each message will likely be delivered one time but in failure cases could be duplicated.
The recommended way to handle commits is to use the built-in `Committer` facilities, which provide proper batching and error handling. Batching happens automatically based on your `CommitterSettings`; you can configure the batch size, parallelism, and other parameters:

```
// CommitterSettings.Create(system) is the assumed factory; the With... methods below are from this section.
var committerSettings = CommitterSettings.Create(system)
    .WithMaxBatch(100)                          // Maximum number of offsets in one commit
    .WithParallelism(5)                         // Number of commits that can be in progress at the same time
    .WithMaxInterval(TimeSpan.FromSeconds(3));  // Maximum interval between commits
```
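A minimal sketch of the recommended shape, feeding offsets from a committable source into `Committer.Sink` instead of committing them one by one. `ProcessRecordAsync` is a hypothetical stand-in for your own logic, and member names such as `CommitableOffset` and the exact `Committer.Sink` overload are assumptions that may differ slightly between versions:

```
// Assumes the usings, system, materializer, consumerSettings and committerSettings from the sketches above.
var materialized = KafkaConsumer.CommittableSource(consumerSettings, Subscriptions.Topics("my-topic"))
    .SelectAsync(1, async msg =>
    {
        await ProcessRecordAsync(msg.Record);       // hypothetical business logic
        return (ICommittable)msg.CommitableOffset;  // hand the offset to the committer instead of committing it directly
    })
    .ToMaterialized(Committer.Sink(committerSettings), Keep.Both)
    .Run(materializer);                             // materializes a control for shutdown plus a task that completes when committing is done
```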
Using separate stages for processing and committing (as in the sketch above) guarantees that for parallelism higher than 1 the correct ordering of messages sent for commit is preserved. Committing the offset for each message individually is rather slow; it is recommended to batch the commits for better throughput, with the trade-off that more messages may be re-delivered in case of failures.
> **WARNING**: Avoid calling `CommittableOffset.Commit()` or `CommittableOffsetBatch.Commit()` directly. Always use the `Committer` facilities to ensure proper batching and error handling. Direct commits can lead to reduced performance and potential data loss in failure scenarios.
When using manual partition assignment, or when you need more control over the commit process, the partitioned sources described below can be used.

### PlainPartitionedSource

The `PlainPartitionedSource` is a way to track automatic partition assignment from Kafka.
When a topic-partition is assigned to a consumer, this source will emit tuples with the assigned topic-partition and a corresponding source of `ConsumeResult`s.
When a topic-partition is revoked, the corresponding source completes. As of version 1.5.39, the source automatically filters out any messages from recently revoked partitions, providing better consistency during rebalancing operations.
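A rough sketch of consuming the per-partition sub-sources independently; the exact shape of the emitted pairs is an assumption based on the description above:

```
// Assumes the usings, system, materializer and consumerSettings from the sketches above.
var done = KafkaConsumer.PlainPartitionedSource(consumerSettings, Subscriptions.Topics("my-topic"))
    .SelectAsync(8, async assigned =>
    {
        var (topicPartition, source) = assigned;    // one sub-source per assigned partition
        // The sub-source completes when the partition is revoked; here we simply count its records.
        var count = await source.RunAggregate(0L, (acc, _) => acc + 1, materializer);
        return $"{topicPartition}: {count} records";
    })
    .RunForeach(Console.WriteLine, materializer);
```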
### SourceWithOffsetContext

This source emits `ConsumeResult<TKey, TValue>` together with the offset position as flow context, which makes it possible to commit offset positions to Kafka.
This is useful when "at-least once delivery" is desired, as each message will likely be delivered one time but in failure cases could be duplicated.
It is intended to be used with `KafkaProducer.FlowWithContext` and/or `Committer.SinkWithOffsetContext`. As of version 1.5.39, this source includes improved partition handling with automatic filtering of messages from revoked partitions.
### PlainPartitionedManualOffsetSource

The `PlainPartitionedManualOffsetSource` is similar to `PlainPartitionedSource` but allows the use of an offset store outside of Kafka, while retaining the automatic partition assignment.
When a topic-partition is assigned to a consumer, the `getOffsetsOnAssign`
function will be called to retrieve the offset, followed by a seek to the correct spot in the partition.
As of version 1.5.39, this source uses `IncrementalAssign` internally to prevent offset resets during partition reassignment, making it more reliable for scenarios where you are managing offsets externally; in other words, the stage now remembers any previous assignments you have made.
The `onRevoke` function gives the consumer a chance to store any uncommitted offsets, and do any other cleanup
that is required. The source also automatically filters out any messages from recently revoked partitions to maintain consistency during rebalancing.
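For illustration, the two callbacks could look roughly like this against a hypothetical external store. The delegate shapes below are assumptions; check the actual `PlainPartitionedManualOffsetSource` overload for the exact parameter types:

```
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Confluent.Kafka;

// Hypothetical external offset store, keyed by partition.
var externalOffsets = new ConcurrentDictionary<TopicPartition, Offset>();

// getOffsetsOnAssign: look up where to resume for each newly assigned partition.
Task<IEnumerable<TopicPartitionOffset>> GetOffsetsOnAssign(IEnumerable<TopicPartition> assigned) =>
    Task.FromResult(assigned.Select(tp =>
        new TopicPartitionOffset(tp, externalOffsets.TryGetValue(tp, out var offset) ? offset : Offset.Beginning)));

// onRevoke: last chance to flush anything uncommitted for the revoked partitions.
void OnRevoke(IEnumerable<TopicPartition> revoked)
{
    // e.g. persist the entries of externalOffsets belonging to the revoked partitions to the real store here
}
```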
### Running the tests

By default, tests are configured to be CI-friendly: before the tests start, Docker Kafka images are downloaded (if they are not present yet) and containers are started, and after all tests finish a full cleanup is performed (the downloaded Docker images themselves are not removed).
While this can also be useful when running tests locally, there are situations where you would like to save startup and shutdown time by reusing a pre-existing container for all test runs instead of stopping and starting it each time.
To achieve that, set the `AKKA_STREAMS_KAFKA_TEST_CONTAINER_REUSE` environment variable on your local machine to any value. This forces the tests to use an existing Kafka container listening on port `29092`. Run the `docker-compose up` command in the root of the project folder to get this container up and running.