diff --git a/gcloud/logging/_gax.py b/gcloud/logging/_gax.py index 05f00a3d22a2..15b2f9a8b3d9 100644 --- a/gcloud/logging/_gax.py +++ b/gcloud/logging/_gax.py @@ -19,18 +19,25 @@ # pylint: disable=import-error from google.gax import CallOptions from google.gax import INITIAL_PAGE +from google.gax.errors import GaxError +from google.gax.grpc import exc_to_code from google.logging.type.log_severity_pb2 import LogSeverity +from google.logging.v2.logging_config_pb2 import LogSink from google.logging.v2.log_entry_pb2 import LogEntry from google.protobuf.json_format import Parse +from grpc.beta.interfaces import StatusCode # pylint: enable=import-error +from gcloud.exceptions import Conflict +from gcloud.exceptions import NotFound from gcloud._helpers import _datetime_to_pb_timestamp class _LoggingAPI(object): """Helper mapping logging-related APIs. - :type gax_api: :class:`google.logging.v2.logging_api.LoggingApi` + :type gax_api: + :class:`google.logging.v2.logging_service_v2_api.LoggingServiceV2Api` :param gax_api: API object used to make GAX requests. """ def __init__(self, gax_api): @@ -113,6 +120,140 @@ def logger_delete(self, project, logger_name): self._gax_api.delete_log(path, options) +class _SinksAPI(object): + """Helper mapping sink-related APIs. + + :type gax_api: + :class:`google.logging.v2.config_service_v2_api.ConfigServiceV2Api` + :param gax_api: API object used to make GAX requests. + """ + def __init__(self, gax_api): + self._gax_api = gax_api + + def list_sinks(self, project, page_size=0, page_token=None): + """List sinks for the project associated with this client. + + :type project: string + :param project: ID of the project whose sinks are to be listed. + + :type page_size: int + :param page_size: maximum number of sinks to return, If not passed, + defaults to a value set by the API. + + :type page_token: str + :param page_token: opaque marker for the next "page" of sinks. If not + passed, the API will return the first page of + sinks. + + :rtype: tuple, (list, str) + :returns: list of mappings, plus a "next page token" string: + if not None, indicates that more sinks can be retrieved + with another call (pass that value as ``page_token``). + """ + options = _build_paging_options(page_token) + page_iter = self._gax_api.list_sinks(project, page_size, options) + sinks = [_log_sink_pb_to_mapping(log_sink_pb) + for log_sink_pb in page_iter.next()] + token = page_iter.page_token or None + return sinks, token + + def sink_create(self, project, sink_name, filter_, destination): + """API call: create a sink resource. + + See: + https://cloud.google.com/logging/docs/api/ref_v2beta1/rest/v2beta1/projects.sinks/create + + :type project: string + :param project: ID of the project in which to create the sink. + + :type sink_name: string + :param sink_name: the name of the sink + + :type filter_: string + :param filter_: the advanced logs filter expression defining the + entries exported by the sink. + + :type destination: string + :param destination: destination URI for the entries exported by + the sink. + """ + options = None + parent = 'projects/%s' % (project,) + path = 'projects/%s/sinks/%s' % (project, sink_name) + sink_pb = LogSink(name=path, filter=filter_, destination=destination) + try: + self._gax_api.create_sink(parent, sink_pb, options) + except GaxError as exc: + if exc_to_code(exc.cause) == StatusCode.FAILED_PRECONDITION: + raise Conflict(path) + raise + + def sink_get(self, project, sink_name): + """API call: retrieve a sink resource. + + :type project: string + :param project: ID of the project containing the sink. + + :type sink_name: string + :param sink_name: the name of the sink + """ + options = None + path = 'projects/%s/sinks/%s' % (project, sink_name) + try: + sink_pb = self._gax_api.get_sink(path, options) + except GaxError as exc: + if exc_to_code(exc.cause) == StatusCode.NOT_FOUND: + raise NotFound(path) + raise + return _log_sink_pb_to_mapping(sink_pb) + + def sink_update(self, project, sink_name, filter_, destination): + """API call: update a sink resource. + + :type project: string + :param project: ID of the project containing the sink. + + :type sink_name: string + :param sink_name: the name of the sink + + :type filter_: string + :param filter_: the advanced logs filter expression defining the + entries exported by the sink. + + :type destination: string + :param destination: destination URI for the entries exported by + the sink. + """ + options = None + path = 'projects/%s/sinks/%s' % (project, sink_name) + sink_pb = LogSink(name=path, filter=filter_, destination=destination) + try: + self._gax_api.update_sink(path, sink_pb, options) + except GaxError as exc: + if exc_to_code(exc.cause) == StatusCode.NOT_FOUND: + raise NotFound(path) + raise + return _log_sink_pb_to_mapping(sink_pb) + + def sink_delete(self, project, sink_name): + """API call: delete a sink resource. + + :type project: string + :param project: ID of the project containing the sink. + + :type sink_name: string + :param sink_name: the name of the sink + """ + options = None + path = 'projects/%s/sinks/%s' % (project, sink_name) + try: + self._gax_api.delete_sink(path, options) + except GaxError as exc: + if exc_to_code(exc.cause) == StatusCode.NOT_FOUND: + raise NotFound(path) + raise + + def _build_paging_options(page_token=None): """Helper for :meth:'_PublisherAPI.list_topics' et aliae.""" if page_token is None: @@ -250,3 +391,17 @@ def _log_entry_mapping_to_pb(mapping): return entry_pb # pylint: enable=too-many-branches + + +def _log_sink_pb_to_mapping(sink_pb): + """Helper for :meth:`list_sinks`, et aliae + + Ideally, would use a function from :mod:`protobuf.json_format`, but + the right one isn't public. See: + https://github.com/google/protobuf/issues/1351 + """ + return { + 'name': sink_pb.name, + 'destination': sink_pb.destination, + 'filter': sink_pb.filter, + } diff --git a/gcloud/logging/test__gax.py b/gcloud/logging/test__gax.py index 89169e5dd674..825d62299a3e 100644 --- a/gcloud/logging/test__gax.py +++ b/gcloud/logging/test__gax.py @@ -28,9 +28,7 @@ class _Base(object): PROJECT = 'PROJECT' PROJECT_PATH = 'projects/%s' % (PROJECT,) - LIST_SINKS_PATH = '%s/sinks' % (PROJECT_PATH,) - SINK_NAME = 'sink_name' - SINK_PATH = 'projects/%s/sinks/%s' % (PROJECT, SINK_NAME) + FILTER = 'logName:syslog AND severity>=ERROR' def _makeOne(self, *args, **kw): return self._getTargetClass()(*args, **kw) @@ -39,7 +37,6 @@ def _makeOne(self, *args, **kw): @unittest2.skipUnless(_HAVE_GAX, 'No gax-python') class Test_LoggingAPI(_Base, unittest2.TestCase): LOG_NAME = 'log_name' - FILTER = 'logName:syslog AND severity>=ERROR' def _getTargetClass(self): from gcloud.logging._gax import _LoggingAPI @@ -395,11 +392,240 @@ def test_logger_delete(self): self.assertEqual(options, None) -class _GAXLoggingAPI(object): +@unittest2.skipUnless(_HAVE_GAX, 'No gax-python') +class Test_SinksAPI(_Base, unittest2.TestCase): + LIST_SINKS_PATH = '%s/sinks' % (_Base.PROJECT_PATH,) + SINK_NAME = 'sink_name' + SINK_PATH = 'projects/%s/sinks/%s' % (_Base.PROJECT, SINK_NAME) + DESTINATION_URI = 'faux.googleapis.com/destination' + + def _getTargetClass(self): + from gcloud.logging._gax import _SinksAPI + return _SinksAPI + + def test_ctor(self): + gax_api = _GAXSinksAPI() + api = self._makeOne(gax_api) + self.assertTrue(api._gax_api is gax_api) + + def test_list_sinks_no_paging(self): + from google.gax import INITIAL_PAGE + from gcloud._testing import _GAXPageIterator + TOKEN = 'TOKEN' + SINKS = [{ + 'name': self.SINK_PATH, + 'filter': self.FILTER, + 'destination': self.DESTINATION_URI, + }] + response = _GAXPageIterator( + [_LogSinkPB(self.SINK_PATH, self.DESTINATION_URI, self.FILTER)], + TOKEN) + gax_api = _GAXSinksAPI(_list_sinks_response=response) + api = self._makeOne(gax_api) + + sinks, token = api.list_sinks(self.PROJECT) + + self.assertEqual(sinks, SINKS) + self.assertEqual(token, TOKEN) + + project, page_size, options = gax_api._list_sinks_called_with + self.assertEqual(project, self.PROJECT) + self.assertEqual(page_size, 0) + self.assertEqual(options.page_token, INITIAL_PAGE) + + def test_list_sinks_w_paging(self): + from gcloud._testing import _GAXPageIterator + TOKEN = 'TOKEN' + PAGE_SIZE = 42 + SINKS = [{ + 'name': self.SINK_PATH, + 'filter': self.FILTER, + 'destination': self.DESTINATION_URI, + }] + response = _GAXPageIterator( + [_LogSinkPB(self.SINK_PATH, self.DESTINATION_URI, self.FILTER)], + None) + gax_api = _GAXSinksAPI(_list_sinks_response=response) + api = self._makeOne(gax_api) + + sinks, token = api.list_sinks( + self.PROJECT, page_size=PAGE_SIZE, page_token=TOKEN) + + self.assertEqual(sinks, SINKS) + self.assertEqual(token, None) + + project, page_size, options = gax_api._list_sinks_called_with + self.assertEqual(project, self.PROJECT) + self.assertEqual(page_size, PAGE_SIZE) + self.assertEqual(options.page_token, TOKEN) + + def test_sink_create_error(self): + from google.gax.errors import GaxError + gax_api = _GAXSinksAPI(_random_gax_error=True) + api = self._makeOne(gax_api) + + with self.assertRaises(GaxError): + api.sink_create( + self.PROJECT, self.SINK_NAME, self.FILTER, + self.DESTINATION_URI) + + def test_sink_create_conflict(self): + from gcloud.exceptions import Conflict + gax_api = _GAXSinksAPI(_create_sink_conflict=True) + api = self._makeOne(gax_api) + + with self.assertRaises(Conflict): + api.sink_create( + self.PROJECT, self.SINK_NAME, self.FILTER, + self.DESTINATION_URI) + + def test_sink_create_ok(self): + from google.logging.v2.logging_config_pb2 import LogSink + gax_api = _GAXSinksAPI() + api = self._makeOne(gax_api) + + api.sink_create( + self.PROJECT, self.SINK_NAME, self.FILTER, self.DESTINATION_URI) + + parent, sink, options = ( + gax_api._create_sink_called_with) + self.assertEqual(parent, self.PROJECT_PATH) + self.assertTrue(isinstance(sink, LogSink)) + self.assertEqual(sink.name, self.SINK_PATH) + self.assertEqual(sink.filter, self.FILTER) + self.assertEqual(sink.destination, self.DESTINATION_URI) + self.assertEqual(options, None) + + def test_sink_get_error(self): + from gcloud.exceptions import NotFound + gax_api = _GAXSinksAPI() + api = self._makeOne(gax_api) + + with self.assertRaises(NotFound): + api.sink_get(self.PROJECT, self.SINK_NAME) + + def test_sink_get_miss(self): + from google.gax.errors import GaxError + gax_api = _GAXSinksAPI(_random_gax_error=True) + api = self._makeOne(gax_api) + + with self.assertRaises(GaxError): + api.sink_get(self.PROJECT, self.SINK_NAME) + + def test_sink_get_hit(self): + RESPONSE = { + 'name': self.SINK_PATH, + 'filter': self.FILTER, + 'destination': self.DESTINATION_URI, + } + sink_pb = _LogSinkPB( + self.SINK_PATH, self.DESTINATION_URI, self.FILTER) + gax_api = _GAXSinksAPI(_get_sink_response=sink_pb) + api = self._makeOne(gax_api) + + response = api.sink_get(self.PROJECT, self.SINK_NAME) + + self.assertEqual(response, RESPONSE) + + sink_name, options = gax_api._get_sink_called_with + self.assertEqual(sink_name, self.SINK_PATH) + self.assertEqual(options, None) + + def test_sink_update_error(self): + from google.gax.errors import GaxError + gax_api = _GAXSinksAPI(_random_gax_error=True) + api = self._makeOne(gax_api) + + with self.assertRaises(GaxError): + api.sink_update( + self.PROJECT, self.SINK_NAME, self.FILTER, + self.DESTINATION_URI) + + def test_sink_update_miss(self): + from gcloud.exceptions import NotFound + gax_api = _GAXSinksAPI() + api = self._makeOne(gax_api) + + with self.assertRaises(NotFound): + api.sink_update( + self.PROJECT, self.SINK_NAME, self.FILTER, + self.DESTINATION_URI) + + def test_sink_update_hit(self): + from google.logging.v2.logging_config_pb2 import LogSink + response = _LogSinkPB( + self.SINK_NAME, self.FILTER, self.DESTINATION_URI) + gax_api = _GAXSinksAPI(_update_sink_response=response) + api = self._makeOne(gax_api) + + api.sink_update( + self.PROJECT, self.SINK_NAME, self.FILTER, self.DESTINATION_URI) + + sink_name, sink, options = ( + gax_api._update_sink_called_with) + self.assertEqual(sink_name, self.SINK_PATH) + self.assertTrue(isinstance(sink, LogSink)) + self.assertEqual(sink.name, self.SINK_PATH) + self.assertEqual(sink.filter, self.FILTER) + self.assertEqual(sink.destination, self.DESTINATION_URI) + self.assertEqual(options, None) + + def test_sink_delete_error(self): + from google.gax.errors import GaxError + gax_api = _GAXSinksAPI(_random_gax_error=True) + api = self._makeOne(gax_api) + + with self.assertRaises(GaxError): + api.sink_delete(self.PROJECT, self.SINK_NAME) + + def test_sink_delete_miss(self): + from gcloud.exceptions import NotFound + gax_api = _GAXSinksAPI(_sink_not_found=True) + api = self._makeOne(gax_api) + + with self.assertRaises(NotFound): + api.sink_delete(self.PROJECT, self.SINK_NAME) + + def test_sink_delete_hit(self): + gax_api = _GAXSinksAPI() + api = self._makeOne(gax_api) + + api.sink_delete(self.PROJECT, self.SINK_NAME) + + sink_name, options = gax_api._delete_sink_called_with + self.assertEqual(sink_name, self.SINK_PATH) + self.assertEqual(options, None) + + +class _GAXBaseAPI(object): + + _random_gax_error = False def __init__(self, **kw): self.__dict__.update(kw) + def _make_grpc_error(self, status_code): + from grpc.framework.interfaces.face.face import AbortionError + + class _DummyException(AbortionError): + code = status_code + + def __init__(self): + pass + + return _DummyException() + + def _make_grpc_not_found(self): + from grpc.beta.interfaces import StatusCode + return self._make_grpc_error(StatusCode.NOT_FOUND) + + def _make_grpc_failed_precondition(self): + from grpc.beta.interfaces import StatusCode + return self._make_grpc_error(StatusCode.FAILED_PRECONDITION) + + +class _GAXLoggingAPI(_GAXBaseAPI): + def list_log_entries( self, projects, filter_, order_by, page_size, options): self._list_log_entries_called_with = ( @@ -415,6 +641,52 @@ def delete_log(self, log_name, options): self._delete_log_called_with = log_name, options +class _GAXSinksAPI(_GAXBaseAPI): + + _create_sink_conflict = False + _sink_not_found = False + + def list_sinks(self, parent, page_size, options): + self._list_sinks_called_with = parent, page_size, options + return self._list_sinks_response + + def create_sink(self, parent, sink, options): + from google.gax.errors import GaxError + self._create_sink_called_with = parent, sink, options + if self._random_gax_error: + raise GaxError('error') + if self._create_sink_conflict: + raise GaxError('conflict', self._make_grpc_failed_precondition()) + + def get_sink(self, sink_name, options): + from google.gax.errors import GaxError + self._get_sink_called_with = sink_name, options + if self._random_gax_error: + raise GaxError('error') + try: + return self._get_sink_response + except AttributeError: + raise GaxError('notfound', self._make_grpc_not_found()) + + def update_sink(self, sink_name, sink, options=None): + from google.gax.errors import GaxError + self._update_sink_called_with = sink_name, sink, options + if self._random_gax_error: + raise GaxError('error') + try: + return self._update_sink_response + except AttributeError: + raise GaxError('notfound', self._make_grpc_not_found()) + + def delete_sink(self, sink_name, options=None): + from google.gax.errors import GaxError + self._delete_sink_called_with = sink_name, options + if self._random_gax_error: + raise GaxError('error') + if self._sink_not_found: + raise GaxError('notfound', self._make_grpc_not_found()) + + class _HTTPRequestPB(object): request_url = 'http://example.com/requested' @@ -455,3 +727,11 @@ def _make_timestamp(): from gcloud.logging.test_entries import _datetime_to_rfc3339_w_nanos NOW = datetime.utcnow().replace(tzinfo=UTC) return _datetime_to_rfc3339_w_nanos(NOW) + + +class _LogSinkPB(object): + + def __init__(self, name, destination, filter_): + self.name = name + self.destination = destination + self.filter = filter_ diff --git a/gcloud/logging/test_connection.py b/gcloud/logging/test_connection.py index e63642fdd7b2..89cda74efe0b 100644 --- a/gcloud/logging/test_connection.py +++ b/gcloud/logging/test_connection.py @@ -338,7 +338,7 @@ def test_sink_get_miss(self): def test_sink_get_hit(self): RESPONSE = { - 'name': self.SINK_NAME, + 'name': self.SINK_PATH, 'filter': self.FILTER, 'destination': self.DESTINATION_URI, }