diff --git a/distributed/bokeh/background/server_lifecycle.py b/distributed/bokeh/background/server_lifecycle.py index 2e2dfa3421e..6f6c1a5aedd 100644 --- a/distributed/bokeh/background/server_lifecycle.py +++ b/distributed/bokeh/background/server_lifecycle.py @@ -41,7 +41,7 @@ def http_get(route): logger.info("Can not connect to %s", url, exc_info=True) return except HTTPError: - logger.warn("http route %s failed", route) + logger.warning("http route %s failed", route) return msg = json.loads(response.body.decode()) messages[route]['deque'].append(msg) @@ -58,7 +58,7 @@ def workers(): response = yield client.fetch( 'http://%(host)s:%(http-port)d/workers.json' % options) except HTTPError: - logger.warn("workers http route failed") + logger.warning("workers http route failed") return msg = json.loads(response.body.decode()) if msg: diff --git a/distributed/bokeh/core.py b/distributed/bokeh/core.py index 506933f8f6b..e4b9adc9753 100644 --- a/distributed/bokeh/core.py +++ b/distributed/bokeh/core.py @@ -53,8 +53,9 @@ def stop(self): if self.server._tornado._ping_job is not None: self.server._tornado._ping_job.stop() - # self.server.stop() # https://github.com/bokeh/bokeh/issues/5494 + if bokeh.__version__ >= '0.12.4': + self.server.stop() def format_bytes(n): diff --git a/distributed/cli/dask_scheduler.py b/distributed/cli/dask_scheduler.py index 9cfdbb845c7..22a17fd1219 100755 --- a/distributed/cli/dask_scheduler.py +++ b/distributed/cli/dask_scheduler.py @@ -149,7 +149,7 @@ def del_pid_file(): except ImportError: logger.info("Please install Bokeh to get Web UI") except Exception as e: - logger.warn("Could not start Bokeh web UI", exc_info=True) + logger.warning("Could not start Bokeh web UI", exc_info=True) logger.info('Local Directory: %26s', local_directory) logger.info('-' * 47) diff --git a/distributed/cli/dask_worker.py b/distributed/cli/dask_worker.py index 1c0bdf1447a..e57f78c74af 100755 --- a/distributed/cli/dask_worker.py +++ b/distributed/cli/dask_worker.py @@ -7,6 +7,7 @@ import logging import os import shutil +import signal from sys import exit from time import sleep @@ -28,7 +29,6 @@ global_nannies = [] -import signal def handle_signal(sig, frame): loop = IOLoop.instance() @@ -40,6 +40,7 @@ def handle_signal(sig, frame): if loop._running: loop.add_callback_from_signal(loop.stop) else: + loop.close() exit(1) @@ -237,6 +238,7 @@ def f(): io_loop=loop2) loop2.run_sync(f) + loop2.close() if nanny: for n in nannies: diff --git a/distributed/client.py b/distributed/client.py index d06b8b5b3ed..9c8ba3df42a 100644 --- a/distributed/client.py +++ b/distributed/client.py @@ -622,7 +622,7 @@ def _handle_report(self): msgs = yield self.scheduler_comm.comm.read() except CommClosedError: if self.status == 'running': - logger.warn("Client report stream closed to scheduler") + logger.warning("Client report stream closed to scheduler") logger.info("Reconnecting...") self.status = 'connecting' yield self._reconnect() @@ -696,7 +696,7 @@ def _handle_restart(self): self._restart_event.set() def _handle_error(self, exception=None): - logger.warn("Scheduler exception:") + logger.warning("Scheduler exception:") logger.exception(exception) @gen.coroutine @@ -1032,7 +1032,7 @@ def wait(k): response = yield self.scheduler.gather(keys=keys) if response['status'] == 'error': - logger.warn("Couldn't gather keys %s", response['keys']) + logger.warning("Couldn't gather keys %s", response['keys']) for key in response['keys']: self._send_to_scheduler({'op': 'report-key', 'key': key}) diff --git a/distributed/comm/inproc.py b/distributed/comm/inproc.py index 8addb48c0cd..787a9408ef1 100644 --- a/distributed/comm/inproc.py +++ b/distributed/comm/inproc.py @@ -157,7 +157,7 @@ def __init__(self, peer_addr, read_q, write_q, write_loop, def _get_finalizer(self): def finalize(write_q=self._write_q, write_loop=self._write_loop, r=repr(self)): - logger.warn("Closing dangling queue in %s" % (r,)) + logger.warning("Closing dangling queue in %s" % (r,)) write_loop.add_callback(write_q.put_nowait, _EOF) return finalize diff --git a/distributed/comm/tcp.py b/distributed/comm/tcp.py index bb072369d4e..628262cc2b2 100644 --- a/distributed/comm/tcp.py +++ b/distributed/comm/tcp.py @@ -91,7 +91,7 @@ def set_tcp_timeout(stream): TCP_USER_TIMEOUT = 18 # since Linux 2.6.37 sock.setsockopt(socket.SOL_TCP, TCP_USER_TIMEOUT, timeout * 1000) except EnvironmentError as e: - logger.warn("Could not set timeout on TCP stream: %s", e) + logger.warning("Could not set timeout on TCP stream: %s", e) def convert_stream_closed_error(exc): @@ -129,7 +129,7 @@ def _read_extra(self): def _get_finalizer(self): def finalize(stream=self.stream, r=repr(self)): if not stream.closed(): - logger.warn("Closing dangling stream in %s" % (r,)) + logger.warning("Closing dangling stream in %s" % (r,)) stream.close() return finalize @@ -392,9 +392,9 @@ def handle_stream(self, stream, address): yield stream.wait_for_handshake() except EnvironmentError as e: # The handshake went wrong, log and ignore - logger.warn("listener on %r: TLS handshake failed with remote %r: %s", - self.listen_address, address, - getattr(e, "real_error", None) or e) + logger.warning("listener on %r: TLS handshake failed with remote %r: %s", + self.listen_address, address, + getattr(e, "real_error", None) or e) else: comm = TLS(stream, address, self.deserialize) self.comm_handler(comm) diff --git a/distributed/comm/tests/test_comms.py b/distributed/comm/tests/test_comms.py index 58eaf5e2db0..2a8c13d3279 100644 --- a/distributed/comm/tests/test_comms.py +++ b/distributed/comm/tests/test_comms.py @@ -406,6 +406,8 @@ def run(): main_loop.add_callback(fut.set_exc_info, sys.exc_info()) else: main_loop.add_callback(fut.set_result, res) + finally: + thread_loop.close() t = threading.Thread(target=run) t.start() diff --git a/distributed/compatibility.py b/distributed/compatibility.py index 18579d60352..d23ab11af14 100644 --- a/distributed/compatibility.py +++ b/distributed/compatibility.py @@ -7,6 +7,9 @@ from Queue import Queue, Empty from io import BytesIO from thread import get_ident as get_thread_identity + from inspect import getargspec + from cgi import escape as html_escape + reload = reload # flake8: noqa unicode = unicode # flake8: noqa PY2 = True @@ -51,6 +54,8 @@ def cache_from_source(path): from threading import get_ident as get_thread_identity from importlib import invalidate_caches from importlib.util import cache_from_source + from inspect import getfullargspec as getargspec + from html import escape as html_escape PY2 = False PY3 = True diff --git a/distributed/config.py b/distributed/config.py index 4c9eeb520b9..79e031e4d11 100644 --- a/distributed/config.py +++ b/distributed/config.py @@ -48,8 +48,9 @@ def determine_config_file(): try: ensure_config_file(default_path, path) except EnvironmentError as e: - logger.warn("Could not write default config file to '%s'. Received error %s", - path, e) + warnings.warn("Could not write default config file to '%s'. " + "Received error %s" % (path, e), + UserWarning) return path if os.path.exists(path) else default_path diff --git a/distributed/core.py b/distributed/core.py index 34734f91e54..f6ca4aae162 100644 --- a/distributed/core.py +++ b/distributed/core.py @@ -241,7 +241,7 @@ def handle_comm(self, comm, shutting_down=shutting_down): handler = self.handlers[op] except KeyError: result = "No handler found: %s" % op - logger.warn(result, exc_info=True) + logger.warning(result, exc_info=True) else: logger.debug("Calling into handler %s", handler.__name__) try: @@ -249,7 +249,7 @@ def handle_comm(self, comm, shutting_down=shutting_down): if type(result) is gen.Future: result = yield result except CommClosedError as e: - logger.warn("Lost connection to %r: %s", address, e) + logger.warning("Lost connection to %r: %s", address, e) break except Exception as e: logger.exception(e) @@ -258,8 +258,8 @@ def handle_comm(self, comm, shutting_down=shutting_down): try: yield comm.write(result) except EnvironmentError as e: - logger.warn("Lost connection to %r while sending result for op %r: %s", - address, op, e) + logger.warning("Lost connection to %r while sending result for op %r: %s", + address, op, e) break msg = result = None if close_desired: @@ -438,8 +438,8 @@ def __del__(self): self.status = 'closed' still_open = [comm for comm in self.comms if not comm.closed()] if still_open: - logger.warn("rpc object %s deleted with %d open comms", - self, len(still_open)) + logger.warning("rpc object %s deleted with %d open comms", + self, len(still_open)) for comm in still_open: comm.abort() diff --git a/distributed/deploy/local.py b/distributed/deploy/local.py index 0658b03df82..383884e80ed 100644 --- a/distributed/deploy/local.py +++ b/distributed/deploy/local.py @@ -63,7 +63,7 @@ def __init__(self, n_workers=None, threads_per_worker=None, processes=True, silence_logs=logging.CRITICAL, diagnostics_port=8787, services={}, worker_services={}, nanny=None, **worker_kwargs): if nanny is not None: - warnings.warn("nanny has been deprecated, used processes=") + warnings.warning("nanny has been deprecated, used processes=") processes = nanny self.status = None self.processes = processes diff --git a/distributed/deploy/tests/test_local.py b/distributed/deploy/tests/test_local.py index ce5a66318fd..1c56d81a36d 100644 --- a/distributed/deploy/tests/test_local.py +++ b/distributed/deploy/tests/test_local.py @@ -218,7 +218,7 @@ def test_bokeh(loop): assert time() < start + 20 sleep(0.01) - with pytest.raises(requests.ReadTimeout): + with pytest.raises(requests.RequestException): requests.get(url, timeout=0.2) diff --git a/distributed/diagnostics/progressbar.py b/distributed/diagnostics/progressbar.py index 71b37c924ea..5a5ce63e5db 100644 --- a/distributed/diagnostics/progressbar.py +++ b/distributed/diagnostics/progressbar.py @@ -11,6 +11,7 @@ from .progress import format_time, Progress, MultiProgress +from ..compatibility import html_escape from ..core import connect, coerce_to_address, CommClosedError from ..client import default_client, futures_of from ..protocol.pickle import dumps @@ -227,13 +228,12 @@ def __init__(self, keys, scheduler=None, minimum=0, interval=0.1, func=key_split def make_widget(self, all): from ipywidgets import FloatProgress, HBox, VBox, HTML - import cgi self.elapsed_time = HTML('') self.bars = {key: FloatProgress(min=0, max=1, description='', height='10px') for key in all} self.bar_texts = {key: HTML('', width = "140px") for key in all} - self.bar_labels = {key: HTML('
' + cgi.escape(key.decode() if isinstance(key, bytes) else key) + '
') + self.bar_labels = {key: HTML('
' + html_escape(key.decode() if isinstance(key, bytes) else key) + '
') for key in all} def key(kv): diff --git a/distributed/http/core.py b/distributed/http/core.py index 66249f24bc2..9369f637f67 100644 --- a/distributed/http/core.py +++ b/distributed/http/core.py @@ -44,7 +44,7 @@ def resource_collect(pid=None): return {'cpu_percent': psutil.cpu_percent(), 'status': p.status(), 'memory_percent': p.memory_percent(), - 'memory_info_ex': p.memory_info_ex(), + 'memory_info': p.memory_info(), 'disk_io_counters': metrics.disk_io_counters(), 'net_io_counters': metrics.net_io_counters()} diff --git a/distributed/nanny.py b/distributed/nanny.py index 579b1ccfe10..9621eaa31bc 100644 --- a/distributed/nanny.py +++ b/distributed/nanny.py @@ -170,8 +170,8 @@ def _kill(self, comm=None, timeout=10): self.worker_address) except allowed_errors as e: # Maybe the scheduler is gone, or it is unresponsive - logger.warn("Nanny %r failed to unregister worker %r: %s", - self.address, self.worker_address, e) + logger.warning("Nanny %r failed to unregister worker %r: %s", + self.address, self.worker_address, e) except Exception as e: logger.exception(e) @@ -283,7 +283,7 @@ def _watch(self, wait_seconds=0.20): yield self._close() break elif self.should_watch and self.process and not isalive(self.process): - logger.warn("Discovered failed worker") + logger.warning("Discovered failed worker") self.cleanup() try: yield self.scheduler.unregister(address=self.worker_address) @@ -294,7 +294,7 @@ def _watch(self, wait_seconds=0.20): yield self._close() break if self.status != 'closed': - logger.warn('Restarting worker...') + logger.warning('Restarting worker...') yield self.instantiate() else: yield gen.sleep(wait_seconds) @@ -323,7 +323,7 @@ def resource_collect(self): 'cpu_percent': psutil.cpu_percent(), 'status': p.status(), 'memory_percent': p.memory_percent(), - 'memory_info_ex': p.memory_info_ex()._asdict(), + 'memory_info': p.memory_info()._asdict(), 'disk_io_counters': disk_io_counters()._asdict(), 'net_io_counters': net_io_counters()._asdict()} diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 39c5ce973b8..6feb64d8bc2 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -1212,7 +1212,7 @@ def handle_client(self, comm, client=None): logger.exception(e) raise else: - logger.warn("Bad message: op=%s, %s", op, msg, exc_info=True) + logger.warning("Bad message: op=%s, %s", op, msg, exc_info=True) if op == 'close': breakout = True @@ -1727,8 +1727,8 @@ def replicate(self, comm=None, keys=None, n=None, workers=None, if v['status'] == 'OK': self.add_keys(worker=w, keys=list(gathers[w])) else: - logger.warn("Communication failed during replication: %s", - v) + logger.warning("Communication failed during replication: %s", + v) self.log_event(w, {'action': 'replicate-add', 'keys': gathers[w]}) @@ -1789,8 +1789,8 @@ def workers_to_close(self, memory_ratio=2): def retire_workers(self, comm=None, workers=None, remove=True, close=False, close_workers=False): if close: - logger.warn("The keyword close= has been deprecated. " - "Use close_workers= instead") + logger.warning("The keyword close= has been deprecated. " + "Use close_workers= instead") close_workers = close_workers or close with log_errors(): if workers is None: diff --git a/distributed/utils.py b/distributed/utils.py index 995f9b04a54..cc6c0c79b50 100644 --- a/distributed/utils.py +++ b/distributed/utils.py @@ -20,7 +20,7 @@ import threading import warnings -from .compatibility import cache_from_source, invalidate_caches, reload +from .compatibility import cache_from_source, getargspec, invalidate_caches, reload try: import resource @@ -72,7 +72,7 @@ def has_arg(func, argname): """ while True: try: - if argname in inspect.getargspec(func).args: + if argname in getargspec(func).args: return True except TypeError: break @@ -432,12 +432,23 @@ def truncate_exception(e, n=10000): return e -def queue_to_iterator(q): - while True: - result = q.get() - if isinstance(result, StopIteration): - raise result - yield result +if sys.version_info >= (3,): + # (re-)raising StopIteration is deprecated in 3.6+ + exec("""def queue_to_iterator(q): + while True: + result = q.get() + if isinstance(result, StopIteration): + return result.value + yield result + """) +else: + # Returning non-None from generator is a syntax error in 2.x + def queue_to_iterator(q): + while True: + result = q.get() + if isinstance(result, StopIteration): + raise result + yield result def _dump_to_queue(seq, q): diff --git a/distributed/utils_test.py b/distributed/utils_test.py index 493662fa189..5ba004edb22 100644 --- a/distributed/utils_test.py +++ b/distributed/utils_test.py @@ -608,16 +608,13 @@ def popen(*args, **kwargs): terminate_process(proc) finally: # XXX Also dump stdout if return code != 0 ? + out, err = proc.communicate() if dump_stdout: - line = '\n\nPrint from stderr\n=================\n' - while line: - print(line, end='') - line = proc.stderr.readline() - - line = '\n\nPrint from stdout\n=================\n' - while line: - print(line, end='') - line = proc.stdout.readline() + print('\n\nPrint from stderr\n=================\n') + print(err) + + print('\n\nPrint from stdout\n=================\n') + print(out) def wait_for_port(address, timeout=5): diff --git a/distributed/worker.py b/distributed/worker.py index ab4b29a2bfb..a48792056fb 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -55,7 +55,7 @@ import psutil TOTAL_MEMORY = psutil.virtual_memory().total except ImportError: - logger.warn("Please install psutil to estimate worker memory use") + logger.warning("Please install psutil to estimate worker memory use") TOTAL_MEMORY = 8e9 psutil = None @@ -551,8 +551,8 @@ def gather(self, comm=None, who_has=None): result, missing_keys, missing_workers = yield gather_from_workers( who_has, rpc=self.rpc) if missing_keys: - logger.warn("Could not find data: %s on workers: %s (who_has: %s)", - missing_keys, missing_workers, who_has) + logger.warning("Could not find data: %s on workers: %s (who_has: %s)", + missing_keys, missing_workers, who_has) raise Return({'status': 'missing-data', 'keys': missing_keys}) else: @@ -753,7 +753,7 @@ def run(server, comm, function, args=(), kwargs={}, is_coro=False, wait=True): if is_coro: result = (yield result) if wait else None except Exception as e: - logger.warn(" Run Failed\n" + logger.warning(" Run Failed\n" "Function: %s\n" "args: %s\n" "kwargs: %s\n", @@ -1113,7 +1113,7 @@ def add_task(self, key, function=None, args=None, kwargs=None, task=None, if stop - start > 0.010: self.startstops[key].append(('deserialize', start, stop)) except Exception as e: - logger.warn("Could not deserialize task", exc_info=True) + logger.warning("Could not deserialize task", exc_info=True) emsg = error_message(e) emsg['key'] = key emsg['op'] = 'task-erred' @@ -1891,7 +1891,7 @@ def execute(self, key, report=False): else: self.exceptions[key] = result['exception'] self.tracebacks[key] = result['traceback'] - logger.warn(" Compute Failed\n" + logger.warning(" Compute Failed\n" "Function: %s\n" "args: %s\n" "kwargs: %s\n"