Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions design/mvp/Async.md
Original file line number Diff line number Diff line change
Expand Up @@ -363,8 +363,8 @@ signalled by performing a `0`-length read or write (see the [Stream State]
section in the Canonical ABI explainer for details).

As a temporary limitation, if a `read` and `write` for a single stream or
future occur from within the same component, there is a trap. In the future
this limitation will be removed.
future occur from within the same component and the element type is non-empty,
there is a trap. In the future this limitation will be removed.

The `T` element type of streams and futures is optional, such that `future` and
`stream` can be written in WIT without a trailing `<T>`. In this case, the
Expand Down
10 changes: 9 additions & 1 deletion design/mvp/CanonicalABI.md
Original file line number Diff line number Diff line change
Expand Up @@ -1362,7 +1362,8 @@ but in the opposite direction. Both are implemented by a single underlying
self.set_pending(inst, buffer, on_partial_copy, on_copy_done)
return 'blocked'
else:
trap_if(inst is self.pending_inst) # temporary
assert(self.t == src.t == dst.t)
trap_if(inst is self.pending_inst and self.t is not None) # temporary
if self.pending_buffer.remain() > 0:
if buffer.remain() > 0:
dst.write(src.read(min(src.remain(), dst.remain())))
Expand All @@ -1379,6 +1380,13 @@ but in the opposite direction. Both are implemented by a single underlying
else:
return 'done'
```
Currently, there is a trap when both the `read` and `write` come from the same
component instance and there is a non-empty element type. This trap will be
removed in a subsequent release; the reason for the trap is that when lifting
and lowering can alias the same memory, interleavings can be complex and must
be handled carefully. Future improvements to the Canonical ABI ([lazy lowering])
can greatly simplify this interleaving and be more practical to implement.

The meaning of a `read` or `write` when the length is `0` is that the caller is
querying the "readiness" of the other side. When a `0`-length read/write
rendezvous with a non-`0`-length read/write, only the `0`-length read/write
Expand Down
3 changes: 2 additions & 1 deletion design/mvp/canonical-abi/definitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -810,7 +810,8 @@ def copy(self, inst, buffer, on_partial_copy, on_copy_done, src, dst):
self.set_pending(inst, buffer, on_partial_copy, on_copy_done)
return 'blocked'
else:
trap_if(inst is self.pending_inst) # temporary
assert(self.t == src.t == dst.t)
trap_if(inst is self.pending_inst and self.t is not None) # temporary
if self.pending_buffer.remain() > 0:
if buffer.remain() > 0:
dst.write(src.read(min(src.remain(), dst.remain())))
Expand Down
66 changes: 64 additions & 2 deletions design/mvp/canonical-abi/run_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -1062,6 +1062,7 @@ def write(self, vs):

class HostSink:
stream: ReadableStream
t: ValType
received: list[int]
chunk: int
write_remain: int
Expand All @@ -1070,6 +1071,7 @@ class HostSink:

def __init__(self, stream, chunk, remain = 2**64):
self.stream = stream
self.t = stream.t
self.received = []
self.chunk = chunk
self.write_remain = remain
Expand Down Expand Up @@ -1781,10 +1783,12 @@ async def core_func(task, args):


class HostFutureSink:
t: ValType
v: Optional[any]
has_v: asyncio.Event

def __init__(self):
def __init__(self, t):
self.t = t
self.v = None
self.has_v = asyncio.Event()

Expand Down Expand Up @@ -1849,7 +1853,7 @@ async def host_func(task, on_start, on_resolve, on_block):
[future] = on_start()
outgoing = HostFutureSource(U8Type())
on_resolve([outgoing])
incoming = HostFutureSink()
incoming = HostFutureSink(U8Type())
future.read(None, incoming, lambda:(), lambda why:())
wait = asyncio.create_task(incoming.has_v.wait())
await on_block(wait)
Expand Down Expand Up @@ -2169,6 +2173,63 @@ def on_resolve(results):
assert(len(got) == 1)
assert(got[0] == 42)

async def test_self_empty():
inst = ComponentInstance()
mem = bytearray(24)
sync_opts = mk_opts(memory=mem, sync=True)
async_opts = mk_opts(memory=mem, sync=False)

ft = FuncType([],[])
async def core_func(task, args):
[seti] = await canon_waitable_set_new(task)

[packed] = await canon_future_new(None, task)
rfi,wfi = unpack_new_ends(packed)

[ret] = await canon_future_write(None, async_opts, task, wfi, 10000)
assert(ret == definitions.BLOCKED)

[ret] = await canon_future_read(None, async_opts, task, rfi, 20000)
result,n = unpack_result(ret)
assert(n == 1 and result == definitions.CLOSED)
[] = await canon_future_close_readable(None, task, rfi)

[] = await canon_waitable_join(task, wfi, seti)
[event] = await canon_waitable_set_wait(True, mem, task, seti, 0)
assert(event == EventCode.FUTURE_WRITE)
assert(mem[0] == wfi)
result,n = unpack_result(mem[4])
assert(result == definitions.CLOSED)
assert(n == 1)
[] = await canon_future_close_writable(None, task, wfi)

[packed] = await canon_stream_new(None, task)
rsi,wsi = unpack_new_ends(packed)
[ret] = await canon_stream_write(None, async_opts, task, wsi, 10000, 3)
assert(ret == definitions.BLOCKED)

[ret] = await canon_stream_read(None, async_opts, task, rsi, 2000, 1)
result,n = unpack_result(ret)
assert(n == 1 and result == definitions.COMPLETED)
[ret] = await canon_stream_read(None, async_opts, task, rsi, 2000, 4)
result,n = unpack_result(ret)
assert(n == 2 and result == definitions.COMPLETED)
[] = await canon_stream_close_readable(None, task, rsi)

[] = await canon_waitable_join(task, wsi, seti)
[event] = await canon_waitable_set_wait(True, mem, task, seti, 0)
assert(event == EventCode.STREAM_WRITE)
assert(mem[0] == wsi)
result,n = unpack_result(mem[4])
assert(result == definitions.CLOSED)
assert(n == 3)
[] = await canon_stream_close_writable(None, task, wsi)

[] = await canon_waitable_set_drop(task, seti)
return []

await canon_lift(sync_opts, inst, ft, core_func, None, lambda:[], lambda _:(), host_on_block)

async def run_async_tests():
await test_roundtrips()
await test_handles()
Expand All @@ -2187,6 +2248,7 @@ async def run_async_tests():
await test_cancel_copy()
await test_futures()
await test_cancel_subtask()
await test_self_empty()

asyncio.run(run_async_tests())

Expand Down