Skip to content

Commit 463b1b9

Browse files
committed
Wip coordinator changes.
1 parent f0ce896 commit 463b1b9

File tree

9 files changed

+93
-17
lines changed

9 files changed

+93
-17
lines changed

packages/liblsl/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ Submitted [JOSS paper](./paper/paper.md): markdown version of the JOSS paper (pe
1414
- [x] iOS
1515
- [x] Android
1616
- [ ] Web - Possibly in the future, see: [`native_assets_cli` build.dart / link.dart web backends and WasmCode and JsCode assets](https://github.com/dart-lang/native/issues/988)
17+
- Alternatively, I have been working on the coordinator library, which currently only has an LSL backend implemented, but it might be possible to create a WebSockets version that can be used from a web app. This of course might be useful for WAN-based communication, but this is out of scope for this (LSL) package.
1718

1819
Also confirmed working on:
1920

packages/liblsl/pubspec.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ dev_dependencies:
2222
ffigen: ^19.0.0
2323
lints: ^6.0.0
2424
test: ^1.26.0
25-
dartdoc: ^8.3.3
25+
dartdoc: ^9.0.0
2626

2727
ffigen:
2828
name: Liblsl

packages/liblsl_coordinator/lib/src/coordination/coordinator_state.dart

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,9 @@ class CoordinationState {
5151
bool get isCoordinator => _isCoordinator;
5252
String? get coordinatorUId => _coordinatorUId;
5353
List<Node> get connectedNodes => List.unmodifiable(_connectedNodes);
54+
List<Node> get connectedParticipantNodes => _connectedNodes
55+
.where((n) => n.role == NodeCapability.participant.toString())
56+
.toList();
5457

5558
Stream<CoordinationPhase> get phaseChanges => _phaseController.stream;
5659
Stream<Node> get nodeJoined => _nodeJoinedController.stream;

packages/liblsl_coordinator/lib/src/coordination/handlers.dart

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,13 @@ class CoordinatorMessageHandler extends CoordinationMessageHandler {
3131
Stream<CoordinationMessage> get outgoingMessages =>
3232
_outgoingController.stream;
3333

34+
final StreamController<UserParticipantMessage>
35+
_userParticipantMessageController =
36+
StreamController<UserParticipantMessage>.broadcast();
37+
38+
Stream<UserParticipantMessage> get userParticipantMessages =>
39+
_userParticipantMessageController.stream;
40+
3441
final StreamController<StreamReadyMessage> _streamReadyController =
3542
StreamController<StreamReadyMessage>.broadcast();
3643
Stream<StreamReadyMessage> get streamReadyNotifications =>
@@ -54,6 +61,7 @@ class CoordinatorMessageHandler extends CoordinationMessageHandler {
5461
CoordinationMessageType.joinRequest,
5562
CoordinationMessageType.nodeLeaving,
5663
CoordinationMessageType.streamReady,
64+
CoordinationMessageType.userParticipantMessage,
5765
}.contains(type);
5866
}
5967

packages/liblsl_coordinator/lib/src/coordination/messages.dart

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ enum CoordinationMessageType {
2020
flushStream,
2121
destroyStream,
2222
userMessage,
23+
userParticipantMessage,
2324
configUpdate,
2425
nodeLeaving,
2526
}
@@ -81,6 +82,8 @@ abstract class CoordinationMessage {
8182
return DestroyStreamMessage.fromMap(map);
8283
case CoordinationMessageType.userMessage:
8384
return UserCoordinationMessage.fromMap(map);
85+
case CoordinationMessageType.userParticipantMessage:
86+
return UserParticipantMessage.fromMap(map);
8487
case CoordinationMessageType.configUpdate:
8588
return ConfigUpdateMessage.fromMap(map);
8689
case CoordinationMessageType.nodeLeaving:
@@ -599,6 +602,44 @@ class UserCoordinationMessage extends CoordinationMessage {
599602
);
600603
}
601604

605+
class UserParticipantMessage extends CoordinationMessage {
606+
final String messageId;
607+
final String description;
608+
final Map<String, dynamic> payload;
609+
610+
UserParticipantMessage({
611+
required super.fromNodeUId,
612+
required this.messageId,
613+
required this.description,
614+
required this.payload,
615+
super.timestamp,
616+
super.metadata,
617+
}) : super(type: CoordinationMessageType.userParticipantMessage);
618+
619+
@override
620+
Map<String, dynamic> toMap() {
621+
return {
622+
'type': type.name,
623+
'fromNodeUId': fromNodeUId,
624+
'timestamp': timestamp.toIso8601String(),
625+
'messageId': messageId,
626+
'description': description,
627+
'payload': payload,
628+
'metadata': metadata,
629+
};
630+
}
631+
632+
factory UserParticipantMessage.fromMap(Map<String, dynamic> map) =>
633+
UserParticipantMessage(
634+
fromNodeUId: map['fromNodeUId'],
635+
messageId: map['messageId'],
636+
description: map['description'],
637+
payload: Map<String, dynamic>.from(map['payload'] ?? {}),
638+
timestamp: DateTime.parse(map['timestamp']),
639+
metadata: Map<String, dynamic>.from(map['metadata'] ?? {}),
640+
);
641+
}
642+
602643
class ConfigUpdateMessage extends CoordinationMessage {
603644
final Map<String, dynamic> config;
604645

packages/liblsl_coordinator/lib/transports/lsl/lsl_coordination.dart

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@ class LSLCoordinationSession extends CoordinationSession with RuntimeTypeUID {
4444
_controller.streamDestroyCommands;
4545

4646
Stream<UserCoordinationMessage> get userMessages => _controller.userMessages;
47+
Stream<UserParticipantMessage> get userParticipantMessages =>
48+
_controller.userParticipantMessages;
4749
Stream<ConfigUpdateMessage> get configUpdates => _controller.configUpdates;
4850
Stream<Node> get nodeJoined => _controller.nodeJoined;
4951
Stream<Node> get nodeLeft => _controller.nodeLeft;
@@ -53,6 +55,8 @@ class LSLCoordinationSession extends CoordinationSession with RuntimeTypeUID {
5355
bool get isCoordinator => _controller.isCoordinator;
5456
String? get coordinatorUId => _controller.coordinatorUId;
5557
List<Node> get connectedNodes => _controller.connectedNodes;
58+
List<Node> get connectedParticipantNodes =>
59+
_controller.connectedParticipantNodes;
5660

5761
@override
5862
LSLTransport get transport => _transport;

packages/liblsl_coordinator/lib/transports/lsl/lsl_coordination_controller.dart

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ class CoordinationController {
2323
Timer? _nodeTimeoutTimer;
2424
StreamSubscription? _coordinationSubscription;
2525
StreamSubscription? _handlerSubscription;
26+
StreamSubscription? _userMessageSubscription;
2627
StreamSubscription? _discoverySubscription;
2728
StreamSubscription? _streamReadySubscription;
2829

@@ -47,6 +48,9 @@ class CoordinationController {
4748
StreamController<DestroyStreamMessage>.broadcast();
4849
final StreamController<UserCoordinationMessage> _userMessageController =
4950
StreamController<UserCoordinationMessage>.broadcast();
51+
final StreamController<UserParticipantMessage>
52+
_userParticipantMessageController =
53+
StreamController<UserParticipantMessage>.broadcast();
5054
final StreamController<ConfigUpdateMessage> _configUpdateController =
5155
StreamController<ConfigUpdateMessage>.broadcast();
5256
final StreamController<Node> _nodeJoinedController =
@@ -74,6 +78,8 @@ class CoordinationController {
7478
_streamDestroyController.stream;
7579
Stream<UserCoordinationMessage> get userMessages =>
7680
_userMessageController.stream;
81+
Stream<UserParticipantMessage> get userParticipantMessages =>
82+
_userParticipantMessageController.stream;
7783
Stream<ConfigUpdateMessage> get configUpdates =>
7884
_configUpdateController.stream;
7985
Stream<Node> get nodeJoined => _nodeJoinedController.stream;
@@ -83,6 +89,7 @@ class CoordinationController {
8389
bool get isCoordinator => _state.isCoordinator;
8490
String? get coordinatorUId => _state.coordinatorUId;
8591
List<Node> get connectedNodes => _state.connectedNodes;
92+
List<Node> get connectedParticipantNodes => _state.connectedParticipantNodes;
8693

8794
CoordinationController({
8895
required this.coordinationConfig,
@@ -322,6 +329,8 @@ class CoordinationController {
322329
);
323330
_streamReadySubscription = _coordinatorHandler!.streamReadyNotifications
324331
.listen(_streamReadyController.add);
332+
_userMessageSubscription = _coordinatorHandler!.userParticipantMessages
333+
.listen(_userParticipantMessageController.add);
325334
// Start heartbeat
326335
_startHeartbeat();
327336

@@ -364,6 +373,7 @@ class CoordinationController {
364373
_streamDestroyController.add,
365374
);
366375
_participantHandler!.userMessages.listen(_userMessageController.add);
376+
367377
_participantHandler!.configUpdates.listen(_configUpdateController.add);
368378

369379
// Send join request
@@ -631,7 +641,16 @@ class CoordinationController {
631641
Map<String, dynamic> payload,
632642
) async {
633643
if (!_state.isCoordinator) {
634-
throw StateError('Only coordinator can send user messages');
644+
// @TODO: Implement properly
645+
await _participantHandler!.sendMessage(
646+
UserCoordinationMessage(
647+
fromNodeUId: thisNode.uId,
648+
messageId: messageId,
649+
description: description,
650+
payload: payload,
651+
),
652+
);
653+
return;
635654
}
636655
await _coordinatorHandler!.broadcastUserMessage(
637656
messageId,

packages/liblsl_coordinator/lib/transports/lsl/lsl_stream.dart

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1024,7 +1024,7 @@ class LSLDataStream extends DataStream<DataStreamConfig, IMessage>
10241024
)
10251025
..setMetadata('lsl_timestamp', data.lslTimestamp)
10261026
..setMetadata('lsl_time_correction', data.lslTimeCorrection)
1027-
..setMetadata('received_at', DateTime.now());
1027+
..setMetadata('received_at', DateTime.now().toIso8601String());
10281028
}
10291029
break;
10301030
case StreamDataType.int8:
@@ -1036,7 +1036,7 @@ class LSLDataStream extends DataStream<DataStreamConfig, IMessage>
10361036
)
10371037
..setMetadata('lsl_timestamp', data.lslTimestamp)
10381038
..setMetadata('lsl_time_correction', data.lslTimeCorrection)
1039-
..setMetadata('received_at', DateTime.now());
1039+
..setMetadata('received_at', DateTime.now().toIso8601String());
10401040
}
10411041
break;
10421042
case StreamDataType.int16:
@@ -1048,7 +1048,7 @@ class LSLDataStream extends DataStream<DataStreamConfig, IMessage>
10481048
)
10491049
..setMetadata('lsl_timestamp', data.lslTimestamp)
10501050
..setMetadata('lsl_time_correction', data.lslTimeCorrection)
1051-
..setMetadata('received_at', DateTime.now());
1051+
..setMetadata('received_at', DateTime.now().toIso8601String());
10521052
}
10531053
break;
10541054
case StreamDataType.int32:
@@ -1060,7 +1060,7 @@ class LSLDataStream extends DataStream<DataStreamConfig, IMessage>
10601060
)
10611061
..setMetadata('lsl_timestamp', data.lslTimestamp)
10621062
..setMetadata('lsl_time_correction', data.lslTimeCorrection)
1063-
..setMetadata('received_at', DateTime.now());
1063+
..setMetadata('received_at', DateTime.now().toIso8601String());
10641064
}
10651065
break;
10661066
case StreamDataType.int64:
@@ -1072,7 +1072,7 @@ class LSLDataStream extends DataStream<DataStreamConfig, IMessage>
10721072
)
10731073
..setMetadata('lsl_timestamp', data.lslTimestamp)
10741074
..setMetadata('lsl_time_correction', data.lslTimeCorrection)
1075-
..setMetadata('received_at', DateTime.now());
1075+
..setMetadata('received_at', DateTime.now().toIso8601String());
10761076
}
10771077
break;
10781078
case StreamDataType.string:
@@ -1084,7 +1084,7 @@ class LSLDataStream extends DataStream<DataStreamConfig, IMessage>
10841084
)
10851085
..setMetadata('lsl_timestamp', data.lslTimestamp)
10861086
..setMetadata('lsl_time_correction', data.lslTimeCorrection)
1087-
..setMetadata('received_at', DateTime.now());
1087+
..setMetadata('received_at', DateTime.now().toIso8601String());
10881088
}
10891089
break;
10901090
}
@@ -1217,7 +1217,7 @@ class LSLCoordinationStream
12171217
)
12181218
..setMetadata('lsl_timestamp', data.lslTimestamp)
12191219
..setMetadata('lsl_time_correction', data.lslTimeCorrection)
1220-
..setMetadata('received_at', DateTime.now());
1220+
..setMetadata('received_at', DateTime.now().toIso8601String());
12211221
}
12221222
return null;
12231223
}

pubspec.lock

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,18 +5,18 @@ packages:
55
dependency: transitive
66
description:
77
name: _fe_analyzer_shared
8-
sha256: da0d9209ca76bde579f2da330aeb9df62b6319c834fa7baae052021b0462401f
8+
sha256: c209688d9f5a5f26b2fb47a188131a6fb9e876ae9e47af3737c0b4f58a93470d
99
url: "https://pub.dev"
1010
source: hosted
11-
version: "85.0.0"
11+
version: "91.0.0"
1212
analyzer:
1313
dependency: transitive
1414
description:
1515
name: analyzer
16-
sha256: "974859dc0ff5f37bc4313244b3218c791810d03ab3470a579580279ba971a48d"
16+
sha256: a40a0cee526a7e1f387c6847bd8a5ccbf510a75952ef8a28338e989558072cb0
1717
url: "https://pub.dev"
1818
source: hosted
19-
version: "7.7.1"
19+
version: "8.4.0"
2020
ansi_styles:
2121
dependency: transitive
2222
description:
@@ -189,18 +189,18 @@ packages:
189189
dependency: transitive
190190
description:
191191
name: dart_style
192-
sha256: "8a0e5fba27e8ee025d2ffb4ee820b4e6e2cf5e4246a6b1a477eb66866947e0bb"
192+
sha256: c87dfe3d56f183ffe9106a18aebc6db431fc7c98c31a54b952a77f3d54a85697
193193
url: "https://pub.dev"
194194
source: hosted
195-
version: "3.1.1"
195+
version: "3.1.2"
196196
dartdoc:
197197
dependency: transitive
198198
description:
199199
name: dartdoc
200-
sha256: f978526530e42dbb831295af743c057d94533e89c27ce1f4023b252f3d85b8be
200+
sha256: "73335ae67d2766ff753dc78b0f33ceb399ba5b94a5cfe8ce8d4128fb631717ee"
201201
url: "https://pub.dev"
202202
source: hosted
203-
version: "8.3.4"
203+
version: "9.0.0"
204204
dartframe:
205205
dependency: transitive
206206
description:

0 commit comments

Comments
 (0)