Skip to content

Commit 40a15ca

Browse files
committed
issue #96, pool: handle race conditions when iterating through dictionary
Signed-off-by: Matteo Cafasso <noxdafox@gmail.com>
1 parent 36fce1a commit 40a15ca

1 file changed

Lines changed: 17 additions & 8 deletions

File tree

pebble/pool/process.py

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -266,13 +266,12 @@ def handle_worker_expiration(self, expiration: tuple):
266266
self.task_manager.task_done(task.id, Result(TASK_ERROR, error))
267267

268268
def find_expired_task(self, worker_id: int) -> Task:
269-
tasks = tuple(self.task_manager.tasks.values())
269+
tasks = dictionary_values(self.task_manager.tasks)
270270
running_tasks = tuple(t for t in tasks if t.worker_id != 0)
271-
272271
if running_tasks:
273272
return task_worker_lookup(running_tasks, worker_id)
274-
else:
275-
raise BrokenProcessPool("All workers expired")
273+
274+
raise BrokenProcessPool("All workers expired")
276275

277276

278277
class TaskManager:
@@ -317,10 +316,11 @@ def task_problem(self, task_id: int, error: Exception):
317316
self.task_done(task_id, Result(TASK_ERROR, error))
318317

319318
def timeout_tasks(self) -> tuple:
320-
return tuple(t for t in tuple(self.tasks.values()) if self.timeout(t))
319+
return tuple(t for t in dictionary_values(self.tasks)
320+
if self.timeout(t))
321321

322322
def cancelled_tasks(self) -> tuple:
323-
return tuple(t for t in tuple(self.tasks.values())
323+
return tuple(t for t in dictionary_values(self.tasks)
324324
if t.timestamp != 0 and t.future.cancelled())
325325

326326
@staticmethod
@@ -371,8 +371,8 @@ def inspect_workers(self) -> tuple:
371371
Returns the workers which have unexpectedly ended.
372372
373373
"""
374-
workers = tuple(self.workers.values())
375-
expired = tuple(w for w in workers if not w.is_alive())
374+
expired = tuple(w for w in dictionary_values(self.workers)
375+
if not w.is_alive())
376376

377377
for worker in expired:
378378
self.workers.pop(worker.pid)
@@ -495,6 +495,15 @@ def interpreter_shutdown():
495495
stop_process(worker)
496496

497497

498+
def dictionary_values(dictionary: dict) -> tuple:
499+
"""Returns a snapshot of the dictionary values handling race conditions."""
500+
while True:
501+
try:
502+
return tuple(dictionary.values())
503+
except RuntimeError: # race condition
504+
pass
505+
506+
498507
atexit.register(interpreter_shutdown)
499508

500509

0 commit comments

Comments
 (0)