From 4d930b7fea0eaa1220fb3722a9d76d56e199a50f Mon Sep 17 00:00:00 2001 From: Tres Seaver Date: Thu, 11 Aug 2016 09:15:38 -0400 Subject: [PATCH 1/6] Wait even longer for orphaned subscr topic to clear. See #2080. --- system_tests/pubsub.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/system_tests/pubsub.py b/system_tests/pubsub.py index 2f3d3295373c..8e8cbc636030 100644 --- a/system_tests/pubsub.py +++ b/system_tests/pubsub.py @@ -267,7 +267,8 @@ def test_fetch_delete_subscription_w_deleted_topic(self): def _no_topic(instance): return instance.topic is None - retry = RetryInstanceState(_no_topic, max_tries=6) + # Wait for the topic to clear: up to 63 seconds (2 ** 8 - 1) + retry = RetryInstanceState(_no_topic, max_tries=7) retry(orphaned.reload)() self.assertTrue(orphaned.topic is None) From 535e67263abd65fa9ecfcbf833bfd0dec45f3038 Mon Sep 17 00:00:00 2001 From: Tres Seaver Date: Thu, 11 Aug 2016 09:27:28 -0400 Subject: [PATCH 2/6] Retry fetching subscriptions until orphan is found. Toward #2079. --- system_tests/pubsub.py | 21 ++++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/system_tests/pubsub.py b/system_tests/pubsub.py index 8e8cbc636030..4011dbb9e715 100644 --- a/system_tests/pubsub.py +++ b/system_tests/pubsub.py @@ -243,6 +243,7 @@ def test_subscription_iam_policy(self): self.assertEqual(new_policy.viewers, policy.viewers) def test_fetch_delete_subscription_w_deleted_topic(self): + from gcloud.iterator import MethodIterator TO_DELETE = 'delete-me' + unique_resource_id('-') ORPHANED = 'orphaned' + unique_resource_id('-') topic = Config.CLIENT.topic(TO_DELETE) @@ -251,13 +252,15 @@ def test_fetch_delete_subscription_w_deleted_topic(self): subscription.create() topic.delete() - all_subs = [] - token = None - while True: - subs, token = Config.CLIENT.list_subscriptions(page_token=token) - all_subs.extend(subs) - if token is None: - break + def _fetch(): + return list(MethodIterator(Config.CLIENT.list_subscriptions)) + + def _found_orphan(result): + return any(subscription for subscription in result + if subscription.name == ORPHANED) + + retry_until_found_orphan = RetryResult(_found_orphan) + all_subs = retry_until_found_orphan(_fetch)() created = [subscription for subscription in all_subs if subscription.name == ORPHANED] @@ -268,8 +271,8 @@ def _no_topic(instance): return instance.topic is None # Wait for the topic to clear: up to 63 seconds (2 ** 8 - 1) - retry = RetryInstanceState(_no_topic, max_tries=7) - retry(orphaned.reload)() + retry_until_no_topic = RetryInstanceState(_no_topic, max_tries=7) + retry_until_no_topic(orphaned.reload)() self.assertTrue(orphaned.topic is None) orphaned.delete() From 7028bb463fd47f20fbeeaa7a278c3b0ed0733fe1 Mon Sep 17 00:00:00 2001 From: Tres Seaver Date: Thu, 11 Aug 2016 10:01:19 -0400 Subject: [PATCH 3/6] Add retries for pulling messages in pubsub E2E test. Toward #2077. --- system_tests/pubsub.py | 23 +++++++++++++++++------ 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/system_tests/pubsub.py b/system_tests/pubsub.py index 4011dbb9e715..bb97c6e174ce 100644 --- a/system_tests/pubsub.py +++ b/system_tests/pubsub.py @@ -169,15 +169,26 @@ def test_message_pull_mode_e2e(self): topic.publish(MESSAGE_1, extra=EXTRA_1) topic.publish(MESSAGE_2, extra=EXTRA_2) - received = subscription.pull(max_messages=2) - ack_ids = [recv[0] for recv in received] - subscription.acknowledge(ack_ids) - messages = [recv[1] for recv in received] + class Hoover(object): + + def __init__(self): + self.received = [] + + def done(self, *dummy): + return len(self.received) == 2 + + def suction(self): + with subscription.auto_ack(max_messages=2) as ack: + self.received.extend(ack.values()) + + hoover = Hoover() + retry = RetryInstanceState(hoover.done) + retry(hoover.suction)() def _by_timestamp(message): return message.timestamp - message1, message2 = sorted(messages, key=_by_timestamp) + message1, message2 = sorted(hoover.received, key=_by_timestamp) self.assertEqual(message1.data, MESSAGE_1) self.assertEqual(message1.attributes['extra'], EXTRA_1) self.assertEqual(message2.data, MESSAGE_2) @@ -257,7 +268,7 @@ def _fetch(): def _found_orphan(result): return any(subscription for subscription in result - if subscription.name == ORPHANED) + if subscription.name == ORPHANED) retry_until_found_orphan = RetryResult(_found_orphan) all_subs = retry_until_found_orphan(_fetch)() From 47202560a052562ef891abd86923b59bd0100f74 Mon Sep 17 00:00:00 2001 From: Tres Seaver Date: Thu, 11 Aug 2016 10:07:44 -0400 Subject: [PATCH 4/6] Use more idiomatic key function. --- system_tests/pubsub.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/system_tests/pubsub.py b/system_tests/pubsub.py index bb97c6e174ce..74533c0e6d2b 100644 --- a/system_tests/pubsub.py +++ b/system_tests/pubsub.py @@ -151,6 +151,7 @@ def _all_created(result): self.assertEqual(len(created), len(subscriptions_to_create)) def test_message_pull_mode_e2e(self): + import operator topic = Config.CLIENT.topic(DEFAULT_TOPIC_NAME, timestamp_messages=True) self.assertFalse(topic.exists()) @@ -185,10 +186,8 @@ def suction(self): retry = RetryInstanceState(hoover.done) retry(hoover.suction)() - def _by_timestamp(message): - return message.timestamp - - message1, message2 = sorted(hoover.received, key=_by_timestamp) + message1, message2 = sorted(hoover.received, + key=operator.attrgetter('timestamp')) self.assertEqual(message1.data, MESSAGE_1) self.assertEqual(message1.attributes['extra'], EXTRA_1) self.assertEqual(message2.data, MESSAGE_2) From 768e62d81dec463d2c34d562b96238ee83c3b6fa Mon Sep 17 00:00:00 2001 From: Tres Seaver Date: Thu, 11 Aug 2016 17:43:27 -0400 Subject: [PATCH 5/6] Clarify the intent of the '_found_orphaned' predicate. --- system_tests/pubsub.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/system_tests/pubsub.py b/system_tests/pubsub.py index 74533c0e6d2b..62097008f2ca 100644 --- a/system_tests/pubsub.py +++ b/system_tests/pubsub.py @@ -266,8 +266,8 @@ def _fetch(): return list(MethodIterator(Config.CLIENT.list_subscriptions)) def _found_orphan(result): - return any(subscription for subscription in result - if subscription.name == ORPHANED) + names = [subscription.name for subscription in result] + return ORPHANED in names retry_until_found_orphan = RetryResult(_found_orphan) all_subs = retry_until_found_orphan(_fetch)() From 40784049c61fc2712d4130377494e5437b725088 Mon Sep 17 00:00:00 2001 From: Tres Seaver Date: Thu, 11 Aug 2016 20:11:58 -0400 Subject: [PATCH 6/6] Typo fix. --- system_tests/pubsub.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/system_tests/pubsub.py b/system_tests/pubsub.py index 62097008f2ca..30d0fd5de3d9 100644 --- a/system_tests/pubsub.py +++ b/system_tests/pubsub.py @@ -280,7 +280,7 @@ def _found_orphan(result): def _no_topic(instance): return instance.topic is None - # Wait for the topic to clear: up to 63 seconds (2 ** 8 - 1) + # Wait for the topic to clear: up to 63 seconds (2 ** 6 - 1) retry_until_no_topic = RetryInstanceState(_no_topic, max_tries=7) retry_until_no_topic(orphaned.reload)()