Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,18 @@

repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v4.5.0
rev: v6.0.0
hooks:
- id: check-yaml
- id: end-of-file-fixer
- id: trailing-whitespace
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.3.4
rev: v0.15.10
hooks:
- id: ruff-format
- id: ruff
args: ["--fix"]
- repo: https://github.com/fsfe/reuse-tool
rev: v3.0.1
rev: v6.2.0
hooks:
- id: reuse
2 changes: 1 addition & 1 deletion asyncio/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,6 @@ def __getattr__(attr):
mod = _attrs.get(attr, None)
if mod is None:
raise AttributeError(attr)
value = getattr(__import__(mod, None, None, True, 1), attr)
value = getattr(__import__(mod, globals(), None, True, 1), attr)
globals()[attr] = value
return value
38 changes: 25 additions & 13 deletions asyncio/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ def __next__(self):
raise self.exc


# Pause task execution for the given time (integer in milliseconds, uPy extension)
# Pause task execution for the given time (integer in milliseconds, MicroPython extension)
# Use a SingletonGenerator to do it without allocating on the heap
def sleep_ms(t, sgen=SingletonGenerator()):
# CIRCUITPY-CHANGE: doc
Expand Down Expand Up @@ -267,9 +267,16 @@ def run_until_complete(main_task=None):
# A task waiting on _task_queue; "ph_key" is time to schedule task at
dt = max(0, ticks_diff(t.ph_key, ticks()))
elif not _io_queue.map:
# No tasks can be woken so finished running
# No tasks can be woken
cur_task = None
return
if not main_task or not main_task.state:
# no main_task, or main_task is done so finished running
return
# At this point, there is theoretically nothing that could wake the
# scheduler, but it is not allowed to exit either. We keep the code
# running so that a hypothetical debugger (or other such meta-process)
# can get a view of what is happening and possibly abort.
dt = 3
# print('(poll {})'.format(dt), len(_io_queue.map))
_io_queue.wait_io_event(dt)

Expand All @@ -291,31 +298,33 @@ def run_until_complete(main_task=None):
except excs_all as er:
# Check the task is not on any event queue
assert t.data is None
# This task is done, check if it's the main task and then loop should stop
if t is main_task:
# If it's the main task, it is considered as awaited by the caller
awaited = t is main_task
if awaited:
cur_task = None
if isinstance(er, StopIteration):
return er.value
raise er
if not isinstance(er, StopIteration):
t.state = False
raise er
if t.state is None:
t.state = False
if t.state:
# Task was running but is now finished.
waiting = False
if t.state is True:
# "None" indicates that the task is complete and not await'ed on (yet).
t.state = None
t.state = False if awaited else None
elif callable(t.state):
# The task has a callback registered to be called on completion.
t.state(t, er)
t.state = False
waiting = True
awaited = True
else:
# Schedule any other tasks waiting on the completion of this task.
while t.state.peek():
_task_queue.push(t.state.pop())
waiting = True
awaited = True
# "False" indicates that the task is complete and has been await'ed on.
t.state = False
if not waiting and not isinstance(er, excs_stop):
if not awaited and not isinstance(er, excs_stop):
# An exception ended this detached task, so queue it for later
# execution to handle the uncaught exception if no other task retrieves
# the exception in the meantime (this is handled by Task.throw).
Expand All @@ -333,6 +342,9 @@ def run_until_complete(main_task=None):
_exc_context["exception"] = exc
_exc_context["future"] = t
Loop.call_exception_handler(_exc_context)
# If it's the main task then the loop should stop
if t is main_task:
return er.value


# Create a new task from a coroutine and run it until it finishes
Expand Down
1 change: 1 addition & 0 deletions asyncio/funcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ def done(t, er):
elif isinstance(ts[i].data, StopIteration):
# Sub-task ran to completion, get its return value.
ts[i] = ts[i].data.value
# Sub-task had an exception.
elif return_exceptions:
# Get the sub-task exception to return in the list of return values.
ts[i] = ts[i].data
Expand Down
16 changes: 12 additions & 4 deletions asyncio/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ async def readline(self):
# CIRCUITPY-CHANGE: await, not yield
await core._io_queue.queue_read(self.s)
l2 = self.s.readline() # may do multiple reads but won't block
if l2 is None:
continue
l += l2
if not l2 or l[-1] == 10: # \n (check l in case l2 is str)
return l
Expand All @@ -126,7 +128,10 @@ async def drain(self):
# CIRCUITPY-CHANGE: doc
"""Drain (write) all buffered output data out to the stream.
"""

if not self.out_buf:
# Drain must always yield, so a tight loop of write+drain can't block the scheduler.
# CIRCUITPYTHON-CHANGE: await
return (await core.sleep_ms(0))
mv = memoryview(self.out_buf)
off = 0
while off < len(mv):
Expand Down Expand Up @@ -199,6 +204,9 @@ def close(self):
# CIRCUITPY-CHANGE: doc
"""Close the server."""

# Note: the _serve task must have already started by now due to the sleep
# in start_server, so `state` won't be clobbered at the start of _serve.
self.state = True
self.task.cancel()

async def wait_closed(self):
Expand Down Expand Up @@ -255,11 +263,11 @@ async def start_server(cb, host, port, backlog=5):
import socket

# Create and bind server socket.
host = socket.getaddrinfo(host, port)[0] # TODO this is blocking!
s = socket.socket()
addr_info = socket.getaddrinfo(host, port)[0] # TODO this is blocking!
s = socket.socket(addr_info[0]) # Use address family from getaddrinfo
s.setblocking(False)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind(host[-1])
s.bind(addr_info[-1])
s.listen(backlog)

# Create and return server object and task.
Expand Down
20 changes: 11 additions & 9 deletions asyncio/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,16 +198,18 @@ def cancel(self):
if self is core.cur_task:
raise RuntimeError("can't cancel self")
# If Task waits on another task then forward the cancel to the one it's waiting on.
while isinstance(self.data, Task):
self = self.data
# CIRCUITPY-CHANGE: don't reassign self
task = self
while isinstance(task.data, Task):
task = task.data
# Reschedule Task as a cancelled task.
if hasattr(self.data, "remove"):
if hasattr(task.data, "remove"):
# Not on the main running queue, remove the task from the queue it's on.
self.data.remove(self)
core._task_queue.push(self)
elif core.ticks_diff(self.ph_key, core.ticks()) > 0:
task.data.remove(task)
core._task_queue.push(task)
elif core.ticks_diff(task.ph_key, core.ticks()) > 0:
# On the main running queue but scheduled in the future, so bring it forward to now.
core._task_queue.remove(self)
core._task_queue.push(self)
self.data = core.CancelledError
core._task_queue.remove(task)
core._task_queue.push(task)
task.data = core.CancelledError
return True
3 changes: 2 additions & 1 deletion asyncio/traceback.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ def _print_traceback(traceback, limit=None, file=sys.stderr) -> List[str]:
name = frame_code.co_name
print(f' File "{filename}", line {line_number}, in {name}', file=file)
traceback = traceback.tb_next
n = n + 1
# CIRCUITPY-CHANGE: use +=
n += 1
if limit is not None and n >= limit:
break

Expand Down
Loading