Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
251 changes: 209 additions & 42 deletions langfuse/openai.py
Original file line number Diff line number Diff line change
Expand Up @@ -830,6 +830,178 @@
)


_openai_stream_iter_hook_installed = False


def _install_openai_stream_iteration_hooks() -> None:
global _openai_stream_iter_hook_installed

if not _is_openai_v1():
return

if not _openai_stream_iter_hook_installed:
original_iter = openai.Stream.__iter__

def traced_iter(self: Any) -> Any:
try:
yield from original_iter(self)
finally:
finalize_once = getattr(self, "_langfuse_finalize_once", None)
if finalize_once is not None:
finalize_once()

setattr(openai.Stream, "__iter__", traced_iter)
_openai_stream_iter_hook_installed = True
Comment thread
claude[bot] marked this conversation as resolved.


def _finalize_stream_response(
*,
resource: OpenAiDefinition,
items: list[Any],
generation: LangfuseGeneration,
completion_start_time: Optional[datetime],
) -> None:
try:
model, completion, usage, metadata = (
_extract_streamed_response_api_response(items)
if resource.object == "Responses" or resource.object == "AsyncResponses"
else _extract_streamed_openai_response(resource, items)
)

_create_langfuse_update(
completion,
generation,
completion_start_time,
model=model,
usage=usage,
metadata=metadata,
)
except Exception:
pass
finally:
generation.end()


def _instrument_openai_stream(
*,
resource: OpenAiDefinition,
response: Any,
generation: LangfuseGeneration,
) -> Any:
if not hasattr(response, "_iterator"):
return LangfuseResponseGeneratorSync(
resource=resource,
response=response,
generation=generation,
)

items: list[Any] = []
raw_iterator = response._iterator
completion_start_time: Optional[datetime] = None
is_finalized = False
close = response.close

def finalize_once() -> None:
nonlocal is_finalized
if is_finalized:
return

is_finalized = True
_finalize_stream_response(
resource=resource,
items=items,
generation=generation,
completion_start_time=completion_start_time,
)

response._langfuse_finalize_once = finalize_once # type: ignore[attr-defined]

def traced_iterator() -> Any:
nonlocal completion_start_time
try:
for item in raw_iterator:
items.append(item)

if completion_start_time is None:
completion_start_time = _get_timestamp()

yield item
finally:
finalize_once()

def traced_close() -> Any:
try:
return close()
finally:
finalize_once()

response._iterator = traced_iterator()
response.close = traced_close
Comment thread
claude[bot] marked this conversation as resolved.

return response


def _instrument_openai_async_stream(
*,
resource: OpenAiDefinition,
response: Any,
generation: LangfuseGeneration,
) -> Any:
if not hasattr(response, "_iterator"):
return LangfuseResponseGeneratorAsync(
resource=resource,
response=response,
generation=generation,
)

items: list[Any] = []
raw_iterator = response._iterator
completion_start_time: Optional[datetime] = None
is_finalized = False
close = response.close

async def finalize_once() -> None:
nonlocal is_finalized
if is_finalized:
return

is_finalized = True
_finalize_stream_response(
resource=resource,
items=items,
generation=generation,
completion_start_time=completion_start_time,
)

async def traced_iterator() -> Any:
nonlocal completion_start_time
try:
async for item in raw_iterator:
items.append(item)

if completion_start_time is None:
completion_start_time = _get_timestamp()

yield item
finally:
await finalize_once()

async def traced_close() -> Any:
try:
return await close()
finally:
await finalize_once()

async def traced_aclose() -> Any:
return await traced_close()

response._iterator = traced_iterator()
response.close = traced_close
response.aclose = traced_aclose
Comment thread
hassiebp marked this conversation as resolved.

return response


@_langfuse_wrapper
def _wrap(
open_ai_resource: OpenAiDefinition, wrapped: Any, args: Any, kwargs: Any
Expand Down Expand Up @@ -863,7 +1035,13 @@
try:
openai_response = wrapped(**arg_extractor.get_openai_args())

if _is_streaming_response(openai_response):
if _is_openai_v1() and isinstance(openai_response, openai.Stream):
return _instrument_openai_stream(
resource=open_ai_resource,
response=openai_response,
generation=generation,
)
elif _is_streaming_response(openai_response):
return LangfuseResponseGeneratorSync(
resource=open_ai_resource,
response=openai_response,
Comment thread
hassiebp marked this conversation as resolved.
Expand Down Expand Up @@ -934,7 +1112,13 @@
try:
openai_response = await wrapped(**arg_extractor.get_openai_args())

if _is_streaming_response(openai_response):
if _is_openai_v1() and isinstance(openai_response, openai.AsyncStream):
return _instrument_openai_async_stream(
resource=open_ai_resource,
response=openai_response,
generation=generation,
)
elif _is_streaming_response(openai_response):
return LangfuseResponseGeneratorAsync(
resource=open_ai_resource,
response=openai_response,
Expand Down Expand Up @@ -994,6 +1178,7 @@


register_tracing()
_install_openai_stream_iteration_hooks()


class LangfuseResponseGeneratorSync:
Expand All @@ -1010,6 +1195,7 @@
self.response = response
self.generation = generation
self.completion_start_time: Optional[datetime] = None
self._is_finalized = False

def __iter__(self) -> Any:
try:
Expand Down Expand Up @@ -1045,26 +1231,16 @@
pass

def _finalize(self) -> None:
try:
model, completion, usage, metadata = (
_extract_streamed_response_api_response(self.items)
if self.resource.object == "Responses"
or self.resource.object == "AsyncResponses"
else _extract_streamed_openai_response(self.resource, self.items)
)

_create_langfuse_update(
completion,
self.generation,
self.completion_start_time,
model=model,
usage=usage,
metadata=metadata,
)
except Exception:
pass
finally:
self.generation.end()
if self._is_finalized:
return

self._is_finalized = True
_finalize_stream_response(
resource=self.resource,
items=self.items,
generation=self.generation,
completion_start_time=self.completion_start_time,
)


class LangfuseResponseGeneratorAsync:
Expand All @@ -1081,6 +1257,7 @@
self.response = response
self.generation = generation
self.completion_start_time: Optional[datetime] = None
self._is_finalized = False

async def __aiter__(self) -> Any:
try:
Expand Down Expand Up @@ -1113,32 +1290,22 @@
return self.__aiter__()

async def __aexit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
pass

async def _finalize(self) -> None:
try:
model, completion, usage, metadata = (
_extract_streamed_response_api_response(self.items)
if self.resource.object == "Responses"
or self.resource.object == "AsyncResponses"
else _extract_streamed_openai_response(self.resource, self.items)
)

_create_langfuse_update(
completion,
self.generation,
self.completion_start_time,
model=model,
usage=usage,
metadata=metadata,
)
except Exception:
pass
finally:
self.generation.end()
if self._is_finalized:
return

self._is_finalized = True
_finalize_stream_response(
resource=self.resource,
items=self.items,
generation=self.generation,
completion_start_time=self.completion_start_time,
)

async def close(self) -> None:
"""Close the response and release the connection.

Check notice on line 1308 in langfuse/openai.py

View check run for this annotation

Claude / Claude Code Review

Fallback async wrapper close/exit paths skip _finalize

The fallback async wrapper's `close()`, `aclose()`, and `__aexit__` methods never call `self._finalize()`, so if a user receives a `LangfuseResponseGeneratorAsync` wrapper (for non-OpenAI generator-style async streams) and closes or context-manages it without iterating, `generation.end()` is never called and the Langfuse span leaks. The same issue exists for `LangfuseResponseGeneratorSync.__exit__`. This is a pre-existing issue that predates this PR; the PR's `_is_finalized` guard improvements t
Comment thread
claude[bot] marked this conversation as resolved.
Outdated

Automatically called if the response body is read to completion.
"""
Expand Down
Loading
Loading