1818 RawBytesMessageMapper ,
1919 RawTextMessageMapper ,
2020)
21- from csp .adapters .websocket_types import ActionType , ConnectionRequest , WebsocketHeaderUpdate , WebsocketStatus
21+ from csp .adapters .websocket_types import (
22+ ActionType ,
23+ ConnectionRequest ,
24+ InternalConnectionRequest ,
25+ WebsocketHeaderUpdate ,
26+ WebsocketStatus ,
27+ )
2228from csp .impl .wiring import input_adapter_def , output_adapter_def , status_adapter_def
2329from csp .impl .wiring .delayed_node import DelayedNodeWrapperDef
2430from csp .lib import _websocketadapterimpl
@@ -396,18 +402,6 @@ def _instantiate(self):
396402 _launch_application (self ._port , manager , csp .const ("stub" ))
397403
398404
399- # Maybe, we can have the Adapter manager have all the connections
400- # If we get a new connection request, we include that adapter for the
401- # subscriptions. When we pop it, we remove it.
402- # Then, each edge will effectively be independent.
403- # Maybe. have each websocket push to a shared queue, then from there we
404- # pass it along to all edges ("input adapters") that are subscribed to it
405-
406- # Ok, maybe, let's keep it at just 1 subscribe and send call.
407- # However, we can subscribe to the send and subscribe calls separately.
408- # We just have to keep track of the Endpoints we have, and
409-
410-
411405class WebsocketAdapterManager :
412406 """
413407 Can subscribe dynamically via ts[List[ConnectionRequest]]
@@ -453,7 +447,7 @@ def __init__(
453447 connection_request = ConnectionRequest (
454448 uri = uri , reconnect_interval = reconnect_interval , headers = headers or {}
455449 )
456- self ._properties .update (self ._get_properties (connection_request ))
450+ self ._properties .update (self ._get_properties (connection_request ). to_dict () )
457451
458452 # This is a counter that will be used to identify every function call
459453 # We keep track of the subscribes and sends separately
@@ -467,7 +461,7 @@ def __init__(
467461 def _dynamic (self ):
468462 return self ._properties .get ("dynamic" , False )
469463
470- def _get_properties (self , conn_request : ConnectionRequest ) -> dict :
464+ def _get_properties (self , conn_request : ConnectionRequest ) -> InternalConnectionRequest :
471465 uri = conn_request .uri
472466 reconnect_interval = conn_request .reconnect_interval
473467
@@ -476,14 +470,14 @@ def _get_properties(self, conn_request: ConnectionRequest) -> dict:
476470 if resp .hostname is None :
477471 raise ValueError (f"Failed to parse host from URI: { uri } " )
478472
479- res = dict (
473+ res = InternalConnectionRequest (
480474 host = resp .hostname ,
481475 # if no port is explicitly present in the uri, the resp.port is None
482476 port = _sanitize_port (uri , resp .port ),
483477 route = resp .path or "/" , # resource shouldn't be empty string
484478 use_ssl = uri .startswith ("wss" ),
485479 reconnect_interval = reconnect_interval ,
486- headers = conn_request .headers ,
480+ headers = rapidjson . dumps ( conn_request .headers ) if conn_request . headers else "" ,
487481 persistent = conn_request .persistent ,
488482 action = conn_request .action .name ,
489483 on_connect_payload = conn_request .on_connect_payload ,
@@ -537,7 +531,9 @@ def subscribe(
537531 adapter_props = AdapterInfo (caller_id = caller_id , is_subscribe = True ).model_dump ()
538532 connection_request = csp .null_ts (List [ConnectionRequest ]) if connection_request is None else connection_request
539533 request_dict = csp .apply (
540- connection_request , lambda conn_reqs : [self ._get_properties (conn_req ) for conn_req in conn_reqs ], list
534+ connection_request ,
535+ lambda conn_reqs : [self ._get_properties (conn_req ) for conn_req in conn_reqs ],
536+ List [InternalConnectionRequest ],
541537 )
542538 # Output adapter to handle connection requests
543539 _websocket_connection_request_adapter_def (self , request_dict , adapter_props )
@@ -566,7 +562,9 @@ def send(self, x: ts["T"], connection_request: Optional[ts[List[ConnectionReques
566562 adapter_props = AdapterInfo (caller_id = caller_id , is_subscribe = False ).model_dump ()
567563 connection_request = csp .null_ts (List [ConnectionRequest ]) if connection_request is None else connection_request
568564 request_dict = csp .apply (
569- connection_request , lambda conn_reqs : [self ._get_properties (conn_req ) for conn_req in conn_reqs ], list
565+ connection_request ,
566+ lambda conn_reqs : [self ._get_properties (conn_req ) for conn_req in conn_reqs ],
567+ List [InternalConnectionRequest ],
570568 )
571569 _websocket_connection_request_adapter_def (self , request_dict , adapter_props )
572570 return _websocket_output_adapter_def (self , x , adapter_props )
@@ -614,6 +612,6 @@ def _create(self, engine, memo):
614612 "websocket_connection_request_adapter" ,
615613 _websocketadapterimpl ._websocket_connection_request_adapter ,
616614 WebsocketAdapterManager ,
617- input = ts [list ], # needed, List[dict] didn't work on c++ level
615+ input = ts [List [ InternalConnectionRequest ] ], # needed, List[dict] didn't work on c++ level
618616 properties = dict ,
619617)
0 commit comments