Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 66 additions & 14 deletions src/sonic-py-common/sonic_py_common/task_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,44 +7,96 @@
#
# ProcessTaskBase =====================================================================
#
class ProcessTaskBase(object): # TODO: put this class to swss-platform-common
def __init__(self):
self.task_process = None
class ProcessTaskBase(object):
"""
Base class for creating an object that gets spawned as a separate process

Child class needs to implement the task_worker method, which should be
designed to return if task_stopping_event is set

"""
def __init__(self, stop_timeout_secs=1):
"""
Initializer

Args:
stop_timeout_secs (int): Number of seconds to wait for process to exit
upon calling task_stop(). If the process fails to stop before the
specified timeout, it will attemp to kill the process via brute
force. If you would like to wait indefinitely, pass in `None`.
"""
self._stop_timeout_secs = stop_timeout_secs
self._task_process = None
self.task_stopping_event = multiprocessing.Event()

def task_worker(self):
pass
raise NotImplementedError

def task_run(self):
if self.task_stopping_event.is_set():
return

self.task_process = multiprocessing.Process(target=self.task_worker)
self.task_process.start()
self._task_process = multiprocessing.Process(target=self.task_worker)
self._task_process.start()

def task_stop(self):
# Signal the process to stop
self.task_stopping_event.set()
os.kill(self.task_process.pid, signal.SIGKILL)

# Wait for the process to exit
self._task_process.join(self._stop_timeout_secs)

# If the process didn't exit, attempt to kill it
if self._task_process.is_alive():
os.kill(self._task_process.pid, signal.SIGKILL)

if self._task_process.is_alive():
return False

return True


#
# ThreadTaskBase =====================================================================
#
class ThreadTaskBase(object): # TODO: put this class to swss-platform-common;
def __init__(self):
self.task_thread = None
class ThreadTaskBase(object):
"""
Base class for creating an object that gets spawned as a separate thread

Child class needs to implement the task_worker method, which should be
designed to return if task_stopping_event is set
"""
def __init__(self, stop_timeout_secs=None):
"""
Initializer

Args:
stop_timeout_secs (int): Number of seconds to wait for thread to exit
upon calling task_stop(). If you would like to wait indefinitely,
pass in None.
"""
self._stop_timeout_secs = stop_timeout_secs
self._task_thread = None
self.task_stopping_event = threading.Event()

def task_worker(self):
pass
raise NotImplementedError

def task_run(self):
if self.task_stopping_event.is_set():
return

self.task_thread = threading.Thread(target=self.task_worker)
self.task_thread.start()
self._task_thread = threading.Thread(target=self.task_worker)
self._task_thread.start()

def task_stop(self):
# Signal the thread to stop
self.task_stopping_event.set()
self.task_thread.join()

# Wait for the thread to exit
self._task_thread.join(self._stop_timeout_secs)

if self._task_thread.is_alive():
return False

return True