From 4bc64fea8a36de1b8034590c67e3875730babc37 Mon Sep 17 00:00:00 2001 From: Thiago F Pappacena Date: Wed, 21 Jun 2023 18:07:06 +0000 Subject: [PATCH 1/7] Adding simple test --- tests/test_arrow_dataset.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/tests/test_arrow_dataset.py b/tests/test_arrow_dataset.py index edd7961510d..43de0212cf7 100644 --- a/tests/test_arrow_dataset.py +++ b/tests/test_arrow_dataset.py @@ -1631,6 +1631,18 @@ def __call__(self, example): ex_cnt = ExampleCounter(batched=True) dset.map(ex_cnt) self.assertEqual(ex_cnt.cnt, len(dset)) + + def __test_map_crash_subprocess(self, in_memory): + # be sure that a crash in one of the subprocess will not + # hang dataset.map() call forever + + def do_crash(self, example): + raise SystemExit() + + with tempfile.TemporaryDirectory() as tmp_dir: + with self._create_dummy_dataset(in_memory, tmp_dir) as dset: + with pytest.raises(ValueError): + dset.map(do_crash) def test_filter(self, in_memory): # keep only first five examples From 37b083056d725717b9dbb4c75512bac72747ea68 Mon Sep 17 00:00:00 2001 From: Thiago F Pappacena Date: Wed, 21 Jun 2023 17:36:35 -0300 Subject: [PATCH 2/7] Checking killed subprocs --- src/datasets/utils/py_utils.py | 20 +++++++++++++++++--- tests/test_arrow_dataset.py | 18 ++++++++++++------ 2 files changed, 29 insertions(+), 9 deletions(-) diff --git a/src/datasets/utils/py_utils.py b/src/datasets/utils/py_utils.py index d97fc0aac97..87e3a7619b6 100644 --- a/src/datasets/utils/py_utils.py +++ b/src/datasets/utils/py_utils.py @@ -32,7 +32,7 @@ from queue import Empty from shutil import disk_usage from types import CodeType, FunctionType -from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, TypeVar, Union +from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, TypeVar, Union, Set from urllib.parse import urlparse import dill @@ -1330,18 +1330,24 @@ def _write_generator_to_queue(queue: queue.Queue, func: Callable[..., Iterable[Y return i +def _get_pool_pid(pool: Union[multiprocessing.pool.Pool, multiprocess.pool.Pool]) -> Set[int]: + return set([f.pid for f in pool._pool]) + + def iflatmap_unordered( pool: Union[multiprocessing.pool.Pool, multiprocess.pool.Pool], func: Callable[..., Iterable[Y]], *, kwargs_iterable: Iterable[dict], ) -> Iterable[Y]: + initial_pool_pid = _get_pool_pid(pool) manager_cls = Manager if isinstance(pool, multiprocessing.pool.Pool) else multiprocess.Manager with manager_cls() as manager: queue = manager.Queue() async_results = [ pool.apply_async(_write_generator_to_queue, (queue, func, kwargs)) for kwargs in kwargs_iterable ] + subproc_killed = False try: while True: try: @@ -1349,6 +1355,14 @@ def iflatmap_unordered( except Empty: if all(async_result.ready() for async_result in async_results) and queue.empty(): break + if _get_pool_pid(pool) != initial_pool_pid: + subproc_killed = True + # One of the subprocesses has died. We should not wait forever. + raise RuntimeError( + "One of the subprocesses has abruptly died during map operation." + "To debug the error, disable multiprocessing." + ) finally: - # we get the result in case there's an error to raise - [async_result.get(timeout=0.05) for async_result in async_results] + if not subproc_killed: + # we get the result in case there's an error to raise + [async_result.get(timeout=0.05) for async_result in async_results] diff --git a/tests/test_arrow_dataset.py b/tests/test_arrow_dataset.py index 43de0212cf7..e1cbf64054f 100644 --- a/tests/test_arrow_dataset.py +++ b/tests/test_arrow_dataset.py @@ -1631,18 +1631,24 @@ def __call__(self, example): ex_cnt = ExampleCounter(batched=True) dset.map(ex_cnt) self.assertEqual(ex_cnt.cnt, len(dset)) - - def __test_map_crash_subprocess(self, in_memory): + + def test_map_crash_subprocess(self, in_memory): # be sure that a crash in one of the subprocess will not # hang dataset.map() call forever - def do_crash(self, example): - raise SystemExit() + def do_crash(row): + import os + os.kill(os.getpid(), 9) + return row with tempfile.TemporaryDirectory() as tmp_dir: with self._create_dummy_dataset(in_memory, tmp_dir) as dset: - with pytest.raises(ValueError): - dset.map(do_crash) + with pytest.raises(RuntimeError) as excinfo: + dset.map(do_crash, num_proc=2) + assert str(excinfo.value) == ( + "One of the subprocesses has abruptly died during map operation." + "To debug the error, disable multiprocessing." + ) def test_filter(self, in_memory): # keep only first five examples From 43e4c9b347995dd08d88e11e0e1304267a53364e Mon Sep 17 00:00:00 2001 From: Thiago F Pappacena Date: Wed, 21 Jun 2023 17:52:38 -0300 Subject: [PATCH 3/7] Code format --- src/datasets/utils/py_utils.py | 4 ++-- tests/test_arrow_dataset.py | 3 ++- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/src/datasets/utils/py_utils.py b/src/datasets/utils/py_utils.py index 87e3a7619b6..e699ac45ef7 100644 --- a/src/datasets/utils/py_utils.py +++ b/src/datasets/utils/py_utils.py @@ -32,7 +32,7 @@ from queue import Empty from shutil import disk_usage from types import CodeType, FunctionType -from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, TypeVar, Union, Set +from typing import Any, Callable, Dict, Iterable, List, Optional, Set, Tuple, TypeVar, Union from urllib.parse import urlparse import dill @@ -1331,7 +1331,7 @@ def _write_generator_to_queue(queue: queue.Queue, func: Callable[..., Iterable[Y def _get_pool_pid(pool: Union[multiprocessing.pool.Pool, multiprocess.pool.Pool]) -> Set[int]: - return set([f.pid for f in pool._pool]) + return {f.pid for f in pool._pool} def iflatmap_unordered( diff --git a/tests/test_arrow_dataset.py b/tests/test_arrow_dataset.py index e1cbf64054f..037ba655329 100644 --- a/tests/test_arrow_dataset.py +++ b/tests/test_arrow_dataset.py @@ -1633,11 +1633,12 @@ def __call__(self, example): self.assertEqual(ex_cnt.cnt, len(dset)) def test_map_crash_subprocess(self, in_memory): - # be sure that a crash in one of the subprocess will not + # be sure that a crash in one of the subprocess will not # hang dataset.map() call forever def do_crash(row): import os + os.kill(os.getpid(), 9) return row From 095d5ea379dbeeaceb878f0688e8287bdc29949e Mon Sep 17 00:00:00 2001 From: Thiago F Pappacena Date: Wed, 5 Jul 2023 19:13:25 -0300 Subject: [PATCH 4/7] Check pool of async results --- src/datasets/utils/py_utils.py | 23 +++++++++-------------- 1 file changed, 9 insertions(+), 14 deletions(-) diff --git a/src/datasets/utils/py_utils.py b/src/datasets/utils/py_utils.py index e699ac45ef7..c4e622aed87 100644 --- a/src/datasets/utils/py_utils.py +++ b/src/datasets/utils/py_utils.py @@ -1330,24 +1330,20 @@ def _write_generator_to_queue(queue: queue.Queue, func: Callable[..., Iterable[Y return i -def _get_pool_pid(pool: Union[multiprocessing.pool.Pool, multiprocess.pool.Pool]) -> Set[int]: - return {f.pid for f in pool._pool} - - def iflatmap_unordered( pool: Union[multiprocessing.pool.Pool, multiprocess.pool.Pool], func: Callable[..., Iterable[Y]], *, kwargs_iterable: Iterable[dict], ) -> Iterable[Y]: - initial_pool_pid = _get_pool_pid(pool) + original_pool = list(pool._pool) + pool_changed = False manager_cls = Manager if isinstance(pool, multiprocessing.pool.Pool) else multiprocess.Manager with manager_cls() as manager: queue = manager.Queue() async_results = [ pool.apply_async(_write_generator_to_queue, (queue, func, kwargs)) for kwargs in kwargs_iterable ] - subproc_killed = False try: while True: try: @@ -1355,14 +1351,13 @@ def iflatmap_unordered( except Empty: if all(async_result.ready() for async_result in async_results) and queue.empty(): break - if _get_pool_pid(pool) != initial_pool_pid: - subproc_killed = True - # One of the subprocesses has died. We should not wait forever. - raise RuntimeError( - "One of the subprocesses has abruptly died during map operation." - "To debug the error, disable multiprocessing." - ) + if any(async_result._pool != original_pool for async_result in async_results) and queue.empty(): + pool_changed = True + raise RuntimeError( + "One of the subprocesses has abruptly died during map operation." + "To debug the error, disable multiprocessing." + ) finally: - if not subproc_killed: + if not pool_changed: # we get the result in case there's an error to raise [async_result.get(timeout=0.05) for async_result in async_results] From 3811f459ceae562f21a34ce5e72d282fb5ce39f5 Mon Sep 17 00:00:00 2001 From: Thiago F Pappacena Date: Thu, 6 Jul 2023 10:06:03 -0300 Subject: [PATCH 5/7] Style --- src/datasets/utils/py_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/datasets/utils/py_utils.py b/src/datasets/utils/py_utils.py index c4e622aed87..85c8cd029b0 100644 --- a/src/datasets/utils/py_utils.py +++ b/src/datasets/utils/py_utils.py @@ -32,7 +32,7 @@ from queue import Empty from shutil import disk_usage from types import CodeType, FunctionType -from typing import Any, Callable, Dict, Iterable, List, Optional, Set, Tuple, TypeVar, Union +from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, TypeVar, Union from urllib.parse import urlparse import dill From 919a9e4c778e4b09b3b9533c9ec2d120a7bd3035 Mon Sep 17 00:00:00 2001 From: Thiago F Pappacena Date: Thu, 6 Jul 2023 20:59:02 -0300 Subject: [PATCH 6/7] Using pool._pool instead of async_result pool --- src/datasets/utils/py_utils.py | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/src/datasets/utils/py_utils.py b/src/datasets/utils/py_utils.py index 85c8cd029b0..9bfb8149b15 100644 --- a/src/datasets/utils/py_utils.py +++ b/src/datasets/utils/py_utils.py @@ -32,7 +32,7 @@ from queue import Empty from shutil import disk_usage from types import CodeType, FunctionType -from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, TypeVar, Union +from typing import Any, Callable, Dict, Iterable, List, Optional, Set, Tuple, TypeVar, Union from urllib.parse import urlparse import dill @@ -1330,13 +1330,17 @@ def _write_generator_to_queue(queue: queue.Queue, func: Callable[..., Iterable[Y return i +def _get_pool_pid(pool: Union[multiprocessing.pool.Pool, multiprocess.pool.Pool]) -> Set[int]: + return {f.pid for f in pool._pool} + + def iflatmap_unordered( pool: Union[multiprocessing.pool.Pool, multiprocess.pool.Pool], func: Callable[..., Iterable[Y]], *, kwargs_iterable: Iterable[dict], ) -> Iterable[Y]: - original_pool = list(pool._pool) + initial_pool_pid = _get_pool_pid(pool) pool_changed = False manager_cls = Manager if isinstance(pool, multiprocessing.pool.Pool) else multiprocess.Manager with manager_cls() as manager: @@ -1351,12 +1355,13 @@ def iflatmap_unordered( except Empty: if all(async_result.ready() for async_result in async_results) and queue.empty(): break - if any(async_result._pool != original_pool for async_result in async_results) and queue.empty(): - pool_changed = True - raise RuntimeError( - "One of the subprocesses has abruptly died during map operation." - "To debug the error, disable multiprocessing." - ) + if _get_pool_pid(pool) != initial_pool_pid: + pool_changed = True + # One of the subprocesses has died. We should not wait forever. + raise RuntimeError( + "One of the subprocesses has abruptly died during map operation." + "To debug the error, disable multiprocessing." + ) finally: if not pool_changed: # we get the result in case there's an error to raise From 42d7331d68d7dac751739553adf015e959343542 Mon Sep 17 00:00:00 2001 From: Thiago F Pappacena Date: Sun, 9 Jul 2023 12:59:40 -0300 Subject: [PATCH 7/7] Avoid running on Windows a Linux specific test implementation Co-authored-by: Quentin Lhoest <42851186+lhoestq@users.noreply.github.com> --- tests/test_arrow_dataset.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/test_arrow_dataset.py b/tests/test_arrow_dataset.py index 037ba655329..0b08e3b4a6e 100644 --- a/tests/test_arrow_dataset.py +++ b/tests/test_arrow_dataset.py @@ -1632,6 +1632,7 @@ def __call__(self, example): dset.map(ex_cnt) self.assertEqual(ex_cnt.cnt, len(dset)) + @require_not_windows def test_map_crash_subprocess(self, in_memory): # be sure that a crash in one of the subprocess will not # hang dataset.map() call forever