1818from opentelemetry .instrumentation .celery import CeleryInstrumentor
1919from opentelemetry .semconv .trace import SpanAttributes
2020from opentelemetry .test .test_base import TestBase
21- from opentelemetry .trace import SpanKind
21+ from opentelemetry .trace import SpanKind , StatusCode
2222
23- from .celery_test_tasks import app , task_add
23+ from .celery_test_tasks import app , task_add , task_raises
2424
2525
2626class TestCeleryInstrumentation (TestBase ):
@@ -66,6 +66,10 @@ def test_task(self):
6666 },
6767 )
6868
69+ self .assertEqual (consumer .status .status_code , StatusCode .UNSET )
70+
71+ self .assertEqual (0 , len (consumer .events ))
72+
6973 self .assertEqual (
7074 producer .name , "apply_async/tests.celery_test_tasks.task_add"
7175 )
@@ -84,6 +88,69 @@ def test_task(self):
8488 self .assertEqual (consumer .parent .span_id , producer .context .span_id )
8589 self .assertEqual (consumer .context .trace_id , producer .context .trace_id )
8690
91+ def test_task_raises (self ):
92+ CeleryInstrumentor ().instrument ()
93+
94+ result = task_raises .delay ()
95+
96+ timeout = time .time () + 60 * 1 # 1 minutes from now
97+ while not result .ready ():
98+ if time .time () > timeout :
99+ break
100+ time .sleep (0.05 )
101+
102+ spans = self .sorted_spans (self .memory_exporter .get_finished_spans ())
103+ self .assertEqual (len (spans ), 2 )
104+
105+ consumer , producer = spans
106+
107+ self .assertEqual (consumer .name , "run/tests.celery_test_tasks.task_raises" )
108+ self .assertEqual (consumer .kind , SpanKind .CONSUMER )
109+ self .assertSpanHasAttributes (
110+ consumer ,
111+ {
112+ "celery.action" : "run" ,
113+ "celery.state" : "FAILURE" ,
114+ SpanAttributes .MESSAGING_DESTINATION : "celery" ,
115+ "celery.task_name" : "tests.celery_test_tasks.task_raises" ,
116+ },
117+ )
118+
119+ self .assertEqual (consumer .status .status_code , StatusCode .ERROR )
120+
121+ self .assertEqual (1 , len (consumer .events ))
122+ event = consumer .events [0 ]
123+
124+ self .assertIn (SpanAttributes .EXCEPTION_STACKTRACE , event .attributes )
125+
126+ self .assertEqual (
127+ event .attributes [SpanAttributes .EXCEPTION_TYPE ],
128+ "CustomError"
129+ )
130+
131+ self .assertEqual (
132+ event .attributes [SpanAttributes .EXCEPTION_MESSAGE ],
133+ "The task failed!"
134+ )
135+
136+ self .assertEqual (
137+ producer .name , "apply_async/tests.celery_test_tasks.task_raises"
138+ )
139+ self .assertEqual (producer .kind , SpanKind .PRODUCER )
140+ self .assertSpanHasAttributes (
141+ producer ,
142+ {
143+ "celery.action" : "apply_async" ,
144+ "celery.task_name" : "tests.celery_test_tasks.task_raises" ,
145+ SpanAttributes .MESSAGING_DESTINATION_KIND : "queue" ,
146+ SpanAttributes .MESSAGING_DESTINATION : "celery" ,
147+ },
148+ )
149+
150+ self .assertNotEqual (consumer .parent , producer .context )
151+ self .assertEqual (consumer .parent .span_id , producer .context .span_id )
152+ self .assertEqual (consumer .context .trace_id , producer .context .trace_id )
153+
87154 def test_uninstrument (self ):
88155 CeleryInstrumentor ().instrument ()
89156 CeleryInstrumentor ().uninstrument ()
0 commit comments