11"""Batch evaluation functionality for Langfuse.
22
33This module provides comprehensive batch evaluation capabilities for running evaluations
4- on traces, observations, and sessions fetched from Langfuse. It includes type definitions,
4+ on traces and observations fetched from Langfuse. It includes type definitions,
55protocols, result classes, and the implementation for large-scale evaluation workflows
66with error handling, retry logic, and resume capability.
77"""
2424
2525from langfuse .api .resources .commons .types import (
2626 ObservationsView ,
27- SessionWithTraces ,
2827 TraceWithFullDetails ,
2928)
3029from langfuse .experiment import Evaluation , EvaluatorFunction
@@ -39,7 +38,7 @@ class EvaluatorInputs:
3938 """Input data structure for evaluators, returned by mapper functions.
4039
4140 This class provides a strongly-typed container for transforming API response
42- objects (traces, observations, sessions ) into the standardized format expected
41+ objects (traces, observations) into the standardized format expected
4342 by evaluator functions. It ensures consistent access to input, output, expected
4443 output, and metadata regardless of the source entity type.
4544
@@ -89,24 +88,6 @@ def observation_mapper(observation):
8988 }
9089 )
9190 ```
92-
93- Mapper for sessions aggregating trace data:
94- ```python
95- def session_mapper(session):
96- # Aggregate data from all traces in the session
97- all_outputs = [trace.output for trace in session.traces if trace.output]
98- combined_output = " ".join(all_outputs)
99-
100- return EvaluatorInputs(
101- input=session.traces[0].input if session.traces else None,
102- output=combined_output,
103- expected_output=None,
104- metadata={
105- "session_id": session.id,
106- "trace_count": len(session.traces),
107- "user_id": session.user_id
108- }
109- )
11091 ```
11192
11293 Note:
@@ -141,13 +122,13 @@ def __init__(
141122class MapperFunction (Protocol ):
142123 """Protocol defining the interface for mapper functions in batch evaluation.
143124
144- Mapper functions transform API response objects (traces, observations, or sessions )
125+ Mapper functions transform API response objects (traces or observations )
145126 into the standardized EvaluatorInputs format that evaluators expect. This abstraction
146127 allows you to define how to extract and structure evaluation data from different
147128 entity types.
148129
149130 Mapper functions must:
150- - Accept a single item parameter (trace, observation, or session object )
131+ - Accept a single item parameter (trace, observation)
151132 - Return an EvaluatorInputs instance with input, output, expected_output, metadata
152133 - Can be either synchronous or asynchronous
153134 - Should handle missing or malformed data gracefully
@@ -156,7 +137,7 @@ class MapperFunction(Protocol):
156137 def __call__ (
157138 self ,
158139 * ,
159- item : Union ["TraceWithFullDetails" , "ObservationsView" , "SessionWithTraces" ],
140+ item : Union ["TraceWithFullDetails" , "ObservationsView" ],
160141 ** kwargs : Dict [str , Any ],
161142 ) -> Union [EvaluatorInputs , Awaitable [EvaluatorInputs ]]:
162143 """Transform an API response object into evaluator inputs.
@@ -169,7 +150,6 @@ def __call__(
169150 item: The API response object to transform. The type depends on the scope:
170151 - TraceWithFullDetails: When evaluating traces
171152 - ObservationsView: When evaluating observations
172- - SessionWithTraces: When evaluating sessions
173153
174154 Returns:
175155 EvaluatorInputs: A structured container with:
@@ -226,24 +206,6 @@ async def map_trace_async(trace):
226206 metadata={"trace_id": trace.id}
227207 )
228208 ```
229-
230- Session mapper aggregating multiple traces:
231- ```python
232- def map_session(session):
233- # Combine data from all traces in session
234- inputs = [t.input for t in session.traces if t.input]
235- outputs = [t.output for t in session.traces if t.output]
236-
237- return EvaluatorInputs(
238- input=inputs,
239- output=outputs,
240- expected_output=None,
241- metadata={
242- "session_id": session.id,
243- "trace_count": len(session.traces)
244- }
245- )
246- ```
247209 """
248210 ...
249211
@@ -265,7 +227,7 @@ class CompositeEvaluatorFunction(Protocol):
265227 def __call__ (
266228 self ,
267229 * ,
268- item : Union ["TraceWithFullDetails" , "ObservationsView" , "SessionWithTraces" ],
230+ item : Union ["TraceWithFullDetails" , "ObservationsView" ],
269231 evaluations : List [Evaluation ],
270232 ** kwargs : Dict [str , Any ],
271233 ) -> Union [
@@ -491,7 +453,7 @@ class BatchEvaluationResumeToken:
491453 dataset changed between runs.
492454
493455 Attributes:
494- scope: The type of items being evaluated ("traces", "observations", "sessions" ).
456+ scope: The type of items being evaluated ("traces", "observations").
495457 filter: The original JSON filter string used to query items.
496458 last_processed_timestamp: ISO 8601 timestamp of the last successfully processed item.
497459 Used to construct a filter that only fetches items after this timestamp.
@@ -588,7 +550,7 @@ def __init__(
588550 """Initialize BatchEvaluationResumeToken with the provided state.
589551
590552 Args:
591- scope: The scope type ("traces", "observations", "sessions" ).
553+ scope: The scope type ("traces", "observations").
592554 filter: The original JSON filter string.
593555 last_processed_timestamp: ISO 8601 timestamp of last processed item.
594556 last_processed_id: ID of last processed item.
@@ -888,7 +850,7 @@ async def run_async(
888850 and tracking statistics.
889851
890852 Args:
891- scope: The type of items to evaluate ("traces", "observations", "sessions" ).
853+ scope: The type of items to evaluate ("traces", "observations").
892854 mapper: Function to transform API response items to evaluator inputs.
893855 evaluators: List of evaluation functions to run on each item.
894856 filter: JSON filter string for querying items.
@@ -1017,7 +979,7 @@ async def run_async(
1017979
1018980 # Process items concurrently
1019981 async def process_item (
1020- item : Union [TraceWithFullDetails , ObservationsView , SessionWithTraces ],
982+ item : Union [TraceWithFullDetails , ObservationsView ],
1021983 ) -> Tuple [str , Union [Tuple [int , int , int ], Exception ]]:
1022984 """Process a single item and return (item_id, result)."""
1023985 async with semaphore :
@@ -1133,11 +1095,11 @@ async def _fetch_batch_with_retry(
11331095 page : int ,
11341096 limit : int ,
11351097 max_retries : int ,
1136- ) -> List [Union [TraceWithFullDetails , ObservationsView , SessionWithTraces ]]:
1098+ ) -> List [Union [TraceWithFullDetails , ObservationsView ]]:
11371099 """Fetch a batch of items with retry logic.
11381100
11391101 Args:
1140- scope: The type of items ("traces", "observations", "sessions" ).
1102+ scope: The type of items ("traces", "observations").
11411103 filter: JSON filter string for querying.
11421104 page: Page number (1-indexed).
11431105 limit: Number of items per page.
@@ -1166,20 +1128,13 @@ async def _fetch_batch_with_retry(
11661128 request_options = {"max_retries" : max_retries },
11671129 ) # type: ignore
11681130 return list (response .data ) # type: ignore
1169- elif scope == "sessions" :
1170- response = self .client .api .sessions .list (
1171- page = page ,
1172- limit = limit ,
1173- request_options = {"max_retries" : max_retries },
1174- ) # type: ignore
1175- return list (response .data ) # type: ignore
11761131 else :
11771132 error_message = f"Invalid scope: { scope } "
11781133 raise ValueError (error_message )
11791134
11801135 async def _process_batch_evaluation_item (
11811136 self ,
1182- item : Union [TraceWithFullDetails , ObservationsView , SessionWithTraces ],
1137+ item : Union [TraceWithFullDetails , ObservationsView ],
11831138 scope : str ,
11841139 mapper : MapperFunction ,
11851140 evaluators : List [EvaluatorFunction ],
@@ -1191,7 +1146,7 @@ async def _process_batch_evaluation_item(
11911146
11921147 Args:
11931148 item: The API response object to evaluate.
1194- scope: The type of item ("traces", "observations", "sessions" ).
1149+ scope: The type of item ("traces", "observations").
11951150 mapper: Function to transform item to evaluator inputs.
11961151 evaluators: List of evaluator functions.
11971152 composite_evaluator: Optional composite evaluator function.
@@ -1312,7 +1267,7 @@ async def _run_evaluator_internal(
13121267 async def _run_mapper (
13131268 self ,
13141269 mapper : MapperFunction ,
1315- item : Union [TraceWithFullDetails , ObservationsView , SessionWithTraces ],
1270+ item : Union [TraceWithFullDetails , ObservationsView ],
13161271 ) -> EvaluatorInputs :
13171272 """Run mapper function (handles both sync and async mappers).
13181273
@@ -1334,7 +1289,7 @@ async def _run_mapper(
13341289 async def _run_composite_evaluator (
13351290 self ,
13361291 composite_evaluator : CompositeEvaluatorFunction ,
1337- item : Union [TraceWithFullDetails , ObservationsView , SessionWithTraces ],
1292+ item : Union [TraceWithFullDetails , ObservationsView ],
13381293 evaluations : List [Evaluation ],
13391294 ) -> List [Evaluation ]:
13401295 """Run composite evaluator function (handles both sync and async).
@@ -1372,7 +1327,7 @@ def _create_score_for_scope(
13721327 """Create a score linked to the appropriate entity based on scope.
13731328
13741329 Args:
1375- scope: The type of entity ("traces", "observations", "sessions" ).
1330+ scope: The type of entity ("traces", "observations").
13761331 item_id: The ID of the entity.
13771332 evaluation: The evaluation result to create a score from.
13781333 additional_metadata: Additional metadata to merge with evaluation metadata.
@@ -1403,16 +1358,6 @@ def _create_score_for_scope(
14031358 data_type = evaluation .data_type , # type: ignore[arg-type]
14041359 config_id = evaluation .config_id ,
14051360 )
1406- elif scope == "sessions" :
1407- self .client .create_score (
1408- session_id = item_id ,
1409- name = evaluation .name ,
1410- value = evaluation .value , # type: ignore
1411- comment = evaluation .comment ,
1412- metadata = score_metadata ,
1413- data_type = evaluation .data_type , # type: ignore[arg-type]
1414- config_id = evaluation .config_id ,
1415- )
14161361
14171362 def _build_timestamp_filter (
14181363 self ,
@@ -1459,7 +1404,7 @@ def _build_timestamp_filter(
14591404
14601405 @staticmethod
14611406 def _get_item_id (
1462- item : Union [TraceWithFullDetails , ObservationsView , SessionWithTraces ],
1407+ item : Union [TraceWithFullDetails , ObservationsView ],
14631408 scope : str ,
14641409 ) -> str :
14651410 """Extract ID from item based on scope.
@@ -1475,7 +1420,7 @@ def _get_item_id(
14751420
14761421 @staticmethod
14771422 def _get_item_timestamp (
1478- item : Union [TraceWithFullDetails , ObservationsView , SessionWithTraces ],
1423+ item : Union [TraceWithFullDetails , ObservationsView ],
14791424 scope : str ,
14801425 ) -> str :
14811426 """Extract timestamp from item based on scope.
@@ -1495,10 +1440,6 @@ def _get_item_timestamp(
14951440 # Type narrowing for observations
14961441 if hasattr (item , "start_time" ):
14971442 return item .start_time .isoformat () # type: ignore[attr-defined]
1498- elif scope == "sessions" :
1499- # Sessions don't have a single timestamp, use created_at
1500- if hasattr (item , "created_at" ):
1501- return item .created_at .isoformat () # type: ignore[attr-defined]
15021443 return ""
15031444
15041445 @staticmethod
@@ -1515,8 +1456,6 @@ def _get_timestamp_field_for_scope(scope: str) -> str:
15151456 return "timestamp"
15161457 elif scope == "observations" :
15171458 return "start_time"
1518- elif scope == "sessions" :
1519- return "created_at"
15201459 return "timestamp" # Default
15211460
15221461 def _build_result (
0 commit comments