11import datetime as dt
2- import logging
3- from typing import TYPE_CHECKING , Any , Dict , Generator , List , Optional
4-
5- from opentelemetry .util ._decorator import _agnosticcontextmanager
2+ from typing import TYPE_CHECKING , Any , Dict , List , Optional
63
74from langfuse .api import (
85 Dataset ,
96 DatasetItem ,
10- DatasetStatus ,
117)
128from langfuse .batch_evaluation import CompositeEvaluatorFunction
139from langfuse .experiment import (
1713 TaskFunction ,
1814)
1915
20- from .span import LangfuseSpan
21-
2216if TYPE_CHECKING :
2317 from langfuse ._client .client import Langfuse
2418
2519
26- class DatasetItemClient :
27- """Class for managing dataset items in Langfuse.
28-
29- Args:
30- id (str): Unique identifier of the dataset item.
31- status (DatasetStatus): The status of the dataset item. Can be either 'ACTIVE' or 'ARCHIVED'.
32- input (Any): Input data of the dataset item.
33- expected_output (Optional[Any]): Expected output of the dataset item.
34- metadata (Optional[Any]): Additional metadata of the dataset item.
35- source_trace_id (Optional[str]): Identifier of the source trace.
36- source_observation_id (Optional[str]): Identifier of the source observation.
37- dataset_id (str): Identifier of the dataset to which this item belongs.
38- dataset_name (str): Name of the dataset to which this item belongs.
39- created_at (datetime): Timestamp of dataset item creation.
40- updated_at (datetime): Timestamp of the last update to the dataset item.
41- langfuse (Langfuse): Instance of Langfuse client for API interactions.
42-
43- Example:
44- ```python
45- from langfuse import Langfuse
46-
47- langfuse = Langfuse()
48-
49- dataset = langfuse.get_dataset("<dataset_name>")
50-
51- for item in dataset.items:
52- # Generate a completion using the input of every item
53- completion, generation = llm_app.run(item.input)
54-
55- # Evaluate the completion
56- generation.score(
57- name="example-score",
58- value=1
59- )
60- ```
61- """
62-
63- log = logging .getLogger ("langfuse" )
64-
65- id : str
66- status : DatasetStatus
67- input : Any
68- expected_output : Optional [Any ]
69- metadata : Optional [Any ]
70- source_trace_id : Optional [str ]
71- source_observation_id : Optional [str ]
72- dataset_id : str
73- dataset_name : str
74- created_at : dt .datetime
75- updated_at : dt .datetime
76-
77- langfuse : "Langfuse"
78-
79- def __init__ (self , dataset_item : DatasetItem , langfuse : "Langfuse" ):
80- """Initialize the DatasetItemClient."""
81- self .id = dataset_item .id
82- self .status = dataset_item .status
83- self .input = dataset_item .input
84- self .expected_output = dataset_item .expected_output
85- self .metadata = dataset_item .metadata
86- self .source_trace_id = dataset_item .source_trace_id
87- self .source_observation_id = dataset_item .source_observation_id
88- self .dataset_id = dataset_item .dataset_id
89- self .dataset_name = dataset_item .dataset_name
90- self .created_at = dataset_item .created_at
91- self .updated_at = dataset_item .updated_at
92-
93- self .langfuse = langfuse
94-
95- @_agnosticcontextmanager
96- def run (
97- self ,
98- * ,
99- run_name : str ,
100- run_metadata : Optional [Any ] = None ,
101- run_description : Optional [str ] = None ,
102- ) -> Generator [LangfuseSpan , None , None ]:
103- """Create a context manager for the dataset item run that links the execution to a Langfuse trace.
104-
105- This method is a context manager that creates a trace for the dataset run and yields a span
106- that can be used to track the execution of the run.
107-
108- Args:
109- run_name (str): The name of the dataset run.
110- run_metadata (Optional[Any]): Additional metadata to include in dataset run.
111- run_description (Optional[str]): Description of the dataset run.
112-
113- Yields:
114- span: A LangfuseSpan that can be used to trace the execution of the run.
115- """
116- trace_name = f"Dataset run: { run_name } "
117-
118- with self .langfuse .start_as_current_span (name = trace_name ) as span :
119- span .update_trace (
120- name = trace_name ,
121- metadata = {
122- "dataset_item_id" : self .id ,
123- "run_name" : run_name ,
124- "dataset_id" : self .dataset_id ,
125- },
126- )
127-
128- self .log .debug (
129- f"Creating dataset run item: run_name={ run_name } id={ self .id } trace_id={ span .trace_id } "
130- )
131-
132- self .langfuse .api .dataset_run_items .create (
133- run_name = run_name ,
134- dataset_item_id = self .id ,
135- trace_id = span .trace_id ,
136- metadata = run_metadata ,
137- run_description = run_description ,
138- )
139-
140- yield span
141-
142-
14320class DatasetClient :
14421 """Class for managing datasets in Langfuse.
14522
@@ -151,7 +28,7 @@ class DatasetClient:
15128 project_id (str): Identifier of the project to which the dataset belongs.
15229 created_at (datetime): Timestamp of dataset creation.
15330 updated_at (datetime): Timestamp of the last update to the dataset.
154- items (List[DatasetItemClient ]): List of dataset items associated with the dataset.
31+ items (List[DatasetItem ]): List of dataset items associated with the dataset.
15532
15633 Example:
15734 Print the input of each dataset item in a dataset.
@@ -174,9 +51,13 @@ class DatasetClient:
17451 metadata : Optional [Any ]
17552 created_at : dt .datetime
17653 updated_at : dt .datetime
177- items : List [DatasetItemClient ]
54+ input_schema : Optional [Any ]
55+ expected_output_schema : Optional [Any ]
56+ items : List [DatasetItem ]
17857
179- def __init__ (self , dataset : Dataset , items : List [DatasetItemClient ]):
58+ def __init__ (
59+ self , * , dataset : Dataset , items : List [DatasetItem ], langfuse_client : "Langfuse"
60+ ):
18061 """Initialize the DatasetClient."""
18162 self .id = dataset .id
18263 self .name = dataset .name
@@ -185,14 +66,10 @@ def __init__(self, dataset: Dataset, items: List[DatasetItemClient]):
18566 self .metadata = dataset .metadata
18667 self .created_at = dataset .created_at
18768 self .updated_at = dataset .updated_at
69+ self .input_schema = dataset .input_schema
70+ self .expected_output_schema = dataset .expected_output_schema
18871 self .items = items
189- self ._langfuse : Optional ["Langfuse" ] = None
190-
191- def _get_langfuse_client (self ) -> Optional ["Langfuse" ]:
192- """Get the Langfuse client from the first item."""
193- if self ._langfuse is None and self .items :
194- self ._langfuse = self .items [0 ].langfuse
195- return self ._langfuse
72+ self ._langfuse_client : "Langfuse" = langfuse_client
19673
19774 def run_experiment (
19875 self ,
@@ -403,11 +280,7 @@ def content_diversity(*, item_results, **kwargs):
403280 - This method works in both sync and async contexts (Jupyter notebooks, web apps, etc.)
404281 - Async execution is handled automatically with smart event loop detection
405282 """
406- langfuse_client = self ._get_langfuse_client ()
407- if not langfuse_client :
408- raise ValueError ("No Langfuse client available. Dataset items are empty." )
409-
410- return langfuse_client .run_experiment (
283+ return self ._langfuse_client .run_experiment (
411284 name = name ,
412285 run_name = run_name ,
413286 description = description ,
0 commit comments