Skip to content

Commit 1815057

Browse files
committed
fix((confluent-kafka): Extracted some shared logic between poll and consume methods.
1 parent 8a76540 commit 1815057

File tree

2 files changed

+23
-24
lines changed
  • instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka

2 files changed

+23
-24
lines changed

instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py

Lines changed: 6 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -112,8 +112,9 @@ def instrument_consumer(consumer: Consumer, tracer_provider=None)
112112
from .package import _instruments
113113
from .utils import (
114114
KafkaPropertiesExtractor,
115+
_end_current_consume_span,
116+
_create_new_consume_span,
115117
_enrich_span,
116-
_get_links_from_records,
117118
_get_span_name,
118119
_kafka_getter,
119120
_kafka_setter,
@@ -348,23 +349,14 @@ def wrap_produce(func, instance, tracer, args, kwargs):
348349
@staticmethod
349350
def wrap_poll(func, instance, tracer, args, kwargs):
350351
if instance._current_consume_span:
351-
context.detach(instance._current_context_token)
352-
instance._current_context_token = None
353-
instance._current_consume_span.end()
354-
instance._current_consume_span = None
352+
_end_current_consume_span(instance)
355353

356354
with tracer.start_as_current_span(
357355
"recv", end_on_exit=True, kind=trace.SpanKind.CONSUMER
358356
):
359357
record = func(*args, **kwargs)
360358
if record:
361-
links = _get_links_from_records([record])
362-
instance._current_consume_span = tracer.start_span(
363-
name=f"{record.topic()} process",
364-
links=links,
365-
kind=SpanKind.CONSUMER,
366-
)
367-
359+
_create_new_consume_span(instance, tracer, [record])
368360
_enrich_span(
369361
instance._current_consume_span,
370362
record.topic(),
@@ -381,23 +373,14 @@ def wrap_poll(func, instance, tracer, args, kwargs):
381373
@staticmethod
382374
def wrap_consume(func, instance, tracer, args, kwargs):
383375
if instance._current_consume_span:
384-
context.detach(instance._current_context_token)
385-
instance._current_context_token = None
386-
instance._current_consume_span.end()
387-
instance._current_consume_span = None
376+
_end_current_consume_span(instance)
388377

389378
with tracer.start_as_current_span(
390379
"recv", end_on_exit=True, kind=trace.SpanKind.CONSUMER
391380
):
392381
records = func(*args, **kwargs)
393382
if len(records) > 0:
394-
links = _get_links_from_records(records)
395-
instance._current_consume_span = tracer.start_span(
396-
name=f"{records[0].topic()} process",
397-
links=links,
398-
kind=SpanKind.CONSUMER,
399-
)
400-
383+
_create_new_consume_span(instance, tracer, records)
401384
_enrich_span(
402385
instance._current_consume_span,
403386
records[0].topic(),

instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/utils.py

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
from logging import getLogger
22
from typing import List, Optional
33

4-
from opentelemetry import propagate
4+
from opentelemetry import context, propagate
55
from opentelemetry.trace import SpanKind, Link
66
from opentelemetry.propagators import textmap
77
from opentelemetry.semconv.trace import (
@@ -83,6 +83,22 @@ def set(self, carrier: textmap.CarrierT, key: str, value: str) -> None:
8383
_kafka_getter = KafkaContextGetter()
8484

8585

86+
def _end_current_consume_span(instance):
87+
context.detach(instance._current_context_token)
88+
instance._current_context_token = None
89+
instance._current_consume_span.end()
90+
instance._current_consume_span = None
91+
92+
93+
def _create_new_consume_span(instance, tracer, records):
94+
links = _get_links_from_records(records)
95+
instance._current_consume_span = tracer.start_span(
96+
name=f"{records[0].topic()} process",
97+
links=links,
98+
kind=SpanKind.CONSUMER,
99+
)
100+
101+
86102
def _enrich_span(
87103
span,
88104
topic,

0 commit comments

Comments
 (0)