Skip to content

Commit e8f8620

Browse files
committed
Merge main into codex/migrate-poetry-to-uv
2 parents c155737 + 7619731 commit e8f8620

7 files changed

Lines changed: 97 additions & 24 deletions

File tree

langfuse/_client/propagation.py

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,9 @@
77

88
from typing import Any, Dict, Generator, List, Literal, Optional, TypedDict, Union, cast
99

10-
from opentelemetry import baggage
10+
from opentelemetry import (
11+
baggage,
12+
)
1113
from opentelemetry import (
1214
baggage as otel_baggage_api,
1315
)
@@ -17,6 +19,7 @@
1719
from opentelemetry import (
1820
trace as otel_trace_api,
1921
)
22+
from opentelemetry.context import _RUNTIME_CONTEXT
2023
from opentelemetry.util._decorator import (
2124
_AgnosticContextManager,
2225
_agnosticcontextmanager,
@@ -72,6 +75,22 @@ class PropagatedExperimentAttributes(TypedDict):
7275
experiment_item_root_observation_id: str
7376

7477

78+
def _detach_context_token_safely(token: Any) -> None:
79+
"""Detach a context token without emitting noisy async teardown errors.
80+
81+
OpenTelemetry tokens are backed by ``contextvars`` and must be detached in the
82+
same execution context where they were attached. Async frameworks can legitimately
83+
end spans or unwind context managers in a different task/context, in which case
84+
detach raises and the public OpenTelemetry helper logs an error. At that point the
85+
observation is already completed, so the mismatch is safe to ignore.
86+
"""
87+
88+
try:
89+
_RUNTIME_CONTEXT.detach(token)
90+
except Exception:
91+
pass
92+
93+
7594
def propagate_attributes(
7695
*,
7796
user_id: Optional[str] = None,
@@ -272,7 +291,7 @@ def _propagate_attributes(
272291
yield
273292

274293
finally:
275-
otel_context_api.detach(token)
294+
_detach_context_token_safely(token)
276295

277296

278297
def _get_propagated_attributes_from_context(

langfuse/_client/utils.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
"""
66

77
import asyncio
8+
import contextvars
89
import json
910
import threading
1011
from hashlib import sha256
@@ -69,13 +70,14 @@ class _RunAsyncThread(threading.Thread):
6970

7071
def __init__(self, coro: Coroutine[Any, Any, Any]) -> None:
7172
self.coro = coro
73+
self.context = contextvars.copy_context()
7274
self.result: Any = None
7375
self.exception: Exception | None = None
7476
super().__init__()
7577

7678
def run(self) -> None:
7779
try:
78-
self.result = asyncio.run(self.coro)
80+
self.result = self.context.run(asyncio.run, self.coro)
7981
except Exception as e:
8082
self.exception = e
8183

langfuse/langchain/CallbackHandler.py

Lines changed: 24 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,13 @@
1515

1616
import pydantic
1717
from opentelemetry import context, trace
18-
from opentelemetry.context import _RUNTIME_CONTEXT
1918
from opentelemetry.util._decorator import _AgnosticContextManager
2019

2120
from langfuse import propagate_attributes
2221
from langfuse._client.attributes import LangfuseOtelSpanAttributes
2322
from langfuse._client.client import Langfuse
2423
from langfuse._client.get_client import get_client
24+
from langfuse._client.propagation import _detach_context_token_safely
2525
from langfuse._client.span import (
2626
LangfuseAgent,
2727
LangfuseChain,
@@ -458,18 +458,7 @@ def _detach_observation(
458458
token = self._context_tokens.pop(run_id, None)
459459

460460
if token:
461-
try:
462-
# Directly detach from runtime context to avoid error logging
463-
_RUNTIME_CONTEXT.detach(token)
464-
except Exception:
465-
# Context detach can fail in async scenarios - this is expected and safe to ignore
466-
# The span itself was properly ended and tracing data is correctly captured
467-
#
468-
# Examples:
469-
# 1. Token created in one async task/thread, detached in another
470-
# 2. Context already detached by framework or other handlers
471-
# 3. Runtime context state mismatch in concurrent execution
472-
pass
461+
_detach_context_token_safely(token)
473462

474463
return cast(
475464
Union[
@@ -564,11 +553,8 @@ def on_chain_end(
564553
input=kwargs.get("inputs"),
565554
)
566555

567-
if (
568-
parent_run_id is None
569-
and self._propagation_context_manager is not None
570-
):
571-
self._propagation_context_manager.__exit__(None, None, None)
556+
if parent_run_id is None:
557+
self._exit_propagation_context()
572558

573559
span.end()
574560

@@ -579,6 +565,7 @@ def on_chain_end(
579565

580566
finally:
581567
if parent_run_id is None:
568+
self._exit_propagation_context()
582569
self._reset()
583570

584571
def on_chain_error(
@@ -608,10 +595,19 @@ def on_chain_error(
608595
status_message=str(error) if level else None,
609596
input=kwargs.get("inputs"),
610597
cost_details={"total": 0},
611-
).end()
598+
)
599+
600+
if parent_run_id is None:
601+
self._exit_propagation_context()
602+
603+
observation.end()
612604

613605
except Exception as e:
614606
langfuse_logger.exception(e)
607+
finally:
608+
if parent_run_id is None:
609+
self._exit_propagation_context()
610+
self._reset()
615611

616612
def on_chat_model_start(
617613
self,
@@ -1026,6 +1022,15 @@ def on_llm_error(
10261022
def _reset(self) -> None:
10271023
self._child_to_parent_run_id_map = {}
10281024

1025+
def _exit_propagation_context(self) -> None:
1026+
manager = self._propagation_context_manager
1027+
1028+
if manager is None:
1029+
return
1030+
1031+
self._propagation_context_manager = None
1032+
manager.__exit__(None, None, None)
1033+
10291034
def __join_tags_and_metadata(
10301035
self,
10311036
tags: Optional[List[str]] = None,

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[project]
22
name = "langfuse"
3-
version = "4.0.1"
3+
version = "4.0.3"
44
description = "A client library for accessing langfuse"
55
readme = "README.md"
66
authors = [{ name = "langfuse", email = "developers@langfuse.com" }]

tests/test_propagate_attributes.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2295,6 +2295,36 @@ def test_tags_attribute_key_format(self, langfuse_client, memory_exporter):
22952295
class TestPropagateAttributesExperiment(TestPropagateAttributesBase):
22962296
"""Tests for experiment attribute propagation."""
22972297

2298+
@pytest.mark.asyncio
2299+
async def test_experiment_propagates_user_id_in_async_context(
2300+
self, langfuse_client, memory_exporter
2301+
):
2302+
"""Verify run_experiment keeps propagated attributes when called from async code."""
2303+
import asyncio
2304+
2305+
local_data = [{"input": "test input", "expected_output": "expected output"}]
2306+
2307+
async def async_task(*, item, **kwargs):
2308+
await asyncio.sleep(0.01)
2309+
return f"processed: {item['input']}"
2310+
2311+
with propagate_attributes(user_id="async-experiment-user"):
2312+
langfuse_client.run_experiment(
2313+
name="Async Experiment",
2314+
data=local_data,
2315+
task=async_task,
2316+
)
2317+
2318+
langfuse_client.flush()
2319+
time.sleep(0.1)
2320+
2321+
root_span = self.get_span_by_name(memory_exporter, "experiment-item-run")
2322+
self.verify_span_attribute(
2323+
root_span,
2324+
LangfuseOtelSpanAttributes.TRACE_USER_ID,
2325+
"async-experiment-user",
2326+
)
2327+
22982328
def test_experiment_attributes_propagate_without_dataset(
22992329
self, langfuse_client, memory_exporter
23002330
):

tests/test_utils.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
"""Test suite for utility functions in langfuse._client.utils module."""
22

33
import asyncio
4+
import contextvars
45
import threading
56
from unittest import mock
67

@@ -81,6 +82,22 @@ async def check_thread_isolation():
8182
result = run_async_safely(check_thread_isolation())
8283
assert result == "isolated"
8384

85+
@pytest.mark.asyncio
86+
async def test_run_async_context_preserves_contextvars(self):
87+
"""Test that threaded execution preserves the caller's contextvars."""
88+
request_id = contextvars.ContextVar("request_id")
89+
token = request_id.set("req-123")
90+
91+
async def read_contextvar():
92+
await asyncio.sleep(0.001)
93+
return request_id.get()
94+
95+
try:
96+
result = run_async_safely(read_contextvar())
97+
assert result == "req-123"
98+
finally:
99+
request_id.reset(token)
100+
84101
def test_multiple_calls_sync_context(self):
85102
"""Test multiple sequential calls in sync context."""
86103

uv.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)