Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion distributed/bokeh/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import atexit
import json
import logging
import multiprocessing
import os
import socket
import sys
Expand Down
1 change: 0 additions & 1 deletion distributed/cli/dask_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import atexit
import json
import logging
import multiprocessing
import os
import socket
import subprocess
Expand Down
21 changes: 11 additions & 10 deletions distributed/nanny.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from datetime import datetime, timedelta
import json
import logging
from multiprocessing import Process, Queue, queues
from multiprocessing.queues import Empty
import os
import shutil
import subprocess
Expand All @@ -18,7 +18,7 @@
from .compatibility import JSONDecodeError
from .core import Server, rpc, write, RPCClosed
from .protocol import to_serialize
from .utils import get_ip, ignoring, log_errors, tmpfile
from .utils import get_ip, ignoring, log_errors, mp_context, tmpfile
from .worker import _ncores, Worker, run, TOTAL_MEMORY

nanny_environment = os.path.dirname(sys.executable)
Expand Down Expand Up @@ -190,13 +190,14 @@ def instantiate(self, stream=None, environment=None):
except JSONDecodeError:
yield gen.sleep(0.01)
else:
q = Queue()
self.process = Process(target=run_worker_fork,
args=(q, self.ip, self.scheduler.ip,
self.scheduler.port, self.ncores,
self.port, self._given_worker_port,
self.local_dir, self.services, self.name,
self.memory_limit, self.reconnect))
q = mp_context.Queue()
self.process = mp_context.Process(
target=run_worker_fork,
args=(q, self.ip, self.scheduler.ip,
self.scheduler.port, self.ncores,
self.port, self._given_worker_port,
self.local_dir, self.services, self.name,
self.memory_limit, self.reconnect))
self.process.daemon = True
self.process.start()
while True:
Expand All @@ -208,7 +209,7 @@ def instantiate(self, stream=None, environment=None):
self.worker_dir = msg['dir']
assert self.worker_port
break
except queues.Empty:
except Empty:
yield gen.sleep(0.1)

logger.info("Nanny %s:%d starts worker process %s:%d",
Expand Down
5 changes: 2 additions & 3 deletions distributed/tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
from concurrent.futures import CancelledError
from datetime import timedelta
import itertools
from multiprocessing import Process
import os
import pickle
from random import random, choice
Expand Down Expand Up @@ -34,7 +33,7 @@
temp_default_client, get_restrictions)
from distributed.scheduler import Scheduler, KilledWorker
from distributed.sizeof import sizeof
from distributed.utils import sync, tmp_text, ignoring, tokey, All
from distributed.utils import sync, tmp_text, ignoring, tokey, All, mp_context
from distributed.utils_test import (cluster, slow, slowinc, slowadd, randominc,
loop, inc, dec, div, throws, gen_cluster, gen_test, double, deep)

Expand Down Expand Up @@ -1614,7 +1613,7 @@ def long_running_client_connection(ip, port):

@gen_cluster()
def test_cleanup_after_broken_client_connection(s, a, b):
proc = Process(target=long_running_client_connection, args=(s.ip, s.port))
proc = mp_context.Process(target=long_running_client_connection, args=(s.ip, s.port))
proc.daemon = True
proc.start()

Expand Down
2 changes: 1 addition & 1 deletion distributed/tests/test_core.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from __future__ import print_function, division, absolute_import

from functools import partial
from multiprocessing import Process
import socket
from time import time

Expand All @@ -13,6 +12,7 @@
coerce_to_rpc, send_recv, coerce_to_address, ConnectionPool)
from distributed.utils_test import slow, loop, gen_test


def test_server(loop):
@gen.coroutine
def f():
Expand Down
Loading