Skip to content

Commit 00565f6

Browse files
committed
push
1 parent 9eee51d commit 00565f6

4 files changed

Lines changed: 108 additions & 82 deletions

File tree

langfuse/_client/client.py

Lines changed: 17 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -57,9 +57,13 @@
5757
LANGFUSE_TRACING_ENVIRONMENT,
5858
)
5959
from langfuse._client.experiments import (
60+
EvaluatorFunction,
61+
ExperimentData,
6062
ExperimentItem,
6163
ExperimentItemResult,
6264
ExperimentResult,
65+
RunEvaluatorFunction,
66+
TaskFunction,
6367
_run_evaluator,
6468
_run_task,
6569
)
@@ -2458,15 +2462,11 @@ def run_experiment(
24582462
*,
24592463
name: str,
24602464
description: Optional[str] = None,
2461-
data: Union[
2462-
List[Union[ExperimentItem, dict, DatasetItem]], List[DatasetItemClient]
2463-
],
2464-
task: Callable[
2465-
[Union[ExperimentItem, dict, DatasetItem, DatasetItemClient]], Any
2466-
],
2467-
evaluators: Optional[List[Callable]] = None,
2468-
run_evaluators: Optional[List[Callable]] = None,
2469-
max_concurrency: Optional[int] = None,
2465+
data: ExperimentData,
2466+
task: TaskFunction,
2467+
evaluators: List[EvaluatorFunction] = [],
2468+
run_evaluators: List[RunEvaluatorFunction] = [],
2469+
max_concurrency: int = 50,
24702470
metadata: Optional[Dict[str, Any]] = None,
24712471
) -> ExperimentResult:
24722472
"""Run an experiment on a dataset with automatic tracing and evaluation.
@@ -2524,27 +2524,20 @@ async def _run_experiment_async(
25242524
*,
25252525
name: str,
25262526
description: Optional[str],
2527-
data: Union[
2528-
List[Union[ExperimentItem, dict, DatasetItem]], List[DatasetItemClient]
2529-
],
2530-
task: Callable,
2531-
evaluators: List[Callable],
2532-
run_evaluators: List[Callable],
2533-
max_concurrency: Optional[int],
2527+
data: ExperimentData,
2528+
task: TaskFunction,
2529+
evaluators: List[EvaluatorFunction],
2530+
run_evaluators: List[RunEvaluatorFunction],
2531+
max_concurrency: int,
25342532
metadata: Dict[str, Any],
25352533
) -> ExperimentResult:
25362534
langfuse_logger.debug(f"Starting experiment '{name}' with {len(data)} items")
25372535

25382536
# Set up concurrency control
2539-
max_workers = (
2540-
max_concurrency if max_concurrency is not None else min(len(data), 10)
2541-
)
2542-
semaphore = asyncio.Semaphore(max_workers)
2537+
semaphore = asyncio.Semaphore(max_concurrency)
25432538

25442539
# Process all items
2545-
async def process_item(
2546-
item: Union[ExperimentItem, dict, DatasetItem, DatasetItemClient],
2547-
) -> dict:
2540+
async def process_item(item: ExperimentItem) -> dict:
25482541
async with semaphore:
25492542
return await self._process_experiment_item(
25502543
item, task, evaluators, name, description, metadata
@@ -2620,7 +2613,7 @@ async def process_item(
26202613

26212614
async def _process_experiment_item(
26222615
self,
2623-
item: Union[ExperimentItem, dict, DatasetItem, DatasetItemClient],
2616+
item: ExperimentItem,
26242617
task: Callable,
26252618
evaluators: List[Callable],
26262619
experiment_name: str,

langfuse/_client/datasets.py

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,23 @@
11
import datetime as dt
22
import logging
3-
from .span import LangfuseSpan
43
from typing import TYPE_CHECKING, Any, Dict, Generator, List, Optional
54

65
from opentelemetry.util._decorator import _agnosticcontextmanager
76

7+
from langfuse._client.experiments import (
8+
EvaluatorFunction,
9+
RunEvaluatorFunction,
10+
TaskFunction,
11+
)
812
from langfuse.model import (
913
CreateDatasetRunItemRequest,
1014
Dataset,
1115
DatasetItem,
1216
DatasetStatus,
1317
)
1418

19+
from .span import LangfuseSpan
20+
1521
if TYPE_CHECKING:
1622
from langfuse._client.client import Langfuse
1723

@@ -194,10 +200,10 @@ def run_experiment(
194200
*,
195201
name: str,
196202
description: Optional[str] = None,
197-
task: Any,
198-
evaluators: Optional[List[Any]] = None,
199-
run_evaluators: Optional[List[Any]] = None,
200-
max_concurrency: Optional[int] = None,
203+
task: TaskFunction,
204+
evaluators: List[EvaluatorFunction] = [],
205+
run_evaluators: List[RunEvaluatorFunction] = [],
206+
max_concurrency: int = 50,
201207
metadata: Optional[Dict[str, Any]] = None,
202208
) -> Any:
203209
"""Run an experiment on this dataset.

langfuse/_client/experiments.py

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,11 @@
1919
Union,
2020
)
2121

22-
from langfuse.model import DatasetItem
23-
2422
if TYPE_CHECKING:
2523
from langfuse._client.datasets import DatasetItemClient
2624

2725

28-
class ExperimentItem(TypedDict, total=False):
26+
class LocalExperimentItem(TypedDict, total=False):
2927
"""Structure for experiment data items.
3028
3129
Args:
@@ -39,6 +37,10 @@ class ExperimentItem(TypedDict, total=False):
3937
metadata: Optional[Dict[str, Any]]
4038

4139

40+
ExperimentItem = Union[LocalExperimentItem, DatasetItemClient]
41+
ExperimentData = Union[List[LocalExperimentItem], List[DatasetItemClient]]
42+
43+
4244
class Evaluation(TypedDict, total=False):
4345
"""Structure for evaluation results.
4446
@@ -66,7 +68,7 @@ class ExperimentItemResult(TypedDict):
6668
dataset_run_id: Dataset run ID if this item was part of a Langfuse dataset
6769
"""
6870

69-
item: Union[ExperimentItem, DatasetItem]
71+
item: ExperimentItem
7072
output: Any
7173
evaluations: List[Evaluation]
7274
trace_id: Optional[str]
@@ -93,7 +95,10 @@ class TaskFunction(Protocol):
9395
"""Protocol for experiment task functions."""
9496

9597
def __call__(
96-
self, item: Union[ExperimentItem, dict, DatasetItem, "DatasetItemClient"]
98+
self,
99+
*,
100+
item: ExperimentItem,
101+
**kwargs: Dict[str, Any],
97102
) -> Union[Any, Awaitable[Any]]:
98103
"""Execute the task on an experiment item.
99104
@@ -116,6 +121,7 @@ def __call__(
116121
output: Any,
117122
expected_output: Any = None,
118123
metadata: Optional[Dict[str, Any]] = None,
124+
**kwargs: Dict[str, Any],
119125
) -> Union[
120126
Evaluation, List[Evaluation], Awaitable[Union[Evaluation, List[Evaluation]]]
121127
]:
@@ -137,7 +143,10 @@ class RunEvaluatorFunction(Protocol):
137143
"""Protocol for run-level evaluator functions."""
138144

139145
def __call__(
140-
self, *, item_results: List[ExperimentItemResult]
146+
self,
147+
*,
148+
item_results: List[ExperimentItemResult],
149+
**kwargs: Dict[str, Any],
141150
) -> Union[
142151
Evaluation, List[Evaluation], Awaitable[Union[Evaluation, List[Evaluation]]]
143152
]:
@@ -286,7 +295,7 @@ def _format_value(value: Any) -> str:
286295

287296

288297
async def _run_evaluator(
289-
evaluator: EvaluatorFunction, **kwargs: Any
298+
evaluator: Union[EvaluatorFunction, RunEvaluatorFunction], **kwargs: Any
290299
) -> List[Evaluation]:
291300
"""Run an evaluator function and normalize the result."""
292301
try:
@@ -299,8 +308,10 @@ async def _run_evaluator(
299308
# Normalize to list
300309
if isinstance(result, dict):
301310
return [result]
311+
302312
elif isinstance(result, list):
303313
return result
314+
304315
else:
305316
return []
306317

@@ -310,12 +321,9 @@ async def _run_evaluator(
310321
return []
311322

312323

313-
async def _run_task(
314-
task: TaskFunction,
315-
item: Union[ExperimentItem, dict, DatasetItem, "DatasetItemClient"],
316-
) -> Any:
324+
async def _run_task(task: TaskFunction, item: ExperimentItem) -> Any:
317325
"""Run a task function and handle sync/async."""
318-
result = task(item)
326+
result = task(item=item)
319327

320328
# Handle async tasks
321329
if asyncio.iscoroutine(result):

0 commit comments

Comments
 (0)