88Exceptions and classes related to asyncio Queue implementations.
99"""
1010
11- from . import event
11+ from . import core
1212
1313
1414class QueueEmpty (Exception ):
@@ -19,6 +19,19 @@ class QueueFull(Exception):
1919 """Raised when the Queue.put_nowait() method is called on a full Queue."""
2020
2121
22+ async def _wait_on_task_queue (task_queue : core .TaskQueue ):
23+ task_queue .push (core .cur_task )
24+ # Set calling task's data to the TaskQueue so it can be removed if needed
25+ core .cur_task .data = task_queue
26+ # Send control back
27+ await core ._never ()
28+
29+
30+ def _release_task_queue (task_queue : core .TaskQueue ):
31+ while task_queue .peek ():
32+ core ._task_queue .push (task_queue .pop ())
33+
34+
2235class Queue :
2336 """
2437 A queue, useful for coordinating producer and consumer coroutines.
@@ -37,27 +50,21 @@ def __init__(self, maxsize=0):
3750
3851 self ._queue = []
3952
40- self ._join_counter = 0
41- self ._join_event = event .Event ()
42- self ._join_event .set ()
53+ self ._active_tasks = 0
4354
44- self ._put_event = event .Event ()
45- self ._get_event = event .Event ()
55+ self ._waiting_for_completion = core .TaskQueue ()
56+ self ._waiting_for_put = core .TaskQueue ()
57+ self ._waiting_for_get = core .TaskQueue ()
4658
4759 def _get (self ):
4860 value = self ._queue .pop (0 )
49- self ._get_event .set ()
50- self ._get_event .clear ()
61+ _release_task_queue (self ._waiting_for_get )
5162 return value
5263
5364 def _put (self , val ):
54- self ._join_counter += 1
55- self ._join_event .clear ()
56-
5765 self ._queue .append (val )
58-
59- self ._put_event .set ()
60- self ._put_event .clear ()
66+ self ._active_tasks += 1
67+ _release_task_queue (self ._waiting_for_put )
6168
6269 async def get (self ):
6370 """
@@ -66,7 +73,7 @@ async def get(self):
6673 If queue is empty, wait until an item is available.
6774 """
6875 while self .empty ():
69- await self ._put_event . wait ( )
76+ await _wait_on_task_queue ( self ._waiting_for_put )
7077 return self ._get ()
7178
7279 def get_nowait (self ):
@@ -87,7 +94,7 @@ async def put(self, val):
8794 slot is available before adding item.
8895 """
8996 while self .full ():
90- await self ._get_event . wait ( )
97+ await _wait_on_task_queue ( self ._waiting_for_get )
9198 self ._put (val )
9299
93100 def put_nowait (self , val ):
@@ -129,17 +136,17 @@ def task_done(self):
129136 Raises ValueError if called more times than there were items placed in
130137 the queue.
131138 """
132- if self ._join_counter == 0 :
139+ if self ._active_tasks == 0 :
133140 # Can't have less than 0
134141 raise ValueError ("task_done() called too many times" )
135142
136- self ._join_counter -= 1
143+ self ._active_tasks -= 1
137144
138- if self ._join_counter == 0 :
139- self ._join_event . set ( )
145+ if self ._active_tasks == 0 :
146+ _release_task_queue ( self ._waiting_for_completion )
140147
141148 async def join (self ):
142149 """
143150 Block until all items in the queue have been gotten and processed.
144151 """
145- await self ._join_event . wait ( )
152+ await _wait_on_task_queue ( self ._waiting_for_completion )
0 commit comments