diff --git a/tests/test_basic.py b/tests/test_basic.py new file mode 100644 index 00000000..691cb36a --- /dev/null +++ b/tests/test_basic.py @@ -0,0 +1,70 @@ +# coding: utf-8 +# Copyright (c) Jan Janssen + +import os +import unittest +from pysqa import QueueAdapter +from pysqa.utils.basic import BasisQueueAdapter + +__author__ = "Jan Janssen" +__copyright__ = "Copyright 2019, Jan Janssen" +__version__ = "0.0.1" +__maintainer__ = "Jan Janssen" +__email__ = "janssen@mpie.de" +__status__ = "production" +__date__ = "Feb 9, 2019" + + +class TestRunmode(unittest.TestCase): + @classmethod + def setUpClass(cls): + cls.path = os.path.dirname(os.path.abspath(__file__)) + + def test_missing_config(self): + self.assertRaises( + ValueError, QueueAdapter, directory=os.path.join(self.path, "config/error") + ) + + def test_memory_string_comparison(self): + self.assertEqual(BasisQueueAdapter._value_in_range(1023, value_min="1K"), "1K") + self.assertEqual(BasisQueueAdapter._value_in_range(1035, value_min="1K"), 1035) + self.assertEqual(BasisQueueAdapter._value_in_range(1035, value_max="1K"), "1K") + self.assertEqual( + BasisQueueAdapter._value_in_range("1035", value_min="1K"), "1035" + ) + self.assertEqual( + BasisQueueAdapter._value_in_range( + "60000M", value_min="1K", value_max="50G" + ), + "50G", + ) + self.assertEqual( + BasisQueueAdapter._value_in_range("60000", value_min="1K", value_max="50G"), + "50G", + ) + self.assertEqual( + BasisQueueAdapter._value_in_range( + "60000M", value_min="1K", value_max="70G" + ), + "60000M", + ) + self.assertEqual( + BasisQueueAdapter._value_in_range(60000, value_min="1K", value_max="70G"), + 60000, + ) + self.assertEqual( + BasisQueueAdapter._value_in_range( + 90000 * 1024 ** 2, value_min="1K", value_max="70G" + ), + "70G", + ) + self.assertEqual( + BasisQueueAdapter._value_in_range("90000", value_min="1K", value_max="70G"), + "70G", + ) + self.assertEqual( + BasisQueueAdapter._value_in_range( + "60000M", value_min="60G", value_max="70G" + ), + "60G", + ) diff --git a/tests/test_gent.py b/tests/test_gent.py new file mode 100644 index 00000000..d6403cca --- /dev/null +++ b/tests/test_gent.py @@ -0,0 +1,51 @@ +# coding: utf-8 +# Copyright (c) Jan Janssen + +import os +import unittest +from pysqa import QueueAdapter + +__author__ = "Jan Janssen" +__copyright__ = "Copyright 2019, Jan Janssen" +__version__ = "0.0.1" +__maintainer__ = "Jan Janssen" +__email__ = "janssen@mpie.de" +__status__ = "production" +__date__ = "Feb 9, 2019" + + +class TestRunmode(unittest.TestCase): + @classmethod + def setUpClass(cls): + cls.path = os.path.dirname(os.path.abspath(__file__)) + cls.gent = QueueAdapter(directory=os.path.join(cls.path, "config/gent")) + + def test_config(self): + self.assertEqual(self.gent.config["queue_type"], "GENT") + self.assertEqual(self.gent.config["queue_primary"], "slurm") + + def test_list_clusters(self): + self.assertEqual(self.gent.list_clusters(), ['default']) + + def test_ssh_delete_file_on_remote(self): + self.assertEqual(self.gent.ssh_delete_file_on_remote, True) + + def test_interfaces(self): + self.assertEqual( + self.gent._adapter._commands.get_queue_status_command, + ["squeue", "--format", "%A|%u|%t|%.15j|%Z", "--noheader"], + ) + + def test__list_command_to_be_executed(self): + with self.subTest("gent"): + self.assertEqual( + self.gent._adapter._list_command_to_be_executed(None, "here"), + ["sbatch", "--parsable", "here"], + ) + with self.subTest("gent with dependency"): + self.assertRaises( + NotImplementedError, + self.gent._adapter._list_command_to_be_executed, + [], + "here", + ) diff --git a/tests/test_lsf.py b/tests/test_lsf.py new file mode 100644 index 00000000..5a7e88fa --- /dev/null +++ b/tests/test_lsf.py @@ -0,0 +1,64 @@ +# coding: utf-8 +# Copyright (c) Jan Janssen + +import os +import unittest +from pysqa import QueueAdapter + +__author__ = "Jan Janssen" +__copyright__ = "Copyright 2019, Jan Janssen" +__version__ = "0.0.1" +__maintainer__ = "Jan Janssen" +__email__ = "janssen@mpie.de" +__status__ = "production" +__date__ = "Feb 9, 2019" + + +class TestRunmode(unittest.TestCase): + @classmethod + def setUpClass(cls): + cls.path = os.path.dirname(os.path.abspath(__file__)) + cls.lsf = QueueAdapter(directory=os.path.join(cls.path, "config/lsf")) + + def test_config(self): + self.assertEqual(self.lsf.config["queue_type"], "LSF") + self.assertEqual(self.lsf.config["queue_primary"], "lsf") + + def test_list_clusters(self): + self.assertEqual(self.lsf.list_clusters(), ['default']) + + def test_ssh_delete_file_on_remote(self): + self.assertEqual(self.lsf.ssh_delete_file_on_remote, True) + + def test_job_submission_template(self): + template = ( + "#!/bin/bash\n#BSUB -q queue\n#BSUB -J job.py\n#BSUB -o time.out\n#BSUB -n 10\n#BSUB -cwd .\n" + "#BSUB -e error.out\n#BSUB -W 259200\n\npython test.py" + ) + self.assertEqual( + self.lsf._adapter._job_submission_template(command="python test.py"), + template, + ) + + def test_interfaces(self): + self.assertEqual( + self.lsf._adapter._commands.submit_job_command, ["bsub", "-terse"] + ) + self.assertEqual(self.lsf._adapter._commands.delete_job_command, ["bkill"]) + self.assertEqual( + self.lsf._adapter._commands.get_queue_status_command, ["qstat", "-x"] + ) + + def test__list_command_to_be_executed(self): + with self.subTest("lsf"): + self.assertEqual( + self.lsf._adapter._list_command_to_be_executed(None, "here"), + ["bsub", "-terse", "here"], + ) + with self.subTest("lsf with dependency"): + self.assertRaises( + NotImplementedError, + self.lsf._adapter._list_command_to_be_executed, + [], + "here", + ) diff --git a/tests/test_moab.py b/tests/test_moab.py new file mode 100644 index 00000000..60f74105 --- /dev/null +++ b/tests/test_moab.py @@ -0,0 +1,54 @@ +# coding: utf-8 +# Copyright (c) Jan Janssen + +import os +import unittest +from pysqa import QueueAdapter + +__author__ = "Jan Janssen" +__copyright__ = "Copyright 2019, Jan Janssen" +__version__ = "0.0.1" +__maintainer__ = "Jan Janssen" +__email__ = "janssen@mpie.de" +__status__ = "production" +__date__ = "Feb 9, 2019" + + +class TestRunmode(unittest.TestCase): + @classmethod + def setUpClass(cls): + cls.path = os.path.dirname(os.path.abspath(__file__)) + cls.moab = QueueAdapter(directory=os.path.join(cls.path, "config/moab")) + + def test_config(self): + self.assertEqual(self.moab.config["queue_type"], "MOAB") + self.assertEqual(self.moab.config["queue_primary"], "moab") + + def test_list_clusters(self): + self.assertEqual(self.moab.list_clusters(), ['default']) + + def test_ssh_delete_file_on_remote(self): + self.assertEqual(self.moab.ssh_delete_file_on_remote, True) + + def test_interfaces(self): + self.assertEqual(self.moab._adapter._commands.submit_job_command, ["msub"]) + self.assertEqual( + self.moab._adapter._commands.delete_job_command, ["mjobctl", "-c"] + ) + self.assertEqual( + self.moab._adapter._commands.get_queue_status_command, ["mdiag", "-x"] + ) + + def test__list_command_to_be_executed(self): + with self.subTest("moab with dependency"): + self.assertRaises( + NotImplementedError, + self.moab._adapter._list_command_to_be_executed, + [], + "here", + ) + with self.subTest("moab"): + self.assertEqual( + self.moab._adapter._list_command_to_be_executed(None, "here"), + ["msub", "here"], + ) diff --git a/tests/test_multi.py b/tests/test_multi.py new file mode 100644 index 00000000..24880962 --- /dev/null +++ b/tests/test_multi.py @@ -0,0 +1,28 @@ +# coding: utf-8 +# Copyright (c) Jan Janssen + +import os +import unittest +from pysqa import QueueAdapter + +__author__ = "Jan Janssen" +__copyright__ = "Copyright 2019, Jan Janssen" +__version__ = "0.0.1" +__maintainer__ = "Jan Janssen" +__email__ = "janssen@mpie.de" +__status__ = "production" +__date__ = "Feb 9, 2019" + + +class TestRunmode(unittest.TestCase): + @classmethod + def setUpClass(cls): + cls.path = os.path.dirname(os.path.abspath(__file__)) + cls.multi = QueueAdapter(directory=os.path.join(cls.path, "config/multicluster")) + + def test_config(self): + self.assertEqual(self.multi.config["queue_type"], "SLURM") + self.assertEqual(self.multi.config["queue_primary"], "slurm") + + def test_list_clusters(self): + self.assertEqual(self.multi.list_clusters(), ['local_slurm', 'remote_slurm']) diff --git a/tests/test_queueadapter.py b/tests/test_queueadapter.py deleted file mode 100644 index fcb99c13..00000000 --- a/tests/test_queueadapter.py +++ /dev/null @@ -1,482 +0,0 @@ -# coding: utf-8 -# Copyright (c) Jan Janssen - -import os -import pandas -import unittest -import getpass -from pysqa import QueueAdapter -from pysqa.utils.basic import BasisQueueAdapter - -__author__ = "Jan Janssen" -__copyright__ = "Copyright 2019, Jan Janssen" -__version__ = "0.0.1" -__maintainer__ = "Jan Janssen" -__email__ = "janssen@mpie.de" -__status__ = "production" -__date__ = "Feb 9, 2019" - - -class TestRunmode(unittest.TestCase): - @classmethod - def setUpClass(cls): - cls.path = os.path.dirname(os.path.abspath(__file__)) - cls.torque = QueueAdapter(directory=os.path.join(cls.path, "config/torque")) - cls.slurm = QueueAdapter(directory=os.path.join(cls.path, "config/slurm")) - cls.lsf = QueueAdapter(directory=os.path.join(cls.path, "config/lsf")) - cls.sge = QueueAdapter(directory=os.path.join(cls.path, "config/sge")) - cls.moab = QueueAdapter(directory=os.path.join(cls.path, "config/moab")) - cls.gent = QueueAdapter(directory=os.path.join(cls.path, "config/gent")) - cls.remote = QueueAdapter(directory=os.path.join(cls.path, "config/remote")) - cls.multi = QueueAdapter(directory=os.path.join(cls.path, "config/multicluster")) - - def test_missing_config(self): - self.assertRaises( - ValueError, QueueAdapter, directory=os.path.join(self.path, "config/error") - ) - - def test_config(self): - self.assertEqual(self.torque.config["queue_type"], "TORQUE") - self.assertEqual(self.slurm.config["queue_type"], "SLURM") - self.assertEqual(self.lsf.config["queue_type"], "LSF") - self.assertEqual(self.sge.config["queue_type"], "SGE") - self.assertEqual(self.moab.config["queue_type"], "MOAB") - self.assertEqual(self.gent.config["queue_type"], "GENT") - self.assertEqual(self.remote.config["queue_type"], "REMOTE") - self.assertEqual(self.multi.config["queue_type"], "SLURM") - self.assertEqual(self.torque.config["queue_primary"], "torque") - self.assertEqual(self.slurm.config["queue_primary"], "slurm") - self.assertEqual(self.lsf.config["queue_primary"], "lsf") - self.assertEqual(self.sge.config["queue_primary"], "impi_hydra_small") - self.assertEqual(self.moab.config["queue_primary"], "moab") - self.assertEqual(self.gent.config["queue_primary"], "slurm") - self.assertEqual(self.remote.config["queue_primary"], "remote") - self.assertEqual(self.multi.config["queue_primary"], "slurm") - - def test_list_clusters(self): - self.assertEqual(self.torque.list_clusters(), ['default']) - self.assertEqual(self.slurm.list_clusters(), ['default']) - self.assertEqual(self.lsf.list_clusters(), ['default']) - self.assertEqual(self.moab.list_clusters(), ['default']) - self.assertEqual(self.torque.list_clusters(), ['default']) - self.assertEqual(self.gent.list_clusters(), ['default']) - self.assertEqual(self.remote.list_clusters(), ['default']) - self.assertEqual(self.multi.list_clusters(), ['local_slurm', 'remote_slurm']) - - def test_ssh_delete_file_on_remote(self): - self.assertEqual(self.torque.ssh_delete_file_on_remote, True) - self.assertEqual(self.slurm.ssh_delete_file_on_remote, True) - self.assertEqual(self.lsf.ssh_delete_file_on_remote, True) - self.assertEqual(self.moab.ssh_delete_file_on_remote, True) - self.assertEqual(self.torque.ssh_delete_file_on_remote, True) - self.assertEqual(self.gent.ssh_delete_file_on_remote, True) - self.assertEqual(self.remote.ssh_delete_file_on_remote, False) - - def test_value_in_range(self): - self.assertEqual( - None, - self.sge._adapter._value_in_range( - value=None, value_min=None, value_max=None - ), - ) - self.assertEqual( - 1, - self.sge._adapter._value_in_range(value=None, value_min=1, value_max=None), - ) - self.assertEqual( - 1, - self.sge._adapter._value_in_range(value=None, value_min=None, value_max=1), - ) - self.assertEqual( - 1, - self.sge._adapter._value_in_range(value=1, value_min=None, value_max=None), - ) - self.assertEqual( - 1, self.sge._adapter._value_in_range(value=0, value_min=1, value_max=None) - ) - self.assertEqual( - 1, self.sge._adapter._value_in_range(value=2, value_min=None, value_max=1) - ) - self.assertEqual( - 1, self.sge._adapter._value_in_range(value=1, value_min=0, value_max=2) - ) - - def test_job_submission_template(self): - self.assertRaises( - ValueError, self.sge._adapter._job_submission_template, command=None - ) - self.assertRaises( - TypeError, self.sge._adapter._job_submission_template, command=1 - ) - template = ( - "#!/bin/bash\n#$ -N job.py\n#$ -wd .\n#$ -pe impi_hydra_small 1\n#$ -l h_rt=604800\n" - "#$ -o time.out\n#$ -e error.out\n\npython test.py" - ) - self.assertEqual( - self.sge._adapter._job_submission_template(command="python test.py"), - template, - ) - template = ( - "#!/bin/bash\n#BSUB -q queue\n#BSUB -J job.py\n#BSUB -o time.out\n#BSUB -n 10\n#BSUB -cwd .\n" - "#BSUB -e error.out\n#BSUB -W 259200\n\npython test.py" - ) - self.assertEqual( - self.lsf._adapter._job_submission_template(command="python test.py"), - template, - ) - self.assertRaises( - ValueError, - self.sge._adapter._job_submission_template, - command="python test.py", - queue="notavailable", - ) - - def test_interfaces(self): - self.assertEqual( - self.sge._adapter._commands.submit_job_command, ["qsub", "-terse"] - ) - self.assertEqual(self.sge._adapter._commands.delete_job_command, ["qdel"]) - self.assertEqual( - self.sge._adapter._commands.enable_reservation_command, - ["qalter", "-R", "y"], - ) - self.assertEqual( - self.sge._adapter._commands.get_queue_status_command, ["qstat", "-xml"] - ) - self.assertEqual( - self.torque._adapter._commands.submit_job_command, ["qsub"] - ) - self.assertEqual(self.torque._adapter._commands.delete_job_command, ["qdel"]) - self.assertEqual( - self.torque._adapter._commands.get_queue_status_command, ["qstat", "-f"] - ) - self.assertEqual( - self.lsf._adapter._commands.submit_job_command, ["bsub", "-terse"] - ) - self.assertEqual(self.lsf._adapter._commands.delete_job_command, ["bkill"]) - self.assertEqual( - self.lsf._adapter._commands.get_queue_status_command, ["qstat", "-x"] - ) - self.assertEqual( - self.slurm._adapter._commands.submit_job_command, ["sbatch", "--parsable"] - ) - self.assertEqual(self.slurm._adapter._commands.delete_job_command, ["scancel"]) - self.assertEqual( - self.slurm._adapter._commands.get_queue_status_command, - ["squeue", "--format", "%A|%u|%t|%.15j|%Z", "--noheader"], - ) - self.assertEqual(self.moab._adapter._commands.submit_job_command, ["msub"]) - self.assertEqual( - self.moab._adapter._commands.delete_job_command, ["mjobctl", "-c"] - ) - self.assertEqual( - self.moab._adapter._commands.get_queue_status_command, ["mdiag", "-x"] - ) - self.assertEqual( - self.gent._adapter._commands.get_queue_status_command, - ["squeue", "--format", "%A|%u|%t|%.15j|%Z", "--noheader"], - ) - - def test__list_command_to_be_executed(self): - with self.subTest("slurm"): - self.assertEqual( - self.slurm._adapter._list_command_to_be_executed(None, "here"), - ["sbatch", "--parsable", "here"], - ) - with self.subTest("slurm with one dependency"): - self.assertEqual( - self.slurm._adapter._list_command_to_be_executed(["2"], "here"), - ["sbatch", "--parsable", "--dependency=afterok:2", "here"], - ) - with self.subTest("slurm with two dependencies"): - self.assertEqual( - self.slurm._adapter._list_command_to_be_executed(["2", "34"], "here"), - ["sbatch", "--parsable", "--dependency=afterok:2,34", "here"], - ) - with self.subTest("torque"): - self.assertEqual( - self.torque._adapter._list_command_to_be_executed(None, "here"), - ["qsub", "here"], - ) - with self.subTest("torque with dependency"): - self.assertRaises( - NotImplementedError, - self.torque._adapter._list_command_to_be_executed, - [], - "here", - ) - with self.subTest("moab with dependency"): - self.assertRaises( - NotImplementedError, - self.moab._adapter._list_command_to_be_executed, - [], - "here", - ) - with self.subTest("moab"): - self.assertEqual( - self.moab._adapter._list_command_to_be_executed(None, "here"), - ["msub", "here"], - ) - with self.subTest("gent"): - self.assertEqual( - self.gent._adapter._list_command_to_be_executed(None, "here"), - ["sbatch", "--parsable", "here"], - ) - with self.subTest("gent with dependency"): - self.assertRaises( - NotImplementedError, - self.gent._adapter._list_command_to_be_executed, - [], - "here", - ) - with self.subTest("sge"): - self.assertEqual( - self.sge._adapter._list_command_to_be_executed(None, "here"), - ["qsub", "-terse", "here"], - ) - with self.subTest("sge with dependency"): - self.assertRaises( - NotImplementedError, - self.sge._adapter._list_command_to_be_executed, - [], - "here", - ) - with self.subTest("lsf"): - self.assertEqual( - self.lsf._adapter._list_command_to_be_executed(None, "here"), - ["bsub", "-terse", "here"], - ) - with self.subTest("lsf with dependency"): - self.assertRaises( - NotImplementedError, - self.lsf._adapter._list_command_to_be_executed, - [], - "here", - ) - - def test_convert_queue_status_slurm(self): - with open(os.path.join(self.path, "config/slurm", "squeue_output"), "r") as f: - content = f.read() - df_verify = pandas.DataFrame( - { - "jobid": [5322019, 5322016, 5322017, 5322018, 5322013], - "user": ["janj", "janj", "janj", "janj", "janj"], - "jobname": ["pi_19576488", "pi_19576485", "pi_19576486", "pi_19576487", "pi_19576482"], - "status": ["running", "running", "running", "running", "running"], - "working_directory": [ - "/cmmc/u/janj/pyiron/projects/2023/2023-04-19-dft-test/job_1", - "/cmmc/u/janj/pyiron/projects/2023/2023-04-19-dft-test/job_2", - "/cmmc/u/janj/pyiron/projects/2023/2023-04-19-dft-test/job_3", - "/cmmc/u/janj/pyiron/projects/2023/2023-04-19-dft-test/job_4", - "/cmmc/u/janj/pyiron/projects/2023/2023-04-19-dft-test/job_5", - ] - } - ) - self.assertTrue( - df_verify.equals( - self.slurm._adapter._commands.convert_queue_status( - queue_status_output=content - ) - ) - ) - - def test_convert_queue_status_sge(self): - with open(os.path.join(self.path, "config/sge", "qstat.xml"), "r") as f: - content = f.read() - df_running = pandas.DataFrame( - { - "jobid": ["2836045"], - "user": ["friko"], - "jobname": ["vasp.5.3.5"], - "status": ["running"], - } - ) - df_pending = pandas.DataFrame( - { - "jobid": ["2836046", "2967274"], - "user": ["friko", "janj"], - "jobname": ["vasp.5.3.5", "hello.py"], - "status": ["pending", "error"], - } - ) - df_merge = pandas.concat([df_running, df_pending], sort=True) - df = pandas.DataFrame( - { - "jobid": pandas.to_numeric(df_merge.jobid), - "user": df_merge.user, - "jobname": df_merge.jobname, - "status": df_merge.status, - "working_directory": [""] * len(df_merge), - } - ) - self.assertTrue( - df.equals( - self.sge._adapter._commands.convert_queue_status( - queue_status_output=content - ) - ) - ) - - def test_convert_queue_status_torque(self): - with open(os.path.join(self.path, "config/torque", "PBSPro_qsub_output"), "r") as f: - content = f.read() - df_verify = pandas.DataFrame( - { - "jobid": [80005196, 80005197, 80005198], - "user": ["asd562", "asd562", "fgh562"], - "jobname": ["test1", "test2", "test_asdfasdfasdfasdfasdfasdfasdfasdfasdfasdf"], - "status": ["running", "pending", "pending"], - "working_directory": ["/scratch/a01/asd562/VASP/test/test1",\ - "/scratch/a01/asd562/VASP/test/test2",\ - "/scratch/a01/fgh562/VASP/test/test_asdfasdfasdfasdfasdfasdfasdfasdfasdfasdf"] - } - ) - self.assertTrue( - df_verify.equals( - self.torque._adapter._commands.convert_queue_status( - queue_status_output=content - ) - ) - ) - - def test_queue_list(self): - self.assertEqual( - sorted(self.sge.queue_list), - ["impi_hy", "impi_hydra", "impi_hydra_cmfe", "impi_hydra_small"], - ) - - def test_queues(self): - self.assertEqual(self.sge.queues.impi_hydra, "impi_hydra") - self.assertEqual( - sorted(dir(self.sge.queues)), - ["impi_hy", "impi_hydra", "impi_hydra_cmfe", "impi_hydra_small"], - ) - with self.assertRaises(AttributeError): - _ = self.sge.queues.notavailable - - def test_get_user(self): - self.assertEqual(self.sge._adapter._get_user(), getpass.getuser()) - - def test_check_queue_parameters(self): - self.assertEqual( - (1, 604800, None), self.sge.check_queue_parameters(queue="impi_hydra_small") - ) - - def test_queue_view(self): - self.assertIsInstance(self.slurm.queue_view, pandas.DataFrame) - - def test_submit_job_remote(self): - with self.assertRaises(NotImplementedError): - self.remote.submit_job(queue="remote", dependency_list=[]) - - def test_submit_job_empty_working_directory(self): - with self.assertRaises(ValueError): - self.slurm.submit_job(working_directory=" ") - - def test_memory_string_comparison(self): - self.assertEqual(BasisQueueAdapter._value_in_range(1023, value_min="1K"), "1K") - self.assertEqual(BasisQueueAdapter._value_in_range(1035, value_min="1K"), 1035) - self.assertEqual(BasisQueueAdapter._value_in_range(1035, value_max="1K"), "1K") - self.assertEqual( - BasisQueueAdapter._value_in_range("1035", value_min="1K"), "1035" - ) - self.assertEqual( - BasisQueueAdapter._value_in_range( - "60000M", value_min="1K", value_max="50G" - ), - "50G", - ) - self.assertEqual( - BasisQueueAdapter._value_in_range("60000", value_min="1K", value_max="50G"), - "50G", - ) - self.assertEqual( - BasisQueueAdapter._value_in_range( - "60000M", value_min="1K", value_max="70G" - ), - "60000M", - ) - self.assertEqual( - BasisQueueAdapter._value_in_range(60000, value_min="1K", value_max="70G"), - 60000, - ) - self.assertEqual( - BasisQueueAdapter._value_in_range( - 90000 * 1024 ** 2, value_min="1K", value_max="70G" - ), - "70G", - ) - self.assertEqual( - BasisQueueAdapter._value_in_range("90000", value_min="1K", value_max="70G"), - "70G", - ) - self.assertEqual( - BasisQueueAdapter._value_in_range( - "60000M", value_min="60G", value_max="70G" - ), - "60G", - ) - - def test_write_queue(self): - with self.assertRaises(ValueError): - self.slurm._adapter._write_queue_script( - queue=None, - job_name=None, - working_directory=None, - cores=None, - memory_max=None, - run_time_max=None, - command=None - ) - self.slurm._adapter._write_queue_script( - queue="slurm", - job_name=None, - working_directory=None, - cores=None, - memory_max=None, - run_time_max=None, - command="echo \"hello\"" - ) - with open("run_queue.sh", "r") as f: - content = f.read() - output = """\ -#!/bin/bash -#SBATCH --output=time.out -#SBATCH --job-name=None -#SBATCH --chdir=. -#SBATCH --get-user-env=L -#SBATCH --partition=slurm -#SBATCH --time=4320 -#SBATCH --cpus-per-task=10 - -echo \"hello\"""" - self.assertEqual(content, output) - os.remove("run_queue.sh") - - def test_write_queue_extra_keywords(self): - self.slurm._adapter._write_queue_script( - queue="slurm_extra", - job_name=None, - working_directory=None, - cores=None, - memory_max=None, - run_time_max=None, - command="echo \"hello\"", - account="123456" - ) - with open("run_queue.sh", "r") as f: - content = f.read() - output = """\ -#!/bin/bash -#SBATCH --output=time.out -#SBATCH --job-name=None -#SBATCH --chdir=. -#SBATCH --get-user-env=L -#SBATCH --partition=slurm -#SBATCH --account=123456 -#SBATCH --time=4320 -#SBATCH --cpus-per-task=10 - -echo \"hello\"""" - self.assertEqual(content, output) - os.remove("run_queue.sh") \ No newline at end of file diff --git a/tests/test_remote.py b/tests/test_remote.py new file mode 100644 index 00000000..855b7a2e --- /dev/null +++ b/tests/test_remote.py @@ -0,0 +1,35 @@ +# coding: utf-8 +# Copyright (c) Jan Janssen + +import os +import unittest +from pysqa import QueueAdapter + +__author__ = "Jan Janssen" +__copyright__ = "Copyright 2019, Jan Janssen" +__version__ = "0.0.1" +__maintainer__ = "Jan Janssen" +__email__ = "janssen@mpie.de" +__status__ = "production" +__date__ = "Feb 9, 2019" + + +class TestRunmode(unittest.TestCase): + @classmethod + def setUpClass(cls): + cls.path = os.path.dirname(os.path.abspath(__file__)) + cls.remote = QueueAdapter(directory=os.path.join(cls.path, "config/remote")) + + def test_config(self): + self.assertEqual(self.remote.config["queue_type"], "REMOTE") + self.assertEqual(self.remote.config["queue_primary"], "remote") + + def test_list_clusters(self): + self.assertEqual(self.remote.list_clusters(), ['default']) + + def test_ssh_delete_file_on_remote(self): + self.assertEqual(self.remote.ssh_delete_file_on_remote, False) + + def test_submit_job_remote(self): + with self.assertRaises(NotImplementedError): + self.remote.submit_job(queue="remote", dependency_list=[]) diff --git a/tests/test_sge.py b/tests/test_sge.py new file mode 100644 index 00000000..02aab1d7 --- /dev/null +++ b/tests/test_sge.py @@ -0,0 +1,171 @@ +# coding: utf-8 +# Copyright (c) Jan Janssen + +import os +import pandas +import unittest +import getpass +from pysqa import QueueAdapter + +__author__ = "Jan Janssen" +__copyright__ = "Copyright 2019, Jan Janssen" +__version__ = "0.0.1" +__maintainer__ = "Jan Janssen" +__email__ = "janssen@mpie.de" +__status__ = "production" +__date__ = "Feb 9, 2019" + + +class TestRunmode(unittest.TestCase): + @classmethod + def setUpClass(cls): + cls.path = os.path.dirname(os.path.abspath(__file__)) + cls.sge = QueueAdapter(directory=os.path.join(cls.path, "config/sge")) + + def test_config(self): + self.assertEqual(self.sge.config["queue_type"], "SGE") + self.assertEqual(self.sge.config["queue_primary"], "impi_hydra_small") + + def test_list_clusters(self): + self.assertEqual(self.sge.list_clusters(), ['default']) + + def test_ssh_delete_file_on_remote(self): + self.assertEqual(self.sge.ssh_delete_file_on_remote, True) + + def test_value_in_range(self): + self.assertEqual( + None, + self.sge._adapter._value_in_range( + value=None, value_min=None, value_max=None + ), + ) + self.assertEqual( + 1, + self.sge._adapter._value_in_range(value=None, value_min=1, value_max=None), + ) + self.assertEqual( + 1, + self.sge._adapter._value_in_range(value=None, value_min=None, value_max=1), + ) + self.assertEqual( + 1, + self.sge._adapter._value_in_range(value=1, value_min=None, value_max=None), + ) + self.assertEqual( + 1, self.sge._adapter._value_in_range(value=0, value_min=1, value_max=None) + ) + self.assertEqual( + 1, self.sge._adapter._value_in_range(value=2, value_min=None, value_max=1) + ) + self.assertEqual( + 1, self.sge._adapter._value_in_range(value=1, value_min=0, value_max=2) + ) + + def test_job_submission_template(self): + self.assertRaises( + ValueError, self.sge._adapter._job_submission_template, command=None + ) + self.assertRaises( + TypeError, self.sge._adapter._job_submission_template, command=1 + ) + template = ( + "#!/bin/bash\n#$ -N job.py\n#$ -wd .\n#$ -pe impi_hydra_small 1\n#$ -l h_rt=604800\n" + "#$ -o time.out\n#$ -e error.out\n\npython test.py" + ) + self.assertEqual( + self.sge._adapter._job_submission_template(command="python test.py"), + template, + ) + self.assertRaises( + ValueError, + self.sge._adapter._job_submission_template, + command="python test.py", + queue="notavailable", + ) + + def test_interfaces(self): + self.assertEqual( + self.sge._adapter._commands.submit_job_command, ["qsub", "-terse"] + ) + self.assertEqual(self.sge._adapter._commands.delete_job_command, ["qdel"]) + self.assertEqual( + self.sge._adapter._commands.enable_reservation_command, + ["qalter", "-R", "y"], + ) + self.assertEqual( + self.sge._adapter._commands.get_queue_status_command, ["qstat", "-xml"] + ) + + def test__list_command_to_be_executed(self): + with self.subTest("sge"): + self.assertEqual( + self.sge._adapter._list_command_to_be_executed(None, "here"), + ["qsub", "-terse", "here"], + ) + with self.subTest("sge with dependency"): + self.assertRaises( + NotImplementedError, + self.sge._adapter._list_command_to_be_executed, + [], + "here", + ) + + def test_convert_queue_status_sge(self): + with open(os.path.join(self.path, "config/sge", "qstat.xml"), "r") as f: + content = f.read() + df_running = pandas.DataFrame( + { + "jobid": ["2836045"], + "user": ["friko"], + "jobname": ["vasp.5.3.5"], + "status": ["running"], + } + ) + df_pending = pandas.DataFrame( + { + "jobid": ["2836046", "2967274"], + "user": ["friko", "janj"], + "jobname": ["vasp.5.3.5", "hello.py"], + "status": ["pending", "error"], + } + ) + df_merge = pandas.concat([df_running, df_pending], sort=True) + df = pandas.DataFrame( + { + "jobid": pandas.to_numeric(df_merge.jobid), + "user": df_merge.user, + "jobname": df_merge.jobname, + "status": df_merge.status, + "working_directory": [""] * len(df_merge), + } + ) + self.assertTrue( + df.equals( + self.sge._adapter._commands.convert_queue_status( + queue_status_output=content + ) + ) + ) + + def test_queue_list(self): + self.assertEqual( + sorted(self.sge.queue_list), + ["impi_hy", "impi_hydra", "impi_hydra_cmfe", "impi_hydra_small"], + ) + + def test_queues(self): + self.assertEqual(self.sge.queues.impi_hydra, "impi_hydra") + self.assertEqual( + sorted(dir(self.sge.queues)), + ["impi_hy", "impi_hydra", "impi_hydra_cmfe", "impi_hydra_small"], + ) + with self.assertRaises(AttributeError): + _ = self.sge.queues.notavailable + + def test_get_user(self): + self.assertEqual(self.sge._adapter._get_user(), getpass.getuser()) + + def test_check_queue_parameters(self): + self.assertEqual( + (1, 604800, None), self.sge.check_queue_parameters(queue="impi_hydra_small") + ) diff --git a/tests/test_slurm.py b/tests/test_slurm.py new file mode 100644 index 00000000..4ff516fd --- /dev/null +++ b/tests/test_slurm.py @@ -0,0 +1,160 @@ +# coding: utf-8 +# Copyright (c) Jan Janssen + +import os +import pandas +import unittest +import getpass +from pysqa import QueueAdapter + +__author__ = "Jan Janssen" +__copyright__ = "Copyright 2019, Jan Janssen" +__version__ = "0.0.1" +__maintainer__ = "Jan Janssen" +__email__ = "janssen@mpie.de" +__status__ = "production" +__date__ = "Feb 9, 2019" + + +class TestRunmode(unittest.TestCase): + @classmethod + def setUpClass(cls): + cls.path = os.path.dirname(os.path.abspath(__file__)) + cls.slurm = QueueAdapter(directory=os.path.join(cls.path, "config/slurm")) + + def test_config(self): + self.assertEqual(self.slurm.config["queue_type"], "SLURM") + self.assertEqual(self.slurm.config["queue_primary"], "slurm") + + def test_list_clusters(self): + self.assertEqual(self.slurm.list_clusters(), ['default']) + + def test_ssh_delete_file_on_remote(self): + self.assertEqual(self.slurm.ssh_delete_file_on_remote, True) + + def test_interfaces(self): + self.assertEqual( + self.slurm._adapter._commands.submit_job_command, ["sbatch", "--parsable"] + ) + self.assertEqual(self.slurm._adapter._commands.delete_job_command, ["scancel"]) + self.assertEqual( + self.slurm._adapter._commands.get_queue_status_command, + ["squeue", "--format", "%A|%u|%t|%.15j|%Z", "--noheader"], + ) + + def test__list_command_to_be_executed(self): + with self.subTest("slurm"): + self.assertEqual( + self.slurm._adapter._list_command_to_be_executed(None, "here"), + ["sbatch", "--parsable", "here"], + ) + with self.subTest("slurm with one dependency"): + self.assertEqual( + self.slurm._adapter._list_command_to_be_executed(["2"], "here"), + ["sbatch", "--parsable", "--dependency=afterok:2", "here"], + ) + with self.subTest("slurm with two dependencies"): + self.assertEqual( + self.slurm._adapter._list_command_to_be_executed(["2", "34"], "here"), + ["sbatch", "--parsable", "--dependency=afterok:2,34", "here"], + ) + + def test_convert_queue_status_slurm(self): + with open(os.path.join(self.path, "config/slurm", "squeue_output"), "r") as f: + content = f.read() + df_verify = pandas.DataFrame( + { + "jobid": [5322019, 5322016, 5322017, 5322018, 5322013], + "user": ["janj", "janj", "janj", "janj", "janj"], + "jobname": ["pi_19576488", "pi_19576485", "pi_19576486", "pi_19576487", "pi_19576482"], + "status": ["running", "running", "running", "running", "running"], + "working_directory": [ + "/cmmc/u/janj/pyiron/projects/2023/2023-04-19-dft-test/job_1", + "/cmmc/u/janj/pyiron/projects/2023/2023-04-19-dft-test/job_2", + "/cmmc/u/janj/pyiron/projects/2023/2023-04-19-dft-test/job_3", + "/cmmc/u/janj/pyiron/projects/2023/2023-04-19-dft-test/job_4", + "/cmmc/u/janj/pyiron/projects/2023/2023-04-19-dft-test/job_5", + ] + } + ) + self.assertTrue( + df_verify.equals( + self.slurm._adapter._commands.convert_queue_status( + queue_status_output=content + ) + ) + ) + + def test_get_user(self): + self.assertEqual(self.slurm._adapter._get_user(), getpass.getuser()) + + def test_queue_view(self): + self.assertIsInstance(self.slurm.queue_view, pandas.DataFrame) + + def test_submit_job_empty_working_directory(self): + with self.assertRaises(ValueError): + self.slurm.submit_job(working_directory=" ") + + def test_write_queue(self): + with self.assertRaises(ValueError): + self.slurm._adapter._write_queue_script( + queue=None, + job_name=None, + working_directory=None, + cores=None, + memory_max=None, + run_time_max=None, + command=None + ) + self.slurm._adapter._write_queue_script( + queue="slurm", + job_name=None, + working_directory=None, + cores=None, + memory_max=None, + run_time_max=None, + command="echo \"hello\"" + ) + with open("run_queue.sh", "r") as f: + content = f.read() + output = """\ +#!/bin/bash +#SBATCH --output=time.out +#SBATCH --job-name=None +#SBATCH --chdir=. +#SBATCH --get-user-env=L +#SBATCH --partition=slurm +#SBATCH --time=4320 +#SBATCH --cpus-per-task=10 + +echo \"hello\"""" + self.assertEqual(content, output) + os.remove("run_queue.sh") + + def test_write_queue_extra_keywords(self): + self.slurm._adapter._write_queue_script( + queue="slurm_extra", + job_name=None, + working_directory=None, + cores=None, + memory_max=None, + run_time_max=None, + command="echo \"hello\"", + account="123456" + ) + with open("run_queue.sh", "r") as f: + content = f.read() + output = """\ +#!/bin/bash +#SBATCH --output=time.out +#SBATCH --job-name=None +#SBATCH --chdir=. +#SBATCH --get-user-env=L +#SBATCH --partition=slurm +#SBATCH --account=123456 +#SBATCH --time=4320 +#SBATCH --cpus-per-task=10 + +echo \"hello\"""" + self.assertEqual(content, output) + os.remove("run_queue.sh") diff --git a/tests/test_torque.py b/tests/test_torque.py new file mode 100644 index 00000000..722b636b --- /dev/null +++ b/tests/test_torque.py @@ -0,0 +1,77 @@ +# coding: utf-8 +# Copyright (c) Jan Janssen + +import os +import pandas +import unittest +from pysqa import QueueAdapter + +__author__ = "Jan Janssen" +__copyright__ = "Copyright 2019, Jan Janssen" +__version__ = "0.0.1" +__maintainer__ = "Jan Janssen" +__email__ = "janssen@mpie.de" +__status__ = "production" +__date__ = "Feb 9, 2019" + + +class TestRunmode(unittest.TestCase): + @classmethod + def setUpClass(cls): + cls.path = os.path.dirname(os.path.abspath(__file__)) + cls.torque = QueueAdapter(directory=os.path.join(cls.path, "config/torque")) + + def test_config(self): + self.assertEqual(self.torque.config["queue_type"], "TORQUE") + self.assertEqual(self.torque.config["queue_primary"], "torque") + + def test_list_clusters(self): + self.assertEqual(self.torque.list_clusters(), ['default']) + + def test_ssh_delete_file_on_remote(self): + self.assertEqual(self.torque.ssh_delete_file_on_remote, True) + + def test_interfaces(self): + self.assertEqual( + self.torque._adapter._commands.submit_job_command, ["qsub"] + ) + self.assertEqual(self.torque._adapter._commands.delete_job_command, ["qdel"]) + self.assertEqual( + self.torque._adapter._commands.get_queue_status_command, ["qstat", "-f"] + ) + + def test__list_command_to_be_executed(self): + with self.subTest("torque"): + self.assertEqual( + self.torque._adapter._list_command_to_be_executed(None, "here"), + ["qsub", "here"], + ) + with self.subTest("torque with dependency"): + self.assertRaises( + NotImplementedError, + self.torque._adapter._list_command_to_be_executed, + [], + "here", + ) + + def test_convert_queue_status_torque(self): + with open(os.path.join(self.path, "config/torque", "PBSPro_qsub_output"), "r") as f: + content = f.read() + df_verify = pandas.DataFrame( + { + "jobid": [80005196, 80005197, 80005198], + "user": ["asd562", "asd562", "fgh562"], + "jobname": ["test1", "test2", "test_asdfasdfasdfasdfasdfasdfasdfasdfasdfasdf"], + "status": ["running", "pending", "pending"], + "working_directory": ["/scratch/a01/asd562/VASP/test/test1", \ + "/scratch/a01/asd562/VASP/test/test2", \ + "/scratch/a01/fgh562/VASP/test/test_asdfasdfasdfasdfasdfasdfasdfasdfasdfasdf"] + } + ) + self.assertTrue( + df_verify.equals( + self.torque._adapter._commands.convert_queue_status( + queue_status_output=content + ) + ) + )