-
Notifications
You must be signed in to change notification settings - Fork 262
Expand file tree
/
Copy pathtest_observe.py
More file actions
90 lines (65 loc) · 2.75 KB
/
test_observe.py
File metadata and controls
90 lines (65 loc) · 2.75 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
import sys
import pytest
from langfuse import observe
from langfuse._client.attributes import LangfuseOtelSpanAttributes
def _finished_spans_by_name(memory_exporter, name: str):
return [span for span in memory_exporter.get_finished_spans() if span.name == name]
def test_sync_generator_preserves_context_without_output_capture(
langfuse_memory_client, memory_exporter
):
@observe(name="child_step")
def child_step(index: int) -> str:
return f"item_{index}"
@observe(name="root", capture_output=False)
def root():
def body():
for index in range(2):
yield child_step(index)
return body()
generator = root()
assert memory_exporter.get_finished_spans() == []
assert list(generator) == ["item_0", "item_1"]
langfuse_memory_client.flush()
root_span = _finished_spans_by_name(memory_exporter, "root")[0]
child_spans = _finished_spans_by_name(memory_exporter, "child_step")
assert len(child_spans) == 2
assert all(child.parent is not None for child in child_spans)
assert all(
child.parent.span_id == root_span.context.span_id for child in child_spans
)
assert all(
child.context.trace_id == root_span.context.trace_id for child in child_spans
)
assert LangfuseOtelSpanAttributes.OBSERVATION_OUTPUT not in root_span.attributes
@pytest.mark.asyncio
@pytest.mark.skipif(sys.version_info < (3, 11), reason="requires python3.11 or higher")
async def test_streaming_response_preserves_context_without_output_capture(
langfuse_memory_client, memory_exporter
):
class StreamingResponse:
def __init__(self, body_iterator):
self.body_iterator = body_iterator
@observe(name="stream_step")
async def stream_step(index: int) -> str:
return f"chunk_{index}"
async def body():
for index in range(2):
yield await stream_step(index)
@observe(name="endpoint", capture_output=False)
async def endpoint():
return StreamingResponse(body())
response = await endpoint()
assert memory_exporter.get_finished_spans() == []
assert [item async for item in response.body_iterator] == ["chunk_0", "chunk_1"]
langfuse_memory_client.flush()
endpoint_span = _finished_spans_by_name(memory_exporter, "endpoint")[0]
step_spans = _finished_spans_by_name(memory_exporter, "stream_step")
assert len(step_spans) == 2
assert all(step.parent is not None for step in step_spans)
assert all(
step.parent.span_id == endpoint_span.context.span_id for step in step_spans
)
assert all(
step.context.trace_id == endpoint_span.context.trace_id for step in step_spans
)
assert LangfuseOtelSpanAttributes.OBSERVATION_OUTPUT not in endpoint_span.attributes