4646 get_observation_types_list ,
4747)
4848from langfuse ._client .datasets import DatasetClient , DatasetItemClient
49- from langfuse ._client .experiments import (
50- ExperimentItem ,
51- ExperimentItemResult ,
52- ExperimentResult ,
53- )
5449from langfuse ._client .environment_variables import (
5550 LANGFUSE_DEBUG ,
5651 LANGFUSE_HOST ,
6156 LANGFUSE_TRACING_ENABLED ,
6257 LANGFUSE_TRACING_ENVIRONMENT ,
6358)
59+ from langfuse ._client .experiments import (
60+ ExperimentItem ,
61+ ExperimentItemResult ,
62+ ExperimentResult ,
63+ _run_evaluator ,
64+ _run_task ,
65+ )
6466from langfuse ._client .resource_manager import LangfuseResourceManager
6567from langfuse ._client .span import (
6668 LangfuseAgent ,
@@ -742,7 +744,7 @@ def start_generation(
742744 cost_details : Optional [Dict [str , float ]] = None ,
743745 prompt : Optional [PromptClient ] = None ,
744746 ) -> LangfuseGeneration :
745- """[DEPRECATED] Create a new generation span for model generations.
747+ """Create a new generation span for model generations.
746748
747749 DEPRECATED: This method is deprecated and will be removed in a future version.
748750 Use start_observation(as_type='generation') instead.
@@ -838,7 +840,7 @@ def start_as_current_generation(
838840 prompt : Optional [PromptClient ] = None ,
839841 end_on_exit : Optional [bool ] = None ,
840842 ) -> _AgnosticContextManager [LangfuseGeneration ]:
841- """[DEPRECATED] Create a new generation span and set it as the current span in a context manager.
843+ """Create a new generation span and set it as the current span in a context manager.
842844
843845 DEPRECATED: This method is deprecated and will be removed in a future version.
844846 Use start_as_current_observation(as_type='generation') instead.
@@ -2531,9 +2533,6 @@ async def _run_experiment_async(
25312533 max_concurrency : Optional [int ],
25322534 metadata : Dict [str , Any ],
25332535 ) -> ExperimentResult :
2534- """Internal async implementation of run_experiment."""
2535- from langfuse ._client .experiments import _run_evaluator
2536-
25372536 langfuse_logger .debug (f"Starting experiment '{ name } ' with { len (data )} items" )
25382537
25392538 # Set up concurrency control
@@ -2561,7 +2560,6 @@ async def process_item(
25612560 if isinstance (result , Exception ):
25622561 langfuse_logger .error (f"Item { i } failed: { result } " )
25632562 elif isinstance (result , dict ):
2564- # Type-cast since we know the structure matches ExperimentItemResult
25652563 valid_results .append (result ) # type: ignore
25662564
25672565 # Run experiment-level evaluators
@@ -2585,13 +2583,16 @@ async def process_item(
25852583 # Check if the first item has dataset_id (for DatasetItem objects)
25862584 first_item = data [0 ]
25872585 dataset_id = None
2586+
25882587 if hasattr (first_item , "dataset_id" ):
25892588 dataset_id = getattr (first_item , "dataset_id" , None )
25902589
25912590 if dataset_id :
25922591 project_id = self ._get_project_id ()
2592+
25932593 if project_id :
25942594 dataset_run_url = f"{ self ._host } /project/{ project_id } /datasets/{ dataset_id } /runs/{ dataset_run_id } "
2595+
25952596 except Exception :
25962597 pass # URL generation is optional
25972598
@@ -2606,6 +2607,7 @@ async def process_item(
26062607 comment = evaluation .get ("comment" ),
26072608 metadata = evaluation .get ("metadata" ),
26082609 )
2610+
26092611 except Exception as e :
26102612 langfuse_logger .error (f"Failed to store run evaluation: { e } " )
26112613
@@ -2625,31 +2627,38 @@ async def _process_experiment_item(
26252627 experiment_description : Optional [str ],
26262628 experiment_metadata : Dict [str , Any ],
26272629 ) -> dict :
2628- """Process a single experiment item with tracing and evaluation."""
2629- from langfuse ._client .experiments import _run_evaluator , _run_task
2630-
26312630 # Execute task with tracing
26322631 span_name = "experiment-item-run"
2632+
26332633 with self .start_as_current_span (name = span_name ) as span :
26342634 try :
2635- # Run the task
26362635 output = await _run_task (task , item )
26372636
2638- # Update span with input/output
26392637 input_data = (
26402638 item .get ("input" )
26412639 if isinstance (item , dict )
26422640 else getattr (item , "input" , None )
26432641 )
2644- # Prepare metadata
2642+
26452643 item_metadata : Dict [str , Any ] = {}
2644+
26462645 if isinstance (item , dict ):
2647- item_metadata = item .get ("metadata" , {} ) or {}
2646+ item_metadata = item .get ("metadata" , None ) or {}
26482647
26492648 final_metadata = {
26502649 "experiment_name" : experiment_name ,
26512650 ** experiment_metadata ,
26522651 }
2652+
2653+ if (
2654+ not isinstance (item , dict )
2655+ and hasattr (item , "dataset_id" )
2656+ and hasattr (item , "id" )
2657+ ):
2658+ final_metadata .update (
2659+ {"dataset_id" : item .dataset_id , "dataset_item_id" : item .id }
2660+ )
2661+
26532662 if isinstance (item_metadata , dict ):
26542663 final_metadata .update (item_metadata )
26552664
@@ -2668,30 +2677,37 @@ async def _process_experiment_item(
26682677 try :
26692678 from langfuse .model import CreateDatasetRunItemRequest
26702679
2671- dataset_run_item = self .api .dataset_run_items .create (
2672- request = CreateDatasetRunItemRequest (
2673- runName = experiment_name ,
2674- runDescription = experiment_description ,
2675- metadata = experiment_metadata ,
2676- datasetItemId = item .id , # type: ignore
2677- traceId = trace_id ,
2680+ dataset_run_item = (
2681+ await self .async_api .dataset_run_items .create (
2682+ request = CreateDatasetRunItemRequest (
2683+ runName = experiment_name ,
2684+ runDescription = experiment_description ,
2685+ metadata = experiment_metadata ,
2686+ datasetItemId = item .id , # type: ignore
2687+ traceId = trace_id ,
2688+ )
26782689 )
26792690 )
2691+
26802692 dataset_run_id = dataset_run_item .dataset_run_id
2693+
26812694 except Exception as e :
26822695 langfuse_logger .error (f"Failed to create dataset run item: { e } " )
26832696
26842697 # Run evaluators
26852698 evaluations = []
2699+
26862700 for evaluator in evaluators :
26872701 try :
26882702 expected_output = None
2703+
26892704 if isinstance (item , dict ):
26902705 expected_output = item .get ("expected_output" )
26912706 elif hasattr (item , "expected_output" ):
26922707 expected_output = item .expected_output
26932708
26942709 eval_metadata : Optional [Dict [str , Any ]] = None
2710+
26952711 if isinstance (item , dict ):
26962712 eval_metadata = item .get ("metadata" )
26972713 elif hasattr (item , "metadata" ):
@@ -2710,11 +2726,12 @@ async def _process_experiment_item(
27102726 for evaluation in eval_results :
27112727 self .create_score (
27122728 trace_id = trace_id ,
2713- name = evaluation [ "name" ] ,
2714- value = evaluation [ "value" ] ,
2729+ name = evaluation . get ( "name" , "unknown" ) ,
2730+ value = evaluation . get ( "value" , - 1 ) ,
27152731 comment = evaluation .get ("comment" ),
27162732 metadata = evaluation .get ("metadata" ),
27172733 )
2734+
27182735 except Exception as e :
27192736 langfuse_logger .error (f"Evaluator failed: { e } " )
27202737
0 commit comments