Skip to content

Commit 8b32cb1

Browse files
committed
refactor to propagate_attributes
1 parent d3521ef commit 8b32cb1

7 files changed

Lines changed: 225 additions & 161 deletions

File tree

langfuse/_client/client.py

Lines changed: 2 additions & 90 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
Any,
1717
Callable,
1818
Dict,
19-
Generator,
2019
List,
2120
Literal,
2221
Optional,
@@ -28,12 +27,6 @@
2827

2928
import backoff
3029
import httpx
31-
from opentelemetry import (
32-
baggage as otel_baggage_api,
33-
)
34-
from opentelemetry import (
35-
context as otel_context_api,
36-
)
3730
from opentelemetry import (
3831
trace as otel_trace_api,
3932
)
@@ -47,7 +40,6 @@
4740

4841
from langfuse._client.attributes import LangfuseOtelSpanAttributes
4942
from langfuse._client.constants import (
50-
LANGFUSE_CORRELATION_CONTEXT_KEY,
5143
ObservationTypeGenerationLike,
5244
ObservationTypeLiteral,
5345
ObservationTypeLiteralNoEvent,
@@ -78,10 +70,7 @@
7870
LangfuseSpan,
7971
LangfuseTool,
8072
)
81-
from langfuse._client.utils import (
82-
get_attribute_key_from_correlation_context,
83-
run_async_safely,
84-
)
73+
from langfuse._client.utils import run_async_safely
8574
from langfuse._utils import _get_timestamp
8675
from langfuse._utils.parse_error import handle_fern_exception
8776
from langfuse._utils.prompt_cache import PromptCache
@@ -361,83 +350,6 @@ def start_span(
361350
status_message=status_message,
362351
)
363352

364-
@_agnosticcontextmanager
365-
def correlation_context(
366-
self,
367-
correlation_context: Dict[str, str],
368-
*,
369-
as_baggage: bool = False,
370-
) -> Generator[None, None, None]:
371-
"""Create a context manager that propagates the given correlation_context to all spans within the context manager's scope.
372-
373-
Args:
374-
correlation_context (Dict[str, str]): Dictionary containing key-value pairs to be propagated
375-
to all spans within the context manager's scope. Common keys include user_id, session_id,
376-
and custom metadata. All values must be strings below 200 characters.
377-
as_baggage (bool, optional): If True, stores the values in OpenTelemetry baggage
378-
for cross-service propagation. If False, stores only in local context for
379-
current-service propagation. Defaults to False.
380-
381-
Returns:
382-
Context manager that sets values on all spans created within its scope.
383-
384-
Warning:
385-
When as_baggage=True, the values will be included in HTTP headers of any
386-
outbound requests made within this context. Only use this for non-sensitive
387-
identifiers that are safe to transmit across service boundaries.
388-
389-
Examples:
390-
```python
391-
# Local context only (default) - pass context as dictionary
392-
with langfuse.correlation_context({"session_id": "session_123"}):
393-
with langfuse.start_as_current_span(name="process-request") as span:
394-
# This span and all its children will have session_id="session_123"
395-
child_span = langfuse.start_span(name="child-operation")
396-
397-
# Multiple values in context dictionary
398-
with langfuse.correlation_context({"user_id": "user_456", "experiment": "A"}):
399-
# All spans will have both user_id and experiment attributes
400-
span = langfuse.start_span(name="experiment-operation")
401-
402-
# Cross-service propagation (use with caution)
403-
with langfuse.correlation_context({"session_id": "session_123"}, as_baggage=True):
404-
# session_id will be propagated to external service calls
405-
response = requests.get("https://api.example.com/data")
406-
```
407-
"""
408-
current_context = otel_context_api.get_current()
409-
current_span = otel_trace_api.get_current_span()
410-
411-
current_context = otel_context_api.set_value(
412-
LANGFUSE_CORRELATION_CONTEXT_KEY, correlation_context, current_context
413-
)
414-
415-
for key, value in correlation_context.items():
416-
if len(value) > 200:
417-
langfuse_logger.warning(
418-
f"Correlation context key '{key}' is over 200 characters ({len(value)} chars). Dropping value."
419-
)
420-
continue
421-
422-
attribute_key = get_attribute_key_from_correlation_context(key)
423-
424-
if current_span is not None and current_span.is_recording():
425-
current_span.set_attribute(attribute_key, value)
426-
427-
if as_baggage:
428-
current_context = otel_baggage_api.set_baggage(
429-
key, value, current_context
430-
)
431-
432-
# Activate context, execute, and detach context
433-
token = otel_context_api.attach(current_context)
434-
435-
try:
436-
yield
437-
438-
finally:
439-
otel_context_api.detach(token)
440-
441353
def start_as_current_span(
442354
self,
443355
*,
@@ -1756,7 +1668,7 @@ def update_current_trace(
17561668
```
17571669
"""
17581670
warnings.warn(
1759-
"update_current_trace is deprecated and will be removed in a future version. Use `with langfuse.correlation_context(...)` instead. ",
1671+
"update_current_trace is deprecated and will be removed in a future version. Use `with langfuse.propagate_attributes(...)` instead. ",
17601672
DeprecationWarning,
17611673
stacklevel=2,
17621674
)

langfuse/_client/constants.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,6 @@
99

1010
LANGFUSE_TRACER_NAME = "langfuse-sdk"
1111

12-
LANGFUSE_CORRELATION_CONTEXT_KEY = "langfuse.ctx.correlation"
13-
1412

1513
"""Note: this type is used with .__args__ / get_args in some cases and therefore must remain flat"""
1614
ObservationTypeGenerationLike: TypeAlias = Literal[

langfuse/_client/propagation.py

Lines changed: 205 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,205 @@
1+
from typing import Any, Dict, Generator, List, Literal, Optional, Union
2+
3+
from opentelemetry import baggage
4+
from opentelemetry import (
5+
baggage as otel_baggage_api,
6+
)
7+
from opentelemetry import (
8+
context as otel_context_api,
9+
)
10+
from opentelemetry import (
11+
trace as otel_trace_api,
12+
)
13+
from opentelemetry.util._decorator import _agnosticcontextmanager
14+
15+
from langfuse._client.attributes import LangfuseOtelSpanAttributes
16+
from langfuse.logger import langfuse_logger
17+
18+
PropagatedKeys = Literal["user_id", "session_id", "metadata"]
19+
20+
21+
@_agnosticcontextmanager
22+
def propagate_attributes(
23+
*,
24+
user_id: Optional[str] = None,
25+
session_id: Optional[str] = None,
26+
metadata: Optional[Dict[str, str]] = None,
27+
as_baggage: bool = False,
28+
) -> Generator[Any, Any, Any]:
29+
context = otel_context_api.get_current()
30+
current_span = otel_trace_api.get_current_span()
31+
32+
if user_id is not None:
33+
context = _set_propagated_attribute(
34+
key="user_id",
35+
value=user_id,
36+
context=context,
37+
span=current_span,
38+
as_baggage=as_baggage,
39+
)
40+
41+
if session_id is not None:
42+
context = _set_propagated_attribute(
43+
key="session_id",
44+
value=session_id,
45+
context=context,
46+
span=current_span,
47+
as_baggage=as_baggage,
48+
)
49+
50+
if metadata is not None:
51+
context = _set_propagated_attribute(
52+
key="metadata",
53+
value=_validate_propagated_metadata(metadata),
54+
context=context,
55+
span=current_span,
56+
as_baggage=as_baggage,
57+
)
58+
59+
# Activate context, execute, and detach context
60+
token = otel_context_api.attach(context=context)
61+
62+
try:
63+
yield
64+
65+
finally:
66+
otel_context_api.detach(token)
67+
68+
69+
def _get_propagated_attributes_from_context(
70+
context: otel_context_api.Context,
71+
) -> Dict[str, str]:
72+
propagated_attributes: Dict[str, str] = {}
73+
74+
# Handle baggage
75+
baggage_entries = baggage.get_all(context=context)
76+
for baggage_key, baggage_value in baggage_entries.items():
77+
if baggage_key.startswith(LANGFUSE_BAGGAGE_PREFIX):
78+
span_key = _get_span_key_from_baggage_key(baggage_key)
79+
80+
if span_key:
81+
propagated_attributes[span_key] = str(baggage_value)
82+
83+
# Handle OTEL context
84+
propagated_keys: List[PropagatedKeys] = ["user_id", "session_id", "metadata"]
85+
86+
for key in propagated_keys:
87+
context_key = _get_propagated_context_key(key)
88+
value = otel_context_api.get_value(key=context_key, context=context)
89+
90+
if isinstance(value, dict):
91+
# Handle metadata
92+
for k, v in value.items():
93+
span_key = f"{LangfuseOtelSpanAttributes.TRACE_METADATA}.{k}"
94+
propagated_attributes[span_key] = v
95+
96+
else:
97+
span_key = {
98+
"user_id": LangfuseOtelSpanAttributes.TRACE_USER_ID,
99+
"session_id": LangfuseOtelSpanAttributes.TRACE_SESSION_ID,
100+
}.get(key, f"{LangfuseOtelSpanAttributes.TRACE_METADATA}.{key}")
101+
102+
propagated_attributes[span_key] = str(value)
103+
104+
return propagated_attributes
105+
106+
107+
def _set_propagated_attribute(
108+
*,
109+
key: PropagatedKeys,
110+
value: Union[str, Dict[str, str]],
111+
context: otel_context_api.Context,
112+
span: otel_trace_api.Span,
113+
as_baggage: bool,
114+
) -> otel_context_api.Context:
115+
# Get key names
116+
context_key = _get_propagated_context_key(key)
117+
span_key = _get_propagated_span_key(key)
118+
baggage_key = _get_propagated_baggage_key(key)
119+
120+
# Set in context
121+
context = otel_context_api.set_value(
122+
key=context_key,
123+
value=value,
124+
context=context,
125+
)
126+
127+
# Set on current span
128+
if span is not None and span.is_recording():
129+
if isinstance(value, dict):
130+
# Handle metadata
131+
for k, v in value.items():
132+
span.set_attribute(
133+
key=f"{LangfuseOtelSpanAttributes.TRACE_METADATA}.{k}",
134+
value=v,
135+
)
136+
137+
else:
138+
span.set_attribute(key=span_key, value=value)
139+
140+
# Set on baggage
141+
if as_baggage:
142+
if isinstance(value, dict):
143+
# Handle metadata
144+
for k, v in value.items():
145+
context = otel_baggage_api.set_baggage(
146+
name=f"{baggage_key}_{k}", value=v, context=context
147+
)
148+
else:
149+
context = otel_baggage_api.set_baggage(
150+
name=baggage_key, value=value, context=context
151+
)
152+
153+
return context
154+
155+
156+
def _validate_propagated_metadata(metadata: Dict[str, str]) -> Dict[str, str]:
157+
validated_metadata: Dict[str, str] = {}
158+
159+
for key, value in metadata.items():
160+
if not isinstance(value, str):
161+
langfuse_logger.warning( # type: ignore
162+
f"Propagated attribute value of '{key}' not a string. Dropping value."
163+
)
164+
continue
165+
166+
if len(value) > 200:
167+
langfuse_logger.warning(
168+
f"Propagated attribute value of '{key}' is over 200 characters ({len(value)} chars). Dropping value."
169+
)
170+
continue
171+
172+
validated_metadata[key] = value
173+
174+
return validated_metadata
175+
176+
177+
def _get_propagated_context_key(key: PropagatedKeys) -> str:
178+
return f"langfuse.propagated.{key}"
179+
180+
181+
LANGFUSE_BAGGAGE_PREFIX = "langfuse_"
182+
183+
184+
def _get_propagated_baggage_key(key: PropagatedKeys) -> str:
185+
return f"{LANGFUSE_BAGGAGE_PREFIX}{key}"
186+
187+
188+
def _get_span_key_from_baggage_key(key: str) -> Optional[str]:
189+
if not key.startswith(LANGFUSE_BAGGAGE_PREFIX):
190+
return None
191+
192+
if "user_id" in key:
193+
return LangfuseOtelSpanAttributes.TRACE_USER_ID
194+
195+
if "session_id" in key:
196+
return LangfuseOtelSpanAttributes.TRACE_SESSION_ID
197+
198+
return f"{LangfuseOtelSpanAttributes.TRACE_METADATA}.{key}"
199+
200+
201+
def _get_propagated_span_key(key: PropagatedKeys) -> str:
202+
return {
203+
"session_id": LangfuseOtelSpanAttributes.TRACE_SESSION_ID,
204+
"user_id": LangfuseOtelSpanAttributes.TRACE_USER_ID,
205+
}.get(key) or f"{LangfuseOtelSpanAttributes.TRACE_METADATA}.{key}"

langfuse/_client/span.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,9 @@ def __init__(
190190
{k: v for k, v in attributes.items() if v is not None}
191191
)
192192
# Set OTEL span status if level is ERROR
193-
self._set_otel_span_status_if_error(level=level, status_message=status_message)
193+
self._set_otel_span_status_if_error(
194+
level=level, status_message=status_message
195+
)
194196

195197
def end(self, *, end_time: Optional[int] = None) -> "LangfuseObservationWrapper":
196198
"""End the span, marking it as completed.
@@ -237,7 +239,7 @@ def update_trace(
237239
public: Whether the trace should be publicly accessible
238240
"""
239241
warnings.warn(
240-
"update_trace is deprecated and will be removed in a future version. Use `with langfuse.correlation_context(...)` instead. ",
242+
"update_trace is deprecated and will be removed in a future version. Use `with langfuse.propagate_attributes(...)` instead. ",
241243
DeprecationWarning,
242244
stacklevel=2,
243245
)
@@ -549,7 +551,7 @@ def _process_media_in_attribute(
549551
return data
550552

551553
def _set_otel_span_status_if_error(
552-
self, *, level: Optional[SpanLevel] = None, status_message: Optional[str] = None
554+
self, *, level: Optional[SpanLevel] = None, status_message: Optional[str] = None
553555
) -> None:
554556
"""Set OpenTelemetry span status to ERROR if level is ERROR.
555557

0 commit comments

Comments
 (0)