Skip to content

Commit 3b2db1c

Browse files
committed
Re-working process worker
1 parent 7bbeb0e commit 3b2db1c

File tree

1 file changed

+5
-7
lines changed

1 file changed

+5
-7
lines changed

giskard/utils/worker_pool.py

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
from dataclasses import dataclass, field
1010
from enum import Enum
1111
from io import StringIO
12-
from multiprocessing import Process, Queue, SimpleQueue, cpu_count, get_context
12+
from multiprocessing import Process, Queue, cpu_count, get_context
1313
from multiprocessing.context import SpawnContext, SpawnProcess
1414
from multiprocessing.managers import SyncManager
1515
from queue import Empty, Full
@@ -92,9 +92,7 @@ class GiskardResult:
9292
exception: Any = None
9393

9494

95-
def _process_worker(
96-
tasks_queue: Queue[Optional[GiskardTask]], tasks_results: Queue[GiskardResult], running_process: Dict[str, str]
97-
):
95+
def _process_worker(tasks_queue: Queue, tasks_results: Queue, running_process: Dict[str, str]):
9896
pid = os.getpid()
9997
LOGGER.info("Process %s started", pid)
10098

@@ -159,12 +157,12 @@ def __init__(self, nb_workers: Optional[int] = None, name: Optional[str] = None)
159157
# Mapping of the running tasks and worker pids
160158
self.with_timeout_tasks: List[TimeoutData] = []
161159
# Queue with tasks to run
162-
self.pending_tasks_queue: Queue[GiskardTask] = self._mp_context.Queue()
160+
self.pending_tasks_queue: Queue = self._mp_context.Queue()
163161
# Queue with tasks to be consumed asap
164162
# As in ProcessPool, add one more to avoid idling process
165-
self.running_tasks_queue: Queue[Optional[GiskardTask]] = self._mp_context.Queue(maxsize=self._nb_workers + 1)
163+
self.running_tasks_queue: Queue = self._mp_context.Queue(maxsize=self._nb_workers + 1)
166164
# Queue with results to notify
167-
self.tasks_results: Queue[GiskardResult] = self._mp_context.Queue()
165+
self.tasks_results: Queue = self._mp_context.Queue()
168166
# Mapping task_id with future
169167
self.futures_mapping: Dict[str, Future] = dict()
170168
LOGGER.debug("Starting threads for the WorkerPoolExecutor")

0 commit comments

Comments
 (0)