Skip to content

Commit f40c398

Browse files
committed
Add support for auto-acknowledging pulled messages.
Follows @tmatsuo's suggested implementation in: #798 (comment) Closes #798.
1 parent 98edc64 commit f40c398

3 files changed

Lines changed: 177 additions & 0 deletions

File tree

docs/pubsub-usage.rst

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -272,3 +272,30 @@ Fetch messages for a pull subscription without blocking (none pending):
272272
>>> messages = [recv[1] for recv in received]
273273
>>> [message.message_id for message in messages]
274274
[]
275+
276+
Fetch pending messages, acknowledging those whose processing doesn't raise an
277+
error:
278+
279+
.. doctest::
280+
281+
>>> from gcloud import pubsub
282+
>>> client = pubsub.Client()
283+
>>> topic = client.topic('topic_name')
284+
>>> subscription = topic.subscription('subscription_name')
285+
>>> with topic.batch() as batch:
286+
... batch.publish('this is the first message_payload')
287+
... batch.publish('this is the second message_payload',
288+
... attr1='value1', attr2='value2')
289+
>>> from gcloud.pubsub.subscription import AutoAck
290+
>>> with AutoAck(subscription, max_messages=10) as ack:
291+
... for ack_id, message in ack.items():
292+
... try:
293+
... do_something_with(message)
294+
... except:
295+
... del ack[ack_id]
296+
297+
.. note::
298+
299+
The ``pull`` API request occurs at entry to the ``with`` block, and the
300+
``acknowlege`` API request occurs at the end, passing only the ``ack_ids``
301+
which haven't been deleted from ``ack``

gcloud/pubsub/subscription.py

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -263,3 +263,51 @@ def delete(self, client=None):
263263
"""
264264
client = self._require_client(client)
265265
client.connection.api_request(method='DELETE', path=self.path)
266+
267+
268+
class AutoAck(dict):
269+
"""Wrapper for :meth:`Subscription.pull` results.
270+
271+
Mapping, tracks messages still-to-be-acknowledged.
272+
273+
When used as a context manager, acknowledges all messages still in the
274+
mapping on `__exit__`. E.g.:
275+
276+
.. code-block: python
277+
278+
with AutoAck(subscription) as ack: # calls ``subscription.pull``
279+
for ack_id, message in ack.items():
280+
try:
281+
do_something_with(message):
282+
except:
283+
del ack[ack_id]
284+
285+
:type subscription: :class:`Subscription`
286+
:param subscription: subcription to be pulled.
287+
288+
:type return_immediately: boolean
289+
:param return_immediately: passed through to :meth:`Subscription.pull`
290+
291+
:type max_messages: int
292+
:param max_messages: passed through to :meth:`Subscription.pull`
293+
294+
:type client: :class:`gcloud.pubsub.client.Client` or ``NoneType``
295+
:param client: passed through to :meth:`Subscription.pull` and
296+
:meth:`Subscription.acknowledge`.
297+
"""
298+
def __init__(self, subscription,
299+
return_immediately=False, max_messages=1, client=None):
300+
super(AutoAck, self).__init__()
301+
self._subscription = subscription
302+
self._return_immediately = return_immediately
303+
self._max_messages = max_messages
304+
self._client = client
305+
306+
def __enter__(self):
307+
items = self._subscription.pull(
308+
self._return_immediately, self._max_messages, self._client)
309+
self.update(items)
310+
return self
311+
312+
def __exit__(self, exc_type, exc_val, exc_tb):
313+
self._subscription.acknowledge(list(self), self._client)

gcloud/pubsub/test_subscription.py

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -485,6 +485,81 @@ def test_delete_w_alternate_client(self):
485485
self.assertEqual(req['path'], '/%s' % SUB_PATH)
486486

487487

488+
class TestAutoAck(unittest2.TestCase):
489+
490+
def _getTargetClass(self):
491+
from gcloud.pubsub.subscription import AutoAck
492+
return AutoAck
493+
494+
def _makeOne(self, *args, **kw):
495+
return self._getTargetClass()(*args, **kw)
496+
497+
def test_ctor_defaults(self):
498+
subscription = _FauxSubscription(())
499+
auto_ack = self._makeOne(subscription)
500+
self.assertEqual(auto_ack._return_immediately, False)
501+
self.assertEqual(auto_ack._max_messages, 1)
502+
self.assertTrue(auto_ack._client is None)
503+
504+
def test_ctor_explicit(self):
505+
CLIENT = object()
506+
subscription = _FauxSubscription(())
507+
auto_ack = self._makeOne(
508+
subscription, return_immediately=True, max_messages=10,
509+
client=CLIENT)
510+
self.assertTrue(auto_ack._subscription is subscription)
511+
self.assertEqual(auto_ack._return_immediately, True)
512+
self.assertEqual(auto_ack._max_messages, 10)
513+
self.assertTrue(auto_ack._client is CLIENT)
514+
515+
def test___enter___w_defaults(self):
516+
subscription = _FauxSubscription(())
517+
auto_ack = self._makeOne(subscription)
518+
519+
with auto_ack as returned:
520+
pass
521+
522+
self.assertTrue(returned is auto_ack)
523+
self.assertEqual(subscription._return_immediately, False)
524+
self.assertEqual(subscription._max_messages, 1)
525+
self.assertTrue(subscription._client is None)
526+
527+
def test___enter___w_explicit(self):
528+
CLIENT = object()
529+
subscription = _FauxSubscription(())
530+
auto_ack = self._makeOne(
531+
subscription, return_immediately=True, max_messages=10,
532+
client=CLIENT)
533+
534+
with auto_ack as returned:
535+
pass
536+
537+
self.assertTrue(returned is auto_ack)
538+
self.assertEqual(subscription._return_immediately, True)
539+
self.assertEqual(subscription._max_messages, 10)
540+
self.assertTrue(subscription._client is CLIENT)
541+
542+
def test___exit___(self):
543+
CLIENT = object()
544+
ACK_ID1, MESSAGE1 = 'ACK_ID1', _FallibleMessage()
545+
ACK_ID2, MESSAGE2 = 'ACK_ID2', _FallibleMessage()
546+
ACK_ID3, MESSAGE3 = 'ACK_ID3', _FallibleMessage(True)
547+
ITEMS = [
548+
(ACK_ID1, MESSAGE1),
549+
(ACK_ID2, MESSAGE2),
550+
(ACK_ID3, MESSAGE3),
551+
]
552+
subscription = _FauxSubscription(ITEMS)
553+
auto_ack = self._makeOne(subscription, client=CLIENT)
554+
with auto_ack:
555+
for ack_id, message in list(auto_ack.items()):
556+
if message.fail:
557+
del auto_ack[ack_id]
558+
self.assertEqual(sorted(subscription._acknowledged),
559+
[ACK_ID1, ACK_ID2])
560+
self.assertTrue(subscription._ack_client is CLIENT)
561+
562+
488563
class _Connection(object):
489564

490565
def __init__(self, *responses):
@@ -522,3 +597,30 @@ def __init__(self, project, connection=None):
522597
def topic(self, name, timestamp_messages=False):
523598
from gcloud.pubsub.topic import Topic
524599
return Topic(name, client=self, timestamp_messages=timestamp_messages)
600+
601+
602+
class _FallibleMessage(object):
603+
604+
def __init__(self, fail=False):
605+
self.fail = fail
606+
607+
608+
class _FauxSubscription(object):
609+
610+
def __init__(self, items):
611+
self._items = items
612+
self._mapping = dict(items)
613+
self._acknowledged = set()
614+
615+
def pull(self, return_immediately=False, max_messages=1, client=None):
616+
self._return_immediately = return_immediately
617+
self._max_messages = max_messages
618+
self._client = client
619+
return self._items
620+
621+
def acknowledge(self, ack_ids, client=None):
622+
self._ack_client = client
623+
for ack_id in ack_ids:
624+
message = self._mapping[ack_id]
625+
assert not message.fail
626+
self._acknowledged.add(ack_id)

0 commit comments

Comments
 (0)