Skip to content

Commit 14819f4

Browse files
authored
Cleanup extra debug logging (#444)
* Cleanup extra debug logging * Enable "Processing poll result with 0 records" log spam again * Wrap log calls with IsDebugEnabled
1 parent 36d5eaf commit 14819f4

File tree

1 file changed

+11
-10
lines changed

1 file changed

+11
-10
lines changed

src/Akka.Streams.Kafka/Stages/Consumers/Actors/KafkaConsumerActor.cs

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,8 @@ public KafkaConsumerActor(IActorRef owner, ConsumerSettings<K, V> settings, Deci
123123
// This is RebalanceListener.OnPartitionAssigned on JVM
124124
private void PartitionsAssignedHandler(IImmutableSet<TopicPartition> partitions)
125125
{
126-
_log.Debug($"Partitions were assigned: {string.Join(", ", partitions)}");
126+
if(_log.IsDebugEnabled)
127+
_log.Debug($"Partitions were assigned: {string.Join(", ", partitions)}");
127128
_pausedPartitions = partitions.ToImmutableList();
128129

129130
_commitRefreshing.AssignedPositions(partitions, _consumer, _settings.PositionTimeout);
@@ -139,8 +140,8 @@ private void PartitionsAssignedHandler(IImmutableSet<TopicPartition> partitions)
139140
// This is RebalanceListener.OnPartitionRevoked on JVM
140141
private void PartitionsRevokedHandler(IImmutableSet<TopicPartitionOffset> partitions)
141142
{
142-
_log.Debug($"Partitions were revoked: {string.Join(", ", partitions)}");
143-
143+
if(_log.IsDebugEnabled)
144+
_log.Debug($"Partitions were revoked: {string.Join(", ", partitions)}");
144145
var watch = Stopwatch.StartNew();
145146
_partitionEventHandler.OnRevoke(partitions, _restrictedConsumer);
146147
watch.Stop();
@@ -153,8 +154,8 @@ private void PartitionsRevokedHandler(IImmutableSet<TopicPartitionOffset> partit
153154
// This is RebalanceListener.OnPartitionLost on JVM
154155
private void PartitionsLostHandler(IImmutableSet<TopicPartitionOffset> partitions)
155156
{
156-
_log.Debug($"Partitions were lost: {string.Join(", ", partitions)}");
157-
157+
if(_log.IsDebugEnabled)
158+
_log.Debug($"Partitions were lost: {string.Join(", ", partitions)}");
158159
var watch = Stopwatch.StartNew();
159160
_partitionEventHandler.OnLost(partitions, _restrictedConsumer);
160161
watch.Stop();
@@ -230,8 +231,8 @@ protected override bool Receive(object message)
230231
return true;
231232

232233
case KafkaConsumerActorMetadata.Internal.RequestMessages requestMessages:
233-
_log.Debug("Messages was requested, RequestId: {0}, Partitions: {1}", requestMessages.RequestId, string.Join(", ", requestMessages.Topics));
234-
234+
if(_log.IsDebugEnabled)
235+
_log.Debug("Messages was requested, RequestId: {0}, Partitions: {1}", requestMessages.RequestId, string.Join(", ", requestMessages.Topics));
235236
Context.Watch(Sender);
236237
CheckOverlappingRequests("RequestMessages", Sender, requestMessages.Topics);
237238
_requests = _requests.SetItem(Sender, requestMessages);
@@ -587,12 +588,12 @@ private void ProcessExceptions(Exception exception)
587588

588589
private void ProcessResult(IImmutableSet<TopicPartition> partitionsToFetch, List<ConsumeResult<K,V>> rawResult)
589590
{
590-
if(rawResult.IsEmpty())
591-
return;
592-
593591
if(_log.IsDebugEnabled)
594592
_log.Debug("Processing poll result with {0} records", rawResult.Count);
595593

594+
if(rawResult.IsEmpty())
595+
return;
596+
596597
var fetchedTps = rawResult.Select(m => m.TopicPartition).ToImmutableSet();
597598
if (!fetchedTps.Except(partitionsToFetch).IsEmpty())
598599
throw new ArgumentException(

0 commit comments

Comments
 (0)