
Commit 66288a6

add composite evaluator to run_experiments
1 parent e069711 commit 66288a6

14 files changed

Lines changed: 543 additions & 249 deletions

.pre-commit-config.yaml

Lines changed: 2 additions & 1 deletion
@@ -1,6 +1,6 @@
 repos:
   - repo: https://github.com/astral-sh/ruff-pre-commit
-    rev: v0.3.2
+    rev: v0.14.4
     hooks:
       # Run the linter and fix
       - id: ruff
@@ -10,6 +10,7 @@ repos:
       # Run the formatter.
       - id: ruff-format
         types_or: [python, pyi, jupyter]
+        args: [--config=ci.ruff.toml]

   - repo: https://github.com/pre-commit/mirrors-mypy
     rev: v1.18.2

langfuse/_client/client.py

Lines changed: 61 additions & 1 deletion
@@ -2465,6 +2465,7 @@ def run_experiment(
         data: ExperimentData,
         task: TaskFunction,
         evaluators: List[EvaluatorFunction] = [],
+        composite_evaluator: Optional[CompositeEvaluatorFunction] = None,
         run_evaluators: List[RunEvaluatorFunction] = [],
         max_concurrency: int = 50,
         metadata: Optional[Dict[str, str]] = None,
@@ -2500,6 +2501,10 @@ def run_experiment(
             evaluators: List of functions to evaluate each item's output individually.
                 Each evaluator receives input, output, expected_output, and metadata.
                 Can return single Evaluation dict or list of Evaluation dicts.
+            composite_evaluator: Optional function that creates composite scores from item-level evaluations.
+                Receives the same inputs as item-level evaluators (input, output, expected_output, metadata)
+                plus the list of evaluations from item-level evaluators. Useful for weighted averages,
+                pass/fail decisions based on multiple criteria, or custom scoring logic combining multiple metrics.
             run_evaluators: List of functions to evaluate the entire experiment run.
                 Each run evaluator receives all item_results and can compute aggregate metrics.
                 Useful for calculating averages, distributions, or cross-item comparisons.
@@ -2637,6 +2642,7 @@ def average_accuracy(*, item_results, **kwargs):
             data=data,
             task=task,
             evaluators=evaluators or [],
+            composite_evaluator=composite_evaluator,
             run_evaluators=run_evaluators or [],
             max_concurrency=max_concurrency,
             metadata=metadata,
@@ -2653,6 +2659,7 @@ async def _run_experiment_async(
         data: ExperimentData,
         task: TaskFunction,
         evaluators: List[EvaluatorFunction],
+        composite_evaluator: Optional[CompositeEvaluatorFunction],
         run_evaluators: List[RunEvaluatorFunction],
         max_concurrency: int,
         metadata: Optional[Dict[str, Any]] = None,
@@ -2668,7 +2675,14 @@ async def _run_experiment_async(
         async def process_item(item: ExperimentItem) -> ExperimentItemResult:
             async with semaphore:
                 return await self._process_experiment_item(
-                    item, task, evaluators, name, run_name, description, metadata
+                    item,
+                    task,
+                    evaluators,
+                    composite_evaluator,
+                    name,
+                    run_name,
+                    description,
+                    metadata,
                 )

         # Run all items concurrently
@@ -2750,6 +2764,7 @@ async def _process_experiment_item(
         item: ExperimentItem,
         task: Callable,
         evaluators: List[Callable],
+        composite_evaluator: Optional[CompositeEvaluatorFunction],
         experiment_name: str,
         experiment_run_name: str,
         experiment_description: Optional[str],
@@ -2908,6 +2923,51 @@ async def _process_experiment_item(
                 except Exception as e:
                     langfuse_logger.error(f"Evaluator failed: {e}")

+        # Run composite evaluator if provided and we have evaluations
+        if composite_evaluator and evaluations:
+            try:
+                composite_eval_metadata: Optional[Dict[str, Any]] = None
+                if isinstance(item, dict):
+                    composite_eval_metadata = item.get("metadata")
+                elif hasattr(item, "metadata"):
+                    composite_eval_metadata = item.metadata
+
+                result = composite_evaluator(
+                    input=input_data,
+                    output=output,
+                    expected_output=expected_output,
+                    metadata=composite_eval_metadata,
+                    evaluations=evaluations,
+                )
+
+                # Handle async composite evaluators
+                if asyncio.iscoroutine(result):
+                    result = await result
+
+                # Normalize to list
+                composite_evals: List[Evaluation] = []
+                if isinstance(result, (dict, Evaluation)):
+                    composite_evals = [result]  # type: ignore
+                elif isinstance(result, list):
+                    composite_evals = result  # type: ignore
+
+                # Store composite evaluations as scores and add to evaluations list
+                for composite_evaluation in composite_evals:
+                    self.create_score(
+                        trace_id=trace_id,
+                        observation_id=span.id,
+                        name=composite_evaluation.name,
+                        value=composite_evaluation.value,  # type: ignore
+                        comment=composite_evaluation.comment,
+                        metadata=composite_evaluation.metadata,
+                        config_id=composite_evaluation.config_id,
+                        data_type=composite_evaluation.data_type,  # type: ignore
+                    )
+                    evaluations.append(composite_evaluation)
+
+            except Exception as e:
+                langfuse_logger.error(f"Composite evaluator failed: {e}")
+
         return ExperimentItemResult(
             item=item,
             output=output,
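
For context, a minimal sketch of how the new parameter could be used end to end. The experiment name, data item, task, evaluator names, and the `Evaluation` import path below are illustrative assumptions, not part of this commit:

```python
# Sketch only: made-up task, evaluators, and score names; the Evaluation
# import path is assumed and may need adjusting for your installed version.
from langfuse import Evaluation, Langfuse  # Evaluation export path assumed

langfuse = Langfuse()


def my_task(*, item, **kwargs):
    # Placeholder task; replace with the real application call.
    return f"answer for {item['input']}"


def accuracy(*, input, output, expected_output, metadata, **kwargs):
    return Evaluation(name="accuracy", value=1.0 if output == expected_output else 0.0)


def brevity(*, input, output, **kwargs):
    return Evaluation(name="brevity", value=1.0 if len(str(output)) < 200 else 0.0)


def overall(*, input, output, expected_output, metadata, evaluations, **kwargs):
    # Composite evaluator: receives the same inputs as item-level evaluators
    # plus their results, and folds them into one score per item.
    values = [e.value for e in evaluations if isinstance(e.value, (int, float))]
    return Evaluation(name="overall", value=sum(values) / len(values) if values else 0.0)


result = langfuse.run_experiment(
    name="composite-demo",
    data=[{"input": "2+2", "expected_output": "4"}],
    task=my_task,
    evaluators=[accuracy, brevity],
    composite_evaluator=overall,
)
```

Per the handling above, if `overall` raised, the item would still complete and only the composite score would be missing, since the composite call is wrapped in its own try/except.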

langfuse/_client/datasets.py

Lines changed: 7 additions & 0 deletions
@@ -4,6 +4,7 @@

 from opentelemetry.util._decorator import _agnosticcontextmanager

+from langfuse.batch_evaluation import CompositeEvaluatorFunction
 from langfuse.experiment import (
     EvaluatorFunction,
     ExperimentResult,
@@ -204,6 +205,7 @@ def run_experiment(
         description: Optional[str] = None,
         task: TaskFunction,
         evaluators: List[EvaluatorFunction] = [],
+        composite_evaluator: Optional[CompositeEvaluatorFunction] = None,
         run_evaluators: List[RunEvaluatorFunction] = [],
         max_concurrency: int = 50,
         metadata: Optional[Dict[str, Any]] = None,
@@ -234,6 +236,10 @@ def run_experiment(
                 .metadata attributes. Signature should be: task(*, item, **kwargs) -> Any
             evaluators: List of functions to evaluate each item's output individually.
                 These will have access to the item's expected_output for comparison.
+            composite_evaluator: Optional function that creates composite scores from item-level evaluations.
+                Receives the same inputs as item-level evaluators (input, output, expected_output, metadata)
+                plus the list of evaluations from item-level evaluators. Useful for weighted averages,
+                pass/fail decisions based on multiple criteria, or custom scoring logic combining multiple metrics.
             run_evaluators: List of functions to evaluate the entire experiment run.
                 Useful for computing aggregate statistics across all dataset items.
             max_concurrency: Maximum number of concurrent task executions (default: 50).
@@ -411,6 +417,7 @@ def content_diversity(*, item_results, **kwargs):
             data=self.items,
             task=task,
             evaluators=evaluators,
+            composite_evaluator=composite_evaluator,
             run_evaluators=run_evaluators,
             max_concurrency=max_concurrency,
             metadata=metadata,
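
The same keyword is now available on dataset runs. A rough usage sketch, assuming a dataset named `qa-dataset` exists and using placeholder task and score names (hypothetical throughout):

```python
# Sketch only: dataset name, task, and score names are illustrative; the
# Evaluation import path is assumed.
from langfuse import Evaluation, Langfuse

langfuse = Langfuse()
dataset = langfuse.get_dataset("qa-dataset")  # hypothetical dataset


def answer(*, item, **kwargs):
    # Dataset items expose .input / .expected_output / .metadata attributes.
    return f"echo: {item.input}"


def exact_match(*, input, output, expected_output, **kwargs):
    return Evaluation(name="exact_match", value=1.0 if output == expected_output else 0.0)


def overall(*, input, output, expected_output, metadata, evaluations, **kwargs):
    values = [e.value for e in evaluations if isinstance(e.value, (int, float))]
    return Evaluation(name="overall", value=sum(values) / len(values) if values else 0.0)


result = dataset.run_experiment(
    name="qa-regression",
    task=answer,
    evaluators=[exact_match],
    composite_evaluator=overall,
)
```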

langfuse/_client/observe.py

Lines changed: 6 additions & 2 deletions
@@ -589,7 +589,9 @@ def __next__(self) -> Any:
             raise  # Re-raise StopIteration

         except Exception as e:
-            self.span.update(level="ERROR", status_message=str(e) or type(e).__name__).end()
+            self.span.update(
+                level="ERROR", status_message=str(e) or type(e).__name__
+            ).end()

             raise
@@ -654,6 +656,8 @@ async def __anext__(self) -> Any:

             raise  # Re-raise StopAsyncIteration
         except Exception as e:
-            self.span.update(level="ERROR", status_message=str(e) or type(e).__name__).end()
+            self.span.update(
+                level="ERROR", status_message=str(e) or type(e).__name__
+            ).end()

             raise

langfuse/batch_evaluation.py

Lines changed: 37 additions & 15 deletions
@@ -218,7 +218,8 @@ class CompositeEvaluatorFunction(Protocol):
     composite assessments based on individual evaluation results.

     Composite evaluators:
-    - Accept the original item and its list of evaluations
+    - Accept the same inputs as item-level evaluators (input, output, expected_output, metadata)
+      plus the list of evaluations
     - Return either a single Evaluation, a list of Evaluations, or a dict
     - Can be either synchronous or asynchronous
     - Have access to both raw item data and evaluation results
@@ -227,7 +228,10 @@ class CompositeEvaluatorFunction(Protocol):
     def __call__(
         self,
         *,
-        item: Union["TraceWithFullDetails", "ObservationsView"],
+        input: Optional[Any] = None,
+        output: Optional[Any] = None,
+        expected_output: Optional[Any] = None,
+        metadata: Optional[Dict[str, Any]] = None,
         evaluations: List[Evaluation],
         **kwargs: Dict[str, Any],
     ) -> Union[
@@ -245,8 +249,10 @@ def __call__(
         criteria, or custom scoring logic that considers multiple dimensions.

         Args:
-            item: The original API response object that was evaluated. Provides access
-                to the raw entity data if needed for composite scoring logic.
+            input: The input data that was provided to the system being evaluated.
+            output: The output generated by the system being evaluated.
+            expected_output: The expected/reference output for comparison (if available).
+            metadata: Additional metadata about the evaluation context.
             evaluations: List of evaluation results from item-level evaluators.
                 Each evaluation contains name, value, comment, and metadata.
@@ -266,7 +272,7 @@ def __call__(
         Examples:
             Simple weighted average:
             ```python
-            def weighted_composite(*, item, evaluations):
+            def weighted_composite(*, input, output, expected_output, metadata, evaluations):
                 weights = {
                     "accuracy": 0.5,
                     "relevance": 0.3,
@@ -292,7 +298,7 @@ def weighted_composite(*, item, evaluations):

             Pass/fail composite based on thresholds:
             ```python
-            def pass_fail_composite(*, item, evaluations):
+            def pass_fail_composite(*, input, output, expected_output, metadata, evaluations):
                 # Must pass all criteria
                 thresholds = {
                     "accuracy": 0.7,
@@ -320,13 +326,14 @@ def pass_fail_composite(*, item, evaluations):

             Async composite with external scoring:
             ```python
-            async def llm_composite(*, item, evaluations):
+            async def llm_composite(*, input, output, expected_output, metadata, evaluations):
                 # Use LLM to synthesize multiple evaluation results
                 eval_summary = "\n".join(
                     f"- {e.name}: {e.value}" for e in evaluations
                 )

                 prompt = f"Given these evaluation scores:\n{eval_summary}\n"
+                prompt += f"For the output: {output}\n"
                 prompt += "Provide an overall quality score from 0-1."

                 response = await openai.chat.completions.create(
@@ -345,12 +352,12 @@ async def llm_composite(*, item, evaluations):

             Context-aware composite:
             ```python
-            def context_composite(*, item, evaluations):
-                # Adjust weighting based on item characteristics
+            def context_composite(*, input, output, expected_output, metadata, evaluations):
+                # Adjust weighting based on metadata
                 base_weights = {"accuracy": 0.5, "speed": 0.3, "cost": 0.2}

-                # If item has high importance, prioritize accuracy
-                if hasattr(item, 'metadata') and item.metadata.get('importance') == 'high':
+                # If metadata indicates high importance, prioritize accuracy
+                if metadata and metadata.get('importance') == 'high':
                     weights = {"accuracy": 0.7, "speed": 0.2, "cost": 0.1}
                 else:
                     weights = base_weights
@@ -1211,7 +1218,10 @@ async def _process_batch_evaluation_item(
             try:
                 composite_evals = await self._run_composite_evaluator(
                     composite_evaluator,
-                    item=item,
+                    input=evaluator_inputs.input,
+                    output=evaluator_inputs.output,
+                    expected_output=evaluator_inputs.expected_output,
+                    metadata=evaluator_inputs.metadata,
                     evaluations=evaluations,
                 )
@@ -1289,14 +1299,20 @@ async def _run_mapper(
     async def _run_composite_evaluator(
         self,
         composite_evaluator: CompositeEvaluatorFunction,
-        item: Union[TraceWithFullDetails, ObservationsView],
+        input: Optional[Any],
+        output: Optional[Any],
+        expected_output: Optional[Any],
+        metadata: Optional[Dict[str, Any]],
         evaluations: List[Evaluation],
     ) -> List[Evaluation]:
         """Run composite evaluator function (handles both sync and async).

         Args:
             composite_evaluator: The composite evaluator function.
-            item: The original API response object.
+            input: The input data provided to the system.
+            output: The output generated by the system.
+            expected_output: The expected/reference output.
+            metadata: Additional metadata about the evaluation context.
             evaluations: List of item-level evaluations.

         Returns:
@@ -1305,7 +1321,13 @@ async def _run_composite_evaluator(
         Raises:
             Exception: If composite evaluator raises an exception.
         """
-        result = composite_evaluator(item=item, evaluations=evaluations)
+        result = composite_evaluator(
+            input=input,
+            output=output,
+            expected_output=expected_output,
+            metadata=metadata,
+            evaluations=evaluations,
+        )
         if asyncio.iscoroutine(result):
             result = await result
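
Putting the protocol change in concrete terms, here is a sketch of a composite evaluator that satisfies the updated `CompositeEvaluatorFunction` signature (keyword-only `input`, `output`, `expected_output`, `metadata`, `evaluations`, plus `**kwargs`). The score names and the 0.5 threshold are assumptions for illustration:

```python
# Sketch of an evaluator conforming to the updated CompositeEvaluatorFunction
# protocol; "accuracy"/"relevance" and the 0.5 threshold are made up, and the
# Evaluation import path is assumed.
from typing import Any, Dict, List, Optional

from langfuse import Evaluation  # assumed export path


def guarded_composite(
    *,
    input: Optional[Any] = None,
    output: Optional[Any] = None,
    expected_output: Optional[Any] = None,
    metadata: Optional[Dict[str, Any]] = None,
    evaluations: List[Evaluation],
    **kwargs: Any,
) -> Evaluation:
    # Ignore non-numeric scores instead of crashing; missing metrics count as 0.
    numeric = {e.name: e.value for e in evaluations if isinstance(e.value, (int, float))}
    passed = all(numeric.get(name, 0.0) >= 0.5 for name in ("accuracy", "relevance"))
    return Evaluation(
        name="guarded_composite",
        value=1.0 if passed else 0.0,
        comment=f"checked {sorted(numeric)} against a 0.5 threshold",
    )
```

Because both `run_experiment` and the batch evaluation path now call composite evaluators with the same keyword arguments, a single function like this can be reused in both places.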

tests/test_batch_evaluation.py

Lines changed: 5 additions & 5 deletions
@@ -466,7 +466,7 @@ def accuracy_evaluator(*, input, output, **kwargs):
     def relevance_evaluator(*, input, output, **kwargs):
         return Evaluation(name="relevance", value=0.9)

-    def composite_evaluator(*, item, evaluations):
+    def composite_evaluator(*, input, output, expected_output, metadata, evaluations):
         weights = {"accuracy": 0.6, "relevance": 0.4}
         total = sum(
             e.value * weights.get(e.name, 0)
@@ -503,7 +503,7 @@ def metric1_evaluator(*, input, output, **kwargs):
     def metric2_evaluator(*, input, output, **kwargs):
         return Evaluation(name="metric2", value=0.7)

-    def pass_fail_composite(*, item, evaluations):
+    def pass_fail_composite(*, input, output, expected_output, metadata, evaluations):
         thresholds = {"metric1": 0.8, "metric2": 0.6}

         passes = all(
@@ -536,7 +536,7 @@ async def test_async_composite_evaluator(sample_traces, langfuse_client):
     def evaluator1(*, input, output, **kwargs):
         return Evaluation(name="eval1", value=0.8)

-    async def async_composite(*, item, evaluations):
+    async def async_composite(*, input, output, expected_output, metadata, evaluations):
         await asyncio.sleep(0.01)  # Simulate async processing
         avg = sum(
             e.value for e in evaluations if isinstance(e.value, (int, float))
@@ -560,7 +560,7 @@ def test_composite_evaluator_with_no_evaluations(sample_traces, langfuse_client):
     def always_failing_evaluator(*, input, output, **kwargs):
         raise Exception("Always fails")

-    def composite_evaluator(*, item, evaluations):
+    def composite_evaluator(*, input, output, expected_output, metadata, evaluations):
         # Should not be called if no evaluations succeed
         return Evaluation(name="composite", value=0.0)

@@ -582,7 +582,7 @@ def test_composite_evaluator_failure_handling(sample_traces, langfuse_client):
     def evaluator1(*, input, output, **kwargs):
         return Evaluation(name="eval1", value=0.8)

-    def failing_composite(*, item, evaluations):
+    def failing_composite(*, input, output, expected_output, metadata, evaluations):
         raise ValueError("Composite evaluator failed")

     result = langfuse_client.run_batched_evaluation(
