Low-level operations in trio.hazmat
¶
Warning
⚠️ DANGER DANGER DANGER ⚠️
You probably don’t want to use this module.
The trio.hazmat
API is public and stable (or at least, as
stable as anything in trio is!), but it has nasty
big pointy teeth. Mistakes may
not be handled gracefully; rules and conventions that are followed
strictly in the rest of trio do not always apply. Read and tread
carefully.
But if you find yourself needing to, for example, implement new synchronization primitives or expose new low-level I/O functionality, then you’re in the right place.
Low-level I/O primitives¶
Different environments expose different low-level APIs for performing
async I/O. trio.hazmat
attempts to expose these APIs in a
relatively direct way, so as to allow maximum power and flexibility
for higher level code. However, this means that the exact API provided
may vary depending on what system trio is running on.
Universally available API¶
All environments provide the following functions:
-
await
trio.hazmat.
wait_socket_readable
(sock)¶ Block until the given
socket.socket()
object is readable.The given object must be exactly of type
socket.socket()
, nothing else.Raises: - TypeError – if the given object is not of type
socket.socket()
. - RuntimeError – if another task is already waiting for the given socket to become readable.
- TypeError – if the given object is not of type
-
await
trio.hazmat.
wait_socket_writable
(sock)¶ Block until the given
socket.socket()
object is writable.The given object must be exactly of type
socket.socket()
, nothing else.Raises: - TypeError – if the given object is not of type
socket.socket()
. - RuntimeError – if another task is already waiting for the given socket to become writable.
- TypeError – if the given object is not of type
Unix-specific API¶
Unix-like systems provide the following functions:
-
await
trio.hazmat.
wait_readable
(fd)¶ Block until the given file descriptor is readable.
Warning
This is “readable” according to the operating system’s definition of readable. In particular, it probably won’t tell you anything useful for on-disk files.
Parameters: fd – integer file descriptor, or else an object with a fileno()
methodRaises: RuntimeError – if another task is already waiting for the given fd to become readable.
-
await
trio.hazmat.
wait_writable
(fd)¶ Block until the given file descriptor is writable.
Warning
This is “writable” according to the operating system’s definition of writable. In particular, it probably won’t tell you anything useful for on-disk files.
Parameters: fd – integer file descriptor, or else an object with a fileno()
methodRaises: RuntimeError – if another task is already waiting for the given fd to become writable.
System tasks¶
-
trio.hazmat.
spawn_system_task
()¶ Spawn a “system” task.
System tasks have a few differences from regular tasks:
- They don’t need an explicit nursery; instead they go into the internal “system nursery”.
- If a system task raises an exception, then it’s converted into a
TrioInternalError
and all tasks are cancelled. If you write a system task, you should be careful to make sure it doesn’t crash. - System tasks are automatically cancelled when the main task exits.
- By default, system tasks have
KeyboardInterrupt
protection enabled. If you want your task to be interruptible by control-C, then you need to usedisable_ki_protection()
explicitly.
Parameters: - async_fn – An async callable.
- args – Positional arguments for
async_fn
. If you want to pass keyword arguments, usefunctools.partial()
. - name – The name for this task. Only used for debugging/introspection
(e.g.
repr(task_obj)
). If this isn’t a string,spawn_system_task()
will try to make it one. A common use case is if you’re wrapping a function before spawning a new task, you might pass the original function as thename=
to make debugging easier.
Returns: the newly spawned task
Return type:
Entering trio from external threads or signal handlers¶
-
trio.hazmat.
current_call_soon_thread_and_signal_safe
()¶ Returns a reference to the
call_soon_thread_and_signal_safe
function for the current trio run:-
call_soon_thread_and_signal_safe
(sync_fn, *args, idempotent=False)¶ Schedule a call to
sync_fn(*args)
to occur in the context of a trio task. This is safe to call from the main thread, from other threads, and from signal handlers.The call is effectively run as part of a system task (see
spawn_system_task()
). In particular this means that:KeyboardInterrupt
protection is enabled by default; if you wantsync_fn
to be interruptible by control-C, then you need to usedisable_ki_protection()
explicitly.- If
sync_fn
raises an exception, then it’s converted into aTrioInternalError
and all tasks are cancelled. You should be careful thatsync_fn
doesn’t crash.
All calls with
idempotent=False
are processed in strict first-in first-out order.If
idempotent=True
, thensync_fn
andargs
must be hashable, and trio will make a best-effort attempt to discard any call submission which is equal to an already-pending call. Trio will make an attempt to process these in first-in first-out order, but no guarantees. (Currently processing is FIFO on CPython 3.6 and PyPy, but not CPython 3.5.)Any ordering guarantees apply separately to
idempotent=False
andidempotent=True
calls; there’s no rule for how calls in the different categories are ordered with respect to each other.Raises: trio.RunFinishedError – if the associated call to trio.run()
has already exited. (Any call that doesn’t raise this error is guaranteed to be fully processed beforetrio.run()
exits.)
-
Safer KeyboardInterrupt handling¶
Trio’s handling of control-C is designed to balance usability and
safety. On the one hand, there are sensitive regions (like the core
scheduling loop) where it’s simply impossible to handle arbitrary
KeyboardInterrupt
exceptions while maintaining our core
correctness invariants. On the other, if the user accidentally writes
an infinite loop, we do want to be able to break out of that. Our
solution is to install a default signal handler which checks whether
it’s safe to raise KeyboardInterrupt
at the place where the
signal is received. If so, then we do; otherwise, we schedule a
KeyboardInterrupt
to be delivered to the main task at the next
available opportunity (similar to how Cancelled
is
delivered).
So that’s great, but – how do we know whether we’re in one of the sensitive parts of the program or not?
This is determined on a function-by-function basis. By default, a function is protected if its caller is, and not if its caller isn’t; this is helpful because it means you only need to override the defaults at places where you transition from protected code to unprotected code or vice-versa.
These transitions are accomplished using two function decorators:
-
@
trio.hazmat.
disable_ki_protection
¶ Decorator that marks the given regular function, generator function, async function, or async generator function as unprotected against
KeyboardInterrupt
, i.e., the code inside this function can be rudely interrupted byKeyboardInterrupt
at any moment.If you have multiple decorators on the same function, then this should be at the bottom of the stack (closest to the actual function).
An example of where you’d use this is in implementing something like
run_in_trio_thread
, which usescall_soon_thread_and_signal_safe
to get into the trio thread.call_soon_thread_and_signal_safe
callbacks are run withKeyboardInterrupt
protection enabled, andrun_in_trio_thread
takes advantage of this to safely set up the machinery for sending a response back to the original thread, and then usesdisable_ki_protection()
when entering the user-provided function.
-
@
trio.hazmat.
enable_ki_protection
¶ Decorator that marks the given regular function, generator function, async function, or async generator function as protected against
KeyboardInterrupt
, i.e., the code inside this function won’t be rudely interrupted byKeyboardInterrupt
. (Though if it contains any checkpoints, then it can still receiveKeyboardInterrupt
at those. This is considered a polite interruption.)Warning
Be very careful to only use this decorator on functions that you know will either exit in bounded time, or else pass through a checkpoint regularly. (Of course all of your functions should have this property, but if you mess it up here then you won’t even be able to use control-C to escape!)
If you have multiple decorators on the same function, then this should be at the bottom of the stack (closest to the actual function).
An example of where you’d use this is on the
__exit__
implementation for something like aLock
, where a poorly-timedKeyboardInterrupt
could leave the lock in an inconsistent state and cause a deadlock.
-
trio.hazmat.
currently_ki_protected
()¶ Check whether the calling code has
KeyboardInterrupt
protection enabled.It’s surprisingly easy to think that one’s
KeyboardInterrupt
protection is enabled when it isn’t, or vice-versa. This function tells you what trio thinks of the matter, which makes it useful forassert
s and unit tests.Returns: True if protection is enabled, and False otherwise. Return type: bool
Sleeping and waking¶
Wait queue abstraction¶
-
class
trio.hazmat.
ParkingLot
¶ A fair wait queue with cancellation and requeueing.
This class encapsulates the tricky parts of implementing a wait queue. It’s useful for implementing higher-level synchronization primitives like queues and locks.
In addition to the methods below, you can use
len(parking_lot)
to get the number of parked tasks, andif parking_lot: ...
to check whether there are any parked tasks.-
await
park
()¶ Park the current task until woken by a call to
unpark()
orunpark_all()
.
-
await
unpark
(*, count=1)¶ Unpark one or more tasks.
This wakes up
count
tasks that are blocked inpark()
. If there are fewer thancount
tasks parked, then wakes as many tasks are available and then returns successfully.Parameters: count (int) – the number of tasks to unpark.
-
await
unpark_all
()¶ Unpark all parked tasks.
-
await
repark
(new_lot, *, count=1)¶ Move parked tasks from one
ParkingLot
object to another.This dequeues
count
tasks from one lot, and requeues them on another, preserving order. For example:async def parker(lot): print("sleeping") await lot.park() print("woken") async def main(): lot1 = trio.hazmat.ParkingLot() lot2 = trio.hazmat.ParkingLot() async with trio.open_nursery() as nursery: nursery.spawn(lot1) await trio.testing.wait_all_tasks_blocked() assert len(lot1) == 1 assert len(lot2) == 0 lot1.repark(lot2) assert len(lot1) == 0 assert len(lot2) == 1 # This wakes up the task that was originally parked in lot1 lot2.unpark()
If there are fewer than
count
tasks parked, then reparks as many tasks as are available and then returns successfully.Parameters: - new_lot (ParkingLot) – the parking lot to move tasks to.
- count (int) – the number of tasks to move.
-
await
repark_all
(new_lot)¶ Move all parked tasks from one
ParkingLot
object to another.See
repark()
for details.
-
await
Low-level checkpoint functions¶
-
await
trio.hazmat.
yield_briefly
()¶ A pure checkpoint.
This checks for cancellation and allows other tasks to be scheduled, without otherwise blocking.
Note that the scheduler has the option of ignoring this and continuing to run the current task if it decides this is appropriate (e.g. for increased efficiency).
Equivalent to
await trio.sleep(0)
(which is implemented by callingyield_briefly()
.)
The next two functions are used together to make up a checkpoint:
-
await
trio.hazmat.
yield_if_cancelled
()¶ A conditional checkpoint.
If a cancellation is active, then allows other tasks to be scheduled, and then raises
trio.Cancelled
.
-
await
trio.hazmat.
yield_briefly_no_cancel
()¶ Introduce a schedule point, but not a cancel point.
These are commonly used in cases where you have an operation that might-or-might-not block, and you want to implement trio’s standard checkpoint semantics. Example:
async def operation_that_maybe_blocks():
await yield_if_cancelled()
try:
ret = attempt_operation()
except BlockingIOError:
# need to block and then retry, which we do below
pass
except:
# some other error, finish the checkpoint then let it propagate
await yield_briefly_no_cancel()
raise
else:
# operation succeeded, finish the checkpoint then return
await yield_briefly_no_cancel()
return ret
while True:
await wait_for_operation_to_be_ready()
try:
return attempt_operation()
except BlockingIOError:
pass
This logic is a bit convoluted, but accomplishes all of the following:
- Every execution path passes through a checkpoint (assuming that
wait_for_operation_to_be_ready
is an unconditional checkpoint) - Our cancellation semantics say that
Cancelled
should only be raised if the operation didn’t happen. Usingyield_briefly_no_cancel()
on the early-exit branches accomplishes this. - On the path where we do end up blocking, we don’t pass through any schedule points before that, which avoids some unnecessary work.
- Avoids implicitly chaining the
BlockingIOError
with any errors raised byattempt_operation
orwait_for_operation_to_be_ready
, by keeping thewhile True:
loop outside of theexcept BlockingIOError:
block.
These functions can also be useful in other situations, e.g. if you’re
going to call an uncancellable operation like
trio.run_in_worker_thread()
or (potentially) overlapped I/O
operations on Windows, then you can call yield_if_cancelled()
first to make sure that the whole thing is a checkpoint.
Low-level blocking¶
-
class
trio.hazmat.
Abort
¶ enum.Enum
used as the return value from abort functions.See
yield_indefinitely()
for details.
-
trio.hazmat.
reschedule
()¶ Reschedule the given task with the given
Result
.See
yield_indefinitely()
for the gory details.There must be exactly one call to
reschedule()
for every call toyield_indefinitely()
. (And when counting, keep in mind that returningAbort.SUCCEEDED
from an abort callback is equivalent to callingreschedule()
once.)Parameters: - task (trio.Task) – the task to be rescheduled. Must be blocked in a
call to
yield_indefinitely()
. - next_send (trio.Result) – the value (or error) to return (or raise)
from
yield_indefinitely()
.
- task (trio.Task) – the task to be rescheduled. Must be blocked in a
call to
-
await
trio.hazmat.
yield_indefinitely
(abort_fn)¶ Put the current task to sleep, with cancellation support.
This is the lowest-level API for blocking in trio. Every time a
Task
blocks, it does so by calling this function.This is a tricky interface with no guard rails. If you can use
ParkingLot
or the built-in I/O wait functions instead, then you should.Generally the way it works is that before calling this function, you make arrangements for “someone” to call
reschedule()
on the current task at some later point.Then you call
yield_indefinitely()
, passing inabort_fn
, an “abort callback”.(Terminology: in trio, “aborting” is the process of attempting to interrupt a blocked task to deliver a cancellation.)
There are two possibilities for what happens next:
“Someone” calls
reschedule()
on the current task, andyield_indefinitely()
returns or raises whatever value or error was passed toreschedule()
.The call’s context transitions to a cancelled state (e.g. due to a timeout expiring). When this happens, the
abort_fn
is called. It’s interface looks like:def abort_fn(raise_cancel): ... return trio.hazmat.Abort.SUCCEEDED # or FAILED
It should attempt to clean up any state associated with this call, and in particular, arrange that
reschedule()
will not be called later. If (and only if!) it is successful, then it should returnAbort.SUCCEEDED
, in which case the task will automatically be rescheduled with an appropriateCancelled
error.Otherwise, it should return
Abort.FAILED
. This means that the task can’t be cancelled at this time, and still has to make sure that “someone” eventually callsreschedule()
.At that point there are again two possibilities. You can simply ignore the cancellation altogether: wait for the operation to complete and then reschedule and continue as normal. (For example, this is what
trio.run_in_worker_thread()
does if cancellation is disabled.) The other possibility is that theabort_fn
does succeed in cancelling the operation, but for some reason isn’t able to report that right away. (Example: on Windows, it’s possible to request that an async (“overlapped”) I/O operation be cancelled, but this request is also asynchronous – you don’t find out until later whether the operation was actually cancelled or not.) To report a delayed cancellation, then you should reschedule the task yourself, and call theraise_cancel
callback passed toabort_fn
to raise aCancelled
(or possiblyKeyboardInterrupt
) exception into this task. Either of the approaches sketched below can work:# Option 1: # Catch the exception from raise_cancel and inject it into the task. # (This is what trio does automatically for you if you return # Abort.SUCCEEDED.) trio.hazmat.reschedule(task, Result.capture(raise_cancel)) # Option 2: # wait to be woken by "someone", and then decide whether to raise # the error from inside the task. outer_raise_cancel = None def abort(inner_raise_cancel): nonlocal outer_raise_cancel outer_raise_cancel = inner_raise_cancel TRY_TO_CANCEL_OPERATION() return trio.hazmat.Abort.FAILED await yield_indefinitely(abort) if OPERATION_WAS_SUCCESSFULLY_CANCELLED: # raises the error outer_raise_cancel()
In any case it’s guaranteed that we only call the
abort_fn
at most once per call toyield_indefinitely()
.
Warning
If your
abort_fn
raises an error, or returns any value other thanAbort.SUCCEEDED
orAbort.FAILED
, then trio will crash violently. Be careful! Similarly, it is entirely possible to deadlock a trio program by failing to reschedule a blocked task, or cause havoc by callingreschedule()
too many times. Remember what we said up above about how you should use a higher-level API if at all possible?
Here’s an example lock class implemented using
yield_indefinitely()
directly. This implementation has a number
of flaws, including lack of fairness, O(n) cancellation, missing error
checking, failure to insert a checkpoint on the non-blocking path,
etc. If you really want to implement your own lock, then you should
study the implementation of trio.Lock
and use
ParkingLot
, which handles some of these issues for you. But
this does serve to illustrate the basic structure of the
yield_indefinitely()
API:
class NotVeryGoodLock:
def __init__(self):
self._blocked_tasks = collections.deque()
self._held = False
async def acquire(self):
while self._held:
task = trio.current_task()
self._blocked_tasks.append(task)
def abort_fn(_):
self._blocked_tasks.remove(task)
return trio.hazmat.Abort.SUCCEEDED
await trio.hazmat.yield_indefinitely(abort_fn)
self._held = True
def release(self):
self._held = False
if self._blocked_tasks:
woken_task = self._blocked_tasks.popleft()
trio.hazmat.reschedule(woken_task)