Changes from all commits (35 commits)
329efce Add verbosity to smart-dispatch (bouthilx, Oct 14, 2017)
3b8919a Added test for slurm integration (aalitaiga, Jul 31, 2017)
8ce0b25 New test for priority (aalitaiga, Aug 8, 2017)
8dc8e0a Added gres + memory tests (aalitaiga, Aug 8, 2017)
1e67a22 Refactored tests (aalitaiga, Aug 9, 2017)
6a25263 small update (aalitaiga, Aug 10, 2017)
b34ff36 Python3 compatibility + PR comments (aalitaiga, Sep 1, 2017)
eb4d473 Fixed naccelerators issue (aalitaiga, Sep 1, 2017)
c2f2de6 Updated tests to skip on Graham and Cedar smartdispatch modified to h… (aalitaiga, Sep 19, 2017)
dcf1504 Cleaned code with PR feedback (aalitaiga, Sep 26, 2017)
23e8a01 Updated tests (aalitaiga, Oct 6, 2017)
93a4a3a Updated tests using mock (aalitaiga, Oct 10, 2017)
6952922 Refactor detect_cluster tests (bouthilx, Oct 10, 2017)
60610b0 Small changes in TestSlurmQueue (aalitaiga, Oct 10, 2017)
39fa04e Fix add_sbatch_option bug (bouthilx, Oct 13, 2017)
255920c Refactor SlurmJobGenerator (bouthilx, Oct 13, 2017)
ce370bc Remove queue name for Slurm clusters (bouthilx, Oct 13, 2017)
74d30ac Replace PBS_JOBID with SLURM_JOB_ID (bouthilx, Oct 13, 2017)
88a413e Add PBS_FILENAME definition to pbs.prolog (bouthilx, Oct 14, 2017)
22cfb38 Fix env var export option for Slurm (bouthilx, Oct 14, 2017)
cdd6085 Adapt PBS_WALLTIME for slurm (bouthilx, Oct 14, 2017)
0b05a94 Add sbatch to command-line launcher options (bouthilx, Oct 14, 2017)
3c5a645 Make get_launcher more flexible (bouthilx, Oct 14, 2017)
8565d69 Updated documentation for slurm clusters (aalitaiga, Oct 15, 2017)
60e4453 Add support for SlurmJobGenerator (bouthilx, Oct 16, 2017)
c876f57 Print stderr when both qsub and sacctmgr fails (bouthilx, Oct 16, 2017)
97ce7c4 Add automatic script for cluster verification (bouthilx, Oct 16, 2017)
1112ec9 Add verification script for cedar (bouthilx, Oct 16, 2017)
8502e22 Add verification script for graham (bouthilx, Oct 16, 2017)
5baabe1 Add verification script for mila (bouthilx, Oct 16, 2017)
72929d8 Make get_launcher return None when no launcher (bouthilx, Oct 16, 2017)
eb91544 Updated README (aalitaiga, Oct 16, 2017)
14c5819 Set properly account in verify_graham (bouthilx, Oct 16, 2017)
fde46db Set properly account in verify_cedar (bouthilx, Oct 16, 2017)
33c048b Fix walltime_to_seconds convertion (bouthilx, Oct 17, 2017)
9 changes: 9 additions & 0 deletions README.md
@@ -74,3 +74,12 @@ python my_script.py 9
Jobs that did not terminate properly, for example because they exceeded the walltime, can be resumed using the {batch_id} given to you upon launch. Of course, all this assumes your script is resumable.

*Note: Jobs are always in a batch, even if it's a batch of one.*

### SLURM clusters

Smartdispatch can also run on SLURM clusters.
All features like `--gpusPerNode` or `--coresPerNode` are supported.
However, SLURM-specific options must be passed through `--sbatchFlags`. For simplicity, `--sbatchFlags` supports only short and long option definitions, with the following syntax:
`-Cgpu6gb` or `--constraint=gpu6gb`
For comparison, these would be invalid:
`-C gpu6gb` or `--constraint gpu6gb`
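
For instance, a hypothetical invocation mixing both forms (the queue name, flags, and script are illustrative placeholders, not from this repository):
smart-dispatch -q gpu_1 --sbatchFlags="--qos=high -Cgpu6gb" launch python my_script.py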
19 changes: 18 additions & 1 deletion scripts/sd-launch-pbs
@@ -7,6 +7,10 @@ import logging
from smartdispatch import launch_jobs
from smartdispatch import utils


logger = logging.getLogger()


LOGS_FOLDERNAME = "SMART_DISPATCH_LOGS"
CLUSTER_NAME = utils.detect_cluster()
LAUNCHER = utils.get_launcher(CLUSTER_NAME)
@@ -23,12 +27,25 @@ def main():

def parse_arguments():
parser = argparse.ArgumentParser()
parser.add_argument('-L', '--launcher', choices=['qsub', 'msub'], required=False, help='Which launcher to use. Default: qsub')
parser.add_argument('-L', '--launcher', choices=['qsub', 'msub', 'sbatch'], required=False, help='Which launcher to use. Default: qsub')
parser.add_argument('pbs', type=str, help='PBS filename to launch.')
parser.add_argument('path_job', type=str, help='Path to the job folder.')

parser.add_argument(
'-v', '--verbose', action='count', default=0,
help="Print informations about the process.\n"
" -v: INFO\n"
" -vv: DEBUG")

args = parser.parse_args()

if args.verbose == 0:
logging.basicConfig(level=logging.WARNING)
elif args.verbose == 1:
logging.basicConfig(level=logging.INFO)
elif args.verbose >= 2:
logging.basicConfig(level=logging.DEBUG)

return args
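
A minimal standalone sketch of the verbosity mapping above (a hypothetical helper, not part of the PR):

```python
import logging

def logging_level(verbose_count):
    """Map repeated -v flags to a logging level, as parse_arguments does."""
    if verbose_count == 0:
        return logging.WARNING
    if verbose_count == 1:
        return logging.INFO
    return logging.DEBUG  # -vv and beyond

assert logging_level(0) == logging.WARNING
assert logging_level(1) == logging.INFO
assert logging_level(2) == logging.DEBUG
```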


71 changes: 62 additions & 9 deletions scripts/smart-dispatch
@@ -1,9 +1,10 @@
#!/usr/bin/env python2
# -*- coding: utf-8 -*-

import argparse
import logging
import os
import sys
import argparse
import time as t
from os.path import join as pjoin
from textwrap import dedent
@@ -16,9 +17,12 @@ from smartdispatch import get_available_queues
from smartdispatch import launch_jobs
from smartdispatch import utils

import logging
import smartdispatch


logger = logging.getLogger()


LOGS_FOLDERNAME = "SMART_DISPATCH_LOGS"
CLUSTER_NAME = utils.detect_cluster()
AVAILABLE_QUEUES = get_available_queues(CLUSTER_NAME)
@@ -29,25 +33,52 @@ TIMEOUT_EXIT_CODE = 124
AUTORESUME_TRIGGER_AFTER = '$(($PBS_WALLTIME - 60))' # By default, 60s before the maximum walltime.
AUTORESUME_WORKER_CALL_PREFIX = 'timeout -s TERM {trigger_after} '.format(trigger_after=AUTORESUME_TRIGGER_AFTER)
AUTORESUME_WORKER_CALL_SUFFIX = ' WORKER_PIDS+=" $!"'
AUTORESUME_PROLOG = 'WORKER_PIDS=""'
AUTORESUME_PROLOG = """
WORKER_PIDS=""
VERBOSE={verbose}
"""
AUTORESUME_EPILOG = """\
NEED_TO_RESUME=false
if [ $VERBOSE = true ]; then
echo NEED_TO_RESUME=$NEED_TO_RESUME
echo WORKER_PIDS=$WORKER_PIDS
fi
for WORKER_PID in $WORKER_PIDS; do
if [ $VERBOSE = true ]; then
echo WORKER_PID=$WORKER_PID
fi
wait "$WORKER_PID"
RETURN_CODE=$?
if [ $VERBOSE = true ]; then
echo "RETURN_CODE is $RETURN_CODE while " \
"timeout_exit_code is {timeout_exit_code}"
fi
if [ $RETURN_CODE -eq {timeout_exit_code} ]; then
NEED_TO_RESUME=true
fi
if [ $VERBOSE = true ]; then
echo NEED_TO_RESUME=$NEED_TO_RESUME
fi
done
if [ $VERBOSE = true ]; then
echo NEED_TO_RESUME=$NEED_TO_RESUME
fi
if [ "$NEED_TO_RESUME" = true ]; then
echo "Autoresuming using: {{launcher}} $PBS_FILENAME"
sd-launch-pbs --launcher {{launcher}} $PBS_FILENAME {{path_job}}
if [ $VERBOSE = true ]; then
VERBOSE_OPTION="-vv"
else
VERBOSE_OPTION=""
fi

sd-launch-pbs $VERBOSE_OPTION --launcher {{launcher}} $PBS_FILENAME {{path_job}}
fi
""".format(timeout_exit_code=TIMEOUT_EXIT_CODE)


def main():
# Necessary if we want 'logging.info' to appear in stderr.
# TODO: Could we avoid this? Would -v (verbose) be sufficient?
logging.root.setLevel(logging.INFO)

args = parse_arguments()
@@ -163,18 +194,25 @@ def main():
prolog = []
epilog = ['wait']
if args.autoresume:
prolog = [AUTORESUME_PROLOG]
epilog = [AUTORESUME_EPILOG.format(launcher=LAUNCHER if args.launcher is None else args.launcher, path_job=path_job)]
prolog = [
AUTORESUME_PROLOG.format(verbose=str(args.verbose >= 2).lower())]
epilog = [
AUTORESUME_EPILOG.format(
launcher=LAUNCHER if args.launcher is None else args.launcher,
path_job=path_job)]

job_generator = job_generator_factory(queue, commands, prolog, epilog, command_params, CLUSTER_NAME, path_job)

# Generate a default name for each job in each batch
for pbs_id, pbs in enumerate(job_generator.pbs_list):
proper_size_name = utils.jobname_generator(jobname, pbs_id)
pbs.add_options(N=proper_size_name)

if args.pbsFlags is not None:
job_generator.add_pbs_flags(args.pbsFlags.split(' '))

if args.sbatchFlags is not None:
job_generator.add_sbatch_flags(args.sbatchFlags.split(' '))
pbs_filenames = job_generator.write_pbs_files(path_job_commands)

# Launch the jobs
@@ -187,10 +225,17 @@ def main():

def parse_arguments():
parser = argparse.ArgumentParser()

parser.add_argument(
'-v', '--verbose', action='count', default=0,
help="Print informations about the process.\n"
" -v: INFO\n"
" -vv: DEBUG")

parser.add_argument('-q', '--queueName', required=True, help='Queue used (ex: qwork@mp2, qfat256@mp2, gpu_1)')
parser.add_argument('-n', '--batchName', required=False, help='The name of the batch. Default: The commands launched.')
parser.add_argument('-t', '--walltime', required=False, help='Set the estimated running time of your jobs using the DD:HH:MM:SS format. Note that they will be killed when this time limit is reached.')
parser.add_argument('-L', '--launcher', choices=['qsub', 'msub'], required=False, help='Which launcher to use. Default: qsub')
parser.add_argument('-L', '--launcher', choices=['qsub', 'msub', 'sbatch'], required=False, help='Which launcher to use. Default: qsub')
parser.add_argument('-C', '--coresPerNode', type=int, required=False, help='How many cores there are per node.')
parser.add_argument('-G', '--gpusPerNode', type=int, required=False, help='How many gpus there are per node.')
# parser.add_argument('-M', '--memPerNode', type=int, required=False, help='How much memory there are per node (in Gb).')
@@ -206,6 +251,7 @@ def parse_arguments():

parser.add_argument('-p', '--pool', type=int, help="Number of workers that will be consuming commands. Default: Nb commands")
parser.add_argument('--pbsFlags', type=str, help='ADVANCED USAGE: Allows passing a space-separated list of PBS flags. Ex: --pbsFlags="-lfeature=k80 -t0-4"')
parser.add_argument('--sbatchFlags', type=str, help='ADVANCED USAGE: Allows passing a space-separated list of SBATCH flags. Ex: --sbatchFlags="--qos=high -ofile.out"')
subparsers = parser.add_subparsers(dest="mode")

launch_parser = subparsers.add_parser('launch', help="Launch jobs.")
@@ -226,6 +272,13 @@ def parse_arguments():
if args.coresPerCommand < 1:
parser.error("coresPerNode must be at least 1")

if args.verbose == 0:
logging.basicConfig(level=logging.WARNING)
elif args.verbose == 1:
logging.basicConfig(level=logging.INFO)
elif args.verbose >= 2:
logging.basicConfig(level=logging.DEBUG)

return args


88 changes: 88 additions & 0 deletions smartdispatch/job_generator.py
@@ -15,6 +15,8 @@ def job_generator_factory(queue, commands, prolog=[], epilog=[], command_params=
return HeliosJobGenerator(queue, commands, prolog, epilog, command_params, base_path)
elif cluster_name == "hades":
return HadesJobGenerator(queue, commands, prolog, epilog, command_params, base_path)
elif utils.get_launcher(cluster_name) == "sbatch":
return SlurmJobGenerator(queue, commands, prolog, epilog, command_params, base_path)

return JobGenerator(queue, commands, prolog, epilog, command_params, base_path)

@@ -73,6 +75,23 @@ def add_pbs_flags(self, flags):
pbs.add_resources(**resources)
pbs.add_options(**options)

def add_sbatch_flags(self, flags):
[Collaborator review comment: We need a unit test for add_sbatch_flags]

options = {}

for flag in flags:
split = flag.find('=')
if flag.startswith('--'):
if split == -1:
raise ValueError("Invalid SBATCH flag ({}), no '=' character found' ".format(flag))
options[flag[:split].lstrip("-")] = flag[split+1:]
elif flag.startswith('-') and split == -1:
options[flag[1:2]] = flag[2:]
else:
raise ValueError("Invalid SBATCH flag ({}, is it a PBS flag?)".format(flag))

for pbs in self.pbs_list:
pbs.add_sbatch_options(**options)
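
To illustrate the flag syntax this parser accepts, here is a standalone sketch of the same rule (hypothetical helper, for illustration only):

```python
def parse_sbatch_flags(flags):
    """Mirror of add_sbatch_flags' parsing rule: --name=value or -Xvalue."""
    options = {}
    for flag in flags:
        split = flag.find('=')
        if flag.startswith('--'):
            if split == -1:
                raise ValueError("Invalid SBATCH flag ({}): no '=' character found".format(flag))
            options[flag[:split].lstrip("-")] = flag[split + 1:]
        elif flag.startswith('-') and split == -1:
            # Short form: one-letter option glued to its value, e.g. -Cgpu6gb.
            options[flag[1:2]] = flag[2:]
        else:
            raise ValueError("Invalid SBATCH flag ({}), is it a PBS flag?".format(flag))
    return options

print(parse_sbatch_flags(["-Cgpu6gb", "--constraint=gpu6gb"]))
# {'C': 'gpu6gb', 'constraint': 'gpu6gb'}
```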

def _generate_base_pbs(self):
""" Generates PBS files allowing the execution of every commands on the given queue. """
nb_commands_per_node = self.queue.nb_cores_per_node // self.nb_cores_per_command
@@ -171,3 +190,72 @@ def _add_cluster_specific_rules(self):
for pbs in self.pbs_list:
# Remove forbidden ppn option. Default is 2 cores per gpu.
pbs.resources['nodes'] = re.sub(":ppn=[0-9]+", "", pbs.resources['nodes'])


class SlurmJobGenerator(JobGenerator):

def __init__(self, *args, **kwargs):
super(SlurmJobGenerator, self).__init__(*args, **kwargs)

def _adapt_options(self, pbs):
# Remove the queue option; there are no queues in Slurm
if "-q" in pbs.options:
del pbs.options["-q"]

# sbatch does not expand shell variables in options; output filename
# patterns may only contain %A for the job id and %a for the job array index
for option in ['-o', '-e']:
pbs.options[option] = re.sub('"\$PBS_JOBID"', '%A',
pbs.options[option])

# Convert to Slurm's --export
#
# Warning: Slurm does **not** export variables defined locally such as
# variables defined along the command line. For ex:
# PBS_FILENAME=something sbatch --export=ALL somefile.sh
# would *not* export PBS_FILENAME to the script.
if pbs.options.pop('-V', None) is not None:
pbs.add_sbatch_options(export='ALL')

def _adapt_commands(self, pbs):
pass

def _adapt_resources(self, pbs):
# Set proper option for gpus
match = re.match(".*gpus=([0-9]+)", pbs.resources['nodes'])
if match:
gpus = match.group(1)
pbs.add_resources(naccelerators=gpus)
pbs.resources['nodes'] = re.sub(":gpus=[0-9]+", "",
pbs.resources['nodes'])

# Set proper option for cpus
match = re.match(".*ppn=([0-9]+)", pbs.resources['nodes'])
if match:
ppn = match.group(1)
pbs.add_resources(ncpus=ppn)
pbs.resources['nodes'] = re.sub("ppn=[0-9]+", "", pbs.resources['nodes'])

def _adapt_variable_names(self, pbs):
for command_id, command in enumerate(pbs.commands):
pbs.commands[command_id] = command = re.sub(
"\$PBS_JOBID", "$SLURM_JOB_ID", command)
# NOTE: SBATCH_TIMELIMIT is **not** an official Slurm environment
# variable; it needs to be set in the script.
pbs.commands[command_id] = command = re.sub(
"\$PBS_WALLTIME", "$SBATCH_TIMELIMIT", command)

def _adapt_prolog(self, pbs):
# Set SBATCH_TIMELIMIT in the prolog, hence before any code from
# the commands and the epilog.
pbs.add_to_prolog(
"SBATCH_TIMELIMIT=%s" %
utils.walltime_to_seconds(pbs.resources["walltime"]))

def _add_cluster_specific_rules(self):
for pbs in self.pbs_list:
self._adapt_options(pbs)
self._adapt_resources(pbs)
self._adapt_variable_names(pbs)
self._adapt_prolog(pbs)
self._adapt_commands(pbs)
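
As a concrete illustration of the resource rewriting performed by _adapt_resources above, here is a self-contained sketch (the nodes string is a hypothetical example, not taken from the PR's tests):

```python
import re

# Illustrative PBS nodes resource string.
nodes = "1:ppn=2:gpus=4"
resources = {}

# Mirror _adapt_resources: move gpus= to naccelerators, ppn= to ncpus.
match = re.match(".*gpus=([0-9]+)", nodes)
if match:
    resources["naccelerators"] = match.group(1)
    nodes = re.sub(":gpus=[0-9]+", "", nodes)

match = re.match(".*ppn=([0-9]+)", nodes)
if match:
    resources["ncpus"] = match.group(1)
    nodes = re.sub("ppn=[0-9]+", "", nodes)

print(nodes)      # "1:"
print(resources)  # {'naccelerators': '4', 'ncpus': '2'}
```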
26 changes: 26 additions & 0 deletions smartdispatch/pbs.py
@@ -35,6 +35,8 @@ def __init__(self, queue_name, walltime):
self.options = OrderedDict()
self.add_options(q=queue_name)

self.sbatch_options = OrderedDict()

# Declares that all environment variables in the qsub command's environment are to be exported to the batch job.
self.add_options(V="")

@@ -62,6 +64,22 @@ def add_options(self, **options):

self.options["-" + option_name] = option_value

def add_sbatch_options(self, **options):
""" Adds sbatch options to this PBS file.

Parameters
----------
**options : dict
each key is the name of an sbatch option, each value its argument
"""

for option_name, option_value in options.items():
if len(option_name) == 1:
dash = "-"
else:
dash = "--"
self.sbatch_options[dash + option_name] = option_value

def add_resources(self, **resources):
""" Adds resources to this PBS file.

@@ -144,7 +162,9 @@ def save(self, filename):
specifies where to save this PBS file
"""
with open(filename, 'w') as pbs_file:
self.prolog.insert(0, "PBS_FILENAME=%s" % filename)
pbs_file.write(str(self))
self.prolog.pop(0)

def __str__(self):
pbs = []
@@ -159,6 +179,12 @@ def __str__(self):
for resource_name, resource_value in self.resources.items():
pbs += ["#PBS -l {0}={1}".format(resource_name, resource_value)]

for option_name, option_value in self.sbatch_options.items():
if option_name.startswith('--'):
pbs += ["#SBATCH {0}={1}".format(option_name, option_value)]
else:
pbs += ["#SBATCH {0} {1}".format(option_name, option_value)]

pbs += ["\n# Modules #"]
for module in self.modules:
pbs += ["module load " + module]
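
A hypothetical usage sketch showing how add_sbatch_options and __str__ fit together (it assumes the PBS class from smartdispatch/pbs.py as modified in this diff; the queue name and option values are illustrative):

```python
from smartdispatch.pbs import PBS

pbs = PBS(queue_name="qtest", walltime="01:00:00")
pbs.add_sbatch_options(C="gpu6gb", export="ALL")
print(str(pbs))
# In addition to the #PBS directives, the header would now contain:
#   #SBATCH -C gpu6gb
#   #SBATCH --export=ALL
```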