Skip to content

Commit 3a5183b

Browse files
Transition local eventbuses to use aiodistbus.EventBus. (#291)
* WIP: Node eventbus refactoring. * Refactored ProcessorService * Node services now using new aiodistbus EventBus. * Worker's NodeHandlerService tests passing. * Converted all local eventbus in the Worker to use aiodistbus. * Finished Manager, Worker, and Node refactoring to use aiodistbus's local EventBus implementation. * Reconfigured testing to improve testing speed and separation. * Correct TEST_DATA_DIR variable after file move. * Removed docker tests. * Removed non-public APIs that are unnecessary. * Refactored the Worker's services to improve decoupling. * Finished refactoring structs into a single data_protocols file to improve model re-used. * Fixing minor issues.
1 parent 6f81a7a commit 3a5183b

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

68 files changed

+1381
-2941
lines changed

.github/workflows/test.yml

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -67,17 +67,35 @@ jobs:
6767
${{ steps.cp39.outputs.python-path }} -m pip install .[test]
6868
echo "MANUAL_OS_SET=Windows" >> $GITHUB_ENV
6969
70-
- name: Perform faster tests
70+
- name: Perform ChimeraPy utils tests
7171
run: |
72-
${{ steps.cp39.outputs.python-path }} -m coverage run --source=chimerapy -m pytest -v --reruns 5 --color yes --reruns-delay 5 -m "not slow" test
72+
${{ steps.cp39.outputs.python-path }} -m coverage run --source=chimerapy -m pytest -v --reruns 5 --color yes --reruns-delay 5 -m test/core
7373
${{ steps.cp39.outputs.python-path }} -m coverage combine --append
74-
mv chimerapy-engine-test.log chimerapy-engine-test-fast.log
74+
mv chimerapy-engine-test.log chimerapy-engine-test-utils.log
7575
76-
- name: Perform slower tests
76+
- name: Perform ChimeraPy logger tests
7777
run: |
78-
${{ steps.cp39.outputs.python-path }} -m coverage run --source=chimerapy -m pytest -v --reruns 5 --color yes --reruns-delay 5 -m "slow" test
78+
${{ steps.cp39.outputs.python-path }} -m coverage run --source=chimerapy -m pytest -v --reruns 5 --color yes --reruns-delay 5 -m test/logger
7979
${{ steps.cp39.outputs.python-path }} -m coverage combine --append
80-
mv chimerapy-engine-test.log chimerapy-engine-test-slow.log
80+
mv chimerapy-engine-test.log chimerapy-engine-test-logger.log
81+
82+
- name: Perform ChimeraPy Node tests
83+
run: |
84+
${{ steps.cp39.outputs.python-path }} -m coverage run --source=chimerapy -m pytest -v --reruns 5 --color yes --reruns-delay 5 -m test/node
85+
${{ steps.cp39.outputs.python-path }} -m coverage combine --append
86+
mv chimerapy-engine-test.log chimerapy-engine-test-node.log
87+
88+
- name: Perform ChimeraPy Worker tests
89+
run: |
90+
${{ steps.cp39.outputs.python-path }} -m coverage run --source=chimerapy -m pytest -v --reruns 5 --color yes --reruns-delay 5 -m test/worker
91+
${{ steps.cp39.outputs.python-path }} -m coverage combine --append
92+
mv chimerapy-engine-test.log chimerapy-engine-test-worker.log
93+
94+
- name: Perform ChimeraPy Manager tests
95+
run: |
96+
${{ steps.cp39.outputs.python-path }} -m coverage run --source=chimerapy -m pytest -v --reruns 5 --color yes --reruns-delay 5 -m test/manager
97+
${{ steps.cp39.outputs.python-path }} -m coverage combine --append
98+
mv chimerapy-engine-test.log chimerapy-engine-test-manager.log
8199
82100
- name: Combine test logs
83101
run : |

chimerapy/engine/data_protocols.py

Lines changed: 93 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,18 @@
11
import datetime
2+
import enum
3+
import logging
4+
import typing
25
from dataclasses import dataclass, field
3-
from typing import Dict
6+
from typing import Any, Dict, List, Literal, Optional, Union
7+
8+
if typing.TYPE_CHECKING:
9+
from .graph import Graph
10+
from .states import NodeState
411

512
from dataclasses_json import DataClassJsonMixin
613

14+
from .networking import DataChunk
15+
716

817
@dataclass
918
class NodePubEntry(DataClassJsonMixin):
@@ -18,6 +27,7 @@ class NodePubTable(DataClassJsonMixin):
1827

1928
@dataclass
2029
class NodeDiagnostics(DataClassJsonMixin):
30+
node_id: str = ""
2131
timestamp: str = field(
2232
default_factory=lambda: str(datetime.datetime.now().isoformat())
2333
) # ISO str
@@ -26,3 +36,85 @@ class NodeDiagnostics(DataClassJsonMixin):
2636
memory_usage: float = 0 # KB
2737
cpu_usage: float = 0 # percentage
2838
num_of_steps: int = 0
39+
40+
41+
########################################################################
42+
## Manager specific
43+
########################################################################
44+
45+
46+
@dataclass
47+
class RegisterMethodResponseData(DataClassJsonMixin):
48+
success: bool
49+
result: Dict[str, Any]
50+
51+
52+
@dataclass
53+
class UpdateSendArchiveData(DataClassJsonMixin):
54+
worker_id: str
55+
success: bool
56+
57+
58+
@dataclass
59+
class CommitData(DataClassJsonMixin):
60+
graph: "Graph"
61+
mapping: Dict[str, List[str]]
62+
context: Literal["multiprocessing", "threading"] = "multiprocessing"
63+
send_packages: Optional[List[Dict[str, Any]]] = None
64+
65+
66+
########################################################################
67+
## Worker specific
68+
########################################################################
69+
70+
71+
@dataclass
72+
class ConnectData(DataClassJsonMixin):
73+
method: Literal["ip", "zeroconf"]
74+
host: Optional[str] = None
75+
port: Optional[int] = None
76+
77+
78+
@dataclass
79+
class GatherData(DataClassJsonMixin):
80+
node_id: str
81+
output: Union[DataChunk, List[int]]
82+
83+
84+
@dataclass
85+
class ResultsData(DataClassJsonMixin):
86+
node_id: str
87+
success: bool
88+
output: Any
89+
90+
91+
@dataclass
92+
class ServerMessage(DataClassJsonMixin):
93+
signal: enum.Enum
94+
data: Dict[str, Any] = field(default_factory=dict)
95+
client_id: Optional[str] = None
96+
97+
98+
########################################################################
99+
## Node specific
100+
########################################################################
101+
102+
103+
@dataclass
104+
class PreSetupData(DataClassJsonMixin):
105+
state: "NodeState"
106+
logger: logging.Logger
107+
108+
109+
@dataclass
110+
class RegisteredMethod(DataClassJsonMixin):
111+
name: str
112+
style: Literal["concurrent", "blocking", "reset"] = "concurrent"
113+
params: Dict[str, str] = field(default_factory=dict)
114+
115+
116+
@dataclass
117+
class RegisteredMethodData(DataClassJsonMixin):
118+
node_id: str
119+
method_name: str
120+
params: Dict[str, Any] = field(default_factory=dict)

chimerapy/engine/eventbus/__init__.py

Lines changed: 0 additions & 13 deletions
This file was deleted.

chimerapy/engine/eventbus/eventbus.py

Lines changed: 0 additions & 188 deletions
This file was deleted.

0 commit comments

Comments
 (0)