@@ -60,7 +60,7 @@ async def refresh(self):
6060 session = self .sessions [index ]
6161
6262 try :
63- session .connection . send (str .encode (' ' ))
63+ session .writer . write (str .encode (' ' ))
6464 except socket .error :
6565 self .log .debug ('Error occurred when refreshing session %s. Removing from session pool.' , session .id )
6666 del self .sessions [index ]
@@ -73,21 +73,20 @@ async def accept(self, reader, writer):
7373 except Exception as e :
7474 self .log .debug ('Handshake failed: %s' % e )
7575 return
76- connection = writer .get_extra_info ('socket' )
7776 profile ['executors' ] = [e for e in profile ['executors' ].split (',' ) if e ]
7877 profile ['contact' ] = 'tcp'
7978 agent , _ = await self .services .get ('contact_svc' ).handle_heartbeat (** profile )
80- new_session = Session (id = self .generate_number (size = 6 ), paw = agent .paw , connection = connection )
79+ new_session = Session (id = self .generate_number (size = 6 ), paw = agent .paw , reader = reader , writer = writer )
8180 self .sessions .append (new_session )
8281 await self .send (new_session .id , agent .paw , timeout = 5 )
8382
8483 async def send (self , session_id : int , cmd : str , timeout : int = 60 ) -> Tuple [int , str , str , str ]:
8584 try :
86- conn = next (i . connection for i in self .sessions if i .id == int (session_id ))
87- conn . send (str .encode (' ' ))
85+ session = next (i for i in self .sessions if i .id == int (session_id ))
86+ session . writer . write (str .encode (' ' ))
8887 time .sleep (0.01 )
89- conn . send (str .encode ('%s\n ' % cmd ))
90- response = await self ._attempt_connection (session_id , conn , timeout = timeout )
88+ session . writer . write (str .encode ('%s\n ' % cmd ))
89+ response = await self ._attempt_connection (session_id , session . reader , timeout = timeout )
9190 response = json .loads (response )
9291 return response ['status' ], response ['pwd' ], response ['response' ], response .get ('agent_reported_time' , '' )
9392 except Exception as e :
@@ -99,22 +98,17 @@ async def _handshake(reader):
9998 profile_bites = (await reader .readline ()).strip ()
10099 return json .loads (profile_bites )
101100
102- async def _attempt_connection (self , session_id , connection , timeout ):
101+ async def _attempt_connection (self , session_id , reader , timeout ):
103102 buffer = 4096
104103 data = b''
105- waited_seconds = 0
106104 time .sleep (0.1 ) # initial wait for fast operations.
107105 while True :
108106 try :
109- part = connection . recv (buffer )
107+ part = await reader . read (buffer )
110108 data += part
111109 if len (part ) < buffer :
112110 break
113- except BlockingIOError as err :
114- if waited_seconds < timeout :
115- time .sleep (1 )
116- waited_seconds += 1
117- else :
118- self .log .error ("Timeout reached for session %s" , session_id )
119- return json .dumps (dict (status = 1 , pwd = '~$ ' , response = str (err )))
111+ except Exception as err :
112+ self .log .error ("Timeout reached for session %s" , session_id )
113+ return json .dumps (dict (status = 1 , pwd = '~$ ' , response = str (err )))
120114 return str (data , 'utf-8' )
0 commit comments