diff --git a/gcloud/datastore/_generated/datastore_grpc_pb2.py b/gcloud/datastore/_generated/datastore_grpc_pb2.py index 9e753ad6745d..639c42c6ce6b 100644 --- a/gcloud/datastore/_generated/datastore_grpc_pb2.py +++ b/gcloud/datastore/_generated/datastore_grpc_pb2.py @@ -1,3 +1,20 @@ +# BEGIN: Imports from datastore_pb2 +from gcloud.datastore._generated.datastore_pb2 import AllocateIdsRequest +from gcloud.datastore._generated.datastore_pb2 import AllocateIdsResponse +from gcloud.datastore._generated.datastore_pb2 import BeginTransactionRequest +from gcloud.datastore._generated.datastore_pb2 import BeginTransactionResponse +from gcloud.datastore._generated.datastore_pb2 import CommitRequest +from gcloud.datastore._generated.datastore_pb2 import CommitResponse +from gcloud.datastore._generated.datastore_pb2 import LookupRequest +from gcloud.datastore._generated.datastore_pb2 import LookupResponse +from gcloud.datastore._generated.datastore_pb2 import Mutation +from gcloud.datastore._generated.datastore_pb2 import MutationResult +from gcloud.datastore._generated.datastore_pb2 import ReadOptions +from gcloud.datastore._generated.datastore_pb2 import RollbackRequest +from gcloud.datastore._generated.datastore_pb2 import RollbackResponse +from gcloud.datastore._generated.datastore_pb2 import RunQueryRequest +from gcloud.datastore._generated.datastore_pb2 import RunQueryResponse +# END: Imports from datastore_pb2 import grpc from grpc.beta import implementations as beta_implementations from grpc.beta import interfaces as beta_interfaces diff --git a/gcloud/datastore/connection.py b/gcloud/datastore/connection.py index 9fcacacfcd6f..6df414acd930 100644 --- a/gcloud/datastore/connection.py +++ b/gcloud/datastore/connection.py @@ -18,15 +18,41 @@ from google.rpc import status_pb2 +from gcloud._helpers import make_stub from gcloud import connection as connection_module from gcloud.environment_vars import GCD_HOST +from gcloud.exceptions import Conflict from gcloud.exceptions import make_exception from gcloud.datastore._generated import datastore_pb2 as _datastore_pb2 +# pylint: disable=ungrouped-imports +try: + from grpc.beta.interfaces import StatusCode + from grpc.framework.interfaces.face.face import AbortionError + from gcloud.datastore._generated import datastore_grpc_pb2 + DATASTORE_STUB_FACTORY = datastore_grpc_pb2.beta_create_Datastore_stub +except ImportError: # pragma: NO COVER + _HAVE_GRPC = False + DATASTORE_STUB_FACTORY = None + StatusCode = None + AbortionError = Exception +else: + _HAVE_GRPC = True +# pylint: enable=ungrouped-imports + + +DATASTORE_API_HOST = 'datastore.googleapis.com' +"""Datastore API request host.""" +DATASTORE_API_PORT = 443 +"""Datastore API request port.""" +GRPC_TIMEOUT_SECONDS = 10 +"""The default timeout to use for API requests via gRPC.""" class _DatastoreAPIOverHttp(object): """Helper mapping datastore API methods. + Makes requests to send / receive protobuf content over HTTP/1.1. + Methods make bare API requests without any helpers for constructing the requests or parsing the responses. @@ -196,6 +222,139 @@ def allocate_ids(self, project, request_pb): _datastore_pb2.AllocateIdsResponse) +class _DatastoreAPIOverGRPC(object): + """Helper mapping datastore API methods. + + Makes requests to send / receive protobuf content over gRPC. + + Methods make bare API requests without any helpers for constructing + the requests or parsing the responses. + + :type connection: :class:`gcloud.datastore.connection.Connection` + :param connection: A connection object that contains helpful + information for making requests. + """ + + def __init__(self, connection): + self._stub = make_stub(connection.credentials, connection.USER_AGENT, + DATASTORE_STUB_FACTORY, DATASTORE_API_HOST, + DATASTORE_API_PORT) + self._stub.__enter__() + + def __del__(self): + """Destructor for object. + + Ensures that the stub is exited so the shell can close properly. + """ + try: + self._stub.__exit__(None, None, None) + del self._stub + except AttributeError: + pass + + def lookup(self, project, request_pb): + """Perform a ``lookup`` request. + + :type project: string + :param project: The project to connect to. This is + usually your project name in the cloud console. + + :type request_pb: :class:`._generated.datastore_pb2.LookupRequest` + :param request_pb: The request protobuf object. + + :rtype: :class:`._generated.datastore_pb2.LookupResponse` + :returns: The returned protobuf response object. + """ + request_pb.project_id = project + return self._stub.Lookup(request_pb, GRPC_TIMEOUT_SECONDS) + + def run_query(self, project, request_pb): + """Perform a ``runQuery`` request. + + :type project: string + :param project: The project to connect to. This is + usually your project name in the cloud console. + + :type request_pb: :class:`._generated.datastore_pb2.RunQueryRequest` + :param request_pb: The request protobuf object. + + :rtype: :class:`._generated.datastore_pb2.RunQueryResponse` + :returns: The returned protobuf response object. + """ + request_pb.project_id = project + return self._stub.RunQuery(request_pb, GRPC_TIMEOUT_SECONDS) + + def begin_transaction(self, project, request_pb): + """Perform a ``beginTransaction`` request. + + :type project: string + :param project: The project to connect to. This is + usually your project name in the cloud console. + + :type request_pb: + :class:`._generated.datastore_pb2.BeginTransactionRequest` + :param request_pb: The request protobuf object. + + :rtype: :class:`._generated.datastore_pb2.BeginTransactionResponse` + :returns: The returned protobuf response object. + """ + request_pb.project_id = project + return self._stub.BeginTransaction(request_pb, GRPC_TIMEOUT_SECONDS) + + def commit(self, project, request_pb): + """Perform a ``commit`` request. + + :type project: string + :param project: The project to connect to. This is + usually your project name in the cloud console. + + :type request_pb: :class:`._generated.datastore_pb2.CommitRequest` + :param request_pb: The request protobuf object. + + :rtype: :class:`._generated.datastore_pb2.CommitResponse` + :returns: The returned protobuf response object. + """ + request_pb.project_id = project + try: + return self._stub.Commit(request_pb, GRPC_TIMEOUT_SECONDS) + except AbortionError as exc: + if exc.code == StatusCode.ABORTED: + raise Conflict(exc.details) + raise + + def rollback(self, project, request_pb): + """Perform a ``rollback`` request. + + :type project: string + :param project: The project to connect to. This is + usually your project name in the cloud console. + + :type request_pb: :class:`._generated.datastore_pb2.RollbackRequest` + :param request_pb: The request protobuf object. + + :rtype: :class:`._generated.datastore_pb2.RollbackResponse` + :returns: The returned protobuf response object. + """ + request_pb.project_id = project + return self._stub.Rollback(request_pb, GRPC_TIMEOUT_SECONDS) + + def allocate_ids(self, project, request_pb): + """Perform an ``allocateIds`` request. + + :type project: string + :param project: The project to connect to. This is + usually your project name in the cloud console. + + :type request_pb: :class:`._generated.datastore_pb2.AllocateIdsRequest` + :param request_pb: The request protobuf object. + + :rtype: :class:`._generated.datastore_pb2.AllocateIdsResponse` + :returns: The returned protobuf response object. + """ + request_pb.project_id = project + return self._stub.AllocateIds(request_pb, GRPC_TIMEOUT_SECONDS) + + class Connection(connection_module.Connection): """A connection to the Google Cloud Datastore via the Protobuf API. @@ -213,7 +372,7 @@ class Connection(connection_module.Connection): :attr:`API_BASE_URL`. """ - API_BASE_URL = 'https://datastore.googleapis.com' + API_BASE_URL = 'https://' + DATASTORE_API_HOST """The base of the API call URL.""" API_VERSION = 'v1' @@ -236,7 +395,10 @@ def __init__(self, credentials=None, http=None, api_base_url=None): except KeyError: api_base_url = self.__class__.API_BASE_URL self.api_base_url = api_base_url - self._datastore_api = _DatastoreAPIOverHttp(self) + if _HAVE_GRPC: + self._datastore_api = _DatastoreAPIOverGRPC(self) + else: + self._datastore_api = _DatastoreAPIOverHttp(self) def build_api_url(self, project, method, base_url=None, api_version=None): diff --git a/gcloud/datastore/test_connection.py b/gcloud/datastore/test_connection.py index e4e3549cdfcf..3ed80f18bedf 100644 --- a/gcloud/datastore/test_connection.py +++ b/gcloud/datastore/test_connection.py @@ -14,6 +14,8 @@ import unittest +from gcloud.datastore.connection import _HAVE_GRPC + class Test_DatastoreAPIOverHttp(unittest.TestCase): @@ -25,8 +27,6 @@ def _makeOne(self, *args, **kw): return self._getTargetClass()(*args, **kw) def test__rpc(self): - from gcloud.datastore.connection import Connection - class ReqPB(object): def SerializeToString(self): @@ -44,15 +44,10 @@ def FromString(cls, pb): REQPB = b'REQPB' PROJECT = 'PROJECT' METHOD = 'METHOD' - conn = Connection() + URI = 'http://api-url' + conn = _Connection(URI) datastore_api = self._makeOne(conn) - URI = '/'.join([ - conn.api_base_url, - conn.API_VERSION, - 'projects', - PROJECT + ':' + METHOD, - ]) - http = conn._http = Http({'status': '200'}, 'CONTENT') + http = conn.http = Http({'status': '200'}, 'CONTENT') response = datastore_api._rpc(PROJECT, METHOD, ReqPB(), RspPB) self.assertTrue(isinstance(response, RspPB)) self.assertEqual(response._pb, 'CONTENT') @@ -64,22 +59,17 @@ def FromString(cls, pb): self.assertEqual(called_with['headers']['User-Agent'], conn.USER_AGENT) self.assertEqual(called_with['body'], REQPB) + self.assertEqual(conn.build_kwargs, + [{'method': METHOD, 'project': PROJECT}]) def test__request_w_200(self): - from gcloud.datastore.connection import Connection - PROJECT = 'PROJECT' METHOD = 'METHOD' DATA = b'DATA' - conn = Connection() + URI = 'http://api-url' + conn = _Connection(URI) datastore_api = self._makeOne(conn) - URI = '/'.join([ - conn.api_base_url, - conn.API_VERSION, - 'projects', - PROJECT + ':' + METHOD, - ]) - http = conn._http = Http({'status': '200'}, 'CONTENT') + http = conn.http = Http({'status': '200'}, 'CONTENT') self.assertEqual(datastore_api._request(PROJECT, METHOD, DATA), 'CONTENT') called_with = http._called_with @@ -90,9 +80,10 @@ def test__request_w_200(self): self.assertEqual(called_with['headers']['User-Agent'], conn.USER_AGENT) self.assertEqual(called_with['body'], DATA) + self.assertEqual(conn.build_kwargs, + [{'method': METHOD, 'project': PROJECT}]) def test__request_not_200(self): - from gcloud.datastore.connection import Connection from gcloud.exceptions import BadRequest from google.rpc import status_pb2 @@ -103,13 +94,215 @@ def test__request_not_200(self): PROJECT = 'PROJECT' METHOD = 'METHOD' DATA = 'DATA' - conn = Connection() + URI = 'http://api-url' + conn = _Connection(URI) datastore_api = self._makeOne(conn) - conn._http = Http({'status': '400'}, error.SerializeToString()) - with self.assertRaises(BadRequest) as e: + conn.http = Http({'status': '400'}, error.SerializeToString()) + with self.assertRaises(BadRequest) as exc: datastore_api._request(PROJECT, METHOD, DATA) expected_message = '400 Entity value is indexed.' - self.assertEqual(str(e.exception), expected_message) + self.assertEqual(str(exc.exception), expected_message) + self.assertEqual(conn.build_kwargs, + [{'method': METHOD, 'project': PROJECT}]) + + +class Test_DatastoreAPIOverGRPC(unittest.TestCase): + + def _getTargetClass(self): + from gcloud.datastore.connection import _DatastoreAPIOverGRPC + return _DatastoreAPIOverGRPC + + def _makeOne(self, connection=None, stub=None, mock_args=None): + from gcloud._testing import _Monkey + from gcloud.datastore import connection as MUT + + if connection is None: + connection = _Connection(None) + connection.credentials = object() + + if stub is None: + stub = _GRPCStub() + + if mock_args is None: + mock_args = [] + + def mock_make_stub(*args): + mock_args.append(args) + return stub + + with _Monkey(MUT, make_stub=mock_make_stub): + return self._getTargetClass()(connection) + + def test_constructor(self): + from gcloud.datastore import connection as MUT + + conn = _Connection(None) + conn.credentials = object() + + stub = _GRPCStub() + mock_args = [] + self.assertEqual(stub.enter_calls, 0) + datastore_api = self._makeOne(conn, stub=stub, mock_args=mock_args) + self.assertIs(datastore_api._stub, stub) + + self.assertEqual(mock_args, [( + conn.credentials, + conn.USER_AGENT, + MUT.DATASTORE_STUB_FACTORY, + MUT.DATASTORE_API_HOST, + MUT.DATASTORE_API_PORT, + )]) + + def test___del__valid_stub(self): + datastore_api = self._makeOne() + + stub = datastore_api._stub + self.assertEqual(stub.exit_calls, []) + self.assertIs(datastore_api._stub, stub) + datastore_api.__del__() + self.assertEqual(stub.exit_calls, [(None, None, None)]) + self.assertFalse(hasattr(datastore_api, '_stub')) + + def test___del__invalid_stub(self): + datastore_api = self._makeOne() + + stub = datastore_api._stub + self.assertEqual(stub.exit_calls, []) + + del datastore_api._stub + self.assertFalse(hasattr(datastore_api, '_stub')) + datastore_api.__del__() + self.assertEqual(stub.exit_calls, []) + self.assertFalse(hasattr(datastore_api, '_stub')) + + def test_lookup(self): + from gcloud.datastore import connection as MUT + + return_val = object() + stub = _GRPCStub(return_val) + datastore_api = self._makeOne(stub=stub) + + request_pb = _RequestPB() + project = 'PROJECT' + result = datastore_api.lookup(project, request_pb) + self.assertIs(result, return_val) + self.assertEqual(request_pb.project_id, project) + self.assertEqual(stub.method_calls, + [(request_pb, MUT.GRPC_TIMEOUT_SECONDS, 'Lookup')]) + + def test_run_query(self): + from gcloud.datastore import connection as MUT + + return_val = object() + stub = _GRPCStub(return_val) + datastore_api = self._makeOne(stub=stub) + + request_pb = _RequestPB() + project = 'PROJECT' + result = datastore_api.run_query(project, request_pb) + self.assertIs(result, return_val) + self.assertEqual(request_pb.project_id, project) + self.assertEqual(stub.method_calls, + [(request_pb, MUT.GRPC_TIMEOUT_SECONDS, 'RunQuery')]) + + def test_begin_transaction(self): + from gcloud.datastore import connection as MUT + + return_val = object() + stub = _GRPCStub(return_val) + datastore_api = self._makeOne(stub=stub) + + request_pb = _RequestPB() + project = 'PROJECT' + result = datastore_api.begin_transaction(project, request_pb) + self.assertIs(result, return_val) + self.assertEqual(request_pb.project_id, project) + self.assertEqual( + stub.method_calls, + [(request_pb, MUT.GRPC_TIMEOUT_SECONDS, 'BeginTransaction')]) + + def test_commit_success(self): + from gcloud.datastore import connection as MUT + + return_val = object() + stub = _GRPCStub(return_val) + datastore_api = self._makeOne(stub=stub) + + request_pb = _RequestPB() + project = 'PROJECT' + result = datastore_api.commit(project, request_pb) + self.assertIs(result, return_val) + self.assertEqual(request_pb.project_id, project) + self.assertEqual(stub.method_calls, + [(request_pb, MUT.GRPC_TIMEOUT_SECONDS, 'Commit')]) + + def _commit_failure_helper(self, exc, err_class): + from gcloud.datastore import connection as MUT + + stub = _GRPCStub(side_effect=exc) + datastore_api = self._makeOne(stub=stub) + + request_pb = _RequestPB() + project = 'PROJECT' + with self.assertRaises(err_class): + datastore_api.commit(project, request_pb) + + self.assertEqual(request_pb.project_id, project) + self.assertEqual(stub.method_calls, + [(request_pb, MUT.GRPC_TIMEOUT_SECONDS, 'Commit')]) + + @unittest.skipUnless(_HAVE_GRPC, 'No gRPC') + def test_commit_failure_aborted(self): + from grpc.beta.interfaces import StatusCode + from grpc.framework.interfaces.face.face import AbortionError + from gcloud.exceptions import Conflict + + exc = AbortionError(None, None, StatusCode.ABORTED, None) + self._commit_failure_helper(exc, Conflict) + + @unittest.skipUnless(_HAVE_GRPC, 'No gRPC') + def test_commit_failure_cancelled(self): + from grpc.beta.interfaces import StatusCode + from grpc.framework.interfaces.face.face import AbortionError + + exc = AbortionError(None, None, StatusCode.CANCELLED, None) + self._commit_failure_helper(exc, AbortionError) + + @unittest.skipUnless(_HAVE_GRPC, 'No gRPC') + def test_commit_failure_non_grpc_err(self): + exc = RuntimeError('Not a gRPC error') + self._commit_failure_helper(exc, RuntimeError) + + def test_rollback(self): + from gcloud.datastore import connection as MUT + + return_val = object() + stub = _GRPCStub(return_val) + datastore_api = self._makeOne(stub=stub) + + request_pb = _RequestPB() + project = 'PROJECT' + result = datastore_api.rollback(project, request_pb) + self.assertIs(result, return_val) + self.assertEqual(request_pb.project_id, project) + self.assertEqual(stub.method_calls, + [(request_pb, MUT.GRPC_TIMEOUT_SECONDS, 'Rollback')]) + + def test_allocate_ids(self): + from gcloud.datastore import connection as MUT + + return_val = object() + stub = _GRPCStub(return_val) + datastore_api = self._makeOne(stub=stub) + + request_pb = _RequestPB() + project = 'PROJECT' + result = datastore_api.allocate_ids(project, request_pb) + self.assertIs(result, return_val) + self.assertEqual(request_pb.project_id, project) + self.assertEqual( + stub.method_calls, + [(request_pb, MUT.GRPC_TIMEOUT_SECONDS, 'AllocateIds')]) class TestConnection(unittest.TestCase): @@ -132,8 +325,13 @@ def _make_query_pb(self, kind): pb.kind.add().name = kind return pb - def _makeOne(self, *args, **kw): - return self._getTargetClass()(*args, **kw) + def _makeOne(self, credentials=None, http=None, + api_base_url=None, have_grpc=False): + from gcloud._testing import _Monkey + from gcloud.datastore import connection as MUT + with _Monkey(MUT, _HAVE_GRPC=have_grpc): + return self._getTargetClass()(credentials=credentials, http=http, + api_base_url=api_base_url) def _verifyProtobufCall(self, called_with, URI, conn): self.assertEqual(called_with['uri'], URI) @@ -192,6 +390,42 @@ def test_ctor_defaults(self): conn = self._makeOne() self.assertEqual(conn.credentials, None) + def test_ctor_without_grpc(self): + from gcloud._testing import _Monkey + from gcloud.datastore import connection as MUT + + connections = [] + return_val = object() + + def mock_api(connection): + connections.append(connection) + return return_val + + with _Monkey(MUT, _DatastoreAPIOverHttp=mock_api): + conn = self._makeOne(have_grpc=False) + + self.assertEqual(conn.credentials, None) + self.assertIs(conn._datastore_api, return_val) + self.assertEqual(connections, [conn]) + + def test_ctor_with_grpc(self): + from gcloud._testing import _Monkey + from gcloud.datastore import connection as MUT + + connections = [] + return_val = object() + + def mock_api(connection): + connections.append(connection) + return return_val + + with _Monkey(MUT, _DatastoreAPIOverGRPC=mock_api): + conn = self._makeOne(have_grpc=True) + + self.assertEqual(conn.credentials, None) + self.assertIs(conn._datastore_api, return_val) + self.assertEqual(connections, [conn]) + def test_ctor_explicit(self): class Creds(object): @@ -901,3 +1135,63 @@ class _KeyProto(object): def __init__(self, id_): self.path = [_PathElementProto(id_)] + + +class _Connection(object): + + USER_AGENT = 'you-sir-age-int' + + def __init__(self, api_url): + self.api_url = api_url + self.build_kwargs = [] + + def build_api_url(self, **kwargs): + self.build_kwargs.append(kwargs) + return self.api_url + + +class _GRPCStub(object): + + def __init__(self, return_val=None, side_effect=Exception): + self.return_val = return_val + self.side_effect = side_effect + self.enter_calls = 0 + self.exit_calls = [] + self.method_calls = [] + + def __enter__(self): + self.enter_calls += 1 + return self + + def __exit__(self, *args): + self.exit_calls.append(args) + + def _method(self, request_pb, timeout, name): + self.method_calls.append((request_pb, timeout, name)) + return self.return_val + + def Lookup(self, request_pb, timeout): + return self._method(request_pb, timeout, 'Lookup') + + def RunQuery(self, request_pb, timeout): + return self._method(request_pb, timeout, 'RunQuery') + + def BeginTransaction(self, request_pb, timeout): + return self._method(request_pb, timeout, 'BeginTransaction') + + def Commit(self, request_pb, timeout): + result = self._method(request_pb, timeout, 'Commit') + if self.side_effect is Exception: + return result + else: + raise self.side_effect + + def Rollback(self, request_pb, timeout): + return self._method(request_pb, timeout, 'Rollback') + + def AllocateIds(self, request_pb, timeout): + return self._method(request_pb, timeout, 'AllocateIds') + + +class _RequestPB(object): + project_id = None diff --git a/scripts/make_datastore_grpc.py b/scripts/make_datastore_grpc.py index b128994d863e..6e296bb9d72c 100644 --- a/scripts/make_datastore_grpc.py +++ b/scripts/make_datastore_grpc.py @@ -29,6 +29,8 @@ GRPC_ONLY_FILE = os.path.join(ROOT_DIR, 'gcloud', 'datastore', '_generated', 'datastore_grpc_pb2.py') GRPCIO_VIRTUALENV = os.environ.get('GRPCIO_VIRTUALENV', 'protoc') +MESSAGE_SNIPPET = ' = _reflection.GeneratedProtocolMessageType(' +IMPORT_TEMPLATE = 'from gcloud.datastore._generated.datastore_pb2 import %s\n' def get_pb2_contents_with_grpc(): @@ -110,10 +112,32 @@ def get_pb2_grpc_only(): return grpc_only_lines +def get_pb2_message_types(): + """Get message types defined in datastore pb2 file. + + :rtype: list + :returns: A list of names that are defined as message types. + """ + non_grpc_contents = get_pb2_contents_without_grpc() + result = [] + for line in non_grpc_contents: + if MESSAGE_SNIPPET in line: + name, _ = line.split(MESSAGE_SNIPPET) + result.append(name) + + return sorted(result) + + def main(): """Write gRPC-only lines to custom module.""" grpc_only_lines = get_pb2_grpc_only() with open(GRPC_ONLY_FILE, 'wb') as file_obj: + # First add imports for public objects in the original. + file_obj.write('# BEGIN: Imports from datastore_pb2\n') + for name in get_pb2_message_types(): + import_line = IMPORT_TEMPLATE % (name,) + file_obj.write(import_line) + file_obj.write('# END: Imports from datastore_pb2\n') file_obj.write(''.join(grpc_only_lines))