Skip to content
7 changes: 5 additions & 2 deletions pubsub/google/cloud/pubsub_v1/publisher/_batch/thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ def __init__(self, client, topic, settings, autocommit=True):
# These members are all communicated between threads; ensure that
# any writes to them use the "state lock" to remain atomic.
self._futures = []
# _futures list should remain unchanged after batch
# status changed from ACCEPTING_MESSAGES to any other
# in order to avoid race conditions
self._messages = []
self._size = 0
self._status = base.BatchStatus.ACCEPTING_MESSAGES
Expand Down Expand Up @@ -138,13 +141,13 @@ def status(self):
return self._status

def wait(self):
"""If commit in progress, waits until all of the futures resolved.
"""If commit is in progress, waits until all of the futures resolved.

.. note::

This method blocks until all futures of this batch resolved.
"""
if self._status in (base.BatchStatus.STARTING, base.BatchStatus.IN_PROGRESS):
if self._status != base.BatchStatus.ACCEPTING_MESSAGES:
for future in self._futures:
future.result()

Expand Down