Skip to content

Commit 3c9971e

Browse files
authored
Merge branch 'main' into fix/control-flow-exception-check-in-error-handlers
2 parents 44b9fcc + cd9812c commit 3c9971e

6 files changed

Lines changed: 199 additions & 92 deletions

File tree

.github/workflows/dependabot-merge.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ jobs:
1515
steps:
1616
- name: Dependabot metadata
1717
id: metadata
18-
uses: dependabot/fetch-metadata@ffa630c65fa7e0ecfa0625b5ceda64399aea1b36 # v3
18+
uses: dependabot/fetch-metadata@ffa630c65fa7e0ecfa0625b5ceda64399aea1b36 # v3.0.0
1919
with:
2020
github-token: "${{ secrets.GITHUB_TOKEN }}"
2121
- name: Enable auto-merge for Dependabot PRs

.github/workflows/zizmor.yml

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ jobs:
1919
if: github.event_name != 'pull_request' || github.event.pull_request.head.repo.full_name == github.repository
2020
runs-on: ubuntu-latest
2121
permissions:
22-
security-events: write
2322
contents: read
2423
steps:
2524
- name: Checkout
@@ -29,4 +28,7 @@ jobs:
2928
- name: Run zizmor
3029
uses: zizmorcore/zizmor-action@b1d7e1fb5de872772f31590499237e7cce841e8e # v0.5.3
3130
with:
32-
advanced-security: true
31+
# Using false as a code scanning ruleset would block the release
32+
# workflow which creates a new commit and pushes directly to main.
33+
advanced-security: false
34+
min-severity: medium

langfuse/_client/observe.py

Lines changed: 99 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -290,42 +290,15 @@ async def async_wrapper(*args: Tuple[Any], **kwargs: Dict[str, Any]) -> Any:
290290

291291
try:
292292
result = await func(*args, **kwargs)
293-
294-
if capture_output is True:
295-
if inspect.isgenerator(result):
296-
is_return_type_generator = True
297-
298-
return self._wrap_sync_generator_result(
299-
langfuse_span_or_generation,
300-
result,
301-
transform_to_string,
302-
)
303-
304-
if inspect.isasyncgen(result):
305-
is_return_type_generator = True
306-
307-
return self._wrap_async_generator_result(
308-
langfuse_span_or_generation,
309-
result,
310-
transform_to_string,
311-
)
312-
313-
# handle starlette.StreamingResponse
314-
if type(result).__name__ == "StreamingResponse" and hasattr(
315-
result, "body_iterator"
316-
):
317-
is_return_type_generator = True
318-
319-
result.body_iterator = (
320-
self._wrap_async_generator_result(
321-
langfuse_span_or_generation,
322-
result.body_iterator,
323-
transform_to_string,
324-
)
325-
)
326-
327-
langfuse_span_or_generation.update(output=result)
328-
293+
(
294+
is_return_type_generator,
295+
result,
296+
) = self._handle_observe_result(
297+
langfuse_span_or_generation,
298+
result,
299+
capture_output=capture_output,
300+
transform_to_string=transform_to_string,
301+
)
329302
return result
330303
except (Exception, asyncio.CancelledError) as e:
331304
langfuse_span_or_generation.update(
@@ -408,42 +381,15 @@ def sync_wrapper(*args: Any, **kwargs: Any) -> Any:
408381

409382
try:
410383
result = func(*args, **kwargs)
411-
412-
if capture_output is True:
413-
if inspect.isgenerator(result):
414-
is_return_type_generator = True
415-
416-
return self._wrap_sync_generator_result(
417-
langfuse_span_or_generation,
418-
result,
419-
transform_to_string,
420-
)
421-
422-
if inspect.isasyncgen(result):
423-
is_return_type_generator = True
424-
425-
return self._wrap_async_generator_result(
426-
langfuse_span_or_generation,
427-
result,
428-
transform_to_string,
429-
)
430-
431-
# handle starlette.StreamingResponse
432-
if type(result).__name__ == "StreamingResponse" and hasattr(
433-
result, "body_iterator"
434-
):
435-
is_return_type_generator = True
436-
437-
result.body_iterator = (
438-
self._wrap_async_generator_result(
439-
langfuse_span_or_generation,
440-
result.body_iterator,
441-
transform_to_string,
442-
)
443-
)
444-
445-
langfuse_span_or_generation.update(output=result)
446-
384+
(
385+
is_return_type_generator,
386+
result,
387+
) = self._handle_observe_result(
388+
langfuse_span_or_generation,
389+
result,
390+
capture_output=capture_output,
391+
transform_to_string=transform_to_string,
392+
)
447393
return result
448394
except (Exception, asyncio.CancelledError) as e:
449395
langfuse_span_or_generation.update(
@@ -493,6 +439,7 @@ def _wrap_sync_generator_result(
493439
LangfuseGuardrail,
494440
],
495441
generator: Generator,
442+
capture_output: bool,
496443
transform_to_string: Optional[Callable[[Iterable], str]] = None,
497444
) -> Any:
498445
preserved_context = contextvars.copy_context()
@@ -501,6 +448,7 @@ def _wrap_sync_generator_result(
501448
generator,
502449
preserved_context,
503450
langfuse_span_or_generation,
451+
capture_output,
504452
transform_to_string,
505453
)
506454

@@ -518,6 +466,7 @@ def _wrap_async_generator_result(
518466
LangfuseGuardrail,
519467
],
520468
generator: AsyncGenerator,
469+
capture_output: bool,
521470
transform_to_string: Optional[Callable[[Iterable], str]] = None,
522471
) -> Any:
523472
preserved_context = contextvars.copy_context()
@@ -526,9 +475,61 @@ def _wrap_async_generator_result(
526475
generator,
527476
preserved_context,
528477
langfuse_span_or_generation,
478+
capture_output,
529479
transform_to_string,
530480
)
531481

482+
def _handle_observe_result(
483+
self,
484+
langfuse_span_or_generation: Union[
485+
LangfuseSpan,
486+
LangfuseGeneration,
487+
LangfuseAgent,
488+
LangfuseTool,
489+
LangfuseChain,
490+
LangfuseRetriever,
491+
LangfuseEvaluator,
492+
LangfuseEmbedding,
493+
LangfuseGuardrail,
494+
],
495+
result: Any,
496+
*,
497+
capture_output: bool,
498+
transform_to_string: Optional[Callable[[Iterable], str]] = None,
499+
) -> Tuple[bool, Any]:
500+
if inspect.isgenerator(result):
501+
return True, self._wrap_sync_generator_result(
502+
langfuse_span_or_generation,
503+
result,
504+
capture_output,
505+
transform_to_string,
506+
)
507+
508+
if inspect.isasyncgen(result):
509+
return True, self._wrap_async_generator_result(
510+
langfuse_span_or_generation,
511+
result,
512+
capture_output,
513+
transform_to_string,
514+
)
515+
516+
# handle starlette.StreamingResponse
517+
if type(result).__name__ == "StreamingResponse" and hasattr(
518+
result, "body_iterator"
519+
):
520+
result.body_iterator = self._wrap_async_generator_result(
521+
langfuse_span_or_generation,
522+
result.body_iterator,
523+
capture_output,
524+
transform_to_string,
525+
)
526+
return True, result
527+
528+
if capture_output is True:
529+
langfuse_span_or_generation.update(output=result)
530+
531+
return False, result
532+
532533

533534
_decorator = LangfuseDecorator()
534535

@@ -553,12 +554,14 @@ def __init__(
553554
LangfuseEmbedding,
554555
LangfuseGuardrail,
555556
],
557+
capture_output: bool,
556558
transform_fn: Optional[Callable[[Iterable], str]],
557559
) -> None:
558560
self.generator = generator
559561
self.context = context
560562
self.items: List[Any] = []
561563
self.span = span
564+
self.capture_output = capture_output
562565
self.transform_fn = transform_fn
563566

564567
def __iter__(self) -> "_ContextPreservedSyncGeneratorWrapper":
@@ -568,21 +571,25 @@ def __next__(self) -> Any:
568571
try:
569572
# Run the generator's __next__ in the preserved context
570573
item = self.context.run(next, self.generator)
571-
self.items.append(item)
574+
if self.capture_output:
575+
self.items.append(item)
572576

573577
return item
574578

575579
except StopIteration:
576580
# Handle output and span cleanup when generator is exhausted
577-
output: Any = self.items
581+
if self.capture_output:
582+
output: Any = self.items
583+
584+
if self.transform_fn is not None:
585+
output = self.transform_fn(self.items)
578586

579-
if self.transform_fn is not None:
580-
output = self.transform_fn(self.items)
587+
elif all(isinstance(item, str) for item in self.items):
588+
output = "".join(self.items)
581589

582-
elif all(isinstance(item, str) for item in self.items):
583-
output = "".join(self.items)
590+
self.span.update(output=output)
584591

585-
self.span.update(output=output).end()
592+
self.span.end()
586593

587594
raise # Re-raise StopIteration
588595

@@ -612,12 +619,14 @@ def __init__(
612619
LangfuseEmbedding,
613620
LangfuseGuardrail,
614621
],
622+
capture_output: bool,
615623
transform_fn: Optional[Callable[[Iterable], str]],
616624
) -> None:
617625
self.generator = generator
618626
self.context = context
619627
self.items: List[Any] = []
620628
self.span = span
629+
self.capture_output = capture_output
621630
self.transform_fn = transform_fn
622631

623632
def __aiter__(self) -> "_ContextPreservedAsyncGeneratorWrapper":
@@ -636,21 +645,25 @@ async def __anext__(self) -> Any:
636645
# Python < 3.10 fallback - context parameter not supported
637646
item = await self.generator.__anext__()
638647

639-
self.items.append(item)
648+
if self.capture_output:
649+
self.items.append(item)
640650

641651
return item
642652

643653
except StopAsyncIteration:
644654
# Handle output and span cleanup when generator is exhausted
645-
output: Any = self.items
655+
if self.capture_output:
656+
output: Any = self.items
657+
658+
if self.transform_fn is not None:
659+
output = self.transform_fn(self.items)
646660

647-
if self.transform_fn is not None:
648-
output = self.transform_fn(self.items)
661+
elif all(isinstance(item, str) for item in self.items):
662+
output = "".join(self.items)
649663

650-
elif all(isinstance(item, str) for item in self.items):
651-
output = "".join(self.items)
664+
self.span.update(output=output)
652665

653-
self.span.update(output=output).end()
666+
self.span.end()
654667

655668
raise # Re-raise StopAsyncIteration
656669
except (Exception, asyncio.CancelledError) as e:

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[project]
22
name = "langfuse"
3-
version = "4.3.1"
3+
version = "4.4.0b1"
44
description = "A client library for accessing langfuse"
55
readme = "README.md"
66
authors = [{ name = "langfuse", email = "developers@langfuse.com" }]

0 commit comments

Comments
 (0)