diff --git a/pytest_mp/hookspec.py b/pytest_mp/hookspec.py new file mode 100644 index 0000000..6d77add --- /dev/null +++ b/pytest_mp/hookspec.py @@ -0,0 +1,26 @@ +from pluggy import HookspecMarker + + +hookspec = HookspecMarker("pytest") + + +@hookspec +def pytest_mp_configure(clique_options): + """ + Allows plugins and conftest files to perform initial clique configuration. + + This hook is called for every plugin and conftest file after the command line options of all cliques have been parsed. + + :param list[argparse.Namespace] clique_options: the option namespaces of all cliques; empty if no clique was explicitly created. + """ + + +@hookspec +def pytest_mp_prefork(clique_id): + """ + Allows plugins and conftest files to configure a specific clique before a new worker process is forked for it. + + This hook is called for every plugin and conftest file before each fork. + + :param int clique_id: the clique id, starting from 0. + """ diff --git a/pytest_mp/plugin.py b/pytest_mp/plugin.py index ce99fea..6dd2da4 100644 --- a/pytest_mp/plugin.py +++ b/pytest_mp/plugin.py @@ -1,6 +1,11 @@ +from argparse import Namespace from contextlib import contextmanager -import multiprocessing +from copy import copy +from random import choice import collections +import multiprocessing +import shlex +import sys from _pytest import main import psutil @@ -16,8 +21,16 @@ def pytest_addoption(parser): np_help = 'Set the concurrent worker amount (defaults to cpu count). Value of 0 disables pytest-mp.' group.addoption('--np', '--num-processes', type=int, action='store', dest='num_processes', help=np_help) + mp_clique_help = 'Create a clique. Each clique has the number of processes set by `--np`. ' \ 'The parameter of a clique is a string that contains one or more pytest command line options. ' \ 'Resources between cliques are assumed to be isolated, so cliques can run tests in parallel, ' \ 'even if the tests are in isolated_* groups. ' \ 'If no clique is created, a default clique with no extra pytest options is used.' + group.addoption('--mp-clique', action='append', dest='mp_cliques', metavar='PYTEST_OPTIONS', help=mp_clique_help) + parser.addini('mp', mp_help, type='bool', default=False) parser.addini('num_processes', np_help) + parser.addini('mp_cliques', mp_clique_help, type='args') # Includes pytest-instafail functionality # :copyright: (c) 2013-2016 by Janne Vanhala. @@ -27,76 +40,84 @@ def pytest_addoption(parser): help="show failures and errors instantly as they occur (disabled by default).") +def pytest_addhooks(pluginmanager): + import pytest_mp.hookspec as hookspec + pluginmanager.add_hookspecs(hookspec) + + manager = multiprocessing.Manager() # Used for "global" synchronization access. 
synchronization = dict(manager=manager) -synchronization['fixture_message_board'] = manager.dict() -synchronization['fixture_lock'] = manager.Lock() - -state_fixtures = dict(use_mp=False, num_processes=None) +mp_options = Namespace(use_mp=False, num_processes=0, clique_options=[]) @pytest.fixture(scope='session') def mp_use_mp(): - return state_fixtures['use_mp'] + return getattr(mp_options, 'use_mp', False) @pytest.fixture(scope='session') def mp_num_processes(): - return state_fixtures['num_processes'] + return getattr(mp_options, 'num_processes', 0) @pytest.fixture(scope='session') -def mp_message_board(): - return synchronization['fixture_message_board'] +def mp_clique_options(): + return getattr(mp_options, 'clique_options', []) @pytest.fixture(scope='session') -def mp_lock(): - return synchronization['fixture_lock'] +def mp_clique_id(): + return synchronization.get('clique_id', 0) @pytest.fixture(scope='session') -def mp_trail(): - message_board = synchronization['fixture_message_board'] +def mp_message_board(mp_clique_id): + return synchronization['fixture_message_board'][mp_clique_id] + +@pytest.fixture(scope='session') +def mp_lock(mp_clique_id): + return synchronization['fixture_lock'][mp_clique_id] + + +@pytest.fixture(scope='session') +def mp_trail(mp_message_board, mp_lock): @contextmanager def trail(name, state='start'): if state not in ('start', 'finish'): raise Exception('mp_trail state must be "start" or "finish": {}'.format(state)) consumer_key = name + '__consumers__' - with synchronization['fixture_lock']: + with mp_lock: if state == 'start': - if consumer_key not in message_board: - message_board[consumer_key] = 1 + if consumer_key not in mp_message_board: + mp_message_board[consumer_key] = 1 yield True else: - message_board[consumer_key] += 1 + mp_message_board[consumer_key] += 1 yield False else: - message_board[consumer_key] -= 1 - if message_board[consumer_key]: + mp_message_board[consumer_key] -= 1 + if mp_message_board[consumer_key]: yield False else: - del message_board[consumer_key] + del mp_message_board[consumer_key] yield True return trail -def load_mp_options(session): - """Return use_mp, num_processes from pytest session""" - if session.config.option.use_mp is None: - if not session.config.getini('mp'): - state_fixtures['use_mp'] = False - state_fixtures['num_processes'] = 0 - return False, 0 +def load_mp_options(config): + """Load use_mp, num_processes, and clique_options from the pytest config into mp_options.""" + if config.option.use_mp is None: + if not config.getini('mp'): + return - if hasattr(session.config.option, 'num_processes') and session.config.option.num_processes is not None: - num_processes = session.config.option.num_processes + if hasattr(config.option, 'num_processes') and config.option.num_processes is not None: + num_processes = config.option.num_processes else: - num_processes = session.config.getini('num_processes') or 'cpu_count' + num_processes = config.getini('num_processes') or 'cpu_count' if num_processes == 'cpu_count': num_processes = multiprocessing.cpu_count() @@ -106,9 +127,26 @@ def load_mp_options(session): except ValueError: raise ValueError('--num-processes must be an integer.') - state_fixtures['use_mp'] = True - state_fixtures['num_processes'] = num_processes - return True, num_processes + if getattr(config.option, 'mp_cliques', []): + mp_cliques_args = config.option.mp_cliques + else: + mp_cliques_args = config.getini('mp_cliques') or [] + + clique_options = [] + for arg in mp_cliques_args: + args = shlex.split(arg) + option = 
copy(config.option) + option, unknown = config._parser.parse_known_and_unknown_args(args, namespace=option) + if unknown: + raise ValueError('unknown parameters for --mp-clique: {}'.format(unknown)) + clique_options.append(option) + + # Call the pytest_mp_configure hooks + config.pluginmanager.hook.pytest_mp_configure(clique_options=clique_options) + + mp_options.use_mp = True + mp_options.num_processes = num_processes + mp_options.clique_options = clique_options def get_item_batch_name_and_strategy(item): @@ -193,17 +231,27 @@ def run_isolated_serial_batch(batch, final_test, session, finished_signal=None): return -def submit_test_to_process(test, session): +def prefork(session, clique_options, clique_id): + # Configure the clique before forking a new worker process + if clique_options: + synchronization['clique_id'] = clique_id + session.config.option = clique_options[clique_id] + session.config.pluginmanager.hook.pytest_mp_prefork(clique_id=clique_id) + + +def submit_test_to_process(test, session, clique_options, clique_id): + prefork(session, clique_options, clique_id) proc = multiprocessing.Process(target=run_test, args=(test, None, session, synchronization['trigger_process_loop'])) with synchronization['processes_lock']: proc.start() pid = proc.pid - synchronization['running_pids'][pid] = True - synchronization['processes'][pid] = proc + synchronization['running_pids'][pid] = clique_id + synchronization['processes'][pid] = proc + synchronization['clique_status'][clique_id].size += 1 synchronization['trigger_process_loop'].set() -def submit_batch_to_process(batch, session): +def submit_batch_to_process(batch, session, clique_options, clique_id): def run_batch(tests, finished_signal): for i, test in enumerate(tests): @@ -213,42 +261,118 @@ def run_batch(tests, finished_signal): raise session.Interrupted(session.shouldstop) finished_signal.set() + prefork(session, clique_options, clique_id) proc = multiprocessing.Process(target=run_batch, args=(batch['tests'], synchronization['trigger_process_loop'])) with synchronization['processes_lock']: proc.start() pid = proc.pid - synchronization['running_pids'][pid] = True - synchronization['processes'][pid] = proc + synchronization['running_pids'][pid] = clique_id + synchronization['processes'][pid] = proc + synchronization['clique_status'][clique_id].size += 1 synchronization['trigger_process_loop'].set() def reap_finished_processes(): + synchronization['process_finished'].wait() + synchronization['process_finished'].clear() + with synchronization['processes_lock']: - pid_list = list(synchronization['finished_pids'].keys()) + finished_pids = dict(synchronization['finished_pids']) synchronization['finished_pids'].clear() - for pid in pid_list: + for pid, clique_id in finished_pids.items(): synchronization['processes'][pid].join() del synchronization['processes'][pid] + synchronization['clique_status'][clique_id].size -= 1 + assert synchronization['clique_status'][clique_id].size >= 0 + if synchronization['clique_status'][clique_id].size == 0: + synchronization['clique_status'][clique_id].barrier = False + def wait_until_no_running(): - wait_until_can_submit(1) + """Wait until all processes are reaped.""" + while sum([gs.size for gs in synchronization['clique_status']]): + reap_finished_processes() + + +def wait_until_new_barrier(): + """ + Wait until at least one new barrier is set. + Return the set of cliques whose barrier was just set. 
+ """ + new_barrier_cliques = set() + while True: + for i in range(len(synchronization['clique_status'])): + if synchronization['clique_status'][i].size == 0: + synchronization['clique_status'][i].barrier = True + new_barrier_cliques.add(i) + + if new_barrier_cliques: + return new_barrier_cliques + + reap_finished_processes() def wait_until_can_submit(num_processes): + """ + Wait until at least one clique without a barrier is available. + Return the list of such cliques with minimal size. + """ while True: - with synchronization['processes_lock']: - num_pids = len(synchronization['running_pids']) + min_cliques, min_size = [], sys.maxsize + for i in range(len(synchronization['clique_status'])): + if synchronization['clique_status'][i].size >= num_processes: + continue - if num_pids < num_processes: - return + if synchronization['clique_status'][i].size == 0: + synchronization['clique_status'][i].barrier = False - synchronization['process_finished'].wait() - synchronization['process_finished'].clear() + if not synchronization['clique_status'][i].barrier: + if synchronization['clique_status'][i].size < min_size: + min_size = synchronization['clique_status'][i].size + min_cliques = [i] + elif synchronization['clique_status'][i].size == min_size: + min_cliques.append(i) + if min_cliques: + return min_cliques + + reap_finished_processes() + + +def wait_until_can_submit_with_barrier(num_processes, barrier_cliques): + """ + Wait until at least one clique in `barrier_cliques` is available. + Return the list of such cliques with minimal size. + + If a clique's size drops to 0, set its barrier and add it to `barrier_cliques`. + """ + while True: + min_cliques, min_size = [], sys.maxsize + for i in range(len(synchronization['clique_status'])): + if synchronization['clique_status'][i].size >= num_processes: + continue -def run_batched_tests(batches, session, num_processes): + if synchronization['clique_status'][i].size == 0: + synchronization['clique_status'][i].barrier = True + barrier_cliques.add(i) + + if i in barrier_cliques: + assert synchronization['clique_status'][i].barrier + if synchronization['clique_status'][i].size < min_size: + min_size = synchronization['clique_status'][i].size + min_cliques = [i] + elif synchronization['clique_status'][i].size == min_size: + min_cliques.append(i) + + if min_cliques: + return min_cliques + + reap_finished_processes() + + +def run_batched_tests(batches, session, num_processes, clique_options): sorting = dict(free=2, serial=2, isolated_free=1, isolated_serial=0) batch_names = sorted(batches.keys(), key=lambda x: sorting.get(batches[x]['strategy'], 3)) @@ -263,30 +387,23 @@ def run_batched_tests(batches, session, num_processes): strategy = batches[batch]['strategy'] if strategy == 'free': for test in batches[batch]['tests']: - wait_until_can_submit(num_processes) - submit_test_to_process(test, session) - reap_finished_processes() + cliques = wait_until_can_submit(num_processes) + submit_test_to_process(test, session, clique_options, choice(cliques)) elif strategy == 'serial': - wait_until_can_submit(num_processes) - submit_batch_to_process(batches[batch], session) - reap_finished_processes() + cliques = wait_until_can_submit(num_processes) + submit_batch_to_process(batches[batch], session, clique_options, choice(cliques)) elif strategy == 'isolated_free': - wait_until_no_running() + barrier_cliques = wait_until_new_barrier() for test in batches[batch]['tests']: - wait_until_can_submit(num_processes) - submit_test_to_process(test, session) - reap_finished_processes() - wait_until_no_running() + cliques = 
wait_until_can_submit_with_barrier(num_processes, barrier_cliques) + submit_test_to_process(test, session, clique_options, choice(cliques)) elif strategy == 'isolated_serial': - wait_until_no_running() - submit_batch_to_process(batches[batch], session) - reap_finished_processes() - wait_until_no_running() + barrier_cliques = wait_until_new_barrier() + submit_batch_to_process(batches[batch], session, clique_options, choice(list(barrier_cliques))) else: raise Exception('Unknown strategy {}'.format(strategy)) wait_until_no_running() - reap_finished_processes() def process_loop(num_processes): @@ -310,8 +427,7 @@ def process_loop(num_processes): except IOError: continue with synchronization['processes_lock']: - del synchronization['running_pids'][pid] - synchronization['finished_pids'][pid] = True + synchronization['finished_pids'][pid] = synchronization['running_pids'].pop(pid) synchronization['process_finished'].set() num_pids -= 1 @@ -327,7 +443,13 @@ def pytest_runtestloop(session): if session.config.option.collectonly: return True - use_mp, num_processes = load_mp_options(session) + use_mp, num_processes, clique_options = mp_options.use_mp, mp_options.num_processes, mp_options.clique_options + if clique_options: + synchronization['fixture_message_board'] = [manager.dict() for _ in clique_options] + synchronization['fixture_lock'] = [manager.Lock() for _ in clique_options] + else: + synchronization['fixture_message_board'] = [manager.dict()] + synchronization['fixture_lock'] = [manager.Lock()] batches = batch_tests(session) @@ -343,14 +465,20 @@ def pytest_runtestloop(session): synchronization['process_finished'] = multiprocessing.Event() synchronization['reap_process_loop'] = multiprocessing.Event() synchronization['processes_lock'] = multiprocessing.Lock() - synchronization['running_pids'] = manager.dict() - synchronization['finished_pids'] = manager.dict() - synchronization['processes'] = dict() + synchronization['running_pids'] = manager.dict() # pid -> clique_id + synchronization['finished_pids'] = manager.dict() # pid -> clique_id + synchronization['processes'] = dict() # pid -> multiprocessing.Process + + # Use barrier to isolate groups. 
A barrier can only be set or unset when the clique's size is 0 + synchronization['clique_status'] = [Namespace(size=0, barrier=False) for _ in clique_options] + if not synchronization['clique_status']: + # Create an implicit default clique + synchronization['clique_status'] = [Namespace(size=0, barrier=False)] proc_loop = multiprocessing.Process(target=process_loop, args=(num_processes,)) proc_loop.start() - run_batched_tests(batches, session, num_processes) + run_batched_tests(batches, session, num_processes, clique_options) synchronization['reap_process_loop'].set() proc_loop.join() @@ -384,9 +512,9 @@ def pytest_configure(config): config.pluginmanager.unregister(standard_reporter) config.pluginmanager.register(mp_reporter, 'terminalreporter') - if config.option.use_mp is None: - if not config.getini('mp'): - return + load_mp_options(config) + if not mp_options.use_mp: + return if config.option.xmlpath is not None: from pytest_mp.junitxml import MPLogXML diff --git a/pytest_mp/terminal.py b/pytest_mp/terminal.py index 033ad83..348d74b 100644 --- a/pytest_mp/terminal.py +++ b/pytest_mp/terminal.py @@ -1,5 +1,7 @@ from _pytest.terminal import TerminalReporter +from pytest_mp.plugin import synchronization, mp_options + # Taken from pytest/_pytest/terminal.py # and made process safe by avoiding use of `setdefault()` @@ -23,6 +25,11 @@ def __init__(self, reporter, manager): self.stats_lock = manager.Lock() self._progress_items_reported_proxy = manager.Value('i', 0) + def pytest_runtest_logstart(self, nodeid, location): + TerminalReporter.pytest_runtest_logstart(self, nodeid, location) + if getattr(mp_options, 'use_mp', False): + self.write("@clique {} ".format(synchronization.get('clique_id', 0))) + def pytest_collectreport(self, report): # Show errors occurred during the collection instantly. 
TerminalReporter.pytest_collectreport(self, report) diff --git a/tests/test_fixtures.py b/tests/test_fixtures.py index bee9aab..1047f3c 100644 --- a/tests/test_fixtures.py +++ b/tests/test_fixtures.py @@ -12,7 +12,7 @@ def test_one(mp_use_mp): """.format(use_mp)) - result = testdir.runpytest('--mp' if use_mp else '') + result = testdir.runpytest_subprocess('--mp' if use_mp else '') result.assert_outcomes(passed=1) assert result.ret == 0 @@ -26,11 +26,64 @@ def test_one(mp_num_processes): """.format(num_processes)) - result = testdir.runpytest('--mp', '--np={}'.format(num_processes)) + result = testdir.runpytest_subprocess('--mp', '--np={}'.format(num_processes)) result.assert_outcomes(passed=1) assert result.ret == 0 +@pytest.mark.parametrize('clique_num', (0, 1, 100)) +def test_mp_clique_options(testdir, clique_num): + testdir.makepyfile(""" + + def test_one(mp_clique_options): + assert len(mp_clique_options) == {} + for option in mp_clique_options: + assert option.num_processes == 1000 + + """.format(clique_num)) + + result = testdir.runpytest_subprocess('--mp', *['--mp-clique="--np=1000"' for _ in range(clique_num)]) + result.assert_outcomes(passed=1) + assert result.ret == 0 + + +def test_mp_clique_id(testdir): + testdir.makepyfile(""" + import time + import multiprocessing + + import pytest + + manager = multiprocessing.Manager() + clique_ids = manager.dict() + lock = manager.Lock() + + @pytest.mark.mp_group('test_one', 'isolated_serial') + def test_one(mp_clique_id): + global clique_ids + with lock: + clique_ids[1] = mp_clique_id + time.sleep(1) + + @pytest.mark.mp_group('test_two', 'isolated_serial') + def test_two(mp_clique_id): + global clique_ids + with lock: + clique_ids[2] = mp_clique_id + time.sleep(1) + + def test_three(): + assert clique_ids[1] != clique_ids[2] + assert 0 in clique_ids.values() + assert 1 in clique_ids.values() + + """) + + result = testdir.runpytest_subprocess('--mp', '--mp-clique="--np=2"', '--mp-clique="--np=2"') + result.assert_outcomes(passed=3) + assert result.ret == 0 + + def test_mp_lock_blocks_test_exec(request, testdir): testdir.makepyfile(""" from time import sleep @@ -53,7 +106,7 @@ def test_three(mp_lock): """) t0 = datetime.now() - result = testdir.runpytest('--mp') + result = testdir.runpytest_subprocess('--mp') delta = datetime.now() - t0 result.assert_outcomes(passed=3) @@ -78,7 +131,7 @@ def test_two(mp_message_board): """) - result = testdir.runpytest('--mp') + result = testdir.runpytest_subprocess('--mp') result.assert_outcomes(passed=2) assert result.ret == 0 @@ -101,7 +154,7 @@ def test_mp_trail(mp_trail, mp_message_board): """) - result = testdir.runpytest('--mp') + result = testdir.runpytest_subprocess('--mp') result.assert_outcomes(passed=1) assert result.ret == 0 @@ -128,7 +181,7 @@ def test_mp_trail(val, mp_trail, mp_message_board): """) - result = testdir.runpytest('--mp') + result = testdir.runpytest_subprocess('--mp') result.assert_outcomes(passed=100) assert result.ret == 0 @@ -157,6 +210,6 @@ def test_mp_trail(val, mp_trail, mp_message_board): """) - result = testdir.runpytest('--mp') + result = testdir.runpytest_subprocess('--mp') result.assert_outcomes(passed=100) assert result.ret == 0 diff --git a/tests/test_groups.py b/tests/test_groups.py index 4cdc88c..9f7ca2b 100644 --- a/tests/test_groups.py +++ b/tests/test_groups.py @@ -31,7 +31,7 @@ def test_four(request): """) - result = testdir.runpytest('--mp' if use_mp else '') + result = testdir.runpytest_subprocess('--mp' if use_mp else '') result.assert_outcomes(passed=4) 
assert result.ret == 0 @@ -66,7 +66,7 @@ def test_four(request): """) - result = testdir.runpytest('--mp' if use_mp else '') + result = testdir.runpytest_subprocess('--mp' if use_mp else '') result.assert_outcomes(passed=4) assert result.ret == 0 @@ -95,7 +95,7 @@ def test_three(request): """) - result = testdir.runpytest('--mp' if use_mp else '') + result = testdir.runpytest_subprocess('--mp' if use_mp else '') result.assert_outcomes(passed=3) assert result.ret == 0 @@ -113,7 +113,7 @@ def test_one(self): """) - result = testdir.runpytest('--mp') + result = testdir.runpytest_subprocess('--mp') result.stdout.fnmatch_lines(['*Exception: Detected too many mp_group values for test_one', '*= no tests ran in * seconds =*']) assert result.ret == 3 @@ -134,7 +134,7 @@ def test_one(self, request): """) - result = testdir.runpytest('--mp') + result = testdir.runpytest_subprocess('--mp') result.stdout.fnmatch_lines(['*Exception: Detected too many mp_group values for test_one', '*= no tests ran in * seconds =*']) assert result.ret == 3 @@ -165,6 +165,6 @@ def test_d_free(val): assert True """) - result = testdir.runpytest('-vs', '--mp') + result = testdir.runpytest_subprocess('-vs', '--mp') result.assert_outcomes(passed=26) assert result.ret == 0 diff --git a/tests/test_hook.py b/tests/test_hook.py new file mode 100644 index 0000000..ca6b876 --- /dev/null +++ b/tests/test_hook.py @@ -0,0 +1,156 @@ +import pytest + + +conftest = """ + + from time import time + + import pytest + + _mp_configure_hook_called_time = 0 + _mp_prefork_hook_called_time = 0 + _configure_hook_called_time = 0 + _generate_tests_hook_called_time = 0 + _clique_options = None + _clique_id = -1 + + + @pytest.fixture(scope="function") + def mp_configure_hook_called_time(): + return _mp_configure_hook_called_time + + @pytest.fixture(scope="function") + def mp_prefork_hook_called_time(): + return _mp_prefork_hook_called_time + + @pytest.fixture(scope="function") + def configure_hook_called_time(): + return _configure_hook_called_time + + @pytest.fixture(scope="function") + def generate_tests_hook_called_time(): + return _generate_tests_hook_called_time + + @pytest.fixture(scope="function") + def clique_options(): + return _clique_options + + @pytest.fixture(scope="function") + def clique_id(): + return _clique_id + + def pytest_mp_configure(clique_options): + global _mp_configure_hook_called_time, _clique_options + _mp_configure_hook_called_time = time() + _clique_options = clique_options + + def pytest_mp_prefork(clique_id): + global _mp_prefork_hook_called_time, _clique_id + _mp_prefork_hook_called_time = time() + _clique_id = clique_id + + def pytest_configure(config): + global _configure_hook_called_time + _configure_hook_called_time = time() + + def pytest_generate_tests(metafunc): + global _generate_tests_hook_called_time + if _generate_tests_hook_called_time == 0: + _generate_tests_hook_called_time = time() + + """ + + +@pytest.mark.parametrize('clique_num', (0, 1, 10)) +def test_pytest_mp_configure(testdir, clique_num): + testdir.makeconftest(conftest) + testdir.makepyfile(""" + + def test_one(mp_configure_hook_called_time, clique_options, mp_clique_options): + assert mp_configure_hook_called_time > 0 + assert len(clique_options) == {} + assert clique_options == mp_clique_options + for i in range(len(clique_options)): + assert clique_options[i].num_processes == i+1 + + """.format(clique_num)) + + result = testdir.runpytest_subprocess('--mp', *['--mp-clique="--np={}"'.format(i + 1) for i in range(clique_num)]) + 
result.assert_outcomes(passed=1) + assert result.ret == 0 + + +def test_no_pytest_mp_configure_when_mp_disabled(testdir): + testdir.makeconftest(conftest) + testdir.makepyfile(""" + + def test_one(mp_configure_hook_called_time, clique_options): + assert mp_configure_hook_called_time == 0 + assert clique_options is None + + """) + + result = testdir.runpytest_subprocess('--mp-clique="--np=1"') + result.assert_outcomes(passed=1) + assert result.ret == 0 + + +def test_pytest_mp_configure_after_pytest_configure(testdir): + testdir.makeconftest(conftest) + testdir.makepyfile(""" + + def test_one(mp_configure_hook_called_time, configure_hook_called_time): + assert configure_hook_called_time > 0 + assert mp_configure_hook_called_time > configure_hook_called_time + + """) + + result = testdir.runpytest_subprocess('--mp', '--mp-clique="--np=1"') + result.assert_outcomes(passed=1) + assert result.ret == 0 + + +def test_pytest_mp_configure_before_pytest_generate_tests(testdir): + testdir.makeconftest(conftest) + testdir.makepyfile(""" + + def test_one(mp_configure_hook_called_time, generate_tests_hook_called_time): + assert mp_configure_hook_called_time > 0 + assert generate_tests_hook_called_time > mp_configure_hook_called_time + + """) + + result = testdir.runpytest_subprocess('--mp', '--mp-clique="--np=1"') + result.assert_outcomes(passed=1) + assert result.ret == 0 + + +@pytest.mark.parametrize('clique_num', (1, 2, 4)) +def test_pytest_mp_prefork(testdir, clique_num): + testdir.makeconftest(conftest) + testdir.makepyfile(""" + + import pytest + + @pytest.mark.parametrize('count', range(10)) + def test_one(mp_prefork_hook_called_time, clique_id, mp_clique_id, mp_num_processes, count): + assert mp_prefork_hook_called_time > 0 + assert mp_num_processes == 2 + assert clique_id == mp_clique_id + + """) + + result = testdir.runpytest_subprocess('--mp', '--np=2', *['--mp-clique=--np=4' for i in range(clique_num)]) + result.assert_outcomes(passed=10) + assert result.ret == 0 + + +def test_no_pytest_mp_prefork_when_no_clique(testdir): + testdir.makeconftest(conftest) + testdir.makepyfile(""" + + def test_one(mp_prefork_hook_called_time, clique_id): + assert mp_prefork_hook_called_time == 0 + assert clique_id == -1 + + """) + + result = testdir.runpytest_subprocess('--mp') + result.assert_outcomes(passed=1) + assert result.ret == 0 diff --git a/tests/test_invocation.py b/tests/test_invocation.py index 4aab6e1..a8afba8 100644 --- a/tests/test_invocation.py +++ b/tests/test_invocation.py @@ -4,26 +4,40 @@ cpu_count = psutil.cpu_count() +def test_plugin_loaded(testdir): + testdir.makepyfile(""" + def test_one(pytestconfig): + assert pytestconfig.pluginmanager.get_plugin('pytest-mp') + """) + result = testdir.runpytest_subprocess() + result.assert_outcomes(passed=1) + assert result.ret == 0 + + def test_confirm_options_in_help(testdir): - result = testdir.runpytest('--help') + result = testdir.runpytest_subprocess('--help') result.stdout.fnmatch_lines(['pytest-mp:', '*--mp, --multiprocessing', - '*--np=NUM_PROCESSES, --num-processes=NUM_PROCESSES']) + '*--np=NUM_PROCESSES, --num-processes=NUM_PROCESSES', + '*--mp-clique=PYTEST_OPTIONS']) def _parametrized_ini(): header = "[pytest]\n" mp = "mp = {}\n" np = "num_processes = {}\n" - return [(header + mp.format(False), False, None), - (header + mp.format(False) + np.format(100), False, 100), - (header + np.format(0), False, 0), - (header + np.format(100), False, 100), - (header + mp.format(True), True, None), - (header + mp.format(True) + np.format(100), True, 100)] - - -@pytest.mark.parametrize('ini_content, mp, num_processes', _parametrized_ini()) -def test_ini_without_cmdline(testdir, 
ini_content, mp, num_processes): + mpc = "mp_cliques = {}\n" + mpc_0 = mpc.format('') + mpc_2 = mpc.format('--np=1 "--np 2"') + return [(header + mp.format(False), False, None, []), + (header + mp.format(False) + np.format(100) + mpc_2, False, 100, [1, 2]), + (header + np.format(0) + mpc_0, False, 0, []), + (header + np.format(100) + mpc_2, False, 100, [1, 2]), + (header + mp.format(True), True, None, []), + (header + mp.format(True) + np.format(100) + mpc_2, True, 100, [1, 2])] + + +@pytest.mark.parametrize('ini_content, mp, num_processes, clique_options', _parametrized_ini()) +def test_ini_without_cmdline(testdir, ini_content, mp, num_processes, clique_options): """Confirms that .ini values are used to determine mp run options""" testdir.makeini(ini_content) @@ -32,6 +46,7 @@ def test_ini_without_cmdline(testdir, ini_content, mp, num_processes): num_processes = num_processes or cpu_count else: num_processes = 0 + clique_options = [] testdir.makepyfile(""" import pytest @@ -43,18 +58,23 @@ def test_mp(mp_use_mp): # mp_use_mp is pytest-mp helper fixture def test_num_processes(mp_num_processes): # mp_num_processes is pytest-mp helper fixture assert mp_num_processes == {} - """.format(mp, num_processes)) + def test_clique_options(mp_clique_options): # mp_clique_options is pytest-mp helper fixture + assert [o.num_processes for o in mp_clique_options] == {} - result = testdir.runpytest() + """.format(mp, num_processes, clique_options)) - result.stdout.fnmatch_lines(['*= 2 passed in * seconds =*']) + result = testdir.runpytest_subprocess() + + result.stdout.fnmatch_lines(['*= 3 passed in * seconds =*']) assert result.ret == 0 -@pytest.mark.parametrize('cmd_mp, cmd_num_processes', - [(True, None), (True, 50), (True, 0), (False, None), (False, 50), (False, 0)]) -@pytest.mark.parametrize('ini_content, ini_mp, ini_num_processes', _parametrized_ini()) -def test_ini_with_cmdline(testdir, cmd_mp, cmd_num_processes, ini_content, ini_mp, ini_num_processes): +@pytest.mark.parametrize('cmd_mp, cmd_num_processes, cmd_clique_options', + [(True, None, []), (True, 50, [1, 2]), (True, 0, [1]), + (False, None, []), (False, 50, [1, 2]), (False, 0, [1])]) +@pytest.mark.parametrize('ini_content, ini_mp, ini_num_processes, ini_clique_options', _parametrized_ini()) +def test_ini_with_cmdline(testdir, cmd_mp, cmd_num_processes, cmd_clique_options, + ini_content, ini_mp, ini_num_processes, ini_clique_options): """Confirms that .ini values are not used when cmdline values are specified to determine mp run options""" testdir.makeini(ini_content) @@ -65,8 +85,10 @@ def test_ini_with_cmdline(testdir, cmd_mp, cmd_num_processes, ini_content, ini_m else: priority = cmd_num_processes or ini_num_processes num_processes = cpu_count if priority is None else priority + clique_options = cmd_clique_options or ini_clique_options else: num_processes = 0 + clique_options = [] testdir.makepyfile(""" import pytest @@ -78,15 +100,20 @@ def test_mp(mp_use_mp): def test_num_processes(mp_num_processes): assert mp_num_processes == {} - """.format(use_mp, num_processes)) + def test_clique_options(mp_clique_options): + assert [o.num_processes for o in mp_clique_options] == {} + + """.format(use_mp, num_processes, clique_options)) cmd_options = [] if cmd_mp: cmd_options.append('--mp') if cmd_num_processes is not None: cmd_options.append('--num-processes={}'.format(cmd_num_processes)) + for i in cmd_clique_options: + cmd_options.append('--mp-clique=--np={} --log-level=DEBUG'.format(i)) - result = testdir.runpytest(*cmd_options) + result = 
testdir.runpytest_subprocess(*cmd_options) - result.stdout.fnmatch_lines(['*= 2 passed in * seconds =*']) + result.stdout.fnmatch_lines(['*= 3 passed in * seconds =*']) assert result.ret == 0 diff --git a/tests/test_separation.py b/tests/test_separation.py index e71d697..13d9e14 100644 --- a/tests/test_separation.py +++ b/tests/test_separation.py @@ -1,5 +1,8 @@ +import pytest -def test_isolated_trail_separation(testdir): + +@pytest.mark.parametrize('clique_num', [1, 2, 4]) +def test_isolated_trail_separation(testdir, clique_num): testdir.makepyfile(""" from contextlib import contextmanager from multiprocessing import Manager @@ -8,20 +11,20 @@ def test_isolated_trail_separation(testdir): import pytest - shared = Manager().dict() + shared = [Manager().dict() for _ in range({clique_num})] @contextmanager - def _trail_fixture(num, mp_trail, request): + def _trail_fixture(num, mp_trail, mp_clique_id, request): group = request.node.get_marker('mp_group').kwargs['group'] group += num with mp_trail(num) as start: if start: - shared[num + 'group'] = group + shared[mp_clique_id][num + 'group'] = group if 'Shared' in group: - assert 'Shared' in shared[num + 'group'] + assert 'Shared' in shared[mp_clique_id][num + 'group'] else: - assert shared[num + 'group'] == group + assert shared[mp_clique_id][num + 'group'] == group yield @@ -30,62 +33,62 @@ def _trail_fixture(num, mp_trail, request): @pytest.fixture - def trail_fixture_1(mp_trail, request): - with _trail_fixture('1', mp_trail, request): + def trail_fixture_1(mp_trail, mp_clique_id, request): + with _trail_fixture('1', mp_trail, mp_clique_id, request): yield @pytest.fixture - def trail_fixture_2(mp_trail, request): - with _trail_fixture('2', mp_trail, request): + def trail_fixture_2(mp_trail, mp_clique_id, request): + with _trail_fixture('2', mp_trail, mp_clique_id, request): yield @pytest.fixture - def trail_fixture_3(mp_trail, request): - with _trail_fixture('3', mp_trail, request): + def trail_fixture_3(mp_trail, mp_clique_id, request): + with _trail_fixture('3', mp_trail, mp_clique_id, request): yield @pytest.fixture - def trail_fixture_4(mp_trail, request): - with _trail_fixture('4', mp_trail, request): + def trail_fixture_4(mp_trail, mp_clique_id, request): + with _trail_fixture('4', mp_trail, mp_clique_id, request): yield @pytest.fixture - def trail_fixture_5(mp_trail, request): - with _trail_fixture('5', mp_trail, request): + def trail_fixture_5(mp_trail, mp_clique_id, request): + with _trail_fixture('5', mp_trail, mp_clique_id, request): yield @pytest.fixture - def trail_fixture_6(mp_trail, request): - with _trail_fixture('6', mp_trail, request): + def trail_fixture_6(mp_trail, mp_clique_id, request): + with _trail_fixture('6', mp_trail, mp_clique_id, request): yield @pytest.fixture - def trail_fixture_7(mp_trail, request): - with _trail_fixture('7', mp_trail, request): + def trail_fixture_7(mp_trail, mp_clique_id, request): + with _trail_fixture('7', mp_trail, mp_clique_id, request): yield @pytest.fixture - def trail_fixture_8(mp_trail, request): - with _trail_fixture('8', mp_trail, request): + def trail_fixture_8(mp_trail, mp_clique_id, request): + with _trail_fixture('8', mp_trail, mp_clique_id, request): yield @pytest.fixture - def trail_fixture_9(mp_trail, request): - with _trail_fixture('9', mp_trail, request): + def trail_fixture_9(mp_trail, mp_clique_id, request): + with _trail_fixture('9', mp_trail, mp_clique_id, request): yield @pytest.fixture - def trail_fixture_10(mp_trail, request): - with _trail_fixture('10', mp_trail, 
request): + def trail_fixture_10(mp_trail, mp_clique_id, request): + with _trail_fixture('10', mp_trail, mp_clique_id, request): yield @@ -175,8 +178,8 @@ def test_serial(trail_fixture_1, trail_fixture_2, trail_fixture_3, trail_fixture trail_fixture_5, trail_fixture_6, trail_fixture_7, trail_fixture_8, trail_fixture_9, trail_fixture_10, _): assert True - """) + """.format(clique_num=clique_num)) - result = testdir.runpytest('--mp') + result = testdir.runpytest_subprocess('--mp', '--np=4', *['--mp-clique=--np=2' for _ in range(clique_num)]) result.assert_outcomes(passed=1000) assert result.ret == 0 diff --git a/tests/test_smoke.py b/tests/test_smoke.py index adc0daa..8bd9698 100644 --- a/tests/test_smoke.py +++ b/tests/test_smoke.py @@ -28,7 +28,7 @@ def test_three(val): """.format(strategy)) - result = testdir.runpytest('--mp') + result = testdir.runpytest_subprocess('--mp') result.assert_outcomes(passed=15) assert result.ret == 0 @@ -57,7 +57,7 @@ def test_three(val): """.format(strategy)) - result = testdir.runpytest('--mp') + result = testdir.runpytest_subprocess('--mp') result.assert_outcomes(passed=12, failed=3) assert result.ret == 1 @@ -91,7 +91,7 @@ def test_three(val): """.format(strategy)) - result = testdir.runpytest('--mp') + result = testdir.runpytest_subprocess('--mp') result.assert_outcomes(passed=9, failed=3, skipped=3) assert result.ret == 1 @@ -132,6 +132,6 @@ def test_three(bomb, val): """.format(strategy)) - result = testdir.runpytest('--mp') + result = testdir.runpytest_subprocess('--mp') result.assert_outcomes(passed=6, failed=3, skipped=3, error=3) assert result.ret == 1 diff --git a/tests/test_strategies.py b/tests/test_strategies.py index 7d688a2..bb4ca05 100644 --- a/tests/test_strategies.py +++ b/tests/test_strategies.py @@ -12,7 +12,7 @@ def test_two(val): """) - result = testdir.runpytest('--mp') + result = testdir.runpytest_subprocess('--mp') result.stdout.fnmatch_lines(['*Exception: Unknown strategy unknown', '*= no tests ran in * seconds =*']) assert result.ret == 3 @@ -35,7 +35,7 @@ def test_two(val): """) - result = testdir.runpytest('--mp') + result = testdir.runpytest_subprocess('--mp') result.stdout.fnmatch_lines(['*Exception: TestGroup already has specified strategy free.', '*= no tests ran in * seconds =*']) assert result.ret == 3 @@ -74,7 +74,7 @@ def test_three(val): """) - result = testdir.runpytest('--mp') + result = testdir.runpytest_subprocess('--mp') result.assert_outcomes(passed=15) @@ -122,7 +122,7 @@ def test_three(val): """) - result = testdir.runpytest('--mp') + result = testdir.runpytest_subprocess('--mp') result.assert_outcomes(passed=15) @@ -157,7 +157,7 @@ def test_three(val): """) - result = testdir.runpytest('--mp') + result = testdir.runpytest_subprocess('--mp') result.assert_outcomes(passed=15) @@ -201,38 +201,63 @@ def test_three(val): """) - result = testdir.runpytest('--mp') + result = testdir.runpytest_subprocess('--mp') result.assert_outcomes(passed=15) -@pytest.mark.parametrize("c", range(0, 10)) +@pytest.mark.parametrize("c", range(0, 8)) @pytest.mark.parametrize("strategy1", ["isolated_free", "isolated_serial"]) -@pytest.mark.parametrize("strategy2", ["free", "serial", "isolated_free", "isolated_serial"]) -def test_isolated_with_another_strategy(c, strategy1, strategy2, testdir, tmpdir): +@pytest.mark.parametrize("strategy2", ["free", "serial"]) +@pytest.mark.parametrize("clique_size", [1, 2, 4]) +def test_isolated_with_other_strategies(c, strategy1, strategy2, clique_size, testdir, tmpdir): testdir.makepyfile(""" import 
pytest import py, time, random - @pytest.mark.mp_group('TestGroup1', '{strategy1}') - def test_strategy1(): + @pytest.fixture(scope='function') + def clique_dir(mp_clique_id): tempdir = py.path.local('{tmpdir_path}') - assert len(tempdir.listdir()) == 0, tempdir.listdir() - newdir = tempdir.mkdir('strategy1') + return tempdir.ensure_dir(str(mp_clique_id)) + + @pytest.mark.mp_group('TestGroup1', '{strategy1}') + def test_strategy1(clique_dir): + assert len(clique_dir.listdir()) == 0, clique_dir.listdir() + newdir = clique_dir.mkdir('strategy1') time.sleep(random.random()*0.1) - assert len(tempdir.listdir()) == 1, tempdir.listdir() + assert len(clique_dir.listdir()) == 1, clique_dir.listdir() time.sleep(random.random()*0.1) newdir.remove() @pytest.mark.mp_group('TestGroup2', '{strategy2}') - def test_strategy2(): - tempdir = py.path.local('{tmpdir_path}') - assert len(tempdir.listdir()) == 0, tempdir.listdir() - newdir = tempdir.mkdir('strategy2') + def test_strategy2(clique_dir): + assert len(clique_dir.listdir()) == 0, clique_dir.listdir() + newdir = clique_dir.mkdir('strategy2') time.sleep(random.random()*0.1) - assert len(tempdir.listdir()) == 1, tempdir.listdir() + assert len(clique_dir.listdir()) == 1, clique_dir.listdir() time.sleep(random.random()*0.1) newdir.remove() + + @pytest.mark.mp_group('TestGroup3', '{strategy1}') + def test_strategy3(clique_dir): + assert len(clique_dir.listdir()) == 0, clique_dir.listdir() + newdir = clique_dir.mkdir('strategy3') + time.sleep(random.random()*0.1) + assert len(clique_dir.listdir()) == 1, clique_dir.listdir() + time.sleep(random.random()*0.1) + newdir.remove() + + @pytest.mark.mp_group('TestGroup4', '{strategy1}') + def test_strategy4(clique_dir): + assert len(clique_dir.listdir()) == 0, clique_dir.listdir() + newdir = clique_dir.mkdir('strategy4') + time.sleep(random.random()*0.1) + assert len(clique_dir.listdir()) == 1, clique_dir.listdir() + time.sleep(random.random()*0.1) + newdir.remove() + """.format(tmpdir_path=tmpdir.strpath, strategy1=strategy1, strategy2=strategy2)) - result = testdir.runpytest('--mp', '--np=2') - result.assert_outcomes(passed=2) + result = testdir.runpytest_subprocess('--mp', '--np=2', *['--mp-clique=--np=2' for _ in range(clique_size)]) + result.assert_outcomes(passed=4) + assert len(tmpdir.listdir()) == clique_size + assert result.ret == 0
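A minimal sketch of how a conftest.py could consume the two new hooks. The hook names and signatures match pytest_mp/hookspec.py above; the per-clique port arithmetic and the TEST_DB_PORT environment variable are hypothetical illustrations, not part of this patch.

    # conftest.py -- illustrative sketch only (hypothetical resources)
    import os

    BASE_PORT = 5432  # hypothetical: one database instance per clique

    def pytest_mp_configure(clique_options):
        # Runs once in the master process, after every --mp-clique string
        # has been parsed into an argparse.Namespace.
        for clique_id, options in enumerate(clique_options):
            print('clique {} will run with --np={}'.format(clique_id, options.num_processes))

    def pytest_mp_prefork(clique_id):
        # Runs in the master process right before a worker is forked for
        # `clique_id`; environment set here is inherited by the child.
        os.environ['TEST_DB_PORT'] = str(BASE_PORT + clique_id)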
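Usage note: under the patch's assumption that every clique owns isolated resources, an invocation such as `pytest --mp --np=2 --mp-clique="--np=2" --mp-clique="--np=2"` starts two cliques of two workers each, so two isolated_* groups can execute concurrently as long as the scheduler places them on different cliques. The same setup can come from pytest.ini via the new ini option, e.g. `mp_cliques = --np=2 --np=2`.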