@@ -163,6 +163,7 @@ def __init__(self):
163163 self .upload_opts = os .environ .get ("SE_UPLOAD_OPTS" , "-P --cutoff-mode SOFT --metadata --inplace" )
164164 self .retain_local = os .environ .get ("SE_UPLOAD_RETAIN_LOCAL_FILE" , "false" ).lower () == "true"
165165 self .upload_batch_size = int (os .environ .get ("SE_VIDEO_UPLOAD_BATCH_CHECK" , "10" ))
166+ self .upload_timeout = int (os .environ .get ("SE_VIDEO_UPLOAD_TIMEOUT" , "300" ))
166167 self .upload_failure_only = os .environ .get ("SE_UPLOAD_FAILURE_SESSION_ONLY" , "false" ).lower () == "true"
167168 default_failure_events = [":failure" , ":failed" ]
168169 custom_failure_events = os .environ .get ("SE_UPLOAD_FAILURE_SESSION_EVENTS" , "" ).lower ()
@@ -225,6 +226,9 @@ def __init__(self):
225226 self .recorder_done = asyncio .Event ()
226227 self .uploader_done = asyncio .Event ()
227228
229+ # Tracked delayed-cleanup tasks so they can be cancelled on shutdown
230+ self ._cleanup_tasks : List [asyncio .Task ] = []
231+
228232 # Rename SE_RCLONE_* env vars
229233 self ._rename_rclone_env ()
230234
@@ -588,7 +592,13 @@ async def process_upload(self, task: UploadTask) -> None:
588592 )
589593 self .active_uploads .append (proc )
590594 try :
591- stdout , stderr = await proc .communicate ()
595+ try :
596+ stdout , stderr = await asyncio .wait_for (proc .communicate (), timeout = self .upload_timeout )
597+ except asyncio .TimeoutError :
598+ logger .warning (f"Upload timed out after { self .upload_timeout } s: { task .video_file } , killing process" )
599+ proc .kill ()
600+ await proc .communicate ()
601+ return
592602 finally :
593603 try :
594604 self .active_uploads .remove (proc )
@@ -713,8 +723,11 @@ async def handle_session_closed(self, data: dict) -> None:
713723 await self .stop_recording (session )
714724 await self .queue_upload (session )
715725
716- # Clean up session after a delay (keep for potential late events)
717- asyncio .create_task (self ._cleanup_session_delayed (session_id , delay = 60 ))
726+ # Clean up session after a delay (keep for potential late events).
727+ # Tracked so cleanup() can cancel these on shutdown instead of waiting 60s.
728+ t = asyncio .create_task (self ._cleanup_session_delayed (session_id , delay = 60 ))
729+ self ._cleanup_tasks .append (t )
730+ t .add_done_callback (lambda fut : self ._cleanup_tasks .remove (fut ) if fut in self ._cleanup_tasks else None )
718731
719732 # Check drain condition
720733 if self .max_sessions > 0 and self .recorded_count >= self .max_sessions :
@@ -814,6 +827,10 @@ async def subscribe_events(self) -> None:
814827 if await self .subscriber .poll (timeout = 1000 ):
815828 frames = await self .subscriber .recv_multipart ()
816829
830+ # Re-check shutdown before spending time processing the event
831+ if self .shutdown_event .is_set ():
832+ break
833+
817834 if len (frames ) < 4 :
818835 continue
819836
@@ -892,6 +909,14 @@ async def cleanup(self) -> None:
892909 """Cleanup all resources."""
893910 logger .info ("Shutting down..." )
894911
912+ # Cancel delayed session-cleanup tasks immediately — they have a 60s
913+ # sleep that would keep the event loop alive long after shutdown.
914+ for t in list (self ._cleanup_tasks ):
915+ t .cancel ()
916+ if self ._cleanup_tasks :
917+ await asyncio .gather (* self ._cleanup_tasks , return_exceptions = True )
918+ self ._cleanup_tasks .clear ()
919+
895920 # Snapshot active sessions outside the lock so we don't hold
896921 # sessions_lock across slow awaits (stop_recording can take up to 10s).
897922 async with self .sessions_lock :
0 commit comments