Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## Unreleased

- Add metric instrumentation for celery
([#1679](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1679))

## Version 1.16.0/0.37b0 (2023-02-17)

### Added
Expand Down
2 changes: 1 addition & 1 deletion instrumentation/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
| [opentelemetry-instrumentation-boto](./opentelemetry-instrumentation-boto) | boto~=2.0 | No
| [opentelemetry-instrumentation-boto3sqs](./opentelemetry-instrumentation-boto3sqs) | boto3 ~= 1.0 | No
| [opentelemetry-instrumentation-botocore](./opentelemetry-instrumentation-botocore) | botocore ~= 1.0 | No
| [opentelemetry-instrumentation-celery](./opentelemetry-instrumentation-celery) | celery >= 4.0, < 6.0 | No
| [opentelemetry-instrumentation-celery](./opentelemetry-instrumentation-celery) | celery >= 4.0, < 6.0 | Yes
| [opentelemetry-instrumentation-confluent-kafka](./opentelemetry-instrumentation-confluent-kafka) | confluent-kafka >= 1.8.2, < 2.0.0 | No
| [opentelemetry-instrumentation-dbapi](./opentelemetry-instrumentation-dbapi) | dbapi | No
| [opentelemetry-instrumentation-django](./opentelemetry-instrumentation-django) | django >= 1.10 | Yes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ def add(x, y):
"""

import logging
from timeit import default_timer
from typing import Collection, Iterable

from celery import signals # pylint: disable=no-name-in-module
Expand All @@ -69,6 +70,7 @@ def add(x, y):
from opentelemetry.instrumentation.celery.package import _instruments
from opentelemetry.instrumentation.celery.version import __version__
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.metrics import get_meter
from opentelemetry.propagate import extract, inject
from opentelemetry.propagators.textmap import Getter
from opentelemetry.semconv.trace import SpanAttributes
Expand Down Expand Up @@ -104,6 +106,11 @@ def keys(self, carrier):


class CeleryInstrumentor(BaseInstrumentor):
def __init__(self):
super().__init__()
self.metrics = None
self.task_id_to_start_time = {}

def instrumentation_dependencies(self) -> Collection[str]:
return _instruments

Expand All @@ -113,6 +120,11 @@ def _instrument(self, **kwargs):
# pylint: disable=attribute-defined-outside-init
self._tracer = trace.get_tracer(__name__, __version__, tracer_provider)

meter_provider = kwargs.get("meter_provider")
meter = get_meter(__name__, __version__, meter_provider)

self.create_celery_metrics(meter)

signals.task_prerun.connect(self._trace_prerun, weak=False)
signals.task_postrun.connect(self._trace_postrun, weak=False)
signals.before_task_publish.connect(
Expand All @@ -139,6 +151,7 @@ def _trace_prerun(self, *args, **kwargs):
if task is None or task_id is None:
return

self.update_task_start_time(task_id)
request = task.request
tracectx = extract(request, getter=celery_getter) or None

Expand All @@ -153,8 +166,7 @@ def _trace_prerun(self, *args, **kwargs):
activation.__enter__() # pylint: disable=E1101
utils.attach_span(task, task_id, (span, activation))

@staticmethod
def _trace_postrun(*args, **kwargs):
def _trace_postrun(self, *args, **kwargs):
task = utils.retrieve_task(kwargs)
task_id = utils.retrieve_task_id(kwargs)

Expand All @@ -178,6 +190,9 @@ def _trace_postrun(*args, **kwargs):

activation.__exit__(None, None, None)
utils.detach_span(task, task_id)
self.update_task_start_time(task_id)
labels = {"task": task.name, "worker": task.request.hostname}
self._record_histograms(task_id, labels)

def _trace_before_publish(self, *args, **kwargs):
task = utils.retrieve_task_from_sender(kwargs)
Expand Down Expand Up @@ -270,3 +285,27 @@ def _trace_retry(*args, **kwargs):
# Use `str(reason)` instead of `reason.message` in case we get
# something that isn't an `Exception`
span.set_attribute(_TASK_RETRY_REASON_KEY, str(reason))

def update_task_start_time(self, task_id):
cur_time = default_timer()
elapsed_time = (
cur_time - self.task_id_to_start_time[task_id]
if task_id in self.task_id_to_start_time
else cur_time
)
self.task_id_to_start_time[task_id] = elapsed_time

def _record_histograms(self, task_id, metric_attributes):
self.metrics["flower.task.runtime.seconds"].record(
self.task_id_to_start_time.get(task_id),
attributes=metric_attributes,
)

def create_celery_metrics(self, meter) -> None:
self.metrics = {
"flower.task.runtime.seconds": meter.create_histogram(
name="flower.task.runtime.seconds",
unit="seconds",
description="The time it took to run the task.",
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,4 @@


_instruments = ("celery >= 4.0, < 6.0",)
_supports_metrics = True
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
import threading
import time
from timeit import default_timer
from typing import Optional, Union

from opentelemetry.instrumentation.celery import CeleryInstrumentor
from opentelemetry.sdk.metrics._internal.point import Metric
from opentelemetry.sdk.metrics.export import (
HistogramDataPoint,
NumberDataPoint,
)
from opentelemetry.test.test_base import TestBase

from .celery_test_tasks import app, task_add


class TestMetrics(TestBase):
def setUp(self):
super().setUp()
self._worker = app.Worker(
app=app, pool="solo", concurrency=1, hostname="celery@akochavi"
)
self._thread = threading.Thread(target=self._worker.start)
self._thread.daemon = True
self._thread.start()

def tearDown(self):
super().tearDown()
self._worker.stop()
self._thread.join()

def get_metrics(self):
result = task_add.delay(1, 2)

timeout = time.time() + 60 * 1 # 1 minutes from now
while not result.ready():
if time.time() > timeout:
break
time.sleep(0.05)
resource_metrics = (
self.memory_metrics_reader.get_metrics_data().resource_metrics
)

all_metrics = []
for metrics in resource_metrics:
for scope_metrics in metrics.scope_metrics:
all_metrics.extend(scope_metrics.metrics)

return all_metrics

def assert_metric_expected(
self,
metric: Metric,
expected_value: Union[int, float],
expected_attributes: dict = None,
est_delta: Optional[float] = None,
):
data_point = next(iter(metric.data.data_points))

if isinstance(data_point, HistogramDataPoint):
self.assertEqual(
data_point.count,
1,
)
if est_delta is None:
self.assertEqual(
data_point.sum,
expected_value,
)
else:
self.assertAlmostEqual(
data_point.sum,
expected_value,
delta=est_delta,
)
elif isinstance(data_point, NumberDataPoint):
self.assertEqual(
data_point.value,
expected_value,
)

if expected_attributes:
self.assertDictEqual(
expected_attributes,
dict(data_point.attributes),
)

def test_basic_metric(self):
CeleryInstrumentor().instrument()
start_time = default_timer()
task_runtime_estimated = (default_timer() - start_time) * 1000

metrics = self.get_metrics()
CeleryInstrumentor().uninstrument()
self.assertEqual(len(metrics), 1)

task_runtime = metrics[0]
print(task_runtime)
self.assertEqual(task_runtime.name, "flower.task.runtime.seconds")
self.assert_metric_expected(
task_runtime,
task_runtime_estimated,
{
"task": "tests.celery_test_tasks.task_add",
"worker": "celery@akochavi",
},
est_delta=200,
)

def test_metric_uninstrument(self):
CeleryInstrumentor().instrument()
metrics = self.get_metrics()
self.assertEqual(len(metrics), 1)
CeleryInstrumentor().uninstrument()

metrics = self.get_metrics()
self.assertEqual(len(metrics), 1)

for metric in metrics:
for point in list(metric.data.data_points):
self.assertEqual(point.count, 1)