Skip to content

Commit aa4378f

Browse files
sentrivanaclaudesentry-warden[bot]
authored
feat(redis): Support streaming spans (#6083)
Support span streaming in the Redis integration(s). - [x] support creating streamed spans - [x] move data and tags to attributes - [x] make sure attributes are in semantic conventions - [x] add tests The following tags/extra are purposefully **left out** in the streaming path as they are non-standard and unused: - `redis.commands` - `redis.transaction` - `redis.is_cluster` #### Issues * Closes #6052 * Closes PY-2354 --------- Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com> Co-authored-by: sentry-warden[bot] <258096371+sentry-warden[bot]@users.noreply.github.com>
1 parent 22fc6a6 commit aa4378f

12 files changed

Lines changed: 1582 additions & 663 deletions

File tree

sentry_sdk/integrations/redis/_async_common.py

Lines changed: 60 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,15 @@
1111
_set_pipeline_data,
1212
)
1313
from sentry_sdk.tracing import Span
14+
from sentry_sdk.tracing_utils import has_span_streaming_enabled
1415
from sentry_sdk.utils import capture_internal_exceptions
1516

1617
from typing import TYPE_CHECKING
1718

1819
if TYPE_CHECKING:
1920
from collections.abc import Callable
20-
from typing import Any, Union
21+
from typing import Any, Optional, Union
22+
from sentry_sdk.traces import StreamedSpan
2123
from redis.asyncio.client import Pipeline, StrictRedis
2224
from redis.asyncio.cluster import ClusterPipeline, RedisCluster
2325

@@ -26,21 +28,36 @@ def patch_redis_async_pipeline(
2628
pipeline_cls: "Union[type[Pipeline[Any]], type[ClusterPipeline[Any]]]",
2729
is_cluster: bool,
2830
get_command_args_fn: "Any",
29-
set_db_data_fn: "Callable[[Span, Any], None]",
31+
set_db_data_fn: "Callable[[Union[Span, StreamedSpan], Any], None]",
3032
) -> None:
3133
old_execute = pipeline_cls.execute
3234

3335
from sentry_sdk.integrations.redis import RedisIntegration
3436

3537
async def _sentry_execute(self: "Any", *args: "Any", **kwargs: "Any") -> "Any":
36-
if sentry_sdk.get_client().get_integration(RedisIntegration) is None:
38+
client = sentry_sdk.get_client()
39+
if client.get_integration(RedisIntegration) is None:
3740
return await old_execute(self, *args, **kwargs)
3841

39-
with sentry_sdk.start_span(
40-
op=OP.DB_REDIS,
41-
name="redis.pipeline.execute",
42-
origin=SPAN_ORIGIN,
43-
) as span:
42+
span_streaming = has_span_streaming_enabled(client.options)
43+
44+
span: "Union[Span, StreamedSpan]"
45+
if span_streaming:
46+
span = sentry_sdk.traces.start_span(
47+
name="redis.pipeline.execute",
48+
attributes={
49+
"sentry.origin": SPAN_ORIGIN,
50+
"sentry.op": OP.DB_REDIS,
51+
},
52+
)
53+
else:
54+
span = sentry_sdk.start_span(
55+
op=OP.DB_REDIS,
56+
name="redis.pipeline.execute",
57+
origin=SPAN_ORIGIN,
58+
)
59+
60+
with span:
4461
with capture_internal_exceptions():
4562
try:
4663
command_seq = self._execution_strategy._command_queue
@@ -67,7 +84,7 @@ async def _sentry_execute(self: "Any", *args: "Any", **kwargs: "Any") -> "Any":
6784
def patch_redis_async_client(
6885
cls: "Union[type[StrictRedis[Any]], type[RedisCluster[Any]]]",
6986
is_cluster: bool,
70-
set_db_data_fn: "Callable[[Span, Any], None]",
87+
set_db_data_fn: "Callable[[Union[Span, StreamedSpan], Any], None]",
7188
) -> None:
7289
old_execute_command = cls.execute_command
7390

@@ -76,33 +93,55 @@ def patch_redis_async_client(
7693
async def _sentry_execute_command(
7794
self: "Any", name: str, *args: "Any", **kwargs: "Any"
7895
) -> "Any":
79-
integration = sentry_sdk.get_client().get_integration(RedisIntegration)
96+
client = sentry_sdk.get_client()
97+
integration = client.get_integration(RedisIntegration)
8098
if integration is None:
8199
return await old_execute_command(self, name, *args, **kwargs)
82100

101+
span_streaming = has_span_streaming_enabled(client.options)
102+
83103
cache_properties = _compile_cache_span_properties(
84104
name,
85105
args,
86106
kwargs,
87107
integration,
88108
)
89109

90-
cache_span = None
110+
cache_span: "Optional[Union[Span, StreamedSpan]]" = None
91111
if cache_properties["is_cache_key"] and cache_properties["op"] is not None:
92-
cache_span = sentry_sdk.start_span(
93-
op=cache_properties["op"],
94-
name=cache_properties["description"],
95-
origin=SPAN_ORIGIN,
96-
)
112+
if span_streaming:
113+
cache_span = sentry_sdk.traces.start_span(
114+
name=cache_properties["description"],
115+
attributes={
116+
"sentry.op": cache_properties["op"],
117+
"sentry.origin": SPAN_ORIGIN,
118+
},
119+
)
120+
else:
121+
cache_span = sentry_sdk.start_span(
122+
op=cache_properties["op"],
123+
name=cache_properties["description"],
124+
origin=SPAN_ORIGIN,
125+
)
97126
cache_span.__enter__()
98127

99128
db_properties = _compile_db_span_properties(integration, name, args)
100129

101-
db_span = sentry_sdk.start_span(
102-
op=db_properties["op"],
103-
name=db_properties["description"],
104-
origin=SPAN_ORIGIN,
105-
)
130+
db_span: "Union[Span, StreamedSpan]"
131+
if span_streaming:
132+
db_span = sentry_sdk.traces.start_span(
133+
name=db_properties["description"],
134+
attributes={
135+
"sentry.op": db_properties["op"],
136+
"sentry.origin": SPAN_ORIGIN,
137+
},
138+
)
139+
else:
140+
db_span = sentry_sdk.start_span(
141+
op=db_properties["op"],
142+
name=db_properties["description"],
143+
origin=SPAN_ORIGIN,
144+
)
106145
db_span.__enter__()
107146

108147
set_db_data_fn(db_span, self)

sentry_sdk/integrations/redis/_sync_common.py

Lines changed: 62 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -11,34 +11,51 @@
1111
_set_pipeline_data,
1212
)
1313
from sentry_sdk.tracing import Span
14+
from sentry_sdk.tracing_utils import has_span_streaming_enabled
1415
from sentry_sdk.utils import capture_internal_exceptions
1516

1617
from typing import TYPE_CHECKING
1718

1819
if TYPE_CHECKING:
1920
from collections.abc import Callable
20-
from typing import Any
21+
from typing import Any, Optional, Union
22+
from sentry_sdk.traces import StreamedSpan
2123

2224

2325
def patch_redis_pipeline(
2426
pipeline_cls: "Any",
2527
is_cluster: bool,
2628
get_command_args_fn: "Any",
27-
set_db_data_fn: "Callable[[Span, Any], None]",
29+
set_db_data_fn: "Callable[[Union[Span, StreamedSpan], Any], None]",
2830
) -> None:
2931
old_execute = pipeline_cls.execute
3032

3133
from sentry_sdk.integrations.redis import RedisIntegration
3234

3335
def sentry_patched_execute(self: "Any", *args: "Any", **kwargs: "Any") -> "Any":
34-
if sentry_sdk.get_client().get_integration(RedisIntegration) is None:
36+
client = sentry_sdk.get_client()
37+
if client.get_integration(RedisIntegration) is None:
3538
return old_execute(self, *args, **kwargs)
3639

37-
with sentry_sdk.start_span(
38-
op=OP.DB_REDIS,
39-
name="redis.pipeline.execute",
40-
origin=SPAN_ORIGIN,
41-
) as span:
40+
span_streaming = has_span_streaming_enabled(client.options)
41+
42+
span: "Union[Span, StreamedSpan]"
43+
if span_streaming:
44+
span = sentry_sdk.traces.start_span(
45+
name="redis.pipeline.execute",
46+
attributes={
47+
"sentry.origin": SPAN_ORIGIN,
48+
"sentry.op": OP.DB_REDIS,
49+
},
50+
)
51+
else:
52+
span = sentry_sdk.start_span(
53+
op=OP.DB_REDIS,
54+
name="redis.pipeline.execute",
55+
origin=SPAN_ORIGIN,
56+
)
57+
58+
with span:
4259
with capture_internal_exceptions():
4360
command_seq = None
4461
try:
@@ -61,7 +78,9 @@ def sentry_patched_execute(self: "Any", *args: "Any", **kwargs: "Any") -> "Any":
6178

6279

6380
def patch_redis_client(
64-
cls: "Any", is_cluster: bool, set_db_data_fn: "Callable[[Span, Any], None]"
81+
cls: "Any",
82+
is_cluster: bool,
83+
set_db_data_fn: "Callable[[Union[Span, StreamedSpan], Any], None]",
6584
) -> None:
6685
"""
6786
This function can be used to instrument custom redis client classes or
@@ -74,33 +93,55 @@ def patch_redis_client(
7493
def sentry_patched_execute_command(
7594
self: "Any", name: str, *args: "Any", **kwargs: "Any"
7695
) -> "Any":
77-
integration = sentry_sdk.get_client().get_integration(RedisIntegration)
96+
client = sentry_sdk.get_client()
97+
integration = client.get_integration(RedisIntegration)
7898
if integration is None:
7999
return old_execute_command(self, name, *args, **kwargs)
80100

101+
span_streaming = has_span_streaming_enabled(client.options)
102+
81103
cache_properties = _compile_cache_span_properties(
82104
name,
83105
args,
84106
kwargs,
85107
integration,
86108
)
87109

88-
cache_span = None
110+
cache_span: "Optional[Union[Span, StreamedSpan]]" = None
89111
if cache_properties["is_cache_key"] and cache_properties["op"] is not None:
90-
cache_span = sentry_sdk.start_span(
91-
op=cache_properties["op"],
92-
name=cache_properties["description"],
93-
origin=SPAN_ORIGIN,
94-
)
112+
if span_streaming:
113+
cache_span = sentry_sdk.traces.start_span(
114+
name=cache_properties["description"],
115+
attributes={
116+
"sentry.op": cache_properties["op"],
117+
"sentry.origin": SPAN_ORIGIN,
118+
},
119+
)
120+
else:
121+
cache_span = sentry_sdk.start_span(
122+
op=cache_properties["op"],
123+
name=cache_properties["description"],
124+
origin=SPAN_ORIGIN,
125+
)
95126
cache_span.__enter__()
96127

97128
db_properties = _compile_db_span_properties(integration, name, args)
98129

99-
db_span = sentry_sdk.start_span(
100-
op=db_properties["op"],
101-
name=db_properties["description"],
102-
origin=SPAN_ORIGIN,
103-
)
130+
db_span: "Union[Span, StreamedSpan]"
131+
if span_streaming:
132+
db_span = sentry_sdk.traces.start_span(
133+
name=db_properties["description"],
134+
attributes={
135+
"sentry.op": db_properties["op"],
136+
"sentry.origin": SPAN_ORIGIN,
137+
},
138+
)
139+
else:
140+
db_span = sentry_sdk.start_span(
141+
op=db_properties["op"],
142+
name=db_properties["description"],
143+
origin=SPAN_ORIGIN,
144+
)
104145
db_span.__enter__()
105146

106147
set_db_data_fn(db_span, self)

sentry_sdk/integrations/redis/modules/caches.py

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
from sentry_sdk.consts import OP, SPANDATA
66
from sentry_sdk.integrations.redis.utils import _get_safe_key, _key_as_string
7+
from sentry_sdk.traces import StreamedSpan
78
from sentry_sdk.utils import capture_internal_exceptions
89

910
GET_COMMANDS = ("get", "mget")
@@ -14,7 +15,7 @@
1415
if TYPE_CHECKING:
1516
from sentry_sdk.integrations.redis import RedisIntegration
1617
from sentry_sdk.tracing import Span
17-
from typing import Any, Optional
18+
from typing import Any, Optional, Union
1819

1920

2021
def _get_op(name: str) -> "Optional[str]":
@@ -80,25 +81,30 @@ def _get_cache_span_description(
8081

8182

8283
def _set_cache_data(
83-
span: "Span",
84+
span: "Union[Span, StreamedSpan]",
8485
redis_client: "Any",
8586
properties: "dict[str, Any]",
8687
return_value: "Optional[Any]",
8788
) -> None:
89+
if isinstance(span, StreamedSpan):
90+
set_on_span = span.set_attribute
91+
else:
92+
set_on_span = span.set_data
93+
8894
with capture_internal_exceptions():
89-
span.set_data(SPANDATA.CACHE_KEY, properties["key"])
95+
set_on_span(SPANDATA.CACHE_KEY, properties["key"])
9096

9197
if properties["redis_command"] in GET_COMMANDS:
9298
if return_value is not None:
93-
span.set_data(SPANDATA.CACHE_HIT, True)
99+
set_on_span(SPANDATA.CACHE_HIT, True)
94100
size = (
95101
len(str(return_value).encode("utf-8"))
96102
if not isinstance(return_value, bytes)
97103
else len(return_value)
98104
)
99-
span.set_data(SPANDATA.CACHE_ITEM_SIZE, size)
105+
set_on_span(SPANDATA.CACHE_ITEM_SIZE, size)
100106
else:
101-
span.set_data(SPANDATA.CACHE_HIT, False)
107+
set_on_span(SPANDATA.CACHE_HIT, False)
102108

103109
elif properties["redis_command"] in SET_COMMANDS:
104110
if properties["value"] is not None:
@@ -107,7 +113,7 @@ def _set_cache_data(
107113
if not isinstance(properties["value"], bytes)
108114
else len(properties["value"])
109115
)
110-
span.set_data(SPANDATA.CACHE_ITEM_SIZE, size)
116+
set_on_span(SPANDATA.CACHE_ITEM_SIZE, size)
111117

112118
try:
113119
connection_params = redis_client.connection_pool.connection_kwargs
@@ -122,8 +128,8 @@ def _set_cache_data(
122128

123129
host = connection_params.get("host")
124130
if host is not None:
125-
span.set_data(SPANDATA.NETWORK_PEER_ADDRESS, host)
131+
set_on_span(SPANDATA.NETWORK_PEER_ADDRESS, host)
126132

127133
port = connection_params.get("port")
128134
if port is not None:
129-
span.set_data(SPANDATA.NETWORK_PEER_PORT, port)
135+
set_on_span(SPANDATA.NETWORK_PEER_PORT, port)

0 commit comments

Comments
 (0)