Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 32 additions & 2 deletions pysqa/wrapper/lsf.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
# Copyright (c) Max-Planck-Institut für Eisenforschung GmbH - Computational Materials Design (CM) Department
# Distributed under the terms of "New BSD License", see the LICENSE file.

import pandas

from pysqa.wrapper.generic import SchedulerCommands

__author__ = "Jan Janssen"
Expand All @@ -19,12 +21,40 @@
class LsfCommands(SchedulerCommands):
@property
def submit_job_command(self):
return ["bsub", "-terse"]
return ["bsub"]

@property
def delete_job_command(self):
return ["bkill"]

@property
def get_queue_status_command(self):
return ["qstat", "-x"]
return ["bjobs"]

@staticmethod
def get_job_id_from_output(queue_submit_output):
return int(queue_submit_output.split("<")[1].split(">")[0])

@staticmethod
def convert_queue_status(queue_status_output):
job_id_lst, user_lst, status_lst, job_name_lst = [], [], [], []
line_split_lst = queue_status_output.split("\n")
if len(line_split_lst) > 1:
for l in line_split_lst[1:]:
line_segments = l.split()
if len(line_segments) > 1:
job_id_lst.append(int(line_segments[0]))
user_lst.append(line_segments[1])
status_lst.append(line_segments[2])
job_name_lst.append(line_segments[6])
df = pandas.DataFrame(
{
"jobid": job_id_lst,
"user": user_lst,
"jobname": job_name_lst,
"status": status_lst,
}
)
df.loc[df.status == "RUN", "status"] = "running"
df.loc[df.status == "PEND", "status"] = "pending"
return df
7 changes: 7 additions & 0 deletions tests/config/lsf/bjobs_output
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
JOBID USER STAT QUEUE FROM_HOST EXEC_HOST JOB_NAME SUBMIT_TIME
5136563 testuse RUN pbatch tester709 1*launch_ho pi_None Aug 22 12:28
40*batch_hosts
5136570 testuse RUN pbatch tester709 1*launch_ho pi_None Aug 22 12:30
40*batch_hosts
5136571 testuse RUN pbatch tester709 1*launch_ho pi_None Aug 22 12:31
40*batch_hosts
26 changes: 23 additions & 3 deletions tests/test_lsf.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@

import os
import unittest

import pandas

from pysqa import QueueAdapter

__author__ = "Jan Janssen"
Expand Down Expand Up @@ -42,18 +45,18 @@ def test_job_submission_template(self):

def test_interfaces(self):
self.assertEqual(
self.lsf._adapter._commands.submit_job_command, ["bsub", "-terse"]
self.lsf._adapter._commands.submit_job_command, ["bsub"]
)
self.assertEqual(self.lsf._adapter._commands.delete_job_command, ["bkill"])
self.assertEqual(
self.lsf._adapter._commands.get_queue_status_command, ["qstat", "-x"]
self.lsf._adapter._commands.get_queue_status_command, ["bjobs"]
)

def test__list_command_to_be_executed(self):
with self.subTest("lsf"):
self.assertEqual(
self.lsf._adapter._list_command_to_be_executed(None, "here"),
["bsub", "-terse", "here"],
["bsub", "here"],
)
with self.subTest("lsf with dependency"):
self.assertRaises(
Expand All @@ -62,3 +65,20 @@ def test__list_command_to_be_executed(self):
[],
"here",
)

def test_convert_queue_status_sge(self):
with open(os.path.join(self.path, "config/lsf", "bjobs_output"), "r") as f:
content = f.read()
df = pandas.DataFrame({
"jobid": [5136563, 5136570, 5136571],
"user": ["testuse"] * 3,
"jobname": ["pi_None"] * 3,
"status": ["running"] * 3
})
self.assertTrue(
df.equals(
self.lsf._adapter._commands.convert_queue_status(
queue_status_output=content
)
)
)