Skip to content

Commit ac9ec26

Browse files
committed
fix(confluent-kafka): Fixed lint errors
1 parent 4c9cce6 commit ac9ec26

File tree

3 files changed

+48
-58
lines changed

3 files changed

+48
-58
lines changed

instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,9 @@ def poll(self, timeout=-1): # pylint: disable=useless-super-delegation
139139
return super().poll(timeout)
140140

141141
# This method is deliberately implemented in order to allow wrapt to wrap this function
142-
def consume(self, *args, **kwargs): # pylint: disable=useless-super-delegation
142+
def consume(
143+
self, *args, **kwargs
144+
): # pylint: disable=useless-super-delegation
143145
return super().consume(*args, **kwargs)
144146

145147

@@ -184,7 +186,11 @@ def commit(self, *args, **kwargs):
184186

185187
def consume(self, *args, **kwargs):
186188
return ConfluentKafkaInstrumentor.wrap_consume(
187-
self._consumer.consume, self, self._tracer, args, kwargs,
189+
self._consumer.consume,
190+
self,
191+
self._tracer,
192+
args,
193+
kwargs,
188194
)
189195

190196
def get_watermark_offsets(

instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py

Lines changed: 36 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,10 @@
1414

1515
# pylint: disable=no-name-in-module
1616

17-
from opentelemetry.semconv.trace import SpanAttributes, MessagingDestinationKindValues
17+
from opentelemetry.semconv.trace import (
18+
SpanAttributes,
19+
MessagingDestinationKindValues,
20+
)
1821
from opentelemetry.test.test_base import TestBase
1922
from .utils import MockConsumer, MockedMessage
2023

@@ -106,19 +109,16 @@ def test_context_getter(self) -> None:
106109
context_setter.set(carrier_list, "key1", "val1")
107110
self.assertEqual(context_getter.get(carrier_list, "key1"), ["val1"])
108111
self.assertEqual(["key1"], context_getter.keys(carrier_list))
109-
112+
110113
def test_poll(self) -> None:
111114
instrumentation = ConfluentKafkaInstrumentor()
112115
mocked_messages = [
113116
MockedMessage("topic-10", 0, 0, []),
114117
MockedMessage("topic-20", 2, 4, []),
115118
MockedMessage("topic-30", 1, 3, []),
116119
]
117-
expected_spans= [
118-
{
119-
"name": "recv",
120-
"attributes": {}
121-
},
120+
expected_spans = [
121+
{"name": "recv", "attributes": {}},
122122
{
123123
"name": "topic-10 process",
124124
"attributes": {
@@ -128,12 +128,9 @@ def test_poll(self) -> None:
128128
SpanAttributes.MESSAGING_DESTINATION: "topic-10",
129129
SpanAttributes.MESSAGING_DESTINATION_KIND: MessagingDestinationKindValues.QUEUE.value,
130130
SpanAttributes.MESSAGING_MESSAGE_ID: "topic-10.0.0",
131-
}
132-
},
133-
{
134-
"name": "recv",
135-
"attributes": {}
131+
},
136132
},
133+
{"name": "recv", "attributes": {}},
137134
{
138135
"name": "topic-20 process",
139136
"attributes": {
@@ -143,12 +140,9 @@ def test_poll(self) -> None:
143140
SpanAttributes.MESSAGING_DESTINATION: "topic-20",
144141
SpanAttributes.MESSAGING_DESTINATION_KIND: MessagingDestinationKindValues.QUEUE.value,
145142
SpanAttributes.MESSAGING_MESSAGE_ID: "topic-20.2.4",
146-
}
147-
},
148-
{
149-
"name": "recv",
150-
"attributes": {}
143+
},
151144
},
145+
{"name": "recv", "attributes": {}},
152146
{
153147
"name": "topic-30 process",
154148
"attributes": {
@@ -158,32 +152,29 @@ def test_poll(self) -> None:
158152
SpanAttributes.MESSAGING_DESTINATION: "topic-30",
159153
SpanAttributes.MESSAGING_DESTINATION_KIND: MessagingDestinationKindValues.QUEUE.value,
160154
SpanAttributes.MESSAGING_MESSAGE_ID: "topic-30.1.3",
161-
}
162-
},
163-
{
164-
"name": "recv",
165-
"attributes": {}
155+
},
166156
},
157+
{"name": "recv", "attributes": {}},
167158
]
168-
159+
169160
consumer = MockConsumer(
170161
mocked_messages,
171162
{
172163
"bootstrap.servers": "localhost:29092",
173164
"group.id": "mygroup",
174165
"auto.offset.reset": "earliest",
175-
}
166+
},
176167
)
177168
span_list = self.memory_exporter.clear()
178169
consumer = instrumentation.instrument_consumer(consumer)
179170
consumer.poll()
180171
consumer.poll()
181172
consumer.poll()
182173
consumer.poll()
183-
174+
184175
span_list = self.memory_exporter.get_finished_spans()
185176
self._compare_spans(span_list, expected_spans)
186-
177+
187178
def test_consume(self) -> None:
188179
instrumentation = ConfluentKafkaInstrumentor()
189180
mocked_messages = [
@@ -194,61 +185,49 @@ def test_consume(self) -> None:
194185
MockedMessage("topic-3", 0, 3, []),
195186
MockedMessage("topic-2", 0, 1, []),
196187
]
197-
expected_spans= [
198-
{
199-
"name": "recv",
200-
"attributes": {}
201-
},
188+
expected_spans = [
189+
{"name": "recv", "attributes": {}},
202190
{
203191
"name": "topic-1 process",
204192
"attributes": {
205193
SpanAttributes.MESSAGING_OPERATION: "process",
206194
SpanAttributes.MESSAGING_SYSTEM: "kafka",
207195
SpanAttributes.MESSAGING_DESTINATION: "topic-1",
208196
SpanAttributes.MESSAGING_DESTINATION_KIND: MessagingDestinationKindValues.QUEUE.value,
209-
}
210-
},
211-
{
212-
"name": "recv",
213-
"attributes": {}
197+
},
214198
},
199+
{"name": "recv", "attributes": {}},
215200
{
216201
"name": "topic-2 process",
217202
"attributes": {
218203
SpanAttributes.MESSAGING_OPERATION: "process",
219204
SpanAttributes.MESSAGING_SYSTEM: "kafka",
220205
SpanAttributes.MESSAGING_DESTINATION: "topic-2",
221206
SpanAttributes.MESSAGING_DESTINATION_KIND: MessagingDestinationKindValues.QUEUE.value,
222-
}
223-
},
224-
{
225-
"name": "recv",
226-
"attributes": {}
207+
},
227208
},
209+
{"name": "recv", "attributes": {}},
228210
{
229211
"name": "topic-3 process",
230212
"attributes": {
231213
SpanAttributes.MESSAGING_OPERATION: "process",
232214
SpanAttributes.MESSAGING_SYSTEM: "kafka",
233215
SpanAttributes.MESSAGING_DESTINATION: "topic-3",
234216
SpanAttributes.MESSAGING_DESTINATION_KIND: MessagingDestinationKindValues.QUEUE.value,
235-
}
236-
},
237-
{
238-
"name": "recv",
239-
"attributes": {}
217+
},
240218
},
219+
{"name": "recv", "attributes": {}},
241220
]
242-
221+
243222
consumer = MockConsumer(
244223
mocked_messages,
245224
{
246225
"bootstrap.servers": "localhost:29092",
247226
"group.id": "mygroup",
248227
"auto.offset.reset": "earliest",
249-
}
228+
},
250229
)
251-
230+
252231
span_list = self.memory_exporter.clear()
253232
consumer = instrumentation.instrument_consumer(consumer)
254233
consumer.consume(3)
@@ -259,7 +238,11 @@ def test_consume(self) -> None:
259238
self._compare_spans(span_list, expected_spans)
260239

261240
def _compare_spans(self, spans, expected_spans):
262-
for (span, expected_span) in zip(spans, expected_spans):
263-
self.assertEqual(expected_span['name'], span.name)
264-
for attribute_key, expected_attribute_value in expected_span['attributes'].items():
265-
self.assertEqual(expected_attribute_value, span.attributes[attribute_key])
241+
for span, expected_span in zip(spans, expected_spans):
242+
self.assertEqual(expected_span["name"], span.name)
243+
for attribute_key, expected_attribute_value in expected_span[
244+
"attributes"
245+
].items():
246+
self.assertEqual(
247+
expected_attribute_value, span.attributes[attribute_key]
248+
)

instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/utils.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,17 @@
22

33

44
class MockConsumer(Consumer):
5-
65
def __init__(self, queue, config):
76
self._queue = queue
87
super().__init__(config)
98

10-
def consume(self, num_messages=1, *args, **kwargs): # pylint: disable=keyword-arg-before-vararg
9+
def consume(
10+
self, num_messages=1, *args, **kwargs
11+
): # pylint: disable=keyword-arg-before-vararg
1112
messages = self._queue[:num_messages]
1213
self._queue = self._queue[num_messages:]
1314
return messages
14-
15+
1516
def poll(self, timeout=None):
1617
if len(self._queue) > 0:
1718
return self._queue.pop(0)

0 commit comments

Comments
 (0)