Skip to content

Commit 9244e14

Browse files
authored
fix(experiments): flatten propagated metadata (#1641)
1 parent d5ce2d2 commit 9244e14

5 files changed

Lines changed: 224 additions & 50 deletions

File tree

langfuse/_client/attributes.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,32 @@ def _serialize(obj: Any) -> Optional[str]:
164164
return json.dumps(obj, cls=EventSerializer)
165165

166166

167+
def _flatten_and_serialize_metadata_values(
168+
metadata: Optional[Dict[str, Any]],
169+
) -> Optional[Dict[str, str]]:
170+
if metadata is None:
171+
return None
172+
173+
flattened_metadata: Dict[str, str] = {}
174+
175+
def flatten_value(path: str, value: Any) -> None:
176+
if isinstance(value, dict):
177+
for nested_key, nested_value in value.items():
178+
flatten_value(f"{path}.{nested_key}", nested_value)
179+
180+
return
181+
182+
serialized_value = _serialize(value)
183+
184+
if serialized_value is not None:
185+
flattened_metadata[path] = serialized_value
186+
187+
for key, value in metadata.items():
188+
flatten_value(str(key), value)
189+
190+
return flattened_metadata
191+
192+
167193
def _flatten_and_serialize_metadata(
168194
metadata: Any, type: Literal["observation", "trace"]
169195
) -> dict:

langfuse/_client/client.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,11 @@
3838
from packaging.version import Version
3939
from typing_extensions import deprecated
4040

41-
from langfuse._client.attributes import LangfuseOtelSpanAttributes, _serialize
41+
from langfuse._client.attributes import (
42+
LangfuseOtelSpanAttributes,
43+
_flatten_and_serialize_metadata_values,
44+
_serialize,
45+
)
4246
from langfuse._client.constants import (
4347
LANGFUSE_SDK_EXPERIMENT_ENVIRONMENT,
4448
ObservationTypeGenerationLike,
@@ -2791,10 +2795,14 @@ async def _process_experiment_item(
27912795
propagated_experiment_attributes = PropagatedExperimentAttributes(
27922796
experiment_id=experiment_id,
27932797
experiment_name=experiment_run_name,
2794-
experiment_metadata=_serialize(experiment_metadata),
2798+
experiment_metadata=_flatten_and_serialize_metadata_values(
2799+
experiment_metadata
2800+
),
27952801
experiment_dataset_id=dataset_id,
27962802
experiment_item_id=experiment_item_id,
2797-
experiment_item_metadata=_serialize(item_metadata),
2803+
experiment_item_metadata=_flatten_and_serialize_metadata_values(
2804+
item_metadata if isinstance(item_metadata, dict) else None
2805+
),
27982806
experiment_item_root_observation_id=span.id,
27992807
)
28002808

langfuse/_client/propagation.py

Lines changed: 35 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -68,10 +68,10 @@
6868
class PropagatedExperimentAttributes(TypedDict):
    # Typed dictionary of the experiment attributes that are propagated
    # together for one experiment item.

    experiment_id: str
    experiment_name: str
    # Flat string-to-string map of experiment metadata (or None when absent).
    experiment_metadata: Optional[Dict[str, str]]
    experiment_dataset_id: Optional[str]
    experiment_item_id: str
    # Flat string-to-string map of item metadata (or None when absent).
    experiment_item_metadata: Optional[Dict[str, str]]
    experiment_item_root_observation_id: str
7676

7777

@@ -247,9 +247,20 @@ def _propagate_attributes(
247247
"trace_name": trace_name,
248248
}
249249

250-
propagated_string_attributes = propagated_string_attributes | (
251-
cast(Dict[str, Union[str, List[str], None]], experiment) or {}
252-
)
250+
propagated_metadata_attributes: Dict[str, Optional[Dict[str, str]]] = {
251+
"metadata": metadata,
252+
}
253+
254+
if experiment:
255+
for key, value in experiment.items():
256+
if key in ("experiment_metadata", "experiment_item_metadata"):
257+
propagated_metadata_attributes[key] = cast(
258+
Optional[Dict[str, str]], value
259+
)
260+
else:
261+
propagated_string_attributes[key] = cast(
262+
Optional[Union[str, List[str]]], value
263+
)
253264

254265
# Filter out None values
255266
propagated_string_attributes = {
@@ -268,16 +279,19 @@ def _propagate_attributes(
268279
as_baggage=as_baggage,
269280
)
270281

271-
if metadata is not None:
282+
for metadata_key, metadata_value in propagated_metadata_attributes.items():
283+
if metadata_value is None:
284+
continue
285+
272286
validated_metadata: Dict[str, str] = {}
273287

274-
for key, value in metadata.items():
275-
if _validate_string_value(value=value, key=f"metadata.{key}"):
288+
for key, value in metadata_value.items():
289+
if _validate_string_value(value=value, key=f"{metadata_key}.{key}"):
276290
validated_metadata[key] = value
277291

278292
if validated_metadata:
279293
context = _set_propagated_attribute(
280-
key="metadata",
294+
key=metadata_key,
281295
value=validated_metadata,
282296
context=context,
283297
span=current_span,
@@ -322,9 +336,10 @@ def _get_propagated_attributes_from_context(
322336

323337
if isinstance(value, dict):
324338
# Handle metadata
339+
span_key = _get_propagated_span_key(key)
340+
325341
for k, v in value.items():
326-
span_key = f"{LangfuseOtelSpanAttributes.TRACE_METADATA}.{k}"
327-
propagated_attributes[span_key] = v
342+
propagated_attributes[f"{span_key}.{k}"] = v
328343

329344
else:
330345
span_key = _get_propagated_span_key(key)
@@ -387,7 +402,7 @@ def _set_propagated_attribute(
387402
# Handle metadata
388403
for k, v in value.items():
389404
span.set_attribute(
390-
key=f"{LangfuseOtelSpanAttributes.TRACE_METADATA}.{k}",
405+
key=f"{span_key}.{k}",
391406
value=v,
392407
)
393408

@@ -469,10 +484,14 @@ def _get_span_key_from_baggage_key(key: str) -> Optional[str]:
469484
# Remove prefix to get the actual key name
470485
suffix = key[len(LANGFUSE_BAGGAGE_PREFIX) :]
471486

472-
if suffix.startswith("metadata_"):
473-
metadata_key = suffix[len("metadata_") :]
487+
for metadata_key in ("metadata", "experiment_metadata", "experiment_item_metadata"):
488+
baggage_metadata_prefix = f"{metadata_key}_"
474489

475-
return _get_propagated_span_key(metadata_key)
490+
if suffix.startswith(baggage_metadata_prefix):
491+
return (
492+
f"{_get_propagated_span_key(metadata_key)}."
493+
f"{suffix[len(baggage_metadata_prefix) :]}"
494+
)
476495

477496
return _get_propagated_span_key(suffix)
478497

@@ -484,6 +503,7 @@ def _get_propagated_span_key(key: str) -> str:
484503
"version": LangfuseOtelSpanAttributes.VERSION,
485504
"tags": LangfuseOtelSpanAttributes.TRACE_TAGS,
486505
"trace_name": LangfuseOtelSpanAttributes.TRACE_NAME,
506+
"metadata": LangfuseOtelSpanAttributes.TRACE_METADATA,
487507
"experiment_id": LangfuseOtelSpanAttributes.EXPERIMENT_ID,
488508
"experiment_name": LangfuseOtelSpanAttributes.EXPERIMENT_NAME,
489509
"experiment_metadata": LangfuseOtelSpanAttributes.EXPERIMENT_METADATA,

tests/e2e/test_experiments.py

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,10 @@
44
from typing import Any, Dict, List
55

66
import pytest
7+
from opentelemetry import trace as otel_trace_api
78

89
from langfuse import get_client
10+
from langfuse._client.attributes import LangfuseOtelSpanAttributes
911
from langfuse.experiment import (
1012
Evaluation,
1113
ExperimentData,
@@ -135,6 +137,63 @@ def test_run_experiment_on_local_dataset(sample_dataset):
135137
)
136138

137139

140+
def test_run_experiment_flattens_large_metadata_for_server_ingestion():
    """Server ingestion handles flattened experiment metadata on non-SDK child spans."""
    langfuse_client = get_client()
    # A tracer obtained directly from the OpenTelemetry API, so the child span
    # below is not created through the Langfuse client.
    external_tracer = otel_trace_api.get_tracer("ai.langfuse-python.e2e")
    # Unique span name so the observation can be located in the fetched trace.
    external_span_name = "external-experiment-metadata-child-" + create_uuid()[:8]

    experiment_metadata = {
        "mode": "offline",
        "job_name": "agent-eval/PR-4",
        "build_url": "https://example.com/job/agent-eval-example/job/PR-4",
        "agent_name": "agent-eval-example",
    }

    def task_with_external_child(*, item: ExperimentItem, **kwargs: Dict[str, Any]):
        # Open a child span with the external tracer while the task runs.
        with external_tracer.start_as_current_span(external_span_name) as span:
            span.set_attribute("gen_ai.operation.name", "experiment-metadata-e2e")

            return "processed"

    result = langfuse_client.run_experiment(
        name="Flattened Experiment Metadata " + create_uuid()[:8],
        data=[{"input": "test input", "expected_output": "processed"}],
        task=task_with_external_child,
        metadata=experiment_metadata,
    )

    langfuse_client.flush()

    trace_id = result.item_results[0].trace_id
    assert trace_id is not None

    # Poll until the externally created observation is part of the trace.
    trace = wait_for_trace(
        trace_id,
        is_result_ready=lambda fetched_trace: any(
            observation.name == external_span_name
            for observation in fetched_trace.observations
        ),
    )

    # Each experiment metadata entry must be readable on the trace metadata.
    assert trace.metadata is not None
    for metadata_key, metadata_value in experiment_metadata.items():
        assert trace.metadata[metadata_key] == metadata_value

    external_observation = next(
        observation
        for observation in trace.observations
        if observation.name == external_span_name
    )
    external_metadata = external_observation.metadata or {}

    # The external observation's metadata must not contain the experiment
    # metadata attribute key, neither exactly nor as a dotted prefix.
    assert not any(
        key == LangfuseOtelSpanAttributes.EXPERIMENT_METADATA
        or key.startswith(f"{LangfuseOtelSpanAttributes.EXPERIMENT_METADATA}.")
        for key in external_metadata
    )
195+
196+
138197
def test_run_experiment_on_langfuse_dataset():
139198
"""Test running experiment on Langfuse dataset."""
140199
langfuse_client = get_client()

0 commit comments

Comments
 (0)