Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 60 additions & 0 deletions trio/_core/_io_windows.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import attr

from .. import _core
from .. import _timeouts
from . import _public
from ._wakeup_socketpair import WakeupSocketpair
from .._util import is_main_thread
Expand Down Expand Up @@ -109,6 +110,65 @@ def _handle(obj):
return obj


async def WaitForSingleObject(handle):
"""Async and cancellable variant of kernel32.WaitForSingleObject().

Args:
handle: A win32 handle.

"""
# Quick check; we might not even need to spawn a thread. The zero
# means a zero timeout; this call never blocks. We also exit here
# if the handle is already closed for some reason.
retcode = kernel32.WaitForSingleObject(handle, 0)
if retcode != ErrorCodes.WAIT_TIMEOUT:
return

# :'( avoid circular imports
from .._threads import run_sync_in_worker_thread

class StubLimiter:
def release_on_behalf_of(self, x):
pass

async def acquire_on_behalf_of(self, x):
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since nothing here depends on access to the internals of the WindowsIOManager, I think we can move it out into some place like trio/_wait_for_single_object.py.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. This also gets rid of that circular import.

pass

# Wait for a thread that waits for two handles: the handle plus a handle
# that we can use to cancel the thread.
cancel_handle = kernel32.CreateEventA(ffi.NULL, True, False, ffi.NULL)
try:
await run_sync_in_worker_thread(
WaitForMultipleObjects_sync,
handle,
cancel_handle,
cancellable=True,
limiter=StubLimiter(),
)
finally:
# Clean up our cancel handle. In case we get here because this task was
# cancelled, we also want to set the cancel_handle to stop the thread.
kernel32.SetEvent(cancel_handle)
kernel32.CloseHandle(cancel_handle)


def WaitForMultipleObjects_sync(*handles):
"""Wait for any of the given Windows handles to be signaled.

"""
n = len(handles)
handle_arr = ffi.new("HANDLE[{}]".format(n))
for i in range(n):
handle_arr[i] = handles[i]
timeout = 1000 * 60 * 60 * 24 # todo: use INF here, whatever that is, and ditch the while
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Windows defines the special value INFINITE to be 0xFFFFFFFF. We should probably add this as a constant in _windows_cffi.py, next to the error codes.

while True:
retcode = kernel32.WaitForMultipleObjects(
n, handle_arr, False, timeout
)
if retcode != ErrorCodes.WAIT_TIMEOUT:
break
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The docs say that WaitForMultipleObjects can return WAIT_FAILED, and that in this case we should check GetLastError. I can't think of why this would ever fail, but probably best to check anyway to be safe. Fortunately _windows_cffi.py already has the machinery you need; just do something like:

if retcode == WAIT_FAILED:
    _windows_cffi.raise_winerror()



@attr.s(frozen=True)
class _WindowsStatistics:
tasks_waiting_overlapped = attr.ib()
Expand Down
34 changes: 34 additions & 0 deletions trio/_core/_windows_cffi.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@

typedef OVERLAPPED WSAOVERLAPPED;
typedef LPOVERLAPPED LPWSAOVERLAPPED;
typedef PVOID LPSECURITY_ATTRIBUTES;
typedef PVOID LPCSTR;

typedef struct _OVERLAPPED_ENTRY {
ULONG_PTR lpCompletionKey;
Expand Down Expand Up @@ -80,6 +82,34 @@
_In_opt_ void* HandlerRoutine,
_In_ BOOL Add
);

HANDLE CreateEventA(
LPSECURITY_ATTRIBUTES lpEventAttributes,
BOOL bManualReset,
BOOL bInitialState,
LPCSTR lpName
);

BOOL SetEvent(
HANDLE hEvent
);

BOOL ResetEvent(
HANDLE hEvent
);

DWORD WaitForSingleObject(
HANDLE hHandle,
DWORD dwMilliseconds
);

DWORD WaitForMultipleObjects(
DWORD nCount,
HANDLE *lpHandles,
BOOL bWaitAll,
DWORD dwMilliseconds
);

"""

# cribbed from pywincffi
Expand Down Expand Up @@ -116,6 +146,10 @@ def raise_winerror(winerror=None, *, filename=None, filename2=None):

class ErrorCodes(enum.IntEnum):
STATUS_TIMEOUT = 0x102
WAIT_TIMEOUT = 0x102
WAIT_ABANDONED = 0x80
WAIT_OBJECT_0 = 0x00 # object is signaled
WAIT_FAILED = 0xFFFFFFFF
ERROR_IO_PENDING = 997
ERROR_OPERATION_ABORTED = 995
ERROR_ABANDONED_WAIT_0 = 735
Expand Down
107 changes: 107 additions & 0 deletions trio/_core/tests/test_windows.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
import os
from threading import Thread
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've learned the hard way that it's usually a bad idea to use threading.Thread directly in tests – it can do things like swallow exceptions :-). If you need threads, then Trio's primitives are actually easier to use and more reliable, so ... that's what I use now.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This tests a synchronous function, so afaik I cannot use a Trio primitive here, right? (I am not that familiar with Trio yet though :)).

I now changed the test, so that the main thread waits for the handle, and a thread is spawned only to set the handle after a short timeout (there is thus very little that can go wrong in that thread).

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can use an async test function to test a synchronous function :-).

async def test_...():
    async with trio.open_nursery() as nursery:
        # This runs 'sync_fn' in a background thread
        nursery.start_soon(trio.run_sync_in_worker_thread, sync_fn)
        # ... we can do other stuff here while it's running ...
    # When we de-dent to close the nursery, it automatically joins the background thread

So this automatically makes sure that you join the background thread (it can't accidentally keep running while other tests go, even if this test crashes), it automatically detects exceptions in the background thread and makes sure that if they happen it causes the test to fail, and it's actually simpler to use (IMO) than the raw threading.Thread API :-).

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Of course. I don't understand why I did not think to use run_sync_in_worker_thread 🤔

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey, don't feel bad, this concurrency/IO stuff is complicated. I didn't think of it either when I was writing the trio.ssl testsuite, even though I wrote run_sync_in_worker_thread like the month before... so you get to learn from my fail :-)

import pytest

on_windows = (os.name == "nt")
# Mark all the tests in this file as being windows-only
pytestmark = pytest.mark.skipif(not on_windows, reason="windows only")

from ... import _core
from ... import _timeouts
if on_windows:
from .._windows_cffi import ffi, kernel32
from .._io_windows import WaitForSingleObject, WaitForMultipleObjects_sync


async def test_completion_key_listen():
Expand Down Expand Up @@ -38,5 +41,109 @@ async def post(key):
print("end loop")


async def test_WaitForMultipleObjects_sync():
# One handle
handle1 = kernel32.CreateEventA(ffi.NULL, True, False, ffi.NULL)
t = Thread(target=WaitForMultipleObjects_sync, args=(handle1,))
t.start()
kernel32.SetEvent(handle1)
t.join() # the test succeeds if we do not block here :)
kernel32.CloseHandle(handle1)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this test needs a thread at all – there's no particular reason to think that the thread is actually running before we call SetEvent, so you can capture the important part of the test by doing:

handle1 = kernel32.CreateEventA(ffi.NULL, True, False, ffi.NULL)
kernel32.SetEvent(handle1)
WaitForMultipleObjects_sync(handle1)
kernel32.CloseHandle(handle1)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there's no particular reason to think that the thread is actually running before we call SetEvent

Oh dear, I overlooked that :)

Signaling before waiting is one option (and I added it for the fast tests), but I think it would be good to also test actually "acquiring the wait" and then setting the handle to "release the wait". Put that in a @slow test.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense :-)


# Two handles, signal first
handle1 = kernel32.CreateEventA(ffi.NULL, True, False, ffi.NULL)
handle2 = kernel32.CreateEventA(ffi.NULL, True, False, ffi.NULL)
t = Thread(target=WaitForMultipleObjects_sync, args=(handle1, handle2))
t.start()
kernel32.SetEvent(handle1)
t.join() # the test succeeds if we do not block here :)
kernel32.CloseHandle(handle1)
kernel32.CloseHandle(handle2)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same comment applies.


# Two handles, signal seconds
handle1 = kernel32.CreateEventA(ffi.NULL, True, False, ffi.NULL)
handle2 = kernel32.CreateEventA(ffi.NULL, True, False, ffi.NULL)
t = Thread(target=WaitForMultipleObjects_sync, args=(handle1, handle2))
t.start()
kernel32.SetEvent(handle2)
t.join() # the test succeeds if we do not block here :)
kernel32.CloseHandle(handle1)
kernel32.CloseHandle(handle2)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same comment applies.


# Closing the handle will not stop the thread. Initiating a wait on a
# closed handle will fail/return, but closing a handle that is already
# being waited on will not stop whatever is waiting for it.


async def test_WaitForSingleObject():

# Set the timeout used in the tests. The resolution of WaitForSingleObject
# is 0.01 so anything more than a magnitude larger should probably do.
# If too large, the test become slow and we might need to mark it as @slow.
TIMEOUT = 0.5
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment about the resolution is out of date, right?

If this is a test that has to sleep – and I think it probably is – then we're probably going to mark it @slow :-). I try to keep the quick test run you do over and over while hacking as fast as possible, so right now the slowest single test is <0.3s. We always run the full test suite during CI runs though, so all the @slow tests still get run on every commit.

Maybe we could have one test that's not marked @slow and checks the trivial cases of (1) WaitForSingleObject on an already-ready object, (2) WaitForSingleObject that gets preemptively cancelled, and then a second test that is marked @slow and has the tests for (3) we wait a bit and then signal the object, (4) we wait a bit and then cancel the call?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that comment was outdated.


async def handle_setter(handle):
await _timeouts.sleep(TIMEOUT)
kernel32.SetEvent(handle)

# Test 1, handle is SET after 1 sec in separate coroutine

handle = kernel32.CreateEventA(ffi.NULL, True, False, ffi.NULL)
t0 = _core.current_time()

async with _core.open_nursery() as nursery:
nursery.start_soon(WaitForSingleObject, handle)
nursery.start_soon(handle_setter, handle)

kernel32.CloseHandle(handle)
t1 = _core.current_time()
assert TIMEOUT <= (t1 - t0) < 1.1 * TIMEOUT
print('test_WaitForSingleObject test 1 OK')

# Test 2, handle is CLOSED after 1 sec - NOPE, wont work unless we use zero timeout

pass

# Test 3, cancelation
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

spelling: should be cancellation

(In American spelling, there's some inconsistency about whether you use one or two l's in words like "cancelled". But in trio we always use two, because that's acceptable everywhere, and it's easier to remember than trying to keep track of which words have two and which have one.)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍


handle = kernel32.CreateEventA(ffi.NULL, True, False, ffi.NULL)
t0 = _core.current_time()

with _timeouts.move_on_after(TIMEOUT):
await WaitForSingleObject(handle)

kernel32.CloseHandle(handle)
t1 = _core.current_time()
assert TIMEOUT <= (t1 - t0) < 1.1 * TIMEOUT
print('test_WaitForSingleObject test 3 OK')

# Test 4, already cancelled

handle = kernel32.CreateEventA(ffi.NULL, True, False, ffi.NULL)
kernel32.SetEvent(handle)
t0 = _core.current_time()

with _timeouts.move_on_after(TIMEOUT):
await WaitForSingleObject(handle)

kernel32.CloseHandle(handle)
t1 = _core.current_time()
assert (t1 - t0) < 0.5 * TIMEOUT
print('test_WaitForSingleObject test 4 OK')
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is already signaled, right, not already cancelled?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes


# Test 5, already closed

handle = kernel32.CreateEventA(ffi.NULL, True, False, ffi.NULL)
kernel32.CloseHandle(handle)
t0 = _core.current_time()

with _timeouts.move_on_after(TIMEOUT):
await WaitForSingleObject(handle)

t1 = _core.current_time()
assert (t1 - t0) < 0.5 * TIMEOUT
print('test_WaitForSingleObject test 5 OK')
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this one actually require a timeout? I would have thought that WaitForSingleObject would fail immediately, before even starting a thread...

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not really, was mostly there as a fail-safe. Removed it because it makes the test much shorter.



# XX test setting the iomanager._iocp to something weird to make sure that the
# IOCP thread can send exceptions back to the main thread