|
16 | 16 | Any, |
17 | 17 | Callable, |
18 | 18 | Dict, |
| 19 | + Generator, |
19 | 20 | List, |
20 | 21 | Literal, |
21 | 22 | Optional, |
|
27 | 28 |
|
28 | 29 | import backoff |
29 | 30 | import httpx |
30 | | -from opentelemetry import trace |
31 | | -from opentelemetry import trace as otel_trace_api |
| 31 | +from opentelemetry import ( |
| 32 | + baggage as otel_baggage_api, |
| 33 | +) |
| 34 | +from opentelemetry import ( |
| 35 | + context as otel_context_api, |
| 36 | +) |
| 37 | +from opentelemetry import ( |
| 38 | + trace as otel_trace_api, |
| 39 | +) |
32 | 40 | from opentelemetry.sdk.trace import TracerProvider |
33 | 41 | from opentelemetry.sdk.trace.id_generator import RandomIdGenerator |
34 | 42 | from opentelemetry.util._decorator import ( |
|
39 | 47 |
|
40 | 48 | from langfuse._client.attributes import LangfuseOtelSpanAttributes |
41 | 49 | from langfuse._client.constants import ( |
| 50 | + LANGFUSE_CORRELATION_CONTEXT_KEY, |
42 | 51 | ObservationTypeGenerationLike, |
43 | 52 | ObservationTypeLiteral, |
44 | 53 | ObservationTypeLiteralNoEvent, |
|
69 | 78 | LangfuseSpan, |
70 | 79 | LangfuseTool, |
71 | 80 | ) |
72 | | -from langfuse._client.utils import run_async_safely |
| 81 | +from langfuse._client.utils import ( |
| 82 | + get_attribute_key_from_correlation_context, |
| 83 | + run_async_safely, |
| 84 | +) |
73 | 85 | from langfuse._utils import _get_timestamp |
74 | 86 | from langfuse._utils.parse_error import handle_fern_exception |
75 | 87 | from langfuse._utils.prompt_cache import PromptCache |
@@ -189,6 +201,7 @@ class Langfuse: |
189 | 201 | _resources: Optional[LangfuseResourceManager] = None |
190 | 202 | _mask: Optional[MaskFunction] = None |
191 | 203 | _otel_tracer: otel_trace_api.Tracer |
| 204 | + _host: str |
192 | 205 |
|
193 | 206 | def __init__( |
194 | 207 | self, |
@@ -348,6 +361,83 @@ def start_span( |
348 | 361 | status_message=status_message, |
349 | 362 | ) |
350 | 363 |
|
| 364 | + @_agnosticcontextmanager |
| 365 | + def correlation_context( |
| 366 | + self, |
| 367 | + correlation_context: Dict[str, str], |
| 368 | + *, |
| 369 | + as_baggage: bool = False, |
| 370 | + ) -> Generator[None, None, None]: |
| 371 | + """Create a context manager that propagates the given correlation_context to all spans within the context manager's scope. |
| 372 | +
|
| 373 | + Args: |
| 374 | + correlation_context (Dict[str, str]): Dictionary containing key-value pairs to be propagated |
| 375 | + to all spans within the context manager's scope. Common keys include user_id, session_id, |
| 376 | + and custom metadata. All values must be strings below 200 characters. |
| 377 | + as_baggage (bool, optional): If True, stores the values in OpenTelemetry baggage |
| 378 | + for cross-service propagation. If False, stores only in local context for |
| 379 | + current-service propagation. Defaults to False. |
| 380 | +
|
| 381 | + Returns: |
| 382 | + Context manager that sets values on all spans created within its scope. |
| 383 | +
|
| 384 | + Warning: |
| 385 | + When as_baggage=True, the values will be included in HTTP headers of any |
| 386 | + outbound requests made within this context. Only use this for non-sensitive |
| 387 | + identifiers that are safe to transmit across service boundaries. |
| 388 | +
|
| 389 | + Examples: |
| 390 | + ```python |
| 391 | + # Local context only (default) - pass context as dictionary |
| 392 | + with langfuse.correlation_context({"session_id": "session_123"}): |
| 393 | + with langfuse.start_as_current_span(name="process-request") as span: |
| 394 | + # This span and all its children will have session_id="session_123" |
| 395 | + child_span = langfuse.start_span(name="child-operation") |
| 396 | +
|
| 397 | + # Multiple values in context dictionary |
| 398 | + with langfuse.correlation_context({"user_id": "user_456", "experiment": "A"}): |
| 399 | + # All spans will have both user_id and experiment attributes |
| 400 | + span = langfuse.start_span(name="experiment-operation") |
| 401 | +
|
| 402 | + # Cross-service propagation (use with caution) |
| 403 | + with langfuse.correlation_context({"session_id": "session_123"}, as_baggage=True): |
| 404 | + # session_id will be propagated to external service calls |
| 405 | + response = requests.get("https://api.example.com/data") |
| 406 | + ``` |
| 407 | + """ |
| 408 | + current_context = otel_context_api.get_current() |
| 409 | + current_span = otel_trace_api.get_current_span() |
| 410 | + |
| 411 | + current_context = otel_context_api.set_value( |
| 412 | + LANGFUSE_CORRELATION_CONTEXT_KEY, correlation_context, current_context |
| 413 | + ) |
| 414 | + |
| 415 | + for key, value in correlation_context.items(): |
| 416 | + if len(value) > 200: |
| 417 | + langfuse_logger.warning( |
| 418 | + f"Correlation context key '{key}' is over 200 characters ({len(value)} chars). Dropping value." |
| 419 | + ) |
| 420 | + continue |
| 421 | + |
| 422 | + attribute_key = get_attribute_key_from_correlation_context(key) |
| 423 | + |
| 424 | + if current_span is not None and current_span.is_recording(): |
| 425 | + current_span.set_attribute(attribute_key, value) |
| 426 | + |
| 427 | + if as_baggage: |
| 428 | + current_context = otel_baggage_api.set_baggage( |
| 429 | + key, value, current_context |
| 430 | + ) |
| 431 | + |
| 432 | + # Activate context, execute, and detach context |
| 433 | + token = otel_context_api.attach(current_context) |
| 434 | + |
| 435 | + try: |
| 436 | + yield |
| 437 | + |
| 438 | + finally: |
| 439 | + otel_context_api.detach(token) |
| 440 | + |
351 | 441 | def start_as_current_span( |
352 | 442 | self, |
353 | 443 | *, |
@@ -1665,6 +1755,11 @@ def update_current_trace( |
1665 | 1755 | span.update(output=response) |
1666 | 1756 | ``` |
1667 | 1757 | """ |
| 1758 | + warnings.warn( |
| 1759 | + "update_current_trace is deprecated and will be removed in a future version. Use `with langfuse.correlation_context(...)` instead. ", |
| 1760 | + DeprecationWarning, |
| 1761 | + stacklevel=2, |
| 1762 | + ) |
1668 | 1763 | if not self._tracing_enabled: |
1669 | 1764 | langfuse_logger.debug( |
1670 | 1765 | "Operation skipped: update_current_trace - Tracing is disabled or client is in no-op mode." |
@@ -1809,7 +1904,7 @@ def _create_remote_parent_span( |
1809 | 1904 | is_remote=False, |
1810 | 1905 | ) |
1811 | 1906 |
|
1812 | | - return trace.NonRecordingSpan(span_context) |
| 1907 | + return otel_trace_api.NonRecordingSpan(span_context) |
1813 | 1908 |
|
1814 | 1909 | def _is_valid_trace_id(self, trace_id: str) -> bool: |
1815 | 1910 | pattern = r"^[0-9a-f]{32}$" |
|
0 commit comments