Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
307 changes: 261 additions & 46 deletions langfuse/openai.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from collections import defaultdict
from dataclasses import dataclass
from datetime import datetime
from inspect import isclass
from inspect import isawaitable, isclass
from typing import Any, Optional, cast

from openai._types import NotGiven
Expand Down Expand Up @@ -830,6 +830,191 @@
)


# Module-level guard so the Stream/AsyncStream iteration hooks below are
# installed at most once per process (patching is global, not per-client).
_openai_stream_iter_hook_installed = False


def _install_openai_stream_iteration_hooks() -> None:
    """Patch ``openai.Stream.__iter__`` and ``openai.AsyncStream.__aiter__``.

    The replacement iterators delegate to the originals and, when iteration
    finishes (exhaustion, error, or generator close), call the
    ``_langfuse_finalize_once`` callback that ``_instrument_openai_stream`` /
    ``_instrument_openai_async_stream`` attach to instrumented stream
    instances. Streams never instrumented by Langfuse carry no such
    attribute and are passed through untouched. No-op unless openai v1 is
    installed; runs at most once per process.
    """
    global _openai_stream_iter_hook_installed

    # Only openai v1 exposes the Stream/AsyncStream classes patched below.
    if not _is_openai_v1():
        return

    if not _openai_stream_iter_hook_installed:
        original_iter = openai.Stream.__iter__
        original_aiter = openai.AsyncStream.__aiter__

        def traced_iter(self: Any) -> Any:
            try:
                yield from original_iter(self)
            finally:
                # Finalize tracing exactly once, even when the consumer
                # stops early; absence of the attribute means this stream
                # was never instrumented.
                finalize_once = getattr(self, "_langfuse_finalize_once", None)
                if finalize_once is not None:
                    finalize_once()

        async def traced_aiter(self: Any) -> Any:
            try:
                async for item in original_aiter(self):
                    yield item
            finally:
                # NOTE(review): this is an async generator, so after an early
                # ``break`` the ``finally`` runs via the event loop's asyncgen
                # finalization hooks (PEP 525) rather than synchronously —
                # finalization may be deferred or skipped outside asyncio.
                # Confirm the close()/aclose() hooks cover that path.
                finalize_once = getattr(self, "_langfuse_finalize_once", None)
                if finalize_once is not None:
                    await finalize_once()

        setattr(openai.Stream, "__iter__", traced_iter)
        setattr(openai.AsyncStream, "__aiter__", traced_aiter)
        _openai_stream_iter_hook_installed = True
Comment thread
claude[bot] marked this conversation as resolved.


def _finalize_stream_response(
    *,
    resource: OpenAiDefinition,
    items: list[Any],
    generation: LangfuseGeneration,
    completion_start_time: Optional[datetime],
) -> None:
    """Aggregate streamed chunks into a completion and close the generation.

    Best effort: any extraction/update error is swallowed so tracing can
    never break the caller's stream; ``generation.end()`` always runs.
    """
    try:
        if resource.object in ("Responses", "AsyncResponses"):
            extracted = _extract_streamed_response_api_response(items)
        else:
            extracted = _extract_streamed_openai_response(resource, items)

        model, completion, usage, metadata = extracted

        _create_langfuse_update(
            completion,
            generation,
            completion_start_time,
            model=model,
            usage=usage,
            metadata=metadata,
        )
    except Exception:
        # Tracing must never raise into user code.
        pass
    finally:
        generation.end()


def _instrument_openai_stream(
    *,
    resource: OpenAiDefinition,
    response: Any,
    generation: LangfuseGeneration,
) -> Any:
    """Instrument a synchronous OpenAI v1 stream in place for Langfuse tracing.

    Streams without an internal ``_iterator`` fall back to the legacy
    ``LangfuseResponseGeneratorSync`` wrapper. Otherwise the stream's
    ``_iterator`` and ``close`` are replaced so that collected chunks are
    flushed to Langfuse exactly once — on exhaustion, on ``close()``, or via
    the patched ``Stream.__iter__`` calling ``_langfuse_finalize_once``.
    """
    if not hasattr(response, "_iterator"):
        return LangfuseResponseGeneratorSync(
            resource=resource,
            response=response,
            generation=generation,
        )

    collected_chunks: list[Any] = []
    inner_iterator = response._iterator
    first_chunk_time: Optional[datetime] = None
    finalized = False
    original_close = response.close

    def finalize_once() -> None:
        # Idempotent: flush the generation at most once.
        nonlocal finalized
        if finalized:
            return

        finalized = True
        _finalize_stream_response(
            resource=resource,
            items=collected_chunks,
            generation=generation,
            completion_start_time=first_chunk_time,
        )

    # Exposed for the patched Stream.__iter__ hook.
    response._langfuse_finalize_once = finalize_once  # type: ignore[attr-defined]

    def traced_iterator() -> Any:
        nonlocal first_chunk_time
        try:
            for chunk in inner_iterator:
                collected_chunks.append(chunk)

                # Record time-to-first-token on the first chunk only.
                if first_chunk_time is None:
                    first_chunk_time = _get_timestamp()

                yield chunk
        finally:
            finalize_once()

    def traced_close() -> Any:
        try:
            return original_close()
        finally:
            finalize_once()

    response._iterator = traced_iterator()
    response.close = traced_close

    return response


def _instrument_openai_async_stream(
    *,
    resource: OpenAiDefinition,
    response: Any,
    generation: LangfuseGeneration,
) -> Any:
    """Async counterpart of ``_instrument_openai_stream``.

    Streams without an internal ``_iterator`` fall back to the legacy
    ``LangfuseResponseGeneratorAsync`` wrapper. Otherwise ``_iterator``,
    ``close``, and ``aclose`` are replaced so that collected chunks are
    flushed to Langfuse exactly once — on exhaustion, on close, or via the
    patched ``AsyncStream.__aiter__`` awaiting ``_langfuse_finalize_once``.
    """
    if not hasattr(response, "_iterator"):
        return LangfuseResponseGeneratorAsync(
            resource=resource,
            response=response,
            generation=generation,
        )

    collected_chunks: list[Any] = []
    inner_iterator = response._iterator
    first_chunk_time: Optional[datetime] = None
    finalized = False
    original_close = response.close

    async def finalize_once() -> None:
        # Idempotent: flush the generation at most once.
        nonlocal finalized
        if finalized:
            return

        finalized = True
        _finalize_stream_response(
            resource=resource,
            items=collected_chunks,
            generation=generation,
            completion_start_time=first_chunk_time,
        )

    # Exposed for the patched AsyncStream.__aiter__ hook.
    response._langfuse_finalize_once = finalize_once  # type: ignore[attr-defined]

    async def traced_iterator() -> Any:
        nonlocal first_chunk_time
        try:
            async for chunk in inner_iterator:
                collected_chunks.append(chunk)

                # Record time-to-first-token on the first chunk only.
                if first_chunk_time is None:
                    first_chunk_time = _get_timestamp()

                yield chunk
        finally:
            await finalize_once()

    async def traced_close() -> Any:
        try:
            return await original_close()
        finally:
            await finalize_once()

    async def traced_aclose() -> Any:
        return await traced_close()

    response._iterator = traced_iterator()
    response.close = traced_close
    response.aclose = traced_aclose

    return response


@_langfuse_wrapper
def _wrap(
open_ai_resource: OpenAiDefinition, wrapped: Any, args: Any, kwargs: Any
Expand Down Expand Up @@ -860,13 +1045,19 @@
prompt=langfuse_data.get("prompt", None),
)

try:
openai_response = wrapped(**arg_extractor.get_openai_args())

if _is_streaming_response(openai_response):
if _is_openai_v1() and isinstance(openai_response, openai.Stream):
return _instrument_openai_stream(
resource=open_ai_resource,
response=openai_response,
generation=generation,
)
elif _is_streaming_response(openai_response):
return LangfuseResponseGeneratorSync(
resource=open_ai_resource,
response=openai_response,

Check warning on line 1060 in langfuse/openai.py

View check run for this annotation

Claude / Claude Code Review

Dead code in _is_streaming_response: openai.Stream/AsyncStream conditions unreachable after PR

The `_is_streaming_response` function contains two dead conditions: `or (_is_openai_v1() and isinstance(response, openai.Stream))` and the `AsyncStream` equivalent. These can never be true when the function is called, because both call-sites (`_wrap` and `_wrap_async`) only reach the `elif _is_streaming_response(...)` branch after the preceding `isinstance(openai_response, openai.Stream/AsyncStream)` checks have already returned False. The dead conditions create a misleading impression that `_is
Comment thread
hassiebp marked this conversation as resolved.
generation=generation,
)

Expand Down Expand Up @@ -934,7 +1125,13 @@
try:
openai_response = await wrapped(**arg_extractor.get_openai_args())

if _is_streaming_response(openai_response):
if _is_openai_v1() and isinstance(openai_response, openai.AsyncStream):
return _instrument_openai_async_stream(
resource=open_ai_resource,
response=openai_response,
generation=generation,
)
elif _is_streaming_response(openai_response):
return LangfuseResponseGeneratorAsync(
resource=open_ai_resource,
response=openai_response,
Expand Down Expand Up @@ -994,6 +1191,7 @@


# Patch the OpenAI resources at import time, then install the v1
# Stream/AsyncStream iteration hooks so streamed generations are finalized.
register_tracing()
_install_openai_stream_iteration_hooks()


class LangfuseResponseGeneratorSync:
Expand All @@ -1010,6 +1208,7 @@
self.response = response
self.generation = generation
self.completion_start_time: Optional[datetime] = None
self._is_finalized = False

def __iter__(self) -> Any:
try:
Expand Down Expand Up @@ -1042,29 +1241,28 @@
return self.__iter__()

def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
pass
self.close()

def _finalize(self) -> None:
try:
model, completion, usage, metadata = (
_extract_streamed_response_api_response(self.items)
if self.resource.object == "Responses"
or self.resource.object == "AsyncResponses"
else _extract_streamed_openai_response(self.resource, self.items)
)
def close(self) -> None:
close = getattr(self.response, "close", None)

_create_langfuse_update(
completion,
self.generation,
self.completion_start_time,
model=model,
usage=usage,
metadata=metadata,
)
except Exception:
pass
try:
if callable(close):
close()
finally:
self.generation.end()
self._finalize()

def _finalize(self) -> None:
if self._is_finalized:
return

self._is_finalized = True
_finalize_stream_response(
resource=self.resource,
items=self.items,
generation=self.generation,
completion_start_time=self.completion_start_time,
)


class LangfuseResponseGeneratorAsync:
Expand All @@ -1081,6 +1279,7 @@
self.response = response
self.generation = generation
self.completion_start_time: Optional[datetime] = None
self._is_finalized = False

async def __aiter__(self) -> Any:
try:
Expand Down Expand Up @@ -1113,40 +1312,56 @@
return self.__aiter__()

async def __aexit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
pass
await self.aclose()

async def _finalize(self) -> None:
try:
model, completion, usage, metadata = (
_extract_streamed_response_api_response(self.items)
if self.resource.object == "Responses"
or self.resource.object == "AsyncResponses"
else _extract_streamed_openai_response(self.resource, self.items)
)

_create_langfuse_update(
completion,
self.generation,
self.completion_start_time,
model=model,
usage=usage,
metadata=metadata,
)
except Exception:
pass
finally:
self.generation.end()
if self._is_finalized:
return

self._is_finalized = True
_finalize_stream_response(
resource=self.resource,
items=self.items,
generation=self.generation,
completion_start_time=self.completion_start_time,
)

async def close(self) -> None:
"""Close the response and release the connection.

Automatically called if the response body is read to completion.
"""
await self.response.close()
close = getattr(self.response, "close", None)
aclose = getattr(self.response, "aclose", None)

try:
if callable(close):
result = close()
if isawaitable(result):
await result
elif callable(aclose):
result = aclose()
if isawaitable(result):
await result
finally:
await self._finalize()

async def aclose(self) -> None:
"""Close the response and release the connection.

Automatically called if the response body is read to completion.
"""
await self.response.aclose()
aclose = getattr(self.response, "aclose", None)
close = getattr(self.response, "close", None)

try:
if callable(aclose):
result = aclose()
if isawaitable(result):
await result
elif callable(close):
result = close()
if isawaitable(result):
await result
finally:
await self._finalize()
Loading
Loading