Skip to content

Commit 48fcd29

Browse files
committed
speed up unit shutdown without weakening assertions
1 parent 38b5c9a commit 48fcd29

6 files changed

Lines changed: 76 additions & 13 deletions

File tree

langfuse/_task_manager/media_manager.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
T = TypeVar("T")
2020
P = ParamSpec("P")
21+
_SHUTDOWN_SENTINEL = object()
2122

2223

2324
class MediaManager:
@@ -40,6 +41,11 @@ def __init__(
4041
def process_next_media_upload(self) -> None:
4142
try:
4243
upload_job = self._queue.get(block=True, timeout=1)
44+
45+
if upload_job is _SHUTDOWN_SENTINEL:
46+
self._queue.task_done()
47+
return
48+
4349
logger.debug(
4450
f"Media: Processing upload for media_id={upload_job['media_id']} in trace_id={upload_job['trace_id']}"
4551
)
@@ -54,6 +60,14 @@ def process_next_media_upload(self) -> None:
5460
)
5561
self._queue.task_done()
5662

63+
def signal_shutdown(self) -> None:
64+
try:
65+
self._queue.put(_SHUTDOWN_SENTINEL, block=False)
66+
except Full:
67+
# If the queue is full, the consumer will keep draining work and
68+
# observe the paused flag on the next loop iteration.
69+
pass
70+
5771
def _find_and_process_media(
5872
self,
5973
*,

langfuse/_task_manager/media_upload_consumer.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,3 +42,4 @@ def pause(self) -> None:
4242
f"Thread: Pausing media upload consumer thread #{self._identifier}"
4343
)
4444
self.running = False
45+
self._media_manager.signal_shutdown()

langfuse/_task_manager/score_ingestion_consumer.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
import os
33
import threading
44
import time
5-
from queue import Empty, Queue
5+
from queue import Empty, Full, Queue
66
from typing import Any, List, Optional
77

88
import backoff
@@ -17,6 +17,7 @@
1717

1818
MAX_EVENT_SIZE_BYTES = int(os.environ.get("LANGFUSE_MAX_EVENT_SIZE_BYTES", 1_000_000))
1919
MAX_BATCH_SIZE_BYTES = int(os.environ.get("LANGFUSE_MAX_BATCH_SIZE_BYTES", 2_500_000))
20+
_SHUTDOWN_SENTINEL = object()
2021

2122

2223
class ScoreIngestionMetadata(BaseModel):
@@ -71,6 +72,10 @@ def _next(self) -> list:
7172
block=True, timeout=self._flush_interval - elapsed
7273
)
7374

75+
if event is _SHUTDOWN_SENTINEL:
76+
self._ingestion_queue.task_done()
77+
break
78+
7479
# convert pydantic models to dicts
7580
if "body" in event and isinstance(event["body"], BaseModel):
7681
event["body"] = event["body"].model_dump(exclude_none=True)
@@ -139,6 +144,12 @@ def upload(self) -> None:
139144
def pause(self) -> None:
140145
"""Pause the consumer."""
141146
self.running = False
147+
try:
148+
self._ingestion_queue.put(_SHUTDOWN_SENTINEL, block=False)
149+
except Full:
150+
# If the queue is full, the consumer will wake up naturally while
151+
# draining items, so a dedicated shutdown signal is not required.
152+
pass
142153

143154
def _upload_batch(self, batch: List[Any]) -> None:
144155
logger.debug(

tests/unit/test_otel.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3054,7 +3054,7 @@ def test_metrics_and_timing(self, langfuse_client, memory_exporter):
30543054
span = langfuse_client.start_observation(name="timing-test-span")
30553055

30563056
# Add a small delay
3057-
time.sleep(0.01)
3057+
time.sleep(0.1)
30583058

30593059
# End the span
30603060
span.end()
@@ -3096,10 +3096,10 @@ def test_metrics_and_timing(self, langfuse_client, memory_exporter):
30963096
) / 1_000_000_000
30973097
assert span_duration_seconds > 0, "Span duration should be positive"
30983098

3099-
# Since we slept for 0.01 seconds, the span duration should be at least 0.005 seconds
3099+
# Since we slept for 0.1 seconds, the span duration should be at least 0.05 seconds
31003100
# but we'll be generous with the upper bound due to potential system delays
3101-
assert span_duration_seconds >= 0.005, (
3102-
f"Span duration ({span_duration_seconds}s) should be at least 0.005s"
3101+
assert span_duration_seconds >= 0.05, (
3102+
f"Span duration ({span_duration_seconds}s) should be at least 0.05s"
31033103
)
31043104

31053105

tests/unit/test_prompt_atexit.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,14 @@ def test_prompts_atexit():
2020
print("Adding prompt cache", PromptCache)
2121
prompt_cache = PromptCache(max_prompt_refresh_workers=10)
2222
23-
# example task that stays in flight briefly while the process exits
24-
def wait_briefly():
25-
time.sleep(0.1)
23+
# example task that takes 2 seconds but we will force it to exit earlier
24+
def wait_2_sec():
25+
time.sleep(2)
2626
2727
# 8 times
2828
for i in range(8):
2929
prompt_cache.add_refresh_prompt_task(
30-
f"key_wait_briefly_i_{i}", lambda: wait_briefly()
30+
f"key_wait_2_sec_i_{i}", lambda: wait_2_sec()
3131
)
3232
"""
3333

@@ -76,13 +76,13 @@ async def main():
7676
print("Adding prompt cache", PromptCache)
7777
prompt_cache = PromptCache(max_prompt_refresh_workers=10)
7878
79-
# example task that stays in flight briefly while the process exits
80-
def wait_briefly():
81-
time.sleep(0.1)
79+
# example task that takes 2 seconds but we will force it to exit earlier
80+
def wait_2_sec():
81+
time.sleep(2)
8282
8383
async def add_new_prompt_refresh(i: int):
8484
prompt_cache.add_refresh_prompt_task(
85-
f"key_wait_briefly_i_{i}", lambda: wait_briefly()
85+
f"key_wait_2_sec_i_{i}", lambda: wait_2_sec()
8686
)
8787
8888
# 8 times

tests/unit/test_resource_manager.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,14 @@
11
"""Test the LangfuseResourceManager and get_client() function."""
22

3+
from queue import Queue
4+
from unittest.mock import Mock
5+
36
from langfuse import Langfuse
47
from langfuse._client.get_client import get_client
58
from langfuse._client.resource_manager import LangfuseResourceManager
9+
from langfuse._task_manager.media_manager import MediaManager
10+
from langfuse._task_manager.media_upload_consumer import MediaUploadConsumer
11+
from langfuse._task_manager.score_ingestion_consumer import ScoreIngestionConsumer
612

713

814
def test_get_client_preserves_all_settings(monkeypatch):
@@ -101,3 +107,34 @@ def should_export_b(span):
101107

102108
client_a.shutdown()
103109
client_b.shutdown()
110+
111+
112+
def test_score_ingestion_consumer_pause_wakes_blocked_thread():
113+
consumer = ScoreIngestionConsumer(
114+
ingestion_queue=Queue(),
115+
identifier=0,
116+
client=Mock(),
117+
public_key="pk-test",
118+
flush_interval=30,
119+
)
120+
121+
consumer.start()
122+
consumer.pause()
123+
consumer.join(timeout=0.5)
124+
125+
assert not consumer.is_alive()
126+
127+
128+
def test_media_upload_consumer_pause_wakes_blocked_thread():
129+
media_manager = MediaManager(
130+
api_client=Mock(),
131+
httpx_client=Mock(),
132+
media_upload_queue=Queue(),
133+
)
134+
consumer = MediaUploadConsumer(identifier=0, media_manager=media_manager)
135+
136+
consumer.start()
137+
consumer.pause()
138+
consumer.join(timeout=0.5)
139+
140+
assert not consumer.is_alive()

0 commit comments

Comments
 (0)