Skip to content

Commit 6d483c1

Browse files
metrics: Implement release for handles and observers
This commit implements a solution for releasing instrument handles and observers. For the handles it is based on a ref count that is increased each time the handled is acquired, when the ref count reaches 0 the handle is removed on collection time. The direct call convention is updated to release the handle after it has been updated. The observer instrument is only updated on collection time, so it can be removed as soon as the user request to do so.
1 parent 7436362 commit 6d483c1

File tree

5 files changed

+169
-26
lines changed

5 files changed

+169
-26
lines changed

examples/metrics/record.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,14 @@
6363
# a labelset. A handle is essentially metric data that corresponds to a specific
6464
# set of labels. Therefore, getting a handle using the same set of labels will
6565
# yield the same metric handle.
66+
# Get a handle when you have to perform multiple operations using the same
67+
# labelset
6668
counter_handle = counter.get_handle(label_set)
67-
counter_handle.add(100)
69+
for i in range(1000):
70+
counter_handle.add(i)
71+
72+
# You can release the handle we you are done
73+
counter_handle.release()
6874

6975
# Direct metric usage
7076
# You can record metrics directly using the metric instrument. You pass in a
@@ -76,4 +82,5 @@
7682
# (metric, value) pairs. The value would be recorded for each metric using the
7783
# specified labelset for each.
7884
meter.record_batch(label_set, [(counter, 50), (counter2, 70)])
79-
time.sleep(100)
85+
86+
time.sleep(10)

opentelemetry-api/src/opentelemetry/metrics/__init__.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,9 @@ def record(self, value: ValueT) -> None:
5454
value: The value to record to the handle.
5555
"""
5656

57+
def release(self) -> None:
58+
"""No-op implementation of release."""
59+
5760

5861
class CounterHandle:
5962
def add(self, value: ValueT) -> None:
@@ -292,6 +295,14 @@ def register_observer(
292295
Returns: A new ``Observer`` metric instrument.
293296
"""
294297

298+
@abc.abstractmethod
299+
def unregister_observer(self, observer: "Observer") -> None:
300+
"""Unregisters an ``Observer`` metric instrument.
301+
302+
Args:
303+
observer: The observer to unregister.
304+
"""
305+
295306
@abc.abstractmethod
296307
def get_label_set(self, labels: Dict[str, str]) -> "LabelSet":
297308
"""Gets a `LabelSet` with the given labels.
@@ -338,6 +349,9 @@ def register_observer(
338349
) -> "Observer":
339350
return DefaultObserver()
340351

352+
def unregister_observer(self, observer: "Observer") -> None:
353+
pass
354+
341355
def get_label_set(self, labels: Dict[str, str]) -> "LabelSet":
342356
# pylint: disable=no-self-use
343357
return DefaultLabelSet()

opentelemetry-api/tests/metrics/test_metrics.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,13 @@ def test_register_observer(self):
4040
)
4141
self.assertIsInstance(observer, metrics.DefaultObserver)
4242

43+
def test_unregister_observer(self):
44+
callback = mock.Mock()
45+
observer = self.meter.register_observer(
46+
callback, "", "", "", int, (), True
47+
)
48+
self.meter.unregister_observer(observer)
49+
4350
def test_get_label_set(self):
4451
metric = self.meter.get_label_set({})
4552
self.assertIsInstance(metric, metrics.DefaultLabelSet)
@@ -75,7 +82,8 @@ def test_measure_record(self):
7582
measure.record(1, label_set)
7683

7784
def test_default_handle(self):
78-
metrics.DefaultMetricHandle()
85+
handle = metrics.DefaultMetricHandle()
86+
handle.release()
7987

8088
def test_counter_handle(self):
8189
handle = metrics.CounterHandle()

opentelemetry-sdk/src/opentelemetry/sdk/metrics/__init__.py

Lines changed: 66 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
# limitations under the License.
1414

1515
import logging
16+
import threading
1617
from collections import OrderedDict
1718
from typing import Dict, Sequence, Tuple, Type
1819

@@ -69,6 +70,8 @@ def __init__(
6970
self.enabled = enabled
7071
self.aggregator = aggregator
7172
self.last_update_timestamp = time_ns()
73+
self._ref_count = 0
74+
self._ref_count_lock = threading.Lock()
7275

7376
def _validate_update(self, value: metrics_api.ValueT) -> bool:
7477
if not self.enabled:
@@ -84,6 +87,21 @@ def update(self, value: metrics_api.ValueT):
8487
self.last_update_timestamp = time_ns()
8588
self.aggregator.update(value)
8689

90+
def release(self):
91+
self.decrease_ref_count()
92+
93+
def decrease_ref_count(self):
94+
with self._ref_count_lock:
95+
self._ref_count -= 1
96+
97+
def increase_ref_count(self):
98+
with self._ref_count_lock:
99+
self._ref_count += 1
100+
101+
def ref_count(self):
102+
with self._ref_count_lock:
103+
return self._ref_count
104+
87105
def __repr__(self):
88106
return '{}(data="{}", last_update_timestamp={})'.format(
89107
type(self).__name__,
@@ -135,18 +153,21 @@ def __init__(
135153
self.label_keys = label_keys
136154
self.enabled = enabled
137155
self.handles = {}
156+
self.handles_lock = threading.Lock()
138157

139158
def get_handle(self, label_set: LabelSet) -> BaseHandle:
140159
"""See `opentelemetry.metrics.Metric.get_handle`."""
141-
handle = self.handles.get(label_set)
142-
if not handle:
143-
handle = self.HANDLE_TYPE(
144-
self.value_type,
145-
self.enabled,
146-
# Aggregator will be created based off type of metric
147-
self.meter.batcher.aggregator_for(self.__class__),
148-
)
149-
self.handles[label_set] = handle
160+
with self.handles_lock:
161+
handle = self.handles.get(label_set)
162+
if not handle:
163+
handle = self.HANDLE_TYPE(
164+
self.value_type,
165+
self.enabled,
166+
# Aggregator will be created based off type of metric
167+
self.meter.batcher.aggregator_for(self.__class__),
168+
)
169+
self.handles[label_set] = handle
170+
handle.increase_ref_count()
150171
return handle
151172

152173
def __repr__(self):
@@ -185,7 +206,9 @@ def __init__(
185206

186207
def add(self, value: metrics_api.ValueT, label_set: LabelSet) -> None:
187208
"""See `opentelemetry.metrics.Counter.add`."""
188-
self.get_handle(label_set).add(value)
209+
handle = self.get_handle(label_set)
210+
handle.add(value)
211+
handle.release()
189212

190213
UPDATE_FUNCTION = add
191214

@@ -197,7 +220,9 @@ class Measure(Metric, metrics_api.Measure):
197220

198221
def record(self, value: metrics_api.ValueT, label_set: LabelSet) -> None:
199222
"""See `opentelemetry.metrics.Measure.record`."""
200-
self.get_handle(label_set).record(value)
223+
handle = self.get_handle(label_set)
224+
handle.record(value)
225+
handle.release()
201226

202227
UPDATE_FUNCTION = record
203228

@@ -290,6 +315,7 @@ def __init__(self, batcher: Batcher = UngroupedBatcher(True)):
290315
self.batcher = batcher
291316
self.metrics = set()
292317
self.observers = set()
318+
self.observers_lock = threading.Lock()
293319

294320
def collect(self) -> None:
295321
"""Collects all the metrics created with this `Meter` for export.
@@ -304,26 +330,39 @@ def collect(self) -> None:
304330

305331
def _collect_metrics(self) -> None:
306332
for metric in self.metrics:
307-
if metric.enabled:
333+
if not metric.enabled:
334+
continue
335+
336+
to_remove = []
337+
338+
with metric.handles_lock:
308339
for label_set, handle in metric.handles.items():
309340
# TODO: Consider storing records in memory?
310341
record = Record(metric, label_set, handle.aggregator)
311342
# Checkpoints the current aggregators
312343
# Applies different batching logic based on type of batcher
313344
self.batcher.process(record)
314345

346+
if handle.ref_count() == 0:
347+
to_remove.append(label_set)
348+
349+
# Remove handles that were released
350+
for label_set in to_remove:
351+
del metric.handles[label_set]
352+
315353
def _collect_observers(self) -> None:
316-
for observer in self.observers:
317-
if not observer.enabled:
318-
continue
354+
with self.observers_lock:
355+
for observer in self.observers:
356+
if not observer.enabled:
357+
continue
319358

320-
# TODO: capture timestamp?
321-
if not observer.run():
322-
continue
359+
# TODO: capture timestamp?
360+
if not observer.run():
361+
continue
323362

324-
for label_set, aggregator in observer.aggregators.items():
325-
record = Record(observer, label_set, aggregator)
326-
self.batcher.process(record)
363+
for label_set, aggregator in observer.aggregators.items():
364+
record = Record(observer, label_set, aggregator)
365+
self.batcher.process(record)
327366

328367
def record_batch(
329368
self,
@@ -378,9 +417,14 @@ def register_observer(
378417
label_keys,
379418
enabled,
380419
)
381-
self.observers.add(ob)
420+
with self.observers_lock:
421+
self.observers.add(ob)
382422
return ob
383423

424+
def unregister_observer(self, observer: "Observer") -> None:
425+
with self.observers_lock:
426+
self.observers.remove(observer)
427+
384428
def get_label_set(self, labels: Dict[str, str]):
385429
"""See `opentelemetry.metrics.Meter.create_metric`.
386430

opentelemetry-sdk/tests/metrics/test_metrics.py

Lines changed: 71 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ def test_collect(self):
3535
)
3636
kvp = {"key1": "value1"}
3737
label_set = meter.get_label_set(kvp)
38-
counter.add(label_set, 1.0)
38+
counter.add(1.0, label_set)
3939
meter.metrics.add(counter)
4040
meter.collect()
4141
self.assertTrue(batcher_mock.process.called)
@@ -163,6 +163,18 @@ def test_register_observer(self):
163163
self.assertEqual(observer.label_keys, ())
164164
self.assertTrue(observer.enabled)
165165

166+
def test_unregister_observer(self):
167+
meter = metrics.Meter()
168+
169+
callback = mock.Mock()
170+
171+
observer = meter.register_observer(
172+
callback, "name", "desc", "unit", int, (), True
173+
)
174+
175+
meter.unregister_observer(observer)
176+
self.assertEqual(len(meter.observers), 0)
177+
166178
def test_get_label_set(self):
167179
meter = metrics.Meter()
168180
kvp = {"environment": "staging", "a": "z"}
@@ -177,6 +189,64 @@ def test_get_label_set_empty(self):
177189
label_set = meter.get_label_set(kvp)
178190
self.assertEqual(label_set, metrics.EMPTY_LABEL_SET)
179191

192+
def test_direct_call_release_handle(self):
193+
meter = metrics.Meter()
194+
label_keys = ("key1",)
195+
kvp = {"key1": "value1"}
196+
label_set = meter.get_label_set(kvp)
197+
198+
counter = metrics.Counter(
199+
"name", "desc", "unit", float, meter, label_keys
200+
)
201+
meter.metrics.add(counter)
202+
counter.add(4.0, label_set)
203+
204+
measure = metrics.Measure(
205+
"name", "desc", "unit", float, meter, label_keys
206+
)
207+
meter.metrics.add(measure)
208+
measure.record(42.0, label_set)
209+
210+
self.assertEqual(len(counter.handles), 1)
211+
self.assertEqual(len(measure.handles), 1)
212+
213+
meter.collect()
214+
215+
self.assertEqual(len(counter.handles), 0)
216+
self.assertEqual(len(measure.handles), 0)
217+
218+
def test_release_handle(self):
219+
meter = metrics.Meter()
220+
label_keys = ("key1",)
221+
kvp = {"key1": "value1"}
222+
label_set = meter.get_label_set(kvp)
223+
224+
counter = metrics.Counter(
225+
"name", "desc", "unit", float, meter, label_keys
226+
)
227+
meter.metrics.add(counter)
228+
counter_handle = counter.get_handle(label_set)
229+
counter_handle.add(4.0)
230+
231+
measure = metrics.Measure(
232+
"name", "desc", "unit", float, meter, label_keys
233+
)
234+
meter.metrics.add(measure)
235+
measure_handle = measure.get_handle(label_set)
236+
measure_handle.record(42)
237+
238+
counter_handle.release()
239+
measure_handle.release()
240+
241+
# be sure that handles are only released after collection
242+
self.assertEqual(len(counter.handles), 1)
243+
self.assertEqual(len(measure.handles), 1)
244+
245+
meter.collect()
246+
247+
self.assertEqual(len(counter.handles), 0)
248+
self.assertEqual(len(measure.handles), 0)
249+
180250

181251
class TestMetric(unittest.TestCase):
182252
def test_get_handle(self):

0 commit comments

Comments
 (0)