@@ -326,8 +326,9 @@ async def wait_for_node_ready(self) -> None:
326326 then extracts nodeId and externalUri from the response.
327327
328328 Response structure differs by mode:
329- - Standalone: $.value.nodes[0].id, $.value.nodes[0].externalUri
330- - Distributed: $.value.node.nodeId, $.value.node.externalUri
329+ - Standalone (hub): $.value.nodes[0].id, $.value.nodes[0].externalUri
330+ - Distributed (node): $.value.node.nodeId, $.value.node.externalUri
331+ - Standalone sidecar on dynamic grid node: falls back to $.value.node path
331332 """
332333 node_status_url = f"{ self .se_server_protocol } ://{ self .display_container } :{ self .se_node_port } /status"
333334 headers = {}
@@ -368,6 +369,12 @@ async def wait_for_node_ready(self) -> None:
368369 node_info = nodes [0 ]
369370 self .node_id = node_info .get ("id" )
370371 self .node_external_uri = node_info .get ("externalUri" )
372+ else :
373+ # Fallback: sidecar connected directly to a node
374+ # (e.g. dynamic grid where /status returns singular "node")
375+ node_info = body .get ("value" , {}).get ("node" , {})
376+ self .node_id = node_info .get ("nodeId" ) or node_info .get ("id" )
377+ self .node_external_uri = node_info .get ("externalUri" )
371378 else :
372379 node_info = body .get ("value" , {}).get ("node" , {})
373380 self .node_id = node_info .get ("nodeId" )
@@ -383,7 +390,10 @@ async def wait_for_node_ready(self) -> None:
383390 except Exception as e :
384391 logger .warning (f"Unexpected error polling Node /status: { e } " )
385392
386- await asyncio .sleep (self .node_poll_interval )
393+ try :
394+ await asyncio .wait_for (self .shutdown_event .wait (), timeout = self .node_poll_interval )
395+ except asyncio .TimeoutError :
396+ pass
387397
388398 # ==================== Recording Functions ====================
389399
@@ -451,9 +461,20 @@ async def start_recording(self, session: SessionState) -> bool:
451461 session .ffmpeg_process = await asyncio .create_subprocess_exec (
452462 * cmd ,
453463 env = env ,
454- stdout = asyncio .subprocess .PIPE ,
464+ stdout = asyncio .subprocess .DEVNULL ,
455465 stderr = asyncio .subprocess .PIPE ,
456466 )
467+ # Give ffmpeg a moment to fail fast (bad codec, missing display, etc.)
468+ await asyncio .sleep (0.5 )
469+ if session .ffmpeg_process .returncode is not None :
470+ stderr_output = await session .ffmpeg_process .stderr .read ()
471+ logger .error (
472+ f"ffmpeg exited immediately for { session .session_id } "
473+ f"(rc={ session .ffmpeg_process .returncode } ): { stderr_output .decode (errors = 'replace' ).strip ()} "
474+ )
475+ session .ffmpeg_process = None
476+ session .status = SessionStatus .CREATED
477+ return False
457478 logger .info (f"Started recording: session={ session .session_id } , file={ session .video_file } " )
458479 return True
459480 except Exception as e :
@@ -473,15 +494,25 @@ async def stop_recording(self, session: SessionState) -> bool:
473494 try :
474495 session .ffmpeg_process .terminate ()
475496 try :
476- await asyncio .wait_for (session .ffmpeg_process .wait (), timeout = 10.0 )
497+ _ , stderr_bytes = await asyncio .wait_for (session .ffmpeg_process .communicate (), timeout = 10.0 )
477498 except asyncio .TimeoutError :
478499 logger .warning (f"ffmpeg did not stop gracefully for { session .session_id } , killing" )
479500 session .ffmpeg_process .kill ()
480- await session .ffmpeg_process .wait ()
501+ _ , stderr_bytes = await session .ffmpeg_process .communicate ()
481502
503+ rc = session .ffmpeg_process .returncode
504+ if stderr_bytes :
505+ stderr_text = stderr_bytes .decode (errors = "replace" ).strip ()
506+ if stderr_text :
507+ logger .warning (f"ffmpeg stderr for { session .session_id } : { stderr_text } " )
482508 session .ffmpeg_process = None
483- self .recorded_count += 1
484509
510+ # 255 is ffmpeg's own graceful-stop exit code (exit_program(255) in its SIGTERM handler).
511+ if rc not in (0 , 255 , - signal .SIGTERM , - signal .SIGKILL ):
512+ logger .error (f"ffmpeg exited with unexpected code { rc } for { session .session_id } " )
513+ return False
514+
515+ self .recorded_count += 1
485516 duration = session .duration_seconds
486517 logger .info (
487518 f"Stopped recording: session={ session .session_id } , " f"duration={ duration :.1f} s"
@@ -556,10 +587,13 @@ async def process_upload(self, task: UploadTask) -> None:
556587 stderr = asyncio .subprocess .PIPE ,
557588 )
558589 self .active_uploads .append (proc )
559-
560- stdout , stderr = await proc .communicate ()
561-
562- self .active_uploads .remove (proc )
590+ try :
591+ stdout , stderr = await proc .communicate ()
592+ finally :
593+ try :
594+ self .active_uploads .remove (proc )
595+ except ValueError :
596+ pass
563597
564598 if proc .returncode == 0 :
565599 logger .info (f"Upload complete: { task .video_file } " )
@@ -575,14 +609,21 @@ async def upload_worker(self) -> None:
575609 active_tasks : List [asyncio .Task ] = []
576610
577611 try :
578- while not self . shutdown_event . is_set () or not self . upload_queue . empty () :
612+ while True :
579613 try :
580- # Get task with timeout to check shutdown
581- try :
582- task = await asyncio .wait_for (self .upload_queue .get (), timeout = 1.0 )
583- except asyncio .TimeoutError :
584- continue
614+ # Block until an item is available (or cancelled)
615+ task = await self .upload_queue .get ()
616+ except asyncio .CancelledError :
617+ logger .warning ("Upload worker cancelled, pending uploads may be lost" )
618+ for t in active_tasks :
619+ t .cancel ()
620+ raise
621+
622+ # None is the sentinel pushed by cleanup() to signal no more uploads
623+ if task is None :
624+ break
585625
626+ try :
586627 # Process upload (could run multiple in parallel up to batch_size)
587628 upload_task = asyncio .create_task (self .process_upload (task ))
588629 active_tasks .append (upload_task )
@@ -594,11 +635,10 @@ async def upload_worker(self) -> None:
594635 if len (active_tasks ) >= self .upload_batch_size :
595636 done , pending = await asyncio .wait (active_tasks , return_when = asyncio .FIRST_COMPLETED )
596637 active_tasks = list (pending )
597-
598638 except Exception as e :
599639 logger .error (f"Upload worker error: { e } " )
600640
601- # Wait for remaining uploads
641+ # Drain all in-flight uploads before exiting
602642 if active_tasks :
603643 logger .info (f"Waiting for { len (active_tasks )} pending uploads..." )
604644 await asyncio .gather (* active_tasks , return_exceptions = True )
@@ -671,9 +711,6 @@ async def handle_session_closed(self, data: dict) -> None:
671711 # Stop recording if in progress
672712 if session .ffmpeg_process is not None :
673713 await self .stop_recording (session )
674- # Small delay to ensure file is finalized
675- await asyncio .sleep (0.5 )
676- # Queue upload
677714 await self .queue_upload (session )
678715
679716 # Clean up session after a delay (keep for potential late events)
@@ -846,35 +883,28 @@ async def wait_for_display(self) -> None:
846883 return
847884 except Exception :
848885 pass
849- await asyncio .sleep (2 )
886+ try :
887+ await asyncio .wait_for (self .shutdown_event .wait (), timeout = 2 )
888+ except asyncio .TimeoutError :
889+ pass
850890
851891 async def cleanup (self ) -> None :
852892 """Cleanup all resources."""
853893 logger .info ("Shutting down..." )
854894
855- # Stop all active recordings
895+ # Snapshot active sessions outside the lock so we don't hold
896+ # sessions_lock across slow awaits (stop_recording can take up to 10s).
856897 async with self .sessions_lock :
857- for session in self .sessions .values ():
858- if session .ffmpeg_process is not None :
859- logger .info (f"Stopping recording: { session .session_id } " )
860- await self .stop_recording (session )
861- await self .queue_upload (session )
898+ active_sessions = [s for s in self .sessions .values () if s .ffmpeg_process is not None ]
862899
863- # Signal upload worker to finish
864- self .shutdown_event .set ()
900+ for session in active_sessions :
901+ logger .info (f"Stopping recording: { session .session_id } " )
902+ await self .stop_recording (session )
903+ await self .queue_upload (session )
865904
866- # Wait for upload worker
867- try :
868- await asyncio .wait_for (self .uploader_done .wait (), timeout = 30 )
869- except asyncio .TimeoutError :
870- logger .warning ("Upload worker did not finish in time" )
871-
872- # Kill any remaining uploads
873- for proc in self .active_uploads :
874- try :
875- proc .kill ()
876- except Exception :
877- pass
905+ # Push sentinel so the upload worker exits after draining the queue.
906+ # run() is responsible for awaiting the upload task with a timeout.
907+ await self .upload_queue .put (None )
878908
879909 logger .info ("Shutdown complete" )
880910
@@ -909,19 +939,35 @@ async def run(self) -> None:
909939 logger .error ("Failed to resolve Node ID from /status endpoint, exiting" )
910940 return
911941
912- # Start workers
913- tasks = [
914- asyncio .create_task (self .subscribe_events (), name = "event_subscriber" ),
915- asyncio .create_task (self .upload_worker (), name = "upload_worker" ),
916- ]
942+ # Upload worker runs independently — it exits only when cleanup() pushes
943+ # a None sentinel, so it is NOT included in the gather below.
944+ upload_task = asyncio .create_task (self .upload_worker (), name = "upload_worker" )
917945
918946 try :
919- await asyncio .gather (* tasks )
947+ await asyncio .gather (
948+ asyncio .create_task (self .subscribe_events (), name = "event_subscriber" ),
949+ )
920950 except asyncio .CancelledError :
921951 logger .info ("Tasks cancelled" )
922952 finally :
953+ # cleanup() stops recordings, queues uploads, then pushes the sentinel.
923954 await self .cleanup ()
924955
956+ # Wait for the upload worker to drain and exit.
957+ try :
958+ await asyncio .wait_for (upload_task , timeout = 30 )
959+ except asyncio .TimeoutError :
960+ logger .warning ("Upload worker did not finish in time, cancelling" )
961+ upload_task .cancel ()
962+ await asyncio .gather (upload_task , return_exceptions = True )
963+
964+ # Kill any rclone processes still in flight
965+ for proc in self .active_uploads :
966+ try :
967+ proc .kill ()
968+ except Exception :
969+ pass
970+
925971
926972async def main ():
927973 """Main entry point."""
0 commit comments