Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions default.nix
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,4 @@ pkgs.mkShell {

LD_LIBRARY_PATH = pkgs.lib.makeLibraryPath nativeBuildInputs;
TMPDIR = "/tmp";

shellHook = ''
set -e
uv venv .venv --python=3.12
'';
}
145 changes: 144 additions & 1 deletion tests/msg/test_ext_types_msgspec.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
B~)

'''
from __future__ import annotations
from contextlib import (
contextmanager as cm,
# nullcontext,
Expand All @@ -20,7 +21,7 @@
# structs,
# msgpack,
Raw,
# Struct,
Struct,
ValidationError,
)
import pytest
Expand All @@ -46,6 +47,11 @@
apply_codec,
current_codec,
)
from tractor.msg._codec import (
default_builtins,
mk_dec_hook,
mk_codec_from_spec,
)
from tractor.msg.types import (
log,
Started,
Expand Down Expand Up @@ -743,6 +749,143 @@ async def main():
assert exc.boxed_type is TypeError


'''
Test the auto enc & dec hooks

Create a codec which will work for:
- builtins
- custom types
- lists of custom types
'''


class BytesTestClass(Struct, tag=True):
raw: bytes

def encode(self) -> bytes:
return self.raw

@classmethod
def from_bytes(self, raw: bytes) -> BytesTestClass:
return BytesTestClass(raw=raw)


class StrTestClass(Struct, tag=True):
s: str

def encode(self) -> str:
return self.s

@classmethod
def from_str(self, s: str) -> StrTestClass:
return StrTestClass(s=s)


class IntTestClass(Struct, tag=True):
num: int

def encode(self) -> int:
return self.num

@classmethod
def from_int(self, num: int) -> IntTestClass:
return IntTestClass(num=num)


builtins = tuple((
builtin
for builtin in default_builtins
if builtin is not list
))

TestClasses = (BytesTestClass, StrTestClass, IntTestClass)


TestSpec = (
*TestClasses, list[Union[*TestClasses]]
)


test_codec = mk_codec_from_spec(
spec=TestSpec
)


@tractor.context
async def child_custom_codec(
ctx: tractor.Context,
msgs: list[Union[*TestSpec]],
):
'''
Apply codec and send all msgs passed through stream

'''
with (
apply_codec(test_codec),
limit_plds(
test_codec.pld_spec,
dec_hook=mk_dec_hook(TestSpec),
ext_types=TestSpec + builtins
),
):
await ctx.started(None)
async with ctx.open_stream() as stream:
for msg in msgs:
await stream.send(msg)


def test_multi_custom_codec():
'''
Open subactor setup codec and pld_rx and wait to receive & assert from
stream

'''
msgs = [
None,
True, False,
0xdeadbeef,
.42069,
b'deadbeef',
BytesTestClass(raw=b'deadbeef'),
StrTestClass(s='deadbeef'),
IntTestClass(num=0xdeadbeef),
[
BytesTestClass(raw=b'deadbeef'),
StrTestClass(s='deadbeef'),
IntTestClass(num=0xdeadbeef),
]
]

async def main():
async with tractor.open_nursery() as an:
p: tractor.Portal = await an.start_actor(
'child',
enable_modules=[__name__],
)
async with (
p.open_context(
child_custom_codec,
msgs=msgs,
) as (ctx, _),
ctx.open_stream() as ipc
):
with (
apply_codec(test_codec),
limit_plds(
test_codec.pld_spec,
dec_hook=mk_dec_hook(TestSpec),
ext_types=TestSpec + builtins
)
):
msg_iter = iter(msgs)
async for recv_msg in ipc:
assert recv_msg == next(msg_iter)

await p.cancel_actor()

trio.run(main)


# def chk_pld_type(
# payload_spec: Type[Struct]|Any,
# pld: Any,
Expand Down
Loading
Loading