Skip to content

Commit 19bd48f

Browse files
authored
Merge pull request #179 from pyiron/flux
Add first draft for flux
2 parents f387aa8 + 1000858 commit 19bd48f

File tree

9 files changed

+215
-10
lines changed

9 files changed

+215
-10
lines changed

.github/workflows/coverage.yml

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
# This workflow will install Python dependencies, run tests and lint with a variety of Python versions
2+
# For more information see: https://help.github.com/actions/language-and-framework-guides/using-python-with-github-actions
3+
4+
name: Coverage
5+
6+
on:
7+
push:
8+
branches: [ main ]
9+
pull_request:
10+
branches: [ main ]
11+
12+
env:
13+
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
14+
CODACY_PROJECT_TOKEN: ${{ secrets.CODACY_PROJECT_TOKEN }}
15+
16+
jobs:
17+
build:
18+
runs-on: ubuntu-latest
19+
20+
steps:
21+
- uses: actions/checkout@v2
22+
- name: Setup environment
23+
run: |
24+
cp .ci_support/environment.yml environment.yml
25+
echo " - flux-core =0.50.0" >> environment.yml
26+
- name: Setup Mambaforge
27+
uses: conda-incubator/setup-miniconda@v2
28+
with:
29+
python-version: '3.10'
30+
miniforge-variant: Mambaforge
31+
channels: conda-forge
32+
channel-priority: strict
33+
activate-environment: my-env
34+
use-mamba: true
35+
- name: Set cache date and number
36+
run: |
37+
echo "DATE=$(date +'%Y%m%d')" >> $GITHUB_ENV
38+
cat .github/variables/cache_number.env >> $GITHUB_ENV
39+
- uses: actions/cache@v2
40+
with:
41+
path: /usr/share/miniconda3/envs/my-env
42+
key: coverage-conda-${{ hashFiles('environment.yml') }}-${{ env.DATE }}-${{ env.CACHE_NUMBER }}
43+
id: cache
44+
- name: Update environment
45+
run: mamba env update -n my-env -f environment.yml
46+
if: steps.cache.outputs.cache-hit != 'true'
47+
- name: Setup
48+
shell: bash -l {0}
49+
run: |
50+
pip install --no-deps .
51+
- name: Test
52+
shell: bash -l {0}
53+
run: coverage run --omit pysqa/_version.py -m unittest discover tests
54+
- name: Coverage
55+
shell: bash -l {0}
56+
run: |
57+
coverage combine
58+
coveralls
59+
coverage xml
60+
python-codacy-coverage -r coverage.xml

.github/workflows/unittest.yml

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -81,12 +81,4 @@ jobs:
8181
pip install --no-deps .
8282
- name: Test
8383
shell: bash -l {0}
84-
run: coverage run --omit pysqa/_version.py -m unittest discover tests
85-
- name: Coverage
86-
if: matrix.label == 'linux-64-py-3-10'
87-
shell: bash -l {0}
88-
run: |
89-
coverage combine
90-
coveralls
91-
coverage xml
92-
python-codacy-coverage -r coverage.xml
84+
run: python -m unittest discover tests

pysqa/queueadapter.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -311,7 +311,7 @@ def set_queue_adapter(config, directory, execute_command=execute_command):
311311
config (dict): configuration for one cluster
312312
directory (str): directory which contains the queue configurations
313313
"""
314-
if config["queue_type"] in ["SGE", "TORQUE", "SLURM", "LSF", "MOAB"]:
314+
if config["queue_type"] in ["SGE", "TORQUE", "SLURM", "LSF", "MOAB", "FLUX"]:
315315
return BasisQueueAdapter(
316316
config=config, directory=directory, execute_command=execute_command
317317
)

pysqa/utils/basic.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,9 @@ def __init__(self, config, directory="~/.queues", execute_command=execute_comman
6666
elif self._config["queue_type"] == "REMOTE":
6767
class_name = None
6868
module_name = None
69+
elif self._config["queue_type"] == "FLUX":
70+
class_name = "FluxCommands"
71+
module_name = "pysqa.wrapper.flux"
6972
else:
7073
raise ValueError()
7174
if self._config["queue_type"] != "REMOTE":

pysqa/wrapper/flux.py

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
# coding: utf-8
2+
from flux.job import JobID
3+
import pandas
4+
from pysqa.wrapper.generic import SchedulerCommands
5+
6+
7+
class FluxCommands(SchedulerCommands):
    """Scheduler command set for the flux resource manager.

    Provides the command lines used to submit, cancel and list jobs with
    flux, plus helpers to parse flux's command-line output.
    """

    @property
    def submit_job_command(self):
        """list: command prefix used to submit a batch script to flux."""
        return ["flux", "batch"]

    @property
    def delete_job_command(self):
        """list: command prefix used to cancel a flux job."""
        return ["flux", "cancel"]

    @property
    def get_queue_status_command(self):
        """list: command listing all jobs, suppressing the header line."""
        return ["flux", "jobs", "-a", "--no-header"]

    @staticmethod
    def get_job_id_from_output(queue_submit_output):
        """Extract the flux job id from the output of ``flux batch``.

        The id is taken as the last whitespace-separated token on the last
        output line and wrapped in :class:`flux.job.JobID`.
        """
        final_line = queue_submit_output.splitlines()[-1]
        return JobID(final_line.strip().split()[-1])

    @staticmethod
    def convert_queue_status(queue_status_output):
        """Convert ``flux jobs`` output into a pandas DataFrame.

        Each line is expected to carry eight whitespace-separated columns;
        only job id, user, job name and status are kept. Status codes are
        mapped to the pysqa convention ("R" -> running, "S" -> pending).
        """
        job_ids, users, names, states = [], [], [], []
        # Strict 8-field unpack: a malformed line raises ValueError, matching
        # the expected fixed-width output of `flux jobs -a --no-header`.
        for (
            flux_id,
            user,
            job_name,
            status,
            _task,
            _nodes,
            _runtime,
            _ranks,
        ) in (line.split() for line in queue_status_output.splitlines()):
            job_ids.append(JobID(flux_id))
            users.append(user)
            names.append(job_name)
            states.append(status)
        frame = pandas.DataFrame(
            {
                "jobid": job_ids,
                "user": users,
                "jobname": names,
                "status": states,
            }
        )
        frame["status"] = frame["status"].replace(
            {"R": "running", "S": "pending"}
        )
        return frame

tests/config/flux/flux.sh

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
#!/bin/bash
2+
#flux: -n{{cores}} --job-name={{job_name}} --env=CORES={{cores}} --output=time.out --error=error.out
3+
{{command}}

tests/config/flux/flux_jobs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
ƒWZEQa8X dahn sleep_batc R 2 2 1.931s [0-1]
2+
ƒW8eCV2o dahn sleep_batc R 2 2 2.896s [0-1]
3+
ƒVhYLeJB dahn sleep_batc R 2 2 3.878s [0-1]

tests/config/flux/queue.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
queue_type: FLUX
2+
queue_primary: flux
3+
queues:
4+
flux: {cores_max: 64, cores_min: 1, run_time_max: 172800, script: flux.sh}

tests/test_flux.py

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
# coding: utf-8
2+
# Copyright (c) Jan Janssen
3+
4+
import os
5+
import unittest
6+
import pandas
7+
from pysqa import QueueAdapter
8+
9+
10+
try:
11+
import flux
12+
skip_flux_test = False
13+
except ImportError:
14+
skip_flux_test = True
15+
16+
17+
@unittest.skipIf(skip_flux_test, "Flux is not installed, so the flux tests are skipped")
class TestFluxQueueAdapter(unittest.TestCase):
    """Unit tests for the FLUX backend of :class:`pysqa.QueueAdapter`."""

    @classmethod
    def setUpClass(cls):
        # One shared adapter built from the flux test configuration.
        cls.path = os.path.dirname(os.path.abspath(__file__))
        cls.flux = QueueAdapter(directory=os.path.join(cls.path, "config/flux"))

    def test_config(self):
        """The flux queue.yaml is parsed into the adapter configuration."""
        self.assertEqual(self.flux.config["queue_type"], "FLUX")
        self.assertEqual(self.flux.config["queue_primary"], "flux")

    def test_list_clusters(self):
        self.assertEqual(self.flux.list_clusters(), ["default"])

    def test_remote_flag(self):
        self.assertFalse(self.flux._adapter.remote_flag)

    def test_ssh_delete_file_on_remote(self):
        self.assertTrue(self.flux.ssh_delete_file_on_remote)

    def test_interfaces(self):
        """The FluxCommands wrapper exposes the expected flux command lines."""
        self.assertEqual(
            self.flux._adapter._commands.submit_job_command, ["flux", "batch"]
        )
        self.assertEqual(
            self.flux._adapter._commands.delete_job_command, ["flux", "cancel"]
        )
        self.assertEqual(
            self.flux._adapter._commands.get_queue_status_command,
            ["flux", "jobs", "-a", "--no-header"],
        )

    def test_convert_queue_status_flux(self):
        """`flux jobs` sample output is converted to the expected DataFrame.

        Renamed from test_convert_queue_status_slurm: this test exercises the
        flux parser, not the SLURM one.
        """
        with open(os.path.join(self.path, "config/flux", "flux_jobs"), "r") as f:
            content = f.read()
        df = pandas.DataFrame(
            {
                "jobid": [1125147213824, 1109007532032, 1092532305920],
                "user": ["dahn", "dahn", "dahn"],
                "jobname": ["sleep_batc", "sleep_batc", "sleep_batc"],
                "status": ["running", "running", "running"],
            }
        )
        self.assertTrue(
            df.equals(
                self.flux._adapter._commands.convert_queue_status(
                    queue_status_output=content
                )
            )
        )

    def test_submit_job(self):
        """Submitting via a stubbed execute_command returns the parsed job id
        and writes the rendered flux batch script."""

        def execute_command(
            commands,
            working_directory=None,
            split_output=True,
            shell=False,
            error_filename="pysqa.err",
        ):
            # Stub the scheduler call with a canned flux job id.
            return "ƒWZEQa8X\n"

        flux_tmp = QueueAdapter(
            directory=os.path.join(self.path, "config/flux"),
            execute_command=execute_command,
        )
        self.assertEqual(
            flux_tmp.submit_job(
                queue="flux",
                job_name="test",
                working_directory=".",
                cores=4,
                command="echo hello",
            ),
            1125147213824,
        )
        with open("run_queue.sh") as f:
            output = f.read()
        content = """\
#!/bin/bash
#flux: -n4 --job-name=test --env=CORES=4 --output=time.out --error=error.out
echo hello"""
        self.assertEqual(content, output)
        os.remove("run_queue.sh")

0 commit comments

Comments
 (0)