diff --git a/system_tests/pubsub.py b/system_tests/pubsub.py index 2f3d3295373c..30d0fd5de3d9 100644 --- a/system_tests/pubsub.py +++ b/system_tests/pubsub.py @@ -151,6 +151,7 @@ def _all_created(result): self.assertEqual(len(created), len(subscriptions_to_create)) def test_message_pull_mode_e2e(self): + import operator topic = Config.CLIENT.topic(DEFAULT_TOPIC_NAME, timestamp_messages=True) self.assertFalse(topic.exists()) @@ -169,15 +170,24 @@ def test_message_pull_mode_e2e(self): topic.publish(MESSAGE_1, extra=EXTRA_1) topic.publish(MESSAGE_2, extra=EXTRA_2) - received = subscription.pull(max_messages=2) - ack_ids = [recv[0] for recv in received] - subscription.acknowledge(ack_ids) - messages = [recv[1] for recv in received] + class Hoover(object): - def _by_timestamp(message): - return message.timestamp + def __init__(self): + self.received = [] - message1, message2 = sorted(messages, key=_by_timestamp) + def done(self, *dummy): + return len(self.received) == 2 + + def suction(self): + with subscription.auto_ack(max_messages=2) as ack: + self.received.extend(ack.values()) + + hoover = Hoover() + retry = RetryInstanceState(hoover.done) + retry(hoover.suction)() + + message1, message2 = sorted(hoover.received, + key=operator.attrgetter('timestamp')) self.assertEqual(message1.data, MESSAGE_1) self.assertEqual(message1.attributes['extra'], EXTRA_1) self.assertEqual(message2.data, MESSAGE_2) @@ -243,6 +253,7 @@ def test_subscription_iam_policy(self): self.assertEqual(new_policy.viewers, policy.viewers) def test_fetch_delete_subscription_w_deleted_topic(self): + from gcloud.iterator import MethodIterator TO_DELETE = 'delete-me' + unique_resource_id('-') ORPHANED = 'orphaned' + unique_resource_id('-') topic = Config.CLIENT.topic(TO_DELETE) @@ -251,13 +262,15 @@ def test_fetch_delete_subscription_w_deleted_topic(self): subscription.create() topic.delete() - all_subs = [] - token = None - while True: - subs, token = Config.CLIENT.list_subscriptions(page_token=token) - all_subs.extend(subs) - if token is None: - break + def _fetch(): + return list(MethodIterator(Config.CLIENT.list_subscriptions)) + + def _found_orphan(result): + names = [subscription.name for subscription in result] + return ORPHANED in names + + retry_until_found_orphan = RetryResult(_found_orphan) + all_subs = retry_until_found_orphan(_fetch)() created = [subscription for subscription in all_subs if subscription.name == ORPHANED] @@ -267,8 +280,9 @@ def test_fetch_delete_subscription_w_deleted_topic(self): def _no_topic(instance): return instance.topic is None - retry = RetryInstanceState(_no_topic, max_tries=6) - retry(orphaned.reload)() + # Wait for the topic to clear: up to 63 seconds (2 ** 6 - 1) + retry_until_no_topic = RetryInstanceState(_no_topic, max_tries=7) + retry_until_no_topic(orphaned.reload)() self.assertTrue(orphaned.topic is None) orphaned.delete()