11import datetime as dt
2- import logging
3- from typing import TYPE_CHECKING , Any , Dict , Generator , List , Optional
4-
5- from opentelemetry .util ._decorator import _agnosticcontextmanager
2+ from typing import TYPE_CHECKING , Any , Dict , List , Optional
63
74from langfuse .api import (
85 Dataset ,
96 DatasetItem ,
10- DatasetStatus ,
117)
128from langfuse .batch_evaluation import CompositeEvaluatorFunction
139from langfuse .experiment import (
1713 TaskFunction ,
1814)
1915
20- from .span import LangfuseSpan
21-
2216if TYPE_CHECKING :
2317 from langfuse ._client .client import Langfuse
2418
2519
26- class DatasetItemClient :
27- """Class for managing dataset items in Langfuse.
28-
29- Args:
30- id (str): Unique identifier of the dataset item.
31- status (DatasetStatus): The status of the dataset item. Can be either 'ACTIVE' or 'ARCHIVED'.
32- input (Any): Input data of the dataset item.
33- expected_output (Optional[Any]): Expected output of the dataset item.
34- metadata (Optional[Any]): Additional metadata of the dataset item.
35- source_trace_id (Optional[str]): Identifier of the source trace.
36- source_observation_id (Optional[str]): Identifier of the source observation.
37- dataset_id (str): Identifier of the dataset to which this item belongs.
38- dataset_name (str): Name of the dataset to which this item belongs.
39- created_at (datetime): Timestamp of dataset item creation.
40- updated_at (datetime): Timestamp of the last update to the dataset item.
41- langfuse (Langfuse): Instance of Langfuse client for API interactions.
42-
43- Example:
44- ```python
45- from langfuse import Langfuse
46-
47- langfuse = Langfuse()
48-
49- dataset = langfuse.get_dataset("<dataset_name>")
50-
51- for item in dataset.items:
52- # Generate a completion using the input of every item
53- completion, generation = llm_app.run(item.input)
54-
55- # Evaluate the completion
56- generation.score(
57- name="example-score",
58- value=1
59- )
60- ```
61- """
62-
63- log = logging .getLogger ("langfuse" )
64-
65- id : str
66- status : DatasetStatus
67- input : Any
68- expected_output : Optional [Any ]
69- metadata : Optional [Any ]
70- source_trace_id : Optional [str ]
71- source_observation_id : Optional [str ]
72- dataset_id : str
73- dataset_name : str
74- created_at : dt .datetime
75- updated_at : dt .datetime
76-
77- langfuse : "Langfuse"
78-
79- def __init__ (self , dataset_item : DatasetItem , langfuse : "Langfuse" ):
80- """Initialize the DatasetItemClient."""
81- self .id = dataset_item .id
82- self .status = dataset_item .status
83- self .input = dataset_item .input
84- self .expected_output = dataset_item .expected_output
85- self .metadata = dataset_item .metadata
86- self .source_trace_id = dataset_item .source_trace_id
87- self .source_observation_id = dataset_item .source_observation_id
88- self .dataset_id = dataset_item .dataset_id
89- self .dataset_name = dataset_item .dataset_name
90- self .created_at = dataset_item .created_at
91- self .updated_at = dataset_item .updated_at
92-
93- self .langfuse = langfuse
94-
95- @_agnosticcontextmanager
96- def run (
97- self ,
98- * ,
99- run_name : str ,
100- run_metadata : Optional [Any ] = None ,
101- run_description : Optional [str ] = None ,
102- ) -> Generator [LangfuseSpan , None , None ]:
103- """Create a context manager for the dataset item run that links the execution to a Langfuse trace.
104-
105- This method is a context manager that creates a trace for the dataset run and yields a span
106- that can be used to track the execution of the run.
107-
108- Args:
109- run_name (str): The name of the dataset run.
110- run_metadata (Optional[Any]): Additional metadata to include in dataset run.
111- run_description (Optional[str]): Description of the dataset run.
112-
113- Yields:
114- span: A LangfuseSpan that can be used to trace the execution of the run.
115- """
116- trace_name = f"Dataset run: { run_name } "
117-
118- with self .langfuse .start_as_current_span (name = trace_name ) as span :
119- span .update_trace (
120- name = trace_name ,
121- metadata = {
122- "dataset_item_id" : self .id ,
123- "run_name" : run_name ,
124- "dataset_id" : self .dataset_id ,
125- },
126- )
127-
128- self .log .debug (
129- f"Creating dataset run item: run_name={ run_name } id={ self .id } trace_id={ span .trace_id } "
130- )
131-
132- self .langfuse .api .dataset_run_items .create (
133- run_name = run_name ,
134- dataset_item_id = self .id ,
135- trace_id = span .trace_id ,
136- metadata = run_metadata ,
137- run_description = run_description ,
138- )
139-
140- yield span
141-
142-
14320class DatasetClient :
14421 """Class for managing datasets in Langfuse.
14522
@@ -151,7 +28,7 @@ class DatasetClient:
15128 project_id (str): Identifier of the project to which the dataset belongs.
15229 created_at (datetime): Timestamp of dataset creation.
15330 updated_at (datetime): Timestamp of the last update to the dataset.
154- items (List[DatasetItemClient ]): List of dataset items associated with the dataset.
31+ items (List[DatasetItem ]): List of dataset items associated with the dataset.
15532 version (Optional[datetime]): Timestamp of the dataset version.
15633 Example:
15734 Print the input of each dataset item in a dataset.
@@ -174,13 +51,17 @@ class DatasetClient:
17451 metadata : Optional [Any ]
17552 created_at : dt .datetime
17653 updated_at : dt .datetime
177- items : List [DatasetItemClient ]
54+ input_schema : Optional [Any ]
55+ expected_output_schema : Optional [Any ]
56+ items : List [DatasetItem ]
17857 version : Optional [dt .datetime ]
17958
18059 def __init__ (
18160 self ,
61+ * ,
18262 dataset : Dataset ,
183- items : List [DatasetItemClient ],
63+ items : List [DatasetItem ],
64+ langfuse_client : "Langfuse" ,
18465 version : Optional [dt .datetime ] = None ,
18566 ):
18667 """Initialize the DatasetClient."""
@@ -191,15 +72,11 @@ def __init__(
19172 self .metadata = dataset .metadata
19273 self .created_at = dataset .created_at
19374 self .updated_at = dataset .updated_at
75+ self .input_schema = dataset .input_schema
76+ self .expected_output_schema = dataset .expected_output_schema
19477 self .items = items
19578 self .version = version
196- self ._langfuse : Optional ["Langfuse" ] = None
197-
198- def _get_langfuse_client (self ) -> Optional ["Langfuse" ]:
199- """Get the Langfuse client from the first item."""
200- if self ._langfuse is None and self .items :
201- self ._langfuse = self .items [0 ].langfuse
202- return self ._langfuse
79+ self ._langfuse_client : "Langfuse" = langfuse_client
20380
20481 def run_experiment (
20582 self ,
@@ -410,11 +287,7 @@ def content_diversity(*, item_results, **kwargs):
410287 - This method works in both sync and async contexts (Jupyter notebooks, web apps, etc.)
411288 - Async execution is handled automatically with smart event loop detection
412289 """
413- langfuse_client = self ._get_langfuse_client ()
414- if not langfuse_client :
415- raise ValueError ("No Langfuse client available. Dataset items are empty." )
416-
417- return langfuse_client .run_experiment (
290+ return self ._langfuse_client .run_experiment (
418291 name = name ,
419292 run_name = run_name ,
420293 description = description ,
0 commit comments