Skip to content

Commit cf3ec39

Browse files
committed
push
1 parent ab92d40 commit cf3ec39

7 files changed

Lines changed: 109 additions & 155 deletions

File tree

langfuse/_client/attributes.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
ObservationTypeGenerationLike,
1919
ObservationTypeSpanLike,
2020
)
21-
2221
from langfuse._utils.serializer import EventSerializer
2322
from langfuse.model import PromptClient
2423
from langfuse.types import MapValue, SpanLevel

langfuse/_client/client.py

Lines changed: 38 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -47,9 +47,7 @@
4747

4848
from langfuse._client.attributes import LangfuseOtelSpanAttributes
4949
from langfuse._client.constants import (
50-
LANGFUSE_CTX_METADATA,
51-
LANGFUSE_CTX_SESSION_ID,
52-
LANGFUSE_CTX_USER_ID,
50+
LANGFUSE_CORRELATION_CONTEXT_KEY,
5351
ObservationTypeGenerationLike,
5452
ObservationTypeLiteral,
5553
ObservationTypeLiteralNoEvent,
@@ -80,7 +78,10 @@
8078
LangfuseSpan,
8179
LangfuseTool,
8280
)
83-
from langfuse._client.utils import run_async_safely
81+
from langfuse._client.utils import (
82+
get_attribute_key_from_correlation_context,
83+
run_async_safely,
84+
)
8485
from langfuse._utils import _get_timestamp
8586
from langfuse._utils.parse_error import handle_fern_exception
8687
from langfuse._utils.prompt_cache import PromptCache
@@ -363,19 +364,18 @@ def start_span(
363364
)
364365

365366
@_agnosticcontextmanager
366-
def with_attributes(
367+
def correlation_context(
367368
self,
368-
session_id: Optional[str] = None,
369-
user_id: Optional[str] = None,
370-
metadata: Optional[dict[str, str]] = None,
369+
correlation_context: Dict[str, str],
370+
*,
371371
as_baggage: bool = False,
372372
) -> Generator[None, None, None]:
373-
"""Creates a context manager that propagates the given attributes to all spans created within the context.
373+
"""Create a context manager that propagates the given correlation_context to all spans within the context manager's scope.
374374
375375
Args:
376-
session_id (str): Session identifier.
377-
user_id (str): User identifier.
378-
metadata (dict): Additional metadata to associate with all spans in the context. Values must be strings and are truncated to 200 characters.
376+
correlation_context (Dict[str, str]): Dictionary containing key-value pairs to be propagated
377+
to all spans within the context manager's scope. Common keys include user_id, session_id,
378+
and custom metadata. All values must be strings below 200 characters.
379379
as_baggage (bool, optional): If True, stores the values in OpenTelemetry baggage
380380
for cross-service propagation. If False, stores only in local context for
381381
current-service propagation. Defaults to False.
@@ -388,79 +388,55 @@ def with_attributes(
388388
outbound requests made within this context. Only use this for non-sensitive
389389
identifiers that are safe to transmit across service boundaries.
390390
391-
Example:
391+
Examples:
392392
```python
393-
# Local context only (default)
394-
with langfuse.with_attributes(session_id="session_123"):
393+
# Local context only (default) - pass context as dictionary
394+
with langfuse.correlation_context({"session_id": "session_123"}):
395395
with langfuse.start_as_current_span(name="process-request") as span:
396396
# This span and all its children will have session_id="session_123"
397397
child_span = langfuse.start_span(name="child-operation")
398398
399+
# Multiple values in context dictionary
400+
with langfuse.correlation_context({"user_id": "user_456", "experiment": "A"}):
401+
# All spans will have both user_id and experiment attributes
402+
span = langfuse.start_span(name="experiment-operation")
403+
399404
# Cross-service propagation (use with caution)
400-
with langfuse.with_attributes(session_id="session_123", as_baggage=True):
405+
with langfuse.correlation_context({"session_id": "session_123"}, as_baggage=True):
401406
# session_id will be propagated to external service calls
402407
response = requests.get("https://api.example.com/data")
403408
```
404409
"""
405410
current_context = otel_context_api.get_current()
406411
current_span = otel_trace_api.get_current_span()
407412

408-
# Process session_id
409-
if session_id is not None:
410-
current_context = otel_context_api.set_value(
411-
LANGFUSE_CTX_SESSION_ID, session_id, current_context
412-
)
413-
if current_span is not None and current_span.is_recording():
414-
current_span.set_attribute("session.id", session_id)
415-
if as_baggage:
416-
current_context = otel_baggage_api.set_baggage(
417-
"session.id", session_id, current_context
418-
)
413+
current_context = otel_context_api.set_value(
414+
LANGFUSE_CORRELATION_CONTEXT_KEY, correlation_context, current_context
415+
)
419416

420-
# Process user_id
421-
if user_id is not None:
422-
current_context = otel_context_api.set_value(
423-
LANGFUSE_CTX_USER_ID, user_id, current_context
424-
)
425-
if current_span is not None and current_span.is_recording():
426-
current_span.set_attribute("user.id", user_id)
427-
if as_baggage:
428-
current_context = otel_baggage_api.set_baggage(
429-
"user.id", user_id, current_context
417+
for key, value in correlation_context.items():
418+
if len(value) > 200:
419+
langfuse_logger.warning(
420+
f"Correlation context key '{key}' is over 200 characters ({len(value)} chars). Dropping value."
430421
)
422+
continue
431423

432-
# Process metadata
433-
if metadata is not None:
434-
# Truncate values with size > 200 to 200 characters and emit warning including the ky
435-
for k, v in metadata.items():
436-
if not isinstance(v, str):
437-
# Ignore unreachable mypy warning as this runtime guard should make sense either way
438-
warnings.warn( # type: ignore[unreachable]
439-
f"Metadata values must be strings, got {type(v)} for key '{k}'"
440-
)
441-
del metadata[k]
442-
if len(v) > 200:
443-
warnings.warn(
444-
f"Metadata value for key '{k}' exceeds 200 characters and will be truncated."
445-
)
446-
metadata[k] = v[:200]
424+
attribute_key = get_attribute_key_from_correlation_context(key)
447425

448-
current_context = otel_context_api.set_value(
449-
LANGFUSE_CTX_METADATA, metadata, current_context
450-
)
451426
if current_span is not None and current_span.is_recording():
452-
for k, v in metadata.items():
453-
current_span.set_attribute(f"langfuse.metadata.{k}", v)
427+
current_span.set_attribute(attribute_key, value)
428+
454429
if as_baggage:
455-
for k, v in metadata.items():
456-
current_context = otel_baggage_api.set_baggage(
457-
f"langfuse.metadata.{k}", str(v), current_context
458-
)
430+
current_context = otel_baggage_api.set_baggage(
431+
key, value, current_context
432+
)
459433

460434
# Activate context, execute, and detach context
461435
token = otel_context_api.attach(current_context)
436+
462437
try:
463438
yield
439+
464440
finally:
465441
otel_context_api.detach(token)
466442

@@ -1782,7 +1758,7 @@ def update_current_trace(
17821758
```
17831759
"""
17841760
warnings.warn(
1785-
"update_current_trace is deprecated and will be removed in a future version. Use `with langfuse.with_attributes(...)` instead. ",
1761+
"update_current_trace is deprecated and will be removed in a future version. Use `with langfuse.correlation_context(...)` instead. ",
17861762
DeprecationWarning,
17871763
stacklevel=2,
17881764
)

langfuse/_client/constants.py

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,13 @@
33
This module defines constants used throughout the Langfuse OpenTelemetry integration.
44
"""
55

6-
from typing import Literal, List, get_args, Union, Any
6+
from typing import Any, List, Literal, Union, get_args
7+
78
from typing_extensions import TypeAlias
89

910
LANGFUSE_TRACER_NAME = "langfuse-sdk"
1011

11-
# Context key constants for Langfuse context propagation
12-
LANGFUSE_CTX_USER_ID = "langfuse.ctx.user.id"
13-
LANGFUSE_CTX_SESSION_ID = "langfuse.ctx.session.id"
14-
LANGFUSE_CTX_METADATA = "langfuse.ctx.metadata"
12+
LANGFUSE_CORRELATION_CONTEXT_KEY = "langfuse.ctx.correlation"
1513

1614

1715
"""Note: this type is used with .__args__ / get_args in some cases and therefore must remain flat"""

langfuse/_client/span.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -234,7 +234,7 @@ def update_trace(
234234
public: Whether the trace should be publicly accessible
235235
"""
236236
warnings.warn(
237-
"update_trace is deprecated and will be removed in a future version. Use `with langfuse.with_attributes(...)` instead. ",
237+
"update_trace is deprecated and will be removed in a future version. Use `with langfuse.correlation_context(...)` instead. ",
238238
DeprecationWarning,
239239
stacklevel=2,
240240
)

langfuse/_client/span_processor.py

Lines changed: 39 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -12,28 +12,32 @@
1212
"""
1313

1414
import base64
15-
import json
1615
import os
1716
from typing import Dict, List, Optional
1817

19-
from opentelemetry import baggage, context as context_api
18+
from opentelemetry import baggage
19+
from opentelemetry import context as context_api
2020
from opentelemetry.context import Context
2121
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
2222
from opentelemetry.sdk.trace import ReadableSpan, Span
2323
from opentelemetry.sdk.trace.export import BatchSpanProcessor
24+
from opentelemetry.trace import format_span_id
2425

26+
from langfuse._client.attributes import LangfuseOtelSpanAttributes
2527
from langfuse._client.constants import (
28+
LANGFUSE_CORRELATION_CONTEXT_KEY,
2629
LANGFUSE_TRACER_NAME,
27-
LANGFUSE_CTX_USER_ID,
28-
LANGFUSE_CTX_SESSION_ID,
29-
LANGFUSE_CTX_METADATA,
3030
)
3131
from langfuse._client.environment_variables import (
3232
LANGFUSE_FLUSH_AT,
3333
LANGFUSE_FLUSH_INTERVAL,
3434
LANGFUSE_OTEL_TRACES_EXPORT_PATH,
3535
)
36-
from langfuse._client.utils import span_formatter
36+
from langfuse._client.utils import (
37+
correlation_context_to_attribute_map,
38+
get_attribute_key_from_correlation_context,
39+
span_formatter,
40+
)
3741
from langfuse.logger import langfuse_logger
3842
from langfuse.version import __version__ as langfuse_version
3943

@@ -123,86 +127,46 @@ def __init__(
123127
)
124128

125129
def on_start(self, span: Span, parent_context: Optional[Context] = None) -> None:
126-
"""Handle span start event and propagate context and baggage to span attributes.
127-
128-
This method is called when a span starts and applies context propagation:
129-
1. Propagates all baggage keys as span attributes
130-
2. Propagates langfuse.ctx.* context variables as span attributes
131-
3. Distributes langfuse.ctx.metadata keys as individual langfuse.metadata.* attributes
132-
133-
Args:
134-
span: The span that is starting
135-
parent_context: The context when the span was created (optional)
136-
"""
137-
# Get the current context (use parent_context if available, otherwise current)
130+
# Propagate correlation context to span
138131
current_context = parent_context or context_api.get_current()
139-
140-
# Dictionary to collect span attributes that were propagated
141132
propagated_attributes = {}
142133

143-
# 1. Propagate all baggage keys as span attributes
134+
# Propagate correlation context in baggage
144135
baggage_entries = baggage.get_all(context=current_context)
136+
145137
for key, value in baggage_entries.items():
146-
# Only propagate user.id, session.id and langfuse.metadata.* as those are set by us on the baggage
147-
if key.startswith("langfuse.metadata.") or key in [
148-
"user.id",
149-
"session.id",
150-
]:
138+
if (
139+
key.startswith(LangfuseOtelSpanAttributes.TRACE_METADATA)
140+
or key in correlation_context_to_attribute_map.values()
141+
):
151142
propagated_attributes[key] = value
152-
langfuse_logger.debug(
153-
f"Propagated baggage key '{key}' = '{value}' to span '{span.name}'"
154-
)
155-
156-
# 2. Propagate langfuse.ctx.* context variables
157-
langfuse_ctx_keys = [LANGFUSE_CTX_USER_ID, LANGFUSE_CTX_SESSION_ID]
158-
for ctx_key in langfuse_ctx_keys:
159-
try:
160-
value = context_api.get_value(ctx_key, context=current_context)
161-
if value is not None:
162-
# Convert context key to span attribute name (remove langfuse.ctx. prefix)
163-
attr_key = ctx_key.replace("langfuse.ctx.", "")
164-
propagated_attributes[attr_key] = value
165-
langfuse_logger.debug(
166-
f"Propagated context key '{ctx_key}' = '{value}' to span '{span.name}'"
167-
)
168-
except Exception as e:
169-
langfuse_logger.debug(f"Could not read context key '{ctx_key}': {e}")
170-
171-
# 3. Handle langfuse.ctx.metadata - distribute keys as individual attributes
172-
try:
173-
# Get metadata dict from context
174-
metadata_dict = context_api.get_value(
175-
LANGFUSE_CTX_METADATA, context=current_context
143+
144+
# Propagate correlation context in OTEL context
145+
correlation_context = (
146+
context_api.get_value(LANGFUSE_CORRELATION_CONTEXT_KEY, current_context)
147+
or {}
148+
)
149+
150+
if not isinstance(correlation_context, dict):
151+
langfuse_logger.error(
152+
f"Correlation context is not of type dict. Got type '{type(correlation_context)}'."
176153
)
177-
if metadata_dict is not None and isinstance(metadata_dict, dict):
178-
# Set each metadata key as a separate span attribute with langfuse.metadata. prefix
179-
for key, value in metadata_dict.items():
180-
attr_key = f"langfuse.metadata.{key}"
181-
182-
# Convert value to appropriate type for span attribute (naive or json stringify)
183-
attr_value = (
184-
value
185-
if isinstance(value, (str, int, float, bool))
186-
else json.dumps(value)
187-
)
188-
189-
propagated_attributes[attr_key] = attr_value
190-
langfuse_logger.debug(
191-
f"Propagated metadata key '{key}' = '{attr_value}' to span '{span.name}'"
192-
)
193-
except Exception as e:
194-
langfuse_logger.debug(f"Could not read metadata from context: {e}")
195-
196-
# Log summary of propagated attributes
154+
155+
return super().on_start(span, parent_context)
156+
157+
for key, value in correlation_context.items():
158+
attribute_key = get_attribute_key_from_correlation_context(key)
159+
propagated_attributes[attribute_key] = value
160+
161+
# Write attributes on span
197162
if propagated_attributes:
163+
for key, value in propagated_attributes.items():
164+
span.set_attribute(key, str(value))
165+
198166
langfuse_logger.debug(
199-
f"Propagated {len(propagated_attributes)} attributes to span '{span.name}': {list(propagated_attributes.keys())}"
167+
f"Propagated {len(propagated_attributes)} attributes to span '{format_span_id(span.context.span_id)}': {propagated_attributes}"
200168
)
201169

202-
# Set all propagated attributes on the span
203-
for key, value in propagated_attributes.items():
204-
span.set_attribute(key, value) # type: ignore[arg-type]
205-
206170
return super().on_start(span, parent_context)
207171

208172
def on_end(self, span: ReadableSpan) -> None:

langfuse/_client/utils.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
from opentelemetry.sdk import util
1414
from opentelemetry.sdk.trace import ReadableSpan
1515

16+
from langfuse._client.attributes import LangfuseOtelSpanAttributes
17+
1618

1719
def span_formatter(span: ReadableSpan) -> str:
1820
parent_id = (
@@ -125,3 +127,16 @@ async def my_async_function():
125127
else:
126128
# Loop exists but not running, safe to use asyncio.run()
127129
return asyncio.run(coro)
130+
131+
132+
correlation_context_to_attribute_map = {
133+
"session_id": LangfuseOtelSpanAttributes.TRACE_SESSION_ID,
134+
"user_id": LangfuseOtelSpanAttributes.TRACE_USER_ID,
135+
}
136+
137+
138+
def get_attribute_key_from_correlation_context(correlation_context_key: str) -> str:
139+
return (
140+
correlation_context_to_attribute_map.get(correlation_context_key)
141+
or f"{LangfuseOtelSpanAttributes.TRACE_METADATA}.{correlation_context_key}"
142+
)

0 commit comments

Comments
 (0)