Skip to content
2 changes: 1 addition & 1 deletion ddtrace/contrib/integration_registry/registry.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -811,7 +811,7 @@ integrations:
tested_versions_by_dependency:
rq:
min: 1.8.1
max: 1.16.2
max: 2.7.0

- integration_name: sanic
is_external_package: true
Expand Down
6 changes: 3 additions & 3 deletions ddtrace/contrib/internal/rq/patch.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ def traced_queue_enqueue_job(rq, pin, func, instance, args, kwargs):
COMPONENT: config.rq.integration_name,
SPAN_KIND: SpanKind.PRODUCER,
QUEUE_NAME: instance.name,
JOB_ID: job.get_id(),
JOB_ID: job.id,
JOB_FUNC_NAME: job.func_name,
},
) as ctx,
Expand Down Expand Up @@ -122,7 +122,7 @@ def traced_perform_job(rq, pin, func, instance, args, kwargs):
integration_config=config.rq_worker,
distributed_headers=job.meta,
activate_distributed_headers=True,
tags={COMPONENT: config.rq.integration_name, SPAN_KIND: SpanKind.CONSUMER, JOB_ID: job.get_id()},
tags={COMPONENT: config.rq.integration_name, SPAN_KIND: SpanKind.CONSUMER, JOB_ID: job.id},
) as ctx,
ctx.span,
):
Expand Down Expand Up @@ -154,7 +154,7 @@ def traced_job_perform(rq, pin, func, instance, args, kwargs):
span_name="rq.job.perform",
resource=job.func_name,
pin=pin,
tags={COMPONENT: config.rq.integration_name, JOB_ID: job.get_id()},
tags={COMPONENT: config.rq.integration_name, JOB_ID: job.id},
) as ctx,
ctx.span,
):
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
fixes:
- |
rq: Fixes an ``AttributeError`` caused by ``Job.get_id()`` being removed in
`RQ 2.0 <https://github.com/rq/rq/releases/tag/v2.0>`_. The integration now
uses the ``job.id`` property, which has been available since RQ 1.8.1 and is
the canonical way to access a job's identifier across all supported versions.
1 change: 1 addition & 0 deletions riotfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -2147,6 +2147,7 @@ def select_pys(min_version: str = MIN_PYTHON_VERSION, max_version: str = MAX_PYT
"rq": [
"~=1.8.1",
"~=1.10.0",
"~=2.0.0", # first major version; removed Job.get_id() in favour of job.id property
latest,
],
# https://github.com/rq/rq/issues/1469 rq [1.0,1.8] is incompatible with click 8.0+
Expand Down
32 changes: 32 additions & 0 deletions tests/contrib/rq/test_rq.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,38 @@ def test_worker_class_job(queue):
worker.work(burst=True)


def test_job_id_tag_on_enqueue_span(sync_queue, test_spans):
"""Regression test: job.id property is correctly set on enqueue spans (RQ 2.x removed get_id())."""
job = sync_queue.enqueue(job_add1, 1)
spans = test_spans.pop()
enqueue_span = next(s for s in spans if s.name == "rq.queue.enqueue_job")
assert enqueue_span.get_tag("job.id") == job.id


def test_job_id_tag_on_worker_spans(queue, test_spans):
"""Regression test: job.id property is correctly set on worker and job.perform spans."""
job = queue.enqueue(job_add1, 1)
worker = rq.SimpleWorker([queue], connection=queue.connection)
worker.work(burst=True)
spans = test_spans.pop()
worker_span = next(s for s in spans if s.name == "rq.worker.perform_job")
job_span = next(s for s in spans if s.name == "rq.job.perform")
assert worker_span.get_tag("job.id") == job.id
assert job_span.get_tag("job.id") == job.id


def test_custom_job_id_in_span_tags(sync_queue, test_spans):
"""Verify a user-supplied job ID is propagated correctly to all span tags."""
custom_id = "my-custom-job-id"
job = sync_queue.enqueue(job_add1, 1, job_id=custom_id)
assert job.id == custom_id
spans = test_spans.pop()
tagged = [s for s in spans if s.get_tag("job.id") is not None]
assert tagged, "Expected at least one span with job.id tag"
for span in tagged:
assert span.get_tag("job.id") == custom_id


@pytest.mark.parametrize("distributed_tracing_enabled", [False, None])
@pytest.mark.parametrize("worker_service_name", [None, "custom-worker-service"])
def test_enqueue(queue, distributed_tracing_enabled, worker_service_name):
Expand Down
Loading