Skip to content

Commit 0dbccac

Browse files
authored
Refactor Pub / Sub callback queue (#4511)
* Renaming `QueueCallbackThread` -> `QueueCallbackWorker`. Also fixing a few typos nearby a mention of `QueueCallbackThread` in `pubsub/CHANGELOG.md`. * Making Policy.on_callback_request() less open-ended.
1 parent e0ce8c5 commit 0dbccac

File tree

6 files changed

+77
-35
lines changed

6 files changed

+77
-35
lines changed

pubsub/CHANGELOG.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@
1717
names (#4476).
1818
- Logging changes
1919
- Adding debug logs when lease management exits (#4484)
20-
- Adding debug logs when hen `QueueCallbackThread` exits (#4494).
21-
Instances handle theprocessing of messages in a
20+
- Adding debug logs when `QueueCallbackThread` exits (#4494).
21+
Instances handle the processing of messages in a
2222
subscription (e.g. to `ack`).
2323
- Using a named logger in `publisher.batch.thread` (#4473)
2424
- Adding newlines before logging protobuf payloads (#4471)

pubsub/google/cloud/pubsub_v1/subscriber/_consumer.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -96,21 +96,21 @@
9696
"gRPC C Core" -> "gRPC Python" [label="queue", dir="both"]
9797
"gRPC Python" -> "Consumer" [label="responses", color="red"]
9898
"Consumer" -> "request generator thread" [label="starts", color="gray"]
99-
"Policy" -> "QueueCallbackThread" [label="starts", color="gray"]
99+
"Policy" -> "QueueCallbackWorker" [label="starts", color="gray"]
100100
"request generator thread" -> "gRPC Python"
101101
[label="requests", color="blue"]
102102
"Consumer" -> "Policy" [label="responses", color="red"]
103103
"Policy" -> "futures.Executor" [label="response", color="red"]
104104
"futures.Executor" -> "callback" [label="response", color="red"]
105105
"callback" -> "callback_request_queue" [label="requests", color="blue"]
106-
"callback_request_queue" -> "QueueCallbackThread"
106+
"callback_request_queue" -> "QueueCallbackWorker"
107107
[label="consumed by", color="blue"]
108-
"QueueCallbackThread" -> "Consumer"
108+
"QueueCallbackWorker" -> "Consumer"
109109
[label="send_response", color="blue"]
110110
}
111111
112112
This part is actually up to the Policy to enable. The consumer just provides a
113-
thread-safe queue for requests. The :cls:`QueueCallbackThread` can be used by
113+
thread-safe queue for requests. The :cls:`QueueCallbackWorker` can be used by
114114
the Policy implementation to spin up the worker thread to pump the
115115
concurrency-safe queue. See the Pub/Sub subscriber implementation for an
116116
example of this.

pubsub/google/cloud/pubsub_v1/subscriber/_helper_threads.py

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121

2222
__all__ = (
2323
'HelperThreadRegistry',
24-
'QueueCallbackThread',
24+
'QueueCallbackWorker',
2525
'STOP',
2626
)
2727

@@ -125,14 +125,9 @@ def stop_all(self):
125125
self.stop(name)
126126

127127

128-
class QueueCallbackThread(object):
128+
class QueueCallbackWorker(object):
129129
"""A helper that executes a callback for every item in the queue.
130130
131-
.. note::
132-
133-
This is not actually a thread, but it is intended to be a target
134-
for a thread.
135-
136131
Calls a blocking ``get()`` on the ``queue`` until it encounters
137132
:attr:`STOP`.
138133
@@ -141,8 +136,10 @@ class QueueCallbackThread(object):
141136
concurrency boundary implemented by ``executor``. Items will
142137
be popped off (with a blocking ``get()``) until :attr:`STOP`
143138
is encountered.
144-
callback (Callable): A callback that can process items pulled off
145-
of the queue.
139+
callback (Callable[[str, Dict], Any]): A callback that can process
140+
items pulled off of the queue. Items are assumed to be a pair
141+
of a method name to be invoked and a dictionary of keyword
142+
arguments for that method.
146143
"""
147144

148145
def __init__(self, queue, callback):
@@ -153,12 +150,13 @@ def __call__(self):
153150
while True:
154151
item = self.queue.get()
155152
if item == STOP:
156-
_LOGGER.debug('Exiting the QueueCallbackThread.')
153+
_LOGGER.debug('Exiting the QueueCallbackWorker.')
157154
return
158155

159156
# Run the callback. If any exceptions occur, log them and
160157
# continue.
161158
try:
162-
self._callback(item)
159+
action, kwargs = item
160+
self._callback(action, kwargs)
163161
except Exception as exc:
164162
_LOGGER.error('%s: %s', exc.__class__.__name__, exc)

pubsub/google/cloud/pubsub_v1/subscriber/policy/thread.py

Lines changed: 29 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -117,9 +117,9 @@ def __init__(self, client, subscription, flow_control=types.FlowControl(),
117117
)
118118
self._executor = executor
119119
_LOGGER.debug('Creating callback requests thread (not starting).')
120-
self._callback_requests = _helper_threads.QueueCallbackThread(
120+
self._callback_requests = _helper_threads.QueueCallbackWorker(
121121
self._request_queue,
122-
self.on_callback_request,
122+
self.dispatch_callback,
123123
)
124124

125125
def close(self):
@@ -180,10 +180,33 @@ def open(self, callback):
180180
# Return the future.
181181
return self._future
182182

183-
def on_callback_request(self, callback_request):
184-
"""Map the callback request to the appropriate gRPC request."""
185-
action, kwargs = callback_request[0], callback_request[1]
186-
getattr(self, action)(**kwargs)
183+
def dispatch_callback(self, action, kwargs):
184+
"""Map the callback request to the appropriate gRPC request.
185+
186+
Args:
187+
action (str): The method to be invoked.
188+
kwargs (Dict[str, Any]): The keyword arguments for the method
189+
specified by ``action``.
190+
191+
Raises:
192+
ValueError: If ``action`` isn't one of the expected actions
193+
"ack", "drop", "lease", "modify_ack_deadline" or "nack".
194+
"""
195+
if action == 'ack':
196+
self.ack(**kwargs)
197+
elif action == 'drop':
198+
self.drop(**kwargs)
199+
elif action == 'lease':
200+
self.lease(**kwargs)
201+
elif action == 'modify_ack_deadline':
202+
self.modify_ack_deadline(**kwargs)
203+
elif action == 'nack':
204+
self.nack(**kwargs)
205+
else:
206+
raise ValueError(
207+
'Unexpected action', action,
208+
'Must be one of "ack", "drop", "lease", '
209+
'"modify_ack_deadline" or "nack".')
187210

188211
def on_exception(self, exception):
189212
"""Handle the exception.

pubsub/tests/unit/pubsub_v1/subscriber/test_helper_threads.py

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -117,33 +117,35 @@ def test_stop_all_noop():
117117
assert len(registry._helper_threads) == 0
118118

119119

120-
def test_queue_callback_thread():
120+
def test_queue_callback_worker():
121121
queue_ = queue.Queue()
122122
callback = mock.Mock(spec=())
123-
qct = _helper_threads.QueueCallbackThread(queue_, callback)
123+
qct = _helper_threads.QueueCallbackWorker(queue_, callback)
124124

125125
# Set up an appropriate mock for the queue, and call the queue callback
126126
# thread.
127127
with mock.patch.object(queue.Queue, 'get') as get:
128-
get.side_effect = (mock.sentinel.A, _helper_threads.STOP)
128+
item1 = ('action', mock.sentinel.A)
129+
get.side_effect = (item1, _helper_threads.STOP)
129130
qct()
130131

131132
# Assert that we got the expected calls.
132133
assert get.call_count == 2
133-
callback.assert_called_once_with(mock.sentinel.A)
134+
callback.assert_called_once_with('action', mock.sentinel.A)
134135

135136

136-
def test_queue_callback_thread_exception():
137+
def test_queue_callback_worker_exception():
137138
queue_ = queue.Queue()
138139
callback = mock.Mock(spec=(), side_effect=(Exception,))
139-
qct = _helper_threads.QueueCallbackThread(queue_, callback)
140+
qct = _helper_threads.QueueCallbackWorker(queue_, callback)
140141

141142
# Set up an appropriate mock for the queue, and call the queue callback
142143
# thread.
143144
with mock.patch.object(queue.Queue, 'get') as get:
144-
get.side_effect = (mock.sentinel.A, _helper_threads.STOP)
145+
item1 = ('action', mock.sentinel.A)
146+
get.side_effect = (item1, _helper_threads.STOP)
145147
qct()
146148

147149
# Assert that we got the expected calls.
148150
assert get.call_count == 2
149-
callback.assert_called_once_with(mock.sentinel.A)
151+
callback.assert_called_once_with('action', mock.sentinel.A)

pubsub/tests/unit/pubsub_v1/subscriber/test_policy_thread.py

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -81,11 +81,30 @@ def test_open(thread_start, htr_start):
8181
thread_start.assert_called()
8282

8383

84-
def test_on_callback_request():
84+
def test_dispatch_callback_valid_actions():
8585
policy = create_policy()
86-
with mock.patch.object(policy, 'call_rpc') as call_rpc:
87-
policy.on_callback_request(('call_rpc', {'something': 42}))
88-
call_rpc.assert_called_once_with(something=42)
86+
kwargs = {'foo': 10, 'bar': 13.37}
87+
actions = (
88+
'ack',
89+
'drop',
90+
'lease',
91+
'modify_ack_deadline',
92+
'nack',
93+
)
94+
for action in actions:
95+
with mock.patch.object(policy, action) as mocked:
96+
policy.dispatch_callback(action, kwargs)
97+
mocked.assert_called_once_with(**kwargs)
98+
99+
100+
def test_dispatch_callback_invalid_action():
101+
policy = create_policy()
102+
with pytest.raises(ValueError) as exc_info:
103+
policy.dispatch_callback('gecko', {})
104+
105+
assert len(exc_info.value.args) == 3
106+
assert exc_info.value.args[0] == 'Unexpected action'
107+
assert exc_info.value.args[1] == 'gecko'
89108

90109

91110
def test_on_exception_deadline_exceeded():

0 commit comments

Comments
 (0)