diff --git a/docs/source/reference-hazmat.rst b/docs/source/reference-hazmat.rst index 094f5901cc..efee0ba528 100644 --- a/docs/source/reference-hazmat.rst +++ b/docs/source/reference-hazmat.rst @@ -253,6 +253,19 @@ anything real. See `#26 Windows-specific API -------------------- +.. function:: WaitForSingleObject(handle) + :async: + + Async and cancellable variant of `WaitForSingleObject + `__. + Windows only. + + :arg handle: + A Win32 object handle, as a Python integer. + :raises OSError: + If the handle is invalid, e.g. when it is already closed. + + TODO: these are implemented, but are currently more of a sketch than anything real. See `#26 `__ and `#52 diff --git a/newsfragments/233.feature.rst b/newsfragments/233.feature.rst new file mode 100644 index 0000000000..cc3c9d0ddf --- /dev/null +++ b/newsfragments/233.feature.rst @@ -0,0 +1 @@ +Add :func:`trio.hazmat.WaitForSingleObject` async function to await Windows handles. diff --git a/trio/__init__.py b/trio/__init__.py index 7ea2df21eb..5f84226570 100644 --- a/trio/__init__.py +++ b/trio/__init__.py @@ -1,3 +1,6 @@ +"""Trio - Pythonic async I/O for humans and snake people. +""" + # General layout: # # trio/_core/... is the self-contained core library. It does various diff --git a/trio/_core/__init__.py b/trio/_core/__init__.py index 25779eef04..976975ae48 100644 --- a/trio/_core/__init__.py +++ b/trio/_core/__init__.py @@ -1,3 +1,10 @@ +""" +This namespace represents the core functionality that has to be built-in +and deal with private internal data structures. Things in this namespace +are publicly available in either trio, trio.hazmat, or trio.testing. +""" + + # Needs to be defined early so it can be imported: def _public(fn): # Used to mark methods on _Runner and on IOManager implementations that diff --git a/trio/_core/_io_windows.py b/trio/_core/_io_windows.py index 90a79c032d..885ddccf6f 100644 --- a/trio/_core/_io_windows.py +++ b/trio/_core/_io_windows.py @@ -22,6 +22,7 @@ INVALID_HANDLE_VALUE, raise_winerror, ErrorCodes, + _handle, ) # There's a lot to be said about the overall design of a Windows event @@ -96,19 +97,6 @@ def _check(success): return success -def _handle(obj): - # For now, represent handles as either cffi HANDLEs or as ints. If you - # try to pass in a file descriptor instead, it's not going to work - # out. (For that msvcrt.get_osfhandle does the trick, but I don't know if - # we'll actually need that for anything...) For sockets this doesn't - # matter, Python never allocates an fd. So let's wait until we actually - # encounter the problem before worrying about it. - if type(obj) is int: - return ffi.cast("HANDLE", obj) - else: - return obj - - @attr.s(frozen=True) class _WindowsStatistics: tasks_waiting_overlapped = attr.ib() diff --git a/trio/_core/_windows_cffi.py b/trio/_core/_windows_cffi.py index 3e124e6b61..16dd9a232b 100644 --- a/trio/_core/_windows_cffi.py +++ b/trio/_core/_windows_cffi.py @@ -35,6 +35,8 @@ typedef OVERLAPPED WSAOVERLAPPED; typedef LPOVERLAPPED LPWSAOVERLAPPED; +typedef PVOID LPSECURITY_ATTRIBUTES; +typedef PVOID LPCSTR; typedef struct _OVERLAPPED_ENTRY { ULONG_PTR lpCompletionKey; @@ -80,6 +82,34 @@ _In_opt_ void* HandlerRoutine, _In_ BOOL Add ); + +HANDLE CreateEventA( + LPSECURITY_ATTRIBUTES lpEventAttributes, + BOOL bManualReset, + BOOL bInitialState, + LPCSTR lpName +); + +BOOL SetEvent( + HANDLE hEvent +); + +BOOL ResetEvent( + HANDLE hEvent +); + +DWORD WaitForSingleObject( + HANDLE hHandle, + DWORD dwMilliseconds +); + +DWORD WaitForMultipleObjects( + DWORD nCount, + HANDLE *lpHandles, + BOOL bWaitAll, + DWORD dwMilliseconds +); + """ # cribbed from pywincffi @@ -104,6 +134,19 @@ INVALID_HANDLE_VALUE = ffi.cast("HANDLE", -1) +def _handle(obj): + # For now, represent handles as either cffi HANDLEs or as ints. If you + # try to pass in a file descriptor instead, it's not going to work + # out. (For that msvcrt.get_osfhandle does the trick, but I don't know if + # we'll actually need that for anything...) For sockets this doesn't + # matter, Python never allocates an fd. So let's wait until we actually + # encounter the problem before worrying about it. + if type(obj) is int: + return ffi.cast("HANDLE", obj) + else: + return obj + + def raise_winerror(winerror=None, *, filename=None, filename2=None): if winerror is None: winerror, msg = ffi.getwinerror() @@ -116,6 +159,10 @@ def raise_winerror(winerror=None, *, filename=None, filename2=None): class ErrorCodes(enum.IntEnum): STATUS_TIMEOUT = 0x102 + WAIT_TIMEOUT = 0x102 + WAIT_ABANDONED = 0x80 + WAIT_OBJECT_0 = 0x00 # object is signaled + WAIT_FAILED = 0xFFFFFFFF ERROR_IO_PENDING = 997 ERROR_OPERATION_ABORTED = 995 ERROR_ABANDONED_WAIT_0 = 735 diff --git a/trio/_core/tests/test_windows.py b/trio/_core/tests/test_windows.py index 11fbed45f2..c0ec5cce64 100644 --- a/trio/_core/tests/test_windows.py +++ b/trio/_core/tests/test_windows.py @@ -1,4 +1,5 @@ import os + import pytest on_windows = (os.name == "nt") diff --git a/trio/_wait_for_object.py b/trio/_wait_for_object.py new file mode 100644 index 0000000000..8fd2e28abc --- /dev/null +++ b/trio/_wait_for_object.py @@ -0,0 +1,70 @@ +from . import _timeouts +from . import _core +from ._threads import run_sync_in_worker_thread +from ._core._windows_cffi import ffi, kernel32, ErrorCodes, raise_winerror, _handle + +__all__ = ["WaitForSingleObject"] + + +class StubLimiter: + def release_on_behalf_of(self, x): + pass + + async def acquire_on_behalf_of(self, x): + pass + + +async def WaitForSingleObject(obj): + """Async and cancellable variant of WaitForSingleObject. Windows only. + + Args: + handle: A Win32 handle, as a Python integer. + + Raises: + OSError: If the handle is invalid, e.g. when it is already closed. + + """ + # Allow ints or whatever we can convert to a win handle + handle = _handle(obj) + + # Quick check; we might not even need to spawn a thread. The zero + # means a zero timeout; this call never blocks. We also exit here + # if the handle is already closed for some reason. + retcode = kernel32.WaitForSingleObject(handle, 0) + if retcode == ErrorCodes.WAIT_FAILED: + raise_winerror() + elif retcode != ErrorCodes.WAIT_TIMEOUT: + return + + # Wait for a thread that waits for two handles: the handle plus a handle + # that we can use to cancel the thread. + cancel_handle = kernel32.CreateEventA(ffi.NULL, True, False, ffi.NULL) + try: + await run_sync_in_worker_thread( + WaitForMultipleObjects_sync, + handle, + cancel_handle, + cancellable=True, + limiter=StubLimiter(), + ) + finally: + # Clean up our cancel handle. In case we get here because this task was + # cancelled, we also want to set the cancel_handle to stop the thread. + kernel32.SetEvent(cancel_handle) + kernel32.CloseHandle(cancel_handle) + + +def WaitForMultipleObjects_sync(*handles): + """Wait for any of the given Windows handles to be signaled. + + """ + n = len(handles) + handle_arr = ffi.new("HANDLE[{}]".format(n)) + for i in range(n): + handle_arr[i] = handles[i] + timeout = 0xffffffff # INFINITE + retcode = kernel32.WaitForMultipleObjects( + n, handle_arr, False, timeout + ) # blocking + if retcode == ErrorCodes.WAIT_FAILED: + raise_winerror() diff --git a/trio/hazmat.py b/trio/hazmat.py index e5a417ceb8..4aebcc1baf 100644 --- a/trio/hazmat.py +++ b/trio/hazmat.py @@ -1,7 +1,15 @@ -# These are all re-exported from trio._core. See comments in trio/__init__.py -# for details. To make static analysis easier, this lists all possible -# symbols, and then we prune some below if they aren't available on this -# system. +""" +This namespace represents low-level functionality not intended for daily use, +but useful for extending Trio's functionality. +""" + +import sys + +# This is the union of a subset of trio/_core/ and some things from trio/*.py. +# See comments in trio/__init__.py for details. To make static analysis easier, +# this lists all possible symbols from trio._core, and then we prune those that +# aren't available on this system. After that we add some symbols from trio/*.py. + __all__ = [ "cancel_shielded_checkpoint", "Abort", @@ -56,3 +64,8 @@ # who knows. remove_from_all = __all__.remove remove_from_all(_sym) + +# Import bits from trio/*.py +if sys.platform.startswith("win"): + from ._wait_for_object import WaitForSingleObject + __all__ += ["WaitForSingleObject"] diff --git a/trio/tests/test_wait_for_object.py b/trio/tests/test_wait_for_object.py new file mode 100644 index 0000000000..3a83e454e0 --- /dev/null +++ b/trio/tests/test_wait_for_object.py @@ -0,0 +1,220 @@ +import os +import time + +import pytest + +on_windows = (os.name == "nt") +# Mark all the tests in this file as being windows-only +pytestmark = pytest.mark.skipif(not on_windows, reason="windows only") + +from .._core.tests.tutil import slow +from .. import _core +from .. import _timeouts +if on_windows: + from .._core._windows_cffi import ffi, kernel32 + from .._wait_for_object import WaitForSingleObject, WaitForMultipleObjects_sync, run_sync_in_worker_thread + + +async def test_WaitForMultipleObjects_sync(): + # This does a series of tests where we set/close the handle before + # initiating the waiting for it. + # + # Note that closing the handle (not signaling) will cause the + # *initiation* of a wait to return immediately. But closing a handle + # that is already being waited on will not stop whatever is waiting + # for it. + + # One handle + handle1 = kernel32.CreateEventA(ffi.NULL, True, False, ffi.NULL) + kernel32.SetEvent(handle1) + WaitForMultipleObjects_sync(handle1) + kernel32.CloseHandle(handle1) + print('test_WaitForMultipleObjects_sync one OK') + + # Two handles, signal first + handle1 = kernel32.CreateEventA(ffi.NULL, True, False, ffi.NULL) + handle2 = kernel32.CreateEventA(ffi.NULL, True, False, ffi.NULL) + kernel32.SetEvent(handle1) + WaitForMultipleObjects_sync(handle1, handle2) + kernel32.CloseHandle(handle1) + kernel32.CloseHandle(handle2) + print('test_WaitForMultipleObjects_sync set first OK') + + # Two handles, signal second + handle1 = kernel32.CreateEventA(ffi.NULL, True, False, ffi.NULL) + handle2 = kernel32.CreateEventA(ffi.NULL, True, False, ffi.NULL) + kernel32.SetEvent(handle2) + WaitForMultipleObjects_sync(handle1, handle2) + kernel32.CloseHandle(handle1) + kernel32.CloseHandle(handle2) + print('test_WaitForMultipleObjects_sync set second OK') + + # Two handles, close first + handle1 = kernel32.CreateEventA(ffi.NULL, True, False, ffi.NULL) + handle2 = kernel32.CreateEventA(ffi.NULL, True, False, ffi.NULL) + kernel32.CloseHandle(handle1) + with pytest.raises(OSError): + WaitForMultipleObjects_sync(handle1, handle2) + kernel32.CloseHandle(handle2) + print('test_WaitForMultipleObjects_sync close first OK') + + # Two handles, close second + handle1 = kernel32.CreateEventA(ffi.NULL, True, False, ffi.NULL) + handle2 = kernel32.CreateEventA(ffi.NULL, True, False, ffi.NULL) + kernel32.CloseHandle(handle2) + with pytest.raises(OSError): + WaitForMultipleObjects_sync(handle1, handle2) + kernel32.CloseHandle(handle1) + print('test_WaitForMultipleObjects_sync close second OK') + + +@slow +async def test_WaitForMultipleObjects_sync_slow(): + # This does a series of test in which the main thread sync-waits for + # handles, while we spawn a thread to set the handles after a short while. + + TIMEOUT = 0.3 + + # One handle + handle1 = kernel32.CreateEventA(ffi.NULL, True, False, ffi.NULL) + t0 = _core.current_time() + async with _core.open_nursery() as nursery: + nursery.start_soon( + run_sync_in_worker_thread, WaitForMultipleObjects_sync, handle1 + ) + await _timeouts.sleep(TIMEOUT) + # If we would comment the line below, the above thread will be stuck, + # and trio wont exit this scope + kernel32.SetEvent(handle1) + t1 = _core.current_time() + assert TIMEOUT <= (t1 - t0) < 2.0 * TIMEOUT + kernel32.CloseHandle(handle1) + print('test_WaitForMultipleObjects_sync_slow one OK') + + # Two handles, signal first + handle1 = kernel32.CreateEventA(ffi.NULL, True, False, ffi.NULL) + handle2 = kernel32.CreateEventA(ffi.NULL, True, False, ffi.NULL) + t0 = _core.current_time() + async with _core.open_nursery() as nursery: + nursery.start_soon( + run_sync_in_worker_thread, WaitForMultipleObjects_sync, handle1, + handle2 + ) + await _timeouts.sleep(TIMEOUT) + kernel32.SetEvent(handle1) + t1 = _core.current_time() + assert TIMEOUT <= (t1 - t0) < 2.0 * TIMEOUT + kernel32.CloseHandle(handle1) + kernel32.CloseHandle(handle2) + print('test_WaitForMultipleObjects_sync_slow thread-set first OK') + + # Two handles, signal second + handle1 = kernel32.CreateEventA(ffi.NULL, True, False, ffi.NULL) + handle2 = kernel32.CreateEventA(ffi.NULL, True, False, ffi.NULL) + t0 = _core.current_time() + async with _core.open_nursery() as nursery: + nursery.start_soon( + run_sync_in_worker_thread, WaitForMultipleObjects_sync, handle1, + handle2 + ) + await _timeouts.sleep(TIMEOUT) + kernel32.SetEvent(handle2) + t1 = _core.current_time() + assert TIMEOUT <= (t1 - t0) < 2.0 * TIMEOUT + kernel32.CloseHandle(handle1) + kernel32.CloseHandle(handle2) + print('test_WaitForMultipleObjects_sync_slow thread-set second OK') + + +async def test_WaitForSingleObject(): + # This does a series of test for setting/closing the handle before + # initiating the wait. + + # Test already set + handle = kernel32.CreateEventA(ffi.NULL, True, False, ffi.NULL) + kernel32.SetEvent(handle) + await WaitForSingleObject(handle) # should return at once + kernel32.CloseHandle(handle) + print('test_WaitForSingleObject already set OK') + + # Test already set, as int + handle = kernel32.CreateEventA(ffi.NULL, True, False, ffi.NULL) + handle_int = int(ffi.cast("intptr_t", handle)) + kernel32.SetEvent(handle) + await WaitForSingleObject(handle_int) # should return at once + kernel32.CloseHandle(handle) + print('test_WaitForSingleObject already set OK') + + # Test already closed + handle = kernel32.CreateEventA(ffi.NULL, True, False, ffi.NULL) + kernel32.CloseHandle(handle) + with pytest.raises(OSError): + await WaitForSingleObject(handle) # should return at once + print('test_WaitForSingleObject already closed OK') + + # Not a handle + with pytest.raises(TypeError): + await WaitForSingleObject("not a handle") # Wrong type + # with pytest.raises(OSError): + # await WaitForSingleObject(99) # If you're unlucky, it actually IS a handle :( + print('test_WaitForSingleObject not a handle OK') + + +@slow +async def test_WaitForSingleObject_slow(): + # This does a series of test for setting the handle in another task, + # and cancelling the wait task. + + # Set the timeout used in the tests. We test the waiting time against + # the timeout with a certain margin. + TIMEOUT = 0.3 + + async def signal_soon_async(handle): + await _timeouts.sleep(TIMEOUT) + kernel32.SetEvent(handle) + + # Test handle is SET after TIMEOUT in separate coroutine + + handle = kernel32.CreateEventA(ffi.NULL, True, False, ffi.NULL) + t0 = _core.current_time() + + async with _core.open_nursery() as nursery: + nursery.start_soon(WaitForSingleObject, handle) + nursery.start_soon(signal_soon_async, handle) + + kernel32.CloseHandle(handle) + t1 = _core.current_time() + assert TIMEOUT <= (t1 - t0) < 2.0 * TIMEOUT + print('test_WaitForSingleObject_slow set from task OK') + + # Test handle is SET after TIMEOUT in separate coroutine, as int + + handle = kernel32.CreateEventA(ffi.NULL, True, False, ffi.NULL) + handle_int = int(ffi.cast("intptr_t", handle)) + t0 = _core.current_time() + + async with _core.open_nursery() as nursery: + nursery.start_soon(WaitForSingleObject, handle_int) + nursery.start_soon(signal_soon_async, handle) + + kernel32.CloseHandle(handle) + t1 = _core.current_time() + assert TIMEOUT <= (t1 - t0) < 2.0 * TIMEOUT + print('test_WaitForSingleObject_slow set from task as int OK') + + # Test handle is CLOSED after 1 sec - NOPE see comment above + + pass + + # Test cancellation + + handle = kernel32.CreateEventA(ffi.NULL, True, False, ffi.NULL) + t0 = _core.current_time() + + with _timeouts.move_on_after(TIMEOUT): + await WaitForSingleObject(handle) + + kernel32.CloseHandle(handle) + t1 = _core.current_time() + assert TIMEOUT <= (t1 - t0) < 2.0 * TIMEOUT + print('test_WaitForSingleObject_slow cancellation OK')