-
Notifications
You must be signed in to change notification settings - Fork 29
Add OCS/GCS backend support #342
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -18,3 +18,4 @@ src/Makevars | |
| windows | ||
| /doc/ | ||
| /Meta/ | ||
| .idea/ | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -57,6 +57,79 @@ SGE = R6::R6Class("SGE", | |
| cloneable = FALSE | ||
| ) | ||
|
|
||
| #' Class for Open Cluster Scheduler (OCS) | ||
| OCS = R6::R6Class("OCS", | ||
| inherit = QSys, | ||
|
|
||
| public = list( | ||
| initialize = function(addr, n_jobs, master, ..., template=getOption("clustermq.template", class(self)[1]), | ||
| log_worker=FALSE, log_file=NULL, verbose=TRUE) { | ||
| super$initialize(addr=addr, master=master, template=template) | ||
|
|
||
| # fill the template with options and required fields | ||
|
||
| opts = private$fill_options(n_jobs=n_jobs, ...) | ||
| filled = fill_template(private$template, opts, required=c("master", "n_jobs")) | ||
|
|
||
| private$job_name = opts$job_name | ||
| if (verbose) | ||
| message("Submitting ", n_jobs, " worker jobs to ", class(self)[1], " as ", sQuote(private$job_name), " ...") | ||
|
|
||
| # submit the job with qsub (stdin-based) and capture the output | ||
| # on success the output will contain the job id, on failure the error message | ||
| private$qsub_stdout = system2("qsub", input=filled, stdout=TRUE) | ||
|
|
||
| # check the return code and stop on error | ||
| status = attr(private$qsub_stdout, "status") | ||
| if (!is.null(status) && status != 0) | ||
| private$template_error(class(self)[1], status, filled) | ||
|
|
||
| # try to read the job ID from stdout. On error stop | ||
| private$job_id <- regmatches(private$qsub_stdout, regexpr("^[0-9]+", private$qsub_stdout)) | ||
|
||
| if (length(private$job_id) == 0) | ||
| private$template_error(class(self)[1], private$qsub_stdout, filled) | ||
|
|
||
| # if we got here, we have a job ID and can proceed | ||
| if (verbose) | ||
| message("Submitted job has ID ", private$job_id) | ||
|
|
||
| private$master$add_pending_workers(n_jobs) | ||
| private$is_cleaned_up = FALSE | ||
| }, | ||
|
|
||
| cleanup = function(success, timeout) { | ||
| # first call finalize to send qdel ... | ||
| private$finalize() | ||
|
||
|
|
||
| # ... then set the cleaned up flag to avoid sending qdel again | ||
| private$is_cleaned_up = success | ||
| } | ||
| ), | ||
|
|
||
| private = list( | ||
| qsub_stdout = NULL, | ||
| job_name = NULL, | ||
| job_id = NULL, | ||
|
|
||
| finalize = function(quiet = TRUE) { | ||
| if (!private$is_cleaned_up) { | ||
| system(paste("qdel", private$job_id), ignore.stdout=quiet, ignore.stderr=quiet, wait=FALSE) | ||
| } | ||
| private$is_cleaned_up = TRUE | ||
| } | ||
| ), | ||
|
|
||
| cloneable = FALSE | ||
| ) | ||
|
|
||
| #' Class for Gridware Cluster Scheduler (GCS) | ||
| GCS = R6::R6Class("GCS", | ||
| inherit = OCS, | ||
| cloneable = FALSE | ||
|
|
||
| # no changes needed, but we want to have a separate class for GCS to allow for GCS-specific | ||
| # templates and enterprise edition options | ||
| ) | ||
|
|
||
| PBS = R6::R6Class("PBS", | ||
| inherit = SGE, | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -10,9 +10,9 @@ | |
| qsys_default = toupper(getOption('clustermq.scheduler')) | ||
|
|
||
| if (length(qsys_default) == 0) { | ||
| qname = c("SLURM", "LSF", "SGE", "LOCAL") | ||
| exec = Sys.which(c("sbatch", "bsub", "qsub")) | ||
| select = c(which(nchar(exec) > 0), 4)[1] | ||
| qname = c("SLURM", "LSF", "SGE", "GCS", "OCS", "LOCAL") | ||
| exec = Sys.which(c("sbatch", "bsub", "qsub", "qsub", "qsub")) | ||
| select = c(which(nchar(exec) > 0), 6)[1] | ||
|
||
| qsys_default = qname[select] | ||
| } | ||
|
|
||
|
|
@@ -26,8 +26,7 @@ | |
| #' @keywords internal | ||
| .onAttach = function(libname, pkgname) { | ||
| if (is.null(getOption("clustermq.scheduler"))) { | ||
| packageStartupMessage("* Option 'clustermq.scheduler' not set, ", | ||
| "defaulting to ", sQuote(qsys_default)) | ||
| packageStartupMessage("* Option 'clustermq.scheduler' not set, ", "defaulting to ", sQuote(qsys_default)) | ||
| packageStartupMessage("--- see: https://mschubert.github.io/clustermq/articles/userguide.html#configuration") | ||
| } | ||
| if (!libzmq_has_draft()) { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -51,6 +51,8 @@ schedulers](https://mschubert.github.io/clustermq/articles/userguide.html#config | |
| * [SLURM](https://mschubert.github.io/clustermq/articles/userguide.html#slurm) - *should work without setup* | ||
| * [LSF](https://mschubert.github.io/clustermq/articles/userguide.html#lsf) - *should work without setup* | ||
| * [SGE](https://mschubert.github.io/clustermq/articles/userguide.html#sge) - *may require configuration* | ||
| * [GCS](https://mschubert.github.io/clustermq/articles/userguide.html#gcs) - *works without setup* | ||
| * [OCS](https://mschubert.github.io/clustermq/articles/userguide.html#ocs) - *works without setup* | ||
|
||
| * [PBS](https://mschubert.github.io/clustermq/articles/userguide.html#pbs)/[Torque](https://mschubert.github.io/clustermq/articles/userguide.html#torque) - *needs* `options(clustermq.scheduler="PBS"/"Torque")` | ||
| * via [SSH](https://mschubert.github.io/clustermq/articles/userguide.html#ssh-connector) - | ||
| *needs* `options(clustermq.scheduler="ssh", clustermq.ssh.host=<yourhost>)` | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,49 @@ | ||
| # Submit client should only show the job ID of the new job on success | ||
| #$ -terse | ||
|
|
||
| # Name of the job visible in OCS | ||
| #$ -N {{ job_name }} | ||
|
|
||
| # Join error and output file. | ||
| #$ -j y | ||
|
|
||
| # Location of the output file | ||
| #$ -o {{ log_file | /dev/null }} | ||
|
|
||
| # Start R job in the working directory | ||
| #$ -cwd | ||
|
|
||
| # Export the full environment to the R job (e.g. if *LD_LIBRARY_PATH* is required). | ||
| # Depending on security settings might require a cluster manager to set | ||
| # ENABLE_SUBMIT_LIB_PATH=1 as *qmaster_param* | ||
| #$ -V | ||
|
|
||
| # Spawns workload as tasks of an array job into the scheduler (one job with multiple tasks) | ||
| #$ -t 1-{{ n_jobs }} | ||
|
|
||
| # Each array task will allocate one slot in the cluster, unless otherwise specified. | ||
| #$ -pe mytestpe {{ cores | 1 }} | ||
|
|
||
| # Per slot the job will get one power core (C) assuming R code is single-threaded, unless otherwise specified. | ||
| #$ -bunit C | ||
| #$ -bamount {{ threads | 1 }} | ||
|
|
||
| # Cores on a host are packed (cores on a die or chiplet sharing same NUMA node and caches if possible) | ||
| #$ -bstrategy packed | ||
| #$ -btype host | ||
|
|
||
| # The scheduler will do the binding via *HWLOC*. | ||
| # Change to *env* if scheduler should make binding decision but not do the binding itself. | ||
| #$ -binstance set | ||
|
|
||
| # Allows setting resource requests like memory (1 GB [in bytes]) | ||
| # to set runtime limits (1 hour [in seconds]) | ||
| # or to influence scheduler resource selection (job will be executed in all.q queue) | ||
| #$ -l mem_free={{ memory | 1073741824 }},h_rt={{ walltime | 3600 }},q=all.q | ||
|
|
||
| # Tag the job so that it can be identified later on (e.g. in a JSV script before | ||
| # submission so the job can get adapted or for filtering later on) | ||
| #$ -ac application=clustermq | ||
|
|
||
| ulimit -v $(( 1024 * {{ memory | 4096 }} )) | ||
| CMQ_AUTH={{ auth }} R --no-save --no-restore -e 'clustermq:::worker("{{ master }}")' |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,37 @@ | ||
| # Submit client should only show the job ID of the new job on success | ||
| #$ -terse | ||
|
|
||
| # Name of the job visible in OCS | ||
| #$ -N {{ job_name }} | ||
|
|
||
| # Join error and output file | ||
| #$ -j y | ||
|
|
||
| # Location of the output file | ||
| #$ -o {{ log_file | /dev/null }} | ||
|
|
||
| # Start R job in the working directory | ||
| #$ -cwd | ||
|
|
||
| # Export the full environment to the R job (e.g. if *LD_LIBRARY_PATH* is required) | ||
| # depending on security settings might require a cluster manager to set | ||
| # ENABLE_SUBMIT_LIB_PATH=1 as *qmaster_param* | ||
| #$ -V | ||
|
|
||
| # Spawns workload as tasks of an array job into the scheduler (one job with multiple tasks) | ||
| #$ -t 1-{{ n_jobs }} | ||
|
|
||
| # Each array task will allocate one slot in the cluster, unless otherwise specified. | ||
| #$ -pe mytestpe {{ cores | 1 }} | ||
|
|
||
| # Per slot the job will get one power core (C) assuming R code is single-threaded, unless otherwise specified. | ||
| #$ -bunit C | ||
| #$ -bamount {{ threads | 1 }} | ||
|
|
||
| # Allows setting resource requests like memory (default 1 GB in bytes) | ||
| # to set runtime limits (default 1 hour in seconds) | ||
| # or to influence scheduler resource selection (job will be executed in all.q) | ||
| #$ -l mem_free={{ memory | 1073741824 }},h_rt={{ walltime | 3600 }},q=all.q | ||
|
|
||
| ulimit -v $(( 1024 * {{ memory | 4096 }} )) | ||
| CMQ_AUTH={{ auth }} R --no-save --no-restore -e 'clustermq:::worker("{{ master }}")' |
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is the SGE initializer not reused here? Job names are guaranteed to be unique within clustermq; but if IDs are better, we should use them in SGE as well
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have no access to SGE and do not know if job names were unique back then with the old Sun Microsystems release.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I feel fairly strongly that we should duplicate this code only if necessary. The ideal implementation would be shared between SGE, OCS and GCS