1+ """Attribute propagation utilities for Langfuse OpenTelemetry integration.
2+
3+ This module provides the `propagate_attributes` context manager for setting trace-level
4+ attributes (user_id, session_id, metadata) that automatically propagate to all child spans
5+ within the context.
6+ """
7+
18from typing import Any , Dict , Generator , List , Literal , Optional , Union
29
310from opentelemetry import baggage
@@ -26,10 +33,123 @@ def propagate_attributes(
2633 metadata : Optional [Dict [str , str ]] = None ,
2734 as_baggage : bool = False ,
2835) -> Generator [Any , Any , Any ]:
36+ """Propagate trace-level attributes to all spans created within this context.
37+
38+ This context manager sets attributes on the currently active span AND automatically
39+ propagates them to all new child spans created within the context. This is the
40+ recommended way to set trace-level attributes like user_id, session_id, and metadata
41+ dimensions that should be consistently applied across all observations in a trace.
42+
43+ **IMPORTANT**: Call this as early as possible within your trace/workflow. Only the
44+ currently active span and spans created after entering this context will have these
45+ attributes. Pre-existing spans will NOT be retroactively updated.
46+
47+ **Why this matters**: Langfuse aggregation queries (e.g., total cost by user_id,
48+ filtering by session_id) only include observations that have the attribute set.
49+ If you call `propagate_attributes` late in your workflow, earlier spans won't be
50+ included in aggregations for that attribute.
51+
52+ Args:
53+ user_id: User identifier to associate with all spans in this context.
54+ Must be US-ASCII string, ≤200 characters. Use this to track which user
55+ generated each trace and enable e.g. per-user cost/performance analysis.
56+ session_id: Session identifier to associate with all spans in this context.
57+ Must be US-ASCII string, ≤200 characters. Use this to group related traces
58+ within a user session (e.g., a conversation thread, multi-turn interaction).
59+ metadata: Additional key-value metadata to propagate to all spans.
60+ - Keys and values must be US-ASCII strings
61+ - All values must be ≤200 characters
62+ - Use for dimensions like internal correlating identifiers
63+ - AVOID: large payloads, sensitive data, non-string values (will be dropped with warning)
64+ as_baggage: If True, propagates attributes using OpenTelemetry baggage for
65+ cross-process/service propagation. **Security warning**: When enabled,
66+ attribute values are added to HTTP headers on ALL outbound requests.
67+ Only enable if values are safe to transmit via HTTP headers and you need
68+ cross-service tracing. Default: False.
69+
70+ Returns:
71+ Context manager that propagates attributes to all child spans.
72+
73+ Example:
74+ Basic usage with user and session tracking:
75+
76+ ```python
77+ from langfuse import Langfuse
78+
79+ langfuse = Langfuse()
80+
81+ # Set attributes early in the trace
82+ with langfuse.start_as_current_span(name="user_workflow") as span:
83+ with langfuse.propagate_attributes(
84+ user_id="user_123",
85+ session_id="session_abc",
86+ metadata={"experiment": "variant_a", "environment": "production"}
87+ ):
88+ # All spans created here will have user_id, session_id, and metadata
89+ with langfuse.start_span(name="llm_call") as llm_span:
90+ # This span inherits: user_id, session_id, experiment, environment
91+ ...
92+
93+ with langfuse.start_generation(name="completion") as gen:
94+ # This span also inherits all attributes
95+ ...
96+ ```
97+
98+ Late propagation (anti-pattern):
99+
100+ ```python
101+ with langfuse.start_as_current_span(name="workflow") as span:
102+ # These spans WON'T have user_id
103+ early_span = langfuse.start_span(name="early_work")
104+ early_span.end()
105+
106+ # Set attributes in the middle
107+ with langfuse.propagate_attributes(user_id="user_123"):
108+ # Only spans created AFTER this point will have user_id
109+ late_span = langfuse.start_span(name="late_work")
110+ late_span.end()
111+
112+ # Result: Aggregations by user_id will miss "early_work" span
113+ ```
114+
115+ Cross-service propagation with baggage (advanced):
116+
117+ ```python
118+ # Service A - originating service
119+ with langfuse.start_as_current_span(name="api_request"):
120+ with langfuse.propagate_attributes(
121+ user_id="user_123",
122+ session_id="session_abc",
123+ as_baggage=True # Propagate via HTTP headers
124+ ):
125+ # Make HTTP request to Service B
126+ response = requests.get("https://service-b.example.com/api")
127+ # user_id and session_id are now in HTTP headers
128+
129+ # Service B - downstream service
130+ # OpenTelemetry will automatically extract baggage from HTTP headers
131+ # and propagate to spans in Service B
132+ ```
133+
134+ Note:
135+ - **Nesting**: Nesting `propagate_attributes` contexts is possible but
136+ discouraged. Inner contexts will overwrite outer values for the same keys.
137+ - **Migration**: This replaces the deprecated `update_trace()` and
138+ `update_current_trace()` methods, which only set attributes on a single span
139+ (causing aggregation gaps). Always use `propagate_attributes` for new code.
140+ - **Validation**: All attribute values (user_id, session_id, metadata values)
141+ must be strings ≤200 characters. Invalid values will be dropped with a
142+ warning logged. Ensure values meet constraints before calling.
143+ - **OpenTelemetry**: This uses OpenTelemetry context propagation under the hood,
144+ making it compatible with other OTel-instrumented libraries.
145+
146+ Raises:
147+ No exceptions are raised. Invalid values are logged as warnings and dropped.
148+ """
29149 context = otel_context_api .get_current ()
30150 current_span = otel_trace_api .get_current_span ()
31151
32- if user_id is not None :
152+ if user_id is not None and _validate_propagated_string ( user_id , "user_id" ) :
33153 context = _set_propagated_attribute (
34154 key = "user_id" ,
35155 value = user_id ,
@@ -38,7 +158,7 @@ def propagate_attributes(
38158 as_baggage = as_baggage ,
39159 )
40160
41- if session_id is not None :
161+ if session_id is not None and _validate_propagated_string ( session_id , "session_id" ) :
42162 context = _set_propagated_attribute (
43163 key = "session_id" ,
44164 value = session_id ,
@@ -48,13 +168,20 @@ def propagate_attributes(
48168 )
49169
50170 if metadata is not None :
51- context = _set_propagated_attribute (
52- key = "metadata" ,
53- value = _validate_propagated_metadata (metadata ),
54- context = context ,
55- span = current_span ,
56- as_baggage = as_baggage ,
57- )
171+ # Filter metadata to only include valid string values
172+ validated_metadata : Dict [str , str ] = {}
173+ for key , value in metadata .items ():
174+ if _validate_propagated_string (value , f"metadata.{ key } " ):
175+ validated_metadata [key ] = value
176+
177+ if validated_metadata :
178+ context = _set_propagated_attribute (
179+ key = "metadata" ,
180+ value = validated_metadata ,
181+ context = context ,
182+ span = current_span ,
183+ as_baggage = as_baggage ,
184+ )
58185
59186 # Activate context, execute, and detach context
60187 token = otel_context_api .attach (context = context )
@@ -87,6 +214,9 @@ def _get_propagated_attributes_from_context(
87214 context_key = _get_propagated_context_key (key )
88215 value = otel_context_api .get_value (key = context_key , context = context )
89216
217+ if value is None :
218+ continue
219+
90220 if isinstance (value , dict ):
91221 # Handle metadata
92222 for k , v in value .items ():
@@ -153,25 +283,29 @@ def _set_propagated_attribute(
153283 return context
154284
155285
156- def _validate_propagated_metadata ( metadata : Dict [ str , str ] ) -> Dict [ str , str ] :
157- validated_metadata : Dict [ str , str ] = {}
286+ def _validate_propagated_string ( value : str , attribute_name : str ) -> bool :
287+ """Validate a propagated attribute string value.
158288
159- for key , value in metadata .items ():
160- if not isinstance (value , str ):
161- langfuse_logger .warning ( # type: ignore
162- f"Propagated attribute value of '{ key } ' not a string. Dropping value."
163- )
164- continue
289+ Args:
290+ value: The string value to validate
291+ attribute_name: Name of the attribute for error messages
165292
166- if len (value ) > 200 :
167- langfuse_logger .warning (
168- f"Propagated attribute value of '{ key } ' is over 200 characters ({ len (value )} chars). Dropping value."
169- )
170- continue
293+ Returns:
294+ True if valid, False otherwise (with warning logged)
295+ """
296+ if not isinstance (value , str ):
297+ langfuse_logger .warning ( # type: ignore
298+ f"Propagated attribute '{ attribute_name } ' value is not a string. Dropping value."
299+ )
300+ return False
171301
172- validated_metadata [key ] = value
302+ if len (value ) > 200 :
303+ langfuse_logger .warning (
304+ f"Propagated attribute '{ attribute_name } ' value is over 200 characters ({ len (value )} chars). Dropping value."
305+ )
306+ return False
173307
174- return validated_metadata
308+ return True
175309
176310
177311def _get_propagated_context_key (key : PropagatedKeys ) -> str :
0 commit comments