diff --git a/google/gax/__init__.py b/google/gax/__init__.py index d1e60c5..7d0a765 100644 --- a/google/gax/__init__.py +++ b/google/gax/__init__.py @@ -33,16 +33,9 @@ import collections import logging -import multiprocessing as mp -import dill -from grpc import RpcError, StatusCode import pkg_resources -from google.gax.errors import GaxError -from google.gax.retry import retryable -from google.rpc import code_pb2 - # pylint: disable=no-member __version__ = pkg_resources.get_distribution('google-gax').version # pylint: enable=no-member @@ -502,211 +495,3 @@ def __next__(self): if self._index >= len(self._current): self._current = None return resource - - -def _from_any(pb_type, any_pb): - """Converts an Any protobuf to the specified message type - - Args: - pb_type (type): the type of the message that any_pb stores an instance - of. - any_pb (google.protobuf.any_pb2.Any): the object to be converted. - - Returns: - pb_type: An instance of the pb_type message. - - Raises: - TypeError: if the message could not be converted. - """ - msg = pb_type() - # Check exceptional case: raise if can't Unpack - if not any_pb.Unpack(msg): - raise TypeError( - 'Could not convert {} to {}'.format( - any_pb.__class__.__name__, pb_type.__name__)) - - # Return expected message - return msg - - -def _try_callback(target, clbk): - try: - clbk(target) - except Exception as ex: # pylint: disable=broad-except - _LOG.exception(ex) - - -class _DeadlineExceededError(RpcError, GaxError): - - def __init__(self): - super(_DeadlineExceededError, self).__init__('Deadline Exceeded') - - def code(self): # pylint: disable=no-self-use - """Always returns StatusCode.DEADLINE_EXCEEDED""" - return StatusCode.DEADLINE_EXCEEDED - - -class _OperationFuture(object): - """A Future which polls a service for completion via OperationsClient.""" - - def __init__(self, operation, client, result_type, metadata_type, - call_options=None): - """ - Args: - operation (google.longrunning.Operation): the initial long-running - operation object. - client - (google.gapic.longrunning.operations_client.OperationsClient): - a client for the long-running operation service. - result_type (type): the class type of the result. - metadata_type (Optional[type]): the class type of the metadata. - call_options (Optional[google.gax.CallOptions]): the call options - that are used when reloading the operation. - """ - self._operation = operation - self._client = client - self._result_type = result_type - self._metadata_type = metadata_type - self._call_options = call_options - self._queue = mp.Queue() - self._process = None - - def cancel(self): - """If last Operation's value of `done` is true, returns false; - otherwise, issues OperationsClient.cancel_operation and returns true. - """ - if self.done(): - return False - - self._client.cancel_operation(self._operation.name) - return True - - def result(self, timeout=None): - """Enters polling loop on OperationsClient.get_operation, and once - Operation.done is true, then returns Operation.response if successful or - throws GaxError if not successful. - - This method will wait up to timeout seconds. If the call hasn't - completed in timeout seconds, then a RetryError will be raised. timeout - can be an int or float. If timeout is not specified or None, there is no - limit to the wait time. - """ - # Check exceptional case: raise if no response - if not self._poll(timeout).HasField('response'): - raise GaxError(self._operation.error.message) - - # Return expected result - return _from_any(self._result_type, self._operation.response) - - def exception(self, timeout=None): - """Similar to result(), except returns the exception if any.""" - # Check exceptional case: return none if no error - if not self._poll(timeout).HasField('error'): - return None - - # Return expected error - return self._operation.error - - def cancelled(self): - """Return True if the call was successfully cancelled.""" - self._get_operation() - return (self._operation.HasField('error') and - self._operation.error.code == code_pb2.CANCELLED) - - def done(self): - """Issues OperationsClient.get_operation and returns value of - Operation.done. - """ - return self._get_operation().done - - def add_done_callback(self, fn): # pylint: disable=invalid-name - """Enters a polling loop on OperationsClient.get_operation, and once the - operation is done or cancelled, calls the function with this - _OperationFuture. Added callables are called in the order that they were - added. - """ - if self._operation.done: - _try_callback(self, fn) - else: - self._queue.put(dill.dumps(fn)) - if self._process is None: - self._process = mp.Process(target=self._execute_tasks) - self._process.start() - - def operation_name(self): - """Returns the value of Operation.name.""" - return self._operation.name - - def metadata(self): - """Returns the value of Operation.metadata from the last call to - OperationsClient.get_operation (or if only the initial API call has been - made, the metadata from that first call). - """ - # Check exceptional case: return none if no metadata - if not self._operation.HasField('metadata'): - return None - - # Return expected metadata - return _from_any(self._metadata_type, self._operation.metadata) - - def last_operation_data(self): - """Returns the data from the last call to OperationsClient.get_operation - (or if only the initial API call has been made, the data from that first - call). - """ - return self._operation - - def _get_operation(self): - if not self._operation.done: - self._operation = self._client.get_operation( - self._operation.name, self._call_options) - - return self._operation - - def _poll(self, timeout=None): - def _done_check(_): - # Check exceptional case: raise if in progress - if not self.done(): - raise _DeadlineExceededError() - - # Return expected operation - return self._operation - - # If a timeout is set, then convert it to milliseconds. - # - # Also, we need to send 0 instead of None for the rpc arguments, - # because an internal method (`_has_timeout_settings`) will - # erroneously return False otherwise. - rpc_arg = None - if timeout is not None: - timeout *= 1000 - rpc_arg = 0 - - # Set the backoff settings. We have specific backoff settings - # for "are we there yet" calls that are distinct from those configured - # in the config.json files. - backoff_settings = BackoffSettings( - initial_retry_delay_millis=1000, - retry_delay_multiplier=2, - max_retry_delay_millis=30000, - initial_rpc_timeout_millis=rpc_arg, - rpc_timeout_multiplier=rpc_arg, - max_rpc_timeout_millis=rpc_arg, - total_timeout_millis=timeout, - ) - - # Set the retry to retry if `_done_check` raises the - # _DeadlineExceededError, according to the given backoff settings. - retry_options = RetryOptions( - [StatusCode.DEADLINE_EXCEEDED], backoff_settings) - retryable_done_check = retryable(_done_check, retry_options) - - # Start polling, and return the final result from `_done_check`. - return retryable_done_check() - - def _execute_tasks(self): - self._poll() - - while not self._queue.empty(): - task = dill.loads(self._queue.get()) - _try_callback(self, task) diff --git a/google/gax/future/__init__.py b/google/gax/future/__init__.py new file mode 100644 index 0000000..11d879b --- /dev/null +++ b/google/gax/future/__init__.py @@ -0,0 +1,36 @@ +# Copyright 2017, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Futures for dealing with asynchronous operations.""" + +from google.gax.future.base import Future + +__all__ = [ + 'Future', +] diff --git a/google/gax/future/_helpers.py b/google/gax/future/_helpers.py new file mode 100644 index 0000000..a5ce5a3 --- /dev/null +++ b/google/gax/future/_helpers.py @@ -0,0 +1,130 @@ +# Copyright 2017, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Private helpers for futures.""" + +import logging +import threading + +from google import gax +from google.gax import retry + + +_LOGGER = logging.getLogger(__name__) + + +def from_any(pb_type, any_pb): + """Converts an Any protobuf to the specified message type + + Args: + pb_type (type): the type of the message that any_pb stores an instance + of. + any_pb (google.protobuf.any_pb2.Any): the object to be converted. + + Returns: + pb_type: An instance of the pb_type message. + + Raises: + TypeError: if the message could not be converted. + """ + msg = pb_type() + if not any_pb.Unpack(msg): + raise TypeError( + 'Could not convert {} to {}'.format( + any_pb.__class__.__name__, pb_type.__name__)) + + return msg + + +def start_daemon_thread(*args, **kwargs): + """Starts a thread and marks it as a daemon thread.""" + thread = threading.Thread(*args, **kwargs) + thread.daemon = True + thread.start() + return thread + + +def safe_invoke_callback(callback, *args, **kwargs): + """Invoke a callback, swallowing and logging any exceptions.""" + # pylint: disable=bare-except + # We intentionally want to swallow all exceptions. + try: + callback(*args, **kwargs) + except: + _LOGGER.exception('Error while executing Future callback.') + + +def blocking_poll(poll_once_func, retry_codes, timeout=None): + """A pattern for repeatedly polling a function. + + This pattern uses gax's retry and backoff functionality to continuously + poll a function. The function can raises + :class:`google.gax.errors.TimeoutError` to indicate that it should be + polled again. This pattern will continue to call the function until the + timeout expires or the function returns a value. + + Args: + poll_once_func (Callable): The function to invoke. + retry_codes (Sequence[str]): a list of Google API error codes that + signal a retry should happen. + timeout (int): The maximum number of seconds to poll. + + Returns: + Any: The final result of invoking the function. + """ + # If a timeout is set, then convert it to milliseconds. + # + # Also, we need to send 0 instead of None for the rpc arguments, + # because an internal method (`_has_timeout_settings`) will + # erroneously return False otherwise. + rpc_timeout = None + if timeout is not None: + timeout *= 1000 + rpc_timeout = 0 + + # Set the backoff settings. We have specific backoff settings + # for "are we there yet" calls that are distinct from those configured + # in the config.json files. + backoff_settings = gax.BackoffSettings( + initial_retry_delay_millis=1000, + retry_delay_multiplier=2, + max_retry_delay_millis=30000, + initial_rpc_timeout_millis=rpc_timeout, + rpc_timeout_multiplier=rpc_timeout, + max_rpc_timeout_millis=rpc_timeout, + total_timeout_millis=timeout, + ) + + # Set the retry to retry if poll_once_func raises the + # a deadline exceeded error, according to the given backoff settings. + retry_options = gax.RetryOptions(retry_codes, backoff_settings) + retryable_poll = retry.retryable(poll_once_func, retry_options) + + # Start polling, and return the final result from the poll_once_func. + return retryable_poll() diff --git a/google/gax/future/base.py b/google/gax/future/base.py new file mode 100644 index 0000000..8ae4b66 --- /dev/null +++ b/google/gax/future/base.py @@ -0,0 +1,217 @@ +# Copyright 2017, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Abstract and helper bases for Future implementations.""" + +import abc + +import six + +from google.gax.future import _helpers + + +@six.add_metaclass(abc.ABCMeta) +class Future(object): + # pylint: disable=missing-docstring, invalid-name + # We inherit the interfaces here from concurrent.futures. + + """Future interface. + + This interface is based on :class:`concurrent.futures.Future`. + """ + + @abc.abstractmethod + def cancel(self): # pragma: NO COVER + raise NotImplementedError() + + @abc.abstractmethod + def cancelled(self): # pragma: NO COVER + raise NotImplementedError() + + @abc.abstractmethod + def running(self): # pragma: NO COVER + raise NotImplementedError() + + @abc.abstractmethod + def done(self): # pragma: NO COVER + raise NotImplementedError() + + @abc.abstractmethod + def result(self, timeout=None): # pragma: NO COVER + raise NotImplementedError() + + @abc.abstractmethod + def exception(self, timeout=None): # pragma: NO COVER + raise NotImplementedError() + + @abc.abstractmethod + def add_done_callback(self, fn): # pragma: NO COVER + raise NotImplementedError() + + @abc.abstractmethod + def set_result(self, result): # pragma: NO COVER + raise NotImplementedError() + + @abc.abstractmethod + def set_exception(self, exception): # pragma: NO COVER + raise NotImplementedError() + + +class PollingFuture(Future): + """A Future that needs to poll some service to check its status. + + The private :meth:`_poll_once` method should be implemented by subclasses. + + Privacy here is intended to prevent the final class from overexposing, not + to prevent subclasses from accessing methods. + """ + def __init__(self): + super(PollingFuture, self).__init__() + self._result = None + self._exception = None + self._result_set = False + """bool: Set to True when the result has been set via set_result or + set_exception.""" + self._polling_thread = None + self._done_callbacks = [] + + @abc.abstractmethod + def _poll_once(self, timeout): # pragma: NO COVER + """Poll for completion once. + + Subclasses must implement this. It should check if the task is complete + and call :meth:`set_result` or :meth:`set_exception`. If the task + isn't complete, this should raise a + :class:`google.gax.errors.GaxError` with a code that can be retried. + + Args: + timeout (float): unused. + + Raises: + google.gax.errors.GaxError: if the operation should be retried. + + .. note: Due to the retry implementation, the exception raised here + to indicate retry must also be a `grpc.RpcError`. + """ + # pylint: disable=missing-raises-doc + # pylint doesn't recognize this as abstract. + raise NotImplementedError() + + @abc.abstractproperty + def _poll_retry_codes(self): # pragma: NO COVER + """Sequence[str]: Which API status codes can be retried.""" + raise NotImplementedError() + + def _blocking_poll(self, timeout=None): + """Poll and wait for the Future to be resolved. + + Args: + timeout (int): How long to wait for the operation to complete. + If None, wait indefinitely. + """ + if self._result_set: + return + + _helpers.blocking_poll( + self._poll_once, self._poll_retry_codes, timeout=timeout) + + def result(self, timeout=None): + """Get the result of the operation, blocking if necessary. + + Args: + timeout (int): How long to wait for the operation to complete. + If None, wait indefinitely. + + Returns: + google.protobuf.Message: The Operation's result. + + Raises: + google.gax.GaxError: If the operation errors or if the timeout is + reached before the operation completes. + """ + self._blocking_poll() + + if self._exception is not None: + # pylint: disable=raising-bad-type + # Pylint doesn't recognize that this is valid in this case. + raise self._exception + + return self._result + + def exception(self, timeout=None): + """Get the exception from the operation, blocking if necessary. + + Args: + timeout (int): How long to wait for the operation to complete. + If None, wait indefinitely. + + Returns: + Optional[google.gax.GaxError]: The operation's error. + """ + self._blocking_poll() + return self._exception + + def add_done_callback(self, fn): + """Add a callback to be executed when the operation is complete. + + If the operation is not already complete, this will start a helper + thread to poll for the status of the operation in the background. + + Args: + fn (Callable[Future]): The callback to execute when the operation + is complete. + """ + if self._result_set: + _helpers.safe_invoke_callback(fn, self) + return + + self._done_callbacks.append(fn) + + if self._polling_thread is None: + # The polling thread will exit on its own as soon as the operation + # is done. + self._polling_thread = _helpers.start_daemon_thread( + target=self._blocking_poll) + + def _invoke_callbacks(self, *args, **kwargs): + """Invoke all done callbacks.""" + for callback in self._done_callbacks: + _helpers.safe_invoke_callback(callback, *args, **kwargs) + + def set_result(self, result): + """Set the Future's result.""" + self._result = result + self._result_set = True + self._invoke_callbacks(self) + + def set_exception(self, exception): + """Set the Future's exception.""" + self._exception = exception + self._result_set = True + self._invoke_callbacks(self) diff --git a/google/gax/future/grpc_operation_future.py b/google/gax/future/grpc_operation_future.py new file mode 100644 index 0000000..95d0aa3 --- /dev/null +++ b/google/gax/future/grpc_operation_future.py @@ -0,0 +1,184 @@ +# Copyright 2017, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""a gRPC-based future for long-running operations.""" + +import concurrent.futures +import threading + +import grpc + +from google.gax import errors +from google.gax.future import _helpers +from google.gax.future import base +from google.rpc import code_pb2 + + +class _RetryPoll( + errors.GaxError, grpc.RpcError, concurrent.futures.TimeoutError): + def __init__(self): + super(_RetryPoll, self).__init__('Operation not complete') + + def code(self): # pylint: disable=no-self-use + """RPC status code, inherited from :class:`grpc.RpcError`.""" + return grpc.StatusCode.DEADLINE_EXCEEDED + + +class OperationFuture(base.PollingFuture): + """A Future representing a long-running server-side operation.""" + + def __init__( + self, operation, client, result_type, metadata_type=None, + call_options=None): + """ + Args: + operation (google.longrunning.Operation): the initial long-running + operation object. + client + (google.gapic.longrunning.operations_client.OperationsClient): + a client for the long-running operation service. + result_type (type): the class type of the result. + metadata_type (type): the class type of the metadata. + call_options (google.gax.CallOptions): the call options + that are used when check the operation status. + """ + super(OperationFuture, self).__init__() + self._operation = operation + self._client = client + self._result_type = result_type + self._metadata_type = metadata_type + self._call_options = call_options + self._completion_lock = threading.Lock() + + @property + def operation(self): + """google.longrunning.Operation: The current long-running operation + message.""" + return self._operation + + @property + def metadata(self): + """google.protobuf.Message: the current operation metadata.""" + if not self._operation.HasField('metadata'): + return None + + return _helpers.from_any(self._metadata_type, self._operation.metadata) + + def cancel(self): + """Attempt to cancel the operation. + + Returns: + bool: True if the cancel RPC was made, False if the operation is + already complete. + """ + if self.done(): + return False + + self._client.cancel_operation(self._operation.name) + return True + + def cancelled(self): + """True if the operation was cancelled.""" + operation = self._call_get_operation_rpc() + return (operation.HasField('error') and + operation.error.code == code_pb2.CANCELLED) + + def running(self): + """True if the operation is currently running.""" + return not self.done() + + def _call_get_operation_rpc(self): + """Call the GetOperation RPC method.""" + # If the currently cached operation is done, no need to make another + # RPC as it will not change once done. + if not self._operation.done: + self._operation = self._client.get_operation( + self._operation.name, self._call_options) + self._set_result_from_operation() + + return self._operation + + def _set_result_from_operation(self): + """Set the result or exception from the current Operation message, + if it is complete.""" + # This must be done in a lock to prevent the polling thread + # and main thread from both executing the completion logic + # at the same time. + with self._completion_lock: + # If the operation isn't complete or if the result has already been + # set, do not call set_result/set_exception again. + # Note: self._result_set is set to True in set_result and + # set_exception, in case those methods are invoked directly. + if not self._operation.done or self._result_set: + return + + if self._operation.HasField('response'): + response = _helpers.from_any( + self._result_type, self._operation.response) + self.set_result(response) + elif self._operation.HasField('error'): + exception = errors.GaxError(self._operation.error.message) + self.set_exception(exception) + else: + exception = errors.GaxError('Unknown operation error') + self.set_exception(exception) + + def done(self): + """Checks to see if the operation is complete. + + This will make a blocking RPC to refresh the status of the operation. + + Returns: + bool: True if the operation is complete, False otherwise. + """ + operation = self._call_get_operation_rpc() + return operation.done + + def _poll_once(self, timeout): + """Checks the status of the operation once. + + This implements the abstract method :meth:`PollingFuture._poll_once`. + + Uses :meth:`done` to refresh the status of the operation. If it's not + done, it will raise a :class:`OperationTimeoutError`. This fits + the interface needed by the :func:`_helpers.blocking_poll` helper. The + helper will continue executing this method with exponential backoff. + + This method exits cleanly with no return value once the operation is + complete. + + Raises: + _RetryPoll: if the operation is not done. + """ + if not self.done(): + raise _RetryPoll() + + @property + def _poll_retry_codes(self): + return [grpc.StatusCode.DEADLINE_EXCEEDED] diff --git a/nox.py b/nox.py index 1d4330e..54d0600 100644 --- a/nox.py +++ b/nox.py @@ -81,6 +81,7 @@ def cover(session): session.interpreter = 'python3.6' session.install('-r', 'test-requirements.txt') session.install('.') + session.run('pip', 'freeze') session.run( 'py.test', '--cov=google.gax', '--cov=tests', '--cov-report=', 'tests') session.run( diff --git a/setup.py b/setup.py index a89f603..9ea607f 100644 --- a/setup.py +++ b/setup.py @@ -36,7 +36,6 @@ DEPENDENCIES = [ - 'dill >= 0.2.5, < 0.3dev', 'future >= 0.16.0, < 0.17dev', 'googleapis-common-protos >= 1.5.2, < 2.0dev', 'grpcio >=1.0.2, < 2.0dev', diff --git a/tests/future/__init__.py b/tests/future/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/future/test__helpers.py b/tests/future/test__helpers.py new file mode 100644 index 0000000..98a5003 --- /dev/null +++ b/tests/future/test__helpers.py @@ -0,0 +1,75 @@ +# Copyright 2017, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +import grpc +import mock +import pytest + +from google.gax.future import _helpers +from google.protobuf import any_pb2 + +from tests.fixtures import fixture_pb2 + + +def test_from_any(): + in_message = fixture_pb2.Simple(field1='a') + in_message_any = any_pb2.Any() + in_message_any.Pack(in_message) + out_message = _helpers.from_any(fixture_pb2.Simple, in_message_any) + assert in_message == out_message + + +def test_from_any_wrong_type(): + in_message = any_pb2.Any() + in_message.Pack(fixture_pb2.Simple(field1='a')) + with pytest.raises(TypeError): + _helpers.from_any(fixture_pb2.Outer, in_message) + + +@mock.patch('threading.Thread', autospec=True) +def test_start_deamon_thread(mock_thread): + thread = _helpers.start_daemon_thread(target=mock.sentinel.target) + assert thread.daemon is True + + +class DeadlineExceeded(grpc.RpcError): + def code(self): + return grpc.StatusCode.DEADLINE_EXCEEDED + + +@mock.patch('time.sleep') +def test_blocking_poll(unused_sleep): + error = DeadlineExceeded() + target = mock.Mock(side_effect=[error, error, 42]) + + result = _helpers.blocking_poll( + target, [grpc.StatusCode.DEADLINE_EXCEEDED], timeout=1) + + assert result == 42 + assert target.call_count == 3 diff --git a/tests/future/test_base.py b/tests/future/test_base.py new file mode 100644 index 0000000..1b2d104 --- /dev/null +++ b/tests/future/test_base.py @@ -0,0 +1,147 @@ +# Copyright 2017, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +import grpc +import mock +import pytest + +from google.gax.future import base + + +class PollingFutureImpl(base.PollingFuture): + def _poll_once(self, timeout): # pragma: NO COVER + pass + + @property + def _poll_retry_codes(self): # pragma: NO COVER + return [] + + def cancel(self): + return True + + def cancelled(self): + return False + + def done(self): + return False + + def running(self): + return True + + +def test_polling_future_constructor(): + future = PollingFutureImpl() + assert not future.done() + assert not future.cancelled() + assert future.running() + assert future.cancel() + + +def test_set_result(): + future = PollingFutureImpl() + callback_mock = mock.Mock() + + future.set_result(1) + + assert future.result() == 1 + future.add_done_callback(callback_mock) + callback_mock.assert_called_once_with(future) + + +def test_set_exception(): + future = PollingFutureImpl() + exception = ValueError('meep') + + future.set_exception(exception) + + assert future.exception() == exception + with pytest.raises(ValueError): + future.result() + + callback_mock = mock.Mock() + future.add_done_callback(callback_mock) + callback_mock.assert_called_once_with(future) + + +def test_invoke_callback_exception(): + future = PollingFutureImplWithPoll() + future.set_result(42) + + # This should not raise, despite the callback causing an exception. + callback_mock = mock.Mock(side_effect=ValueError) + future.add_done_callback(callback_mock) + callback_mock.assert_called_once_with(future) + + +class RetryPoll(grpc.RpcError): + def code(self): + return grpc.StatusCode.DEADLINE_EXCEEDED + + +class PollingFutureImplWithPoll(PollingFutureImpl): + def __init__(self): + super(PollingFutureImplWithPoll, self).__init__() + self.poll_count = 0 + + def _poll_once(self, timeout): + self.poll_count += 1 + if self.poll_count < 3: + raise RetryPoll() + self.set_result(42) + + @property + def _poll_retry_codes(self): + return [grpc.StatusCode.DEADLINE_EXCEEDED] + + +@mock.patch('time.sleep') +def test_result_with_polling(unusued_sleep): + future = PollingFutureImplWithPoll() + + result = future.result() + + assert result == 42 + assert future.poll_count == 3 + # Repeated calls should not cause additional polling + assert future.result() == result + assert future.poll_count == 3 + + +@mock.patch('time.sleep') +def test_callback_background_thread(unused_sleep): + future = PollingFutureImplWithPoll() + callback_mock = mock.Mock() + + future.add_done_callback(callback_mock) + + assert future._polling_thread is not None + + future._polling_thread.join() + + callback_mock.assert_called_once_with(future) diff --git a/tests/future/test_grpc_operation_future.py b/tests/future/test_grpc_operation_future.py new file mode 100644 index 0000000..4e14125 --- /dev/null +++ b/tests/future/test_grpc_operation_future.py @@ -0,0 +1,160 @@ +# Copyright 2017, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +import mock + +from google.gax.future import grpc_operation_future +from google.longrunning import operations_pb2 +from google.rpc import code_pb2 +from google.rpc import status_pb2 + +from tests.fixtures import fixture_pb2 + +TEST_OPERATION_NAME = 'test/operation' + + +class OperationsClient(object): + def __init__(self, operations): + self.operations = operations + self.cancelled = [] + + def get_operation(self, name, *args): + return self.operations.pop(0) + + def cancel_operation(self, name, *args): + self.cancelled.append(name) + + +def make_operation( + name=TEST_OPERATION_NAME, metadata=None, response=None, + error=None, **kwargs): + operation = operations_pb2.Operation( + name=name, **kwargs) + + if metadata is not None: + operation.metadata.Pack(metadata) + + if response is not None: + operation.response.Pack(response) + + if error is not None: + operation.error.CopyFrom(error) + + return operation + + +def make_operations_client(operations): + return OperationsClient(operations) + + +def make_operations_client_and_future(client_operations_responses=None): + if client_operations_responses is None: + client_operations_responses = [make_operation()] + + operations_client = make_operations_client(client_operations_responses) + operations_future = grpc_operation_future.OperationFuture( + client_operations_responses[0], operations_client, + result_type=fixture_pb2.Simple, + metadata_type=fixture_pb2.Simple) + + return operations_client, operations_future + + +def test_constructor(): + client, future = make_operations_client_and_future() + + assert future.operation == client.operations[0] + assert future.operation.done is False + assert future.operation.name == TEST_OPERATION_NAME + assert future.metadata is None + assert future.running() + + +def test_metadata(): + expected_metadata = fixture_pb2.Simple() + _, future = make_operations_client_and_future( + [make_operation(metadata=expected_metadata)]) + + assert future.metadata == expected_metadata + + +def test_cancellation(): + responses = [ + make_operation(), + # Second response indicates that the operation was cancelled. + make_operation( + done=True, + error=status_pb2.Status(code=code_pb2.CANCELLED))] + client, future = make_operations_client_and_future(responses) + + assert future.cancel() + assert future.cancelled() + assert future.operation.name in client.cancelled + assert not future.cancel(), 'cancelling twice should have no effect.' + + +@mock.patch('time.sleep') +def test_result(unused_sleep): + expected_result = fixture_pb2.Simple() + responses = [ + make_operation(), + # Second operation response includes the result. + make_operation(done=True, response=expected_result)] + _, future = make_operations_client_and_future(responses) + + result = future.result() + + assert result == expected_result + + +@mock.patch('time.sleep') +def test_exception(unused_sleep): + expected_exception = status_pb2.Status(message='meep') + responses = [ + make_operation(), + # Second operation response includes the error. + make_operation(done=True, error=expected_exception)] + _, future = make_operations_client_and_future(responses) + + exception = future.exception() + + assert expected_exception.message in '{!r}'.format(exception) + + +@mock.patch('time.sleep') +def test_unexpected_result(unused_sleep): + responses = [ + make_operation(), + # Second operation response is done, but has not error or response. + make_operation(done=True)] + _, future = make_operations_client_and_future(responses) + + exception = future.exception() + + assert 'Unknown operation error' in '{!r}'.format(exception) diff --git a/tests/test_gax.py b/tests/test_gax.py index 4de4108..9912fcd 100644 --- a/tests/test_gax.py +++ b/tests/test_gax.py @@ -32,20 +32,11 @@ from __future__ import absolute_import -import logging -import multiprocessing as mp - -import mock import unittest2 from google.gax import ( - _CallSettings, _LOG, _OperationFuture, BundleOptions, CallOptions, + _CallSettings, BundleOptions, CallOptions, INITIAL_PAGE, OPTION_INHERIT, RetryOptions) -from google.gax.errors import GaxError, RetryError -from google.longrunning import operations_pb2 -from google.rpc import code_pb2, status_pb2 - -from tests.fixtures.fixture_pb2 import Simple class TestBundleOptions(unittest2.TestCase): @@ -120,207 +111,3 @@ def test_settings_merge_none(self): self.assertEqual(final.page_descriptor, settings.page_descriptor) self.assertEqual(final.bundler, settings.bundler) self.assertEqual(final.bundle_descriptor, settings.bundle_descriptor) - - -def _task1(operation_future): - operation_future.test_queue.put(operation_future.result().field1) - - -def _task2(operation_future): - operation_future.test_queue.put(operation_future.result().field2) - - -class _FakeOperationsClient(object): - def __init__(self, operations): - self.operations = list(reversed(operations)) - - def get_operation(self, *_): - return self.operations.pop() - - def cancel_operation(self, *_): - pass - - -class _FakeLoggingHandler(logging.Handler): - def __init__(self, *args, **kwargs): - self.queue = mp.Queue() - super(_FakeLoggingHandler, self).__init__(*args, **kwargs) - - def emit(self, record): - self.acquire() - try: - self.queue.put(record.getMessage()) - finally: - self.release() - - def reset(self): - self.acquire() - try: - self.queue = mp.Queue() - finally: - self.release() - - -class TestOperationFuture(unittest2.TestCase): - - OPERATION_NAME = 'operations/projects/foo/instances/bar/operations/123' - - @classmethod - def setUpClass(cls): - cls._log_handler = _FakeLoggingHandler(level='DEBUG') - _LOG.addHandler(cls._log_handler) - - def setUp(self): - self._log_handler.reset() - - def _make_operation(self, metadata=None, response=None, error=None, - **kwargs): - operation = operations_pb2.Operation(name=self.OPERATION_NAME, **kwargs) - - if metadata is not None: - operation.metadata.Pack(metadata) - - if response is not None: - operation.response.Pack(response) - - if error is not None: - operation.error.CopyFrom(error) - - return operation - - def _make_operation_future(self, *operations): - if not operations: - operations = [self._make_operation()] - - fake_client = _FakeOperationsClient(operations) - return _OperationFuture(operations[0], fake_client, Simple, Simple) - - def test_cancel_issues_call_when_not_done(self): - operation = self._make_operation() - - fake_client = _FakeOperationsClient([operation]) - fake_client.cancel_operation = mock.Mock() - - operation_future = _OperationFuture( - operation, fake_client, Simple, Simple) - - self.assertTrue(operation_future.cancel()) - fake_client.cancel_operation.assert_called_with(self.OPERATION_NAME) - - def test_cancel_does_nothing_when_already_done(self): - operation = self._make_operation(done=True) - - fake_client = _FakeOperationsClient([operation]) - fake_client.cancel_operation = mock.Mock() - - operation_future = _OperationFuture( - operation, fake_client, Simple, Simple) - - self.assertFalse(operation_future.cancel()) - fake_client.cancel_operation.assert_not_called() - - def test_cancelled_true(self): - error = status_pb2.Status(code=code_pb2.CANCELLED) - operation = self._make_operation(error=error) - operation_future = self._make_operation_future(operation) - - self.assertTrue(operation_future.cancelled()) - - def test_cancelled_false(self): - operation = self._make_operation(error=status_pb2.Status()) - operation_future = self._make_operation_future(operation) - self.assertFalse(operation_future.cancelled()) - - def test_done_true(self): - operation = self._make_operation(done=True) - operation_future = self._make_operation_future(operation) - self.assertTrue(operation_future.done()) - - def test_done_false(self): - operation_future = self._make_operation_future() - self.assertFalse(operation_future.done()) - - def test_operation_name(self): - operation_future = self._make_operation_future() - self.assertEqual(self.OPERATION_NAME, operation_future.operation_name()) - - def test_metadata(self): - metadata = Simple() - operation = self._make_operation(metadata=metadata) - operation_future = self._make_operation_future(operation) - - self.assertEqual(metadata, operation_future.metadata()) - - def test_metadata_none(self): - operation_future = self._make_operation_future() - self.assertIsNone(operation_future.metadata()) - - def test_last_operation_data(self): - operation = self._make_operation() - operation_future = self._make_operation_future(operation) - self.assertEqual(operation, operation_future.last_operation_data()) - - def test_result_response(self): - response = Simple() - operation = self._make_operation(done=True, response=response) - operation_future = self._make_operation_future(operation) - - self.assertEqual(response, operation_future.result()) - - def test_result_error(self): - operation = self._make_operation(done=True, error=status_pb2.Status()) - operation_future = self._make_operation_future(operation) - self.assertRaises(GaxError, operation_future.result) - - def test_result_timeout(self): - operation_future = self._make_operation_future() - self.assertRaises(RetryError, operation_future.result, 0) - - def test_exception_error(self): - error = status_pb2.Status() - operation = self._make_operation(done=True, error=error) - operation_future = self._make_operation_future(operation) - - self.assertEqual(error, operation_future.exception()) - - def test_exception_response(self): - operation = self._make_operation(done=True, response=Simple()) - operation_future = self._make_operation_future(operation) - self.assertIsNone(operation_future.exception()) - - def test_exception_timeout(self): - operation_future = self._make_operation_future() - self.assertRaises(RetryError, operation_future.exception, 0) - - def test_add_done_callback(self): - response = Simple(field1='foo', field2='bar') - operation_future = self._make_operation_future( - self._make_operation(), - self._make_operation(done=True, response=response)) - operation_future.test_queue = mp.Queue() - - operation_future.add_done_callback(_task1) - operation_future.add_done_callback(_task2) - - self.assertEqual('foo', operation_future.test_queue.get()) - self.assertEqual('bar', operation_future.test_queue.get()) - - def test_add_done_callback_when_already_done(self): - response = Simple(field1='foo', field2='bar') - operation_future = self._make_operation_future( - self._make_operation(done=True, response=response)) - operation_future.test_queue = mp.Queue() - - operation_future.add_done_callback(_task1) - - self.assertEqual('foo', operation_future.test_queue.get()) - - def test_add_done_callback_when_exception(self): - def _raising_task(_): - raise Exception('Test message') - - operation_future = self._make_operation_future( - self._make_operation(), - self._make_operation(done=True, response=Simple())) - operation_future.add_done_callback(_raising_task) - self.assertEqual('Test message', self._log_handler.queue.get()) diff --git a/tests/test_utils_protobuf.py b/tests/test_utils_protobuf.py index ea8c634..6c13354 100644 --- a/tests/test_utils_protobuf.py +++ b/tests/test_utils_protobuf.py @@ -33,7 +33,7 @@ from google.api import http_pb2 from google.gax.utils import protobuf -from google.longrunning import operations_proto_pb2 as ops +from google.longrunning import operations_pb2 as ops class GetTests(unittest.TestCase):