diff --git a/gcloud/_testing.py b/gcloud/_testing.py index 15ef5dd298e1..0a440e817436 100644 --- a/gcloud/_testing.py +++ b/gcloud/_testing.py @@ -59,17 +59,3 @@ def __init__(self, items, page_token): def next(self): items, self._items = self._items, None return items - - -class _GAXBundlingEvent(object): - - result = None - - def __init__(self, result): - self._result = result - - def is_set(self): - return self.result is not None - - def wait(self, *_): - self.result = self._result diff --git a/gcloud/pubsub/_gax.py b/gcloud/pubsub/_gax.py index 0639833feb73..28ac6c23e294 100644 --- a/gcloud/pubsub/_gax.py +++ b/gcloud/pubsub/_gax.py @@ -162,17 +162,17 @@ def topic_publish(self, topic_path, messages): :raises: :exc:`gcloud.exceptions.NotFound` if the topic does not exist """ + options = CallOptions(is_bundling=False) message_pbs = [_message_pb_from_dict(message) for message in messages] try: - event = self._gax_api.publish(topic_path, message_pbs) - if not event.is_set(): - event.wait() + result = self._gax_api.publish(topic_path, message_pbs, + options=options) except GaxError as exc: if exc_to_code(exc.cause) == StatusCode.NOT_FOUND: raise NotFound(topic_path) raise - return event.result.message_ids + return result.message_ids def topic_list_subscriptions(self, topic_path, page_size=0, page_token=None): diff --git a/gcloud/pubsub/test__gax.py b/gcloud/pubsub/test__gax.py index d285cb6e3260..2426d2dfb7e8 100644 --- a/gcloud/pubsub/test__gax.py +++ b/gcloud/pubsub/test__gax.py @@ -204,15 +204,12 @@ def test_topic_delete_error(self): def test_topic_publish_hit(self): import base64 - from gcloud._testing import _GAXBundlingEvent PAYLOAD = b'This is the message text' B64 = base64.b64encode(PAYLOAD).decode('ascii') MSGID = 'DEADBEEF' MESSAGE = {'data': B64, 'attributes': {}} response = _PublishResponsePB([MSGID]) - event = _GAXBundlingEvent(response) - event.wait() # already received result - gax_api = _GAXPublisherAPI(_publish_response=event) + gax_api = _GAXPublisherAPI(_publish_response=response) api = self._makeOne(gax_api) resource = api.topic_publish(self.TOPIC_PATH, [MESSAGE]) @@ -223,29 +220,7 @@ def test_topic_publish_hit(self): message_pb, = message_pbs self.assertEqual(message_pb.data, B64) self.assertEqual(message_pb.attributes, {}) - self.assertEqual(options, None) - - def test_topic_publish_hit_with_wait(self): - import base64 - from gcloud._testing import _GAXBundlingEvent - PAYLOAD = b'This is the message text' - B64 = base64.b64encode(PAYLOAD).decode('ascii') - MSGID = 'DEADBEEF' - MESSAGE = {'data': B64, 'attributes': {}} - response = _PublishResponsePB([MSGID]) - event = _GAXBundlingEvent(response) - gax_api = _GAXPublisherAPI(_publish_response=event) - api = self._makeOne(gax_api) - - resource = api.topic_publish(self.TOPIC_PATH, [MESSAGE]) - - self.assertEqual(resource, [MSGID]) - topic_path, message_pbs, options = gax_api._publish_called_with - self.assertEqual(topic_path, self.TOPIC_PATH) - message_pb, = message_pbs - self.assertEqual(message_pb.data, B64) - self.assertEqual(message_pb.attributes, {}) - self.assertEqual(options, None) + self.assertEqual(options.is_bundling, False) def test_topic_publish_miss_w_attrs_w_bytes_payload(self): import base64 @@ -264,7 +239,7 @@ def test_topic_publish_miss_w_attrs_w_bytes_payload(self): message_pb, = message_pbs self.assertEqual(message_pb.data, B64) self.assertEqual(message_pb.attributes, {'foo': 'bar'}) - self.assertEqual(options, None) + self.assertEqual(options.is_bundling, False) def test_topic_publish_error(self): import base64 @@ -283,7 +258,7 @@ def test_topic_publish_error(self): message_pb, = message_pbs self.assertEqual(message_pb.data, B64) self.assertEqual(message_pb.attributes, {}) - self.assertEqual(options, None) + self.assertEqual(options.is_bundling, False) def test_topic_list_subscriptions_no_paging(self): from google.gax import INITIAL_PAGE