diff --git a/pysqa/basic.py b/pysqa/basic.py
index 6fbc3b84..dda4ffc3 100644
--- a/pysqa/basic.py
+++ b/pysqa/basic.py
@@ -58,6 +58,9 @@ def __init__(self, config, directory="~/.queues"):
         elif self._config["queue_type"] == "MOAB":
             class_name = "MoabCommands"
             module_name = "pysqa.wrapper.moab"
+        elif self._config["queue_type"] == "GENT":
+            class_name = "GentCommands"
+            module_name = "pysqa.wrapper.gent"
         else:
             raise ValueError()
         self._commands = getattr(importlib.import_module(module_name), class_name)()
diff --git a/pysqa/modular.py b/pysqa/modular.py
index f789aa76..7d11f859 100644
--- a/pysqa/modular.py
+++ b/pysqa/modular.py
@@ -49,12 +49,13 @@ def submit_job(
             run_time_max=run_time_max,
             command=command,
         )
-        cluster_module = self._queue_to_cluster_dict["queue"]
+        cluster_module = self._queue_to_cluster_dict[queue]
         commands = (
            ["module --quiet swap cluster/{};".format(cluster_module)]
            + self._commands.submit_job_command
            + [queue_script_path]
        )
+        commands = " ".join(commands)
         out = self._execute_command(
             commands=commands, working_directory=working_directory, split_output=False
         )
@@ -84,6 +85,7 @@ def enable_reservation(self, process_id):
             + self._commands.enable_reservation_command
             + [str(cluster_queue_id)]
         )
+        commands = " ".join(commands)
         out = self._execute_command(commands=commands, split_output=True)
         if out is not None:
             return out[0]
@@ -108,6 +110,7 @@ def delete_job(self, process_id):
             + self._commands.delete_job_command
             + [str(cluster_queue_id)]
         )
+        commands = " ".join(commands)
         out = self._execute_command(commands=commands, split_output=True)
         if out is not None:
             return out[0]
@@ -125,15 +128,18 @@ def get_queue_status(self, user=None):
         """
         df_lst = []
         for cluster_module in self._config["cluster"]:
-            cluster_commands = self._switch_cluster_command(
-                cluster_module=cluster_module
+            cluster_commands = self._switch_cluster_command(cluster_module=cluster_module)
+            commands = (
+                cluster_commands
+                + self._commands.get_queue_status_command
             )
+            commands = " ".join(commands)
             out = self._execute_command(
-                commands=cluster_commands + self._commands.get_queue_status_command,
+                commands=commands,
                 split_output=False,
             )
             df = self._commands.convert_queue_status(queue_status_output=out)
-            df_lst.append(df)
+            if df is not None: df_lst.append(df)  # prevent empty clusters from throwing errors
         df = pandas.concat(df_lst, axis=1, sort=False).reset_index(drop=True)
         if user is None:
             return df
diff --git a/pysqa/wrapper/gent.py b/pysqa/wrapper/gent.py
new file mode 100644
index 00000000..99548e1f
--- /dev/null
+++ b/pysqa/wrapper/gent.py
@@ -0,0 +1,60 @@
+# coding: utf-8
+# Copyright (c) Max-Planck-Institut für Eisenforschung GmbH - Computational Materials Design (CM) Department
+# Distributed under the terms of "New BSD License", see the LICENSE file.
+
+import pandas
+
+
+__author__ = "Jan Janssen"
+__copyright__ = (
+    "Copyright 2019, Max-Planck-Institut für Eisenforschung GmbH - "
+    "Computational Materials Design (CM) Department"
+)
+__version__ = "1.0"
+__maintainer__ = "Jan Janssen"
+__email__ = "janssen@mpie.de"
+__status__ = "development"
+__date__ = "Feb 9, 2019"
+
+
+class GentCommands(object):
+    @property
+    def submit_job_command(self):
+        return ["sbatch", "--parsable"]
+
+    @property
+    def delete_job_command(self):
+        return ["scancel"]
+
+    @property
+    def enable_reservation_command(self):
+        raise NotImplementedError()
+
+    @property
+    def get_queue_status_command(self):
+        return ["squeue", "--format", "'%A|%u|%t|%j'", "--noheader"]
+
+    @staticmethod
+    def get_job_id_from_output(queue_submit_output):
+        return int(queue_submit_output.splitlines()[-1].strip().split(';')[0])
+
+    @staticmethod
+    def get_queue_from_output(queue_submit_output):
+        return str(queue_submit_output.splitlines()[-1].strip().split(';')[1])
+
+    @staticmethod
+    def convert_queue_status(queue_status_output):
+        qstat = queue_status_output.splitlines()
+        if len(qstat) <= 1:  # first row contains the cluster name - check if there are any jobs
+            return None
+        queue = qstat[0].split(':')[1].strip()
+        line_split_lst = [line.split('|') for line in qstat[1:]]
+        job_id_lst, user_lst, status_lst, job_name_lst, queue_lst = zip(
+            *[(int(jobid), user, status.lower(), jobname, queue)
+              for jobid, user, status, jobname in line_split_lst]
+        )
+        return pandas.DataFrame({'cluster': queue_lst,
+                                 'jobid': job_id_lst,
+                                 'user': user_lst,
+                                 'jobname': job_name_lst,
+                                 'status': status_lst})
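
Note on the parsing in the new GentCommands.convert_queue_status: a minimal sketch of the expected round trip, assuming (hypothetically) that the first line of the squeue output names the active cluster and the remaining lines follow the requested '%A|%u|%t|%j' format. The cluster name and job details below are made up for illustration only.

    from pysqa.wrapper.gent import GentCommands

    # Hypothetical squeue output: the first line reports the active cluster,
    # the remaining lines follow the '%A|%u|%t|%j' format requested above.
    sample_output = (
        "CLUSTER: delcatty\n"
        "1234567|jdoe|R|job_a\n"
        "1234568|jdoe|PD|job_b\n"
    )

    df = GentCommands.convert_queue_status(queue_status_output=sample_output)
    # df holds one row per job with the columns cluster, jobid, user, jobname,
    # status (status lower-cased, e.g. 'r', 'pd'); with no jobs, None is returned,
    # which get_queue_status() now skips instead of passing it to pandas.concat.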