diff --git a/.ci_support/environment.yml b/.ci_support/environment.yml
index 2a0903e5..5b852057 100644
--- a/.ci_support/environment.yml
+++ b/.ci_support/environment.yml
@@ -10,3 +10,5 @@ dependencies:
 - jinja2 =3.1.2
 - paramiko =3.2.0
 - tqdm =4.65.0
+- pympipool =0.5.4
+- cloudpickle =2.2.1
diff --git a/pysqa/__init__.py b/pysqa/__init__.py
index d8a3f3d2..456047e2 100644
--- a/pysqa/__init__.py
+++ b/pysqa/__init__.py
@@ -2,6 +2,7 @@
 __all__ = []
 
 from pysqa.queueadapter import QueueAdapter
+from pysqa.executor.executor import Executor
 from ._version import get_versions
 
 __version__ = get_versions()["version"]
diff --git a/pysqa/executor/__init__.py b/pysqa/executor/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/pysqa/executor/__main__.py b/pysqa/executor/__main__.py
new file mode 100644
index 00000000..3ce0c989
--- /dev/null
+++ b/pysqa/executor/__main__.py
@@ -0,0 +1,6 @@
+import sys
+from pysqa.executor.backend import command_line
+
+
+if __name__ == "__main__":
+    command_line(arguments_lst=sys.argv[1:])
diff --git a/pysqa/executor/backend.py b/pysqa/executor/backend.py
new file mode 100644
index 00000000..20ab4a27
--- /dev/null
+++ b/pysqa/executor/backend.py
@@ -0,0 +1,65 @@
+import os
+import sys
+
+from pympipool import PoolExecutor
+from pysqa.executor.helper import (
+    read_from_file,
+    deserialize,
+    write_to_file,
+    serialize_result,
+)
+
+
+def execute_files_from_list(tasks_in_progress_dict, cache_directory, executor):
+    file_lst = os.listdir(cache_directory)
+    for file_name_in in file_lst:
+        key = file_name_in.split(".in.pl")[0]
+        file_name_out = key + ".out.pl"
+        if (
+            file_name_in.endswith(".in.pl")
+            and file_name_out not in file_lst
+            and key not in tasks_in_progress_dict.keys()
+        ):
+            funct_dict = read_from_file(
+                file_name=os.path.join(cache_directory, file_name_in)
+            )
+            apply_dict = deserialize(funct_dict=funct_dict)
+            for k, v in apply_dict.items():
+                tasks_in_progress_dict[k] = executor.submit(
+                    v["fn"], *v["args"], **v["kwargs"]
+                )
+    for k, v in tasks_in_progress_dict.items():
+        if v.done():
+            write_to_file(
+                funct_dict=serialize_result(result_dict={k: v.result()}),
+                state="out",
+                cache_directory=cache_directory,
+            )
+
+
+def execute_tasks(cores, cache_directory):
+    tasks_in_progress_dict = {}
+    with PoolExecutor(
+        max_workers=cores,
+        oversubscribe=False,
+        enable_flux_backend=False,
+        enable_slurm_backend=False,
+        cwd=cache_directory,
+        sleep_interval=0.1,
+        queue_adapter=None,
+        queue_adapter_kwargs=None,
+    ) as exe:
+        while True:
+            execute_files_from_list(
+                tasks_in_progress_dict=tasks_in_progress_dict,
+                cache_directory=cache_directory,
+                executor=exe,
+            )
+
+
+def command_line(arguments_lst=None):
+    if arguments_lst is None:
+        arguments_lst = sys.argv[1:]
+    cores_arg = arguments_lst[arguments_lst.index("--cores") + 1]
+    path_arg = arguments_lst[arguments_lst.index("--path") + 1]
+    execute_tasks(cores=int(cores_arg), cache_directory=path_arg)
diff --git a/pysqa/executor/executor.py b/pysqa/executor/executor.py
new file mode 100644
index 00000000..836e4de0
--- /dev/null
+++ b/pysqa/executor/executor.py
@@ -0,0 +1,62 @@
+import os
+import queue
+from threading import Thread
+from concurrent.futures import Future, Executor as FutureExecutor
+
+from pympipool import cancel_items_in_queue
+from pysqa.executor.helper import (
+    reload_previous_futures,
+    find_executed_tasks,
+    serialize_funct,
+    write_to_file,
+)
+
+
+class Executor(FutureExecutor):
+    def __init__(self, cwd=None, queue_adapter=None, queue_adapter_kwargs=None):
+        self._task_queue = queue.Queue()
+        self._memory_dict = {}
+        self._cache_directory = os.path.abspath(os.path.expanduser(cwd))
+        self._queue_adapter = queue_adapter
+        reload_previous_futures(
+            future_queue=self._task_queue,
+            future_dict=self._memory_dict,
+            cache_directory=self._cache_directory,
+        )
+        command = (
+            "python -m pysqa.executor --cores "
+            + str(queue_adapter_kwargs["cores"])
+            + " --path "
+            + str(self._cache_directory)
+        )
+        self._queue_id = self._queue_adapter.submit_job(
+            working_directory=self._cache_directory,
+            command=command,
+            **queue_adapter_kwargs
+        )
+        self._process = Thread(
+            target=find_executed_tasks,
+            kwargs={
+                "future_queue": self._task_queue,
+                "cache_directory": self._cache_directory,
+            },
+        )
+        self._process.start()
+
+    def submit(self, fn, *args, **kwargs):
+        funct_dict = serialize_funct(fn, *args, **kwargs)
+        key = list(funct_dict.keys())[0]
+        if key not in self._memory_dict.keys():
+            self._memory_dict[key] = Future()
+            _ = write_to_file(
+                funct_dict=funct_dict, state="in", cache_directory=self._cache_directory
+            )[0]
+            self._task_queue.put({key: self._memory_dict[key]})
+        return self._memory_dict[key]
+
+    def shutdown(self, wait=True, *, cancel_futures=False):
+        if cancel_futures:
+            cancel_items_in_queue(que=self._task_queue)
+        self._task_queue.put({"shutdown": True, "wait": wait})
+        self._queue_adapter.delete_job(process_id=self._queue_id)
+        self._process.join()
diff --git a/pysqa/executor/helper.py b/pysqa/executor/helper.py
new file mode 100644
index 00000000..0dd250f9
--- /dev/null
+++ b/pysqa/executor/helper.py
@@ -0,0 +1,102 @@
+import os
+import re
+import queue
+from concurrent.futures import Future
+
+import hashlib
+import cloudpickle
+
+
+def deserialize(funct_dict):
+    try:
+        return {k: cloudpickle.loads(v) for k, v in funct_dict.items()}
+    except EOFError:
+        return {}
+
+
+def find_executed_tasks(future_queue, cache_directory):
+    task_memory_dict = {}
+    while True:
+        task_dict = {}
+        try:
+            task_dict = future_queue.get_nowait()
+        except queue.Empty:
+            pass
+        if "shutdown" in task_dict.keys() and task_dict["shutdown"]:
+            break
+        else:
+            _update_task_dict(
+                task_dict=task_dict,
+                task_memory_dict=task_memory_dict,
+                cache_directory=cache_directory,
+            )
+
+
+def read_from_file(file_name):
+    name = file_name.split("/")[-1].split(".")[0]
+    with open(file_name, "rb") as f:
+        return {name: f.read()}
+
+
+def reload_previous_futures(future_queue, future_dict, cache_directory):
+    file_lst = os.listdir(cache_directory)
+    for f in file_lst:
+        if f.endswith(".in.pl"):
+            key = f.split(".in.pl")[0]
+            future_dict[key] = Future()
+            file_name_out = key + ".out.pl"
+            if file_name_out in file_lst:
+                _set_future(
+                    file_name=os.path.join(cache_directory, file_name_out),
+                    future=future_dict[key],
+                )
+            else:
+                future_queue.put({key: future_dict[key]})
+
+
+def serialize_result(result_dict):
+    return {k: cloudpickle.dumps(v) for k, v in result_dict.items()}
+
+
+def serialize_funct(fn, *args, **kwargs):
+    binary = cloudpickle.dumps({"fn": fn, "args": args, "kwargs": kwargs})
+    return {fn.__name__ + _get_hash(binary=binary): binary}
+
+
+def write_to_file(funct_dict, state, cache_directory):
+    file_name_lst = []
+    for k, v in funct_dict.items():
+        file_name = _get_file_name(name=k, state=state)
+        file_name_lst.append(file_name)
+        with open(os.path.join(cache_directory, file_name), "wb") as f:
+            f.write(v)
+    return file_name_lst
+
+
+def _get_file_name(name, state):
+    return name + "." + state + ".pl"
+ state + ".pl" + + +def _get_hash(binary): + # Remove specification of jupyter kernel from hash to be deterministic + binary_no_ipykernel = re.sub(b"(?<=/ipykernel_)(.*)(?=/)", b"", binary) + return str(hashlib.md5(binary_no_ipykernel).hexdigest()) + + +def _set_future(file_name, future): + values = deserialize(funct_dict=read_from_file(file_name=file_name)).values() + if len(values) == 1: + future.set_result(list(values)[0]) + + +def _update_task_dict(task_dict, task_memory_dict, cache_directory): + file_lst = os.listdir(cache_directory) + for key, future in task_dict.items(): + task_memory_dict[key] = future + for key, future in task_memory_dict.items(): + file_name_out = _get_file_name(name=key, state="out") + if not future.done() and file_name_out in file_lst: + _set_future( + file_name=os.path.join(cache_directory, file_name_out), + future=future, + ) diff --git a/setup.py b/setup.py index a7dc2c57..18754efb 100644 --- a/setup.py +++ b/setup.py @@ -33,11 +33,13 @@ extras_require={ "sge": ['defusedxml==0.7.1'], "remote": ['paramiko==3.2.0', 'tqdm==4.65.0'], + "executor": ['pympipool==0.5.4', 'cloudpickle==2.2.1'], }, cmdclass=versioneer.get_cmdclass(), entry_points={ "console_scripts": [ - 'pysqa=pysqa:main' + 'pysqa=pysqa.cmd:command_line', + 'pysqa-executor=pysqa.executor.backend:command_line' ] } ) diff --git a/tests/test_executor.py b/tests/test_executor.py new file mode 100644 index 00000000..397e3c04 --- /dev/null +++ b/tests/test_executor.py @@ -0,0 +1,156 @@ +import os +from queue import Queue +from concurrent.futures import Future, ThreadPoolExecutor +import unittest + +from pysqa.executor.backend import execute_files_from_list +from pysqa.executor.executor import Executor +from pysqa.executor.helper import ( + read_from_file, + deserialize, + serialize_funct, + write_to_file, + serialize_result, + reload_previous_futures, + _get_file_name, + _set_future, + _update_task_dict, +) +from pysqa.queueadapter import QueueAdapter + + +def funct_add(a, b): + return a+b + + +@unittest.skipIf(os.name == 'nt', "Runs forever on Windows") +class TestExecutorHelper(unittest.TestCase): + def setUp(self): + self.test_dir = os.path.join(os.path.abspath(os.path.dirname(__file__)), "cache") + os.makedirs(self.test_dir, exist_ok=True) + + def tearDown(self): + for f in os.listdir(self.test_dir): + os.remove(os.path.join(self.test_dir, f)) + os.removedirs(self.test_dir) + + def test_cache(self): + funct_dict = serialize_funct(fn=funct_add, a=1, b=2) + file_name_in = write_to_file( + funct_dict=funct_dict, + state="in", + cache_directory=self.test_dir + )[0] + self.assertEqual(len(os.listdir(self.test_dir)), 1) + funct_dict = read_from_file( + file_name=os.path.join(self.test_dir, file_name_in) + ) + apply_dict = deserialize(funct_dict=funct_dict) + key = list(apply_dict.keys())[0] + v = apply_dict[key] + result_dict = {key: v["fn"].__call__(*v["args"], **v["kwargs"])} + file_name_out = write_to_file( + funct_dict=serialize_result(result_dict=result_dict), + state="out", + cache_directory=self.test_dir, + )[0] + self.assertEqual(len(os.listdir(self.test_dir)), 2) + f = Future() + _set_future( + file_name=os.path.join(self.test_dir, file_name_out), + future=f + ) + self.assertEqual(f.result(), 3) + task_dict = {key: Future()} + _update_task_dict( + task_dict=task_dict, + task_memory_dict={}, + cache_directory=self.test_dir + ) + self.assertEqual(task_dict[key].result(), 3) + + def test_reload_previous_future(self): + funct_dict = serialize_funct(fn=funct_add, a=1, b=2) + file_name_in = 
+            funct_dict=funct_dict,
+            state="in",
+            cache_directory=self.test_dir
+        )[0]
+        queue = Queue()
+        future_dict_one = {}
+        reload_previous_futures(
+            future_queue=queue,
+            future_dict=future_dict_one,
+            cache_directory=self.test_dir
+        )
+        self.assertEqual(len(future_dict_one), 1)
+        self.assertEqual(list(future_dict_one.keys())[0], file_name_in.split(".in.pl")[0])
+        self.assertEqual(len(os.listdir(self.test_dir)), 1)
+        with ThreadPoolExecutor() as exe:
+            execute_files_from_list(
+                tasks_in_progress_dict={},
+                cache_directory=self.test_dir,
+                executor=exe
+            )
+        self.assertEqual(len(os.listdir(self.test_dir)), 2)
+        future_dict_two = {}
+        reload_previous_futures(
+            future_queue=queue,
+            future_dict=future_dict_two,
+            cache_directory=self.test_dir
+        )
+        key = list(future_dict_two.keys())[0]
+        self.assertEqual(len(future_dict_two), 1)
+        self.assertEqual(key, file_name_in.split(".in.pl")[0])
+        self.assertEqual(future_dict_two[key].result(), 3)
+
+
+class TestExecutor(unittest.TestCase):
+    def setUp(self):
+        self.test_dir = os.path.join(os.path.abspath(os.path.dirname(__file__)), "executor_cache")
+        os.makedirs(self.test_dir, exist_ok=True)
+
+    def tearDown(self):
+        for f in os.listdir(self.test_dir):
+            os.remove(os.path.join(self.test_dir, f))
+        os.removedirs(self.test_dir)
+
+    def test_executor(self):
+        def execute_command(
+            commands,
+            working_directory=None,
+            split_output=True,
+            shell=False,
+            error_filename="pysqa.err",
+        ):
+            return str(1)
+
+        queue_adapter = QueueAdapter(
+            directory=os.path.join(self.test_dir, "../config/slurm"),
+            execute_command=execute_command
+        )
+        with Executor(
+            cwd=self.test_dir,
+            queue_adapter=queue_adapter,
+            queue_adapter_kwargs={
+                "queue": "slurm",
+                "job_name": "test",
+                "cores": 1
+            }
+        ) as exe:
+            fs = exe.submit(fn=funct_add, a=1, b=2)
+            funct_dict = serialize_funct(fn=funct_add, a=1, b=2)
+            file_name_in = _get_file_name(name=list(funct_dict.keys())[0], state="in")
+            funct_dict = read_from_file(
+                file_name=os.path.join(self.test_dir, file_name_in)
+            )
+            apply_dict = deserialize(funct_dict=funct_dict)
+            result_dict = {
+                k: v["fn"].__call__(*v["args"], **v["kwargs"]) for k, v in apply_dict.items()
+            }
+            _ = write_to_file(
+                funct_dict=serialize_result(result_dict=result_dict),
+                state="out",
+                cache_directory=self.test_dir,
+            )[0]
+            self.assertEqual(fs.result(), 3)
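
Usage sketch: `Executor.submit()` cloudpickles the function call to a `<name><md5-hash>.in.pl` file in the cache directory, the queuing-system job launched via `python -m pysqa.executor --cores ... --path ...` picks the file up, executes it through `pympipool.PoolExecutor`, and writes the result to `<name><md5-hash>.out.pl`; a watcher thread then resolves the corresponding `concurrent.futures.Future`. A minimal driver mirroring tests/test_executor.py follows; the configuration directory `~/.queues`, the queue name `slurm`, and the cache path are illustrative assumptions, not part of the patch:

    # Minimal sketch, assuming a pysqa queue configuration in ~/.queues
    # that defines a queue named "slurm" (both hypothetical).
    from pysqa import QueueAdapter
    from pysqa.executor.executor import Executor


    def add(a, b):
        return a + b


    qa = QueueAdapter(directory="~/.queues")
    with Executor(
        cwd="/tmp/executor_cache",  # cache directory visible to the cluster job
        queue_adapter=qa,
        queue_adapter_kwargs={"queue": "slurm", "job_name": "pysqa_executor", "cores": 1},
    ) as exe:
        fs = exe.submit(fn=add, a=1, b=2)  # writes add<hash>.in.pl, submits the backend job
        print(fs.result())  # blocks until add<hash>.out.pl is written, then prints 3

Because `Executor` subclasses `concurrent.futures.Executor`, leaving the `with` block calls `shutdown(wait=True)`, which stops the watcher thread and deletes the queuing-system job.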