
Commit 7a2232a

feat(experiments): add experiment runner
1 parent fed7240 commit 7a2232a

2 files changed

Lines changed: 358 additions & 1 deletion


langfuse/_client/client.py

Lines changed: 288 additions & 0 deletions
@@ -3,6 +3,7 @@
 This module implements Langfuse's core observability functionality on top of the OpenTelemetry (OTel) standard.
 """
 
+import asyncio
 import logging
 import os
 import re
@@ -13,6 +14,7 @@
 from time import time_ns
 from typing import (
     Any,
+    Callable,
     Dict,
     List,
     Literal,
@@ -44,6 +46,11 @@
     get_observation_types_list,
 )
 from langfuse._client.datasets import DatasetClient, DatasetItemClient
+from langfuse._client.experiments import (
+    ExperimentItem,
+    ExperimentItemResult,
+    ExperimentResult,
+)
 from langfuse._client.environment_variables import (
     LANGFUSE_DEBUG,
     LANGFUSE_HOST,
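
For orientation while reading the hunk below: `langfuse/_client/experiments.py` itself is not part of this diff, so the imported shapes have to be inferred. A hedged sketch of what the usage in `client.py` implies (reconstructed from call sites, not copied from the real module):

```python
# Hedged reconstruction of the shapes imported from langfuse._client.experiments.
# These TypedDicts are inferred from how the names are used in client.py below.
from typing import Any, Dict, List, Optional, TypedDict

class ExperimentItem(TypedDict, total=False):
    # Keys read via item.get(...) in _process_experiment_item
    input: Any
    expected_output: Any
    metadata: Optional[Dict[str, Any]]

class ExperimentItemResult(TypedDict):
    # Shape of the dict returned by _process_experiment_item
    item: Any
    output: Any
    evaluations: List[Dict[str, Any]]  # {"name", "value", "comment"?, "metadata"?}
    trace_id: str
    dataset_run_id: Optional[str]

class ExperimentResult(TypedDict):
    # Shape of the dict returned by _run_experiment_async
    item_results: List[ExperimentItemResult]
    run_evaluations: List[Dict[str, Any]]
    dataset_run_id: Optional[str]
    dataset_run_url: Optional[str]
```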
@@ -2444,6 +2451,287 @@ def get_dataset(
             handle_fern_exception(e)
             raise e
 
+    def run_experiment(
+        self,
+        *,
+        name: str,
+        description: Optional[str] = None,
+        data: Union[
+            List[Union[ExperimentItem, dict, DatasetItem]], List[DatasetItemClient]
+        ],
+        task: Callable[
+            [Union[ExperimentItem, dict, DatasetItem, DatasetItemClient]], Any
+        ],
+        evaluators: Optional[List[Callable]] = None,
+        run_evaluators: Optional[List[Callable]] = None,
+        max_concurrency: Optional[int] = None,
+        metadata: Optional[Dict[str, Any]] = None,
+    ) -> ExperimentResult:
+        """Run an experiment on a dataset with automatic tracing and evaluation.
+
+        This method executes a task function on each item in the provided dataset,
+        traces the execution with Langfuse, runs evaluators on the outputs,
+        and returns formatted results.
+
+        Args:
+            name: Human-readable name for the experiment
+            description: Optional description of the experiment's purpose
+            data: List of data items to process (ExperimentItem or DatasetItem)
+            task: Function that processes each data item and returns output
+            evaluators: Optional list of functions to evaluate each item's output
+            run_evaluators: Optional list of functions to evaluate the entire experiment
+            max_concurrency: Maximum number of concurrent task executions
+            metadata: Optional metadata to attach to the experiment
+
+        Returns:
+            ExperimentResult containing item results, evaluations, and formatting functions
+
+        Example:
+            ```python
+            def task(item):
+                return f"Processed: {item['input']}"
+
+            def evaluator(*, input, output, expected_output=None, **kwargs):
+                return {"name": "length", "value": len(output)}
+
+            result = langfuse.run_experiment(
+                name="Test Experiment",
+                data=[{"input": "test", "expected_output": "expected"}],
+                task=task,
+                evaluators=[evaluator]
+            )
+
+            print(result["item_results"])
+            ```
+        """
+        return asyncio.run(
+            self._run_experiment_async(
+                name=name,
+                description=description,
+                data=data,
+                task=task,
+                evaluators=evaluators or [],
+                run_evaluators=run_evaluators or [],
+                max_concurrency=max_concurrency,
+                metadata=metadata or {},
+            )
+        )
+
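
A note on the synchronous wrapper above: `asyncio.run` refuses to nest inside an already-running event loop, so calling `run_experiment` from Jupyter or an async web handler would raise. A minimal demonstration of that asyncio behavior (illustrative, not part of this commit):

```python
import asyncio

async def main():
    coro = asyncio.sleep(0)
    try:
        # Nesting asyncio.run inside a running loop is rejected by asyncio.
        asyncio.run(coro)
    except RuntimeError as e:
        print(e)  # "asyncio.run() cannot be called from a running event loop"
    finally:
        coro.close()  # avoid a "coroutine was never awaited" warning

asyncio.run(main())
```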
+    async def _run_experiment_async(
+        self,
+        *,
+        name: str,
+        description: Optional[str],
+        data: Union[
+            List[Union[ExperimentItem, dict, DatasetItem]], List[DatasetItemClient]
+        ],
+        task: Callable,
+        evaluators: List[Callable],
+        run_evaluators: List[Callable],
+        max_concurrency: Optional[int],
+        metadata: Dict[str, Any],
+    ) -> ExperimentResult:
+        """Internal async implementation of run_experiment."""
+        from langfuse._client.experiments import _run_evaluator
+
+        langfuse_logger.debug(f"Starting experiment '{name}' with {len(data)} items")
+
+        # Set up concurrency control
+        max_workers = (
+            max_concurrency if max_concurrency is not None else min(len(data), 10)
+        )
+        semaphore = asyncio.Semaphore(max_workers)
+
+        # Process all items
+        async def process_item(
+            item: Union[ExperimentItem, dict, DatasetItem, DatasetItemClient],
+        ) -> dict:
+            async with semaphore:
+                return await self._process_experiment_item(
+                    item, task, evaluators, name, description, metadata
+                )
+
+        # Run all items concurrently
+        tasks = [process_item(item) for item in data]
+        item_results = await asyncio.gather(*tasks, return_exceptions=True)
+
+        # Filter out any exceptions and log errors
+        valid_results: List[ExperimentItemResult] = []
+        for i, result in enumerate(item_results):
+            if isinstance(result, Exception):
+                langfuse_logger.error(f"Item {i} failed: {result}")
+            elif isinstance(result, dict):
+                # Type-cast since we know the structure matches ExperimentItemResult
+                valid_results.append(result)  # type: ignore
+
+        # Run experiment-level evaluators
+        run_evaluations = []
+        for run_evaluator in run_evaluators:
+            try:
+                evaluations = await _run_evaluator(
+                    run_evaluator, item_results=valid_results
+                )
+                run_evaluations.extend(evaluations)
+            except Exception as e:
+                langfuse_logger.error(f"Run evaluator failed: {e}")
+
+        # Generate dataset run URL if applicable
+        dataset_run_id = (
+            valid_results[0].get("dataset_run_id") if valid_results else None
+        )
+        dataset_run_url = None
+        if dataset_run_id and data:
+            try:
+                # Check if the first item has dataset_id (for DatasetItem objects)
+                first_item = data[0]
+                dataset_id = None
+                if hasattr(first_item, "dataset_id"):
+                    dataset_id = getattr(first_item, "dataset_id", None)
+
+                if dataset_id:
+                    project_id = self._get_project_id()
+                    if project_id:
+                        dataset_run_url = f"{self._host}/project/{project_id}/datasets/{dataset_id}/runs/{dataset_run_id}"
+            except Exception:
+                pass  # URL generation is optional
+
+        # Store run-level evaluations as scores
+        for evaluation in run_evaluations:
+            try:
+                if dataset_run_id:
+                    self.create_score(
+                        dataset_run_id=dataset_run_id,
+                        name=evaluation["name"],
+                        value=evaluation["value"],
+                        comment=evaluation.get("comment"),
+                        metadata=evaluation.get("metadata"),
+                    )
+            except Exception as e:
+                langfuse_logger.error(f"Failed to store run evaluation: {e}")
+
+        return {
+            "item_results": valid_results,
+            "run_evaluations": run_evaluations,
+            "dataset_run_id": dataset_run_id,
+            "dataset_run_url": dataset_run_url,
+        }
+
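
The concurrency control above is a plain semaphore-plus-gather pattern: at most `max_workers` task coroutines run at once, and `return_exceptions=True` keeps one failing item from aborting the rest. A self-contained sketch of the same pattern (hypothetical `work` coroutine, illustrative failure on one item):

```python
import asyncio

async def run_bounded(items, work, max_concurrency=10):
    # Cap in-flight coroutines, mirroring the min(len(data), 10) default above.
    semaphore = asyncio.Semaphore(min(len(items), max_concurrency))

    async def guarded(item):
        async with semaphore:
            return await work(item)

    # return_exceptions=True collects failures instead of cancelling siblings;
    # the caller then filters out Exception instances, as the diff does.
    return await asyncio.gather(*(guarded(i) for i in items), return_exceptions=True)

async def work(item):  # hypothetical stand-in for _run_task
    if item == 3:
        raise ValueError("boom")
    return item * 2

results = asyncio.run(run_bounded(list(range(5)), work, max_concurrency=2))
print([r for r in results if not isinstance(r, Exception)])  # [0, 2, 4, 8]
```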
+    async def _process_experiment_item(
+        self,
+        item: Union[ExperimentItem, dict, DatasetItem, DatasetItemClient],
+        task: Callable,
+        evaluators: List[Callable],
+        experiment_name: str,
+        experiment_description: Optional[str],
+        experiment_metadata: Dict[str, Any],
+    ) -> dict:
+        """Process a single experiment item with tracing and evaluation."""
+        from langfuse._client.experiments import _run_evaluator, _run_task
+
+        # Execute task with tracing
+        span_name = "experiment-item-run"
+        with self.start_as_current_span(name=span_name) as span:
+            try:
+                # Run the task
+                output = await _run_task(task, item)
+
+                # Update span with input/output
+                input_data = (
+                    item.get("input")
+                    if isinstance(item, dict)
+                    else getattr(item, "input", None)
+                )
+                # Prepare metadata
+                item_metadata: Dict[str, Any] = {}
+                if isinstance(item, dict):
+                    item_metadata = item.get("metadata", {}) or {}
+
+                final_metadata = {
+                    "experiment_name": experiment_name,
+                    **experiment_metadata,
+                }
+                if isinstance(item_metadata, dict):
+                    final_metadata.update(item_metadata)
+
+                span.update(
+                    input=input_data,
+                    output=output,
+                    metadata=final_metadata,
+                )
+
+                # Get trace ID for linking
+                trace_id = span.trace_id
+                dataset_run_id = None
+
+                # Link to dataset run if this is a dataset item
+                if hasattr(item, "id") and hasattr(item, "dataset_id"):
+                    try:
+                        from langfuse.model import CreateDatasetRunItemRequest
+
+                        dataset_run_item = self.api.dataset_run_items.create(
+                            request=CreateDatasetRunItemRequest(
+                                runName=experiment_name,
+                                runDescription=experiment_description,
+                                metadata=experiment_metadata,
+                                datasetItemId=item.id,  # type: ignore
+                                traceId=trace_id,
+                            )
+                        )
+                        dataset_run_id = dataset_run_item.dataset_run_id
+                    except Exception as e:
+                        langfuse_logger.error(f"Failed to create dataset run item: {e}")
+
+                # Run evaluators
+                evaluations = []
+                for evaluator in evaluators:
+                    try:
+                        expected_output = None
+                        if isinstance(item, dict):
+                            expected_output = item.get("expected_output")
+                        elif hasattr(item, "expected_output"):
+                            expected_output = item.expected_output
+
+                        eval_metadata: Optional[Dict[str, Any]] = None
+                        if isinstance(item, dict):
+                            eval_metadata = item.get("metadata")
+                        elif hasattr(item, "metadata"):
+                            eval_metadata = item.metadata
+
+                        eval_results = await _run_evaluator(
+                            evaluator,
+                            input=input_data,
+                            output=output,
+                            expected_output=expected_output,
+                            metadata=eval_metadata,
+                        )
+                        evaluations.extend(eval_results)
+
+                        # Store evaluations as scores
+                        for evaluation in eval_results:
+                            self.create_score(
+                                trace_id=trace_id,
+                                name=evaluation["name"],
+                                value=evaluation["value"],
+                                comment=evaluation.get("comment"),
+                                metadata=evaluation.get("metadata"),
+                            )
+                    except Exception as e:
+                        langfuse_logger.error(f"Evaluator failed: {e}")
+
+                return {
+                    "item": item,
+                    "output": output,
+                    "evaluations": evaluations,
+                    "trace_id": trace_id,
+                    "dataset_run_id": dataset_run_id,
+                }
+
+            except Exception as e:
+                span.update(
+                    output=f"Error: {str(e)}", level="ERROR", status_message=str(e)
+                )
+                raise e
+
     def auth_check(self) -> bool:
         """Check if the provided credentials (public and secret key) are valid.
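
Each evaluation dict produced above is persisted via `self.create_score`, including the optional `comment` and `metadata` keys. A hedged sketch of an item-level evaluator that uses the full shape (the exact-match logic and version tag are illustrative, not part of this commit):

```python
def exact_match(*, input, output, expected_output=None, **kwargs):
    # Returns one evaluation dict; _process_experiment_item forwards
    # name/value and the optional comment/metadata to create_score.
    matched = expected_output is not None and output == expected_output
    return {
        "name": "exact_match",
        "value": 1.0 if matched else 0.0,
        "comment": None if matched else f"expected {expected_output!r}, got {output!r}",
        "metadata": {"evaluator_version": "v1"},  # illustrative tag
    }
```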

langfuse/_client/datasets.py

Lines changed: 70 additions & 1 deletion
@@ -1,7 +1,7 @@
 import datetime as dt
 import logging
 from .span import LangfuseSpan
-from typing import TYPE_CHECKING, Any, Generator, List, Optional
+from typing import TYPE_CHECKING, Any, Dict, Generator, List, Optional
 
 from opentelemetry.util._decorator import _agnosticcontextmanager
 
@@ -181,3 +181,72 @@ def __init__(self, dataset: Dataset, items: List[DatasetItemClient]):
         self.created_at = dataset.created_at
         self.updated_at = dataset.updated_at
         self.items = items
+        self._langfuse: Optional["Langfuse"] = None
+
+    def _get_langfuse_client(self) -> Optional["Langfuse"]:
+        """Get the Langfuse client from the first item."""
+        if self._langfuse is None and self.items:
+            self._langfuse = self.items[0].langfuse
+        return self._langfuse
+
+    def run_experiment(
+        self,
+        *,
+        name: str,
+        description: Optional[str] = None,
+        task: Any,
+        evaluators: Optional[List[Any]] = None,
+        run_evaluators: Optional[List[Any]] = None,
+        max_concurrency: Optional[int] = None,
+        metadata: Optional[Dict[str, Any]] = None,
+    ) -> Any:
+        """Run an experiment on this dataset.
+
+        This is a convenience method that calls the Langfuse client's run_experiment
+        method with this dataset's items as the data.
+
+        Args:
+            name: Human-readable name for the experiment
+            description: Optional description of the experiment's purpose
+            task: Function that processes each data item and returns output
+            evaluators: Optional list of functions to evaluate each item's output
+            run_evaluators: Optional list of functions to evaluate the entire experiment
+            max_concurrency: Maximum number of concurrent task executions
+            metadata: Optional metadata to attach to the experiment
+
+        Returns:
+            ExperimentResult containing item results, evaluations, and formatting functions
+
+        Example:
+            ```python
+            dataset = langfuse.get_dataset("my-dataset")
+
+            def task(item):
+                return f"Processed: {item.input}"
+
+            def evaluator(*, input, output, expected_output=None, **kwargs):
+                return {"name": "length", "value": len(output)}
+
+            result = dataset.run_experiment(
+                name="Dataset Test Experiment",
+                task=task,
+                evaluators=[evaluator]
+            )
+
+            print(result["item_results"])
+            ```
+        """
+        langfuse_client = self._get_langfuse_client()
+        if not langfuse_client:
+            raise ValueError("No Langfuse client available. Dataset items are empty.")
+
+        return langfuse_client.run_experiment(
+            name=name,
+            description=description,
+            data=self.items,
+            task=task,
+            evaluators=evaluators,
+            run_evaluators=run_evaluators,
+            max_concurrency=max_concurrency,
+            metadata=metadata,
+        )
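
Neither docstring exercises `run_evaluators`. Assuming `_run_evaluator` forwards its keyword arguments to the evaluator (the per-item evaluator signatures above suggest it does), a run-level evaluator receives the collected `item_results` and can aggregate them. The sketch below reuses the docstring's `dataset`, `task`, and `evaluator` names:

```python
def average_length(*, item_results, **kwargs):
    # Aggregate the per-item "length" scores emitted by the docstring's evaluator.
    values = [
        ev["value"]
        for result in item_results
        for ev in result["evaluations"]
        if ev["name"] == "length"
    ]
    return {
        "name": "avg_length",
        "value": sum(values) / len(values) if values else 0.0,
    }

result = dataset.run_experiment(
    name="Dataset Test Experiment",
    task=task,
    evaluators=[evaluator],
    run_evaluators=[average_length],
)
```

Per the client code above, run-level evaluations are only stored as scores when a `dataset_run_id` exists, i.e. when the experiment ran over dataset items.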
