
Commit 2cbf43b

push
1 parent 7a2232a commit 2cbf43b

2 files changed

Lines changed: 844 additions & 0 deletions

File tree

langfuse/_client/experiments.py

Lines changed: 324 additions & 0 deletions
@@ -0,0 +1,324 @@
"""Langfuse experiment functionality for running and evaluating tasks on datasets.

This module provides the core experiment functionality for the Langfuse Python SDK,
allowing users to run experiments on datasets with automatic tracing, evaluation,
and result formatting.
"""

import asyncio
import logging
from typing import (
    TYPE_CHECKING,
    Any,
    Awaitable,
    Dict,
    List,
    Optional,
    Protocol,
    TypedDict,
    Union,
)

from langfuse.model import DatasetItem

if TYPE_CHECKING:
    from langfuse._client.datasets import DatasetItemClient

class ExperimentItem(TypedDict, total=False):
    """Structure for experiment data items.

    Args:
        input: The input data to pass to the task function
        expected_output: Optional expected output for evaluation purposes
        metadata: Optional metadata for the experiment item
    """

    input: Any
    expected_output: Any
    metadata: Optional[Dict[str, Any]]


class Evaluation(TypedDict, total=False):
    """Structure for evaluation results.

    Args:
        name: Name of the evaluation metric
        value: The evaluation score/value (numeric or string)
        comment: Optional comment explaining the evaluation
        metadata: Optional metadata for the evaluation
    """

    name: str
    value: Union[int, float, str, bool]
    comment: Optional[str]
    metadata: Optional[Dict[str, Any]]

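
# Illustrative sketch (not part of this commit): minimal literals showing how
# ExperimentItem and Evaluation dicts are typically shaped. The concrete values
# here are hypothetical.
_example_item: ExperimentItem = {
    "input": "What is the capital of France?",
    "expected_output": "Paris",
    "metadata": {"category": "geography"},
}

_example_evaluation: Evaluation = {
    "name": "exact_match",
    "value": 1.0,
    "comment": "Output matches the expected answer",
}
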
class ExperimentItemResult(TypedDict):
    """Result structure for individual experiment items.

    Args:
        item: The original experiment item that was processed
        output: The actual output produced by the task
        evaluations: List of evaluation results for this item
        trace_id: Langfuse trace ID for this item's execution
        dataset_run_id: Dataset run ID if this item was part of a Langfuse dataset
    """

    item: Union[ExperimentItem, DatasetItem]
    output: Any
    evaluations: List[Evaluation]
    trace_id: Optional[str]
    dataset_run_id: Optional[str]


class ExperimentResult(TypedDict):
    """Complete result structure for experiment execution.

    Args:
        item_results: Results from processing each individual data item
        run_evaluations: Results from run-level evaluators
        dataset_run_id: ID of the dataset run (if using Langfuse datasets)
        dataset_run_url: URL to view the dataset run in Langfuse UI
    """

    item_results: List[ExperimentItemResult]
    run_evaluations: List[Evaluation]
    dataset_run_id: Optional[str]
    dataset_run_url: Optional[str]

class TaskFunction(Protocol):
    """Protocol for experiment task functions."""

    def __call__(
        self, item: Union[ExperimentItem, dict, DatasetItem, "DatasetItemClient"]
    ) -> Union[Any, Awaitable[Any]]:
        """Execute the task on an experiment item.

        Args:
            item: The experiment or dataset item to process

        Returns:
            The task output (can be sync or async)
        """
        ...

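
# Illustrative sketch (not part of this commit): functions satisfying the
# TaskFunction protocol. Any callable that accepts the item and returns an
# output (or an awaitable of one) conforms; the bodies below are hypothetical.
def _example_sync_task(item: ExperimentItem) -> str:
    # A synchronous task may simply transform the item's input.
    return str(item.get("input", "")).upper()


async def _example_async_task(item: ExperimentItem) -> str:
    # Async tasks are detected and awaited by _run_task via asyncio.iscoroutine.
    await asyncio.sleep(0)  # stand-in for an async model or API call
    return str(item.get("input", ""))
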
class EvaluatorFunction(Protocol):
    """Protocol for item-level evaluator functions."""

    def __call__(
        self,
        *,
        input: Any,
        output: Any,
        expected_output: Any = None,
        metadata: Optional[Dict[str, Any]] = None,
    ) -> Union[
        Evaluation, List[Evaluation], Awaitable[Union[Evaluation, List[Evaluation]]]
    ]:
        """Evaluate a task output.

        Args:
            input: The original input to the task
            output: The output produced by the task
            expected_output: The expected output (if available)
            metadata: Optional metadata from the experiment item

        Returns:
            Single evaluation or list of evaluations (can be sync or async)
        """
        ...

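
# Illustrative sketch (not part of this commit): an item-level evaluator
# satisfying the EvaluatorFunction protocol. It compares the task output to
# expected_output and returns a single Evaluation dict.
def _example_exact_match_evaluator(
    *,
    input: Any,
    output: Any,
    expected_output: Any = None,
    metadata: Optional[Dict[str, Any]] = None,
) -> Evaluation:
    matched = expected_output is not None and output == expected_output
    return {
        "name": "exact_match",
        "value": 1.0 if matched else 0.0,
        "comment": "Exact match" if matched else "Output differs from expected",
    }
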
class RunEvaluatorFunction(Protocol):
    """Protocol for run-level evaluator functions."""

    def __call__(
        self, *, item_results: List[ExperimentItemResult]
    ) -> Union[
        Evaluation, List[Evaluation], Awaitable[Union[Evaluation, List[Evaluation]]]
    ]:
        """Evaluate the entire experiment run.

        Args:
            item_results: Results from all processed experiment items

        Returns:
            Single evaluation or list of evaluations (can be sync or async)
        """
        ...

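
# Illustrative sketch (not part of this commit): a run-level evaluator
# satisfying RunEvaluatorFunction. It averages all numeric item-level scores
# across the run into a single Evaluation.
def _example_mean_score_run_evaluator(
    *, item_results: List[ExperimentItemResult]
) -> Evaluation:
    scores = [
        evaluation["value"]
        for result in item_results
        for evaluation in result["evaluations"]
        if isinstance(evaluation["value"], (int, float))
    ]
    mean = sum(scores) / len(scores) if scores else 0.0
    return {
        "name": "mean_item_score",
        "value": mean,
        "comment": f"Averaged over {len(scores)} numeric scores",
    }
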
def format_experiment_results(
    item_results: List[ExperimentItemResult],
    run_evaluations: List[Evaluation],
    experiment_name: str,
    experiment_description: Optional[str] = None,
    dataset_run_url: Optional[str] = None,
    include_item_results: bool = False,
) -> str:
    """Format experiment results for display.

    Args:
        item_results: Results from processing each item
        run_evaluations: Results from run-level evaluators
        experiment_name: Name of the experiment
        experiment_description: Optional description of the experiment
        dataset_run_url: Optional URL to dataset run in Langfuse UI
        include_item_results: Whether to include individual item details

    Returns:
        Formatted string representation of the results
    """
    if not item_results:
        return "No experiment results to display."

    output = ""

    # Individual results
    if include_item_results:
        for i, result in enumerate(item_results):
            output += f"\n{i + 1}. Item {i + 1}:\n"

            # Input, expected, and actual
            item_input = None
            if isinstance(result["item"], dict):
                item_input = result["item"].get("input")
            elif hasattr(result["item"], "input"):
                item_input = result["item"].input

            if item_input is not None:
                output += f" Input: {_format_value(item_input)}\n"

            expected_output = None
            if isinstance(result["item"], dict):
                expected_output = result["item"].get("expected_output")
            elif hasattr(result["item"], "expected_output"):
                expected_output = result["item"].expected_output

            if expected_output is not None:
                output += f" Expected: {_format_value(expected_output)}\n"
            output += f" Actual: {_format_value(result['output'])}\n"

            # Scores
            if result["evaluations"]:
                output += " Scores:\n"
                for evaluation in result["evaluations"]:
                    score = evaluation["value"]
                    if isinstance(score, (int, float)):
                        score = f"{score:.3f}"
                    output += f" • {evaluation['name']}: {score}"
                    if evaluation.get("comment"):
                        output += f"\n 💭 {evaluation['comment']}"
                    output += "\n"

            # Trace link
            if result.get("trace_id"):
                # Note: We'd need the langfuse client to generate the actual URL
                output += f"\n Trace ID: {result['trace_id']}\n"
    else:
        output += f"Individual Results: Hidden ({len(item_results)} items)\n"
        output += "💡 Set include_item_results=True to view them\n"

    # Experiment Overview
    output += f"\n{'─' * 50}\n"
    output += f"📊 {experiment_name}"
    if experiment_description:
        output += f" - {experiment_description}"

    output += f"\n{len(item_results)} items"

    # Get unique evaluation names
    evaluation_names = set()
    for result in item_results:
        for evaluation in result["evaluations"]:
            evaluation_names.add(evaluation["name"])

    if evaluation_names:
        output += "\nEvaluations:"
        for eval_name in evaluation_names:
            output += f"\n{eval_name}"
        output += "\n"

    # Average scores
    if evaluation_names:
        output += "\nAverage Scores:"
        for eval_name in evaluation_names:
            scores = []
            for result in item_results:
                for evaluation in result["evaluations"]:
                    if evaluation["name"] == eval_name and isinstance(
                        evaluation["value"], (int, float)
                    ):
                        scores.append(evaluation["value"])

            if scores:
                avg = sum(scores) / len(scores)
                output += f"\n{eval_name}: {avg:.3f}"
        output += "\n"

    # Run evaluations
    if run_evaluations:
        output += "\nRun Evaluations:"
        for run_eval in run_evaluations:
            score = run_eval["value"]
            if isinstance(score, (int, float)):
                score = f"{score:.3f}"
            output += f"\n{run_eval['name']}: {score}"
            if run_eval.get("comment"):
                output += f"\n 💭 {run_eval['comment']}"
        output += "\n"

    if dataset_run_url:
        output += f"\n🔗 Dataset Run:\n {dataset_run_url}"

    return output

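
# Illustrative sketch (not part of this commit): calling
# format_experiment_results with one hypothetical item result. Wrapped in a
# helper so nothing runs at import time.
def _example_format_usage() -> str:
    return format_experiment_results(
        item_results=[
            {
                "item": {"input": "2 + 2", "expected_output": "4"},
                "output": "4",
                "evaluations": [{"name": "exact_match", "value": 1.0}],
                "trace_id": None,
                "dataset_run_id": None,
            }
        ],
        run_evaluations=[{"name": "mean_item_score", "value": 1.0}],
        experiment_name="demo-experiment",
        include_item_results=True,
    )
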
def _format_value(value: Any) -> str:
    """Format a value for display."""
    if isinstance(value, str):
        return value[:50] + "..." if len(value) > 50 else value
    return str(value)

async def _run_evaluator(
    evaluator: EvaluatorFunction, **kwargs: Any
) -> List[Evaluation]:
    """Run an evaluator function and normalize the result."""
    try:
        result = evaluator(**kwargs)

        # Handle async evaluators
        if asyncio.iscoroutine(result):
            result = await result

        # Normalize to list
        if isinstance(result, dict):
            return [result]
        elif isinstance(result, list):
            return result
        else:
            return []

    except Exception as e:
        evaluator_name = getattr(evaluator, "__name__", "unknown_evaluator")
        logging.getLogger("langfuse").error(f"Evaluator {evaluator_name} failed: {e}")
        return []

async def _run_task(
    task: TaskFunction,
    item: Union[ExperimentItem, dict, DatasetItem, "DatasetItemClient"],
) -> Any:
    """Run a task function and handle sync/async."""
    result = task(item)

    # Handle async tasks
    if asyncio.iscoroutine(result):
        result = await result

    return result
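
# Illustrative end-to-end sketch (not part of this commit): wiring the helpers
# together for a single item. The task and evaluator used here are the
# hypothetical examples defined above in this file.
async def _example_run_single_item(item: ExperimentItem) -> ExperimentItemResult:
    # Execute the task (sync or async) via _run_task.
    output = await _run_task(_example_sync_task, item)

    # Run an item-level evaluator; _run_evaluator normalizes the result to a
    # list and logs (rather than raises) evaluator exceptions.
    evaluations = await _run_evaluator(
        _example_exact_match_evaluator,
        input=item.get("input"),
        output=output,
        expected_output=item.get("expected_output"),
        metadata=item.get("metadata"),
    )

    return {
        "item": item,
        "output": output,
        "evaluations": evaluations,
        "trace_id": None,
        "dataset_run_id": None,
    }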
