1- import base64
2- import hashlib
31import logging
42import time
53from queue import Empty , Full , Queue
@@ -54,58 +52,14 @@ def process_next_media_upload(self):
5452 )
5553 self ._queue .task_done ()
5654
def process_media_in_event(self, event: dict):
    """Process media found in the multimodal fields of an event, in place.

    The ``input``, ``output`` and ``metadata`` entries of ``event["body"]``
    (when present) are each handed to ``self._find_and_process_media`` and
    replaced by its return value. Events without a ``body`` key are left
    untouched.

    Args:
        event: Event dictionary; the ``type`` and ``body`` keys are read.

    Any exception raised along the way (including a missing trace id) is
    logged through ``self._log.error`` and is not propagated to the caller.
    """
    try:
        if "body" not in event:
            return

        payload = event["body"]

        # Prefer an explicit traceId; otherwise fall back to the body id, but
        # only when the event type mentions "trace".
        trace_id = payload.get("traceId", None)
        if not trace_id:
            if "type" in event and "trace" in event["type"]:
                trace_id = payload.get("id", None)
            else:
                trace_id = None

        if trace_id is None:
            raise ValueError("trace_id is required for media upload")

        # The body id is used as the observation id only when the event type
        # mentions "generation" or "span".
        observation_id = None
        if "type" in event:
            event_type = event["type"]
            if "generation" in event_type or "span" in event_type:
                observation_id = payload.get("id", None)

        for field_name in ("input", "output", "metadata"):
            if field_name not in payload:
                continue

            payload[field_name] = self._find_and_process_media(
                data=payload[field_name],
                trace_id=trace_id,
                observation_id=observation_id,
                field=field_name,
            )

    except Exception as err:
        self._log.error(
            f"Media processing error: Failed to process multimodal event content. Event data may be incomplete. Error: { err } "
        )
96-
9755 def _find_and_process_media (
9856 self ,
9957 * ,
10058 data : Any ,
10159 trace_id : str ,
10260 observation_id : Optional [str ],
10361 field : str ,
104- project_id : Optional [str ],
10562 ):
106- if not project_id :
107- return data
108-
10963 seen = set ()
11064 max_levels = 10
11165
@@ -121,7 +75,6 @@ def _process_data_recursively(data: Any, level: int):
12175 trace_id = trace_id ,
12276 observation_id = observation_id ,
12377 field = field ,
124- project_id = project_id ,
12578 )
12679
12780 return data
@@ -137,7 +90,6 @@ def _process_data_recursively(data: Any, level: int):
13790 trace_id = trace_id ,
13891 observation_id = observation_id ,
13992 field = field ,
140- project_id = project_id ,
14193 )
14294
14395 return media
@@ -159,7 +111,6 @@ def _process_data_recursively(data: Any, level: int):
159111 trace_id = trace_id ,
160112 observation_id = observation_id ,
161113 field = field ,
162- project_id = project_id ,
163114 )
164115
165116 data ["data" ] = media
@@ -183,7 +134,6 @@ def _process_data_recursively(data: Any, level: int):
183134 trace_id = trace_id ,
184135 observation_id = observation_id ,
185136 field = field ,
186- project_id = project_id ,
187137 )
188138
189139 data ["data" ] = media
@@ -210,7 +160,6 @@ def _process_media(
210160 trace_id : str ,
211161 observation_id : Optional [str ],
212162 field : str ,
213- project_id : str ,
214163 ):
215164 if (
216165 media ._content_length is None
@@ -220,10 +169,9 @@ def _process_media(
220169 ):
221170 return
222171
223- # Important as this will be used in the media reference string in serializer
224- media ._media_id = self ._get_media_id (
225- project_id = project_id , content_sha256_hash = media ._content_sha256_hash
226- )
172+ if media ._media_id is None :
173+ self ._log .error ("Media ID is None. Skipping upload." )
174+ return
227175
228176 try :
229177 upload_media_job = UploadMediaJob (
@@ -255,13 +203,6 @@ def _process_media(
255203 f"Media processing error: Failed to process media_id={ media ._media_id } for trace_id={ trace_id } . Error: { str (e )} "
256204 )
257205
258- def _get_media_id (self , * , project_id : str , content_sha256_hash ) -> str :
259- hash_obj = hashlib .sha256 ()
260- hash_obj .update ((project_id + content_sha256_hash ).encode ("utf-8" ))
261- media_id = base64 .urlsafe_b64encode (hash_obj .digest ()).decode ("utf-8" )[:22 ]
262-
263- return media_id
264-
265206 def _process_upload_media_job (
266207 self ,
267208 * ,
0 commit comments