diff --git a/core/google/cloud/future/base.py b/core/google/cloud/future/base.py index 928269506b65..aed1dfd80e5d 100644 --- a/core/google/cloud/future/base.py +++ b/core/google/cloud/future/base.py @@ -15,8 +15,12 @@ """Abstract and helper bases for Future implementations.""" import abc +import concurrent.futures +import functools +import operator import six +import tenacity from google.cloud.future import _helpers @@ -72,8 +76,8 @@ def set_exception(self, exception): class PollingFuture(Future): """A Future that needs to poll some service to check its status. - The private :meth:`_blocking_poll` method should be implemented by - subclasses. + The :meth:`done` method should be implemented by subclasses. The polling + behavior will repeatedly call ``done`` until it returns True. .. note: Privacy here is intended to prevent the final class from overexposing, not to prevent subclasses from accessing methods. @@ -89,6 +93,19 @@ def __init__(self): self._done_callbacks = [] @abc.abstractmethod + def done(self): + """Checks to see if the operation is complete. + + Returns: + bool: True if the operation is complete, False otherwise. + """ + # pylint: disable=redundant-returns-doc, missing-raises-doc + raise NotImplementedError() + + def running(self): + """True if the operation is currently running.""" + return not self.done() + def _blocking_poll(self, timeout=None): """Poll and wait for the Future to be resolved. @@ -96,8 +113,32 @@ def _blocking_poll(self, timeout=None): timeout (int): How long to wait for the operation to complete. If None, wait indefinitely. """ - # pylint: disable=missing-raises - raise NotImplementedError() + if self._result_set: + return + + retry_on = tenacity.retry_if_result( + functools.partial(operator.is_not, True)) + # Use exponential backoff with jitter. + wait_on = ( + tenacity.wait_exponential(multiplier=1, max=10) + + tenacity.wait_random(0, 1)) + + if timeout is None: + retry = tenacity.retry(retry=retry_on, wait=wait_on) + else: + retry = tenacity.retry( + retry=retry_on, + wait=wait_on, + stop=tenacity.stop_after_delay(timeout)) + + try: + retry(self.done)() + except tenacity.RetryError as exc: + six.raise_from( + concurrent.futures.TimeoutError( + 'Operation did not complete within the designated ' + 'timeout.'), + exc) def result(self, timeout=None): """Get the result of the operation, blocking if necessary. @@ -113,7 +154,7 @@ def result(self, timeout=None): google.gax.GaxError: If the operation errors or if the timeout is reached before the operation completes. """ - self._blocking_poll() + self._blocking_poll(timeout=timeout) if self._exception is not None: # pylint: disable=raising-bad-type diff --git a/core/google/cloud/future/operation.py b/core/google/cloud/future/operation.py new file mode 100644 index 000000000000..5bbfda1a8f0b --- /dev/null +++ b/core/google/cloud/future/operation.py @@ -0,0 +1,247 @@ +# Copyright 2016 Google Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Futures for long-running operations returned from Google Cloud APIs.""" + +import functools +import threading + +from google.longrunning import operations_pb2 +from google.protobuf import json_format +from google.rpc import code_pb2 + +from google.cloud import _helpers +from google.cloud import exceptions +from google.cloud.future import base + + +class Operation(base.PollingFuture): + """A Future for interacting with a Google API Long-Running Operation. + + Args: + operation (google.longrunning.operations_pb2.Operation): The + initial operation. + refresh (Callable[[], Operation]): A callable that returns the + latest state of the operation. + cancel (Callable[[], None]), A callable that tries to cancel + the operation. + result_type (type): The protobuf type for the operation's result. + metadata_type (type): The protobuf type for the operation's + metadata. + """ + + def __init__( + self, operation, refresh, cancel, + result_type, metadata_type=None): + super(Operation, self).__init__() + self._operation = operation + self._refresh = refresh + self._cancel = cancel + self._result_type = result_type + self._metadata_type = metadata_type + self._completion_lock = threading.Lock() + # Invoke this in case the operation came back already complete. + self._set_result_from_operation() + + @property + def operation(self): + """google.longrunning.Operation: The current long-running operation.""" + return self._operation + + @property + def metadata(self): + """google.protobuf.Message: the current operation metadata.""" + if not self._operation.HasField('metadata'): + return None + + return _helpers._from_any_pb( + self._metadata_type, self._operation.metadata) + + def _set_result_from_operation(self): + """Set the result or exception from the operation if it is complete.""" + # This must be done in a lock to prevent the polling thread + # and main thread from both executing the completion logic + # at the same time. + with self._completion_lock: + # If the operation isn't complete or if the result has already been + # set, do not call set_result/set_exception again. + # Note: self._result_set is set to True in set_result and + # set_exception, in case those methods are invoked directly. + if not self._operation.done or self._result_set: + return + + if self._operation.HasField('response'): + response = _helpers._from_any_pb( + self._result_type, self._operation.response) + self.set_result(response) + elif self._operation.HasField('error'): + exception = exceptions.GoogleCloudError( + self._operation.error.message, + errors=(self._operation.error)) + self.set_exception(exception) + else: + exception = exceptions.GoogleCloudError( + 'Unexpected state: Long-running operation had neither ' + 'response nor error set.') + self.set_exception(exception) + + def _refresh_and_update(self): + """Refresh the operation and update the result if needed.""" + # If the currently cached operation is done, no need to make another + # RPC as it will not change once done. + if not self._operation.done: + self._operation = self._refresh() + self._set_result_from_operation() + + def done(self): + """Checks to see if the operation is complete. + + Returns: + bool: True if the operation is complete, False otherwise. + """ + self._refresh_and_update() + return self._operation.done + + def cancel(self): + """Attempt to cancel the operation. + + Returns: + bool: True if the cancel RPC was made, False if the operation is + already complete. + """ + if self.done(): + return False + + self._cancel() + return True + + def cancelled(self): + """True if the operation was cancelled.""" + self._refresh_and_update() + return (self._operation.HasField('error') and + self._operation.error.code == code_pb2.CANCELLED) + + +def _refresh_http(api_request, operation_name): + """Refresh an operation using a JSON/HTTP client. + + Args: + api_request (Callable): A callable used to make an API request. This + should generally be + :meth:`google.cloud._http.Connection.api_request`. + operation_name (str): The name of the operation. + + Returns: + google.longrunning.operations_pb2.Operation: The operation. + """ + path = 'operations/{}'.format(operation_name) + api_response = api_request(method='GET', path=path) + return json_format.ParseDict( + api_response, operations_pb2.Operation()) + + +def _cancel_http(api_request, operation_name): + """Cancel an operation using a JSON/HTTP client. + + Args: + api_request (Callable): A callable used to make an API request. This + should generally be + :meth:`google.cloud._http.Connection.api_request`. + operation_name (str): The name of the operation. + """ + path = 'operations/{}:cancel'.format(operation_name) + api_request(method='POST', path=path) + + +def from_http_json(operation, api_request, result_type, **kwargs): + """Create an operation future from using a HTTP/JSON client. + + This interacts with the long-running operations `service`_ (specific + to a given API) vis `HTTP/JSON`_. + + .. _HTTP/JSON: https://cloud.google.com/speech/reference/rest/\ + v1beta1/operations#Operation + + Args: + operation (dict): Operation as a dictionary. + api_request (Callable): A callable used to make an API request. This + should generally be + :meth:`google.cloud._http.Connection.api_request`. + result_type (type): The protobuf result type. + kwargs: Keyword args passed into the :class:`Operation` constructor. + + Returns: + Operation: The operation future to track the given operation. + """ + operation_proto = json_format.ParseDict( + operation, operations_pb2.Operation()) + refresh = functools.partial( + _refresh_http, api_request, operation_proto.name) + cancel = functools.partial( + _cancel_http, api_request, operation_proto.name) + return Operation(operation_proto, refresh, cancel, result_type, **kwargs) + + +def _refresh_grpc(operations_stub, operation_name): + """Refresh an operation using a gRPC client. + + Args: + operations_stub (google.longrunning.operations_pb2.OperationsStub): + The gRPC operations stub. + operation_name (str): The name of the operation. + + Returns: + google.longrunning.operations_pb2.Operation: The operation. + """ + request_pb = operations_pb2.GetOperationRequest(name=operation_name) + return operations_stub.GetOperation(request_pb) + + +def _cancel_grpc(operations_stub, operation_name): + """Cancel an operation using a gRPC client. + + Args: + operations_stub (google.longrunning.operations_pb2.OperationsStub): + The gRPC operations stub. + operation_name (str): The name of the operation. + """ + request_pb = operations_pb2.CancelOperationRequest(name=operation_name) + operations_stub.CancelOperation(request_pb) + + +def from_grpc(operation, operations_stub, result_type, **kwargs): + """Create an operation future from using a gRPC client. + + This interacts with the long-running operations `service`_ (specific + to a given API) via gRPC. + + .. _service: https://github.com/googleapis/googleapis/blob/\ + 050400df0fdb16f63b63e9dee53819044bffc857/\ + google/longrunning/operations.proto#L38 + + Args: + operation (google.longrunning.operations_pb2.Operation): The operation. + operations_stub (google.longrunning.operations_pb2.OperationsStub): + The operations stub. + result_type (type): The protobuf result type. + kwargs: Keyword args passed into the :class:`Operation` constructor. + + Returns: + Operation: The operation future to track the given operation. + """ + refresh = functools.partial( + _refresh_grpc, operations_stub, operation.name) + cancel = functools.partial( + _cancel_grpc, operations_stub, operation.name) + return Operation(operation, refresh, cancel, result_type, **kwargs) diff --git a/core/setup.py b/core/setup.py index cd461c5f2526..ba84f2347d18 100644 --- a/core/setup.py +++ b/core/setup.py @@ -57,6 +57,7 @@ 'google-auth >= 0.4.0, < 2.0.0dev', 'google-auth-httplib2', 'six', + 'tenacity >= 4.0.0, <5.0.0dev' ] setup( diff --git a/core/tests/unit/future/test_base.py b/core/tests/unit/future/test_base.py index f10c10b24fb4..69a0348e68d9 100644 --- a/core/tests/unit/future/test_base.py +++ b/core/tests/unit/future/test_base.py @@ -12,7 +12,9 @@ # See the License for the specific language governing permissions and # limitations under the License. +import concurrent.futures import threading +import time import mock import pytest @@ -21,8 +23,8 @@ class PollingFutureImpl(base.PollingFuture): - def _blocking_poll(self, timeout=None): # pragma: NO COVER - pass + def done(self): + return False def cancel(self): return True @@ -30,9 +32,6 @@ def cancel(self): def cancelled(self): return False - def done(self): - return False - def running(self): return True @@ -87,13 +86,11 @@ def __init__(self): self.poll_count = 0 self.event = threading.Event() - def _blocking_poll(self, timeout=None): - if self._result_set: - return - + def done(self): self.poll_count += 1 self.event.wait() self.set_result(42) + return True def test_result_with_polling(): @@ -109,6 +106,18 @@ def test_result_with_polling(): assert future.poll_count == 1 +class PollingFutureImplTimeout(PollingFutureImplWithPoll): + def done(self): + time.sleep(1) + return False + + +def test_result_timeout(): + future = PollingFutureImplTimeout() + with pytest.raises(concurrent.futures.TimeoutError): + future.result(timeout=1) + + def test_callback_background_thread(): future = PollingFutureImplWithPoll() callback = mock.Mock() @@ -116,6 +125,9 @@ def test_callback_background_thread(): future.add_done_callback(callback) assert future._polling_thread is not None + + # Give the thread a second to poll + time.sleep(1) assert future.poll_count == 1 future.event.set() diff --git a/core/tests/unit/future/test_operation.py b/core/tests/unit/future/test_operation.py new file mode 100644 index 000000000000..0e29aa687ee6 --- /dev/null +++ b/core/tests/unit/future/test_operation.py @@ -0,0 +1,207 @@ +# Copyright 2017, Google Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import mock + +from google.cloud.future import operation +from google.longrunning import operations_pb2 +from google.protobuf import struct_pb2 +from google.rpc import code_pb2 +from google.rpc import status_pb2 + +TEST_OPERATION_NAME = 'test/operation' + + +def make_operation_proto( + name=TEST_OPERATION_NAME, metadata=None, response=None, + error=None, **kwargs): + operation_proto = operations_pb2.Operation( + name=name, **kwargs) + + if metadata is not None: + operation_proto.metadata.Pack(metadata) + + if response is not None: + operation_proto.response.Pack(response) + + if error is not None: + operation_proto.error.CopyFrom(error) + + return operation_proto + + +def make_operation_future(client_operations_responses=None): + if client_operations_responses is None: + client_operations_responses = [make_operation_proto()] + + refresh = mock.Mock( + spec=['__call__'], side_effect=client_operations_responses) + refresh.responses = client_operations_responses + cancel = mock.Mock(spec=['__call__']) + operation_future = operation.Operation( + client_operations_responses[0], + refresh, + cancel, + result_type=struct_pb2.Struct, + metadata_type=struct_pb2.Struct) + + return operation_future, refresh, cancel + + +def test_constructor(): + future, refresh, cancel = make_operation_future() + + assert future.operation == refresh.responses[0] + assert future.operation.done is False + assert future.operation.name == TEST_OPERATION_NAME + assert future.metadata is None + assert future.running() + + +def test_metadata(): + expected_metadata = struct_pb2.Struct() + future, _, _ = make_operation_future( + [make_operation_proto(metadata=expected_metadata)]) + + assert future.metadata == expected_metadata + + +def test_cancellation(): + responses = [ + make_operation_proto(), + # Second response indicates that the operation was cancelled. + make_operation_proto( + done=True, + error=status_pb2.Status(code=code_pb2.CANCELLED))] + future, _, cancel = make_operation_future(responses) + + assert future.cancel() + assert future.cancelled() + cancel.assert_called_once_with() + + # Cancelling twice should have no effect. + assert not future.cancel() + cancel.assert_called_once_with() + + +def test_result(): + expected_result = struct_pb2.Struct() + responses = [ + make_operation_proto(), + # Second operation response includes the result. + make_operation_proto(done=True, response=expected_result)] + future, _, _ = make_operation_future(responses) + + result = future.result() + + assert result == expected_result + assert future.done() + + +def test_exception(): + expected_exception = status_pb2.Status(message='meep') + responses = [ + make_operation_proto(), + # Second operation response includes the error. + make_operation_proto(done=True, error=expected_exception)] + future, _, _ = make_operation_future(responses) + + exception = future.exception() + + assert expected_exception.message in '{!r}'.format(exception) + + +def test_unexpected_result(): + responses = [ + make_operation_proto(), + # Second operation response is done, but has not error or response. + make_operation_proto(done=True)] + future, _, _ = make_operation_future(responses) + + exception = future.exception() + + assert 'Unexpected state' in '{!r}'.format(exception) + + +def test__refresh_http(): + api_request = mock.Mock( + return_value={'name': TEST_OPERATION_NAME, 'done': True}) + + result = operation._refresh_http(api_request, TEST_OPERATION_NAME) + + assert result.name == TEST_OPERATION_NAME + assert result.done is True + api_request.assert_called_once_with( + method='GET', path='operations/{}'.format(TEST_OPERATION_NAME)) + + +def test__cancel_http(): + api_request = mock.Mock() + + operation._cancel_http(api_request, TEST_OPERATION_NAME) + + api_request.assert_called_once_with( + method='POST', path='operations/{}:cancel'.format(TEST_OPERATION_NAME)) + + +def test_from_http_json(): + operation_json = {'name': TEST_OPERATION_NAME, 'done': True} + api_request = mock.sentinel.api_request + + future = operation.from_http_json( + operation_json, api_request, struct_pb2.Struct, + metadata_type=struct_pb2.Struct) + + assert future._result_type == struct_pb2.Struct + assert future._metadata_type == struct_pb2.Struct + assert future.operation.name == TEST_OPERATION_NAME + assert future.done + + +def test__refresh_grpc(): + operations_stub = mock.Mock(spec=['GetOperation']) + expected_result = make_operation_proto(done=True) + operations_stub.GetOperation.return_value = expected_result + + result = operation._refresh_grpc(operations_stub, TEST_OPERATION_NAME) + + assert result == expected_result + expected_request = operations_pb2.GetOperationRequest( + name=TEST_OPERATION_NAME) + operations_stub.GetOperation.assert_called_once_with(expected_request) + + +def test__cancel_grpc(): + operations_stub = mock.Mock(spec=['CancelOperation']) + + operation._cancel_grpc(operations_stub, TEST_OPERATION_NAME) + + expected_request = operations_pb2.CancelOperationRequest( + name=TEST_OPERATION_NAME) + operations_stub.CancelOperation.assert_called_once_with(expected_request) + + +def test_from_grpc(): + operation_proto = make_operation_proto(done=True) + operations_stub = mock.sentinel.operations_stub + + future = operation.from_grpc( + operation_proto, operations_stub, struct_pb2.Struct, + metadata_type=struct_pb2.Struct) + + assert future._result_type == struct_pb2.Struct + assert future._metadata_type == struct_pb2.Struct + assert future.operation.name == TEST_OPERATION_NAME + assert future.done