1212# See the License for the specific language governing permissions and
1313# limitations under the License.
1414
15+ import os
16+ import time
1517import logging
1618import socket
1719from .log_helper import get_logger
@@ -31,6 +33,8 @@ class ControllerClient(object):
3133 client_name(str): Current client name, random generate for counting client number. Default: None.
3234 """
3335
36+ START = True
37+
3438 def __init__ (self ,
3539 server_ip = None ,
3640 server_port = None ,
@@ -52,23 +56,58 @@ def update(self, tokens, reward, iter):
5256 reward(float): The reward of tokens.
5357 iter(int): The iteration number of current client.
5458 """
59+ ControllerClient .START = False
5560 socket_client = socket .socket (socket .AF_INET , socket .SOCK_STREAM )
56- socket_client .connect ((self .server_ip , self .server_port ))
57- tokens = "," .join ([str (token ) for token in tokens ])
58- socket_client .send ("{}\t {}\t {}\t {}\t {}" .format (
59- self ._key , tokens , reward , iter , self ._client_name ).encode ())
60- response = socket_client .recv (1024 ).decode ()
61- if "ok" in response .strip ('\n ' ).split ("\t " ):
62- return True
61+ errno = socket_client .connect_ex ((self .server_ip , self .server_port ))
62+ if errno != 0 :
63+ _logger .info ("Server is closed!!!" )
64+ os ._exit (0 )
6365 else :
64- return False
66+ tokens = "," .join ([str (token ) for token in tokens ])
67+ socket_client .send ("{}\t {}\t {}\t {}\t {}" .format (
68+ self ._key , tokens , reward , iter , self ._client_name ).encode ())
69+ try :
70+ response = socket_client .recv (1024 ).decode ()
71+ if "ok" in response .strip ('\n ' ).split ("\t " ):
72+ return True
73+ else :
74+ return False
75+ except Exception as err :
76+ _logger .error (err )
77+ os ._exit (0 )
6578
6679 def next_tokens (self ):
6780 """
6881 Get next tokens.
6982 """
70- socket_client = socket .socket (socket .AF_INET , socket .SOCK_STREAM )
71- socket_client .connect ((self .server_ip , self .server_port ))
83+ retry_cnt = 0
84+
85+ if ControllerClient .START :
86+ while True :
87+ socket_client = socket .socket (socket .AF_INET ,
88+ socket .SOCK_STREAM )
89+ errno = socket_client .connect_ex (
90+ (self .server_ip , self .server_port ))
91+ if errno != 0 :
92+ retry_cnt += 1
93+ _logger .info ("Server is NOT ready, wait 10 second to retry" )
94+ time .sleep (10 )
95+ else :
96+ break
97+
98+ if retry_cnt == 6 :
99+ _logger .error (
100+ "Server is NOT ready in 1 minute, please check if it start"
101+ )
102+ os ._exit (errno )
103+
104+ else :
105+ socket_client = socket .socket (socket .AF_INET , socket .SOCK_STREAM )
106+ errno = socket_client .connect_ex ((self .server_ip , self .server_port ))
107+ if errno != 0 :
108+ _logger .info ("Server is closed" )
109+ os ._exit (0 )
110+
72111 socket_client .send ("next_tokens" .encode ())
73112 tokens = socket_client .recv (1024 ).decode ()
74113 tokens = [int (token ) for token in tokens .strip ("\n " ).split ("," )]
@@ -79,7 +118,11 @@ def request_current_info(self):
79118 Request for current information.
80119 """
81120 socket_client = socket .socket (socket .AF_INET , socket .SOCK_STREAM )
82- socket_client .connect ((self .server_ip , self .server_port ))
83- socket_client .send ("current_info" .encode ())
84- current_info = socket_client .recv (1024 ).decode ()
85- return eval (current_info )
121+ errno = socket_client .connect_ex ((self .server_ip , self .server_port ))
122+ if errno != 0 :
123+ _logger .info ("Server is closed" )
124+ return None
125+ else :
126+ socket_client .send ("current_info" .encode ())
127+ current_info = socket_client .recv (1024 ).decode ()
128+ return eval (current_info )
0 commit comments