|
23 | 23 | Union, |
24 | 24 | cast, |
25 | 25 | overload, |
| 26 | + Generator, |
26 | 27 | ) |
27 | 28 |
|
28 | 29 | import backoff |
29 | 30 | import httpx |
30 | 31 | from opentelemetry import ( |
| 32 | + baggage as otel_baggage_api, |
31 | 33 | trace as otel_trace_api, |
| 34 | + context as otel_context_api, |
32 | 35 | ) |
33 | 36 | from opentelemetry.sdk.trace import TracerProvider |
34 | 37 | from opentelemetry.sdk.trace.id_generator import RandomIdGenerator |
|
111 | 114 | TextPromptClient, |
112 | 115 | ) |
113 | 116 | from langfuse.types import MaskFunction, ScoreDataType, SpanLevel, TraceContext |
114 | | -from langfuse._client.context_propagation import LangfuseContextPropagationMixin |
115 | 117 |
|
116 | 118 |
|
117 | | -class Langfuse(LangfuseContextPropagationMixin): |
| 119 | +# Context key constants for Langfuse context propagation |
| 120 | +LANGFUSE_CTX_USER_ID = "langfuse.ctx.user.id" |
| 121 | +LANGFUSE_CTX_SESSION_ID = "langfuse.ctx.session.id" |
| 122 | +LANGFUSE_CTX_METADATA = "langfuse.ctx.metadata" |
| 123 | + |
| 124 | + |
| 125 | +class Langfuse: |
118 | 126 | """Main client for Langfuse tracing and platform features. |
119 | 127 |
|
120 | 128 | This class provides an interface for creating and managing traces, spans, |
@@ -355,6 +363,108 @@ def start_span( |
355 | 363 | status_message=status_message, |
356 | 364 | ) |
357 | 365 |
|
| 366 | + @_agnosticcontextmanager |
| 367 | + def with_attributes( |
| 368 | + self, |
| 369 | + session_id: Optional[str] = None, |
| 370 | + user_id: Optional[str] = None, |
| 371 | + metadata: Optional[dict[str, str]] = None, |
| 372 | + as_baggage: bool = False, |
| 373 | + ) -> Generator[None, None, None]: |
| 374 | + """Creates a context manager that propagates the given attributes to all spans created within the context. |
| 375 | +
|
| 376 | + Args: |
| 377 | + session_id (str): Session identifier. |
| 378 | + user_id (str): User identifier. |
| 379 | + metadata (dict): Additional metadata to associate with all spans in the context. Values must be strings and are truncated to 200 characters. |
| 380 | + as_baggage (bool, optional): If True, stores the values in OpenTelemetry baggage |
| 381 | + for cross-service propagation. If False, stores only in local context for |
| 382 | + current-service propagation. Defaults to False. |
| 383 | +
|
| 384 | + Returns: |
| 385 | + Context manager that sets values on all spans created within its scope. |
| 386 | +
|
| 387 | + Warning: |
| 388 | + When as_baggage=True, the values will be included in HTTP headers of any |
| 389 | + outbound requests made within this context. Only use this for non-sensitive |
| 390 | + identifiers that are safe to transmit across service boundaries. |
| 391 | +
|
| 392 | + Example: |
| 393 | + ```python |
| 394 | + # Local context only (default) |
| 395 | + with langfuse.with_attributes(session_id="session_123"): |
| 396 | + with langfuse.start_as_current_span(name="process-request") as span: |
| 397 | + # This span and all its children will have session_id="session_123" |
| 398 | + child_span = langfuse.start_span(name="child-operation") |
| 399 | +
|
| 400 | + # Cross-service propagation (use with caution) |
| 401 | + with langfuse.with_attributes(session_id="session_123", as_baggage=True): |
| 402 | + # session_id will be propagated to external service calls |
| 403 | + response = requests.get("https://api.example.com/data") |
| 404 | + ``` |
| 405 | + """ |
| 406 | + current_context = otel_context_api.get_current() |
| 407 | + current_span = otel_trace_api.get_current_span() |
| 408 | + |
| 409 | + # Process session_id |
| 410 | + if session_id is not None: |
| 411 | + current_context = otel_context_api.set_value( |
| 412 | + LANGFUSE_CTX_SESSION_ID, session_id, current_context |
| 413 | + ) |
| 414 | + if current_span is not None and current_span.is_recording(): |
| 415 | + current_span.set_attribute("session.id", session_id) |
| 416 | + if as_baggage: |
| 417 | + current_context = otel_baggage_api.set_baggage( |
| 418 | + "session.id", session_id, current_context |
| 419 | + ) |
| 420 | + |
| 421 | + # Process user_id |
| 422 | + if user_id is not None: |
| 423 | + current_context = otel_context_api.set_value( |
| 424 | + LANGFUSE_CTX_USER_ID, user_id, current_context |
| 425 | + ) |
| 426 | + if current_span is not None and current_span.is_recording(): |
| 427 | + current_span.set_attribute("user.id", user_id) |
| 428 | + if as_baggage: |
| 429 | + current_context = otel_baggage_api.set_baggage( |
| 430 | + "user.id", user_id, current_context |
| 431 | + ) |
| 432 | + |
| 433 | + # Process metadata |
| 434 | + if metadata is not None: |
| 435 | + # Truncate values with size > 200 to 200 characters and emit warning including the ky |
| 436 | + for k, v in metadata.items(): |
| 437 | + if not isinstance(v, str): |
| 438 | + # Ignore unreachable mypy warning as this runtime guard should make sense either way |
| 439 | + warnings.warn( # type: ignore[unreachable] |
| 440 | + f"Metadata values must be strings, got {type(v)} for key '{k}'" |
| 441 | + ) |
| 442 | + del metadata[k] |
| 443 | + if len(v) > 200: |
| 444 | + warnings.warn( |
| 445 | + f"Metadata value for key '{k}' exceeds 200 characters and will be truncated." |
| 446 | + ) |
| 447 | + metadata[k] = v[:200] |
| 448 | + |
| 449 | + current_context = otel_context_api.set_value( |
| 450 | + LANGFUSE_CTX_METADATA, metadata, current_context |
| 451 | + ) |
| 452 | + if current_span is not None and current_span.is_recording(): |
| 453 | + for k, v in metadata.items(): |
| 454 | + current_span.set_attribute(f"langfuse.metadata.{k}", v) |
| 455 | + if as_baggage: |
| 456 | + for k, v in metadata.items(): |
| 457 | + current_context = otel_baggage_api.set_baggage( |
| 458 | + f"langfuse.metadata.{k}", str(v), current_context |
| 459 | + ) |
| 460 | + |
| 461 | + # Activate context, execute, and detach context |
| 462 | + token = otel_context_api.attach(current_context) |
| 463 | + try: |
| 464 | + yield |
| 465 | + finally: |
| 466 | + otel_context_api.detach(token) |
| 467 | + |
358 | 468 | def start_as_current_span( |
359 | 469 | self, |
360 | 470 | *, |
@@ -1673,7 +1783,7 @@ def update_current_trace( |
1673 | 1783 | ``` |
1674 | 1784 | """ |
1675 | 1785 | warnings.warn( |
1676 | | - "update_current_trace is deprecated and will be removed in a future version. ", |
| 1786 | + "update_current_trace is deprecated and will be removed in a future version. Use `with langfuse.with_attributes(...)` instead. ", |
1677 | 1787 | DeprecationWarning, |
1678 | 1788 | stacklevel=2, |
1679 | 1789 | ) |
|
0 commit comments