diff --git a/pysqa/ext/modular.py b/pysqa/ext/modular.py index 02a861fe..3e7ab94b 100644 --- a/pysqa/ext/modular.py +++ b/pysqa/ext/modular.py @@ -142,7 +142,7 @@ def get_queue_status(self, user=None): ) df = self._commands.convert_queue_status(queue_status_output=out) df_lst.append(df) - df = pandas.concat(df_lst, axis=1, sort=False).reset_index(drop=True) + df = pandas.concat(df_lst, axis=0, sort=False).reset_index(drop=True) if user is None: return df else: diff --git a/pysqa/utils/basic.py b/pysqa/utils/basic.py index 29565a96..b1fc3d91 100644 --- a/pysqa/utils/basic.py +++ b/pysqa/utils/basic.py @@ -277,13 +277,13 @@ def get_job_from_remote(self, working_directory): """ Get the results of the calculation - this is necessary when the calculation was executed on a remote host. """ - pass + raise NotImplementedError def convert_path_to_remote(self, path): - pass + raise NotImplementedError def transfer_file(self, file, transfer_back=False): - pass + raise NotImplementedError def check_queue_parameters( self, queue, cores=1, run_time_max=None, memory_max=None, active_queue=None diff --git a/tests/config/remote_alternative/queue.yaml b/tests/config/remote_alternative/queue.yaml new file mode 100644 index 00000000..64b9d346 --- /dev/null +++ b/tests/config/remote_alternative/queue.yaml @@ -0,0 +1,13 @@ +queue_type: REMOTE +queue_primary: remote +ssh_host: hpc-cluster.university.edu +ssh_username: hpcuser +known_hosts: ~/.ssh/known_hosts +ssh_key: ~/.ssh/id_rsa +ssh_remote_config_dir: /u/share/pysqa/resources/queues/ +ssh_remote_path: /u/hpcuser/remote/ +ssh_local_path: /home/localuser/projects/ +ssh_continous_connection: True +ssh_port: 22 +queues: + remote: {cores_max: 100, cores_min: 10, run_time_max: 259200} \ No newline at end of file diff --git a/tests/test_gent.py b/tests/test_gent.py index adac0974..9105e7a9 100644 --- a/tests/test_gent.py +++ b/tests/test_gent.py @@ -14,6 +14,15 @@ __status__ = "production" __date__ = "Feb 9, 2019" +df_queue_status = pandas.DataFrame( + { + "cluster": ["Mycluster", "Mycluster", "Mycluster", "Mycluster", "Mycluster"], + "jobid": [5322019, 5322016, 5322017, 5322018, 5322013], + "user": ["janj", "janj", "janj", "janj", "janj"], + "jobname": ["pi_19576488", "pi_19576485", "pi_19576486", "pi_19576487", "pi_19576482"], + "status": ["r", "r", "r", "r", "r"], + } +) class TestGentQueueAdapter(unittest.TestCase): @classmethod @@ -60,19 +69,136 @@ def test_get_queue_from_output(self): def test_convert_queue_status_slurm(self): with open(os.path.join(self.path, "config/gent", "gent_output"), "r") as f: content = f.read() - df_verify = pandas.DataFrame( - { - "cluster": ["Mycluster", "Mycluster", "Mycluster", "Mycluster", "Mycluster"], - "jobid": [5322019, 5322016, 5322017, 5322018, 5322013], - "user": ["janj", "janj", "janj", "janj", "janj"], - "jobname": ["pi_19576488", "pi_19576485", "pi_19576486", "pi_19576487", "pi_19576482"], - "status": ["r", "r", "r", "r", "r"], - } - ) self.assertTrue( - df_verify.equals( + df_queue_status.equals( self.gent._adapter._commands.convert_queue_status( queue_status_output=content ) ) ) + + def test_switch_cluster_command(self): + self.assertEqual( + self.gent._adapter._switch_cluster_command(cluster_module="module1"), + ['module', '--quiet', 'swap', 'cluster/module1;'] + ) + + def test_resolve_queue_id(self): + self.assertEqual( + self.gent._adapter._resolve_queue_id(process_id=20120, cluster_dict={0: "cluster1"}), + ("cluster1", 2012) + ) + + def test_submit_job_no_output(self): + def execute_command( + commands, + working_directory=None, + split_output=True, + shell=False, + error_filename="pysqa.err", + ): + pass + + gent_tmp = QueueAdapter( + directory=os.path.join(self.path, "config/gent"), + execute_command=execute_command + ) + self.assertIsNone(gent_tmp.submit_job( + queue="slurm", + job_name="test", + working_directory=".", + command="echo hello" + )) + os.remove("run_queue.sh") + + def test_submit_job_with_output(self): + def execute_command( + commands, + working_directory=None, + split_output=True, + shell=False, + error_filename="pysqa.err", + ): + return "123;cluster0" + + gent_tmp = QueueAdapter( + directory=os.path.join(self.path, "config/gent"), + execute_command=execute_command + ) + self.assertEqual(gent_tmp.submit_job( + queue="slurm", + job_name="test", + working_directory=".", + command="echo hello" + ), 1230) + os.remove("run_queue.sh") + + def test_delete_job_no_output(self): + def execute_command( + commands, + working_directory=None, + split_output=True, + shell=False, + error_filename="pysqa.err", + ): + pass + + gent_tmp = QueueAdapter( + directory=os.path.join(self.path, "config/gent"), + execute_command=execute_command + ) + self.assertIsNone(gent_tmp.delete_job(process_id=1)) + + def test_delete_job_with_output(self): + def execute_command( + commands, + working_directory=None, + split_output=True, + shell=False, + error_filename="pysqa.err", + ): + return 0, 1 + + gent_tmp = QueueAdapter( + directory=os.path.join(self.path, "config/gent"), + execute_command=execute_command + ) + self.assertEqual(gent_tmp.delete_job(process_id=1), 0) + + def test_get_queue_status(self): + def execute_command( + commands, + working_directory=None, + split_output=True, + shell=False, + error_filename="pysqa.err", + ): + with open(os.path.join(self.path, "config", "gent", "gent_output")) as f: + return f.read() + + gent_tmp = QueueAdapter( + directory=os.path.join(self.path, "config/gent"), + execute_command=execute_command + ) + self.assertTrue(pandas.concat([df_queue_status] * 3).reset_index(drop=True).equals( + gent_tmp.get_queue_status()) + ) + + def test_get_queue_status_user(self): + def execute_command( + commands, + working_directory=None, + split_output=True, + shell=False, + error_filename="pysqa.err", + ): + with open(os.path.join(self.path, "config", "gent", "gent_output")) as f: + return f.read() + + gent_tmp = QueueAdapter( + directory=os.path.join(self.path, "config/gent"), + execute_command=execute_command + ) + self.assertTrue(pandas.concat([df_queue_status] * 3).reset_index(drop=True).equals( + gent_tmp.get_queue_status(user="janj")) + ) diff --git a/tests/test_remote.py b/tests/test_remote.py index 98a5dfa9..c82f6bd3 100644 --- a/tests/test_remote.py +++ b/tests/test_remote.py @@ -19,6 +19,7 @@ class TestRemoteQueueAdapter(unittest.TestCase): def setUpClass(cls): cls.path = os.path.dirname(os.path.abspath(__file__)) cls.remote = QueueAdapter(directory=os.path.join(cls.path, "config/remote")) + cls.remote_alternative = QueueAdapter(directory=os.path.join(cls.path, "config/remote_alternative")) def test_config(self): self.assertEqual(self.remote.config["queue_type"], "REMOTE") @@ -36,3 +37,27 @@ def test_ssh_delete_file_on_remote(self): def test_submit_job_remote(self): with self.assertRaises(NotImplementedError): self.remote.submit_job(queue="remote", dependency_list=[]) + + def test_convert_path_to_remote(self): + self.assertEqual(self.remote.convert_path_to_remote("/home/localuser/projects/test"), "/u/hpcuser/remote/test") + + def test_delete_command(self): + self.assertEqual( + "python -m pysqa.cmd --config_directory /u/share/pysqa/resources/queues/ --delete --id 123", + self.remote._adapter._delete_command(job_id=123) + ) + + def test_reservation_command(self): + self.assertEqual( + "python -m pysqa.cmd --config_directory /u/share/pysqa/resources/queues/ --reservation --id 123", + self.remote._adapter._reservation_command(job_id=123) + ) + + def test_get_ssh_user(self): + self.assertEqual(self.remote._adapter._get_user(), "hpcuser") + + def test_get_file_transfer(self): + self.assertEqual( + self.remote._adapter._get_file_transfer(file="abc.txt", local_dir="local", remote_dir="test"), + os.path.abspath("abc.txt") + ) \ No newline at end of file diff --git a/tests/test_slurm.py b/tests/test_slurm.py index 0e4bce99..a1af4b1a 100644 --- a/tests/test_slurm.py +++ b/tests/test_slurm.py @@ -16,6 +16,23 @@ __date__ = "Feb 9, 2019" +df_queue_status = pandas.DataFrame( + { + "jobid": [5322019, 5322016, 5322017, 5322018, 5322013], + "user": ["janj", "janj", "janj", "janj", "maxi"], + "jobname": ["pi_19576488", "pi_19576485", "pi_19576486", "pi_19576487", "pi_19576482"], + "status": ["running", "running", "running", "running", "running"], + "working_directory": [ + "/cmmc/u/janj/pyiron/projects/2023/2023-04-19-dft-test/job_1", + "/cmmc/u/janj/pyiron/projects/2023/2023-04-19-dft-test/job_2", + "/cmmc/u/janj/pyiron/projects/2023/2023-04-19-dft-test/job_3", + "/cmmc/u/janj/pyiron/projects/2023/2023-04-19-dft-test/job_4", + "/cmmc/u/janj/pyiron/projects/2023/2023-04-19-dft-test/job_5", + ] + } +) + + class TestSlurmQueueAdapter(unittest.TestCase): @classmethod def setUpClass(cls): @@ -65,23 +82,8 @@ def test__list_command_to_be_executed(self): def test_convert_queue_status_slurm(self): with open(os.path.join(self.path, "config/slurm", "squeue_output"), "r") as f: content = f.read() - df_verify = pandas.DataFrame( - { - "jobid": [5322019, 5322016, 5322017, 5322018, 5322013], - "user": ["janj", "janj", "janj", "janj", "maxi"], - "jobname": ["pi_19576488", "pi_19576485", "pi_19576486", "pi_19576487", "pi_19576482"], - "status": ["running", "running", "running", "running", "running"], - "working_directory": [ - "/cmmc/u/janj/pyiron/projects/2023/2023-04-19-dft-test/job_1", - "/cmmc/u/janj/pyiron/projects/2023/2023-04-19-dft-test/job_2", - "/cmmc/u/janj/pyiron/projects/2023/2023-04-19-dft-test/job_3", - "/cmmc/u/janj/pyiron/projects/2023/2023-04-19-dft-test/job_4", - "/cmmc/u/janj/pyiron/projects/2023/2023-04-19-dft-test/job_5", - ] - } - ) self.assertTrue( - df_verify.equals( + df_queue_status.equals( self.slurm._adapter._commands.convert_queue_status( queue_status_output=content ) @@ -183,3 +185,52 @@ def execute_command( command="echo hello" )) self.assertIsNone(slurm_tmp.delete_job(process_id=123)) + + def test_queue_status(self): + def execute_command( + commands, + working_directory=None, + split_output=True, + shell=False, + error_filename="pysqa.err", + ): + with open(os.path.join(self.path, "config", "slurm", "squeue_output")) as f: + return f.read() + + slurm_tmp = QueueAdapter( + directory=os.path.join(self.path, "config/slurm"), + execute_command=execute_command + ) + self.assertTrue( + df_queue_status.equals(slurm_tmp.get_queue_status()) + ) + self.assertTrue( + df_queue_status[df_queue_status.user=="janj"].equals(slurm_tmp.get_queue_status(user="janj")) + ) + self.assertEqual(slurm_tmp.get_status_of_job(process_id=5322019), "running") + self.assertIsNone(slurm_tmp.get_status_of_job(process_id=0)) + self.assertEqual(slurm_tmp.get_status_of_jobs(process_id_lst=[5322019, 0]), ["running", "finished"]) + + def test_not_implemented_functions(self): + def execute_command( + commands, + working_directory=None, + split_output=True, + shell=False, + error_filename="pysqa.err", + ): + pass + + slurm_tmp = QueueAdapter( + directory=os.path.join(self.path, "config/slurm"), + execute_command=execute_command + ) + + with self.assertRaises(NotImplementedError): + slurm_tmp._adapter.convert_path_to_remote(path="test") + + with self.assertRaises(NotImplementedError): + slurm_tmp._adapter.transfer_file(file="test", transfer_back=False) + + with self.assertRaises(NotImplementedError): + slurm_tmp._adapter.get_job_from_remote(working_directory=".")