Skip to content

Commit 231a84f

Browse files
committed
add media
1 parent 1bef9c6 commit 231a84f

8 files changed

Lines changed: 440 additions & 110 deletions

File tree

langfuse/_task_manager/media_manager.py

Lines changed: 77 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
1+
import base64
2+
import hashlib
13
import logging
24
import time
3-
from queue import Empty, Queue
4-
from typing import Any, Callable, Optional, TypeVar
5+
from queue import Empty, Full, Queue
6+
from typing import Any, Callable, Optional, TypeVar, cast
57

68
import backoff
79
import requests
@@ -10,6 +12,7 @@
1012
from langfuse.api import GetMediaUploadUrlRequest, PatchMediaBody
1113
from langfuse.api.client import FernLangfuse
1214
from langfuse.api.core import ApiError
15+
from langfuse.api.resources.media.types.media_content_type import MediaContentType
1316
from langfuse.media import LangfuseMedia
1417
from langfuse.utils import _get_timestamp
1518

@@ -20,7 +23,7 @@
2023

2124

2225
class MediaManager:
23-
_log = logging.getLogger(__name__)
26+
_log = logging.getLogger("langfuse")
2427

2528
def __init__(
2629
self,
@@ -92,7 +95,11 @@ def _find_and_process_media(
9295
trace_id: str,
9396
observation_id: Optional[str],
9497
field: str,
98+
project_id: Optional[str],
9599
):
100+
if not project_id:
101+
return data
102+
96103
seen = set()
97104
max_levels = 10
98105

@@ -108,6 +115,7 @@ def _process_data_recursively(data: Any, level: int):
108115
trace_id=trace_id,
109116
observation_id=observation_id,
110117
field=field,
118+
project_id=project_id,
111119
)
112120

113121
return data
@@ -123,6 +131,7 @@ def _process_data_recursively(data: Any, level: int):
123131
trace_id=trace_id,
124132
observation_id=observation_id,
125133
field=field,
134+
project_id=project_id,
126135
)
127136

128137
return media
@@ -144,6 +153,7 @@ def _process_data_recursively(data: Any, level: int):
144153
trace_id=trace_id,
145154
observation_id=observation_id,
146155
field=field,
156+
project_id=project_id,
147157
)
148158

149159
data["data"] = media
@@ -167,6 +177,7 @@ def _process_data_recursively(data: Any, level: int):
167177
trace_id=trace_id,
168178
observation_id=observation_id,
169179
field=field,
180+
project_id=project_id,
170181
)
171182

172183
data["data"] = media
@@ -193,6 +204,7 @@ def _process_media(
193204
trace_id: str,
194205
observation_id: Optional[str],
195206
field: str,
207+
project_id: str,
196208
):
197209
if (
198210
media._content_length is None
@@ -202,47 +214,83 @@ def _process_media(
202214
):
203215
return
204216

205-
upload_url_response = self._request_with_backoff(
206-
self._api_client.media.get_upload_url,
207-
request=GetMediaUploadUrlRequest(
208-
contentLength=media._content_length,
209-
contentType=media._content_type,
210-
sha256Hash=media._content_sha256_hash,
211-
field=field,
212-
traceId=trace_id,
213-
observationId=observation_id,
214-
),
217+
# Important as this is will be used in the media reference string in serializer
218+
media._media_id = self._get_media_id(
219+
project_id=project_id, content_sha256_hash=media._content_sha256_hash
215220
)
216221

217-
upload_url = upload_url_response.upload_url
218-
media._media_id = upload_url_response.media_id # Important as this is will be used in the media reference string in serializer
222+
try:
223+
upload_media_job = UploadMediaJob(
224+
media_id=media._media_id,
225+
content_bytes=media._content_bytes,
226+
content_type=media._content_type,
227+
content_length=media._content_length,
228+
content_sha256_hash=media._content_sha256_hash,
229+
trace_id=trace_id,
230+
observation_id=observation_id,
231+
field=field,
232+
)
219233

220-
if upload_url is not None:
221-
self._log.debug(f"Scheduling upload for {media._media_id}")
222234
self._queue.put(
223-
item={
224-
"upload_url": upload_url,
225-
"media_id": media._media_id,
226-
"content_bytes": media._content_bytes,
227-
"content_type": media._content_type,
228-
"content_sha256_hash": media._content_sha256_hash,
229-
},
230-
block=True,
231-
timeout=1,
235+
item=upload_media_job,
236+
block=False,
237+
)
238+
self._log.debug(
239+
f"Enqueued media ID {media._media_id} for upload processing"
240+
)
241+
242+
except Full:
243+
self._log.debug(
244+
f"Media queue is full. Failed to process media id {media._media_id}"
245+
)
246+
247+
except Exception as e:
248+
self._log.debug(
249+
f"Failed to process media with id {media._media_id}: {str(e)}"
232250
)
233251

234-
else:
235-
self._log.debug(f"Media {media._media_id} already uploaded")
252+
def _get_media_id(self, *, project_id: str, content_sha256_hash) -> str:
253+
hash_obj = hashlib.sha256()
254+
hash_obj.update((project_id + content_sha256_hash).encode("utf-8"))
255+
media_id = base64.urlsafe_b64encode(hash_obj.digest()).decode("utf-8")[:22]
256+
257+
return media_id
236258

237259
def _process_upload_media_job(
238260
self,
239261
*,
240262
data: UploadMediaJob,
241263
):
264+
upload_url_response = self._request_with_backoff(
265+
self._api_client.media.get_upload_url,
266+
request=GetMediaUploadUrlRequest(
267+
contentLength=data["content_length"],
268+
contentType=cast(MediaContentType, data["content_type"]),
269+
sha256Hash=data["content_sha256_hash"],
270+
field=data["field"],
271+
traceId=data["trace_id"],
272+
observationId=data["observation_id"],
273+
),
274+
)
275+
276+
upload_url = upload_url_response.upload_url
277+
278+
if not upload_url:
279+
self._log.debug(f"Media with ID {data['media_id']} already uploaded.")
280+
281+
return
282+
283+
if upload_url_response.media_id != data["media_id"]:
284+
self._log.error(
285+
f"Media ID mismatch: SDK {data['media_id']} vs Server {upload_url_response.media_id}. Upload cancelled."
286+
)
287+
288+
return
289+
242290
upload_start_time = time.time()
243291
upload_response = self._request_with_backoff(
244292
requests.put,
245-
data["upload_url"],
293+
upload_url,
246294
headers={
247295
"Content-Type": data["content_type"],
248296
"x-amz-checksum-sha256": data["content_sha256_hash"],
Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
1-
from typing import TypedDict
1+
from typing import Optional, TypedDict
22

33

44
class UploadMediaJob(TypedDict):
5-
upload_url: str
65
media_id: str
76
content_type: str
7+
content_length: int
88
content_bytes: bytes
99
content_sha256_hash: str
10+
trace_id: str
11+
observation_id: Optional[str]
12+
field: str

langfuse/media.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,9 @@
55
import logging
66
import os
77
import re
8+
from typing import Any, Literal, Optional, Tuple, TypeVar, cast
9+
810
import requests
9-
from typing import Optional, cast, Tuple, Any, TypeVar, Literal
1011

1112
from langfuse.api import MediaContentType
1213
from langfuse.types import ParsedMediaReference

0 commit comments

Comments
 (0)