Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 16 additions & 38 deletions easybuild/tools/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
* Toon Willems (Ghent University)
* Ward Poelmans (Ghent University)
"""
import fcntl
import functools
import inspect
import locale
Expand Down Expand Up @@ -64,7 +63,7 @@

from easybuild.base import fancylogger
from easybuild.tools.build_log import EasyBuildError, EasyBuildExit, CWD_NOTFOUND_ERROR
from easybuild.tools.build_log import dry_run_msg, print_msg, time_str_since
from easybuild.tools.build_log import dry_run_msg, time_str_since
from easybuild.tools.config import build_option
from easybuild.tools.hooks import RUN_SHELL_CMD, load_hooks, run_hook
from easybuild.tools.output import COLOR_RED, COLOR_YELLOW, colorize, print_error
Expand Down Expand Up @@ -482,9 +481,6 @@ def to_cmd_str(cmd):
if not hidden:
_cmd_trace_msg(cmd_str, start_time, work_dir, stdin, tmpdir, thread_id, interactive=interactive)

if stream_output:
print_msg(f"(streaming) output for command '{cmd_str}':")

# use bash as shell instead of the default /bin/sh used by subprocess.run
# (which could be dash instead of bash, like on Ubuntu, see https://wiki.ubuntu.com/DashAsBinSh)
# stick to None (default value) when not running command via a shell
Expand All @@ -511,17 +507,10 @@ def to_cmd_str(cmd):
stdin = stdin.encode()

if stream_output or qa_patterns:

if qa_patterns:
# make stdout, stderr, stdin non-blocking files
channels = [proc.stdout, proc.stdin]
if split_stderr:
channels.append(proc.stderr)

for channel in channels:
fd = channel.fileno()
flags = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
# enable non-blocking access to stdout, stderr, stdin
channels = [channel for channel in (proc.stdout, proc.stdin, proc.stderr) if channel is not None]
for channel in channels:
os.set_blocking(channel.fileno(), False)

if stdin:
proc.stdin.write(stdin)
Expand All @@ -535,28 +524,16 @@ def to_cmd_str(cmd):
time_no_match = 0
prev_stdout = ''

# collect output piece-wise, while checking for questions to answer (if qa_patterns is provided)
while exit_code is None:

# use small read size (128 bytes) when streaming output, to make it stream more fluently
# -1 means reading until EOF
read_size = 128 if exit_code is None else -1

# get output as long as output is available;
# note: can't use proc.stdout.read without read_size argument,
# since that will always wait until EOF
more_stdout = True
while more_stdout:
more_stdout = proc.stdout.read(read_size) or b''
_log.debug(f"Obtained more stdout: {more_stdout}")
stdout += more_stdout
# collect output line by line, while checking for questions to answer (if qa_patterns is provided)
for line in iter(proc.stdout.readline, b''):
_log.debug(f"Captured stdout: {line.decode(errors='ignore').rstrip()}")
stdout += line

# note: we assume that there won't be any questions in stderr output
if split_stderr:
more_stderr = True
while more_stderr:
more_stderr = proc.stderr.read(read_size) or b''
stderr += more_stderr
for line in iter(proc.stderr.readline, b''):
stderr += line

if qa_patterns:
# only check for question patterns if additional output is available
Expand All @@ -575,17 +552,18 @@ def to_cmd_str(cmd):
error_msg = "No matching questions found for current command output, "
error_msg += f"giving up after {qa_timeout} seconds!"
raise EasyBuildError(error_msg)
else:
_log.debug(f"{time_no_match:0.1f} seconds without match in output of interactive shell command")
_log.debug(f"{time_no_match:0.1f} seconds without match in output of interactive shell command")

time.sleep(check_interval_secs)

exit_code = proc.poll()

# collect last bit of output once processed has exited
stdout += proc.stdout.read()
for line in iter(proc.stdout.readline, b''):
_log.debug(f"Captured stdout: {line.decode(errors='ignore').rstrip()}")
stdout += line
if split_stderr:
stderr += proc.stderr.read()
stderr += proc.stderr.read() or b''
else:
(stdout, stderr) = proc.communicate(input=stdin)

Expand Down
1 change: 0 additions & 1 deletion test/framework/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -576,7 +576,6 @@ def test_zzz_logtostdout(self):
self.mock_stdout(False)

self.assertIn("Auto-enabling streaming output", stdout)
self.assertIn("== (streaming) output for command 'gcc toy.c -o toy':", stdout)

if os.path.exists(dummylogfn):
os.remove(dummylogfn)
Expand Down
2 changes: 1 addition & 1 deletion test/framework/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -1703,7 +1703,7 @@ def test_run_shell_cmd_stream(self):
self.assertEqual(res.output, expected_output)

self.assertEqual(stderr, '')
expected = ("== (streaming) output for command 'echo hello" + '\n' + expected_output).split('\n')
expected = ("running shell command:\n\techo hello" + '\n' + expected_output).split('\n')
for line in expected:
self.assertIn(line, stdout)

Expand Down