Skip to content

Commit 1bc90b0

Browse files
committed
Use as_type
1 parent 9ddc018 commit 1bc90b0

4 files changed

Lines changed: 257 additions & 77 deletions

File tree

langfuse/_client/client.py

Lines changed: 28 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -350,7 +350,7 @@ def start_as_current_span(
350350
level: Optional[SpanLevel] = None,
351351
status_message: Optional[str] = None,
352352
end_on_exit: Optional[bool] = None,
353-
observation_type: Optional[str] = None,
353+
as_type: Optional[str] = None,
354354
) -> _AgnosticContextManager[LangfuseSpan]:
355355
"""Create a new span and set it as the current span in a context manager.
356356
@@ -410,14 +410,13 @@ def start_as_current_span(
410410
version=version,
411411
level=level,
412412
status_message=status_message,
413-
observation_type=observation_type,
414413
),
415414
)
416415

417416
return cast(
418417
_AgnosticContextManager[LangfuseSpan],
419418
self._start_as_current_otel_span_with_processed_media(
420-
as_type="span",
419+
as_type=as_type,
421420
name=name,
422421
end_on_exit=end_on_exit,
423422
input=input,
@@ -426,7 +425,6 @@ def start_as_current_span(
426425
version=version,
427426
level=level,
428427
status_message=status_message,
429-
observation_type=observation_type,
430428
),
431429
)
432430

@@ -669,27 +667,26 @@ def start_as_current_generation(
669667
),
670668
)
671669

672-
def _get_span_class(self, as_type: str, observation_type: Optional[str] = None):
673-
"""Get the appropriate span class based on as_type and observation_type."""
674-
if observation_type == "AGENT":
670+
def _get_span_class(self, as_type: str):
671+
"""Get the appropriate span class based on as_type."""
672+
if as_type == "AGENT":
675673
return LangfuseAgent
676-
elif observation_type == "TOOL":
674+
elif as_type == "TOOL":
677675
return LangfuseTool
678-
elif observation_type == "CHAIN":
676+
elif as_type == "CHAIN":
679677
return LangfuseChain
680-
elif observation_type == "RETRIEVER":
678+
elif as_type == "RETRIEVER":
681679
return LangfuseRetriever
682-
elif observation_type == "EMBEDDING":
680+
elif as_type == "EMBEDDING":
683681
return LangfuseEmbedding
684-
if as_type == "generation" or observation_type == "GENERATION":
682+
elif as_type in ("generation", "GENERATION"):
685683
return LangfuseGeneration
686-
elif as_type == "event" or observation_type == "EVENT":
684+
elif as_type in ("event", "EVENT"):
687685
return LangfuseEvent
688-
elif as_type == "span" or observation_type == "SPAN":
686+
elif as_type in ("span", "SPAN"):
689687
return LangfuseSpan
690688
else:
691-
# TODO: this should never happen -> error out?
692-
# for now default to LangfuseSpan for unknown types or "span" as_type
689+
# Default to LangfuseSpan for unknown types
693690
return LangfuseSpan
694691

695692
@_agnosticcontextmanager
@@ -699,7 +696,7 @@ def _create_span_with_parent_context(
699696
name: str,
700697
parent: Optional[otel_trace_api.Span] = None,
701698
remote_parent_span: Optional[otel_trace_api.Span] = None,
702-
as_type: Literal["generation", "span"],
699+
as_type: Optional[str] = None,
703700
end_on_exit: Optional[bool] = None,
704701
input: Optional[Any] = None,
705702
output: Optional[Any] = None,
@@ -713,7 +710,6 @@ def _create_span_with_parent_context(
713710
usage_details: Optional[Dict[str, int]] = None,
714711
cost_details: Optional[Dict[str, float]] = None,
715712
prompt: Optional[PromptClient] = None,
716-
observation_type: Optional[str] = None,
717713
) -> Any:
718714
parent_span = parent or cast(otel_trace_api.Span, remote_parent_span)
719715

@@ -734,7 +730,6 @@ def _create_span_with_parent_context(
734730
usage_details=usage_details,
735731
cost_details=cost_details,
736732
prompt=prompt,
737-
observation_type=observation_type,
738733
) as langfuse_span:
739734
if remote_parent_span is not None:
740735
langfuse_span._otel_span.set_attribute(
@@ -748,7 +743,7 @@ def _start_as_current_otel_span_with_processed_media(
748743
self,
749744
*,
750745
name: str,
751-
as_type: Optional[Literal["generation", "span"]] = None,
746+
as_type: Optional[str] = None,
752747
end_on_exit: Optional[bool] = None,
753748
input: Optional[Any] = None,
754749
output: Optional[Any] = None,
@@ -762,13 +757,12 @@ def _start_as_current_otel_span_with_processed_media(
762757
usage_details: Optional[Dict[str, int]] = None,
763758
cost_details: Optional[Dict[str, float]] = None,
764759
prompt: Optional[PromptClient] = None,
765-
observation_type: Optional[str] = None,
766760
) -> Any:
767761
with self._otel_tracer.start_as_current_span(
768762
name=name,
769763
end_on_exit=end_on_exit if end_on_exit is not None else True,
770764
) as otel_span:
771-
span_class = self._get_span_class(as_type, observation_type)
765+
span_class = self._get_span_class(as_type)
772766
common_args = {
773767
"otel_span": otel_span,
774768
"langfuse_client": self,
@@ -790,12 +784,21 @@ def _start_as_current_otel_span_with_processed_media(
790784
"usage_details": usage_details,
791785
"cost_details": cost_details,
792786
"prompt": prompt,
793-
"observation_type": observation_type,
794787
}
795788
)
789+
elif span_class in [
790+
LangfuseAgent,
791+
LangfuseTool,
792+
LangfuseChain,
793+
LangfuseRetriever,
794+
LangfuseEmbedding,
795+
]:
796+
# Graph observation classes set their specific observation_type internally
797+
pass
796798
else:
797-
if observation_type is not None:
798-
common_args["observation_type"] = observation_type
799+
# Regular spans (LangfuseSpan, LangfuseEvent) need as_type
800+
if as_type is not None:
801+
common_args["as_type"] = as_type
799802

800803
yield span_class(**common_args)
801804

langfuse/_client/observe.py

Lines changed: 59 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,15 @@
1919
overload,
2020
)
2121

22-
from opentelemetry import trace
2322
from opentelemetry.util._decorator import _AgnosticContextManager
2423
from typing_extensions import ParamSpec
2524

26-
from langfuse._client.attributes import LangfuseOtelSpanAttributes
2725
from langfuse._client.environment_variables import (
2826
LANGFUSE_OBSERVE_DECORATOR_IO_CAPTURE_ENABLED,
2927
)
28+
29+
from langfuse._client.constants import VALID_OBSERVATION_TYPES
30+
from langfuse._client.get_client import _set_current_public_key, get_client
3031
from langfuse._client.span import LangfuseGeneration, LangfuseSpan
3132
from langfuse.types import TraceContext
3233

@@ -67,7 +68,18 @@ def observe(
6768
*,
6869
name: Optional[str] = None,
6970
as_type: Optional[Literal["generation"]] = None,
70-
type: Optional[Literal["SPAN", "EVENT", "GENERATION", "AGENT", "TOOL", "CHAIN", "RETRIEVER", "EMBEDDING"]] = None,
71+
type: Optional[
72+
Literal[
73+
"SPAN",
74+
"EVENT",
75+
"GENERATION",
76+
"AGENT",
77+
"TOOL",
78+
"CHAIN",
79+
"RETRIEVER",
80+
"EMBEDDING",
81+
]
82+
] = None,
7183
capture_input: Optional[bool] = None,
7284
capture_output: Optional[bool] = None,
7385
transform_to_string: Optional[Callable[[Iterable], str]] = None,
@@ -79,7 +91,18 @@ def observe(
7991
*,
8092
name: Optional[str] = None,
8193
as_type: Optional[Literal["generation"]] = None,
82-
type: Optional[Literal["SPAN", "EVENT", "GENERATION", "AGENT", "TOOL", "CHAIN", "RETRIEVER", "EMBEDDING"]] = None,
94+
type: Optional[
95+
Literal[
96+
"SPAN",
97+
"EVENT",
98+
"GENERATION",
99+
"AGENT",
100+
"TOOL",
101+
"CHAIN",
102+
"RETRIEVER",
103+
"EMBEDDING",
104+
]
105+
] = None,
83106
capture_input: Optional[bool] = None,
84107
capture_output: Optional[bool] = None,
85108
transform_to_string: Optional[Callable[[Iterable], str]] = None,
@@ -98,8 +121,8 @@ def observe(
98121
name (Optional[str]): Custom name for the created trace or span. If not provided, the function name is used.
99122
as_type (Optional[Literal["generation"]]): Set to "generation" to create a specialized LLM generation span
100123
with model metrics support, suitable for tracking language model outputs.
101-
type (Optional[Literal]): Set the observation type directly. Supported values: "SPAN", "EVENT",
102-
"GENERATION", "AGENT", "TOOL", "CHAIN", "RETRIEVER", "EMBEDDING". When specified, creates spans with
124+
type (Optional[Literal]): Set the observation type for agentic workflows. Supported values: "SPAN", "EVENT",
125+
"GENERATION", "AGENT", "TOOL", "CHAIN", "RETRIEVER", "EMBEDDING". When specified, creates spans with
103126
the specified type for graph visualization and filtering in the Langfuse UI.
104127
105128
Returns:
@@ -130,14 +153,7 @@ async def generate_answer(query):
130153
```python
131154
@observe(type="AGENT")
132155
def planning_agent():
133-
# Creates a span with observation type "AGENT" for graph visualization
134156
return create_plan()
135-
136-
@observe(type="AGENT")
137-
def execution_agent():
138-
# Creates a span with observation type "AGENT"
139-
# Parent relationships inferred from OpenTelemetry span hierarchy
140-
return execute_plan()
141157
```
142158
143159
For trace context propagation between functions:
@@ -166,13 +182,19 @@ def sub_process():
166182
- For async functions, the decorator returns an async function wrapper.
167183
- For sync functions, the decorator returns a synchronous wrapper.
168184
"""
169-
# Validate type parameter if provided
170-
if type is not None:
171-
from langfuse._client.constants import VALID_OBSERVATION_TYPES
172-
if type not in VALID_OBSERVATION_TYPES:
173-
raise ValueError(
174-
f"Invalid observation type '{type}'. Valid types are: {', '.join(sorted(VALID_OBSERVATION_TYPES))}"
175-
)
185+
# Validate parameters
186+
if type is not None and type not in VALID_OBSERVATION_TYPES:
187+
raise ValueError(
188+
f"Invalid observation type '{type}'. Valid types are: {', '.join(sorted(VALID_OBSERVATION_TYPES))}"
189+
)
190+
if as_type is not None and as_type.upper() not in VALID_OBSERVATION_TYPES:
191+
valid_values = sorted(
192+
list(VALID_OBSERVATION_TYPES)
193+
+ [t.lower() for t in VALID_OBSERVATION_TYPES]
194+
)
195+
raise ValueError(
196+
f"Invalid as_type '{as_type}'. Valid values are: {', '.join(valid_values)}"
197+
)
176198

177199
function_io_capture_enabled = os.environ.get(
178200
LANGFUSE_OBSERVE_DECORATOR_IO_CAPTURE_ENABLED, "True"
@@ -189,12 +211,14 @@ def sub_process():
189211
)
190212

191213
def decorator(func: F) -> F:
214+
# Merge as_type and type parameters - type takes precedence for graph observations
215+
final_as_type = type or as_type
216+
192217
return (
193218
self._async_observe(
194219
func,
195220
name=name,
196-
as_type=as_type,
197-
observation_type=type,
221+
as_type=final_as_type,
198222
capture_input=should_capture_input,
199223
capture_output=should_capture_output,
200224
transform_to_string=transform_to_string,
@@ -203,8 +227,7 @@ def decorator(func: F) -> F:
203227
else self._sync_observe(
204228
func,
205229
name=name,
206-
as_type=as_type,
207-
observation_type=type,
230+
as_type=final_as_type,
208231
capture_input=should_capture_input,
209232
capture_output=should_capture_output,
210233
transform_to_string=transform_to_string,
@@ -231,8 +254,7 @@ def _async_observe(
231254
func: F,
232255
*,
233256
name: Optional[str],
234-
as_type: Optional[Literal["generation"]],
235-
observation_type: Optional[str],
257+
as_type: Optional[str],
236258
capture_input: bool,
237259
capture_output: bool,
238260
transform_to_string: Optional[Callable[[Iterable], str]] = None,
@@ -252,7 +274,6 @@ async def async_wrapper(*args: Tuple[Any], **kwargs: Dict[str, Any]) -> Any:
252274
else None
253275
)
254276
final_name = name or func.__name__
255-
256277
input = (
257278
self._get_input_from_func_args(
258279
is_method=self._is_method(func),
@@ -264,9 +285,9 @@ async def async_wrapper(*args: Tuple[Any], **kwargs: Dict[str, Any]) -> Any:
264285
)
265286
public_key = cast(str, kwargs.pop("langfuse_public_key", None))
266287
langfuse_client = get_client(public_key=public_key)
267-
268-
# Determine final observation type and create appropriate span
269-
final_obs_type = observation_type or as_type
288+
289+
# Use consolidated as_type parameter
290+
final_obs_type = as_type
270291

271292
context_manager: Optional[
272293
Union[
@@ -281,13 +302,13 @@ async def async_wrapper(*args: Tuple[Any], **kwargs: Dict[str, Any]) -> Any:
281302
input=input,
282303
end_on_exit=False, # when returning a generator, closing on exit would be to early
283304
)
284-
if final_obs_type == "generation" or observation_type == "GENERATION"
305+
if final_obs_type in ("generation", "GENERATION")
285306
else langfuse_client.start_as_current_span(
286307
name=final_name,
287308
trace_context=trace_context,
288309
input=input,
289310
end_on_exit=False, # when returning a generator, closing on exit would be to early
290-
observation_type=observation_type,
311+
as_type=final_obs_type,
291312
)
292313
)
293314
if langfuse_client
@@ -298,7 +319,6 @@ async def async_wrapper(*args: Tuple[Any], **kwargs: Dict[str, Any]) -> Any:
298319
return await func(*args, **kwargs)
299320

300321
with context_manager as langfuse_span_or_generation:
301-
302322
is_return_type_generator = False
303323

304324
try:
@@ -343,8 +363,7 @@ def _sync_observe(
343363
func: F,
344364
*,
345365
name: Optional[str],
346-
as_type: Optional[Literal["generation"]],
347-
observation_type: Optional[str],
366+
as_type: Optional[str],
348367
capture_input: bool,
349368
capture_output: bool,
350369
transform_to_string: Optional[Callable[[Iterable], str]] = None,
@@ -362,7 +381,6 @@ def sync_wrapper(*args: Any, **kwargs: Any) -> Any:
362381
else None
363382
)
364383
final_name = name or func.__name__
365-
366384
input = (
367385
self._get_input_from_func_args(
368386
is_method=self._is_method(func),
@@ -374,9 +392,9 @@ def sync_wrapper(*args: Any, **kwargs: Any) -> Any:
374392
)
375393
public_key = kwargs.pop("langfuse_public_key", None)
376394
langfuse_client = get_client(public_key=public_key)
377-
378-
# Determine final observation type and create appropriate span
379-
final_obs_type = observation_type or as_type
395+
396+
# Use consolidated as_type parameter
397+
final_obs_type = as_type
380398

381399
context_manager: Optional[
382400
Union[
@@ -391,13 +409,13 @@ def sync_wrapper(*args: Any, **kwargs: Any) -> Any:
391409
input=input,
392410
end_on_exit=False, # when returning a generator, closing on exit would be to early
393411
)
394-
if final_obs_type == "generation" or observation_type == "GENERATION"
412+
if final_obs_type in ("generation", "GENERATION")
395413
else langfuse_client.start_as_current_span(
396414
name=final_name,
397415
trace_context=trace_context,
398416
input=input,
399417
end_on_exit=False, # when returning a generator, closing on exit would be to early
400-
observation_type=observation_type,
418+
as_type=final_obs_type,
401419
)
402420
)
403421
if langfuse_client
@@ -454,7 +472,6 @@ def _is_method(func: Callable) -> bool:
454472
or "cls" in inspect.signature(func).parameters
455473
)
456474

457-
458475
def _get_input_from_func_args(
459476
self,
460477
*,

0 commit comments

Comments
 (0)