From 8444f61806757beb54548ab4e1cd4d2c97378b99 Mon Sep 17 00:00:00 2001 From: Sander Borgmans Date: Wed, 28 Aug 2019 12:33:51 +0200 Subject: [PATCH 1/2] Adapted for hpc ugent, fixed some minor errors --- pysqa/basic.py | 3 +++ pysqa/modular.py | 14 +++++++--- pysqa/wrapper/gent.py | 59 +++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 73 insertions(+), 3 deletions(-) create mode 100644 pysqa/wrapper/gent.py diff --git a/pysqa/basic.py b/pysqa/basic.py index 6fbc3b84..dda4ffc3 100644 --- a/pysqa/basic.py +++ b/pysqa/basic.py @@ -58,6 +58,9 @@ def __init__(self, config, directory="~/.queues"): elif self._config["queue_type"] == "MOAB": class_name = "MoabCommands" module_name = "pysqa.wrapper.moab" + elif self._config["queue_type"] == "GENT": + class_name = "GentCommands" + module_name = "pysqa.wrapper.gent" else: raise ValueError() self._commands = getattr(importlib.import_module(module_name), class_name)() diff --git a/pysqa/modular.py b/pysqa/modular.py index ce34faa8..eb1f788d 100644 --- a/pysqa/modular.py +++ b/pysqa/modular.py @@ -48,12 +48,13 @@ def submit_job( run_time_max=run_time_max, command=command, ) - cluster_module = self._queue_to_cluster_dict["queue"] + cluster_module = self._queue_to_cluster_dict[queue] commands = ( ["module --quiet swap cluster/{};".format(cluster_module)] + self._commands.submit_job_command + [queue_script_path] ) + commands = " ".join(commands) out = self._execute_command( commands=commands, working_directory=working_directory, split_output=False ) @@ -83,6 +84,7 @@ def enable_reservation(self, process_id): + self._commands.enable_reservation_command + [str(cluster_queue_id)] ) + commands = " ".join(commands) out = self._execute_command(commands=commands, split_output=True) if out is not None: return out[0] @@ -107,6 +109,7 @@ def delete_job(self, process_id): + self._commands.delete_job_command + [str(cluster_queue_id)] ) + commands = " ".join(commands) out = self._execute_command(commands=commands, split_output=True) if out is not None: return out[0] @@ -127,8 +130,13 @@ def get_queue_status(self, user=None): cluster_commands = self._switch_cluster_command( cluster_module=cluster_module ) + commands = ( + cluster_commands + + self._commands.get_queue_status_command + ) + commands = " ".join(commands) out = self._execute_command( - commands=cluster_commands + self._commands.get_queue_status_command, + commands=commands, split_output=False, ) df = self._commands.convert_queue_status(queue_status_output=out) @@ -142,7 +150,7 @@ def get_queue_status(self, user=None): @staticmethod def _resolve_queue_id(process_id, cluster_dict): cluster_queue_id = int(process_id / 10) - cluster_module = cluster_dict[process_id - cluster_queue_id] + cluster_module = cluster_dict[process_id - cluster_queue_id*10] return cluster_module, cluster_queue_id @staticmethod diff --git a/pysqa/wrapper/gent.py b/pysqa/wrapper/gent.py new file mode 100644 index 00000000..7efe2f52 --- /dev/null +++ b/pysqa/wrapper/gent.py @@ -0,0 +1,59 @@ +# coding: utf-8 +# Copyright (c) Max-Planck-Institut für Eisenforschung GmbH - Computational Materials Design (CM) Department +# Distributed under the terms of "New BSD License", see the LICENSE file. + +import pandas + + +__author__ = "Jan Janssen" +__copyright__ = ( + "Copyright 2019, Max-Planck-Institut für Eisenforschung GmbH - " + "Computational Materials Design (CM) Department" +) +__version__ = "1.0" +__maintainer__ = "Jan Janssen" +__email__ = "janssen@mpie.de" +__status__ = "development" +__date__ = "Feb 9, 2019" + + +class GentCommands(object): + @property + def submit_job_command(self): + return ["sbatch", "--parsable"] + + @property + def delete_job_command(self): + return ["scancel"] + + @property + def enable_reservation_command(self): + raise NotImplementedError() + + @property + def get_queue_status_command(self): + return ["squeue", "--format", "%A|%u|%t|%j", "--noheader"] + + @staticmethod + def get_job_id_from_output(queue_submit_output): + return int(queue_submit_output.splitlines()[-1].rstrip().lstrip().split(';')[0]) + + @staticmethod + def get_queue_from_output(queue_submit_output): + return str(queue_submit_output.splitlines()[-1].rstrip().lstrip().split(';')[1]) + + @staticmethod + def convert_queue_status(queue_status_output): + qstat = queue_status_output.splitlines() + queue = qstat[0].split(':')[1].strip() + if len(qstat) <= 1: # first row contains cluster name, check if there are jobs + return pandas.DataFrame(columns=['cluster', 'jobid', 'user', 'status', 'jobname']) + + line_split_lst = [line.split('|') for line in qstat[1:]] + job_id_lst, user_lst, status_lst, job_name_lst, queue_lst = zip(*[(int(jobid), user, status.lower(), jobname, queue) + for jobid, user, status, jobname in line_split_lst]) + return pandas.DataFrame({'cluster': queue_lst, + 'jobid': job_id_lst, + 'user': user_lst, + 'jobname': job_name_lst, + 'status': status_lst}) From 0e1e31dd44d0955ab1d6103ba72d5589090697bd Mon Sep 17 00:00:00 2001 From: Sander Borgmans Date: Wed, 28 Aug 2019 14:00:20 +0200 Subject: [PATCH 2/2] Issue with squeue format when parsed as list --- pysqa/modular.py | 8 +++----- pysqa/wrapper/gent.py | 12 ++++++------ 2 files changed, 9 insertions(+), 11 deletions(-) diff --git a/pysqa/modular.py b/pysqa/modular.py index eb1f788d..4c81c414 100644 --- a/pysqa/modular.py +++ b/pysqa/modular.py @@ -84,7 +84,7 @@ def enable_reservation(self, process_id): + self._commands.enable_reservation_command + [str(cluster_queue_id)] ) - commands = " ".join(commands) + #commands = " ".join(commands) out = self._execute_command(commands=commands, split_output=True) if out is not None: return out[0] @@ -127,9 +127,7 @@ def get_queue_status(self, user=None): """ df_lst = [] for cluster_module in self._config["cluster"]: - cluster_commands = self._switch_cluster_command( - cluster_module=cluster_module - ) + cluster_commands = self._switch_cluster_command(cluster_module=cluster_module) commands = ( cluster_commands + self._commands.get_queue_status_command @@ -140,7 +138,7 @@ def get_queue_status(self, user=None): split_output=False, ) df = self._commands.convert_queue_status(queue_status_output=out) - df_lst.append(df) + if not df is None: df_lst.append(df) # prevent empty clusters from throwing errors df = pandas.concat(df_lst, axis=1, sort=False).reset_index(drop=True) if user is None: return df diff --git a/pysqa/wrapper/gent.py b/pysqa/wrapper/gent.py index 7efe2f52..99548e1f 100644 --- a/pysqa/wrapper/gent.py +++ b/pysqa/wrapper/gent.py @@ -32,11 +32,11 @@ def enable_reservation_command(self): @property def get_queue_status_command(self): - return ["squeue", "--format", "%A|%u|%t|%j", "--noheader"] + return ["squeue", "--format", "'%A|%u|%t|%j'", "--noheader"] @staticmethod def get_job_id_from_output(queue_submit_output): - return int(queue_submit_output.splitlines()[-1].rstrip().lstrip().split(';')[0]) + return int(queue_submit_output.splitlines()[-1].rstrip().lstrip().split(';')[0]) @staticmethod def get_queue_from_output(queue_submit_output): @@ -45,11 +45,11 @@ def get_queue_from_output(queue_submit_output): @staticmethod def convert_queue_status(queue_status_output): qstat = queue_status_output.splitlines() - queue = qstat[0].split(':')[1].strip() + queue = qstat[0].split(':')[1].strip() if len(qstat) <= 1: # first row contains cluster name, check if there are jobs - return pandas.DataFrame(columns=['cluster', 'jobid', 'user', 'status', 'jobname']) - - line_split_lst = [line.split('|') for line in qstat[1:]] + return None + + line_split_lst = [line.split('|') for line in qstat[1:]] job_id_lst, user_lst, status_lst, job_name_lst, queue_lst = zip(*[(int(jobid), user, status.lower(), jobname, queue) for jobid, user, status, jobname in line_split_lst]) return pandas.DataFrame({'cluster': queue_lst,