88
99from __future__ import annotations
1010import asyncio
11+ from collections import deque
1112import json
1213import logging
1314import typing
2526
2627
2728logger = logging .getLogger (__name__ )
29+ EventT = typing .TypeVar ('EventT' )
2830
2931
3032class CDPError (Exception ):
@@ -96,6 +98,7 @@ def __init__(self, url: str, timeout: float = 30.0):
9698 self ._next_command_id = 1
9799 self ._pending_commands : typing .Dict [int , PendingCommand ] = {}
98100 self ._event_queue : asyncio .Queue = asyncio .Queue ()
101+ self ._event_buffer : typing .Deque [typing .Any ] = deque ()
99102 self ._recv_task : typing .Optional [asyncio .Task ] = None
100103 self ._closed = False
101104
@@ -216,6 +219,26 @@ async def _handle_event(self, data: T_JSON_DICT) -> None:
216219 await self ._event_queue .put (event )
217220 except Exception as e :
218221 logger .error (f"Failed to parse event: { e } " )
222+
223+ def _restore_deferred_events (self , events : typing .List [typing .Any ]) -> None :
224+ """Put deferred events back at the front of the buffer."""
225+ for event in reversed (events ):
226+ self ._event_buffer .appendleft (event )
227+
228+ async def _next_event (self , timeout : typing .Optional [float ] = None ) -> typing .Any :
229+ """
230+ Read the next event from the local buffer or queue.
231+
232+ Buffered events are always consumed first to preserve ordering for events
233+ temporarily skipped by ``wait_for``.
234+ """
235+ if self ._event_buffer :
236+ return self ._event_buffer .popleft ()
237+
238+ if timeout is None :
239+ return await self ._event_queue .get ()
240+
241+ return await asyncio .wait_for (self ._event_queue .get (), timeout = timeout )
219242
220243 async def execute (
221244 self ,
@@ -311,13 +334,67 @@ async def listen(self) -> typing.AsyncIterator[typing.Any]:
311334 """
312335 while not self ._closed :
313336 try :
314- event = await asyncio . wait_for ( self ._event_queue . get (), timeout = 1.0 )
337+ event = await self ._next_event ( timeout = 1.0 )
315338 yield event
316339 except asyncio .TimeoutError :
317340 # Check if connection is still alive
318341 if self ._closed :
319342 break
320343 continue
344+
345+ async def wait_for (
346+ self ,
347+ event_type : typing .Type [EventT ],
348+ timeout : typing .Optional [float ] = None ,
349+ predicate : typing .Optional [typing .Callable [[EventT ], bool ]] = None ,
350+ ) -> EventT :
351+ """
352+ Wait for the next event matching a type (and optional predicate).
353+
354+ Non-matching events are not discarded; they are restored and remain
355+ available to ``listen()`` and ``get_event_nowait()``.
356+
357+ Args:
358+ event_type: Event class to match (e.g. ``page.LoadEventFired``).
359+ timeout: Maximum seconds to wait for a matching event.
360+ predicate: Optional callback for additional filtering.
361+
362+ Returns:
363+ The first matching event instance.
364+
365+ Raises:
366+ asyncio.TimeoutError: If no matching event arrives in time.
367+ CDPConnectionError: If the connection is closed while waiting.
368+ """
369+ deferred : typing .List [typing .Any ] = []
370+ deadline : typing .Optional [float ] = None
371+ if timeout is not None :
372+ deadline = asyncio .get_running_loop ().time () + timeout
373+
374+ try :
375+ while True :
376+ if (
377+ self ._closed
378+ and not self ._event_buffer
379+ and self ._event_queue .empty ()
380+ ):
381+ raise CDPConnectionError ("Connection closed while waiting for event" )
382+
383+ remaining : typing .Optional [float ] = None
384+ if deadline is not None :
385+ remaining = deadline - asyncio .get_running_loop ().time ()
386+ if remaining <= 0 :
387+ raise asyncio .TimeoutError ()
388+
389+ event = await self ._next_event (timeout = remaining )
390+ if isinstance (event , event_type ) and (
391+ predicate is None or predicate (event )
392+ ):
393+ return event
394+
395+ deferred .append (event )
396+ finally :
397+ self ._restore_deferred_events (deferred )
321398
322399 def get_event_nowait (self ) -> typing .Optional [typing .Any ]:
323400 """
@@ -327,6 +404,8 @@ def get_event_nowait(self) -> typing.Optional[typing.Any]:
327404 A CDP event object, or None if no events are available
328405 """
329406 try :
407+ if self ._event_buffer :
408+ return self ._event_buffer .popleft ()
330409 return self ._event_queue .get_nowait ()
331410 except asyncio .QueueEmpty :
332411 return None
0 commit comments