Merged
229 changes: 187 additions & 42 deletions langfuse/openai.py
@@ -830,6 +830,167 @@
)


def _finalize_stream_response(
*,
resource: OpenAiDefinition,
items: list[Any],
generation: LangfuseGeneration,
completion_start_time: Optional[datetime],
) -> None:
try:
model, completion, usage, metadata = (
_extract_streamed_response_api_response(items)
if resource.object == "Responses" or resource.object == "AsyncResponses"
else _extract_streamed_openai_response(resource, items)
)

_create_langfuse_update(
completion,
generation,
completion_start_time,
model=model,
usage=usage,
metadata=metadata,
)
except Exception:
pass
finally:
generation.end()


async def _finalize_stream_response_async(
*,
resource: OpenAiDefinition,
items: list[Any],
generation: LangfuseGeneration,
completion_start_time: Optional[datetime],
) -> None:
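    # Async facade over the sync implementation, so async call sites can await it uniformly.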
_finalize_stream_response(
resource=resource,
items=items,
generation=generation,
completion_start_time=completion_start_time,
)


def _instrument_openai_stream(
*,
resource: OpenAiDefinition,
response: Any,
generation: LangfuseGeneration,
) -> Any:
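    # Objects without a raw `_iterator` attribute (i.e. not an openai.Stream) keep the pre-existing wrapper class.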
if not hasattr(response, "_iterator"):
return LangfuseResponseGeneratorSync(
resource=resource,
response=response,
generation=generation,
)

items: list[Any] = []
raw_iterator = response._iterator
completion_start_time: Optional[datetime] = None
is_finalized = False
close = response.close

def finalize_once() -> None:
nonlocal is_finalized
if is_finalized:
return

is_finalized = True
_finalize_stream_response(
resource=resource,
items=items,
generation=generation,
completion_start_time=completion_start_time,
)

def traced_iterator() -> Any:
nonlocal completion_start_time
try:
for item in raw_iterator:
items.append(item)

if completion_start_time is None:
completion_start_time = _get_timestamp()

yield item
finally:
finalize_once()

def traced_close() -> Any:
try:
return close()
finally:
finalize_once()

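    # Patch the stream in place: the tracing iterator records chunks, and the wrapped
    # close() ensures finalize_once() runs on explicit close as well as on exhaustion.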
response._iterator = traced_iterator()
response.close = traced_close

Check failure on line 928 in langfuse/openai.py

Claude / Claude Code Review

Langfuse generation leaked when the user breaks out of the stream loop without calling close()

The new native stream instrumentation introduces a regression: breaking out of a `for chunk in stream:` loop without calling `stream.close()` leaves the Langfuse generation un-finalized until GC. This happens because `openai.Stream.__iter__` iterates `self._iterator` (an external attribute reference), and Python does not propagate `close()` to externally referenced sub-generators when the outer generator is closed by a `break`. The previous `LangfuseResponseGeneratorSync.__iter__` had a `try/finally` that guaranteed finalization even when iteration stopped early.

claude[bot] marked this conversation as resolved.
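
To make the review's claim concrete, here is a minimal runnable sketch of the failure mode (the names `inner_stream` and `outer_iter` are illustrative, not from the PR): closing the outer generator does not propagate to an inner generator that is still referenced elsewhere, so the inner `finally` block, where finalization would live, runs only at GC or on an explicit `close()`.

def inner_stream():
    # Stands in for traced_iterator(); cleanup lives in the finally block.
    try:
        yield from range(3)
    finally:
        print("finalized")

def outer_iter(iterator):
    # Mirrors openai.Stream.__iter__, which loops over self._iterator.
    for item in iterator:
        yield item

it = inner_stream()  # externally referenced, like response._iterator
for chunk in outer_iter(it):
    break  # closes only the anonymous outer generator

print("after break")  # prints BEFORE "finalized"
it.close()  # without this, "finalized" waits for garbage collection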

return response


def _instrument_openai_async_stream(
*,
resource: OpenAiDefinition,
response: Any,
generation: LangfuseGeneration,
) -> Any:
if not hasattr(response, "_iterator"):
return LangfuseResponseGeneratorAsync(
resource=resource,
response=response,
generation=generation,
)

items: list[Any] = []
raw_iterator = response._iterator
completion_start_time: Optional[datetime] = None
is_finalized = False
close = response.close

async def finalize_once() -> None:
nonlocal is_finalized
if is_finalized:
return

is_finalized = True
await _finalize_stream_response_async(
resource=resource,
items=items,
generation=generation,
completion_start_time=completion_start_time,
)

async def traced_iterator() -> Any:
nonlocal completion_start_time
try:
async for item in raw_iterator:
items.append(item)

if completion_start_time is None:
completion_start_time = _get_timestamp()

yield item
finally:
await finalize_once()

async def traced_close() -> Any:
try:
return await close()
finally:
await finalize_once()

async def traced_aclose() -> Any:
return await traced_close()

response._iterator = traced_iterator()
response.close = traced_close
response.aclose = traced_aclose

return response


@_langfuse_wrapper
def _wrap(
open_ai_resource: OpenAiDefinition, wrapped: Any, args: Any, kwargs: Any
@@ -863,7 +1024,13 @@
try:
openai_response = wrapped(**arg_extractor.get_openai_args())

-        if _is_streaming_response(openai_response):
+        if _is_openai_v1() and isinstance(openai_response, openai.Stream):
+            return _instrument_openai_stream(
+                resource=open_ai_resource,
+                response=openai_response,
+                generation=generation,
+            )
+        elif _is_streaming_response(openai_response):
return LangfuseResponseGeneratorSync(
resource=open_ai_resource,
response=openai_response,
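For orientation, a hedged sketch of the caller-side path that now reaches `_instrument_openai_stream` (assuming the documented `from langfuse.openai import openai` drop-in; the model name is a placeholder):

from langfuse.openai import openai  # Langfuse drop-in wrapper

client = openai.OpenAI()
stream = client.chat.completions.create(
    model="gpt-4o-mini",  # placeholder model
    messages=[{"role": "user", "content": "Say hi"}],
    stream=True,
)
for chunk in stream:  # iterates the patched traced_iterator()
    delta = chunk.choices[0].delta.content
    if delta:
        print(delta, end="")
stream.close()  # traced_close() -> finalize_once() ends the generation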
@@ -934,7 +1101,13 @@
try:
openai_response = await wrapped(**arg_extractor.get_openai_args())

-        if _is_streaming_response(openai_response):
+        if _is_openai_v1() and isinstance(openai_response, openai.AsyncStream):
+            return _instrument_openai_async_stream(
+                resource=open_ai_resource,
+                response=openai_response,
+                generation=generation,
+            )
+        elif _is_streaming_response(openai_response):
return LangfuseResponseGeneratorAsync(
resource=open_ai_resource,
response=openai_response,
@@ -1045,26 +1218,12 @@
pass

def _finalize(self) -> None:
-        try:
-            model, completion, usage, metadata = (
-                _extract_streamed_response_api_response(self.items)
-                if self.resource.object == "Responses"
-                or self.resource.object == "AsyncResponses"
-                else _extract_streamed_openai_response(self.resource, self.items)
-            )
-
-            _create_langfuse_update(
-                completion,
-                self.generation,
-                self.completion_start_time,
-                model=model,
-                usage=usage,
-                metadata=metadata,
-            )
-        except Exception:
-            pass
-        finally:
-            self.generation.end()
+        _finalize_stream_response(
+            resource=self.resource,
+            items=self.items,
+            generation=self.generation,
+            completion_start_time=self.completion_start_time,
+        )


class LangfuseResponseGeneratorAsync:
@@ -1116,26 +1275,12 @@
pass

async def _finalize(self) -> None:
-        try:
-            model, completion, usage, metadata = (
-                _extract_streamed_response_api_response(self.items)
-                if self.resource.object == "Responses"
-                or self.resource.object == "AsyncResponses"
-                else _extract_streamed_openai_response(self.resource, self.items)
-            )
-
-            _create_langfuse_update(
-                completion,
-                self.generation,
-                self.completion_start_time,
-                model=model,
-                usage=usage,
-                metadata=metadata,
-            )
-        except Exception:
-            pass
-        finally:
-            self.generation.end()
+        await _finalize_stream_response_async(
+            resource=self.resource,
+            items=self.items,
+            generation=self.generation,
+            completion_start_time=self.completion_start_time,
+        )

async def close(self) -> None:
"""Close the response and release the connection.