diff --git a/pysqa/basic.py b/pysqa/basic.py new file mode 100644 index 00000000..6fbc3b84 --- /dev/null +++ b/pysqa/basic.py @@ -0,0 +1,512 @@ +# coding: utf-8 +# Copyright (c) Jan Janssen + +import getpass +import importlib +from jinja2 import Template +import os +import subprocess +import re +import pandas +from pysqa.queues import Queues + + +class BasisQueueAdapter(object): + """ + The goal of the QueueAdapter class is to make submitting to a queue system as easy as starting another sub process + locally. + + Args: + directory (str): directory containing the queue.yaml files as well as corresponding jinja2 templates for the + individual queues. + + Attributes: + + .. attribute:: config + + QueueAdapter configuration read from the queue.yaml file. + + .. attribute:: queue_list + + List of available queues + + .. attribute:: queue_view + + Pandas DataFrame representation of the available queues, read from queue.yaml. + + .. attribute:: queues + + Queues available for auto completion QueueAdapter().queues. returns the queue name. 
+ """ + + def __init__(self, config, directory="~/.queues"): + self._config = config + self._fill_queue_dict(queue_lst_dict=self._config["queues"]) + self._load_templates(queue_lst_dict=self._config["queues"], directory=directory) + if self._config["queue_type"] == "SGE": + class_name = "SunGridEngineCommands" + module_name = "pysqa.wrapper.sge" + elif self._config["queue_type"] == "TORQUE": + class_name = "TorqueCommands" + module_name = "pysqa.wrapper.torque" + elif self._config["queue_type"] == "SLURM": + class_name = "SlurmCommands" + module_name = "pysqa.wrapper.slurm" + elif self._config["queue_type"] == "LSF": + class_name = "LsfCommands" + module_name = "pysqa.wrapper.lsf" + elif self._config["queue_type"] == "MOAB": + class_name = "MoabCommands" + module_name = "pysqa.wrapper.moab" + else: + raise ValueError() + self._commands = getattr(importlib.import_module(module_name), class_name)() + self._queues = Queues(self.queue_list) + + @property + def config(self): + """ + + Returns: + dict: + """ + return self._config + + @property + def queue_list(self): + """ + + Returns: + list: + """ + return list(self._config["queues"].keys()) + + @property + def queue_view(self): + """ + + Returns: + pandas.DataFrame: + """ + return pandas.DataFrame(self._config["queues"]).T.drop( + ["script", "template"], axis=1 + ) + + @property + def queues(self): + return self._queues + + def submit_job( + self, + queue=None, + job_name=None, + working_directory=None, + cores=None, + memory_max=None, + run_time_max=None, + command=None, + ): + """ + + Args: + queue (str/None): + job_name (str/None): + working_directory (str/None): + cores (int/None): + memory_max (int/None): + run_time_max (int/None): + command (str/None): + + Returns: + int: + """ + working_directory, queue_script_path = self._write_queue_script( + queue=queue, + job_name=job_name, + working_directory=working_directory, + cores=cores, + memory_max=memory_max, + run_time_max=run_time_max, + command=command, + ) + 
out = self._execute_command( + commands=self._commands.submit_job_command + [queue_script_path], + working_directory=working_directory, + split_output=False, + ) + if out is not None: + return self._commands.get_job_id_from_output(out) + else: + return None + + def enable_reservation(self, process_id): + """ + + Args: + process_id (int): + + Returns: + str: + """ + out = self._execute_command( + commands=self._commands.enable_reservation_command + [str(process_id)], + split_output=True, + ) + if out is not None: + return out[0] + else: + return None + + def delete_job(self, process_id): + """ + + Args: + process_id (int): + + Returns: + str: + """ + out = self._execute_command( + commands=self._commands.delete_job_command + [str(process_id)], + split_output=True, + ) + if out is not None: + return out[0] + else: + return None + + def get_queue_status(self, user=None): + """ + + Args: + user (str): + + Returns: + pandas.DataFrame: + """ + out = self._execute_command( + commands=self._commands.get_queue_status_command, split_output=False + ) + df = self._commands.convert_queue_status(queue_status_output=out) + if user is None: + return df + else: + return df[df["user"] == user] + + def get_status_of_my_jobs(self): + """ + + Returns: + pandas.DataFrame: + """ + return self.get_queue_status(user=self._get_user()) + + def get_status_of_job(self, process_id): + """ + + Args: + process_id: + + Returns: + str: ['running', 'pending', 'error'] + """ + df = self.get_queue_status() + df_selected = df[df["jobid"] == process_id]["status"] + if len(df_selected) != 0: + return df_selected.values[0] + else: + return None + + def get_status_of_jobs(self, process_id_lst): + """ + + Args: + process_id_lst: + + Returns: + list: ['running', 'pending', 'error', ...] 
+ """ + df = self.get_queue_status() + results_lst = [] + for process_id in process_id_lst: + df_selected = df[df["jobid"] == process_id]["status"] + if len(df_selected) != 0: + results_lst.append(df_selected.values[0]) + else: + results_lst.append("finished") + return results_lst + + def check_queue_parameters( + self, queue, cores=1, run_time_max=None, memory_max=None, active_queue=None + ): + """ + + Args: + queue (str/None): + cores (int): + run_time_max (int/None): + memory_max (int/None): + active_queue (dict): + + Returns: + list: [cores, run_time_max, memory_max] + """ + if active_queue is None: + active_queue = self._config["queues"][queue] + cores = self._value_in_range( + value=cores, + value_min=active_queue["cores_min"], + value_max=active_queue["cores_max"], + ) + run_time_max = self._value_in_range( + value=run_time_max, value_max=active_queue["run_time_max"] + ) + memory_max = self._value_in_range( + value=memory_max, value_max=active_queue["memory_max"] + ) + return cores, run_time_max, memory_max + + def _write_queue_script( + self, + queue=None, + job_name=None, + working_directory=None, + cores=None, + memory_max=None, + run_time_max=None, + command=None, + ): + """ + + Args: + queue (str/None): + job_name (str/None): + working_directory (str/None): + cores (int/None): + memory_max (int/None): + run_time_max (int/None): + command (str/None): + + """ + if isinstance(command, list): + command = "".join(command) + if working_directory is None: + working_directory = "." 
+ queue_script = self._job_submission_template( + queue=queue, + job_name=job_name, + working_directory=working_directory, + cores=cores, + memory_max=memory_max, + run_time_max=run_time_max, + command=command, + ) + queue_script_path = os.path.join(working_directory, "run_queue.sh") + with open(queue_script_path, "w") as f: + f.writelines(queue_script) + return working_directory, queue_script_path + + def _job_submission_template( + self, + queue=None, + job_name="job.py", + working_directory=".", + cores=None, + memory_max=None, + run_time_max=None, + command=None, + ): + """ + + Args: + queue (str/None): + job_name (str): + working_directory (str): + cores (int/None): + memory_max (int/None): + run_time_max (int/None): + command (str/None): + + Returns: + str: + """ + if queue is None: + queue = self._config["queue_primary"] + self._value_error_if_none(value=command) + if queue not in self.queue_list: + raise ValueError() + active_queue = self._config["queues"][queue] + cores, run_time_max, memory_max = self.check_queue_parameters( + queue=None, + cores=cores, + run_time_max=run_time_max, + memory_max=memory_max, + active_queue=active_queue, + ) + template = active_queue["template"] + return template.render( + job_name=job_name, + working_directory=working_directory, + cores=cores, + memory_max=memory_max, + run_time_max=run_time_max, + command=command, + ) + + @staticmethod + def _get_user(): + """ + + Returns: + str: + """ + return getpass.getuser() + + @staticmethod + def _execute_command(commands, working_directory=None, split_output=True): + """ + + Args: + commands (list/str): + working_directory (str): + split_output (bool): + + Returns: + str: + """ + try: + out = subprocess.check_output( + commands, + cwd=working_directory, + stderr=subprocess.STDOUT, + universal_newlines=True, + shell=not isinstance(commands, list), + ) + except subprocess.CalledProcessError: + out = None + if out is not None and split_output: + return out.split("\n") + else: + return 
out + + @staticmethod + def _fill_queue_dict(queue_lst_dict): + """ + + Args: + queue_lst_dict (dict): + """ + queue_keys = ["cores_min", "cores_max", "run_time_max", "memory_max"] + for queue_dict in queue_lst_dict.values(): + for key in set(queue_keys) - set(queue_dict.keys()): + queue_dict[key] = None + + @staticmethod + def _load_templates(queue_lst_dict, directory="."): + """ + + Args: + queue_lst_dict (dict): + directory (str): + """ + for queue_dict in queue_lst_dict.values(): + with open(os.path.join(directory, queue_dict["script"]), "r") as f: + queue_dict["template"] = Template(f.read()) + + @staticmethod + def _value_error_if_none(value): + """ + + Args: + value (str/None): + """ + if value is None: + raise ValueError() + if not isinstance(value, str): + raise TypeError() + + @classmethod + def _value_in_range(cls, value, value_min=None, value_max=None): + """ + + Args: + value (int/float/None): + value_min (int/float/None): + value_max (int/float/None): + + Returns: + int/float/None: + """ + + if value is not None: + value_, value_min_, value_max_ = [ + cls._memory_spec_string_to_value(v) + if v is not None and isinstance(v, str) + else v + for v in (value, value_min, value_max) + ] + # ATTENTION: '60000' is interpreted as '60000M' since default magnitude is 'M' + # ATTENTION: int('60000') is interpreted as '60000B' since _memory_spec_string_to_value return the size in + # ATTENTION: bytes, as target_magnitude = 'b' + # We want to compare the the actual (k,m,g)byte value if there is any + if value_min_ is not None and value_ < value_min_: + return value_min + if value_max_ is not None and value_ > value_max_: + return value_max + return value + else: + if value_min is not None: + return value_min + if value_max is not None: + return value_max + return value + + @staticmethod + def _is_memory_string(value): + """ + Tests a string if it specifies a certain amount of memory e.g.: '20G', '60b'. Also pure integer strings are + also valid. 
+ + Args: + value (str): the string to test + + Returns: + (bool): A boolean value if the string matches a memory specification + """ + memory_spec_pattern = r"[0-9]+[bBkKmMgGtT]?" + return re.findall(memory_spec_pattern, value)[0] == value + + @classmethod + def _memory_spec_string_to_value( + cls, value, default_magnitude="m", target_magnitude="b" + ): + """ + Converts a valid memory string (tested by _is_memory_string) into an integer/float value of desired + magnitude `default_magnitude`. If it is a plain integer string (e.g.: '50000') it will be interpreted with + the magnitude passed in by the `default_magnitude`. The output will rescaled to `target_magnitude` + + Args: + value (str): the string + default_magnitude (str): magnitude for interpreting plain integer strings [b, B, k, K, m, M, g, G, t, T] + target_magnitude (str): to which the output value should be converted [b, B, k, K, m, M, g, G, t, T] + + Returns: + (float/int): the value of the string in `target_magnitude` units + """ + magnitude_mapping = {"b": 0, "k": 1, "m": 2, "g": 3, "t": 4} + if cls._is_memory_string(value): + integer_pattern = r"[0-9]+" + magnitude_pattern = r"[bBkKmMgGtT]+" + integer_value = int(re.findall(integer_pattern, value)[0]) + + magnitude = re.findall(magnitude_pattern, value) + if len(magnitude) > 0: + magnitude = magnitude[0].lower() + else: + magnitude = default_magnitude.lower() + # Convert it to default magnitude = megabytes + return (integer_value * 1024 ** magnitude_mapping[magnitude]) / ( + 1024 ** magnitude_mapping[target_magnitude] + ) + else: + return value diff --git a/pysqa/modular.py b/pysqa/modular.py new file mode 100644 index 00000000..f789aa76 --- /dev/null +++ b/pysqa/modular.py @@ -0,0 +1,151 @@ +# coding: utf-8 +# Copyright (c) Jan Janssen + +import pandas +from pysqa.basic import BasisQueueAdapter + + +class ModularQueueAdapter(BasisQueueAdapter): + def __init__(self, config, directory="~/.queues"): + config["queue_type"] = "SLURM" + 
super(ModularQueueAdapter, self).__init__(config=config, directory=directory) + self._queue_to_cluster_dict = { + k: v["cluster"] for k, v in self._config["queues"].items() + } + for v in self._queue_to_cluster_dict.values(): + if v not in self._config["cluster"]: + raise ValueError() + + def submit_job( + self, + queue=None, + job_name=None, + working_directory=None, + cores=None, + memory_max=None, + run_time_max=None, + command=None, + ): + """ + + Args: + queue (str/None): + job_name (str/None): + working_directory (str/None): + cores (int/None): + memory_max (int/None): + run_time_max (int/None): + command (str/None): + + Returns: + int: + """ + working_directory, queue_script_path = self._write_queue_script( + queue=queue, + job_name=job_name, + working_directory=working_directory, + cores=cores, + memory_max=memory_max, + run_time_max=run_time_max, + command=command, + ) + cluster_module = self._queue_to_cluster_dict[queue] + commands = ( + ["module --quiet swap cluster/{};".format(cluster_module)] + + self._commands.submit_job_command + + [queue_script_path] + ) + out = self._execute_command( + commands=commands, working_directory=working_directory, split_output=False + ) + if out is not None: + cluster_queue_id = self._commands.get_job_id_from_output(out) + cluster_queue_id *= 10 + cluster_queue_id += self._config["cluster"].index(cluster_module) + return cluster_queue_id + else: + return None + + def enable_reservation(self, process_id): + """ + + Args: + process_id (int): + + Returns: + str: + """ + cluster_module, cluster_queue_id = self._resolve_queue_id( + process_id=process_id, cluster_dict=self._config["cluster"] + ) + cluster_commands = self._switch_cluster_command(cluster_module=cluster_module) + commands = ( + cluster_commands + + self._commands.enable_reservation_command + + [str(cluster_queue_id)] + ) + out = self._execute_command(commands=commands, split_output=True) + if out is not None: + return out[0] + else: + return None + + def
delete_job(self, process_id): + """ + + Args: + process_id (int): + + Returns: + str: + """ + cluster_module, cluster_queue_id = self._resolve_queue_id( + process_id=process_id, cluster_dict=self._config["cluster"] + ) + cluster_commands = self._switch_cluster_command(cluster_module=cluster_module) + commands = ( + cluster_commands + + self._commands.delete_job_command + + [str(cluster_queue_id)] + ) + out = self._execute_command(commands=commands, split_output=True) + if out is not None: + return out[0] + else: + return None + + def get_queue_status(self, user=None): + """ + + Args: + user (str): + + Returns: + pandas.DataFrame: + """ + df_lst = [] + for cluster_module in self._config["cluster"]: + cluster_commands = self._switch_cluster_command( + cluster_module=cluster_module + ) + out = self._execute_command( + commands=cluster_commands + self._commands.get_queue_status_command, + split_output=False, + ) + df = self._commands.convert_queue_status(queue_status_output=out) + df_lst.append(df) + df = pandas.concat(df_lst, axis=0, sort=False).reset_index(drop=True) + if user is None: + return df + else: + return df[df["user"] == user] + + @staticmethod + def _resolve_queue_id(process_id, cluster_dict): + cluster_queue_id = int(process_id / 10) + cluster_module = cluster_dict[process_id - cluster_queue_id * 10] + return cluster_module, cluster_queue_id + + @staticmethod + def _switch_cluster_command(cluster_module): + return ["module --quiet swap cluster/{};".format(cluster_module)] diff --git a/pysqa/queueadapter.py b/pysqa/queueadapter.py index 96e27241..09bf5c49 100644 --- a/pysqa/queueadapter.py +++ b/pysqa/queueadapter.py @@ -1,14 +1,10 @@ # coding: utf-8 # Copyright (c) Jan Janssen -import getpass -import importlib -from jinja2 import Template import os -import pandas -import subprocess import yaml -import re +from pysqa.basic import BasisQueueAdapter +from pysqa.modular import ModularQueueAdapter __author__ = "Jan Janssen" __copyright__ = "Copyright 2019, Jan 
Janssen" @@ -46,32 +42,14 @@ class QueueAdapter(object): Queues available for auto completion QueueAdapter().queues. returns the queue name. """ - def __init__(self, directory="~/.queues"): - self._config = self._read_config( - file_name=os.path.join(directory, "queue.yaml") - ) - self._fill_queue_dict(queue_lst_dict=self._config["queues"]) - self._load_templates(queue_lst_dict=self._config["queues"], directory=directory) - if self._config["queue_type"] == "SGE": - class_name = "SunGridEngineCommands" - module_name = "pysqa.wrapper.sge" - elif self._config["queue_type"] == "TORQUE": - class_name = "TorqueCommands" - module_name = "pysqa.wrapper.torque" - elif self._config["queue_type"] == "SLURM": - class_name = "SlurmCommands" - module_name = "pysqa.wrapper.slurm" - elif self._config["queue_type"] == "LSF": - class_name = "LsfCommands" - module_name = "pysqa.wrapper.lsf" - elif self._config["queue_type"] == "MOAB": - class_name = "MoabCommands" - module_name = "pysqa.wrapper.moab" + config = self._read_config(file_name=os.path.join(directory, "queue.yaml")) + if config["queue_type"] in ["SGE", "TORQUE", "SLURM", "LSF", "MOAB"]: + self._adapter = BasisQueueAdapter(config=config, directory=directory) + elif config["queue_type"] in ["GENT"]: + self._adapter = ModularQueueAdapter(config=config, directory=directory) else: - raise ValueError() - self._commands = getattr(importlib.import_module(module_name), class_name)() - self._queues = Queues(self.queue_list) + raise ValueError @property def config(self): @@ -80,7 +58,7 @@ def config(self): Returns: dict: """ - return self._config + return self._adapter.config @property def queue_list(self): @@ -89,7 +67,7 @@ def queue_list(self): Returns: list: """ - return list(self._config["queues"].keys()) + return self._adapter.queue_list @property def queue_view(self): @@ -98,13 +76,11 @@ def queue_view(self): Returns: pandas.DataFrame: """ - return pandas.DataFrame(self._config["queues"]).T.drop( - ["script", "template"], 
axis=1 - ) + return self._adapter.queue_view @property def queues(self): - return self._queues + return self._adapter.queues def submit_job( self, @@ -130,11 +106,7 @@ def submit_job( Returns: int: """ - if isinstance(command, list): - command = "".join(command) - if working_directory is None: - working_directory = "." - queue_script = self._job_submission_template( + return self._adapter.submit_job( queue=queue, job_name=job_name, working_directory=working_directory, @@ -143,18 +115,6 @@ def submit_job( run_time_max=run_time_max, command=command, ) - queue_script_path = os.path.join(working_directory, "run_queue.sh") - with open(queue_script_path, "w") as f: - f.writelines(queue_script) - out = self._execute_command( - commands_lst=self._commands.submit_job_command + [queue_script_path], - working_directory=working_directory, - split_output=False, - ) - if out is not None: - return self._commands.get_job_id_from_output(out) - else: - return None def enable_reservation(self, process_id): """ @@ -165,14 +125,7 @@ def enable_reservation(self, process_id): Returns: str: """ - out = self._execute_command( - commands_lst=self._commands.enable_reservation_command + [str(process_id)], - split_output=True, - ) - if out is not None: - return out[0] - else: - return None + return self._adapter.enable_reservation(process_id=process_id) def delete_job(self, process_id): """ @@ -183,14 +136,7 @@ def delete_job(self, process_id): Returns: str: """ - out = self._execute_command( - commands_lst=self._commands.delete_job_command + [str(process_id)], - split_output=True, - ) - if out is not None: - return out[0] - else: - return None + return self._adapter.delete_job(process_id=process_id) def get_queue_status(self, user=None): """ @@ -201,14 +147,7 @@ def get_queue_status(self, user=None): Returns: pandas.DataFrame: """ - out = self._execute_command( - commands_lst=self._commands.get_queue_status_command, split_output=False - ) - df = 
self._commands.convert_queue_status(queue_status_output=out) - if user is None: - return df - else: - return df[df["user"] == user] + return self._adapter.get_queue_status(user=user) def get_status_of_my_jobs(self): """ @@ -216,7 +155,7 @@ def get_status_of_my_jobs(self): Returns: pandas.DataFrame: """ - return self.get_queue_status(user=self._get_user()) + return self._adapter.get_status_of_my_jobs() def get_status_of_job(self, process_id): """ @@ -227,12 +166,7 @@ def get_status_of_job(self, process_id): Returns: str: ['running', 'pending', 'error'] """ - df = self.get_queue_status() - df_selected = df[df["jobid"] == process_id]["status"] - if len(df_selected) != 0: - return df_selected.values[0] - else: - return None + return self._adapter.get_status_of_job(process_id=process_id) def get_status_of_jobs(self, process_id_lst): """ @@ -243,15 +177,7 @@ def get_status_of_jobs(self, process_id_lst): Returns: list: ['running', 'pending', 'error', ...] """ - df = self.get_queue_status() - results_lst = [] - for process_id in process_id_lst: - df_selected = df[df["jobid"] == process_id]["status"] - if len(df_selected) != 0: - results_lst.append(df_selected.values[0]) - else: - results_lst.append("finished") - return results_lst + return self._adapter.get_status_of_jobs(process_id_lst=process_id_lst) def check_queue_parameters( self, queue, cores=1, run_time_max=None, memory_max=None, active_queue=None @@ -268,110 +194,13 @@ def check_queue_parameters( Returns: list: [cores, run_time_max, memory_max] """ - if active_queue is None: - active_queue = self._config["queues"][queue] - cores = self._value_in_range( - value=cores, - value_min=active_queue["cores_min"], - value_max=active_queue["cores_max"], - ) - run_time_max = self._value_in_range( - value=run_time_max, value_max=active_queue["run_time_max"] - ) - memory_max = self._value_in_range( - value=memory_max, value_max=active_queue["memory_max"] - ) - return cores, run_time_max, memory_max - - def 
_job_submission_template( - self, - queue=None, - job_name="job.py", - working_directory=".", - cores=None, - memory_max=None, - run_time_max=None, - command=None, - ): - """ - - Args: - queue (str/None): - job_name (str): - working_directory (str): - cores (int/None): - memory_max (int/None): - run_time_max (int/None): - command (str/None): - - Returns: - str: - """ - if queue is None: - queue = self._config["queue_primary"] - self._value_error_if_none(value=command) - if queue not in self.queue_list: - raise ValueError() - active_queue = self._config["queues"][queue] - cores, run_time_max, memory_max = self.check_queue_parameters( - queue=None, + return self._adapter.check_queue_parameters( + queue=queue, cores=cores, run_time_max=run_time_max, memory_max=memory_max, active_queue=active_queue, ) - template = active_queue["template"] - return template.render( - job_name=job_name, - working_directory=working_directory, - cores=cores, - memory_max=memory_max, - run_time_max=run_time_max, - command=command, - ) - - @staticmethod - def _get_user(): - """ - - Returns: - str: - """ - return getpass.getuser() - - @staticmethod - def _execute_command(commands_lst, working_directory=None, split_output=True): - """ - - Args: - commands_lst (list): - working_directory (str): - split_output (bool): - - Returns: - str: - """ - if working_directory is None: - try: - out = subprocess.check_output( - commands_lst, stderr=subprocess.STDOUT, universal_newlines=True - ) - except subprocess.CalledProcessError: - out = None - else: - try: - out = subprocess.check_output( - commands_lst, - cwd=working_directory, - stderr=subprocess.STDOUT, - universal_newlines=True, - ) - except subprocess.CalledProcessError: - out = None - if out is not None and split_output: - return out.split("\n") - else: - return out @staticmethod def _read_config(file_name="queue.yaml"): @@ -385,144 +214,3 @@ def _read_config(file_name="queue.yaml"): """ with open(file_name, "r") as f: return yaml.load(f, 
Loader=yaml.FullLoader) - - @staticmethod - def _fill_queue_dict(queue_lst_dict): - """ - - Args: - queue_lst_dict (dict): - """ - queue_keys = ["cores_min", "cores_max", "run_time_max", "memory_max"] - for queue_dict in queue_lst_dict.values(): - for key in set(queue_keys) - set(queue_dict.keys()): - queue_dict[key] = None - - @staticmethod - def _load_templates(queue_lst_dict, directory="."): - """ - - Args: - queue_lst_dict (dict): - directory (str): - """ - for queue_dict in queue_lst_dict.values(): - with open(os.path.join(directory, queue_dict["script"]), "r") as f: - queue_dict["template"] = Template(f.read()) - - @staticmethod - def _value_error_if_none(value): - """ - - Args: - value (str/None): - """ - if value is None: - raise ValueError() - if not isinstance(value, str): - raise TypeError() - - @classmethod - def _value_in_range(cls, value, value_min=None, value_max=None): - """ - - Args: - value (int/float/None): - value_min (int/float/None): - value_max (int/float/None): - - Returns: - int/float/None: - """ - - if value is not None: - value_, value_min_, value_max_ = [ - cls._memory_spec_string_to_value(v) - if v is not None and isinstance(v, str) - else v - for v in (value, value_min, value_max) - ] - # ATTENTION: '60000' is interpreted as '60000M' since default magnitude is 'M' - # ATTENTION: int('60000') is interpreted as '60000B' since _memory_spec_string_to_value return the size in - # ATTENTION: bytes, as target_magnitude = 'b' - # We want to compare the the actual (k,m,g)byte value if there is any - if value_min_ is not None and value_ < value_min_: - return value_min - if value_max_ is not None and value_ > value_max_: - return value_max - return value - else: - if value_min is not None: - return value_min - if value_max is not None: - return value_max - return value - - @staticmethod - def _is_memory_string(value): - """ - Tests a string if it specifies a certain amount of memory e.g.: '20G', '60b'. Also pure integer strings are - also valid. 
- - Args: - value (str): the string to test - - Returns: - (bool): A boolean value if the string matches a memory specification - """ - memory_spec_pattern = r"[0-9]+[bBkKmMgGtT]?" - return re.findall(memory_spec_pattern, value)[0] == value - - @classmethod - def _memory_spec_string_to_value( - cls, value, default_magnitude="m", target_magnitude="b" - ): - """ - Converts a valid memory string (tested by _is_memory_string) into an integer/float value of desired - magnitude `default_magnitude`. If it is a plain integer string (e.g.: '50000') it will be interpreted with - the magnitude passed in by the `default_magnitude`. The output will rescaled to `target_magnitude` - - Args: - value (str): the string - default_magnitude (str): magnitude for interpreting plain integer strings [b, B, k, K, m, M, g, G, t, T] - target_magnitude (str): to which the output value should be converted [b, B, k, K, m, M, g, G, t, T] - - Returns: - (float/int): the value of the string in `target_magnitude` units - """ - magnitude_mapping = {"b": 0, "k": 1, "m": 2, "g": 3, "t": 4} - if cls._is_memory_string(value): - integer_pattern = r"[0-9]+" - magnitude_pattern = r"[bBkKmMgGtT]+" - integer_value = int(re.findall(integer_pattern, value)[0]) - - magnitude = re.findall(magnitude_pattern, value) - if len(magnitude) > 0: - magnitude = magnitude[0].lower() - else: - magnitude = default_magnitude.lower() - # Convert it to default magnitude = megabytes - return (integer_value * 1024 ** magnitude_mapping[magnitude]) / ( - 1024 ** magnitude_mapping[target_magnitude] - ) - else: - return value - - -class Queues(object): - """ - Queues is an abstract class simply to make the list of queues available for auto completion. This is mainly used in - interactive environments like jupyter. 
- """ - - def __init__(self, list_of_queues): - self._list_of_queues = list_of_queues - - def __getattr__(self, item): - if item in self._list_of_queues: - return item - else: - raise AttributeError - - def __dir__(self): - return self._list_of_queues diff --git a/pysqa/queues.py b/pysqa/queues.py new file mode 100644 index 00000000..c2d28b0c --- /dev/null +++ b/pysqa/queues.py @@ -0,0 +1,21 @@ +# coding: utf-8 +# Copyright (c) Jan Janssen + + +class Queues(object): + """ + Queues is an abstract class simply to make the list of queues available for auto completion. This is mainly used in + interactive environments like jupyter. + """ + + def __init__(self, list_of_queues): + self._list_of_queues = list_of_queues + + def __getattr__(self, item): + if item in self._list_of_queues: + return item + else: + raise AttributeError + + def __dir__(self): + return self._list_of_queues diff --git a/tests/config/gent/queue.yaml b/tests/config/gent/queue.yaml new file mode 100644 index 00000000..7d1dd7dd --- /dev/null +++ b/tests/config/gent/queue.yaml @@ -0,0 +1,5 @@ +queue_type: GENT +queue_primary: slurm +cluster: [cluster1, cluster2, cluster3] +queues: + slurm: {cluster: cluster1, cores_max: 100, cores_min: 10, run_time_max: 259200, script: slurm.sh} diff --git a/tests/config/gent/slurm.sh b/tests/config/gent/slurm.sh new file mode 100644 index 00000000..1529823c --- /dev/null +++ b/tests/config/gent/slurm.sh @@ -0,0 +1,15 @@ +#!/bin/bash +#SBATCH --output=time.out +#SBATCH --job-name={{job_name}} +#SBATCH --workdir= {{working_directory}} +#SBATCH --get-user-env=L +#SBATCH --partition=slurm +{%- if run_time_max %} +#SBATCH --time={{run_time_max}} +{%- endif %} +{%- if memory_max %} +#SBATCH --mem={{memory_max}} +{%- endif %} +#SBATCH --cpus-per-task={{cores}} + +{{command}} \ No newline at end of file diff --git a/tests/test_queueadapter.py b/tests/test_queueadapter.py index 4a1a1b7c..e21bdb45 100644 --- a/tests/test_queueadapter.py +++ b/tests/test_queueadapter.py @@ -6,6 
+6,7 @@ import unittest import getpass from pysqa import QueueAdapter +from pysqa.basic import BasisQueueAdapter __author__ = "Jan Janssen" __copyright__ = "Copyright 2019, Jan Janssen" @@ -25,6 +26,7 @@ def setUpClass(cls): cls.lsf = QueueAdapter(directory=os.path.join(cls.path, "config/lsf")) cls.sge = QueueAdapter(directory=os.path.join(cls.path, "config/sge")) cls.moab = QueueAdapter(directory=os.path.join(cls.path, "config/moab")) + cls.gent = QueueAdapter(directory=os.path.join(cls.path, "config/gent")) def test_missing_config(self): self.assertRaises( @@ -42,78 +44,112 @@ def test_config(self): self.assertEqual(self.lsf.config["queue_primary"], "lsf") self.assertEqual(self.sge.config["queue_primary"], "impi_hydra_small") self.assertEqual(self.moab.config["queue_primary"], "moab") + self.assertEqual(self.gent.config["queue_primary"], "slurm") def test_value_in_range(self): self.assertEqual( - None, self.sge._value_in_range(value=None, value_min=None, value_max=None) + None, + self.sge._adapter._value_in_range( + value=None, value_min=None, value_max=None + ), + ) + self.assertEqual( + 1, + self.sge._adapter._value_in_range(value=None, value_min=1, value_max=None), ) self.assertEqual( - 1, self.sge._value_in_range(value=None, value_min=1, value_max=None) + 1, + self.sge._adapter._value_in_range(value=None, value_min=None, value_max=1), ) self.assertEqual( - 1, self.sge._value_in_range(value=None, value_min=None, value_max=1) + 1, + self.sge._adapter._value_in_range(value=1, value_min=None, value_max=None), ) self.assertEqual( - 1, self.sge._value_in_range(value=1, value_min=None, value_max=None) + 1, self.sge._adapter._value_in_range(value=0, value_min=1, value_max=None) ) self.assertEqual( - 1, self.sge._value_in_range(value=0, value_min=1, value_max=None) + 1, self.sge._adapter._value_in_range(value=2, value_min=None, value_max=1) ) self.assertEqual( - 1, self.sge._value_in_range(value=2, value_min=None, value_max=1) + 1, 
self.sge._adapter._value_in_range(value=1, value_min=0, value_max=2) ) - self.assertEqual(1, self.sge._value_in_range(value=1, value_min=0, value_max=2)) def test_job_submission_template(self): - self.assertRaises(ValueError, self.sge._job_submission_template, command=None) - self.assertRaises(TypeError, self.sge._job_submission_template, command=1) + self.assertRaises( + ValueError, self.sge._adapter._job_submission_template, command=None + ) + self.assertRaises( + TypeError, self.sge._adapter._job_submission_template, command=1 + ) template = ( "#!/bin/bash\n#$ -N job.py\n#$ -wd .\n#$ -pe impi_hydra_small 1\n#$ -l h_rt=604800\n" "#$ -o time.out\n#$ -e error.out\n\npython test.py" ) self.assertEqual( - self.sge._job_submission_template(command="python test.py"), template + self.sge._adapter._job_submission_template(command="python test.py"), + template, ) template = ( "#!/bin/bash\n#BSUB -q queue\n#BSUB -J job.py\n#BSUB -o time.out\n#BSUB -n 10\n#BSUB -cwd .\n" "#BSUB -e error.out\n#BSUB -W 259200\n\npython test.py" ) self.assertEqual( - self.lsf._job_submission_template(command="python test.py"), template + self.lsf._adapter._job_submission_template(command="python test.py"), + template, ) self.assertRaises( ValueError, - self.sge._job_submission_template, + self.sge._adapter._job_submission_template, command="python test.py", queue="notavailable", ) def test_interfaces(self): - self.assertEqual(self.sge._commands.submit_job_command, ["qsub", "-terse"]) - self.assertEqual(self.sge._commands.delete_job_command, ["qdel"]) self.assertEqual( - self.sge._commands.enable_reservation_command, ["qalter", "-R", "y"] + self.sge._adapter._commands.submit_job_command, ["qsub", "-terse"] + ) + self.assertEqual(self.sge._adapter._commands.delete_job_command, ["qdel"]) + self.assertEqual( + self.sge._adapter._commands.enable_reservation_command, + ["qalter", "-R", "y"], + ) + self.assertEqual( + self.sge._adapter._commands.get_queue_status_command, ["qstat", "-xml"] + ) + 
self.assertEqual( + self.torque._adapter._commands.submit_job_command, ["qsub", "-terse"] + ) + self.assertEqual(self.torque._adapter._commands.delete_job_command, ["qdel"]) + self.assertEqual( + self.torque._adapter._commands.get_queue_status_command, ["qstat", "-x"] ) - self.assertEqual(self.sge._commands.get_queue_status_command, ["qstat", "-xml"]) - self.assertEqual(self.torque._commands.submit_job_command, ["qsub", "-terse"]) - self.assertEqual(self.torque._commands.delete_job_command, ["qdel"]) self.assertEqual( - self.torque._commands.get_queue_status_command, ["qstat", "-x"] + self.lsf._adapter._commands.submit_job_command, ["bsub", "-terse"] ) - self.assertEqual(self.lsf._commands.submit_job_command, ["bsub", "-terse"]) - self.assertEqual(self.lsf._commands.delete_job_command, ["bkill"]) - self.assertEqual(self.lsf._commands.get_queue_status_command, ["qstat", "-x"]) + self.assertEqual(self.lsf._adapter._commands.delete_job_command, ["bkill"]) self.assertEqual( - self.slurm._commands.submit_job_command, ["sbatch", "--parsable"] + self.lsf._adapter._commands.get_queue_status_command, ["qstat", "-x"] ) - self.assertEqual(self.slurm._commands.delete_job_command, ["scancel"]) self.assertEqual( - self.slurm._commands.get_queue_status_command, + self.slurm._adapter._commands.submit_job_command, ["sbatch", "--parsable"] + ) + self.assertEqual(self.slurm._adapter._commands.delete_job_command, ["scancel"]) + self.assertEqual( + self.slurm._adapter._commands.get_queue_status_command, + ["squeue", "--format", "%A|%u|%t|%j", "--noheader"], + ) + self.assertEqual(self.moab._adapter._commands.submit_job_command, ["msub"]) + self.assertEqual( + self.moab._adapter._commands.delete_job_command, ["mjobctl", "-c"] + ) + self.assertEqual( + self.moab._adapter._commands.get_queue_status_command, ["mdiag", "-x"] + ) + self.assertEqual( + self.gent._adapter._commands.get_queue_status_command, ["squeue", "--format", "%A|%u|%t|%j", "--noheader"], ) - 
self.assertEqual(self.moab._commands.submit_job_command, ["msub"]) - self.assertEqual(self.moab._commands.delete_job_command, ["mjobctl", "-c"]) - self.assertEqual(self.moab._commands.get_queue_status_command, ["mdiag", "-x"]) def test_convert_queue_status(self): with open(os.path.join(self.path, "config/sge", "qstat.xml"), "r") as f: @@ -145,7 +181,9 @@ def test_convert_queue_status(self): ) self.assertTrue( df.equals( - self.sge._commands.convert_queue_status(queue_status_output=content) + self.sge._adapter._commands.convert_queue_status( + queue_status_output=content + ) ) ) @@ -165,7 +203,7 @@ def test_queues(self): _ = self.sge.queues.notavailable def test_get_user(self): - self.assertEqual(self.sge._get_user(), getpass.getuser()) + self.assertEqual(self.sge._adapter._get_user(), getpass.getuser()) def test_check_queue_parameters(self): self.assertEqual( @@ -176,36 +214,45 @@ def test_queue_view(self): self.assertIsInstance(self.slurm.queue_view, pandas.DataFrame) def test_memory_string_comparison(self): - self.assertEqual(QueueAdapter._value_in_range(1023, value_min="1K"), "1K") - self.assertEqual(QueueAdapter._value_in_range(1035, value_min="1K"), 1035) - self.assertEqual(QueueAdapter._value_in_range(1035, value_max="1K"), "1K") - self.assertEqual(QueueAdapter._value_in_range("1035", value_min="1K"), "1035") + self.assertEqual(BasisQueueAdapter._value_in_range(1023, value_min="1K"), "1K") + self.assertEqual(BasisQueueAdapter._value_in_range(1035, value_min="1K"), 1035) + self.assertEqual(BasisQueueAdapter._value_in_range(1035, value_max="1K"), "1K") + self.assertEqual( + BasisQueueAdapter._value_in_range("1035", value_min="1K"), "1035" + ) self.assertEqual( - QueueAdapter._value_in_range("60000M", value_min="1K", value_max="50G"), + BasisQueueAdapter._value_in_range( + "60000M", value_min="1K", value_max="50G" + ), "50G", ) self.assertEqual( - QueueAdapter._value_in_range("60000", value_min="1K", value_max="50G"), + BasisQueueAdapter._value_in_range("60000", 
value_min="1K", value_max="50G"), "50G", ) self.assertEqual( - QueueAdapter._value_in_range("60000M", value_min="1K", value_max="70G"), + BasisQueueAdapter._value_in_range( + "60000M", value_min="1K", value_max="70G" + ), "60000M", ) self.assertEqual( - QueueAdapter._value_in_range(60000, value_min="1K", value_max="70G"), 60000 + BasisQueueAdapter._value_in_range(60000, value_min="1K", value_max="70G"), + 60000, ) self.assertEqual( - QueueAdapter._value_in_range( + BasisQueueAdapter._value_in_range( 90000 * 1024 ** 2, value_min="1K", value_max="70G" ), "70G", ) self.assertEqual( - QueueAdapter._value_in_range("90000", value_min="1K", value_max="70G"), + BasisQueueAdapter._value_in_range("90000", value_min="1K", value_max="70G"), "70G", ) self.assertEqual( - QueueAdapter._value_in_range("60000M", value_min="60G", value_max="70G"), + BasisQueueAdapter._value_in_range( + "60000M", value_min="60G", value_max="70G" + ), "60G", )