Skip to content

Commit 2dcc02b

Browse files
committed
fix(openai): finalize sync stream on early break
1 parent b818885 commit 2dcc02b

2 files changed

Lines changed: 61 additions & 17 deletions

File tree

langfuse/openai.py

Lines changed: 29 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -830,6 +830,30 @@ def _is_streaming_response(response: Any) -> bool:
830830
)
831831

832832

833+
# Module-level guard: ensures openai.Stream.__iter__ is wrapped at most once.
_openai_stream_iter_hook_installed = False


def _install_openai_stream_iteration_hooks() -> None:
    """Patch ``openai.Stream.__iter__`` so Langfuse finalization also runs
    when the consumer abandons the stream early (e.g. ``break``).

    The wrapper is a generator; its ``finally`` clause fires on normal
    exhaustion, on an exception, and on generator close — which is exactly
    what happens when the caller breaks out of a ``for`` loop. Streams that
    were not instrumented by Langfuse carry no ``_langfuse_finalize_once``
    attribute and pass through untouched.
    """
    global _openai_stream_iter_hook_installed

    # Only the openai>=1.0 client exposes the Stream class we patch.
    if not _is_openai_v1():
        return
    # Idempotent: never stack the wrapper on top of itself.
    if _openai_stream_iter_hook_installed:
        return

    original_iter = openai.Stream.__iter__

    def traced_iter(self: Any) -> Any:
        try:
            yield from original_iter(self)
        finally:
            # Set by _instrument_openai_stream on instrumented streams;
            # None for plain, un-traced streams.
            finalize = getattr(self, "_langfuse_finalize_once", None)
            if finalize is not None:
                finalize()

    openai.Stream.__iter__ = traced_iter
    _openai_stream_iter_hook_installed = True
855+
856+
833857
def _finalize_stream_response(
834858
*,
835859
resource: OpenAiDefinition,
@@ -858,21 +882,6 @@ def _finalize_stream_response(
858882
generation.end()
859883

860884

861-
async def _finalize_stream_response_async(
862-
*,
863-
resource: OpenAiDefinition,
864-
items: list[Any],
865-
generation: LangfuseGeneration,
866-
completion_start_time: Optional[datetime],
867-
) -> None:
868-
_finalize_stream_response(
869-
resource=resource,
870-
items=items,
871-
generation=generation,
872-
completion_start_time=completion_start_time,
873-
)
874-
875-
876885
def _instrument_openai_stream(
877886
*,
878887
resource: OpenAiDefinition,
@@ -905,6 +914,8 @@ def finalize_once() -> None:
905914
completion_start_time=completion_start_time,
906915
)
907916

917+
response._langfuse_finalize_once = finalize_once # type: ignore[attr-defined]
918+
908919
def traced_iterator() -> Any:
909920
nonlocal completion_start_time
910921
try:
@@ -955,7 +966,7 @@ async def finalize_once() -> None:
955966
return
956967

957968
is_finalized = True
958-
await _finalize_stream_response_async(
969+
_finalize_stream_response(
959970
resource=resource,
960971
items=items,
961972
generation=generation,
@@ -1167,6 +1178,7 @@ def register_tracing() -> None:
11671178

11681179

11691180
register_tracing()
1181+
_install_openai_stream_iteration_hooks()
11701182

11711183

11721184
class LangfuseResponseGeneratorSync:
@@ -1275,7 +1287,7 @@ async def __aexit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None
12751287
pass
12761288

12771289
async def _finalize(self) -> None:
1278-
await _finalize_stream_response_async(
1290+
_finalize_stream_response(
12791291
resource=self.resource,
12801292
items=self.items,
12811293
generation=self.generation,

tests/unit/test_openai.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -272,6 +272,38 @@ def test_openai_stream_preserves_original_stream_contract(
272272
}
273273

274274

275+
def test_openai_stream_break_still_finalizes_generation(
    langfuse_memory_client, get_span
):
    """Breaking out of a sync stream early must still end the generation span."""
    client = lf_openai.OpenAI(api_key="test")
    dummy_response = DummySyncResponse()
    dummy_stream = DummyOpenAIStream(_make_chat_stream_chunks(), dummy_response)

    with patch.object(client.chat.completions, "_post", return_value=dummy_stream):
        stream = client.chat.completions.create(
            name="unit-openai-native-stream-break",
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": "1 + 1 = ?"}],
            temperature=0,
            stream=True,
        )

    # Consume exactly one chunk, then abandon the stream.
    for chunk in stream:
        assert chunk.choices[0].delta.content == "2"
        break

    # Finalization must not close the underlying HTTP response for the caller.
    assert dummy_response.closed is False

    langfuse_memory_client.flush()
    span = get_span("unit-openai-native-stream-break")

    assert span.attributes[LangfuseOtelSpanAttributes.OBSERVATION_OUTPUT] == "2"
    assert (
        span.attributes[LangfuseOtelSpanAttributes.OBSERVATION_COMPLETION_START_TIME]
        is not None
    )
305+
306+
275307
@pytest.mark.asyncio
276308
async def test_async_chat_completion_exports_generation_span(
277309
langfuse_memory_client, get_span, json_attr

0 commit comments

Comments
 (0)