diff --git a/.coveragerc b/.coveragerc new file mode 100644 index 00000000..cdb06be7 --- /dev/null +++ b/.coveragerc @@ -0,0 +1,5 @@ +# .coveragerc to control coverage.py +[run] +source = pysqa +omit = pysqa/_version.py +concurrency = multiprocessing diff --git a/.travis.yml b/.travis.yml index 8a50b1ba..d8beed98 100644 --- a/.travis.yml +++ b/.travis.yml @@ -28,24 +28,25 @@ matrix: - conda info -a - conda config --set always_yes yes --set changeps1 no - conda update -q conda - - conda install -y -c conda-forge python=${PYTHONVER} coverage defusedxml pandas yaml jinja2 + - conda install -y -c conda-forge python=${PYTHONVER} coverage coveralls defusedxml pandas yaml jinja2 - pip install --pre . script: - coverage run -m unittest discover tests after_success: + - coverage combine - coveralls - coverage xml - pip install codacy-coverage - python-codacy-coverage -r coverage.xml deploy: - provider: pypi - user: jan-janssen + user: pyiron password: ${PYPI_UPLOAD_TOKEN} allow_failure: true on: branch: master - provider: pypi - user: jan-janssen + user: pyiron password: ${PYPI_UPLOAD_TOKEN} allow_failure: true on: diff --git a/README.md b/README.md index a9dd4857..59d0a250 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,77 @@ # pysqa -Simple queue adapter for Python +Simple queue adapter for python -[![Codacy Badge](https://api.codacy.com/project/badge/Grade/de30f320fd9f44ac9cbcce28eb2c54d1)](https://app.codacy.com/app/jan-janssen/pysqa?utm_source=github.com&utm_medium=referral&utm_content=pysqa/pysqa&utm_campaign=Badge_Grade_Dashboard) -[![Build Status](https://travis-ci.org/pysqa/pysqa.svg?branch=master)](https://travis-ci.org/pysqa/pysqa) -[![Build status](https://ci.appveyor.com/api/projects/status/90cb1w7a57fql52q/branch/master?svg=true)](https://ci.appveyor.com/project/jan-janssen/pysqa/branch/master) -[![Coverage Status](https://coveralls.io/repos/github/pysqa/pysqa/badge.svg?branch=master)](https://coveralls.io/github/pysqa/pysqa?branch=master) +[![Codacy Badge](https://api.codacy.com/project/badge/Grade/9db80cb2477f46be870d1446540b4bf3)](https://www.codacy.com/app/pyiron-runner/pysqa?utm_source=github.com&utm_medium=referral&utm_content=pyiron/pysqa&utm_campaign=Badge_Grade_Dashboard) +[![Build Status](https://travis-ci.org/pyiron/pysqa.svg?branch=master)](https://travis-ci.org/pyiron/pysqa) +[![Build status](https://ci.appveyor.com/api/projects/status/9lpjai8rvt8324aj/branch/master?svg=true)](https://ci.appveyor.com/project/pyiron-runner/pysqa/branch/master) +[![Coverage Status](https://coveralls.io/repos/github/pyiron/pysqa/badge.svg?branch=master)](https://coveralls.io/github/pyiron/pysqa?branch=master) + +The goal of pysqa is to make submitting to an HPC cluster as easy as starting another subprocess. This is based on the assumption that even though modern queuing systems allow for a wide range of different configurations, most users submit the majority of their jobs with very similar parameters. Therefore pysqa allows users to store their submission scripts as jinja2 templates for quick access. After submission, pysqa allows users to track the progress of their jobs, delete them, or enable reservations using the built-in functionality of the queuing system. The currently supported queuing systems are: LSF, MOAB, SGE (tested), SLURM (tested), TORQUE.
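To make the comparison concrete: where a local run is a `subprocess` call, a cluster run is a single `submit_job()` call. The snippet below is a sketch that assumes a queue configuration already exists in `~/.queues` (see the Usage section below); `test.py` is a placeholder script, and the returned job id is only available for queuing systems whose wrapper implements `get_job_id_from_output`:

    import subprocess
    from pysqa import QueueAdapter

    # run the script locally
    subprocess.check_output(["python", "test.py"])

    # run the same script on the cluster instead
    sqa = QueueAdapter(directory="~/.queues")          # load the queue configuration
    job_id = sqa.submit_job(command="python test.py")  # id assigned by the queuing system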
+ +# Installation +pysqa can either be installed via pip using: + + pip install pysqa + +Or via conda from the conda-forge channel: + + conda install -c conda-forge pysqa + + +# Usage +pysqa requires the user to configure the type of queuing system as well as the available templates. Example configurations are available at: +https://github.com/pyiron/pysqa/tree/master/tests/config +By default pysqa searches for the queue configuration in `~/.queues/queue.yaml` and the corresponding jinja2 templates in the same folder. + +Import pysqa: + + from pysqa import QueueAdapter + sqa = QueueAdapter(directory='~/.queues') # directory which contains the queue.yaml file + +List available queues as a list of queue names: + + sqa.queue_list + +List available queues as a pandas DataFrame: + + sqa.queue_view + +Submit a job to the queue - if no queue is specified, the job is submitted to the default queue defined in the queue configuration: + + sqa.submit_job(command='python test.py') + +Get the status of all jobs currently handled by the queuing system: + + sqa.get_queue_status() + +Get the status of a specific job from the queuing system: + + sqa.get_status_of_job(process_id=1234) + +Delete a job from the queuing system: + + sqa.delete_job(process_id=1234) + +Sample configurations for the specific queuing systems are available in the tests: + +* LSF - https://github.com/pyiron/pysqa/tree/master/tests/config/lsf +* MOAB - https://github.com/pyiron/pysqa/tree/master/tests/config/moab +* SGE - https://github.com/pyiron/pysqa/tree/master/tests/config/sge +* SLURM - https://github.com/pyiron/pysqa/tree/master/tests/config/slurm +* TORQUE - https://github.com/pyiron/pysqa/tree/master/tests/config/torque + +# License +pysqa is released under the BSD license https://github.com/pyiron/pysqa/blob/master/LICENSE . It is a spin-off of the pyiron project https://github.com/pyiron/pyiron . If you use pysqa in a publication, please cite: + + @article{pyiron-paper, + title = {pyiron: An integrated development environment for computational materials science}, + journal = {Computational Materials Science}, + volume = {163}, + pages = {24 - 36}, + year = {2019}, + issn = {0927-0256}, + doi = {https://doi.org/10.1016/j.commatsci.2018.07.043}, + url = {http://www.sciencedirect.com/science/article/pii/S0927025618304786}, + author = {Jan Janssen and Sudarsan Surendralal and Yury Lysogorskiy and Mira Todorova and Tilmann Hickel and Ralf Drautz and Jörg Neugebauer}, + keywords = {Modelling workflow, Integrated development environment, Complex simulation protocols}, + } diff --git a/appveyor.yml b/appveyor.yml index 66b9af1c..ef7f3238 100644 --- a/appveyor.yml +++ b/appveyor.yml @@ -39,7 +39,7 @@ install: - '"%CONDA%\Scripts\activate.bat"' - "conda config --set always_yes yes --set changeps1 no" - "conda update -q conda" -- "conda install -c conda-forge coverage defusedxml pandas yaml jinja2" +- 'conda install -c conda-forge coverage defusedxml "pandas>=0.23" yaml jinja2' - "conda info -a" - "pip install --pre ."
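To tie the configuration and the Python calls of the README together, the following self-contained sketch writes a minimal single-queue SLURM configuration into a temporary directory and loads it with `QueueAdapter`. The queue name, resource limits and `#SBATCH` directives are illustrative (modelled on, not copied from, the test configurations under `tests/config`), and the final `submit_job()` call is commented out because it needs a working `sbatch` on the machine:

    import os
    import tempfile
    from pysqa import QueueAdapter

    directory = tempfile.mkdtemp()

    # queue.yaml: queue type, default queue and one queue definition (illustrative limits)
    with open(os.path.join(directory, "queue.yaml"), "w") as f:
        f.write(
            "queue_type: SLURM\n"
            "queue_primary: slurm\n"
            "queues:\n"
            "  slurm: {cores_max: 20, cores_min: 1, run_time_max: 172800, script: slurm.sh}\n"
        )

    # slurm.sh: jinja2 template rendered with job_name, working_directory, cores,
    # memory_max, run_time_max and command (the #SBATCH lines are a guess at a typical template)
    with open(os.path.join(directory, "slurm.sh"), "w") as f:
        f.write(
            "#!/bin/bash\n"
            "#SBATCH --job-name={{job_name}}\n"
            "#SBATCH --chdir={{working_directory}}\n"
            "#SBATCH --ntasks={{cores}}\n"
            "{%- if run_time_max %}\n"
            "#SBATCH --time={{run_time_max}}\n"
            "{%- endif %}\n"
            "\n"
            "{{command}}\n"
        )

    sqa = QueueAdapter(directory=directory)
    print(sqa.queue_list)                                      # ['slurm']
    print(sqa.queues.slurm)                                    # 'slurm' (auto-completion helper)
    print(sqa.check_queue_parameters(queue="slurm", cores=4))  # (4, 172800, None)
    # sqa.submit_job(queue="slurm", command="python test.py")  # requires sbatch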
diff --git a/azure-pipelines.yml b/azure-pipelines.yml new file mode 100644 index 00000000..ef3173ad --- /dev/null +++ b/azure-pipelines.yml @@ -0,0 +1,100 @@ +variables: + linux: 'ubuntu-16.04' + mac: 'macos-10.13' + windows: 'vs2017-win2016' + +pr: + branches: + include: + - '*' + +jobs: +- job: 'Linux' + pool: + vmImage: $(linux) + strategy: + matrix: + Python27: + python.version: '2.7' + Python37: + python.version: '3.7' + maxParallel: 4 + + steps: + - bash: echo "##vso[task.prependpath]/usr/share/miniconda/bin" + displayName: Add conda to PATH + + - bash: conda create -n pysqa -q --yes -c conda-forge python=$(python.version) coverage defusedxml pandas yaml jinja2 + displayName: conda install + + - bash: | + source activate pysqa + pip install --pre . + displayName: pysqa install + + - bash: | + source activate pysqa + coverage run -m unittest discover tests + displayName: run tests + +- job: 'OSX' + pool: + vmImage: $(mac) + strategy: + matrix: + Python27: + python.version: '2.7' + Python37: + python.version: '3.7' + maxParallel: 1 + + steps: + - bash: echo "##vso[task.prependpath]$CONDA/bin" + displayName: Add conda to PATH + + - bash: sudo chown -R $USER $CONDA + displayName: Take ownership of conda installation + + - bash: conda create -n pysqa -q --yes -c conda-forge python=$(python.version) coverage defusedxml pandas yaml jinja2 + displayName: conda install + + - bash: | + source activate pysqa + pip install --pre . + displayName: pysqa install + + - bash: | + source activate pysqa + coverage run -m unittest discover tests + displayName: run tests + +- job: 'Windows' + pool: + vmImage: $(windows) + strategy: + matrix: + Python27: + python.version: '2.7' + Python37: + python.version: '3.7' + maxParallel: 1 + + steps: + - powershell: Write-Host "##vso[task.prependpath]$env:CONDA\Scripts" + displayName: Add conda to PATH + + - bash: echo "##vso[task.prependpath]$CONDA/bin" + displayName: Add conda to PATH + + - script: conda create -n pysqa -q --yes -c conda-forge python=$(python.version) coverage defusedxml pandas yaml jinja2 + displayName: conda install + + - script: | + call activate pysqa + pip install --pre . 
+ displayName: pysqa install + + - script: | + call activate pysqa + coverage run -m unittest discover tests + displayName: run tests diff --git a/pysqa/__init__.py b/pysqa/__init__.py index bd6c51fd..d8a3f3d2 100644 --- a/pysqa/__init__.py +++ b/pysqa/__init__.py @@ -1,8 +1,9 @@ -__version__ = '0.0.1' +__version__ = "0.0.1" __all__ = [] from pysqa.queueadapter import QueueAdapter from ._version import get_versions -__version__ = get_versions()['version'] + +__version__ = get_versions()["version"] del get_versions diff --git a/pysqa/queueadapter.py b/pysqa/queueadapter.py index f8a3b989..96e27241 100644 --- a/pysqa/queueadapter.py +++ b/pysqa/queueadapter.py @@ -7,11 +7,12 @@ import os import pandas import subprocess -from yaml import load +import yaml +import re __author__ = "Jan Janssen" __copyright__ = "Copyright 2019, Jan Janssen" -__version__ = "0.0.1" +__version__ = "0.0.3" __maintainer__ = "Jan Janssen" __email__ = "janssen@mpie.de" __status__ = "production" @@ -19,28 +20,58 @@ class QueueAdapter(object): - def __init__(self, directory='.'): - self._config = self._read_config(file_name=os.path.join(directory, 'queue.yaml')) - self._fill_queue_dict(queue_lst_dict=self._config['queues']) - self._load_templates(queue_lst_dict=self._config['queues'], directory=directory) - if self._config['queue_type'] == 'SGE': - class_name = 'SunGridEngineCommands' - module_name = 'pysqa.wrapper.sge' - elif self._config['queue_type'] == 'TORQUE': - class_name = 'TorqueCommands' - module_name = 'pysqa.wrapper.torque' - elif self._config['queue_type'] == 'SLURM': - class_name = 'SlurmCommands' - module_name = 'pysqa.wrapper.slurm' - elif self._config['queue_type'] == 'LSF': - class_name = 'LsfCommands' - module_name = 'pysqa.wrapper.lsf' - elif self._config['queue_type'] == 'MOAB': - class_name = 'MoabCommands' - module_name = 'pysqa.wrapper.moab' + """ + The goal of the QueueAdapter class is to make submitting to a queue system as easy as starting another subprocess + locally. + + Args: + directory (str): directory containing the queue.yaml file as well as the corresponding jinja2 templates for the + individual queues. + + Attributes: + + .. attribute:: config + + QueueAdapter configuration read from the queue.yaml file. + + .. attribute:: queue_list + + List of available queues. + + .. attribute:: queue_view + + Pandas DataFrame representation of the available queues, read from queue.yaml. + + .. attribute:: queues + + Queues available for auto completion; QueueAdapter().queues.<queue name> returns the queue name.
+ """ + + def __init__(self, directory="~/.queues"): + self._config = self._read_config( + file_name=os.path.join(directory, "queue.yaml") + ) + self._fill_queue_dict(queue_lst_dict=self._config["queues"]) + self._load_templates(queue_lst_dict=self._config["queues"], directory=directory) + if self._config["queue_type"] == "SGE": + class_name = "SunGridEngineCommands" + module_name = "pysqa.wrapper.sge" + elif self._config["queue_type"] == "TORQUE": + class_name = "TorqueCommands" + module_name = "pysqa.wrapper.torque" + elif self._config["queue_type"] == "SLURM": + class_name = "SlurmCommands" + module_name = "pysqa.wrapper.slurm" + elif self._config["queue_type"] == "LSF": + class_name = "LsfCommands" + module_name = "pysqa.wrapper.lsf" + elif self._config["queue_type"] == "MOAB": + class_name = "MoabCommands" + module_name = "pysqa.wrapper.moab" else: raise ValueError() self._commands = getattr(importlib.import_module(module_name), class_name)() + self._queues = Queues(self.queue_list) @property def config(self): @@ -58,7 +89,7 @@ def queue_list(self): Returns: list: """ - return list(self._config['queues'].keys()) + return list(self._config["queues"].keys()) @property def queue_view(self): @@ -67,10 +98,24 @@ def queue_view(self): Returns: pandas.DataFrame: """ - return pandas.DataFrame(self._config['queues']).T.drop(['script', 'template'], axis=1) + return pandas.DataFrame(self._config["queues"]).T.drop( + ["script", "template"], axis=1 + ) - def submit_job(self, queue=None, job_name=None, working_directory=None, cores=None, memory_max=None, - run_time_max=None, command=None): + @property + def queues(self): + return self._queues + + def submit_job( + self, + queue=None, + job_name=None, + working_directory=None, + cores=None, + memory_max=None, + run_time_max=None, + command=None, + ): """ Args: @@ -86,16 +131,30 @@ def submit_job(self, queue=None, job_name=None, working_directory=None, cores=No int: """ if isinstance(command, list): - command = ''.join(command) - queue_script = self._job_submission_template(queue=queue, job_name=job_name, - working_directory=working_directory, cores=cores, - memory_max=memory_max, run_time_max=run_time_max, command=command) - queue_script_path = os.path.join(working_directory, 'run_queue.sh') - with open(queue_script_path, 'w') as f: + command = "".join(command) + if working_directory is None: + working_directory = "." 
+ queue_script = self._job_submission_template( + queue=queue, + job_name=job_name, + working_directory=working_directory, + cores=cores, + memory_max=memory_max, + run_time_max=run_time_max, + command=command, + ) + queue_script_path = os.path.join(working_directory, "run_queue.sh") + with open(queue_script_path, "w") as f: f.writelines(queue_script) - out = self._execute_command(commands_lst=self._commands.submit_job_command + [queue_script_path], - working_directory=working_directory, split_output=False) - return int(out) + out = self._execute_command( + commands_lst=self._commands.submit_job_command + [queue_script_path], + working_directory=working_directory, + split_output=False, + ) + if out is not None: + return self._commands.get_job_id_from_output(out) + else: + return None def enable_reservation(self, process_id): """ @@ -106,8 +165,14 @@ def enable_reservation(self, process_id): Returns: str: """ - return self._execute_command(commands_lst=self._commands.enable_reservation_command + [str(process_id)], - split_output=True)[0] + out = self._execute_command( + commands_lst=self._commands.enable_reservation_command + [str(process_id)], + split_output=True, + ) + if out is not None: + return out[0] + else: + return None def delete_job(self, process_id): """ @@ -118,8 +183,14 @@ def delete_job(self, process_id): Returns: str: """ - return self._execute_command(commands_lst=self._commands.delete_job_command + [str(process_id)], - split_output=True)[0] + out = self._execute_command( + commands_lst=self._commands.delete_job_command + [str(process_id)], + split_output=True, + ) + if out is not None: + return out[0] + else: + return None def get_queue_status(self, user=None): """ @@ -130,12 +201,14 @@ def get_queue_status(self, user=None): Returns: pandas.DataFrame: """ - out = self._execute_command(commands_lst=self._commands.get_queue_status_command, split_output=False) + out = self._execute_command( + commands_lst=self._commands.get_queue_status_command, split_output=False + ) df = self._commands.convert_queue_status(queue_status_output=out) if user is None: return df else: - return df[df['user'] == user] + return df[df["user"] == user] def get_status_of_my_jobs(self): """ @@ -155,13 +228,34 @@ def get_status_of_job(self, process_id): str: ['running', 'pending', 'error'] """ df = self.get_queue_status() - df_selected = df[df['jobid'] == process_id]['status'] + df_selected = df[df["jobid"] == process_id]["status"] if len(df_selected) != 0: return df_selected.values[0] else: return None - def check_queue_parameters(self, queue, cores=1, run_time_max=None, memory_max=None, active_queue=None): + def get_status_of_jobs(self, process_id_lst): + """ + + Args: + process_id_lst: + + Returns: + list: ['running', 'pending', 'error', ...] 
+ """ + df = self.get_queue_status() + results_lst = [] + for process_id in process_id_lst: + df_selected = df[df["jobid"] == process_id]["status"] + if len(df_selected) != 0: + results_lst.append(df_selected.values[0]) + else: + results_lst.append("finished") + return results_lst + + def check_queue_parameters( + self, queue, cores=1, run_time_max=None, memory_max=None, active_queue=None + ): """ Args: @@ -175,18 +269,30 @@ def check_queue_parameters(self, queue, cores=1, run_time_max=None, memory_max=N list: [cores, run_time_max, memory_max] """ if active_queue is None: - active_queue = self._config['queues'][queue] - cores = self._value_in_range(value=cores, - value_min=active_queue['cores_min'], - value_max=active_queue['cores_max']) - run_time_max = self._value_in_range(value=run_time_max, - value_max=active_queue['run_time_max']) - memory_max = self._value_in_range(value=memory_max, - value_max=active_queue['memory_max']) + active_queue = self._config["queues"][queue] + cores = self._value_in_range( + value=cores, + value_min=active_queue["cores_min"], + value_max=active_queue["cores_max"], + ) + run_time_max = self._value_in_range( + value=run_time_max, value_max=active_queue["run_time_max"] + ) + memory_max = self._value_in_range( + value=memory_max, value_max=active_queue["memory_max"] + ) return cores, run_time_max, memory_max - def _job_submission_template(self, queue=None, job_name='job.py', working_directory='.', cores=None, - memory_max=None, run_time_max=None, command=None): + def _job_submission_template( + self, + queue=None, + job_name="job.py", + working_directory=".", + cores=None, + memory_max=None, + run_time_max=None, + command=None, + ): """ Args: @@ -202,23 +308,27 @@ def _job_submission_template(self, queue=None, job_name='job.py', working_direct str: """ if queue is None: - queue = self._config['queue_primary'] + queue = self._config["queue_primary"] self._value_error_if_none(value=command) if queue not in self.queue_list: raise ValueError() - active_queue = self._config['queues'][queue] - cores, run_time_max, memory_max = self.check_queue_parameters(queue=None, - cores=cores, - run_time_max=run_time_max, - memory_max=memory_max, - active_queue=active_queue) - template = active_queue['template'] - return template.render(job_name=job_name, - working_directory=working_directory, - cores=cores, - memory_max=memory_max, - run_time_max=run_time_max, - command=command) + active_queue = self._config["queues"][queue] + cores, run_time_max, memory_max = self.check_queue_parameters( + queue=None, + cores=cores, + run_time_max=run_time_max, + memory_max=memory_max, + active_queue=active_queue, + ) + template = active_queue["template"] + return template.render( + job_name=job_name, + working_directory=working_directory, + cores=cores, + memory_max=memory_max, + run_time_max=run_time_max, + command=command, + ) @staticmethod def _get_user(): @@ -243,22 +353,28 @@ def _execute_command(commands_lst, working_directory=None, split_output=True): """ if working_directory is None: try: - out = subprocess.check_output(commands_lst, stderr=subprocess.STDOUT, universal_newlines=True) + out = subprocess.check_output( + commands_lst, stderr=subprocess.STDOUT, universal_newlines=True + ) except subprocess.CalledProcessError: out = None else: try: - out = subprocess.check_output(commands_lst, cwd=working_directory, stderr=subprocess.STDOUT, - universal_newlines=True) + out = subprocess.check_output( + commands_lst, + cwd=working_directory, + stderr=subprocess.STDOUT, + 
universal_newlines=True, + ) except subprocess.CalledProcessError: out = None if out is not None and split_output: - return out.split('\n') + return out.split("\n") else: return out @staticmethod - def _read_config(file_name='queue.yaml'): + def _read_config(file_name="queue.yaml"): """ Args: @@ -267,8 +383,8 @@ def _read_config(file_name='queue.yaml'): Returns: dict: """ - with open(file_name, 'r') as f: - return load(f) + with open(file_name, "r") as f: + return yaml.load(f, Loader=yaml.FullLoader) @staticmethod def _fill_queue_dict(queue_lst_dict): @@ -277,13 +393,13 @@ def _fill_queue_dict(queue_lst_dict): Args: queue_lst_dict (dict): """ - queue_keys = ['cores_min', 'cores_max', 'run_time_max', 'memory_max'] + queue_keys = ["cores_min", "cores_max", "run_time_max", "memory_max"] for queue_dict in queue_lst_dict.values(): for key in set(queue_keys) - set(queue_dict.keys()): queue_dict[key] = None @staticmethod - def _load_templates(queue_lst_dict, directory='.'): + def _load_templates(queue_lst_dict, directory="."): """ Args: @@ -291,8 +407,8 @@ def _load_templates(queue_lst_dict, directory='.'): directory (str): """ for queue_dict in queue_lst_dict.values(): - with open(os.path.join(directory, queue_dict['script']), 'r') as f: - queue_dict['template'] = Template(f.read()) + with open(os.path.join(directory, queue_dict["script"]), "r") as f: + queue_dict["template"] = Template(f.read()) @staticmethod def _value_error_if_none(value): @@ -306,8 +422,8 @@ def _value_error_if_none(value): if not isinstance(value, str): raise TypeError() - @staticmethod - def _value_in_range(value, value_min=None, value_max=None): + @classmethod + def _value_in_range(cls, value, value_min=None, value_max=None): """ Args: @@ -318,10 +434,21 @@ def _value_in_range(value, value_min=None, value_max=None): Returns: int/float/None: """ + if value is not None: - if value_min is not None and value < value_min: + value_, value_min_, value_max_ = [ + cls._memory_spec_string_to_value(v) + if v is not None and isinstance(v, str) + else v + for v in (value, value_min, value_max) + ] + # ATTENTION: '60000' is interpreted as '60000M' since default magnitude is 'M' + # ATTENTION: int('60000') is interpreted as '60000B' since _memory_spec_string_to_value return the size in + # ATTENTION: bytes, as target_magnitude = 'b' + # We want to compare the the actual (k,m,g)byte value if there is any + if value_min_ is not None and value_ < value_min_: return value_min - if value_max is not None and value > value_max: + if value_max_ is not None and value_ > value_max_: return value_max return value else: @@ -330,3 +457,72 @@ def _value_in_range(value, value_min=None, value_max=None): if value_max is not None: return value_max return value + + @staticmethod + def _is_memory_string(value): + """ + Tests a string if it specifies a certain amount of memory e.g.: '20G', '60b'. Also pure integer strings are + also valid. + + Args: + value (str): the string to test + + Returns: + (bool): A boolean value if the string matches a memory specification + """ + memory_spec_pattern = r"[0-9]+[bBkKmMgGtT]?" + return re.findall(memory_spec_pattern, value)[0] == value + + @classmethod + def _memory_spec_string_to_value( + cls, value, default_magnitude="m", target_magnitude="b" + ): + """ + Converts a valid memory string (tested by _is_memory_string) into an integer/float value of desired + magnitude `default_magnitude`. If it is a plain integer string (e.g.: '50000') it will be interpreted with + the magnitude passed in by the `default_magnitude`. 
The output will be rescaled to `target_magnitude`. + + Args: + value (str): the string + default_magnitude (str): magnitude for interpreting plain integer strings [b, B, k, K, m, M, g, G, t, T] + target_magnitude (str): to which the output value should be converted [b, B, k, K, m, M, g, G, t, T] + + Returns: + (float/int): the value of the string in `target_magnitude` units + """ + magnitude_mapping = {"b": 0, "k": 1, "m": 2, "g": 3, "t": 4} + if cls._is_memory_string(value): + integer_pattern = r"[0-9]+" + magnitude_pattern = r"[bBkKmMgGtT]+" + integer_value = int(re.findall(integer_pattern, value)[0]) + + magnitude = re.findall(magnitude_pattern, value) + if len(magnitude) > 0: + magnitude = magnitude[0].lower() + else: + magnitude = default_magnitude.lower() + # Convert the value to bytes, then rescale it to the target magnitude + return (integer_value * 1024 ** magnitude_mapping[magnitude]) / ( + 1024 ** magnitude_mapping[target_magnitude] + ) + else: + return value + + +class Queues(object): + """ + Queues is a helper class that simply makes the list of queues available for auto completion. This is mainly used in + interactive environments like Jupyter. + """ + + def __init__(self, list_of_queues): + self._list_of_queues = list_of_queues + + def __getattr__(self, item): + if item in self._list_of_queues: + return item + else: + raise AttributeError + + def __dir__(self): + return self._list_of_queues diff --git a/pysqa/wrapper/lsf.py b/pysqa/wrapper/lsf.py index 2fc366bc..84cb4c61 100644 --- a/pysqa/wrapper/lsf.py +++ b/pysqa/wrapper/lsf.py @@ -3,8 +3,10 @@ # Distributed under the terms of "New BSD License", see the LICENSE file. __author__ = "Jan Janssen" -__copyright__ = "Copyright 2019, Max-Planck-Institut für Eisenforschung GmbH - " \ - "Computational Materials Design (CM) Department" +__copyright__ = ( + "Copyright 2019, Max-Planck-Institut für Eisenforschung GmbH - " + "Computational Materials Design (CM) Department" +) __version__ = "1.0" __maintainer__ = "Jan Janssen" __email__ = "janssen@mpie.de" @@ -15,11 +17,11 @@ class LsfCommands(object): @property def submit_job_command(self): - return ['bsub', '-terse'] + return ["bsub", "-terse"] @property def delete_job_command(self): - return ['bkill'] + return ["bkill"] @property def enable_reservation_command(self): @@ -27,7 +29,11 @@ def enable_reservation_command(self): @property def get_queue_status_command(self): - return ['qstat', '-x'] + return ["qstat", "-x"] + + @staticmethod + def get_job_id_from_output(queue_submit_output): + raise NotImplementedError() @staticmethod def convert_queue_status(queue_status_output): diff --git a/pysqa/wrapper/moab.py b/pysqa/wrapper/moab.py index d11c30b1..4d63d04b 100644 --- a/pysqa/wrapper/moab.py +++ b/pysqa/wrapper/moab.py @@ -3,8 +3,10 @@ # Distributed under the terms of "New BSD License", see the LICENSE file.
__author__ = "Jan Janssen" -__copyright__ = "Copyright 2019, Max-Planck-Institut für Eisenforschung GmbH - " \ - "Computational Materials Design (CM) Department" +__copyright__ = ( + "Copyright 2019, Max-Planck-Institut für Eisenforschung GmbH - " + "Computational Materials Design (CM) Department" +) __version__ = "1.0" __maintainer__ = "Jan Janssen" __email__ = "janssen@mpie.de" @@ -15,11 +17,11 @@ class MoabCommands(object): @property def submit_job_command(self): - return ['msub'] + return ["msub"] @property def delete_job_command(self): - return ['mjobctl', '-c'] + return ["mjobctl", "-c"] @property def enable_reservation_command(self): @@ -27,7 +29,11 @@ def enable_reservation_command(self): @property def get_queue_status_command(self): - return ['mdiag', '-x'] + return ["mdiag", "-x"] + + @staticmethod + def get_job_id_from_output(queue_submit_output): + raise NotImplementedError() @staticmethod def convert_queue_status(queue_status_output): diff --git a/pysqa/wrapper/sge.py b/pysqa/wrapper/sge.py index ea94dbee..eee05375 100644 --- a/pysqa/wrapper/sge.py +++ b/pysqa/wrapper/sge.py @@ -6,8 +6,10 @@ import defusedxml.cElementTree as ETree __author__ = "Jan Janssen" -__copyright__ = "Copyright 2019, Max-Planck-Institut für Eisenforschung GmbH - " \ - "Computational Materials Design (CM) Department" +__copyright__ = ( + "Copyright 2019, Max-Planck-Institut für Eisenforschung GmbH - " + "Computational Materials Design (CM) Department" +) __version__ = "1.0" __maintainer__ = "Jan Janssen" __email__ = "janssen@mpie.de" @@ -18,33 +20,43 @@ class SunGridEngineCommands(object): @property def submit_job_command(self): - return ['qsub', '-terse'] + return ["qsub", "-terse"] @property def delete_job_command(self): - return ['qdel'] + return ["qdel"] @property def enable_reservation_command(self): - return ['qalter', '-R', 'y'] + return ["qalter", "-R", "y"] @property def get_queue_status_command(self): - return ['qstat', '-xml'] + return ["qstat", "-xml"] + + @staticmethod + def get_job_id_from_output(queue_submit_output): + return int(queue_submit_output) @staticmethod def convert_queue_status(queue_status_output): def leaf_to_dict(leaf): - return [{sub_child.tag: sub_child.text for sub_child in child} for child in leaf] + return [ + {sub_child.tag: sub_child.text for sub_child in child} for child in leaf + ] tree = ETree.fromstring(queue_status_output) df_running_jobs = pandas.DataFrame(leaf_to_dict(leaf=tree[0])) df_pending_jobs = pandas.DataFrame(leaf_to_dict(leaf=tree[1])) df_merge = df_running_jobs.append(df_pending_jobs, sort=True) - df_merge.state[df_merge.state == 'r'] = 'running' - df_merge.state[df_merge.state == 'qw'] = 'pending' - df_merge.state[df_merge.state == 'Eqw'] = 'error' - return pandas.DataFrame({'jobid': pandas.to_numeric(df_merge.JB_job_number), - 'user': df_merge.JB_owner, - 'jobname': df_merge.JB_name, - 'status': df_merge.state}) + df_merge.state[df_merge.state == "r"] = "running" + df_merge.state[df_merge.state == "qw"] = "pending" + df_merge.state[df_merge.state == "Eqw"] = "error" + return pandas.DataFrame( + { + "jobid": pandas.to_numeric(df_merge.JB_job_number), + "user": df_merge.JB_owner, + "jobname": df_merge.JB_name, + "status": df_merge.state, + } + ) diff --git a/pysqa/wrapper/slurm.py b/pysqa/wrapper/slurm.py index edafb7b0..9f555875 100644 --- a/pysqa/wrapper/slurm.py +++ b/pysqa/wrapper/slurm.py @@ -2,9 +2,14 @@ # Copyright (c) Max-Planck-Institut für Eisenforschung GmbH - Computational Materials Design (CM) Department # Distributed under the terms of 
"New BSD License", see the LICENSE file. +import pandas + + __author__ = "Jan Janssen" -__copyright__ = "Copyright 2019, Max-Planck-Institut für Eisenforschung GmbH - " \ - "Computational Materials Design (CM) Department" +__copyright__ = ( + "Copyright 2019, Max-Planck-Institut für Eisenforschung GmbH - " + "Computational Materials Design (CM) Department" +) __version__ = "1.0" __maintainer__ = "Jan Janssen" __email__ = "janssen@mpie.de" @@ -15,11 +20,11 @@ class SlurmCommands(object): @property def submit_job_command(self): - return ['sbatch', '--parsable'] + return ["sbatch", "--parsable"] @property def delete_job_command(self): - return ['scancel'] + return ["scancel"] @property def enable_reservation_command(self): @@ -27,8 +32,26 @@ def enable_reservation_command(self): @property def get_queue_status_command(self): - return ['squeue'] + return ["squeue", "--format", "%A|%u|%t|%j", "--noheader"] + + @staticmethod + def get_job_id_from_output(queue_submit_output): + return int(queue_submit_output.splitlines()[-1].rstrip().lstrip()) @staticmethod def convert_queue_status(queue_status_output): - raise NotImplementedError() + line_split_lst = [line.split("|") for line in queue_status_output.splitlines()] + job_id_lst, user_lst, status_lst, job_name_lst = zip( + *[ + (int(jobid), user, status.lower(), jobname) + for jobid, user, status, jobname in line_split_lst + ] + ) + return pandas.DataFrame( + { + "jobid": job_id_lst, + "user": user_lst, + "jobname": job_name_lst, + "status": status_lst, + } + ) diff --git a/pysqa/wrapper/torque.py b/pysqa/wrapper/torque.py index c0824a0d..aefe4b76 100644 --- a/pysqa/wrapper/torque.py +++ b/pysqa/wrapper/torque.py @@ -3,8 +3,10 @@ # Distributed under the terms of "New BSD License", see the LICENSE file. __author__ = "Jan Janssen" -__copyright__ = "Copyright 2019, Max-Planck-Institut für Eisenforschung GmbH - " \ - "Computational Materials Design (CM) Department" +__copyright__ = ( + "Copyright 2019, Max-Planck-Institut für Eisenforschung GmbH - " + "Computational Materials Design (CM) Department" +) __version__ = "1.0" __maintainer__ = "Jan Janssen" __email__ = "janssen@mpie.de" @@ -15,11 +17,11 @@ class TorqueCommands(object): @property def submit_job_command(self): - return ['qsub', '-terse'] + return ["qsub", "-terse"] @property def delete_job_command(self): - return ['qdel'] + return ["qdel"] @property def enable_reservation_command(self): @@ -27,7 +29,11 @@ def enable_reservation_command(self): @property def get_queue_status_command(self): - return ['qstat', '-x'] + return ["qstat", "-x"] + + @staticmethod + def get_job_id_from_output(queue_submit_output): + raise NotImplementedError() @staticmethod def convert_queue_status(queue_status_output): diff --git a/setup.py b/setup.py index 22add372..a1817b14 100644 --- a/setup.py +++ b/setup.py @@ -8,9 +8,9 @@ name='pysqa', version=versioneer.get_version(), description='pysqa - simple queue adapter', - long_description='https://github.com/pysqa/pysqa', + long_description='The goal of pysqa is to make submitting to an HPC cluster as easy as starting another subprocess. This is based on the assumption that even though modern queuing systems allow for an wide range of different configuration, most users submit the majority of their jobs with very similar parameters. Therefore pysqa allows the users to store their submission scripts as jinja2 templates for quick access. 
After the submission pysqa allows the users to track the progress of their jobs, delete them or enable reservations using the built-in functionality of the queuing system. The currently supported queuing systems are: LFS, MOAB, SGE, SLURM, TORQUE.', - url='https://github.com/pysqa/pysqa', + url='https://github.com/pyiron/pysqa', author='Jan Janssen', author_email='janssen@mpie.de', license='BSD', @@ -31,7 +31,8 @@ packages=find_packages(exclude=["*tests*"]), install_requires=['defusedxml', 'jinja2', - 'pandas', + 'pandas>=0.23', 'pyyaml'], + data_files = [("", ["LICENSE"])], cmdclass=versioneer.get_cmdclass(), ) diff --git a/tests/config/error/lsf.sh b/tests/config/error/lsf.sh new file mode 100644 index 00000000..1ed25832 --- /dev/null +++ b/tests/config/error/lsf.sh @@ -0,0 +1,15 @@ +#!/bin/bash +#BSUB -q queue +#BSUB -J {{job_name}} +#BSUB -o time.out +#BSUB -n {{cores}} +#BSUB -cwd {{working_directory}} +#BSUB -e error.out +{%- if run_time_max %} +#BSUB -W {{run_time_max}} +{%- endif %} +{%- if memory_max %} +#BSUB -M {{memory_max}} +{%- endif %} + +{{command}} \ No newline at end of file diff --git a/tests/config/error/queue.yaml b/tests/config/error/queue.yaml new file mode 100644 index 00000000..ebfeb3b5 --- /dev/null +++ b/tests/config/error/queue.yaml @@ -0,0 +1,5 @@ +queue_type: Error +queue_primary: error +queues: + lsf: {cores_max: 100, cores_min: 10, run_time_max: 259200, script: lsf.sh} + diff --git a/tests/test_queueadapter.py b/tests/test_queueadapter.py index 32554399..4a1a1b7c 100644 --- a/tests/test_queueadapter.py +++ b/tests/test_queueadapter.py @@ -4,7 +4,8 @@ import os import pandas import unittest -from pysqa.queueadapter import QueueAdapter +import getpass +from pysqa import QueueAdapter __author__ = "Jan Janssen" __copyright__ = "Copyright 2019, Jan Janssen" @@ -19,69 +20,192 @@ class TestRunmode(unittest.TestCase): @classmethod def setUpClass(cls): cls.path = os.path.dirname(os.path.abspath(__file__)) - cls.torque = QueueAdapter(directory=os.path.join(cls.path, 'config/torque')) - cls.slurm = QueueAdapter(directory=os.path.join(cls.path, 'config/slurm')) - cls.lsf = QueueAdapter(directory=os.path.join(cls.path, 'config/lsf')) - cls.sge = QueueAdapter(directory=os.path.join(cls.path, 'config/sge')) - cls.moab = QueueAdapter(directory=os.path.join(cls.path, 'config/moab')) + cls.torque = QueueAdapter(directory=os.path.join(cls.path, "config/torque")) + cls.slurm = QueueAdapter(directory=os.path.join(cls.path, "config/slurm")) + cls.lsf = QueueAdapter(directory=os.path.join(cls.path, "config/lsf")) + cls.sge = QueueAdapter(directory=os.path.join(cls.path, "config/sge")) + cls.moab = QueueAdapter(directory=os.path.join(cls.path, "config/moab")) + + def test_missing_config(self): + self.assertRaises( + ValueError, QueueAdapter, directory=os.path.join(self.path, "config/error") + ) def test_config(self): - self.assertEqual(self.torque.config['queue_type'], 'TORQUE') - self.assertEqual(self.slurm.config['queue_type'], 'SLURM') - self.assertEqual(self.lsf.config['queue_type'], 'LSF') - self.assertEqual(self.sge.config['queue_type'], 'SGE') - self.assertEqual(self.moab.config['queue_type'], 'MOAB') - self.assertEqual(self.torque.config['queue_primary'], 'torque') - self.assertEqual(self.slurm.config['queue_primary'], 'slurm') - self.assertEqual(self.lsf.config['queue_primary'], 'lsf') - self.assertEqual(self.sge.config['queue_primary'], 'impi_hydra_small') - self.assertEqual(self.moab.config['queue_primary'], 'moab') + 
self.assertEqual(self.torque.config["queue_type"], "TORQUE") + self.assertEqual(self.slurm.config["queue_type"], "SLURM") + self.assertEqual(self.lsf.config["queue_type"], "LSF") + self.assertEqual(self.sge.config["queue_type"], "SGE") + self.assertEqual(self.moab.config["queue_type"], "MOAB") + self.assertEqual(self.torque.config["queue_primary"], "torque") + self.assertEqual(self.slurm.config["queue_primary"], "slurm") + self.assertEqual(self.lsf.config["queue_primary"], "lsf") + self.assertEqual(self.sge.config["queue_primary"], "impi_hydra_small") + self.assertEqual(self.moab.config["queue_primary"], "moab") def test_value_in_range(self): - self.assertEqual(None, self.sge._value_in_range(value=None, value_min=None, value_max=None)) - self.assertEqual(1, self.sge._value_in_range(value=None, value_min=1, value_max=None)) - self.assertEqual(1, self.sge._value_in_range(value=None, value_min=None, value_max=1)) - self.assertEqual(1, self.sge._value_in_range(value=1, value_min=None, value_max=None)) - self.assertEqual(1, self.sge._value_in_range(value=0, value_min=1, value_max=None)) - self.assertEqual(1, self.sge._value_in_range(value=2, value_min=None, value_max=1)) + self.assertEqual( + None, self.sge._value_in_range(value=None, value_min=None, value_max=None) + ) + self.assertEqual( + 1, self.sge._value_in_range(value=None, value_min=1, value_max=None) + ) + self.assertEqual( + 1, self.sge._value_in_range(value=None, value_min=None, value_max=1) + ) + self.assertEqual( + 1, self.sge._value_in_range(value=1, value_min=None, value_max=None) + ) + self.assertEqual( + 1, self.sge._value_in_range(value=0, value_min=1, value_max=None) + ) + self.assertEqual( + 1, self.sge._value_in_range(value=2, value_min=None, value_max=1) + ) self.assertEqual(1, self.sge._value_in_range(value=1, value_min=0, value_max=2)) def test_job_submission_template(self): self.assertRaises(ValueError, self.sge._job_submission_template, command=None) self.assertRaises(TypeError, self.sge._job_submission_template, command=1) - template = "#!/bin/bash\n#$ -N job.py\n#$ -wd .\n#$ -pe impi_hydra_small 1\n#$ -l h_rt=604800\n" \ - "#$ -o time.out\n#$ -e error.out\n\npython test.py" - self.assertEqual(self.sge._job_submission_template(command='python test.py'), template) - template = "#!/bin/bash\n#BSUB -q queue\n#BSUB -J job.py\n#BSUB -o time.out\n#BSUB -n 10\n#BSUB -cwd .\n" \ - "#BSUB -e error.out\n#BSUB -W 259200\n\npython test.py" - self.assertEqual(self.lsf._job_submission_template(command='python test.py'), template) + template = ( + "#!/bin/bash\n#$ -N job.py\n#$ -wd .\n#$ -pe impi_hydra_small 1\n#$ -l h_rt=604800\n" + "#$ -o time.out\n#$ -e error.out\n\npython test.py" + ) + self.assertEqual( + self.sge._job_submission_template(command="python test.py"), template + ) + template = ( + "#!/bin/bash\n#BSUB -q queue\n#BSUB -J job.py\n#BSUB -o time.out\n#BSUB -n 10\n#BSUB -cwd .\n" + "#BSUB -e error.out\n#BSUB -W 259200\n\npython test.py" + ) + self.assertEqual( + self.lsf._job_submission_template(command="python test.py"), template + ) + self.assertRaises( + ValueError, + self.sge._job_submission_template, + command="python test.py", + queue="notavailable", + ) def test_interfaces(self): - self.assertEqual(self.sge._commands.submit_job_command, ['qsub', '-terse']) - self.assertEqual(self.sge._commands.delete_job_command, ['qdel']) - self.assertEqual(self.sge._commands.enable_reservation_command, ['qalter', '-R', 'y']) - self.assertEqual(self.sge._commands.get_queue_status_command, ['qstat', '-xml']) - 
self.assertEqual(self.torque._commands.submit_job_command, ['qsub', '-terse']) - self.assertEqual(self.torque._commands.delete_job_command, ['qdel']) - self.assertEqual(self.torque._commands.get_queue_status_command, ['qstat', '-x']) - self.assertEqual(self.lsf._commands.submit_job_command, ['bsub', '-terse']) - self.assertEqual(self.lsf._commands.delete_job_command, ['bkill']) - self.assertEqual(self.lsf._commands.get_queue_status_command, ['qstat', '-x']) - self.assertEqual(self.slurm._commands.submit_job_command, ['sbatch', '--parsable']) - self.assertEqual(self.slurm._commands.delete_job_command, ['scancel']) - self.assertEqual(self.slurm._commands.get_queue_status_command, ['squeue']) - self.assertEqual(self.moab._commands.submit_job_command, ['msub']) - self.assertEqual(self.moab._commands.delete_job_command, ['mjobctl', '-c']) - self.assertEqual(self.moab._commands.get_queue_status_command, ['mdiag', '-x']) + self.assertEqual(self.sge._commands.submit_job_command, ["qsub", "-terse"]) + self.assertEqual(self.sge._commands.delete_job_command, ["qdel"]) + self.assertEqual( + self.sge._commands.enable_reservation_command, ["qalter", "-R", "y"] + ) + self.assertEqual(self.sge._commands.get_queue_status_command, ["qstat", "-xml"]) + self.assertEqual(self.torque._commands.submit_job_command, ["qsub", "-terse"]) + self.assertEqual(self.torque._commands.delete_job_command, ["qdel"]) + self.assertEqual( + self.torque._commands.get_queue_status_command, ["qstat", "-x"] + ) + self.assertEqual(self.lsf._commands.submit_job_command, ["bsub", "-terse"]) + self.assertEqual(self.lsf._commands.delete_job_command, ["bkill"]) + self.assertEqual(self.lsf._commands.get_queue_status_command, ["qstat", "-x"]) + self.assertEqual( + self.slurm._commands.submit_job_command, ["sbatch", "--parsable"] + ) + self.assertEqual(self.slurm._commands.delete_job_command, ["scancel"]) + self.assertEqual( + self.slurm._commands.get_queue_status_command, + ["squeue", "--format", "%A|%u|%t|%j", "--noheader"], + ) + self.assertEqual(self.moab._commands.submit_job_command, ["msub"]) + self.assertEqual(self.moab._commands.delete_job_command, ["mjobctl", "-c"]) + self.assertEqual(self.moab._commands.get_queue_status_command, ["mdiag", "-x"]) def test_convert_queue_status(self): - with open(os.path.join(self.path, 'config/sge', 'qstat.xml'), 'r') as f: + with open(os.path.join(self.path, "config/sge", "qstat.xml"), "r") as f: content = f.read() - df_running = pandas.DataFrame({'jobid': ['2836045'], 'user': ['friko'], 'jobname': ['vasp.5.3.5'], - 'status': ['running']}) - df_pending = pandas.DataFrame({'jobid': ['2836046', '2967274'], 'user': ['friko', 'janj'], - 'jobname': ['vasp.5.3.5', 'hello.py'], 'status': ['pending', 'error']}) + df_running = pandas.DataFrame( + { + "jobid": ["2836045"], + "user": ["friko"], + "jobname": ["vasp.5.3.5"], + "status": ["running"], + } + ) + df_pending = pandas.DataFrame( + { + "jobid": ["2836046", "2967274"], + "user": ["friko", "janj"], + "jobname": ["vasp.5.3.5", "hello.py"], + "status": ["pending", "error"], + } + ) df_merge = df_running.append(df_pending, sort=True) - df = pandas.DataFrame({'jobid': pandas.to_numeric(df_merge.jobid), 'user': df_merge.user, - 'jobname': df_merge.jobname, 'status': df_merge.status}) - self.assertTrue(df.equals(self.sge._commands.convert_queue_status(queue_status_output=content))) + df = pandas.DataFrame( + { + "jobid": pandas.to_numeric(df_merge.jobid), + "user": df_merge.user, + "jobname": df_merge.jobname, + "status": df_merge.status, + } + ) + 
self.assertTrue( + df.equals( + self.sge._commands.convert_queue_status(queue_status_output=content) + ) + ) + + def test_queue_list(self): + self.assertEqual( + sorted(self.sge.queue_list), + ["impi_hy", "impi_hydra", "impi_hydra_cmfe", "impi_hydra_small"], + ) + + def test_queues(self): + self.assertEqual(self.sge.queues.impi_hydra, "impi_hydra") + self.assertEqual( + sorted(dir(self.sge.queues)), + ["impi_hy", "impi_hydra", "impi_hydra_cmfe", "impi_hydra_small"], + ) + with self.assertRaises(AttributeError): + _ = self.sge.queues.notavailable + + def test_get_user(self): + self.assertEqual(self.sge._get_user(), getpass.getuser()) + + def test_check_queue_parameters(self): + self.assertEqual( + (1, 604800, None), self.sge.check_queue_parameters(queue="impi_hydra_small") + ) + + def test_queue_view(self): + self.assertIsInstance(self.slurm.queue_view, pandas.DataFrame) + + def test_memory_string_comparison(self): + self.assertEqual(QueueAdapter._value_in_range(1023, value_min="1K"), "1K") + self.assertEqual(QueueAdapter._value_in_range(1035, value_min="1K"), 1035) + self.assertEqual(QueueAdapter._value_in_range(1035, value_max="1K"), "1K") + self.assertEqual(QueueAdapter._value_in_range("1035", value_min="1K"), "1035") + self.assertEqual( + QueueAdapter._value_in_range("60000M", value_min="1K", value_max="50G"), + "50G", + ) + self.assertEqual( + QueueAdapter._value_in_range("60000", value_min="1K", value_max="50G"), + "50G", + ) + self.assertEqual( + QueueAdapter._value_in_range("60000M", value_min="1K", value_max="70G"), + "60000M", + ) + self.assertEqual( + QueueAdapter._value_in_range(60000, value_min="1K", value_max="70G"), 60000 + ) + self.assertEqual( + QueueAdapter._value_in_range( + 90000 * 1024 ** 2, value_min="1K", value_max="70G" + ), + "70G", + ) + self.assertEqual( + QueueAdapter._value_in_range("90000", value_min="1K", value_max="70G"), + "70G", + ) + self.assertEqual( + QueueAdapter._value_in_range("60000M", value_min="60G", value_max="70G"), + "60G", + )
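As a closing illustration of the new SLURM wrapper methods and of the memory-string handling exercised by the tests above, here is a short sketch. The `squeue` output lines and the job names are invented for illustration, and the two underscored helpers are internal methods, shown only to spell out the unit handling:

    from pysqa import QueueAdapter
    from pysqa.wrapper.slurm import SlurmCommands

    slurm = SlurmCommands()

    # sbatch --parsable prints the job id on the last line of its output
    print(slurm.get_job_id_from_output("1234\n"))  # 1234

    # invented output of: squeue --format "%A|%u|%t|%j" --noheader
    out = "1234|janj|R|vasp.5.4.4\n1235|janj|PD|hello.py"
    df = slurm.convert_queue_status(queue_status_output=out)
    print(df[["jobid", "user", "jobname", "status"]])
    # jobid 1234/1235, user janj, status 'r'/'pd' (lower-cased squeue codes)

    # memory strings are normalised to bytes before comparison; plain numbers default to megabytes
    print(QueueAdapter._memory_spec_string_to_value("2G"))  # 2147483648.0 (bytes)
    print(QueueAdapter._value_in_range("60000M", value_min="1K", value_max="50G"))  # '50G'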