Skip to content
4 changes: 1 addition & 3 deletions pubsub/google/cloud/pubsub_v1/futures.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,7 @@ def running(self):
bool: ``True`` if this method has not yet completed, or
``False`` if it has completed.
"""
if self.done():
return False
return True
return not self.done()

def done(self):
"""Return True the future is done, False otherwise.
Expand Down
11 changes: 11 additions & 0 deletions pubsub/google/cloud/pubsub_v1/publisher/_batch/thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,17 @@ def status(self):
"""
return self._status

def wait(self):
"""If commit in progress, waits until all of the futures resolved.

.. note::

This method blocks until all futures of this batch resolved.
"""
if self._status in (base.BatchStatus.STARTING, base.BatchStatus.IN_PROGRESS):
for future in self._futures:
future.result()

def commit(self):
"""Actually publish all of the messages on the active batch.

Expand Down
19 changes: 19 additions & 0 deletions pubsub/google/cloud/pubsub_v1/publisher/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -274,3 +274,22 @@ def publish(self, topic, data, **attrs):
batch = self._batch(topic, create=True)

return future

def stop(self):
"""Immediately publish all outstanding batches.

This asynchronously pushes all outstanding messages
and waits until all futures resolved. Method should be
invoked prior to deleting this Client object in order
to ensure that no pending messages are lost.

.. note::

This method blocks until all futures of all
batches resolved.
"""
for topic in self._batches:
self._batches[topic].commit()

for topic in self._batches:
self._batches[topic].wait()
25 changes: 25 additions & 0 deletions pubsub/tests/unit/pubsub_v1/publisher/batch/test_thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,31 @@ def test_block__commmit_api_error():
assert future.exception() == error


def test_wait():
batch = create_batch()
with mock.patch(
"google.cloud.pubsub_v1.publisher.futures.Future.result", return_value=True
) as result_mock:
futures = (batch.publish({"data": b"msg"}),)

batch._status = BatchStatus.IN_PROGRESS
futures[0]._completed.set()
batch.wait()

result_mock.assert_called()


def test_wait_not_started():
batch = create_batch()
with mock.patch(
"google.cloud.pubsub_v1.publisher.futures.Future.result", return_value=True
) as result_mock:
batch.publish({"data": b"msg"})
batch.wait()

result_mock.assert_not_called()


def test_block__commmit_retry_error():
batch = create_batch()
futures = (
Expand Down
29 changes: 29 additions & 0 deletions pubsub/tests/unit/pubsub_v1/publisher/test_publisher_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,35 @@ def test_publish_attrs_type_error():
client.publish(topic, b"foo", answer=42)


def test_stop():
creds = mock.Mock(spec=credentials.Credentials)
client = publisher.Client(credentials=creds)

batch = client._batch("topic1", autocommit=False)
batch2 = client._batch("topic2", autocommit=False)

pubsub_msg = types.PubsubMessage(data=b"msg")

cp = mock.patch.object(batch, "commit")
wp = mock.patch.object(batch, "wait")
cp2 = mock.patch.object(batch2, "commit")
wp2 = mock.patch.object(batch2, "wait")

with cp as c_mock, cp2 as c_mock2, wp as w_mock, wp2 as w_mock2:
batch.publish(pubsub_msg)
batch2.publish(pubsub_msg)

client.stop()

# check if commit() called
c_mock.assert_called()
c_mock2.assert_called()

# check if wait() called
w_mock.assert_called()
w_mock2.assert_called()


def test_gapic_instance_method():
creds = mock.Mock(spec=credentials.Credentials)
client = publisher.Client(credentials=creds)
Expand Down