diff --git a/pysqa/wrapper/lsf.py b/pysqa/wrapper/lsf.py index d99db0d0..0a22a630 100644 --- a/pysqa/wrapper/lsf.py +++ b/pysqa/wrapper/lsf.py @@ -2,6 +2,8 @@ # Copyright (c) Max-Planck-Institut für Eisenforschung GmbH - Computational Materials Design (CM) Department # Distributed under the terms of "New BSD License", see the LICENSE file. +import pandas + from pysqa.wrapper.generic import SchedulerCommands __author__ = "Jan Janssen" @@ -19,7 +21,7 @@ class LsfCommands(SchedulerCommands): @property def submit_job_command(self): - return ["bsub", "-terse"] + return ["bsub"] @property def delete_job_command(self): @@ -27,4 +29,32 @@ def delete_job_command(self): @property def get_queue_status_command(self): - return ["qstat", "-x"] + return ["bjobs"] + + @staticmethod + def get_job_id_from_output(queue_submit_output): + return int(queue_submit_output.split("<")[1].split(">")[0]) + + @staticmethod + def convert_queue_status(queue_status_output): + job_id_lst, user_lst, status_lst, job_name_lst = [], [], [], [] + line_split_lst = queue_status_output.split("\n") + if len(line_split_lst) > 1: + for l in line_split_lst[1:]: + line_segments = l.split() + if len(line_segments) > 1: + job_id_lst.append(int(line_segments[0])) + user_lst.append(line_segments[1]) + status_lst.append(line_segments[2]) + job_name_lst.append(line_segments[6]) + df = pandas.DataFrame( + { + "jobid": job_id_lst, + "user": user_lst, + "jobname": job_name_lst, + "status": status_lst, + } + ) + df.loc[df.status == "RUN", "status"] = "running" + df.loc[df.status == "PEND", "status"] = "pending" + return df diff --git a/tests/config/lsf/bjobs_output b/tests/config/lsf/bjobs_output new file mode 100644 index 00000000..cb585da7 --- /dev/null +++ b/tests/config/lsf/bjobs_output @@ -0,0 +1,7 @@ +JOBID USER STAT QUEUE FROM_HOST EXEC_HOST JOB_NAME SUBMIT_TIME +5136563 testuse RUN pbatch tester709 1*launch_ho pi_None Aug 22 12:28 + 40*batch_hosts +5136570 testuse RUN pbatch tester709 1*launch_ho pi_None Aug 22 12:30 + 40*batch_hosts +5136571 testuse RUN pbatch tester709 1*launch_ho pi_None Aug 22 12:31 + 40*batch_hosts \ No newline at end of file diff --git a/tests/test_lsf.py b/tests/test_lsf.py index f93ce86a..5643e958 100644 --- a/tests/test_lsf.py +++ b/tests/test_lsf.py @@ -3,6 +3,9 @@ import os import unittest + +import pandas + from pysqa import QueueAdapter __author__ = "Jan Janssen" @@ -42,18 +45,18 @@ def test_job_submission_template(self): def test_interfaces(self): self.assertEqual( - self.lsf._adapter._commands.submit_job_command, ["bsub", "-terse"] + self.lsf._adapter._commands.submit_job_command, ["bsub"] ) self.assertEqual(self.lsf._adapter._commands.delete_job_command, ["bkill"]) self.assertEqual( - self.lsf._adapter._commands.get_queue_status_command, ["qstat", "-x"] + self.lsf._adapter._commands.get_queue_status_command, ["bjobs"] ) def test__list_command_to_be_executed(self): with self.subTest("lsf"): self.assertEqual( self.lsf._adapter._list_command_to_be_executed(None, "here"), - ["bsub", "-terse", "here"], + ["bsub", "here"], ) with self.subTest("lsf with dependency"): self.assertRaises( @@ -62,3 +65,20 @@ def test__list_command_to_be_executed(self): [], "here", ) + + def test_convert_queue_status_sge(self): + with open(os.path.join(self.path, "config/lsf", "bjobs_output"), "r") as f: + content = f.read() + df = pandas.DataFrame({ + "jobid": [5136563, 5136570, 5136571], + "user": ["testuse"] * 3, + "jobname": ["pi_None"] * 3, + "status": ["running"] * 3 + }) + self.assertTrue( + df.equals( + self.lsf._adapter._commands.convert_queue_status( + queue_status_output=content + ) + ) + )