Skip to content

Commit b8b2f8c

Browse files
committed
add run safe async
1 parent 0625b11 commit b8b2f8c

4 files changed

Lines changed: 341 additions & 12 deletions

File tree

langfuse/_client/client.py

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@
8080
LangfuseSpan,
8181
LangfuseTool,
8282
)
83+
from langfuse._client.utils import run_async_safely
8384
from langfuse._utils import _get_timestamp
8485
from langfuse._utils.parse_error import handle_fern_exception
8586
from langfuse._utils.prompt_cache import PromptCache
@@ -2617,18 +2618,23 @@ def average_accuracy(*, item_results, **kwargs):
26172618
- Individual item failures are logged but don't stop the experiment
26182619
- All executions are automatically traced and visible in Langfuse UI
26192620
- When using Langfuse datasets, results are automatically linked for easy comparison
2621+
- This method works in both sync and async contexts (Jupyter notebooks, web apps, etc.)
2622+
- Async execution is handled automatically with smart event loop detection
26202623
"""
2621-
return asyncio.run(
2622-
self._run_experiment_async(
2623-
name=name,
2624-
description=description,
2625-
data=data,
2626-
task=task,
2627-
evaluators=evaluators or [],
2628-
run_evaluators=run_evaluators or [],
2629-
max_concurrency=max_concurrency,
2630-
metadata=metadata or {},
2631-
)
2624+
return cast(
2625+
ExperimentResult,
2626+
run_async_safely(
2627+
self._run_experiment_async(
2628+
name=name,
2629+
description=description,
2630+
data=data,
2631+
task=task,
2632+
evaluators=evaluators or [],
2633+
run_evaluators=run_evaluators or [],
2634+
max_concurrency=max_concurrency,
2635+
metadata=metadata or {},
2636+
),
2637+
),
26322638
)
26332639

26342640
async def _run_experiment_async(

langfuse/_client/datasets.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -382,6 +382,8 @@ def content_diversity(*, item_results, **kwargs):
382382
- Results can be easily compared across different experiment runs in the UI
383383
- The dataset_run_url provides direct access to detailed results and analysis
384384
- Failed items are handled gracefully and logged without stopping the experiment
385+
- This method works in both sync and async contexts (Jupyter notebooks, web apps, etc.)
386+
- Async execution is handled automatically with smart event loop detection
385387
"""
386388
langfuse_client = self._get_langfuse_client()
387389
if not langfuse_client:

langfuse/_client/utils.py

Lines changed: 68 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
11
"""Utility functions for Langfuse OpenTelemetry integration.
22
33
This module provides utility functions for working with OpenTelemetry spans,
4-
including formatting and serialization of span data.
4+
including formatting and serialization of span data, and async execution helpers.
55
"""
66

7+
import asyncio
78
import json
9+
import threading
10+
from typing import Any, Coroutine
811

912
from opentelemetry import trace as otel_trace_api
1013
from opentelemetry.sdk import util
@@ -58,3 +61,67 @@ def span_formatter(span: ReadableSpan) -> str:
5861
)
5962
+ "\n"
6063
)
64+
65+
66+
class _RunAsyncThread(threading.Thread):
67+
"""Helper thread class for running async coroutines in a separate thread."""
68+
69+
def __init__(self, coro: Coroutine[Any, Any, Any]) -> None:
70+
self.coro = coro
71+
self.result: Any = None
72+
self.exception: Exception | None = None
73+
super().__init__()
74+
75+
def run(self) -> None:
76+
try:
77+
self.result = asyncio.run(self.coro)
78+
except Exception as e:
79+
self.exception = e
80+
81+
82+
def run_async_safely(coro: Coroutine[Any, Any, Any]) -> Any:
83+
"""Safely run an async coroutine, handling existing event loops.
84+
85+
This function detects if there's already a running event loop and uses
86+
a separate thread if needed to avoid the "asyncio.run() cannot be called
87+
from a running event loop" error. This is particularly useful in environments
88+
like Jupyter notebooks, FastAPI applications, or other async frameworks.
89+
90+
Args:
91+
coro: The coroutine to run
92+
93+
Returns:
94+
The result of the coroutine
95+
96+
Raises:
97+
Any exception raised by the coroutine
98+
99+
Example:
100+
```python
101+
# Works in both sync and async contexts
102+
async def my_async_function():
103+
await asyncio.sleep(1)
104+
return "done"
105+
106+
result = run_async_safely(my_async_function())
107+
```
108+
"""
109+
try:
110+
# Check if there's already a running event loop
111+
loop = asyncio.get_running_loop()
112+
except RuntimeError:
113+
# No running loop, safe to use asyncio.run()
114+
return asyncio.run(coro)
115+
116+
if loop and loop.is_running():
117+
# There's a running loop, use a separate thread
118+
thread = _RunAsyncThread(coro)
119+
thread.start()
120+
thread.join()
121+
122+
if thread.exception:
123+
raise thread.exception
124+
return thread.result
125+
else:
126+
# Loop exists but not running, safe to use asyncio.run()
127+
return asyncio.run(coro)

tests/test_utils.py

Lines changed: 254 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,254 @@
1+
"""Test suite for utility functions in langfuse._client.utils module."""
2+
3+
import asyncio
4+
import threading
5+
from unittest import mock
6+
7+
import pytest
8+
9+
from langfuse._client.utils import run_async_safely
10+
11+
12+
class TestRunAsyncSafely:
13+
"""Test suite for the run_async_safely function."""
14+
15+
def test_run_sync_context_simple(self):
16+
"""Test run_async_safely in sync context with simple coroutine."""
17+
18+
async def simple_coro():
19+
await asyncio.sleep(0.01)
20+
return "hello"
21+
22+
result = run_async_safely(simple_coro())
23+
assert result == "hello"
24+
25+
def test_run_sync_context_with_value(self):
26+
"""Test run_async_safely in sync context with parameter passing."""
27+
28+
async def coro_with_params(value, multiplier=2):
29+
await asyncio.sleep(0.01)
30+
return value * multiplier
31+
32+
result = run_async_safely(coro_with_params(5, multiplier=3))
33+
assert result == 15
34+
35+
def test_run_sync_context_with_exception(self):
36+
"""Test run_async_safely properly propagates exceptions in sync context."""
37+
38+
async def failing_coro():
39+
await asyncio.sleep(0.01)
40+
raise ValueError("Test error")
41+
42+
with pytest.raises(ValueError, match="Test error"):
43+
run_async_safely(failing_coro())
44+
45+
@pytest.mark.asyncio
46+
async def test_run_async_context_simple(self):
47+
"""Test run_async_safely from within async context (uses threading)."""
48+
49+
async def simple_coro():
50+
await asyncio.sleep(0.01)
51+
return "from_thread"
52+
53+
# This should use threading since we're already in an async context
54+
result = run_async_safely(simple_coro())
55+
assert result == "from_thread"
56+
57+
@pytest.mark.asyncio
58+
async def test_run_async_context_with_exception(self):
59+
"""Test run_async_safely properly propagates exceptions from thread."""
60+
61+
async def failing_coro():
62+
await asyncio.sleep(0.01)
63+
raise RuntimeError("Thread error")
64+
65+
with pytest.raises(RuntimeError, match="Thread error"):
66+
run_async_safely(failing_coro())
67+
68+
@pytest.mark.asyncio
69+
async def test_run_async_context_thread_isolation(self):
70+
"""Test that threaded execution is properly isolated."""
71+
# Set a thread-local value in the main async context
72+
threading.current_thread().test_value = "main_thread"
73+
74+
async def check_thread_isolation():
75+
# This should run in a different thread
76+
current_thread = threading.current_thread()
77+
# Should not have the test_value from main thread
78+
assert not hasattr(current_thread, "test_value")
79+
return "isolated"
80+
81+
result = run_async_safely(check_thread_isolation())
82+
assert result == "isolated"
83+
84+
def test_multiple_calls_sync_context(self):
85+
"""Test multiple sequential calls in sync context."""
86+
87+
async def counter_coro(count):
88+
await asyncio.sleep(0.001)
89+
return count * 2
90+
91+
results = []
92+
for i in range(5):
93+
result = run_async_safely(counter_coro(i))
94+
results.append(result)
95+
96+
assert results == [0, 2, 4, 6, 8]
97+
98+
@pytest.mark.asyncio
99+
async def test_multiple_calls_async_context(self):
100+
"""Test multiple sequential calls in async context (each uses threading)."""
101+
102+
async def counter_coro(count):
103+
await asyncio.sleep(0.001)
104+
return count * 3
105+
106+
results = []
107+
for i in range(3):
108+
result = run_async_safely(counter_coro(i))
109+
results.append(result)
110+
111+
assert results == [0, 3, 6]
112+
113+
def test_concurrent_calls_sync_context(self):
114+
"""Test concurrent calls in sync context using threading."""
115+
116+
async def slow_coro(value):
117+
await asyncio.sleep(0.02)
118+
return value**2
119+
120+
import concurrent.futures
121+
122+
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
123+
futures = []
124+
for i in range(3):
125+
future = executor.submit(run_async_safely, slow_coro(i + 1))
126+
futures.append(future)
127+
128+
results = [future.result() for future in futures]
129+
130+
# Results should be squares: 1^2, 2^2, 3^2
131+
assert sorted(results) == [1, 4, 9]
132+
133+
def test_event_loop_detection_mock(self):
134+
"""Test event loop detection logic with mocking."""
135+
136+
async def simple_coro():
137+
return "mocked"
138+
139+
# Mock no running loop - should use asyncio.run
140+
with mock.patch(
141+
"asyncio.get_running_loop", side_effect=RuntimeError("No loop")
142+
):
143+
with mock.patch(
144+
"asyncio.run", return_value="asyncio_run_called"
145+
) as mock_run:
146+
result = run_async_safely(simple_coro())
147+
assert result == "asyncio_run_called"
148+
mock_run.assert_called_once()
149+
150+
def test_complex_coroutine(self):
151+
"""Test with a more complex coroutine that does actual async work."""
152+
153+
async def complex_coro():
154+
# Simulate some async operations
155+
results = []
156+
for i in range(3):
157+
await asyncio.sleep(0.001)
158+
results.append(i**2)
159+
160+
# Simulate concurrent operations
161+
async def sub_task(x):
162+
await asyncio.sleep(0.001)
163+
return x * 10
164+
165+
tasks = [sub_task(x) for x in range(2)]
166+
concurrent_results = await asyncio.gather(*tasks)
167+
results.extend(concurrent_results)
168+
169+
return results
170+
171+
result = run_async_safely(complex_coro())
172+
assert result == [0, 1, 4, 0, 10] # [0^2, 1^2, 2^2, 0*10, 1*10]
173+
174+
@pytest.mark.asyncio
175+
async def test_nested_async_calls(self):
176+
"""Test that nested calls to run_async_safely work correctly."""
177+
178+
async def inner_coro(value):
179+
await asyncio.sleep(0.001)
180+
return value * 2
181+
182+
async def outer_coro(value):
183+
# This is already in an async context, so the inner call
184+
# will also use threading
185+
inner_result = run_async_safely(inner_coro(value))
186+
await asyncio.sleep(0.001)
187+
return inner_result + 1
188+
189+
result = run_async_safely(outer_coro(5))
190+
assert result == 11 # (5 * 2) + 1
191+
192+
def test_exception_types_preserved(self):
193+
"""Test that different exception types are properly preserved."""
194+
195+
async def custom_exception_coro():
196+
await asyncio.sleep(0.001)
197+
198+
class CustomError(Exception):
199+
pass
200+
201+
raise CustomError("Custom error message")
202+
203+
with pytest.raises(Exception) as exc_info:
204+
run_async_safely(custom_exception_coro())
205+
206+
# The exception type should be preserved
207+
assert "Custom error message" in str(exc_info.value)
208+
209+
def test_return_types_preserved(self):
210+
"""Test that various return types are properly preserved."""
211+
212+
async def dict_coro():
213+
await asyncio.sleep(0.001)
214+
return {"key": "value", "number": 42}
215+
216+
async def list_coro():
217+
await asyncio.sleep(0.001)
218+
return [1, 2, 3, "string"]
219+
220+
async def none_coro():
221+
await asyncio.sleep(0.001)
222+
return None
223+
224+
dict_result = run_async_safely(dict_coro())
225+
assert dict_result == {"key": "value", "number": 42}
226+
assert isinstance(dict_result, dict)
227+
228+
list_result = run_async_safely(list_coro())
229+
assert list_result == [1, 2, 3, "string"]
230+
assert isinstance(list_result, list)
231+
232+
none_result = run_async_safely(none_coro())
233+
assert none_result is None
234+
235+
@pytest.mark.asyncio
236+
async def test_real_world_scenario_jupyter_simulation(self):
237+
"""Test scenario simulating Jupyter notebook environment."""
238+
# This simulates being called from a Jupyter cell where there's
239+
# already an event loop running
240+
241+
async def simulate_llm_call(prompt):
242+
"""Simulate an LLM API call."""
243+
await asyncio.sleep(0.01) # Simulate network delay
244+
return f"Response to: {prompt}"
245+
246+
async def simulate_experiment_task(item):
247+
"""Simulate an experiment task function."""
248+
response = await simulate_llm_call(item["input"])
249+
await asyncio.sleep(0.001) # Additional processing
250+
return response
251+
252+
# This should work even though we're in an async context
253+
result = run_async_safely(simulate_experiment_task({"input": "test prompt"}))
254+
assert result == "Response to: test prompt"

0 commit comments

Comments
 (0)