Skip to content

Commit aec2a0b

Browse files
committed
fix: preserve score body aliases during ingestion
1 parent c5dc24d commit aec2a0b

3 files changed

Lines changed: 76 additions & 4 deletions

File tree

langfuse/_task_manager/score_ingestion_consumer.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
from langfuse._utils.parse_error import handle_exception
1212
from langfuse._utils.request import APIError, LangfuseClient
1313
from langfuse._utils.serializer import EventSerializer
14+
from langfuse.api.core.pydantic_utilities import UniversalBaseModel
1415
from langfuse.logger import langfuse_logger as logger
1516

1617
from ..version import __version__ as langfuse_version
@@ -73,7 +74,10 @@ def _next(self) -> list:
7374

7475
# convert pydantic models to dicts
7576
if "body" in event and isinstance(event["body"], BaseModel):
76-
event["body"] = event["body"].model_dump(exclude_none=True)
77+
if isinstance(event["body"], UniversalBaseModel):
78+
event["body"] = event["body"].dict(exclude_none=True)
79+
else:
80+
event["body"] = event["body"].model_dump(exclude_none=True)
7781

7882
item_size = self._get_item_size(event)
7983

langfuse/_utils/serializer.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
from pydantic import BaseModel
1717

18+
from langfuse.api.core.pydantic_utilities import UniversalBaseModel
1819
from langfuse.media import LangfuseMedia
1920

2021
# Attempt to import Serializable
@@ -104,6 +105,9 @@ def default(self, obj: Any) -> Any:
104105
if isinstance(raw := getattr(obj, "raw", None), BaseModel):
105106
raw.model_rebuild()
106107

108+
if isinstance(obj, UniversalBaseModel):
109+
return obj.dict()
110+
107111
return obj.model_dump()
108112

109113
if isinstance(obj, Path):

tests/test_serializer.py

Lines changed: 67 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,15 @@
44
from datetime import date, datetime, timezone
55
from enum import Enum
66
from pathlib import Path
7+
from queue import Queue
8+
from unittest.mock import Mock
79
from uuid import UUID
810

911
from pydantic import BaseModel
1012

11-
from langfuse._utils.serializer import (
12-
EventSerializer,
13-
)
13+
from langfuse._task_manager.score_ingestion_consumer import ScoreIngestionConsumer
14+
from langfuse._utils.serializer import EventSerializer
15+
from langfuse.api import ScoreBody
1416

1517

1618
class TestEnum(Enum):
@@ -69,6 +71,68 @@ def test_pydantic_model():
6971
assert json.loads(serializer.encode(model)) == {"field": "test"}
7072

7173

74+
def test_langfuse_model_uses_aliases():
75+
model = ScoreBody(
76+
id="score-1",
77+
trace_id="trace-1",
78+
session_id="session-1",
79+
observation_id="observation-1",
80+
dataset_run_id="dataset-run-1",
81+
name="rating",
82+
value=2,
83+
data_type="NUMERIC",
84+
config_id="config-1",
85+
)
86+
serializer = EventSerializer()
87+
88+
assert json.loads(serializer.encode(model)) == {
89+
"id": "score-1",
90+
"traceId": "trace-1",
91+
"sessionId": "session-1",
92+
"observationId": "observation-1",
93+
"datasetRunId": "dataset-run-1",
94+
"name": "rating",
95+
"value": 2,
96+
"dataType": "NUMERIC",
97+
"configId": "config-1",
98+
}
99+
100+
101+
def test_score_ingestion_consumer_uses_aliases_for_langfuse_models():
102+
ingestion_queue = Queue()
103+
consumer = ScoreIngestionConsumer(
104+
ingestion_queue=ingestion_queue,
105+
identifier=0,
106+
client=Mock(),
107+
public_key="pk-test",
108+
)
109+
110+
ingestion_queue.put(
111+
{
112+
"id": "event-1",
113+
"type": "score-create",
114+
"timestamp": "2026-03-25T16:10:45.793Z",
115+
"body": ScoreBody(
116+
id="score-1",
117+
session_id="session-1",
118+
name="rating",
119+
value=2,
120+
data_type="NUMERIC",
121+
),
122+
}
123+
)
124+
125+
batch = consumer._next()
126+
127+
assert batch[0]["body"] == {
128+
"id": "score-1",
129+
"sessionId": "session-1",
130+
"name": "rating",
131+
"value": 2,
132+
"dataType": "NUMERIC",
133+
}
134+
135+
72136
def test_path():
73137
path = Path("/tmp/test.txt")
74138
serializer = EventSerializer()

0 commit comments

Comments
 (0)