11# Built-in Imports
2- from typing import Literal , Callable , Dict
2+ from typing import Literal , Callable , Dict , Any
33import os
44import logging
55import collections
1818# Third-party Imports
1919
2020# Internal Imports
21+ from . import socket_handling as sh
2122from .utils import create_payload , decode_payload
2223from . import enums
24+ from . import _logger
2325
24- logger = logging .getLogger ("chimerapy" )
26+ logger = _logger .getLogger ("chimerapy-networking " )
2527
2628
2729class Client (threading .Thread ):
@@ -55,6 +57,8 @@ def __init__(
5557
5658 # Creating tempfolder to host items
5759 self .tempfolder = pathlib .Path (tempfile .mkdtemp ())
60+ self .handlers .update ({enums .FILE_TRANSFER_START : self .receive_file })
61+ self .file_transfer_records = collections .defaultdict (dict )
5862
5963 # Create the socket and connect
6064 try :
@@ -115,69 +119,32 @@ def run(self):
115119 # Continue checking for messages from client until not running
116120 while self .is_running .is_set ():
117121
118- # Check if the socket is closed
119- if self .socket .fileno () == - 1 :
120- break
121-
122- # Get message while not blocking
122+ # Monitor the socket
123123 try :
124- # Get the data
125- bs = self .socket .recv (8 )
126-
127- # If null, skip
128- if bs == b"" :
129- continue
130-
131- # Get the length
132- (length ,) = struct .unpack (">Q" , bs )
133- data = b""
134-
135- # Use the length to get the entire message
136- while len (data ) < int (length ):
137- to_read = int (length ) - len (data )
138- data += self .socket .recv (4096 if to_read > 4096 else to_read )
139-
140- except socket .timeout :
141- continue
142-
143- # If end, stop it
144- if data == b"" :
124+ success , msg = sh .monitor (self .name , self .socket )
125+ except (ConnectionResetError , ConnectionAbortedError ):
145126 break
146127
147- # Decode the message so we can process it
148- msg = decode_payload (data )
149-
150128 # Process the msg
151- self .process_msg (msg )
129+ if success :
130+ self .process_msg (msg )
152131
153132 self .socket .close ()
154133
155134 def send (self , msg : Dict , ack : bool = False ):
156135
157- # Create an uuid to track the message
158- msg_uuid = str (uuid .uuid4 ())
159-
160- # Convert msg data to bytes
161- msg_bytes , msg_length = create_payload (
162- type = self .sender_msg_type ,
163- signal = msg ["signal" ],
164- data = msg ["data" ],
136+ # Sending the data
137+ success , msg_uuid = sh .send (
138+ name = self .name ,
139+ s = self .socket ,
140+ msg = msg ,
141+ sender_msg_type = self .sender_msg_type ,
165142 ack = ack ,
166- provided_uuid = msg_uuid ,
167143 )
168144
169- # Send the message
170- try :
171- self .socket .sendall (msg_length + msg_bytes )
172- logger .debug (f"{ self } : send { msg ['signal' ]} " )
173- except socket .timeout :
174- logger .debug (f"{ self } : Socket Timeout: skipping" )
175- return
176- except :
177- logger .debug (
178- f"{ self } : Broken Pipe Error, handled for { msg ['signal' ]} " , exc_info = True
179- )
180- return
145+ # If not successful, skip ACK
146+ if not success :
147+ return None
181148
182149 # If requested ACK, wait
183150 if ack :
@@ -198,61 +165,40 @@ def send(self, msg: Dict, ack: bool = False):
198165
199166 miss_counter += 1
200167
201- def send_folder (self , name : str , dir : pathlib .Path , buffersize : int = 4096 ):
202-
203- assert (
204- dir .is_dir () and dir .exists ()
205- ), f"Sending { dir } needs to be a folder that exists."
206-
207- # First, we need to archive the folder into a zip file
208- format = "zip"
209- shutil .make_archive (str (dir ), format , dir .parent , dir .name )
210- zip_file = dir .parent / f"{ dir .name } .{ format } "
211-
212- # Relocate zip to the tempfolder
213- temp_zip_file = self .tempfolder / f"_{ zip_file .name } "
214- shutil .move (zip_file , temp_zip_file )
215-
216- # Get information about the filesize
217- filesize = os .path .getsize (temp_zip_file )
218- max_num_steps = math .ceil (filesize / buffersize )
219-
220- # Now start the process of sending content to the server
221- # First, we send the message inciting file transfer
222- init_msg = {
223- "type" : enums .WORKER_MESSAGE ,
224- "signal" : enums .FILE_TRANSFER_START ,
225- "data" : {
226- "name" : name ,
227- "filename" : f"{ dir .name } .{ format } " ,
228- "filesize" : filesize ,
229- "buffersize" : buffersize ,
230- "max_num_steps" : max_num_steps ,
231- },
232- }
233- self .send (init_msg )
234- logger .debug (f"{ self } : sent file transfer initialization" )
235-
236- # Having counter tracking number of messages
237- msg_counter = 1
238-
239- with open (temp_zip_file , "rb" ) as f :
240- while True :
241-
242- # Read the data to be sent
243- data = f .read (buffersize )
244- if not data :
245- break
168+ def receive_file (self , msg : Dict [str , Any ]):
169+
170+ # Obtain the file
171+ success , sender_name , dst_filepath = sh .file_transfer_receive (
172+ name = self .name , s = self .socket , msg = msg , tempfolder = self .tempfolder
173+ )
174+
175+ # Create a record of the files transferred and from whom
176+ if success :
177+ self .file_transfer_records [sender_name ][dst_filepath .name ] = dst_filepath
246178
247- logger .debug (
248- f"{ self } : file transfer, step { msg_counter } /{ max_num_steps } "
249- )
179+ def send_file (self , name : str , filepath : pathlib .Path ):
250180
251- # Send the data
252- self .socket .sendall (data )
253- msg_counter += 1
181+ assert filepath .exists () and filepath .is_file ()
254182
255- logger .debug (f"{ self } : finished file transfer" )
183+ sh .send_file (
184+ sender_name = name ,
185+ sender_msg_type = self .sender_msg_type ,
186+ s = self .socket ,
187+ filepath = filepath ,
188+ buffersize = 4096 ,
189+ )
190+
191+ def send_folder (self , name : str , folderpath : pathlib .Path ):
192+
193+ assert folderpath .exists () and folderpath .is_dir ()
194+
195+ sh .send_folder (
196+ name = name ,
197+ s = self .socket ,
198+ dir = folderpath ,
199+ tempfolder = self .tempfolder ,
200+ sender_msg_type = self .sender_msg_type ,
201+ )
256202
257203 def shutdown (self ):
258204
0 commit comments