diff --git a/.github/workflows/coverage.yml b/.github/workflows/coverage.yml
new file mode 100644
index 00000000..da969412
--- /dev/null
+++ b/.github/workflows/coverage.yml
@@ -0,0 +1,60 @@
+# This workflow will install Python dependencies, run tests and lint with a variety of Python versions
+# For more information see: https://help.github.com/actions/language-and-framework-guides/using-python-with-github-actions
+
+name: Coverage
+
+on:
+  push:
+    branches: [ main ]
+  pull_request:
+    branches: [ main ]
+
+env:
+  GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
+  CODACY_PROJECT_TOKEN: ${{ secrets.CODACY_PROJECT_TOKEN }}
+
+jobs:
+  build:
+    runs-on: ubuntu-latest
+
+    steps:
+    - uses: actions/checkout@v2
+    - name: Setup environment
+      run: |
+        cp .ci_support/environment.yml environment.yml
+        echo " - flux-core =0.50.0" >> environment.yml
+    - name: Setup Mambaforge
+      uses: conda-incubator/setup-miniconda@v2
+      with:
+        python-version: '3.10'
+        miniforge-variant: Mambaforge
+        channels: conda-forge
+        channel-priority: strict
+        activate-environment: my-env
+        use-mamba: true
+    - name: Set cache date and number
+      run: |
+        echo "DATE=$(date +'%Y%m%d')" >> $GITHUB_ENV
+        cat .github/variables/cache_number.env >> $GITHUB_ENV
+    - uses: actions/cache@v2
+      with:
+        path: /usr/share/miniconda3/envs/my-env
+        key: coverage-conda-${{ hashFiles('environment.yml') }}-${{ env.DATE }}-${{ env.CACHE_NUMBER }}
+      id: cache
+    - name: Update environment
+      run: mamba env update -n my-env -f environment.yml
+      if: steps.cache.outputs.cache-hit != 'true'
+    - name: Setup
+      shell: bash -l {0}
+      run: |
+        pip install --no-deps .
+    - name: Test
+      shell: bash -l {0}
+      run: coverage run --omit pysqa/_version.py -m unittest discover tests
+    - name: Coverage
+      shell: bash -l {0}
+      run: |
+        coverage combine
+        coveralls
+        coverage xml
+        python-codacy-coverage -r coverage.xml
diff --git a/.github/workflows/unittest.yml b/.github/workflows/unittest.yml
index f05d965a..ee865e92 100644
--- a/.github/workflows/unittest.yml
+++ b/.github/workflows/unittest.yml
@@ -81,12 +81,4 @@ jobs:
         pip install --no-deps .
     - name: Test
       shell: bash -l {0}
-      run: coverage run --omit pysqa/_version.py -m unittest discover tests
-    - name: Coverage
-      if: matrix.label == 'linux-64-py-3-10'
-      shell: bash -l {0}
-      run: |
-        coverage combine
-        coveralls
-        coverage xml
-        python-codacy-coverage -r coverage.xml
+      run: python -m unittest discover tests
diff --git a/pysqa/queueadapter.py b/pysqa/queueadapter.py
index 95a02dfb..60e82dae 100644
--- a/pysqa/queueadapter.py
+++ b/pysqa/queueadapter.py
@@ -308,7 +308,7 @@ def set_queue_adapter(config, directory, execute_command=execute_command):
         config (dict): configuration for one cluster
         directory (str): directory which contains the queue configurations
     """
-    if config["queue_type"] in ["SGE", "TORQUE", "SLURM", "LSF", "MOAB"]:
+    if config["queue_type"] in ["SGE", "TORQUE", "SLURM", "LSF", "MOAB", "FLUX"]:
         return BasisQueueAdapter(
             config=config, directory=directory, execute_command=execute_command
         )
diff --git a/pysqa/utils/basic.py b/pysqa/utils/basic.py
index b1fc3d91..24932830 100644
--- a/pysqa/utils/basic.py
+++ b/pysqa/utils/basic.py
@@ -66,6 +66,9 @@ def __init__(self, config, directory="~/.queues", execute_command=execute_comman
         elif self._config["queue_type"] == "REMOTE":
             class_name = None
             module_name = None
+        elif self._config["queue_type"] == "FLUX":
+            class_name = "FluxCommands"
+            module_name = "pysqa.wrapper.flux"
         else:
             raise ValueError()
         if self._config["queue_type"] != "REMOTE":
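Together these two changes route a `queue_type: FLUX` configuration through the existing machinery: `set_queue_adapter()` accepts `FLUX` like any other scheduler type, and `BasisQueueAdapter.__init__()` imports `FluxCommands` from `pysqa.wrapper.flux` dynamically. A minimal usage sketch; the directory path and job values are illustrative and mirror the test configuration added below:

```python
from pysqa import QueueAdapter

# Reads queue.yaml from the given directory; queue_type: FLUX makes
# BasisQueueAdapter load the FluxCommands wrapper at runtime.
qa = QueueAdapter(directory="tests/config/flux")

# The generic interface is unchanged: this renders the flux.sh template
# into run_queue.sh and submits it with `flux batch`.
job_id = qa.submit_job(
    queue="flux",
    job_name="demo",
    working_directory=".",
    cores=2,
    command="echo hello",
)
print(job_id)  # integer job id decoded from the compact ƒ... identifier
```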
diff --git a/pysqa/wrapper/flux.py b/pysqa/wrapper/flux.py
new file mode 100644
index 00000000..401d2baa
--- /dev/null
+++ b/pysqa/wrapper/flux.py
@@ -0,0 +1,52 @@
+# coding: utf-8
+from flux.job import JobID
+import pandas
+from pysqa.wrapper.generic import SchedulerCommands
+
+
+class FluxCommands(SchedulerCommands):
+    @property
+    def submit_job_command(self):
+        return ["flux", "batch"]
+
+    @property
+    def delete_job_command(self):
+        return ["flux", "cancel"]
+
+    @property
+    def get_queue_status_command(self):
+        return ["flux", "jobs", "-a", "--no-header"]
+
+    @staticmethod
+    def get_job_id_from_output(queue_submit_output):
+        return JobID(queue_submit_output.splitlines()[-1].strip().split()[-1])
+
+    @staticmethod
+    def convert_queue_status(queue_status_output):
+        line_split_lst = [line.split() for line in queue_status_output.splitlines()]
+        job_id_lst, user_lst, job_name_lst, status_lst = [], [], [], []
+        for (
+            flux_id,
+            user,
+            job_name,
+            status,
+            task,
+            nodes,
+            runtime,
+            ranks,
+        ) in line_split_lst:
+            job_id_lst.append(JobID(flux_id))
+            user_lst.append(user)
+            job_name_lst.append(job_name)
+            status_lst.append(status)
+        df = pandas.DataFrame(
+            {
+                "jobid": job_id_lst,
+                "user": user_lst,
+                "jobname": job_name_lst,
+                "status": status_lst,
+            }
+        )
+        df.loc[df.status == "R", "status"] = "running"
+        df.loc[df.status == "S", "status"] = "pending"
+        return df
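`FluxCommands` relies on two Flux conventions: `flux batch` prints the id of the submitted job as the last token of its output, and `flux jobs -a --no-header` prints eight whitespace-separated columns per job. `flux.job.JobID` decodes the compact F58 identifiers (the `ƒ...` strings) into plain integers. A short round-trip sketch, assuming a working Flux installation; the sample line matches the `tests/config/flux/flux_jobs` fixture added below:

```python
from flux.job import JobID

from pysqa.wrapper.flux import FluxCommands

# One line of `flux jobs -a --no-header` output; the eight columns are
# id, user, name, status, ntasks, nnodes, runtime and ranks.
sample = "ƒWZEQa8X dahn sleep_batc R 2 2 1.931s [0-1]\n"

# JobID subclasses int, so the F58 string decodes to the integer id that
# pysqa reports back to the caller (1125147213824 for this sample).
print(int(JobID("ƒWZEQa8X")))

# convert_queue_status() collapses the listing into the generic pysqa
# columns and maps the one-letter states R/S to "running"/"pending".
print(FluxCommands.convert_queue_status(queue_status_output=sample))
```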
diff --git a/tests/config/flux/flux.sh b/tests/config/flux/flux.sh
new file mode 100644
index 00000000..d3e6f91d
--- /dev/null
+++ b/tests/config/flux/flux.sh
@@ -0,0 +1,3 @@
+#!/bin/bash
+#flux: -n{{cores}} --job-name={{job_name}} --env=CORES={{cores}} --output=time.out --error=error.out
+{{command}}
\ No newline at end of file
diff --git a/tests/config/flux/flux_jobs b/tests/config/flux/flux_jobs
new file mode 100644
index 00000000..4505cb9e
--- /dev/null
+++ b/tests/config/flux/flux_jobs
@@ -0,0 +1,3 @@
+ƒWZEQa8X dahn sleep_batc R 2 2 1.931s [0-1]
+ƒW8eCV2o dahn sleep_batc R 2 2 2.896s [0-1]
+ƒVhYLeJB dahn sleep_batc R 2 2 3.878s [0-1]
\ No newline at end of file
diff --git a/tests/config/flux/queue.yaml b/tests/config/flux/queue.yaml
new file mode 100644
index 00000000..edf7ac61
--- /dev/null
+++ b/tests/config/flux/queue.yaml
@@ -0,0 +1,4 @@
+queue_type: FLUX
+queue_primary: flux
+queues:
+  flux: {cores_max: 64, cores_min: 1, run_time_max: 172800, script: flux.sh}
\ No newline at end of file
diff --git a/tests/test_flux.py b/tests/test_flux.py
new file mode 100644
index 00000000..ee139cde
--- /dev/null
+++ b/tests/test_flux.py
@@ -0,0 +1,88 @@
+# coding: utf-8
+# Copyright (c) Jan Janssen
+
+import os
+import unittest
+import pandas
+from pysqa import QueueAdapter
+
+
+try:
+    import flux
+    skip_flux_test = False
+except ImportError:
+    skip_flux_test = True
+
+
+@unittest.skipIf(skip_flux_test, "Flux is not installed, so the flux tests are skipped")
+class TestFluxQueueAdapter(unittest.TestCase):
+    @classmethod
+    def setUpClass(cls):
+        cls.path = os.path.dirname(os.path.abspath(__file__))
+        cls.flux = QueueAdapter(directory=os.path.join(cls.path, "config/flux"))
+
+    def test_config(self):
+        self.assertEqual(self.flux.config["queue_type"], "FLUX")
+        self.assertEqual(self.flux.config["queue_primary"], "flux")
+
+    def test_list_clusters(self):
+        self.assertEqual(self.flux.list_clusters(), ['default'])
+
+    def test_remote_flag(self):
+        self.assertFalse(self.flux._adapter.remote_flag)
+
+    def test_ssh_delete_file_on_remote(self):
+        self.assertEqual(self.flux.ssh_delete_file_on_remote, True)
+
+    def test_interfaces(self):
+        self.assertEqual(
+            self.flux._adapter._commands.submit_job_command, ["flux", "batch"]
+        )
+        self.assertEqual(self.flux._adapter._commands.delete_job_command, ["flux", "cancel"])
+        self.assertEqual(
+            self.flux._adapter._commands.get_queue_status_command,
+            ["flux", "jobs", "-a", "--no-header"],
+        )
+
+    def test_convert_queue_status_flux(self):
+        with open(os.path.join(self.path, "config/flux", "flux_jobs"), "r") as f:
+            content = f.read()
+        df = pandas.DataFrame({
+            "jobid": [1125147213824, 1109007532032, 1092532305920],
+            "user": ["dahn", "dahn", "dahn"],
+            "jobname": ["sleep_batc", "sleep_batc", "sleep_batc"],
+            "status": ["running", "running", "running"]
+        })
+        self.assertTrue(df.equals(self.flux._adapter._commands.convert_queue_status(
+            queue_status_output=content
+        )))
+
+    def test_submit_job(self):
+        def execute_command(
+            commands,
+            working_directory=None,
+            split_output=True,
+            shell=False,
+            error_filename="pysqa.err",
+        ):
+            return "ƒWZEQa8X\n"
+
+        flux_tmp = QueueAdapter(
+            directory=os.path.join(self.path, "config/flux"),
+            execute_command=execute_command
+        )
+        self.assertEqual(flux_tmp.submit_job(
+            queue="flux",
+            job_name="test",
+            working_directory=".",
+            cores=4,
+            command="echo hello"
+        ), 1125147213824)
+        with open("run_queue.sh") as f:
+            output = f.read()
+        content = """\
+#!/bin/bash
+#flux: -n4 --job-name=test --env=CORES=4 --output=time.out --error=error.out
+echo hello"""
+        self.assertEqual(content, output)
+        os.remove("run_queue.sh")
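`test_submit_job` stubs out `execute_command`, so it exercises everything up to the actual `flux batch` call: decoding the returned job id and rendering `flux.sh` into `run_queue.sh`. The placeholder substitution is plain Jinja2 templating; below is a standalone sketch of just that rendering step, using the values from the test (the adapter's full script handling is more involved than this):

```python
from jinja2 import Template

# The flux.sh template from tests/config/flux, inlined for illustration.
script = (
    "#!/bin/bash\n"
    "#flux: -n{{cores}} --job-name={{job_name}} --env=CORES={{cores}} "
    "--output=time.out --error=error.out\n"
    "{{command}}"
)

# Rendering with the test's values reproduces the run_queue.sh content
# that test_submit_job asserts against.
print(Template(script).render(cores=4, job_name="test", command="echo hello"))
```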