Skip to content

Commit ca2c4c1

Browse files
authored
fix(observe): preserve streaming context without output capture (#1634)
1 parent cd9812c commit ca2c4c1

2 files changed

Lines changed: 343 additions & 51 deletions

File tree

langfuse/_client/observe.py

Lines changed: 108 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -563,10 +563,56 @@ def __init__(
563563
self.span = span
564564
self.capture_output = capture_output
565565
self.transform_fn = transform_fn
566+
self._span_ended = False
566567

567568
def __iter__(self) -> "_ContextPreservedSyncGeneratorWrapper":
568569
return self
569570

571+
def _finalize(self) -> None:
572+
if self._span_ended:
573+
return
574+
575+
if self.capture_output:
576+
output: Any = self.items
577+
578+
if self.transform_fn is not None:
579+
output = self.transform_fn(self.items)
580+
581+
elif all(isinstance(item, str) for item in self.items):
582+
output = "".join(self.items)
583+
584+
self.span.update(output=output)
585+
586+
self.span.end()
587+
self._span_ended = True
588+
589+
def _finalize_with_error(self, error: BaseException) -> None:
590+
if self._span_ended:
591+
return
592+
593+
self.span.update(
594+
level="ERROR", status_message=str(error) or type(error).__name__
595+
).end()
596+
self._span_ended = True
597+
598+
def close(self) -> None:
599+
if self._span_ended:
600+
return
601+
602+
try:
603+
self.context.run(self.generator.close)
604+
except (Exception, asyncio.CancelledError) as error:
605+
self._finalize_with_error(error)
606+
raise
607+
else:
608+
self._finalize()
609+
610+
def __del__(self) -> None:
611+
try:
612+
self.close()
613+
except BaseException:
614+
pass
615+
570616
def __next__(self) -> Any:
571617
try:
572618
# Run the generator's __next__ in the preserved context
@@ -577,27 +623,11 @@ def __next__(self) -> Any:
577623
return item
578624

579625
except StopIteration:
580-
# Handle output and span cleanup when generator is exhausted
581-
if self.capture_output:
582-
output: Any = self.items
583-
584-
if self.transform_fn is not None:
585-
output = self.transform_fn(self.items)
586-
587-
elif all(isinstance(item, str) for item in self.items):
588-
output = "".join(self.items)
589-
590-
self.span.update(output=output)
591-
592-
self.span.end()
593-
626+
self._finalize()
594627
raise # Re-raise StopIteration
595628

596629
except (Exception, asyncio.CancelledError) as e:
597-
self.span.update(
598-
level="ERROR", status_message=str(e) or type(e).__name__
599-
).end()
600-
630+
self._finalize_with_error(e)
601631
raise
602632

603633

@@ -628,47 +658,86 @@ def __init__(
628658
self.span = span
629659
self.capture_output = capture_output
630660
self.transform_fn = transform_fn
661+
self._span_ended = False
631662

632663
def __aiter__(self) -> "_ContextPreservedAsyncGeneratorWrapper":
633664
return self
634665

666+
def _finalize(self) -> None:
667+
if self._span_ended:
668+
return
669+
670+
if self.capture_output:
671+
output: Any = self.items
672+
673+
if self.transform_fn is not None:
674+
output = self.transform_fn(self.items)
675+
676+
elif all(isinstance(item, str) for item in self.items):
677+
output = "".join(self.items)
678+
679+
self.span.update(output=output)
680+
681+
self.span.end()
682+
self._span_ended = True
683+
684+
def _finalize_with_error(self, error: BaseException) -> None:
685+
if self._span_ended:
686+
return
687+
688+
self.span.update(
689+
level="ERROR", status_message=str(error) or type(error).__name__
690+
).end()
691+
self._span_ended = True
692+
693+
async def aclose(self) -> None:
694+
if self._span_ended:
695+
return
696+
697+
try:
698+
try:
699+
await asyncio.create_task(
700+
self.generator.aclose(),
701+
context=self.context,
702+
) # type: ignore
703+
except TypeError:
704+
await self.context.run(asyncio.create_task, self.generator.aclose())
705+
except (Exception, asyncio.CancelledError) as error:
706+
self._finalize_with_error(error)
707+
raise
708+
else:
709+
self._finalize()
710+
711+
async def close(self) -> None:
712+
await self.aclose()
713+
714+
def __del__(self) -> None:
715+
self._finalize()
716+
635717
async def __anext__(self) -> Any:
636718
try:
637719
# Run the generator's __anext__ in the preserved context
638720
try:
639-
# Python 3.10+ approach with context parameter
721+
# Python 3.11+ approach with explicit task context
640722
item = await asyncio.create_task(
641723
self.generator.__anext__(), # type: ignore
642724
context=self.context,
643725
) # type: ignore
644726
except TypeError:
645-
# Python < 3.10 fallback - context parameter not supported
646-
item = await self.generator.__anext__()
727+
# Python 3.10 fallback - create the task inside the preserved context.
728+
item = await self.context.run(
729+
asyncio.create_task,
730+
self.generator.__anext__(), # type: ignore
731+
)
647732

648733
if self.capture_output:
649734
self.items.append(item)
650735

651736
return item
652737

653738
except StopAsyncIteration:
654-
# Handle output and span cleanup when generator is exhausted
655-
if self.capture_output:
656-
output: Any = self.items
657-
658-
if self.transform_fn is not None:
659-
output = self.transform_fn(self.items)
660-
661-
elif all(isinstance(item, str) for item in self.items):
662-
output = "".join(self.items)
663-
664-
self.span.update(output=output)
665-
666-
self.span.end()
667-
739+
self._finalize()
668740
raise # Re-raise StopAsyncIteration
669741
except (Exception, asyncio.CancelledError) as e:
670-
self.span.update(
671-
level="ERROR", status_message=str(e) or type(e).__name__
672-
).end()
673-
742+
self._finalize_with_error(e)
674743
raise

0 commit comments

Comments
 (0)