|
6 | 6 | import asyncio |
7 | 7 | import logging |
8 | 8 | import os |
9 | | -import json |
10 | 9 | import re |
11 | 10 | import urllib.parse |
12 | 11 | import warnings |
|
17 | 16 | Any, |
18 | 17 | Callable, |
19 | 18 | Dict, |
20 | | - Generator, |
21 | 19 | List, |
22 | 20 | Literal, |
23 | 21 | Optional, |
|
31 | 29 | import httpx |
32 | 30 | from opentelemetry import ( |
33 | 31 | trace as otel_trace_api, |
34 | | - baggage as otel_baggage_api, |
35 | | - context as otel_context_api, |
36 | 32 | ) |
37 | 33 | from opentelemetry.sdk.trace import TracerProvider |
38 | 34 | from opentelemetry.sdk.trace.id_generator import RandomIdGenerator |
|
115 | 111 | TextPromptClient, |
116 | 112 | ) |
117 | 113 | from langfuse.types import MaskFunction, ScoreDataType, SpanLevel, TraceContext |
| 114 | +from langfuse._client.context_propagation import LangfuseContextPropagationMixin |
118 | 115 |
|
119 | | -# Context key constants for Langfuse context propagation |
120 | | -LANGFUSE_CTX_USER_ID = "langfuse.ctx.user.id" |
121 | | -LANGFUSE_CTX_SESSION_ID = "langfuse.ctx.session.id" |
122 | | -LANGFUSE_CTX_METADATA = "langfuse.ctx.metadata" |
123 | 116 |
|
124 | | - |
125 | | -class Langfuse: |
| 117 | +class Langfuse(LangfuseContextPropagationMixin): |
126 | 118 | """Main client for Langfuse tracing and platform features. |
127 | 119 |
|
128 | 120 | This class provides an interface for creating and managing traces, spans, |
@@ -3468,206 +3460,3 @@ def clear_prompt_cache(self) -> None: |
3468 | 3460 | """ |
3469 | 3461 | if self._resources is not None: |
3470 | 3462 | self._resources.prompt_cache.clear() |
3471 | | - |
3472 | | - @_agnosticcontextmanager |
3473 | | - def session( |
3474 | | - self, id: str, *, as_baggage: bool = False |
3475 | | - ) -> Generator[None, None, None]: |
3476 | | - """Create a session context manager that propagates session_id to all child spans. |
3477 | | -
|
3478 | | - Args: |
3479 | | - id (str): The session identifier to propagate to child spans. |
3480 | | - as_baggage (bool, optional): If True, stores the session_id in OpenTelemetry baggage |
3481 | | - for cross-service propagation. If False, stores only in local context for |
3482 | | - current-service propagation. Defaults to False. |
3483 | | -
|
3484 | | - Returns: |
3485 | | - Context manager that sets session_id on all spans created within its scope. |
3486 | | -
|
3487 | | - Warning: |
3488 | | - When as_baggage=True, the session_id will be included in HTTP headers of any |
3489 | | - outbound requests made within this context. Only use this for non-sensitive |
3490 | | - identifiers that are safe to transmit across service boundaries. |
3491 | | -
|
3492 | | - Example: |
3493 | | - ```python |
3494 | | - # Local context only (default) |
3495 | | - with langfuse.session(id="session_123"): |
3496 | | - with langfuse.start_as_current_span(name="process-request") as span: |
3497 | | - # This span and all its children will have session_id="session_123" |
3498 | | - child_span = langfuse.start_span(name="child-operation") |
3499 | | -
|
3500 | | - # Cross-service propagation (use with caution) |
3501 | | - with langfuse.session(id="session_123", as_baggage=True): |
3502 | | - # session_id will be propagated to external service calls |
3503 | | - response = requests.get("https://api.example.com/data") |
3504 | | - ``` |
3505 | | - """ |
3506 | | - # Set context variable |
3507 | | - new_context = otel_context_api.set_value(LANGFUSE_CTX_SESSION_ID, id) |
3508 | | - token = otel_context_api.attach(new_context) |
3509 | | - |
3510 | | - # Set attribute on currently active span if exists |
3511 | | - current_span = otel_trace_api.get_current_span() |
3512 | | - if current_span is not None and current_span.is_recording(): |
3513 | | - current_span.set_attribute("session.id", id) |
3514 | | - |
3515 | | - # Set baggage if requested |
3516 | | - baggage_token = None |
3517 | | - if as_baggage: |
3518 | | - new_baggage = otel_baggage_api.set_baggage("session.id", id) |
3519 | | - baggage_token = otel_context_api.attach(new_baggage) |
3520 | | - |
3521 | | - try: |
3522 | | - yield |
3523 | | - finally: |
3524 | | - # Always detach context token |
3525 | | - otel_context_api.detach(token) |
3526 | | - |
3527 | | - # Detach baggage token if it was set |
3528 | | - if baggage_token is not None: |
3529 | | - otel_context_api.detach(baggage_token) |
3530 | | - |
3531 | | - @_agnosticcontextmanager |
3532 | | - def user(self, id: str, *, as_baggage: bool = False) -> Generator[None, None, None]: |
3533 | | - """Create a user context manager that propagates user_id to all child spans. |
3534 | | -
|
3535 | | - Args: |
3536 | | - id (str): The user identifier to propagate to child spans. |
3537 | | - as_baggage (bool, optional): If True, stores the user_id in OpenTelemetry baggage |
3538 | | - for cross-service propagation. If False, stores only in local context for |
3539 | | - current-service propagation. Defaults to False. |
3540 | | -
|
3541 | | - Returns: |
3542 | | - Context manager that sets user_id on all spans created within its scope. |
3543 | | -
|
3544 | | - Warning: |
3545 | | - When as_baggage=True, the user_id will be included in HTTP headers of any |
3546 | | - outbound requests made within this context. This may leak sensitive user |
3547 | | - information to external services. Use with extreme caution. |
3548 | | -
|
3549 | | - Example: |
3550 | | - ```python |
3551 | | - # Local context only (default, recommended for user IDs) |
3552 | | - with langfuse.user(id="user_456"): |
3553 | | - with langfuse.start_as_current_span(name="user-action") as span: |
3554 | | - # This span and all its children will have user_id="user_456" |
3555 | | - pass |
3556 | | -
|
3557 | | - # Cross-service propagation (NOT recommended for sensitive user IDs) |
3558 | | - with langfuse.user(id="public_user_456", as_baggage=True): |
3559 | | - # user_id will be propagated to external service calls |
3560 | | - response = requests.get("https://api.example.com/data") |
3561 | | - ``` |
3562 | | - """ |
3563 | | - # Set context variable |
3564 | | - new_context = otel_context_api.set_value(LANGFUSE_CTX_USER_ID, id) |
3565 | | - token = otel_context_api.attach(new_context) |
3566 | | - |
3567 | | - # Set attribute on currently active span if exists |
3568 | | - current_span = otel_trace_api.get_current_span() |
3569 | | - if current_span is not None and current_span.is_recording(): |
3570 | | - current_span.set_attribute("user.id", id) |
3571 | | - |
3572 | | - # Set baggage if requested |
3573 | | - baggage_token = None |
3574 | | - if as_baggage: |
3575 | | - new_baggage = otel_baggage_api.set_baggage("user.id", id) |
3576 | | - baggage_token = otel_context_api.attach(new_baggage) |
3577 | | - |
3578 | | - try: |
3579 | | - yield |
3580 | | - finally: |
3581 | | - # Always detach context token |
3582 | | - otel_context_api.detach(token) |
3583 | | - |
3584 | | - # Detach baggage token if it was set |
3585 | | - if baggage_token is not None: |
3586 | | - otel_context_api.detach(baggage_token) |
3587 | | - |
3588 | | - @_agnosticcontextmanager |
3589 | | - def metadata( |
3590 | | - self, *, as_baggage: bool = False, **kwargs: Any |
3591 | | - ) -> Generator[None, None, None]: |
3592 | | - """Create a metadata context manager that propagates metadata to all child spans. |
3593 | | -
|
3594 | | - Args: |
3595 | | - as_baggage (bool, optional): If True, stores the metadata in OpenTelemetry baggage |
3596 | | - for cross-service propagation. If False, stores only in local context for |
3597 | | - current-service propagation. Defaults to False. |
3598 | | - **kwargs: Metadata key-value pairs. Values should not exceed 200 characters. |
3599 | | -
|
3600 | | - Returns: |
3601 | | - Context manager that sets metadata on all spans created within its scope. |
3602 | | -
|
3603 | | - Warning: |
3604 | | - When as_baggage=True, all metadata key-value pairs will be included in HTTP |
3605 | | - headers of any outbound requests made within this context. Ensure no sensitive |
3606 | | - information is included in the metadata when using cross-service propagation. |
3607 | | -
|
3608 | | - Example: |
3609 | | - ```python |
3610 | | - # Local context only (default) |
3611 | | - with langfuse.metadata(experiment="A/B", version="1.2.3"): |
3612 | | - with langfuse.start_as_current_span(name="experiment-run") as span: |
3613 | | - # This span and all its children will have the metadata |
3614 | | - pass |
3615 | | -
|
3616 | | - # Cross-service propagation (use with caution) |
3617 | | - with langfuse.metadata(as_baggage=True, experiment="A/B", service="api"): |
3618 | | - # metadata will be propagated to external service calls |
3619 | | - response = requests.get("https://api.example.com/data") |
3620 | | - ``` |
3621 | | - """ |
3622 | | - if not kwargs: |
3623 | | - # No metadata to set, just yield |
3624 | | - yield |
3625 | | - return |
3626 | | - |
3627 | | - # Store metadata as a dict in context (not JSON string) |
3628 | | - # This allows span_processor to distribute keys as individual attributes |
3629 | | - new_context = otel_context_api.set_value(LANGFUSE_CTX_METADATA, kwargs) |
3630 | | - token = otel_context_api.attach(new_context) |
3631 | | - |
3632 | | - # Set attributes on currently active span if exists |
3633 | | - current_span = otel_trace_api.get_current_span() |
3634 | | - if current_span is not None and current_span.is_recording(): |
3635 | | - for key, value in kwargs.items(): |
3636 | | - attr_key = f"langfuse.metadata.{key}" |
3637 | | - # Convert value to appropriate type for span attribute |
3638 | | - if isinstance(value, (str, int, float, bool)): |
3639 | | - attr_value = value |
3640 | | - else: |
3641 | | - # For complex types, convert to JSON string |
3642 | | - attr_value = json.dumps(value) |
3643 | | - current_span.set_attribute(attr_key, attr_value) |
3644 | | - |
3645 | | - # Set baggage if requested |
3646 | | - baggage_tokens = [] |
3647 | | - if as_baggage: |
3648 | | - # Start with None context and chain baggage settings |
3649 | | - new_baggage = None |
3650 | | - |
3651 | | - # Add each metadata key-value pair to baggage |
3652 | | - for key, value in kwargs.items(): |
3653 | | - # Convert value to string and truncate if needed for baggage |
3654 | | - str_value = str(value) |
3655 | | - baggage_key = f"langfuse.metadata.{key}" |
3656 | | - new_baggage = otel_baggage_api.set_baggage( |
3657 | | - baggage_key, str_value, new_baggage |
3658 | | - ) |
3659 | | - |
3660 | | - # Attach the new baggage context |
3661 | | - if new_baggage is not None: |
3662 | | - baggage_token = otel_context_api.attach(new_baggage) |
3663 | | - baggage_tokens.append(baggage_token) |
3664 | | - |
3665 | | - try: |
3666 | | - yield |
3667 | | - finally: |
3668 | | - # Always detach context token |
3669 | | - otel_context_api.detach(token) |
3670 | | - |
3671 | | - # Detach all baggage tokens if they were set |
3672 | | - for baggage_token in baggage_tokens: |
3673 | | - otel_context_api.detach(baggage_token) |
0 commit comments