Skip to content
84 changes: 43 additions & 41 deletions easybuild/framework/easyblock.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
* Davide Vanzo (Vanderbilt University)
* Caspar van Leeuwen (SURF)
"""

import concurrent
import copy
import glob
import inspect
Expand All @@ -52,6 +52,7 @@
import tempfile
import time
import traceback
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime

import easybuild.tools.environment as env
Expand Down Expand Up @@ -87,7 +88,7 @@
from easybuild.tools.hooks import MODULE_STEP, MODULE_WRITE, PACKAGE_STEP, PATCH_STEP, PERMISSIONS_STEP, POSTITER_STEP
from easybuild.tools.hooks import POSTPROC_STEP, PREPARE_STEP, READY_STEP, SANITYCHECK_STEP, SOURCE_STEP
from easybuild.tools.hooks import SINGLE_EXTENSION, TEST_STEP, TESTCASES_STEP, load_hooks, run_hook
from easybuild.tools.run import RunShellCmdError, check_async_cmd, run_cmd, run_shell_cmd
from easybuild.tools.run import RunShellCmdError, raise_run_shell_cmd_error, run_shell_cmd
from easybuild.tools.jenkins import write_to_xml
from easybuild.tools.module_generator import ModuleGeneratorLua, ModuleGeneratorTcl, module_generator, dependencies_for
from easybuild.tools.module_naming_scheme.utilities import det_full_ec_version
Expand Down Expand Up @@ -1818,41 +1819,31 @@ def skip_extensions_parallel(self, exts_filter):
self.log.experimental("Skipping installed extensions in parallel")
print_msg("skipping installed extensions (in parallel)", log=self.log)

async_cmd_info_cache = {}
running_checks_ids = []
installed_exts_ids = []
exts_queue = list(enumerate(self.ext_instances[:]))
checked_exts_cnt = 0
exts_cnt = len(self.ext_instances)
cmds = [resolve_exts_filter_template(exts_filter, ext) for ext in self.ext_instances]

with ThreadPoolExecutor(max_workers=self.cfg['parallel']) as thread_pool:

# asynchronously run checks to see whether extensions are already installed
while exts_queue or running_checks_ids:
# list of command to run asynchronously
async_cmds = [thread_pool.submit(run_shell_cmd, cmd, stdin=stdin, hidden=True, fail_on_error=False,
asynchronous=True, task_id=idx) for (idx, (cmd, stdin)) in enumerate(cmds)]

# first handle completed checks
for idx in running_checks_ids[:]:
# process result of commands as they have completed running
for done_task in concurrent.futures.as_completed(async_cmds):
res = done_task.result()
idx = res.task_id
ext_name = self.ext_instances[idx].name
# don't read any output, just check whether command completed
async_cmd_info = check_async_cmd(*async_cmd_info_cache[idx], output_read_size=0, fail_on_error=False)
if async_cmd_info['done']:
out, ec = async_cmd_info['output'], async_cmd_info['exit_code']
self.log.info("exts_filter result for %s: exit code %s; output: %s", ext_name, ec, out)
running_checks_ids.remove(idx)
if ec == 0:
print_msg("skipping extension %s" % ext_name, log=self.log)
installed_exts_ids.append(idx)

checked_exts_cnt += 1
exts_pbar_label = "skipping installed extensions "
exts_pbar_label += "(%d/%d checked)" % (checked_exts_cnt, exts_cnt)
self.update_exts_progress_bar(exts_pbar_label)

# start additional checks asynchronously
while exts_queue and len(running_checks_ids) < self.cfg['parallel']:
idx, ext = exts_queue.pop(0)
cmd, stdin = resolve_exts_filter_template(exts_filter, ext)
async_cmd_info_cache[idx] = run_cmd(cmd, log_all=False, log_ok=False, simple=False, inp=stdin,
regexp=False, trace=False, asynchronous=True)
running_checks_ids.append(idx)
self.log.info(f"exts_filter result for {ext_name}: exit code {res.exit_code}; output: {res.output}")
if res.exit_code == 0:
print_msg(f"skipping extension {ext_name}", log=self.log)
installed_exts_ids.append(idx)

checked_exts_cnt += 1
exts_pbar_label = "skipping installed extensions "
exts_pbar_label += "(%d/%d checked)" % (checked_exts_cnt, exts_cnt)
self.update_exts_progress_bar(exts_pbar_label)

# compose new list of extensions, skip over the ones that are already installed;
# note: original order in extensions list should be preserved!
Expand Down Expand Up @@ -1957,6 +1948,8 @@ def install_extensions_parallel(self, install=True):
"""
self.log.info("Installing extensions in parallel...")

thread_pool = ThreadPoolExecutor(max_workers=self.cfg['parallel'])

running_exts = []
installed_ext_names = []

Expand Down Expand Up @@ -1993,16 +1986,23 @@ def update_exts_progress_bar_helper(running_exts, progress_size):

# check for extension installations that have completed
if running_exts:
self.log.info("Checking for completed extension installations (%d running)...", len(running_exts))
self.log.info(f"Checking for completed extension installations ({len(running_exts)} running)...")
for ext in running_exts[:]:
if self.dry_run or ext.async_cmd_check():
self.log.info("Installation of %s completed!", ext.name)
ext.postrun()
running_exts.remove(ext)
installed_ext_names.append(ext.name)
update_exts_progress_bar_helper(running_exts, 1)
if self.dry_run or ext.async_cmd_task.done():
res = ext.async_cmd_task.result()
if res.exit_code == 0:
self.log.info(f"Installation of extension {ext.name} completed!")
# run post-install method for extension from same working dir as installation of extension
cwd = change_dir(res.work_dir)
ext.postrun()
change_dir(cwd)
running_exts.remove(ext)
installed_ext_names.append(ext.name)
update_exts_progress_bar_helper(running_exts, 1)
else:
raise_run_shell_cmd_error(res)
else:
self.log.debug("Installation of %s is still running...", ext.name)
self.log.debug(f"Installation of extension {ext.name} is still running...")

# try to start as many extension installations as we can, taking into account number of available cores,
# but only consider first 100 extensions still in the queue
Expand Down Expand Up @@ -2069,9 +2069,9 @@ def update_exts_progress_bar_helper(running_exts, progress_size):
rpath_filter_dirs=self.rpath_filter_dirs)
if install:
ext.prerun()
ext.run_async()
ext.async_cmd_task = ext.run_async(thread_pool)
running_exts.append(ext)
self.log.info("Started installation of extension %s in the background...", ext.name)
self.log.info(f"Started installation of extension {ext.name} in the background...")
update_exts_progress_bar_helper(running_exts, 0)

# print progress info after every iteration (unless that info is already shown via progress bar)
Expand All @@ -2084,6 +2084,8 @@ def update_exts_progress_bar_helper(running_exts, progress_size):
running_ext_names = ', '.join(x.name for x in running_exts[:3]) + ", ..."
print_msg(msg % (installed_cnt, exts_cnt, queued_cnt, running_cnt, running_ext_names), log=self.log)

thread_pool.shutdown()

#
# MISCELLANEOUS UTILITY FUNCTIONS
#
Expand Down
48 changes: 2 additions & 46 deletions easybuild/framework/extension.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
from easybuild.framework.easyconfig.templates import TEMPLATE_NAMES_EASYBLOCK_RUN_STEP, template_constant_dict
from easybuild.tools.build_log import EasyBuildError, raise_nosupport
from easybuild.tools.filetools import change_dir
from easybuild.tools.run import check_async_cmd, run_cmd, run_shell_cmd
from easybuild.tools.run import run_shell_cmd


def resolve_exts_filter_template(exts_filter, ext):
Expand Down Expand Up @@ -150,12 +150,7 @@ def __init__(self, mself, ext, extra_params=None):
self.sanity_check_module_loaded = False
self.fake_mod_data = None

self.async_cmd_info = None
self.async_cmd_output = None
self.async_cmd_check_cnt = None
# initial read size should be relatively small,
# to avoid hanging for a long time until desired output is available in async_cmd_check
self.async_cmd_read_size = 1024
self.async_cmd_task = None

@property
def name(self):
Expand Down Expand Up @@ -195,44 +190,6 @@ def postrun(self):
"""
self.master.run_post_install_commands(commands=self.cfg.get('postinstallcmds', []))

def async_cmd_start(self, cmd, inp=None):
"""
Start installation asynchronously using specified command.
"""
self.async_cmd_output = ''
self.async_cmd_check_cnt = 0
self.async_cmd_info = run_cmd(cmd, log_all=True, simple=False, inp=inp, regexp=False, asynchronous=True)

def async_cmd_check(self):
"""
Check progress of installation command that was started asynchronously.

:return: True if command completed, False otherwise
"""
if self.async_cmd_info is None:
raise EasyBuildError("No installation command running asynchronously for %s", self.name)
elif self.async_cmd_info is False:
self.log.info("No asynchronous command was started for extension %s", self.name)
return True
else:
self.log.debug("Checking on installation of extension %s...", self.name)
# use small read size, to avoid waiting for a long time until sufficient output is produced
res = check_async_cmd(*self.async_cmd_info, output_read_size=self.async_cmd_read_size)
self.async_cmd_output += res['output']
if res['done']:
self.log.info("Installation of extension %s completed!", self.name)
self.async_cmd_info = None
else:
self.async_cmd_check_cnt += 1
self.log.debug("Installation of extension %s still running (checked %d times)",
self.name, self.async_cmd_check_cnt)
# increase read size after sufficient checks,
# to avoid that installation hangs due to output buffer filling up...
if self.async_cmd_check_cnt % 10 == 0 and self.async_cmd_read_size < (1024 ** 2):
self.async_cmd_read_size *= 2

return res['done']

@property
def required_deps(self):
"""Return list of required dependencies for this extension."""
Expand Down Expand Up @@ -273,7 +230,6 @@ def sanity_check_step(self):
self.log.info("modulename set to False for '%s' extension, so skipping sanity check", self.name)
elif exts_filter:
cmd, stdin = resolve_exts_filter_template(exts_filter, self)
# set log_ok to False so we can catch the error instead of run_cmd
cmd_res = run_shell_cmd(cmd, fail_on_error=False, stdin=stdin)

if cmd_res.exit_code:
Expand Down
49 changes: 34 additions & 15 deletions easybuild/tools/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,13 @@
from collections import namedtuple
from datetime import datetime

try:
# get_native_id is only available in Python >= 3.8
from threading import get_native_id as get_thread_id
except ImportError:
# get_ident is available in Python >= 3.3
from threading import get_ident as get_thread_id

import easybuild.tools.asyncprocess as asyncprocess
from easybuild.base import fancylogger
from easybuild.tools.build_log import EasyBuildError, dry_run_msg, print_msg, time_str_since
Expand Down Expand Up @@ -79,7 +86,7 @@


RunShellCmdResult = namedtuple('RunShellCmdResult', ('cmd', 'exit_code', 'output', 'stderr', 'work_dir',
'out_file', 'err_file'))
'out_file', 'err_file', 'thread_id', 'task_id'))


class RunShellCmdError(BaseException):
Expand Down Expand Up @@ -183,7 +190,7 @@ def cache_aware_func(cmd, *args, **kwargs):
@run_shell_cmd_cache
def run_shell_cmd(cmd, fail_on_error=True, split_stderr=False, stdin=None, env=None,
hidden=False, in_dry_run=False, verbose_dry_run=False, work_dir=None, use_bash=True,
output_file=True, stream_output=None, asynchronous=False, with_hooks=True,
output_file=True, stream_output=None, asynchronous=False, task_id=None, with_hooks=True,
qa_patterns=None, qa_wait_patterns=None):
"""
Run specified (interactive) shell command, and capture output + exit code.
Expand All @@ -199,7 +206,8 @@ def run_shell_cmd(cmd, fail_on_error=True, split_stderr=False, stdin=None, env=N
:param use_bash: execute command through bash shell (enabled by default)
:param output_file: collect command output in temporary output file
:param stream_output: stream command output to stdout (auto-enabled with --logtostdout if None)
:param asynchronous: run command asynchronously
:param asynchronous: indicate that command is being run asynchronously
:param task_id: task ID for specified shell command (included in return value)
:param with_hooks: trigger pre/post run_shell_cmd hooks (if defined)
:param qa_patterns: list of 2-tuples with patterns for questions + corresponding answers
:param qa_wait_patterns: list of 2-tuples with patterns for non-questions
Expand All @@ -223,9 +231,6 @@ def to_cmd_str(cmd):
return cmd_str

# temporarily raise a NotImplementedError until all options are implemented
if asynchronous:
raise NotImplementedError

if qa_patterns or qa_wait_patterns:
raise NotImplementedError

Expand All @@ -235,6 +240,11 @@ def to_cmd_str(cmd):
cmd_str = to_cmd_str(cmd)
cmd_name = os.path.basename(cmd_str.split(' ')[0])

thread_id = None
if asynchronous:
thread_id = get_thread_id()
_log.info(f"Initiating running of shell command '{cmd_str}' via thread with ID {thread_id}")

# auto-enable streaming of command output under --logtostdout/-l, unless it was disabled explicitely
if stream_output is None and build_option('logtostdout'):
_log.info(f"Auto-enabling streaming output of '{cmd_str}' command because logging to stdout is enabled")
Expand All @@ -259,16 +269,16 @@ def to_cmd_str(cmd):
if not in_dry_run and build_option('extended_dry_run'):
if not hidden or verbose_dry_run:
silent = build_option('silent')
msg = f" running command \"{cmd_str}\"\n"
msg = f" running shell command \"{cmd_str}\"\n"
msg += f" (in {work_dir})"
dry_run_msg(msg, silent=silent)

return RunShellCmdResult(cmd=cmd_str, exit_code=0, output='', stderr=None, work_dir=work_dir,
out_file=cmd_out_fp, err_file=cmd_err_fp)
out_file=cmd_out_fp, err_file=cmd_err_fp, thread_id=thread_id, task_id=task_id)

start_time = datetime.now()
if not hidden:
cmd_trace_msg(cmd_str, start_time, work_dir, stdin, cmd_out_fp, cmd_err_fp)
_cmd_trace_msg(cmd_str, start_time, work_dir, stdin, cmd_out_fp, cmd_err_fp, thread_id)

if stream_output:
print_msg(f"(streaming) output for command '{cmd_str}':")
Expand All @@ -293,7 +303,11 @@ def to_cmd_str(cmd):

stderr = subprocess.PIPE if split_stderr else subprocess.STDOUT

_log.info(f"Running command '{cmd_str}' in {work_dir}")
log_msg = f"Running shell command '{cmd_str}' in {work_dir}"
if thread_id:
log_msg += f" (via thread with ID {thread_id})"
_log.info(log_msg)

proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=stderr, stdin=subprocess.PIPE,
cwd=work_dir, env=env, shell=shell, executable=executable)

Expand Down Expand Up @@ -337,7 +351,7 @@ def to_cmd_str(cmd):
raise EasyBuildError(f"Failed to dump command output to temporary file: {err}")

res = RunShellCmdResult(cmd=cmd_str, exit_code=proc.returncode, output=output, stderr=stderr, work_dir=work_dir,
out_file=cmd_out_fp, err_file=cmd_err_fp)
out_file=cmd_out_fp, err_file=cmd_err_fp, thread_id=thread_id, task_id=task_id)

# always log command output
cmd_name = cmd_str.split(' ')[0]
Expand Down Expand Up @@ -370,7 +384,7 @@ def to_cmd_str(cmd):
return res


def cmd_trace_msg(cmd, start_time, work_dir, stdin, cmd_out_fp, cmd_err_fp):
def _cmd_trace_msg(cmd, start_time, work_dir, stdin, cmd_out_fp, cmd_err_fp, thread_id):
"""
Helper function to construct and print trace message for command being run

Expand All @@ -380,11 +394,18 @@ def cmd_trace_msg(cmd, start_time, work_dir, stdin, cmd_out_fp, cmd_err_fp):
:param stdin: stdin input value for command
:param cmd_out_fp: path to output file for command
:param cmd_err_fp: path to errors/warnings output file for command
:param thread_id: thread ID (None when not running shell command asynchronously)
"""
start_time = start_time.strftime('%Y-%m-%d %H:%M:%S')

if thread_id:
run_cmd_msg = f"running shell command (asynchronously, thread ID: {thread_id}):"
else:
run_cmd_msg = "running shell command:"

lines = [
"running command:",
run_cmd_msg,
f"\t{cmd}",
f"\t[started at: {start_time}]",
f"\t[working dir: {work_dir}]",
]
Expand All @@ -395,8 +416,6 @@ def cmd_trace_msg(cmd, start_time, work_dir, stdin, cmd_out_fp, cmd_err_fp):
if cmd_err_fp:
lines.append(f"\t[errors/warnings saved to {cmd_err_fp}]")

lines.append('\t' + cmd)

trace_msg('\n'.join(lines))


Expand Down
Loading