1414)
1515from sentry_sdk .integrations .celery .utils import _now_seconds_since_epoch
1616from sentry_sdk .integrations .logging import ignore_logger
17+ from sentry_sdk .traces import StreamedSpan
1718from sentry_sdk .tracing import BAGGAGE_HEADER_NAME , Span , TransactionSource
18- from sentry_sdk .tracing_utils import Baggage
19+ from sentry_sdk .tracing_utils import Baggage , has_span_streaming_enabled
1920from sentry_sdk .utils import (
2021 capture_internal_exceptions ,
21- ensure_integration_enabled ,
2222 event_from_exception ,
2323 reraise ,
2424)
@@ -162,7 +162,9 @@ def event_processor(event: "Event", hint: "Hint") -> "Optional[Event]":
162162
163163
164164def _update_celery_task_headers (
165- original_headers : "dict[str, Any]" , span : "Optional[Span]" , monitor_beat_tasks : bool
165+ original_headers : "dict[str, Any]" ,
166+ span : "Optional[Union[StreamedSpan, Span]]" ,
167+ monitor_beat_tasks : bool ,
166168) -> "dict[str, Any]" :
167169 """
168170 Updates the headers of the Celery task with the tracing information
@@ -255,7 +257,8 @@ def _wrap_task_run(f: "F") -> "F":
255257 def apply_async (* args : "Any" , ** kwargs : "Any" ) -> "Any" :
256258 # Note: kwargs can contain headers=None, so no setdefault!
257259 # Unsure which backend though.
258- integration = sentry_sdk .get_client ().get_integration (CeleryIntegration )
260+ client = sentry_sdk .get_client ()
261+ integration = client .get_integration (CeleryIntegration )
259262 if integration is None :
260263 return f (* args , ** kwargs )
261264
@@ -274,17 +277,28 @@ def apply_async(*args: "Any", **kwargs: "Any") -> "Any":
274277 else :
275278 task_name = "<unknown Celery task>"
276279
280+ span_streaming = has_span_streaming_enabled (client .options )
281+
277282 task_started_from_beat = sentry_sdk .get_isolation_scope ()._name == "celery-beat"
278283
279- span_mgr : "Union[Span, NoOpMgr]" = (
280- sentry_sdk .start_span (
281- op = OP .QUEUE_SUBMIT_CELERY ,
282- name = task_name ,
283- origin = CeleryIntegration .origin ,
284- )
285- if not task_started_from_beat
286- else NoOpMgr ()
287- )
284+ span_mgr : "Union[StreamedSpan, Span, NoOpMgr]" = NoOpMgr ()
285+ if span_streaming :
286+ if not task_started_from_beat and sentry_sdk .get_current_span () is not None :
287+ span_mgr = sentry_sdk .traces .start_span (
288+ name = task_name ,
289+ attributes = {
290+ "sentry.op" : OP .QUEUE_SUBMIT_CELERY ,
291+ "sentry.origin" : CeleryIntegration .origin ,
292+ },
293+ )
294+
295+ else :
296+ if not task_started_from_beat :
297+ span_mgr = sentry_sdk .start_span (
298+ op = OP .QUEUE_SUBMIT_CELERY ,
299+ name = task_name ,
300+ origin = CeleryIntegration .origin ,
301+ )
288302
289303 with span_mgr as span :
290304 kwargs ["headers" ] = _update_celery_task_headers (
@@ -303,50 +317,78 @@ def _wrap_tracer(task: "Any", f: "F") -> "F":
303317 # Also because in Celery 3, signal dispatch returns early if one handler
304318 # crashes.
305319 @wraps (f )
306- @ensure_integration_enabled (CeleryIntegration , f )
307320 def _inner (* args : "Any" , ** kwargs : "Any" ) -> "Any" :
321+ client = sentry_sdk .get_client ()
322+ if client .get_integration (CeleryIntegration ) is None :
323+ return f (* args , ** kwargs )
324+
325+ span_streaming = has_span_streaming_enabled (client .options )
326+
308327 with isolation_scope () as scope :
309328 scope ._name = "celery"
310329 scope .clear_breadcrumbs ()
311330 scope .add_event_processor (_make_event_processor (task , * args , ** kwargs ))
312331
313- transaction = None
332+ task_name = getattr ( task , "name" , "<unknown Celery task>" )
314333
315- # Celery task objects are not a thing to be trusted. Even
316- # something such as attribute access can fail.
334+ custom_sampling_context = {}
317335 with capture_internal_exceptions ():
318- headers = args [3 ].get ("headers" ) or {}
319- transaction = continue_trace (
320- headers ,
321- op = OP .QUEUE_TASK_CELERY ,
322- name = "unknown celery task" ,
323- source = TransactionSource .TASK ,
324- origin = CeleryIntegration .origin ,
325- )
326- transaction .name = task .name
327- transaction .set_status (SPANSTATUS .OK )
328-
329- if transaction is None :
330- return f (* args , ** kwargs )
331-
332- with sentry_sdk .start_transaction (
333- transaction ,
334- custom_sampling_context = {
336+ custom_sampling_context = {
335337 "celery_job" : {
336- "task" : task . name ,
338+ "task" : task_name ,
337339 # for some reason, args[1] is a list if non-empty but a
338340 # tuple if empty
339341 "args" : list (args [1 ]),
340342 "kwargs" : args [2 ],
341343 }
342- },
343- ):
344+ }
345+
346+ span : "Union[Span, StreamedSpan]"
347+ span_ctx : "Union[StreamedSpan, Span, NoOpMgr]" = NoOpMgr ()
348+
349+ # Celery task objects are not a thing to be trusted. Even
350+ # something such as attribute access can fail.
351+ with capture_internal_exceptions ():
352+ headers = args [3 ].get ("headers" ) or {}
353+ if span_streaming :
354+ sentry_sdk .traces .continue_trace (headers )
355+ scope .set_custom_sampling_context (custom_sampling_context )
356+ span = sentry_sdk .traces .start_span (
357+ name = task_name ,
358+ parent_span = None , # make this a segment
359+ attributes = {
360+ "sentry.origin" : CeleryIntegration .origin ,
361+ "sentry.span.source" : TransactionSource .TASK .value ,
362+ "sentry.op" : OP .QUEUE_TASK_CELERY ,
363+ },
364+ )
365+
366+ span_ctx = span
367+
368+ else :
369+ span = continue_trace (
370+ headers ,
371+ op = OP .QUEUE_TASK_CELERY ,
372+ name = task_name ,
373+ source = TransactionSource .TASK ,
374+ origin = CeleryIntegration .origin ,
375+ )
376+ span .set_status (SPANSTATUS .OK )
377+
378+ span_ctx = sentry_sdk .start_transaction (
379+ span ,
380+ custom_sampling_context = custom_sampling_context ,
381+ )
382+
383+ with span_ctx :
344384 return f (* args , ** kwargs )
345385
346386 return _inner # type: ignore
347387
348388
349- def _set_messaging_destination_name (task : "Any" , span : "Span" ) -> None :
389+ def _set_messaging_destination_name (
390+ task : "Any" , span : "Union[StreamedSpan, Span]"
391+ ) -> None :
350392 """Set "messaging.destination.name" tag for span"""
351393 with capture_internal_exceptions ():
352394 delivery_info = task .request .delivery_info
@@ -355,26 +397,47 @@ def _set_messaging_destination_name(task: "Any", span: "Span") -> None:
355397 if delivery_info .get ("exchange" ) == "" and routing_key is not None :
356398 # Empty exchange indicates the default exchange, meaning the tasks
357399 # are sent to the queue with the same name as the routing key.
358- span .set_data (SPANDATA .MESSAGING_DESTINATION_NAME , routing_key )
400+ if isinstance (span , StreamedSpan ):
401+ span .set_attribute (SPANDATA .MESSAGING_DESTINATION_NAME , routing_key )
402+ else :
403+ span .set_data (SPANDATA .MESSAGING_DESTINATION_NAME , routing_key )
359404
360405
361406def _wrap_task_call (task : "Any" , f : "F" ) -> "F" :
362407 # Need to wrap task call because the exception is caught before we get to
363408 # see it. Also celery's reported stacktrace is untrustworthy.
364409
365- # functools.wraps is important here because celery-once looks at this
366- # method's name. @ensure_integration_enabled internally calls functools.wraps,
367- # but if we ever remove the @ensure_integration_enabled decorator, we need
368- # to add @functools.wraps(f) here.
369- # https://github.com/getsentry/sentry-python/issues/421
370- @ensure_integration_enabled (CeleryIntegration , f )
410+ @wraps (f )
371411 def _inner (* args : "Any" , ** kwargs : "Any" ) -> "Any" :
412+ client = sentry_sdk .get_client ()
413+ if client .get_integration (CeleryIntegration ) is None :
414+ return f (* args , ** kwargs )
415+
416+ span_streaming = has_span_streaming_enabled (client .options )
417+
372418 try :
373- with sentry_sdk .start_span (
374- op = OP .QUEUE_PROCESS ,
375- name = task .name ,
376- origin = CeleryIntegration .origin ,
377- ) as span :
419+ span : "Union[Span, StreamedSpan]"
420+ if span_streaming :
421+ span = sentry_sdk .traces .start_span (
422+ name = task .name ,
423+ attributes = {
424+ "sentry.op" : OP .QUEUE_PROCESS ,
425+ "sentry.origin" : CeleryIntegration .origin ,
426+ },
427+ )
428+ else :
429+ span = sentry_sdk .start_span (
430+ op = OP .QUEUE_PROCESS ,
431+ name = task .name ,
432+ origin = CeleryIntegration .origin ,
433+ )
434+
435+ with span :
436+ if isinstance (span , StreamedSpan ):
437+ set_on_span = span .set_attribute
438+ else :
439+ set_on_span = span .set_data
440+
378441 _set_messaging_destination_name (task , span )
379442
380443 latency = None
@@ -389,19 +452,19 @@ def _inner(*args: "Any", **kwargs: "Any") -> "Any":
389452
390453 if latency is not None :
391454 latency *= 1000 # milliseconds
392- span . set_data (SPANDATA .MESSAGING_MESSAGE_RECEIVE_LATENCY , latency )
455+ set_on_span (SPANDATA .MESSAGING_MESSAGE_RECEIVE_LATENCY , latency )
393456
394457 with capture_internal_exceptions ():
395- span . set_data (SPANDATA .MESSAGING_MESSAGE_ID , task .request .id )
458+ set_on_span (SPANDATA .MESSAGING_MESSAGE_ID , task .request .id )
396459
397460 with capture_internal_exceptions ():
398- span . set_data (
461+ set_on_span (
399462 SPANDATA .MESSAGING_MESSAGE_RETRY_COUNT , task .request .retries
400463 )
401464
402465 with capture_internal_exceptions ():
403466 with task .app .connection () as conn :
404- span . set_data (
467+ set_on_span (
405468 SPANDATA .MESSAGING_SYSTEM ,
406469 conn .transport .driver_type ,
407470 )
@@ -476,8 +539,13 @@ def sentry_workloop(*args: "Any", **kwargs: "Any") -> "Any":
476539def _patch_producer_publish () -> None :
477540 original_publish = Producer .publish
478541
479- @ensure_integration_enabled (CeleryIntegration , original_publish )
480542 def sentry_publish (self : "Producer" , * args : "Any" , ** kwargs : "Any" ) -> "Any" :
543+ client = sentry_sdk .get_client ()
544+ if client .get_integration (CeleryIntegration ) is None :
545+ return original_publish (self , * args , ** kwargs )
546+
547+ span_streaming = has_span_streaming_enabled (client .options )
548+
481549 kwargs_headers = kwargs .get ("headers" , {})
482550 if not isinstance (kwargs_headers , Mapping ):
483551 # Ensure kwargs_headers is a Mapping, so we can safely call get().
@@ -487,31 +555,52 @@ def sentry_publish(self: "Producer", *args: "Any", **kwargs: "Any") -> "Any":
487555 # method will still work.
488556 kwargs_headers = {}
489557
490- task_name = kwargs_headers .get ("task" )
558+ task_name = kwargs_headers .get ("task" ) or "<unknown Celery task>"
491559 task_id = kwargs_headers .get ("id" )
492560 retries = kwargs_headers .get ("retries" )
493561
494562 routing_key = kwargs .get ("routing_key" )
495563 exchange = kwargs .get ("exchange" )
496564
497- with sentry_sdk .start_span (
498- op = OP .QUEUE_PUBLISH ,
499- name = task_name ,
500- origin = CeleryIntegration .origin ,
501- ) as span :
565+ span : "Union[StreamedSpan, Span, None]" = None
566+ if span_streaming :
567+ if sentry_sdk .get_current_span () is not None :
568+ span = sentry_sdk .traces .start_span (
569+ name = task_name ,
570+ attributes = {
571+ "sentry.op" : OP .QUEUE_PUBLISH ,
572+ "sentry.origin" : CeleryIntegration .origin ,
573+ },
574+ )
575+ else :
576+ span = sentry_sdk .start_span (
577+ op = OP .QUEUE_PUBLISH ,
578+ name = task_name ,
579+ origin = CeleryIntegration .origin ,
580+ )
581+
582+ if span is None :
583+ return original_publish (self , * args , ** kwargs )
584+
585+ with span :
586+ if isinstance (span , StreamedSpan ):
587+ set_on_span = span .set_attribute
588+ else :
589+ set_on_span = span .set_data
590+
502591 if task_id is not None :
503- span . set_data (SPANDATA .MESSAGING_MESSAGE_ID , task_id )
592+ set_on_span (SPANDATA .MESSAGING_MESSAGE_ID , task_id )
504593
505594 if exchange == "" and routing_key is not None :
506595 # Empty exchange indicates the default exchange, meaning messages are
507596 # routed to the queue with the same name as the routing key.
508- span . set_data (SPANDATA .MESSAGING_DESTINATION_NAME , routing_key )
597+ set_on_span (SPANDATA .MESSAGING_DESTINATION_NAME , routing_key )
509598
510599 if retries is not None :
511- span . set_data (SPANDATA .MESSAGING_MESSAGE_RETRY_COUNT , retries )
600+ set_on_span (SPANDATA .MESSAGING_MESSAGE_RETRY_COUNT , retries )
512601
513602 with capture_internal_exceptions ():
514- span . set_data (
603+ set_on_span (
515604 SPANDATA .MESSAGING_SYSTEM , self .connection .transport .driver_type
516605 )
517606
0 commit comments