diff --git a/gcloud/_testing.py b/gcloud/_testing.py index 185af9557423..15ef5dd298e1 100644 --- a/gcloud/_testing.py +++ b/gcloud/_testing.py @@ -48,3 +48,28 @@ def __enter__(self): def __exit__(self, exc_type, exc_val, exc_tb): import os os.remove(self.name) + + +class _GAXPageIterator(object): + + def __init__(self, items, page_token): + self._items = items + self.page_token = page_token + + def next(self): + items, self._items = self._items, None + return items + + +class _GAXBundlingEvent(object): + + result = None + + def __init__(self, result): + self._result = result + + def is_set(self): + return self.result is not None + + def wait(self, *_): + self.result = self._result diff --git a/gcloud/logging/_gax.py b/gcloud/logging/_gax.py new file mode 100644 index 000000000000..05f00a3d22a2 --- /dev/null +++ b/gcloud/logging/_gax.py @@ -0,0 +1,252 @@ +# Copyright 2016 Google Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""GAX wrapper for Logging API requests.""" + +import json + +# pylint: disable=import-error +from google.gax import CallOptions +from google.gax import INITIAL_PAGE +from google.logging.type.log_severity_pb2 import LogSeverity +from google.logging.v2.log_entry_pb2 import LogEntry +from google.protobuf.json_format import Parse +# pylint: enable=import-error + +from gcloud._helpers import _datetime_to_pb_timestamp + + +class _LoggingAPI(object): + """Helper mapping logging-related APIs. + + :type gax_api: :class:`google.logging.v2.logging_api.LoggingApi` + :param gax_api: API object used to make GAX requests. + """ + def __init__(self, gax_api): + self._gax_api = gax_api + + def list_entries(self, projects, filter_='', order_by='', + page_size=0, page_token=None): + """Return a page of log entry resources. + + :type projects: list of strings + :param projects: project IDs to include. If not passed, + defaults to the project bound to the API's client. + + :type filter_: str + :param filter_: a filter expression. See: + https://cloud.google.com/logging/docs/view/advanced_filters + + :type order_by: str + :param order_by: One of :data:`gcloud.logging.ASCENDING` or + :data:`gcloud.logging.DESCENDING`. + + :type page_size: int + :param page_size: maximum number of entries to return, If not passed, + defaults to a value set by the API. + + :type page_token: str + :param page_token: opaque marker for the next "page" of entries. If not + passed, the API will return the first page of + entries. + + :rtype: tuple, (list, str) + :returns: list of mappings, plus a "next page token" string: + if not None, indicates that more entries can be retrieved + with another call (pass that value as ``page_token``). + """ + options = _build_paging_options(page_token) + page_iter = self._gax_api.list_log_entries( + projects, filter_, order_by, page_size, options) + entries = [_log_entry_pb_to_mapping(entry_pb) + for entry_pb in page_iter.next()] + token = page_iter.page_token or None + return entries, token + + def write_entries(self, entries, logger_name=None, resource=None, + labels=None): + """API call: log an entry resource via a POST request + + :type entries: sequence of mapping + :param entries: the log entry resources to log. + + :type logger_name: string + :param logger_name: name of default logger to which to log the entries; + individual entries may override. + + :type resource: mapping + :param resource: default resource to associate with entries; + individual entries may override. + + :type labels: mapping + :param labels: default labels to associate with entries; + individual entries may override. + """ + options = None + partial_success = False + entry_pbs = [_log_entry_mapping_to_pb(entry) for entry in entries] + self._gax_api.write_log_entries(entry_pbs, logger_name, resource, + labels, partial_success, options) + + def logger_delete(self, project, logger_name): + """API call: delete all entries in a logger via a DELETE request + + :type project: string + :param project: ID of project containing the log entries to delete + + :type logger_name: string + :param logger_name: name of logger containing the log entries to delete + """ + options = None + path = 'projects/%s/logs/%s' % (project, logger_name) + self._gax_api.delete_log(path, options) + + +def _build_paging_options(page_token=None): + """Helper for :meth:'_PublisherAPI.list_topics' et aliae.""" + if page_token is None: + page_token = INITIAL_PAGE + options = {'page_token': page_token} + return CallOptions(**options) + + +def _log_entry_pb_to_mapping(entry_pb): + """Helper for :meth:`list_entries`, et aliae + + Ideally, would use a function from :mod:`protobuf.json_format`, but + the right one isn't public. See: + https://github.com/google/protobuf/issues/1351 + """ + mapping = { + 'log_name': entry_pb.log_name, + 'resource': entry_pb.resource, + 'severity': entry_pb.severity, + 'insert_id': entry_pb.insert_id, + 'timestamp': entry_pb.timestamp, + 'labels': entry_pb.labels, + 'text_payload': entry_pb.text_payload, + 'json_payload': entry_pb.json_payload, + 'proto_payload': entry_pb.proto_payload, + } + + if entry_pb.http_request: + request = entry_pb.http_request + mapping['http_request'] = { + 'request_method': request.request_method, + 'request_url': request.request_url, + 'status': request.status, + 'referer': request.referer, + 'user_agent': request.user_agent, + 'cache_hit': request.cache_hit, + 'request_size': request.request_size, + 'response_size': request.response_size, + 'remote_ip': request.remote_ip, + } + + if entry_pb.operation: + operation = entry_pb.operation + mapping['operation'] = { + 'producer': operation.producer, + 'id': operation.id, + 'first': operation.first, + 'last': operation.last, + } + + return mapping + + +def _http_request_mapping_to_pb(info, request): + """Helper for _log_entry_mapping_to_pb""" + optional_request_keys = ( + 'request_method', + 'request_url', + 'status', + 'referer', + 'user_agent', + 'cache_hit', + 'request_size', + 'response_size', + 'remote_ip', + ) + for key in optional_request_keys: + if key in info: + setattr(request, key, info[key]) + + +def _log_operation_mapping_to_pb(info, operation): + """Helper for _log_entry_mapping_to_pb""" + operation.producer = info['producer'] + operation.id = info['id'] + + if 'first' in info: + operation.first = info['first'] + + if 'last' in info: + operation.last = info['last'] + + +def _log_entry_mapping_to_pb(mapping): + """Helper for :meth:`write_entries`, et aliae + + Ideally, would use a function from :mod:`protobuf.json_format`, but + the right one isn't public. See: + https://github.com/google/protobuf/issues/1351 + """ + # pylint: disable=too-many-branches + entry_pb = LogEntry() + + optional_scalar_keys = ( + 'log_name', + 'insert_id', + 'text_payload', + ) + + for key in optional_scalar_keys: + if key in mapping: + setattr(entry_pb, key, mapping[key]) + + if 'resource' in mapping: + entry_pb.resource.type = mapping['resource']['type'] + + if 'severity' in mapping: + severity = mapping['severity'] + if isinstance(severity, str): + severity = LogSeverity.Value(severity) + entry_pb.severity = severity + + if 'timestamp' in mapping: + timestamp = _datetime_to_pb_timestamp(mapping['timestamp']) + entry_pb.timestamp.CopyFrom(timestamp) + + if 'labels' in mapping: + for key, value in mapping['labels'].items(): + entry_pb.labels[key] = value + + if 'json_payload' in mapping: + for key, value in mapping['json_payload'].items(): + entry_pb.json_payload[key] = value + + if 'proto_payload' in mapping: + Parse(json.dumps(mapping['proto_payload']), entry_pb.proto_payload) + + if 'http_request' in mapping: + _http_request_mapping_to_pb( + mapping['http_request'], entry_pb.http_request) + + if 'operation' in mapping: + _log_operation_mapping_to_pb( + mapping['operation'], entry_pb.operation) + + return entry_pb + # pylint: enable=too-many-branches diff --git a/gcloud/logging/test__gax.py b/gcloud/logging/test__gax.py new file mode 100644 index 000000000000..89169e5dd674 --- /dev/null +++ b/gcloud/logging/test__gax.py @@ -0,0 +1,457 @@ +# Copyright 2016 Google Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest2 + + +try: + # pylint: disable=unused-import + import gcloud.pubsub._gax + # pylint: enable=unused-import +except ImportError: # pragma: NO COVER + _HAVE_GAX = False +else: + _HAVE_GAX = True + + +class _Base(object): + PROJECT = 'PROJECT' + PROJECT_PATH = 'projects/%s' % (PROJECT,) + LIST_SINKS_PATH = '%s/sinks' % (PROJECT_PATH,) + SINK_NAME = 'sink_name' + SINK_PATH = 'projects/%s/sinks/%s' % (PROJECT, SINK_NAME) + + def _makeOne(self, *args, **kw): + return self._getTargetClass()(*args, **kw) + + +@unittest2.skipUnless(_HAVE_GAX, 'No gax-python') +class Test_LoggingAPI(_Base, unittest2.TestCase): + LOG_NAME = 'log_name' + FILTER = 'logName:syslog AND severity>=ERROR' + + def _getTargetClass(self): + from gcloud.logging._gax import _LoggingAPI + return _LoggingAPI + + def test_ctor(self): + gax_api = _GAXLoggingAPI() + api = self._makeOne(gax_api) + self.assertTrue(api._gax_api is gax_api) + + def test_list_entries_no_paging(self): + from google.gax import INITIAL_PAGE + from gcloud.logging import DESCENDING + from gcloud._testing import _GAXPageIterator + TOKEN = 'TOKEN' + TEXT = 'TEXT' + response = _GAXPageIterator( + [_LogEntryPB(self.LOG_NAME, text_payload=TEXT)], TOKEN) + gax_api = _GAXLoggingAPI(_list_log_entries_response=response) + api = self._makeOne(gax_api) + + entries, next_token = api.list_entries( + [self.PROJECT], self.FILTER, DESCENDING) + + self.assertEqual(len(entries), 1) + entry = entries[0] + self.assertIsInstance(entry, dict) + self.assertEqual(entry['log_name'], self.LOG_NAME) + self.assertEqual(entry['resource'], {'type': 'global'}) + self.assertEqual(entry['text_payload'], TEXT) + self.assertEqual(next_token, TOKEN) + + projects, filter_, order_by, page_size, options = ( + gax_api._list_log_entries_called_with) + self.assertEqual(projects, [self.PROJECT]) + self.assertEqual(filter_, self.FILTER) + self.assertEqual(order_by, DESCENDING) + self.assertEqual(page_size, 0) + self.assertTrue(options.page_token is INITIAL_PAGE) + + def test_list_entries_with_paging(self): + from gcloud._testing import _GAXPageIterator + SIZE = 23 + TOKEN = 'TOKEN' + NEW_TOKEN = 'NEW_TOKEN' + PAYLOAD = {'message': 'MESSAGE', 'weather': 'sunny'} + response = _GAXPageIterator( + [_LogEntryPB(self.LOG_NAME, json_payload=PAYLOAD)], NEW_TOKEN) + gax_api = _GAXLoggingAPI(_list_log_entries_response=response) + api = self._makeOne(gax_api) + + entries, next_token = api.list_entries( + [self.PROJECT], page_size=SIZE, page_token=TOKEN) + + self.assertEqual(len(entries), 1) + entry = entries[0] + self.assertIsInstance(entry, dict) + self.assertEqual(entry['log_name'], self.LOG_NAME) + self.assertEqual(entry['resource'], {'type': 'global'}) + self.assertEqual(entry['json_payload'], PAYLOAD) + self.assertEqual(next_token, NEW_TOKEN) + + projects, filter_, order_by, page_size, options = ( + gax_api._list_log_entries_called_with) + self.assertEqual(projects, [self.PROJECT]) + self.assertEqual(filter_, '') + self.assertEqual(order_by, '') + self.assertEqual(page_size, SIZE) + self.assertEqual(options.page_token, TOKEN) + + def test_list_entries_with_extra_properties(self): + from gcloud._testing import _GAXPageIterator + SIZE = 23 + TOKEN = 'TOKEN' + NEW_TOKEN = 'NEW_TOKEN' + PAYLOAD = {'message': 'MESSAGE', 'weather': 'sunny'} + SEVERITY = 'WARNING' + LABELS = { + 'foo': 'bar', + } + IID = 'IID' + request = _HTTPRequestPB() + operation = _LogEntryOperationPB() + EXTRAS = { + 'severity': SEVERITY, + 'labels': LABELS, + 'insert_id': IID, + 'http_request': request, + 'operation': operation, + } + ENTRY = _LogEntryPB(self.LOG_NAME, proto_payload=PAYLOAD, **EXTRAS) + response = _GAXPageIterator([ENTRY], NEW_TOKEN) + gax_api = _GAXLoggingAPI(_list_log_entries_response=response) + api = self._makeOne(gax_api) + + entries, next_token = api.list_entries( + [self.PROJECT], page_size=SIZE, page_token=TOKEN) + + self.assertEqual(len(entries), 1) + entry = entries[0] + self.assertIsInstance(entry, dict) + self.assertEqual(entry['log_name'], self.LOG_NAME) + self.assertEqual(entry['resource'], {'type': 'global'}) + self.assertEqual(entry['proto_payload'], PAYLOAD) + self.assertEqual(entry['severity'], SEVERITY) + self.assertEqual(entry['labels'], LABELS) + self.assertEqual(entry['insert_id'], IID) + EXPECTED_REQUEST = { + 'request_method': request.request_method, + 'request_url': request.request_url, + 'status': request.status, + 'request_size': request.request_size, + 'response_size': request.response_size, + 'referer': request.referer, + 'user_agent': request.user_agent, + 'remote_ip': request.remote_ip, + 'cache_hit': request.cache_hit, + } + self.assertEqual(entry['http_request'], EXPECTED_REQUEST) + EXPECTED_OPERATION = { + 'producer': operation.producer, + 'id': operation.id, + 'first': operation.first, + 'last': operation.last, + } + self.assertEqual(entry['operation'], EXPECTED_OPERATION) + self.assertEqual(next_token, NEW_TOKEN) + + projects, filter_, order_by, page_size, options = ( + gax_api._list_log_entries_called_with) + self.assertEqual(projects, [self.PROJECT]) + self.assertEqual(filter_, '') + self.assertEqual(order_by, '') + self.assertEqual(page_size, SIZE) + self.assertEqual(options.page_token, TOKEN) + + def test_write_entries_single(self): + from google.logging.v2.log_entry_pb2 import LogEntry + TEXT = 'TEXT' + LOG_PATH = 'projects/%s/logs/%s' % (self.PROJECT, self.LOG_NAME) + ENTRY = { + 'log_name': LOG_PATH, + 'resource': {'type': 'global'}, + 'text_payload': TEXT, + } + gax_api = _GAXLoggingAPI() + api = self._makeOne(gax_api) + + api.write_entries([ENTRY]) + + entries, log_name, resource, labels, partial_success, options = ( + gax_api._write_log_entries_called_with) + self.assertEqual(len(entries), 1) + + entry = entries[0] + self.assertTrue(isinstance(entry, LogEntry)) + self.assertEqual(entry.log_name, LOG_PATH) + self.assertEqual(entry.resource.type, 'global') + self.assertEqual(entry.labels, {}) + self.assertEqual(entry.text_payload, TEXT) + + self.assertEqual(log_name, None) + self.assertEqual(resource, None) + self.assertEqual(labels, None) + self.assertEqual(partial_success, False) + self.assertEqual(options, None) + + def test_write_entries_w_extra_properties(self): + # pylint: disable=too-many-statements + from datetime import datetime + from google.logging.type.log_severity_pb2 import WARNING + from google.logging.v2.log_entry_pb2 import LogEntry + from gcloud._helpers import UTC, _pb_timestamp_to_datetime + NOW = datetime.utcnow().replace(tzinfo=UTC) + TEXT = 'TEXT' + LOG_PATH = 'projects/%s/logs/%s' % (self.PROJECT, self.LOG_NAME) + SEVERITY = 'WARNING' + LABELS = { + 'foo': 'bar', + } + IID = 'IID' + REQUEST_METHOD = 'GET' + REQUEST_URL = 'http://example.com/requested' + STATUS = 200 + REQUEST_SIZE = 256 + RESPONSE_SIZE = 1024 + REFERRER_URL = 'http://example.com/referer' + USER_AGENT = 'Agent/1.0' + REMOTE_IP = '1.2.3.4' + REQUEST = { + 'request_method': REQUEST_METHOD, + 'request_url': REQUEST_URL, + 'status': STATUS, + 'request_size': REQUEST_SIZE, + 'response_size': RESPONSE_SIZE, + 'referer': REFERRER_URL, + 'user_agent': USER_AGENT, + 'remote_ip': REMOTE_IP, + 'cache_hit': False, + } + PRODUCER = 'PRODUCER' + OPID = 'OPID' + OPERATION = { + 'producer': PRODUCER, + 'id': OPID, + 'first': False, + 'last': True, + } + ENTRY = { + 'log_name': LOG_PATH, + 'resource': {'type': 'global'}, + 'text_payload': TEXT, + 'severity': SEVERITY, + 'labels': LABELS, + 'insert_id': IID, + 'timestamp': NOW, + 'http_request': REQUEST, + 'operation': OPERATION, + } + gax_api = _GAXLoggingAPI() + api = self._makeOne(gax_api) + + api.write_entries([ENTRY]) + + entries, log_name, resource, labels, partial_success, options = ( + gax_api._write_log_entries_called_with) + self.assertEqual(len(entries), 1) + + entry = entries[0] + self.assertTrue(isinstance(entry, LogEntry)) + self.assertEqual(entry.log_name, LOG_PATH) + self.assertEqual(entry.resource.type, 'global') + self.assertEqual(entry.text_payload, TEXT) + self.assertEqual(entry.severity, WARNING) + self.assertEqual(entry.labels, LABELS) + self.assertEqual(entry.insert_id, IID) + stamp = _pb_timestamp_to_datetime(entry.timestamp) + self.assertEqual(stamp, NOW) + + request = entry.http_request + self.assertEqual(request.request_method, REQUEST_METHOD) + self.assertEqual(request.request_url, REQUEST_URL) + self.assertEqual(request.status, STATUS) + self.assertEqual(request.request_size, REQUEST_SIZE) + self.assertEqual(request.response_size, RESPONSE_SIZE) + self.assertEqual(request.referer, REFERRER_URL) + self.assertEqual(request.user_agent, USER_AGENT) + self.assertEqual(request.remote_ip, REMOTE_IP) + self.assertEqual(request.cache_hit, False) + + operation = entry.operation + self.assertEqual(operation.producer, PRODUCER) + self.assertEqual(operation.id, OPID) + self.assertFalse(operation.first) + self.assertTrue(operation.last) + + self.assertEqual(log_name, None) + self.assertEqual(resource, None) + self.assertEqual(labels, None) + self.assertEqual(partial_success, False) + self.assertEqual(options, None) + # pylint: enable=too-many-statements + + def test_write_entries_multiple(self): + # pylint: disable=too-many-statements + from google.logging.type.log_severity_pb2 import WARNING + from google.logging.v2.log_entry_pb2 import LogEntry + from google.protobuf.any_pb2 import Any + from google.protobuf.struct_pb2 import Struct + TEXT = 'TEXT' + TIMESTAMP = _LogEntryPB._make_timestamp() + TIMESTAMP_TYPE_URL = 'type.googleapis.com/google.protobuf.Timestamp' + JSON = {'payload': 'PAYLOAD', 'type': 'json'} + PROTO = { + '@type': TIMESTAMP_TYPE_URL, + 'value': TIMESTAMP, + } + PRODUCER = 'PRODUCER' + OPID = 'OPID' + URL = 'http://example.com/' + ENTRIES = [ + {'text_payload': TEXT, + 'severity': WARNING}, + {'json_payload': JSON, + 'operation': {'producer': PRODUCER, 'id': OPID}}, + {'proto_payload': PROTO, + 'http_request': {'request_url': URL}}, + ] + LOG_PATH = 'projects/%s/logs/%s' % (self.PROJECT, self.LOG_NAME) + RESOURCE = { + 'type': 'global', + } + LABELS = { + 'foo': 'bar', + } + gax_api = _GAXLoggingAPI() + api = self._makeOne(gax_api) + + api.write_entries(ENTRIES, LOG_PATH, RESOURCE, LABELS) + + entries, log_name, resource, labels, partial_success, options = ( + gax_api._write_log_entries_called_with) + self.assertEqual(len(entries), len(ENTRIES)) + + entry = entries[0] + self.assertTrue(isinstance(entry, LogEntry)) + self.assertEqual(entry.log_name, '') + self.assertEqual(entry.resource.type, '') + self.assertEqual(entry.labels, {}) + self.assertEqual(entry.text_payload, TEXT) + self.assertEqual(entry.severity, WARNING) + + entry = entries[1] + self.assertTrue(isinstance(entry, LogEntry)) + self.assertEqual(entry.log_name, '') + self.assertEqual(entry.resource.type, '') + self.assertEqual(entry.labels, {}) + json_struct = entry.json_payload + self.assertTrue(isinstance(json_struct, Struct)) + self.assertEqual(json_struct.fields['payload'].string_value, + JSON['payload']) + operation = entry.operation + self.assertEqual(operation.producer, PRODUCER) + self.assertEqual(operation.id, OPID) + + entry = entries[2] + self.assertTrue(isinstance(entry, LogEntry)) + self.assertEqual(entry.log_name, '') + self.assertEqual(entry.resource.type, '') + self.assertEqual(entry.labels, {}) + proto = entry.proto_payload + self.assertTrue(isinstance(proto, Any)) + self.assertEqual(proto.type_url, TIMESTAMP_TYPE_URL) + request = entry.http_request + self.assertEqual(request.request_url, URL) + + self.assertEqual(log_name, LOG_PATH) + self.assertEqual(resource, RESOURCE) + self.assertEqual(labels, LABELS) + self.assertEqual(partial_success, False) + self.assertEqual(options, None) + # pylint: enable=too-many-statements + + def test_logger_delete(self): + LOG_PATH = 'projects/%s/logs/%s' % (self.PROJECT, self.LOG_NAME) + gax_api = _GAXLoggingAPI() + api = self._makeOne(gax_api) + + api.logger_delete(self.PROJECT, self.LOG_NAME) + + log_name, options = gax_api._delete_log_called_with + self.assertEqual(log_name, LOG_PATH) + self.assertEqual(options, None) + + +class _GAXLoggingAPI(object): + + def __init__(self, **kw): + self.__dict__.update(kw) + + def list_log_entries( + self, projects, filter_, order_by, page_size, options): + self._list_log_entries_called_with = ( + projects, filter_, order_by, page_size, options) + return self._list_log_entries_response + + def write_log_entries(self, entries, log_name, resource, labels, + partial_success, options): + self._write_log_entries_called_with = ( + entries, log_name, resource, labels, partial_success, options) + + def delete_log(self, log_name, options): + self._delete_log_called_with = log_name, options + + +class _HTTPRequestPB(object): + + request_url = 'http://example.com/requested' + request_method = 'GET' + status = 200 + referer = 'http://example.com/referer' + user_agent = 'AGENT' + cache_hit = False + request_size = 256 + response_size = 1024 + remote_ip = '1.2.3.4' + + +class _LogEntryOperationPB(object): + + producer = 'PRODUCER' + first = last = False + id = 'OPID' + + +class _LogEntryPB(object): + + severity = 'DEFAULT' + http_request = operation = insert_id = None + text_payload = json_payload = proto_payload = None + + def __init__(self, log_name, **kw): + self.log_name = log_name + self.resource = {'type': 'global'} + self.timestamp = self._make_timestamp() + self.labels = kw.pop('labels', {}) + self.__dict__.update(kw) + + @staticmethod + def _make_timestamp(): + from datetime import datetime + from gcloud._helpers import UTC + from gcloud.logging.test_entries import _datetime_to_rfc3339_w_nanos + NOW = datetime.utcnow().replace(tzinfo=UTC) + return _datetime_to_rfc3339_w_nanos(NOW) diff --git a/gcloud/pubsub/test__gax.py b/gcloud/pubsub/test__gax.py index bcf636586ab0..d285cb6e3260 100644 --- a/gcloud/pubsub/test__gax.py +++ b/gcloud/pubsub/test__gax.py @@ -53,8 +53,9 @@ def test_ctor(self): def test_list_topics_no_paging(self): from google.gax import INITIAL_PAGE + from gcloud._testing import _GAXPageIterator TOKEN = 'TOKEN' - response = _PageIterator([_TopicPB(self.TOPIC_PATH)], TOKEN) + response = _GAXPageIterator([_TopicPB(self.TOPIC_PATH)], TOKEN) gax_api = _GAXPublisherAPI(_list_topics_response=response) api = self._makeOne(gax_api) @@ -72,10 +73,11 @@ def test_list_topics_no_paging(self): self.assertTrue(options.page_token is INITIAL_PAGE) def test_list_topics_with_paging(self): + from gcloud._testing import _GAXPageIterator SIZE = 23 TOKEN = 'TOKEN' NEW_TOKEN = 'NEW_TOKEN' - response = _PageIterator( + response = _GAXPageIterator( [_TopicPB(self.TOPIC_PATH)], NEW_TOKEN) gax_api = _GAXPublisherAPI(_list_topics_response=response) api = self._makeOne(gax_api) @@ -202,12 +204,13 @@ def test_topic_delete_error(self): def test_topic_publish_hit(self): import base64 + from gcloud._testing import _GAXBundlingEvent PAYLOAD = b'This is the message text' B64 = base64.b64encode(PAYLOAD).decode('ascii') MSGID = 'DEADBEEF' MESSAGE = {'data': B64, 'attributes': {}} response = _PublishResponsePB([MSGID]) - event = _Event(response) + event = _GAXBundlingEvent(response) event.wait() # already received result gax_api = _GAXPublisherAPI(_publish_response=event) api = self._makeOne(gax_api) @@ -224,12 +227,13 @@ def test_topic_publish_hit(self): def test_topic_publish_hit_with_wait(self): import base64 + from gcloud._testing import _GAXBundlingEvent PAYLOAD = b'This is the message text' B64 = base64.b64encode(PAYLOAD).decode('ascii') MSGID = 'DEADBEEF' MESSAGE = {'data': B64, 'attributes': {}} response = _PublishResponsePB([MSGID]) - event = _Event(response) + event = _GAXBundlingEvent(response) gax_api = _GAXPublisherAPI(_publish_response=event) api = self._makeOne(gax_api) @@ -283,7 +287,8 @@ def test_topic_publish_error(self): def test_topic_list_subscriptions_no_paging(self): from google.gax import INITIAL_PAGE - response = _PageIterator([ + from gcloud._testing import _GAXPageIterator + response = _GAXPageIterator([ {'name': self.SUB_PATH, 'topic': self.TOPIC_PATH}], None) gax_api = _GAXPublisherAPI(_list_topic_subscriptions_response=response) api = self._makeOne(gax_api) @@ -305,10 +310,11 @@ def test_topic_list_subscriptions_no_paging(self): self.assertTrue(options.page_token is INITIAL_PAGE) def test_topic_list_subscriptions_with_paging(self): + from gcloud._testing import _GAXPageIterator SIZE = 23 TOKEN = 'TOKEN' NEW_TOKEN = 'NEW_TOKEN' - response = _PageIterator([ + response = _GAXPageIterator([ {'name': self.SUB_PATH, 'topic': self.TOPIC_PATH}], NEW_TOKEN) gax_api = _GAXPublisherAPI(_list_topic_subscriptions_response=response) api = self._makeOne(gax_api) @@ -376,7 +382,8 @@ def test_ctor(self): def test_list_subscriptions_no_paging(self): from google.gax import INITIAL_PAGE - response = _PageIterator([_SubscriptionPB( + from gcloud._testing import _GAXPageIterator + response = _GAXPageIterator([_SubscriptionPB( self.SUB_PATH, self.TOPIC_PATH, self.PUSH_ENDPOINT, 0)], None) gax_api = _GAXSubscriberAPI(_list_subscriptions_response=response) api = self._makeOne(gax_api) @@ -399,10 +406,11 @@ def test_list_subscriptions_no_paging(self): self.assertTrue(options.page_token is INITIAL_PAGE) def test_list_subscriptions_with_paging(self): + from gcloud._testing import _GAXPageIterator SIZE = 23 TOKEN = 'TOKEN' NEW_TOKEN = 'NEW_TOKEN' - response = _PageIterator([_SubscriptionPB( + response = _GAXPageIterator([_SubscriptionPB( self.SUB_PATH, self.TOPIC_PATH, self.PUSH_ENDPOINT, 0)], NEW_TOKEN) gax_api = _GAXSubscriberAPI(_list_subscriptions_response=response) api = self._makeOne(gax_api) @@ -913,31 +921,6 @@ def modify_ack_deadline(self, name, ack_ids, deadline, options=None): raise GaxError('miss', self._make_grpc_not_found()) -class _PageIterator(object): - - def __init__(self, items, page_token): - self._items = items - self.page_token = page_token - - def next(self): - items, self._items = self._items, None - return items - - -class _Event(object): - - result = None - - def __init__(self, result): - self._result = result - - def is_set(self): - return self.result is not None - - def wait(self, *_): - self.result = self._result - - class _TopicPB(object): def __init__(self, name):