@@ -232,7 +232,6 @@ class _UpdateRequestHandler(SocketServer.StreamRequestHandler):
232232 def handle (self ):
233233 from pyspark .accumulators import _accumulatorRegistry
234234 auth_token = self .server .auth_token
235-
236235 def poll (func ):
237236 while not self .server .server_shutdown :
238237 # Poll every 1 second for new data -- don't block in case of shutdown.
@@ -259,16 +258,14 @@ def authenticate_and_accum_updates():
259258 # we've authenticated, we can break out of the first loop now
260259 return True
261260 else :
262- raise Exception (
263- "The value of the provided token to the AccumulatorServer is not correct." )
261+ raise Exception ("The value of the provided token to the AccumulatorServer is not correct." )
264262
265263 if auth_token is not None :
266264 # first we keep polling till we've received the authentication token
267265 poll (authenticate_and_accum_updates )
268266 # now we've authenticated if needed, don't need to check for the token anymore
269267 poll (accum_updates )
270268
271-
272269class AccumulatorServer (SocketServer .TCPServer ):
273270
274271 def __init__ (self , server_address , RequestHandlerClass , auth_token ):
@@ -286,7 +283,6 @@ def shutdown(self):
286283 SocketServer .TCPServer .shutdown (self )
287284 self .server_close ()
288285
289-
290286def _start_update_server (auth_token ):
291287 """Start a TCP server to receive accumulator updates in a daemon thread, and returns it"""
292288 server = AccumulatorServer (("localhost" , 0 ), _UpdateRequestHandler , auth_token )
0 commit comments