@@ -2943,17 +2943,17 @@ async def _process_experiment_item(
29432943 }
29442944 )
29452945
2946- with _propagate_attributes (
2947- experiment = PropagatedExperimentAttributes (
2948- experiment_id = experiment_id ,
2949- experiment_name = experiment_run_name ,
2950- experiment_metadata = _serialize ( experiment_metadata ) ,
2951- experiment_dataset_id = dataset_id ,
2952- experiment_item_id = experiment_item_id ,
2953- experiment_item_metadata = _serialize ( item_metadata ) ,
2954- experiment_item_root_observation_id = span . id ,
2955- )
2956- ):
2946+ propagated_experiment_attributes = PropagatedExperimentAttributes (
2947+ experiment_id = experiment_id ,
2948+ experiment_name = experiment_run_name ,
2949+ experiment_metadata = _serialize ( experiment_metadata ) ,
2950+ experiment_dataset_id = dataset_id ,
2951+ experiment_item_id = experiment_item_id ,
2952+ experiment_item_metadata = _serialize ( item_metadata ) ,
2953+ experiment_item_root_observation_id = span . id ,
2954+ )
2955+
2956+ with _propagate_attributes ( experiment = propagated_experiment_attributes ):
29572957 output = await _run_task (task , item )
29582958
29592959 span .update (
@@ -2968,95 +2968,101 @@ async def _process_experiment_item(
29682968 )
29692969 raise e
29702970
2971- # Run evaluators
2972- evaluations = []
2973-
2974- for evaluator in evaluators :
2975- try :
2976- eval_metadata : Optional [Dict [str , Any ]] = None
2971+ # Run evaluators
2972+ evaluations = []
29772973
2978- if isinstance (item , dict ):
2979- eval_metadata = item .get ("metadata" )
2980- elif hasattr (item , "metadata" ):
2981- eval_metadata = item .metadata
2974+ for evaluator in evaluators :
2975+ try :
2976+ eval_metadata : Optional [Dict [str , Any ]] = None
29822977
2983- eval_results = await _run_evaluator (
2984- evaluator ,
2985- input = input_data ,
2986- output = output ,
2987- expected_output = expected_output ,
2988- metadata = eval_metadata ,
2989- )
2990- evaluations .extend (eval_results )
2991-
2992- # Store evaluations as scores
2993- for evaluation in eval_results :
2994- self .create_score (
2995- trace_id = trace_id ,
2996- observation_id = span .id ,
2997- name = evaluation .name ,
2998- value = evaluation .value , # type: ignore
2999- comment = evaluation .comment ,
3000- metadata = evaluation .metadata ,
3001- config_id = evaluation .config_id ,
3002- data_type = evaluation .data_type , # type: ignore
3003- )
3004-
3005- except Exception as e :
3006- langfuse_logger .error (f"Evaluator failed: { e } " )
3007-
3008- # Run composite evaluator if provided and we have evaluations
3009- if composite_evaluator and evaluations :
3010- try :
3011- composite_eval_metadata : Optional [Dict [str , Any ]] = None
3012- if isinstance (item , dict ):
3013- composite_eval_metadata = item .get ("metadata" )
3014- elif hasattr (item , "metadata" ):
3015- composite_eval_metadata = item .metadata
2978+ if isinstance (item , dict ):
2979+ eval_metadata = item .get ("metadata" )
2980+ elif hasattr (item , "metadata" ):
2981+ eval_metadata = item .metadata
30162982
3017- result = composite_evaluator (
3018- input = input_data ,
3019- output = output ,
3020- expected_output = expected_output ,
3021- metadata = composite_eval_metadata ,
3022- evaluations = evaluations ,
3023- )
3024-
3025- # Handle async composite evaluators
3026- if asyncio .iscoroutine (result ):
3027- result = await result
3028-
3029- # Normalize to list
3030- composite_evals : List [Evaluation ] = []
3031- if isinstance (result , (dict , Evaluation )):
3032- composite_evals = [result ] # type: ignore
3033- elif isinstance (result , list ):
3034- composite_evals = result # type: ignore
3035-
3036- # Store composite evaluations as scores and add to evaluations list
3037- for composite_evaluation in composite_evals :
3038- self .create_score (
3039- trace_id = trace_id ,
3040- observation_id = span .id ,
3041- name = composite_evaluation .name ,
3042- value = composite_evaluation .value , # type: ignore
3043- comment = composite_evaluation .comment ,
3044- metadata = composite_evaluation .metadata ,
3045- config_id = composite_evaluation .config_id ,
3046- data_type = composite_evaluation .data_type , # type: ignore
3047- )
3048- evaluations .append (composite_evaluation )
3049-
3050- except Exception as e :
3051- langfuse_logger .error (f"Composite evaluator failed: { e } " )
2983+ with _propagate_attributes (
2984+ experiment = propagated_experiment_attributes
2985+ ):
2986+ eval_results = await _run_evaluator (
2987+ evaluator ,
2988+ input = input_data ,
2989+ output = output ,
2990+ expected_output = expected_output ,
2991+ metadata = eval_metadata ,
2992+ )
2993+ evaluations .extend (eval_results )
2994+
2995+ # Store evaluations as scores
2996+ for evaluation in eval_results :
2997+ self .create_score (
2998+ trace_id = trace_id ,
2999+ observation_id = span .id ,
3000+ name = evaluation .name ,
3001+ value = evaluation .value , # type: ignore
3002+ comment = evaluation .comment ,
3003+ metadata = evaluation .metadata ,
3004+ config_id = evaluation .config_id ,
3005+ data_type = evaluation .data_type , # type: ignore
3006+ )
3007+
3008+ except Exception as e :
3009+ langfuse_logger .error (f"Evaluator failed: { e } " )
3010+
3011+ # Run composite evaluator if provided and we have evaluations
3012+ if composite_evaluator and evaluations :
3013+ try :
3014+ composite_eval_metadata : Optional [Dict [str , Any ]] = None
3015+ if isinstance (item , dict ):
3016+ composite_eval_metadata = item .get ("metadata" )
3017+ elif hasattr (item , "metadata" ):
3018+ composite_eval_metadata = item .metadata
3019+
3020+ with _propagate_attributes (
3021+ experiment = propagated_experiment_attributes
3022+ ):
3023+ result = composite_evaluator (
3024+ input = input_data ,
3025+ output = output ,
3026+ expected_output = expected_output ,
3027+ metadata = composite_eval_metadata ,
3028+ evaluations = evaluations ,
3029+ )
30523030
3053- return ExperimentItemResult (
3054- item = item ,
3055- output = output ,
3056- evaluations = evaluations ,
3057- trace_id = trace_id ,
3058- dataset_run_id = dataset_run_id ,
3059- )
3031+ # Handle async composite evaluators
3032+ if asyncio .iscoroutine (result ):
3033+ result = await result
3034+
3035+ # Normalize to list
3036+ composite_evals : List [Evaluation ] = []
3037+ if isinstance (result , (dict , Evaluation )):
3038+ composite_evals = [result ] # type: ignore
3039+ elif isinstance (result , list ):
3040+ composite_evals = result # type: ignore
3041+
3042+ # Store composite evaluations as scores and add to evaluations list
3043+ for composite_evaluation in composite_evals :
3044+ self .create_score (
3045+ trace_id = trace_id ,
3046+ observation_id = span .id ,
3047+ name = composite_evaluation .name ,
3048+ value = composite_evaluation .value , # type: ignore
3049+ comment = composite_evaluation .comment ,
3050+ metadata = composite_evaluation .metadata ,
3051+ config_id = composite_evaluation .config_id ,
3052+ data_type = composite_evaluation .data_type , # type: ignore
3053+ )
3054+ evaluations .append (composite_evaluation )
3055+
3056+ except Exception as e :
3057+ langfuse_logger .error (f"Composite evaluator failed: { e } " )
3058+
3059+ return ExperimentItemResult (
3060+ item = item ,
3061+ output = output ,
3062+ evaluations = evaluations ,
3063+ trace_id = trace_id ,
3064+ dataset_run_id = dataset_run_id ,
3065+ )
30603066
30613067 def _create_experiment_run_name (
30623068 self , * , name : Optional [str ] = None , run_name : Optional [str ] = None
0 commit comments