diff --git a/README.md b/README.md index 59d0a250..4f102b87 100644 --- a/README.md +++ b/README.md @@ -1,12 +1,12 @@ # pysqa -Simple queue adapter for python +Simple queue adapter for python [![Codacy Badge](https://api.codacy.com/project/badge/Grade/9db80cb2477f46be870d1446540b4bf3)](https://www.codacy.com/app/pyiron-runner/pysqa?utm_source=github.com&utm_medium=referral&utm_content=pyiron/pysqa&utm_campaign=Badge_Grade_Dashboard) [![Build Status](https://travis-ci.org/pyiron/pysqa.svg?branch=master)](https://travis-ci.org/pyiron/pysqa) [![Build status](https://ci.appveyor.com/api/projects/status/9lpjai8rvt8324aj/branch/master?svg=true)](https://ci.appveyor.com/project/pyiron-runner/pysqa/branch/master) [![Coverage Status](https://coveralls.io/repos/github/pyiron/pysqa/badge.svg?branch=master)](https://coveralls.io/github/pyiron/pysqa?branch=master) -The goal of pysqa is to make submitting to an HPC cluster as easy as starting another subprocess. This is based on the assumption that even though modern queuing systems allow for an wide range of different configuration, most users submit the majority of their jobs with very similar parameters. Therefore pysqa allows the users to store their submission scripts as jinja2 templates for quick access. After the submission pysqa allows the users to track the progress of their jobs, delete them or enable reservations using the built-in functionality of the queuing system. The currently supported queuing systems are: LFS, MOAB, SGE (tested), SLURM (tested), TORQUE. +The goal of pysqa is to make submitting to an HPC cluster as easy as starting another subprocess. This is based on the assumption that even though modern queuing systems allow for an wide range of different configuration, most users submit the majority of their jobs with very similar parameters. Therefore pysqa allows the users to store their submission scripts as jinja2 templates for quick access. 
After the submission pysqa allows the users to track the progress of their jobs, delete them or enable reservations using the built-in functionality of the queuing system. The currently supported queuing systems are: LSF, MOAB, SGE (tested), SLURM (tested), TORQUE. # Installation pysqa can either be installed via pip using: @@ -18,23 +18,23 @@ Or via anaconda from the conda-forge channel conda install -c conda-forge pysqa -# Usage +# Usage pysqa requires the user to configure the type of queuing system as well as the available templates. Example configuration are available at: https://github.com/pyiron/pysqa/tree/master/tests/config By default pysqa is searching for the queue configuration in `~/.queues/queue.yaml` and the corresponding jinja2 templates in the same folder. Import pysqa: - from pysqa import QueueAdapter - sqa = QueueAdapter(directory=‘~/.queues’) # directory which contains the queue.yaml file + from pysqa import QueueAdapter + sqa = QueueAdapter(directory='~/.queues') # directory which contains the queue.yaml file -List available queues as list of queue names: +List available queues as list of queue names: - sqa.queue_list + sqa.queue_list -List available queues in an pandas dataframe: +List available queues in a pandas dataframe: - sqa.queue_view + sqa.queue_view Submit a job to the queue - if no queue is specified it is submitted to the default queue defined in the queue configuration: @@ -50,9 +50,9 @@ Get status of a specifc job from the queuing system: Delete a job from the queuing sytem: - sqa.delete_job(process_id=1234) + sqa.delete_job(process_id=1234) -Sample configurations for the specific queuing systems are availabe in the tests: +Sample configurations for the specific queuing systems are available in the tests: * lsf - https://github.com/pyiron/pysqa/tree/master/tests/config/lsf * moab - https://github.com/pyiron/pysqa/tree/master/tests/config/moab @@ -61,7 +61,7 @@ Sample 
configurations for the specific queuing systems are availabe in the tests * torque - https://github.com/pyiron/pysqa/tree/master/tests/config/torque # License -pysqa is released under the BSD license https://github.com/pyiron/pysqa/blob/master/LICENSE . It is a spin-off of the pyiron project https://github.com/pyiron/pyiron therefore if you use pysqa for your publication, please cite: +pysqa is released under the BSD license https://github.com/pyiron/pysqa/blob/master/LICENSE . It is a spin-off of the pyiron project https://github.com/pyiron/pyiron therefore if you use pysqa for your publication, please cite: @article{pyiron-paper, title = {pyiron: An integrated development environment for computational materials science}, diff --git a/pysqa/queueadapter.py b/pysqa/queueadapter.py index ed3d55a8..0605748d 100644 --- a/pysqa/queueadapter.py +++ b/pysqa/queueadapter.py @@ -119,7 +119,7 @@ def submit_job(self, queue=None, job_name=None, working_directory=None, cores=No """ if isinstance(command, list): command = ''.join(command) - if working_directory is None: + if working_directory is None: working_directory = '.' 
queue_script = self._job_submission_template(queue=queue, job_name=job_name, working_directory=working_directory, cores=cores, @@ -224,7 +224,7 @@ def get_status_of_jobs(self, process_id_lst): else: results_lst.append('finished') return results_lst - + def check_queue_parameters(self, queue, cores=1, run_time_max=None, memory_max=None, active_queue=None): """ @@ -387,9 +387,8 @@ def _value_in_range(cls, value, value_min=None, value_max=None): value_, value_min_, value_max_ = [cls._memory_spec_string_to_value(v) if v is not None and isinstance(v, str) else v for v in (value, value_min, value_max)] - # ATTENTION: '60000' is interpreted as '60000M' since default magnitude is 'M' - # ATTENTION: int('60000') is interpreted as '60000B' since _memory_spec_string_to_value return the size in - # ATTENTION: bytes, as target_magnitude = 'b' + # Numerical strings are interpreted as MB (default magnitude), while integers are interpreted as bytes + # String with magnitude prefixes are interpreted by _memory_spec_string_to_value # We want to compare the the actual (k,m,g)byte value if there is any if value_min_ is not None and value_ < value_min_: return value_min @@ -407,7 +406,7 @@ def _value_in_range(cls, value, value_min=None, value_max=None): def _is_memory_string(value): """ Tests a string if it specifies a certain amount of memory e.g.: '20G', '60b'. Also pure integer strings are - also valid. + also valid. Also allow prefix + byte symbol format e.g.: '20GB'. Args: value (str): the string to test @@ -415,7 +414,7 @@ def _is_memory_string(value): Returns: (bool): A boolean value if the string matches a memory specification """ - memory_spec_pattern = r'[0-9]+[bBkKmMgGtT]?' 
+ memory_spec_pattern = r'[0-9]+[bBkKmMgGtT]{0,2}' return re.findall(memory_spec_pattern, value)[0] == value @classmethod @@ -447,7 +446,7 @@ def _memory_spec_string_to_value(cls, value, default_magnitude='m', target_magni magnitude = re.findall(magnitude_pattern, value) if len(magnitude) > 0: - magnitude = magnitude[0].lower() + magnitude = magnitude[0].lower()[0] # only take prefix else: magnitude = default_magnitude.lower() # Convert it to default magnitude = megabytes diff --git a/pysqa/wrapper/lsf.py b/pysqa/wrapper/lsf.py index c6dbf5be..67819e25 100644 --- a/pysqa/wrapper/lsf.py +++ b/pysqa/wrapper/lsf.py @@ -31,8 +31,8 @@ def get_queue_status_command(self): @staticmethod def get_job_id_from_output(queue_submit_output): - raise NotImplementedError() - + raise NotImplementedError() + @staticmethod def convert_queue_status(queue_status_output): raise NotImplementedError() diff --git a/pysqa/wrapper/moab.py b/pysqa/wrapper/moab.py index 3470cfb7..4666ebfc 100644 --- a/pysqa/wrapper/moab.py +++ b/pysqa/wrapper/moab.py @@ -31,8 +31,8 @@ def get_queue_status_command(self): @staticmethod def get_job_id_from_output(queue_submit_output): - raise NotImplementedError() - + raise NotImplementedError() + @staticmethod def convert_queue_status(queue_status_output): raise NotImplementedError() diff --git a/pysqa/wrapper/sge.py b/pysqa/wrapper/sge.py index 92a8b368..b6a647de 100644 --- a/pysqa/wrapper/sge.py +++ b/pysqa/wrapper/sge.py @@ -35,7 +35,7 @@ def get_queue_status_command(self): @staticmethod def get_job_id_from_output(queue_submit_output): return int(queue_submit_output) - + @staticmethod def convert_queue_status(queue_status_output): def leaf_to_dict(leaf): diff --git a/pysqa/wrapper/slurm.py b/pysqa/wrapper/slurm.py index cfc4cd64..73ebc0bd 100644 --- a/pysqa/wrapper/slurm.py +++ b/pysqa/wrapper/slurm.py @@ -1,9 +1,11 @@ # coding: utf-8 # Copyright (c) Max-Planck-Institut für Eisenforschung GmbH - Computational Materials Design (CM) Department # Distributed 
under the terms of "New BSD License", see the LICENSE file. +# Updated for HPC UGent by Sander Borgmans +import os, subprocess import pandas - +from .. import queueadapter as qa __author__ = "Jan Janssen" __copyright__ = "Copyright 2019, Max-Planck-Institut für Eisenforschung GmbH - " \ @@ -25,23 +27,284 @@ def delete_job_command(self): return ['scancel'] @property - def enable_reservation_command(self): - raise NotImplementedError() + def enable_reservation_command(self): # this is written in TORQUE (compatible with hpc) + return ['qalter', '-W'] @property def get_queue_status_command(self): - return ['squeue', '--format', '%A|%u|%t|%j', '--noheader'] + return ['squeue', '--format', '"%A|%u|%t|%j"', '--noheader'] @staticmethod def get_job_id_from_output(queue_submit_output): - return int(queue_submit_output.splitlines()[-1].rstrip().lstrip()) - + return int(queue_submit_output.splitlines()[-1].rstrip().lstrip().split(';')[0]) + + @staticmethod + def get_queue_from_output(queue_submit_output): + return str(queue_submit_output.splitlines()[-1].rstrip().lstrip().split(';')[1]) + @staticmethod def convert_queue_status(queue_status_output): - line_split_lst = [line.split('|') for line in queue_status_output.splitlines()] - job_id_lst, user_lst, status_lst, job_name_lst = zip(*[(int(jobid), user, status.lower(), jobname) + qstat = queue_status_output.splitlines() + if len(qstat) <= 1: # first row contains cluster name, check if there are jobs + return pandas.DataFrame(columns=['cluster', 'jobid', 'user', 'status', 'jobname']) + queue = qstat[0].split(':')[1].strip() # get queue name + + line_split_lst = [line.split('|') for line in qstat[1:]] + job_id_lst, user_lst, status_lst, job_name_lst, queue_lst = zip(*[(int(jobid), user, status.lower(), jobname, queue) for jobid, user, status, jobname in line_split_lst]) - return pandas.DataFrame({'jobid': job_id_lst, + return pandas.DataFrame({'cluster': queue_lst, + 'jobid': job_id_lst, 'user': user_lst, - 'jobname': job_name_lst, + 
'jobname': job_name_lst, 'status': status_lst}) + + +class QueueAdapterUGent(qa.QueueAdapter): + """ + The goal of the QueueAdapterUGent class is to overwrite certain functions of the QueueAdapter specific to HPC UGent + """ + + def submit_job(self, queue=None, job_name=None, working_directory=None, cores=None, nodes=None, memory_max=None, + run_time_max=None, command=None): + """ + + Args: + queue (str/None): identifies cluster, if None current cluster is used + job_name (str/None): + working_directory (str/None): + cores (int/str/None): cores per node (int, 'half', 'all') + nodes (int/None): number of nodes + memory_max (int/None): + run_time_max (int/None): + command (str/None): + + Returns: + int: + """ + if isinstance(command, list): + command = ''.join(command) + if working_directory is None: + working_directory = '.' + queue_script = self._job_submission_template(queue=queue, job_name=job_name, + working_directory=working_directory, cores=cores, nodes=nodes, + memory_max=memory_max, run_time_max=run_time_max, command=command) + queue_script_path = os.path.join(working_directory, 'run_queue.sh') + with open(queue_script_path, 'w') as f: + f.writelines(queue_script) + out = self._execute_command(commands_lst=self._commands.submit_job_command + [queue_script_path], + working_directory=working_directory, split_output=False, queue=queue) + if out is not None: + return self._commands.get_job_id_from_output(out) + else: + return None + + def enable_reservation(self, process_id, reservation_flag, queue=None): + """ + + Args: + process_id (int): + reservation_flag (int): reservation id + queue (str/None): + + Returns: + str: + """ + out = self._execute_command(commands_lst=self._commands.enable_reservation_command + ['x=FLAGS:ADVRES:'+reservation_flag] + [str(process_id)], + split_output=True, queue=queue) + if out is not None: + return out[0] + else: + return None + + def delete_job(self, process_id, queue=None): + """ + + Args: + process_id (int): + queue 
(str/None): + + Returns: + str: + """ + out = self._execute_command(commands_lst=self._commands.delete_job_command + [str(process_id)], + split_output=True, queue=queue) + if out is not None: + return out[0] + else: + return None + + def get_queue_status(self, user=None, queue=None): + """ + + Args: + user (str): + + Returns: + pandas.DataFrame: + """ + out = self._execute_command(commands_lst=self._commands.get_queue_status_command, split_output=False, queue=queue) + df = self._commands.convert_queue_status(queue_status_output=out) + if user is None or df.empty: + return df + else: + return df[df['user'] == user] + + def get_status_of_my_jobs(self): + """ + + Returns: + pandas.DataFrame: + """ + df = pandas.DataFrame() + + for queue in self.queue_list: + user = self._get_user() + df = df.append(self.get_queue_status(user=user, queue=queue), ignore_index=True, sort=False) + return df + + def get_status_of_job(self, process_id, queue=None): + """ + + Args: + process_id: + + Returns: + str: ['running', 'pending', 'error'] + """ + df = self.get_queue_status(queue=queue) + df_selected = df[df['jobid'] == process_id]['status'] + if len(df_selected) != 0: + return df_selected.values[0] + else: + return None + + def check_queue_parameters(self, queue, cores=None, nodes=None, run_time_max=None, memory_max=None, active_queue=None): + """ + + Args: + queue (str/None): + cores (int/str/None): + nodes (int/None): + run_time_max (int/None): + memory_max (int/None): + active_queue (dict): + can be read from $SLURM_CONF + + Returns: + list: [cores, nodes, run_time_max, memory_max] + """ + if active_queue is None: + active_queue = self._config['queues'][queue] + + if isinstance(cores, str): + if cores=='half': cores = active_queue['cores_max']//2 + elif cores=='all': cores = active_queue['cores_max'] + else: raise ValueError('This string does not correspond to an alias (half,all)') + else: + cores = self._value_in_range(value=cores, value_min=1, + 
value_max=active_queue['cores_max']) + + nodes = self._value_in_range(value=nodes, value_min=1, + value_max=active_queue['nodes_max']) + + run_time_max = self._value_in_range(value=run_time_max, + value_max=active_queue['run_time_max']) + + memory_max = self._value_in_range(value=memory_max, + value_max=active_queue['pmem_max']) + return cores, nodes, run_time_max, memory_max + + def _job_submission_template(self, queue=None, job_name=None, working_directory='.', cores=None, + nodes=None, memory_max=None, run_time_max=None, command=None): + """ + + Args: + queue (str/None): + job_name (str): + working_directory (str): + cores (int/str/None): + nodes (int/None) + memory_max (int/None): + run_time_max (int/None): + command (str/None): + + Returns: + str: + """ + if queue is None: + queue = os.environ['HPCUGENT_FAMILY_CLUSTER_VERSION'] # get current cluster name + self._value_error_if_none(value=command) + if queue not in self.queue_list: + raise ValueError() + active_queue = self._config['queues'][queue] + cores, nodes, run_time_max, memory_max = self.check_queue_parameters(queue=queue, + cores=cores, + nodes=nodes, + run_time_max=run_time_max, + memory_max=memory_max, + active_queue=active_queue) + memory_max = str(memory_max)+'MB' + run_time_max = self._runtime2str(run_time_max) + + template = active_queue['template'] + return template.render(job_name=job_name, + working_directory=working_directory, + cores=cores, + nodes=nodes, + memory_max=memory_max, + run_time_max=run_time_max, + command=command) + + @staticmethod + def _execute_command(commands_lst, working_directory=None, split_output=True, queue=None): + """ + + Args: + commands_lst (list): + working_directory (str): + split_output (bool): + + Returns: + str: + """ + + cmd = "" + if not queue is None: + cmd += "module --quiet swap cluster/{}; ".format(queue) + cmd += " ".join(commands_lst) + + if working_directory is None: + try: + out = subprocess.check_output(cmd, stderr=subprocess.STDOUT, shell=True) + 
except subprocess.CalledProcessError: + out = None + else: + try: + out = subprocess.check_output(cmd, cwd=working_directory, stderr=subprocess.STDOUT, shell=True) + except subprocess.CalledProcessError: + out = None + + out = out.decode('utf8') if out is not None else None + if out is not None and split_output: + return out.split('\n') + else: + return out + + @staticmethod + def _fill_queue_dict(queue_lst_dict): + """ + + Args: + queue_lst_dict (dict): + """ + queue_keys = ['cores_max', 'nodes_max', 'pmem_max', 'run_time_max'] + for queue_dict in queue_lst_dict.values(): + for key in set(queue_keys) - set(queue_dict.keys()): + queue_dict[key] = None + + @staticmethod + def _runtime2str(time): + if isinstance(time, int): + return "{}:{:02d}:{:02d}".format(int(time//3600),int((time%3600)//60),int(time%60)) + else: + raise ValueError('The runtime could not be converted to a suitable format!') diff --git a/pysqa/wrapper/torque.py b/pysqa/wrapper/torque.py index a2f2e657..e4bd3ee3 100644 --- a/pysqa/wrapper/torque.py +++ b/pysqa/wrapper/torque.py @@ -31,8 +31,8 @@ def get_queue_status_command(self): @staticmethod def get_job_id_from_output(queue_submit_output): - raise NotImplementedError() - + raise NotImplementedError() + @staticmethod def convert_queue_status(queue_status_output): raise NotImplementedError() diff --git a/tests/config/error/queue.yaml b/tests/config/error/queue.yaml index ebfeb3b5..f17558de 100644 --- a/tests/config/error/queue.yaml +++ b/tests/config/error/queue.yaml @@ -2,4 +2,3 @@ queue_type: Error queue_primary: error queues: lsf: {cores_max: 100, cores_min: 10, run_time_max: 259200, script: lsf.sh} -