Skip to content

Commit daaf571

Browse files
committed
fix(tests): address high-priority review feedback
1 parent 170beb8 commit daaf571

10 files changed

Lines changed: 129 additions & 14 deletions

File tree

.github/workflows/ci.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,9 +72,9 @@ jobs:
7272

7373
name: Unit tests on Python ${{ matrix.python-version }}
7474
steps:
75-
- uses: actions/checkout@v3
75+
- uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6
7676
- name: Install uv and set Python version
77-
uses: astral-sh/setup-uv@v7
77+
uses: astral-sh/setup-uv@cec208311dfd045dd5311c1add060b2062131d57 # v8
7878
with:
7979
version: "0.11.2"
8080
python-version: ${{ matrix.python-version }}

langfuse/_client/resource_manager.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -399,6 +399,8 @@ def _stop_and_join_consumer_threads(self) -> None:
399399
for media_upload_consumer in self._media_upload_consumers:
400400
media_upload_consumer.pause()
401401

402+
self._media_manager.signal_shutdown(count=len(self._media_upload_consumers))
403+
402404
for media_upload_consumer in self._media_upload_consumers:
403405
try:
404406
media_upload_consumer.join()

langfuse/_task_manager/media_manager.py

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -60,13 +60,14 @@ def process_next_media_upload(self) -> None:
6060
)
6161
self._queue.task_done()
6262

63-
def signal_shutdown(self) -> None:
64-
try:
65-
self._queue.put(_SHUTDOWN_SENTINEL, block=False)
66-
except Full:
67-
# If the queue is full, the consumer will keep draining work and
68-
# observe the paused flag on the next loop iteration.
69-
pass
63+
def signal_shutdown(self, *, count: int = 1) -> None:
64+
for _ in range(count):
65+
try:
66+
self._queue.put(_SHUTDOWN_SENTINEL, block=False)
67+
except Full:
68+
# If the queue is full, the consumer will keep draining work and
69+
# observe the paused flag on the next loop iteration.
70+
break
7071

7172
def _find_and_process_media(
7273
self,

langfuse/_task_manager/media_upload_consumer.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,4 +42,3 @@ def pause(self) -> None:
4242
f"Thread: Pausing media upload consumer thread #{self._identifier}"
4343
)
4444
self.running = False
45-
self._media_manager.signal_shutdown()

langfuse/_utils/prompt_cache.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ def run(self) -> None:
5151

5252
if task is _SHUTDOWN_SENTINEL:
5353
self._queue.task_done()
54-
continue
54+
break
5555

5656
logger.debug(
5757
f"PromptCacheRefreshConsumer processing task, {self._identifier}"
@@ -69,7 +69,6 @@ def run(self) -> None:
6969
def pause(self) -> None:
7070
"""Pause the consumer."""
7171
self.running = False
72-
self._queue.put(_SHUTDOWN_SENTINEL)
7372

7473

7574
class PromptCacheTaskManager(object):
@@ -134,6 +133,9 @@ def shutdown(self) -> None:
134133
for consumer in self._consumers:
135134
consumer.pause()
136135

136+
for _ in self._consumers:
137+
self._queue.put(_SHUTDOWN_SENTINEL)
138+
137139
for consumer in self._consumers:
138140
try:
139141
consumer.join()

tests/conftest.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,11 @@ def pytest_collection_modifyitems(items: list[pytest.Item]) -> None:
6767

6868

6969
@pytest.fixture(autouse=True)
70-
def reset_langfuse_state() -> Iterable[None]:
70+
def reset_langfuse_state(request: pytest.FixtureRequest) -> Iterable[None]:
71+
if request.node.get_closest_marker("live_provider") is not None:
72+
yield
73+
return
74+
7175
LangfuseResourceManager.reset()
7276
yield
7377
LangfuseResourceManager.reset()

tests/support/retry.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ def retry_until_ready(
5252

5353
last_error = error
5454
else:
55+
last_error = None
5556
if is_result_ready is None or is_result_ready(result):
5657
return result
5758

tests/unit/test_e2e_support.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
from langfuse.api.commons.errors.not_found_error import NotFoundError
44
from tests.support.api_wrapper import LangfuseAPI as SupportLangfuseAPI
5+
from tests.support.retry import retry_until_ready
56
from tests.support.utils import get_api, wait_for_trace
67

78

@@ -138,3 +139,35 @@ class FakeClient:
138139
assert trace["id"] == "trace-123"
139140
assert len(trace["observations"]) == 3
140141
assert attempts["count"] == 3
142+
143+
144+
def test_retry_until_ready_clears_stale_error_after_success(monkeypatch):
145+
monkeypatch.setattr("tests.support.retry.sleep", lambda _: None)
146+
147+
monotonic_values = iter([0.0, 0.0, 0.05, 0.06, 0.11, 0.11])
148+
monkeypatch.setattr("tests.support.retry.monotonic", lambda: next(monotonic_values))
149+
150+
attempts = {"count": 0}
151+
152+
def operation():
153+
attempts["count"] += 1
154+
155+
if attempts["count"] == 1:
156+
raise NotFoundError(
157+
body={
158+
"error": "LangfuseNotFoundError",
159+
"message": "Trace trace-123 not found within authorized project",
160+
}
161+
)
162+
163+
return {"id": "trace-123", "attempt": attempts["count"], "observations": []}
164+
165+
trace = retry_until_ready(
166+
operation,
167+
is_result_ready=lambda _: False,
168+
timeout_seconds=0.1,
169+
interval_seconds=0,
170+
)
171+
172+
assert trace["id"] == "trace-123"
173+
assert trace["attempt"] == 3

tests/unit/test_prompt.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
DEFAULT_PROMPT_CACHE_TTL_SECONDS,
88
PromptCache,
99
PromptCacheItem,
10+
PromptCacheTaskManager,
1011
)
1112
from langfuse.api import NotFoundError, Prompt_Chat, Prompt_Text
1213
from langfuse.model import ChatPromptClient, TextPromptClient
@@ -142,6 +143,42 @@ def wait_for_prompt_refresh(langfuse: Langfuse) -> None:
142143
langfuse._resources.prompt_cache._task_manager.wait_for_idle()
143144

144145

146+
def test_prompt_cache_task_manager_pauses_all_workers_before_broadcasting_shutdown():
147+
manager = PromptCacheTaskManager(threads=0)
148+
events = []
149+
150+
class FakeConsumer:
151+
def __init__(self, identifier):
152+
self._identifier = identifier
153+
154+
def pause(self):
155+
events.append(("pause", self._identifier))
156+
157+
def join(self):
158+
events.append(("join", self._identifier))
159+
160+
class FakeQueue:
161+
def put(self, item):
162+
events.append(("put", item))
163+
164+
manager._consumers = [FakeConsumer(0), FakeConsumer(1), FakeConsumer(2)]
165+
manager._queue = FakeQueue()
166+
167+
manager.shutdown()
168+
169+
assert [event[0] for event in events] == [
170+
"pause",
171+
"pause",
172+
"pause",
173+
"put",
174+
"put",
175+
"put",
176+
"join",
177+
"join",
178+
"join",
179+
]
180+
181+
145182
def test_get_fresh_prompt(langfuse):
146183
prompt_name = "test_get_fresh_prompt"
147184
prompt = Prompt_Text(

tests/unit/test_resource_manager.py

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
"""Test the LangfuseResourceManager and get_client() function."""
22

33
from queue import Queue
4+
from types import SimpleNamespace
45
from unittest.mock import Mock
56

67
from langfuse import Langfuse
@@ -125,7 +126,7 @@ def test_score_ingestion_consumer_pause_wakes_blocked_thread():
125126
assert not consumer.is_alive()
126127

127128

128-
def test_media_upload_consumer_pause_wakes_blocked_thread():
129+
def test_media_upload_consumer_signal_shutdown_wakes_blocked_thread():
129130
media_manager = MediaManager(
130131
api_client=Mock(),
131132
httpx_client=Mock(),
@@ -135,6 +136,41 @@ def test_media_upload_consumer_pause_wakes_blocked_thread():
135136

136137
consumer.start()
137138
consumer.pause()
139+
media_manager.signal_shutdown()
138140
consumer.join(timeout=0.5)
139141

140142
assert not consumer.is_alive()
143+
144+
145+
def test_stop_and_join_consumer_threads_broadcasts_media_shutdown_after_pausing_all():
146+
events = []
147+
148+
class FakeConsumer:
149+
def __init__(self, identifier):
150+
self._identifier = identifier
151+
152+
def pause(self):
153+
events.append(("pause", self._identifier))
154+
155+
def join(self):
156+
events.append(("join", self._identifier))
157+
158+
class FakeMediaManager:
159+
def signal_shutdown(self, *, count):
160+
events.append(("signal_shutdown", count))
161+
162+
fake_resource_manager = SimpleNamespace(
163+
_media_upload_consumers=[FakeConsumer(0), FakeConsumer(1)],
164+
_ingestion_consumers=[],
165+
_media_manager=FakeMediaManager(),
166+
)
167+
168+
LangfuseResourceManager._stop_and_join_consumer_threads(fake_resource_manager)
169+
170+
assert events == [
171+
("pause", 0),
172+
("pause", 1),
173+
("signal_shutdown", 2),
174+
("join", 0),
175+
("join", 1),
176+
]

0 commit comments

Comments
 (0)