5 changes: 4 additions & 1 deletion pubsub/google/cloud/pubsub_v1/publisher/_batch/base.py
@@ -75,9 +75,12 @@ def messages(self):
def size(self):
"""Return the total size of all of the messages currently in the batch.

The size includes any overhead of the actual ``PublishRequest`` that is
sent to the backend.

Returns:
int: The total size of all of the messages currently
in the batch, in bytes.
in the batch (including the request overhead), in bytes.
"""
raise NotImplementedError

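The request overhead mentioned in the ``size`` docstring is simply the serialized size of a ``PublishRequest`` that carries no messages yet. A minimal sketch of how that overhead and a message's contribution can be measured with protobuf's ``ByteSize()`` (the topic name here is purely illustrative):

from google.cloud.pubsub_v1 import types

# A PublishRequest with only the topic set already serializes to a
# nonzero number of bytes.
topic = "projects/example-project/topics/example-topic"
base_size = types.PublishRequest(topic=topic).ByteSize()
assert base_size > 0

# Each message contributes the size of a PublishRequest holding just that
# message; protobuf field encodings are independent, so the sizes add up.
message = types.PubsubMessage(data=b"payload")
per_message = types.PublishRequest(messages=[message]).ByteSize()

assert base_size + per_message == types.PublishRequest(
    topic=topic, messages=[message]
).ByteSize()
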
31 changes: 24 additions & 7 deletions pubsub/google/cloud/pubsub_v1/publisher/_batch/thread.py
@@ -29,6 +29,7 @@

_LOGGER = logging.getLogger(__name__)
_CAN_COMMIT = (base.BatchStatus.ACCEPTING_MESSAGES, base.BatchStatus.STARTING)
_SERVER_PUBLISH_MAX_BYTES = 10 * 1000 * 1000 # max accepted size of PublishRequest


class Batch(base.Batch):
@@ -79,9 +80,13 @@ def __init__(self, client, topic, settings, autocommit=True):
# in order to avoid race conditions
self._futures = []
self._messages = []
self._size = 0
self._status = base.BatchStatus.ACCEPTING_MESSAGES

# The initial size is not zero; we need to account for the size overhead
# of the PublishRequest message itself.
self._base_request_size = types.PublishRequest(topic=topic).ByteSize()
self._size = self._base_request_size

# If max latency is specified, start a thread to monitor the batch and
# commit when the max latency is reached.
self._thread = None
@@ -124,9 +129,12 @@ def settings(self):
def size(self):
"""Return the total size of all of the messages currently in the batch.

The size includes any overhead of the actual ``PublishRequest`` that is
sent to the backend.

Returns:
int: The total size of all of the messages currently
in the batch, in bytes.
in the batch (including the request overhead), in bytes.
"""
return self._size

@@ -292,12 +300,21 @@ def publish(self, message):
if not self.will_accept(message):
return future

new_size = self._size + message.ByteSize()
size_increase = types.PublishRequest(messages=[message]).ByteSize()

if (self._base_request_size + size_increase) > _SERVER_PUBLISH_MAX_BYTES:
err_msg = (
"The message being published is too large; the resulting publish "
"request would exceed the maximum size allowed on the "
"backend ({} bytes).".format(_SERVER_PUBLISH_MAX_BYTES)
)
raise ValueError(err_msg)

new_size = self._size + size_increase
new_count = len(self._messages) + 1
overflow = (
new_size > self.settings.max_bytes
or new_count >= self._settings.max_messages
)

size_limit = min(self.settings.max_bytes, _SERVER_PUBLISH_MAX_BYTES)
overflow = new_size > size_limit or new_count >= self._settings.max_messages

if not self._messages or not overflow:

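The practical effect of the new check is that a message too large for any ``PublishRequest`` now fails fast on the client instead of producing an "InvalidArgument: request_size is too large" error from the backend. A hedged usage sketch (assuming application default credentials and a purely illustrative project and topic):

from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path("example-project", "example-topic")

# Larger than the 10 * 1000 * 1000 byte server-side limit on PublishRequest.
oversized_payload = b"x" * (11 * 1000 * 1000)

try:
    publisher.publish(topic_path, oversized_payload)
except ValueError as exc:
    # Raised synchronously by the batch before any request is sent.
    print(exc)
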
4 changes: 3 additions & 1 deletion pubsub/google/cloud/pubsub_v1/types.py
@@ -48,7 +48,9 @@
BatchSettings.__doc__ = "The settings for batch publishing the messages."
BatchSettings.max_bytes.__doc__ = (
"The maximum total size of the messages to collect before automatically "
"publishing the batch."
"publishing the batch, including any byte size overhead of the publish "
"request itself. The maximum value is bound by the server-side limit of "
"10_000_000 bytes."
)
BatchSettings.max_latency.__doc__ = (
"The maximum number of seconds to wait for additional messages before "
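For reference, a small configuration sketch showing how these batch settings are typically passed to the publisher client (the values are illustrative, not defaults taken from this change):

from google.cloud import pubsub_v1
from google.cloud.pubsub_v1 import types

batch_settings = types.BatchSettings(
    max_bytes=5 * 1000 * 1000,  # keep at or below the 10_000_000-byte server limit
    max_latency=0.05,           # seconds to wait before committing automatically
    max_messages=500,
)

publisher = pubsub_v1.PublisherClient(batch_settings)
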
31 changes: 31 additions & 0 deletions pubsub/tests/system.py
@@ -92,6 +92,37 @@ def test_publish_messages(publisher, topic_path, cleanup):
assert isinstance(result, six.string_types)


def test_publish_large_messages(publisher, topic_path, cleanup):
futures = []
# Make sure the topic gets deleted.
cleanup.append((publisher.delete_topic, topic_path))

# Each message should be smaller than 10**7 bytes (the server side limit for
# PublishRequest), but all messages combined in a PublishRequest should
# slightly exceed that threshold to make sure the publish code handles these
# cases well.
# Note that the total PublishRequest size must still be smaller than
# 10 * 1024 * 1024 bytes in order not to exceed the maximum request body size limit.
msg_data = b"x" * (2 * 10 ** 6)

publisher.batch_settings = types.BatchSettings(
max_bytes=11 * 1000 * 1000, # more than the server limit of 10 ** 7
max_latency=2.0, # so that autocommit happens after publishing all messages
max_messages=100,
)
publisher.create_topic(topic_path)

for index in six.moves.range(5):
futures.append(publisher.publish(topic_path, msg_data, num=str(index)))

# If the publishing logic correctly split all messages into more than a
# single batch despite a high BatchSettings.max_bytes limit, there should
# be no "InvalidArgument: request_size is too large" error.
for future in futures:
result = future.result(timeout=10)
assert isinstance(result, six.string_types) # the message ID


def test_subscribe_to_messages(
publisher, topic_path, subscriber, subscription_path, cleanup
):
73 changes: 63 additions & 10 deletions pubsub/tests/unit/pubsub_v1/publisher/batch/test_thread.py
@@ -34,13 +34,15 @@ def create_client():
return publisher.Client(credentials=creds)


def create_batch(autocommit=False, **batch_settings):
def create_batch(autocommit=False, topic="topic_name", **batch_settings):
"""Return a batch object suitable for testing.

Args:
autocommit (bool): Whether the batch should commit after
``max_latency`` seconds. By default, this is ``False``
for unit testing.
topic (str): The name of the topic the batch should publish
the messages to.
batch_settings (dict): Arguments passed on to the
:class:``~.pubsub_v1.types.BatchSettings`` constructor.

@@ -49,7 +51,7 @@ def create_batch(autocommit=False, **batch_settings):
"""
client = create_client()
settings = types.BatchSettings(**batch_settings)
return Batch(client, "topic_name", settings, autocommit=autocommit)
return Batch(client, topic, settings, autocommit=autocommit)


def test_init():
@@ -299,8 +301,8 @@ def test_monitor_already_committed():
assert batch._status == status


def test_publish():
batch = create_batch()
def test_publish_updating_batch_size():
batch = create_batch(topic="topic_foo")
messages = (
types.PubsubMessage(data=b"foobarbaz"),
types.PubsubMessage(data=b"spameggs"),
@@ -314,22 +316,28 @@ def test_publish():
assert len(batch.messages) == 3
assert batch._futures == futures

# The size should have been incremented by the sum of the size of the
# messages.
expected_size = sum([message_pb.ByteSize() for message_pb in messages])
assert batch.size == expected_size
# The size should have been incremented by the sum of the size
# contributions of each message to the PublishRequest.
base_request_size = types.PublishRequest(topic="topic_foo").ByteSize()
msg_size_overheads = [
types.PublishRequest(messages=[msg]).ByteSize() for msg in messages
]

expected_request_size = base_request_size + sum(msg_size_overheads)
assert batch.size == expected_request_size
assert batch.size > 0 # I do not always trust protobuf.


def test_publish_not_will_accept():
batch = create_batch(max_messages=0)
batch = create_batch(topic="topic_foo", max_messages=0)
base_request_size = types.PublishRequest(topic="topic_foo").ByteSize()

# Publish the message.
message = types.PubsubMessage(data=b"foobarbaz")
future = batch.publish(message)

assert future is None
assert batch.size == 0
assert batch.size == base_request_size
assert batch.messages == []
assert batch._futures == []

@@ -361,6 +369,51 @@ def test_publish_exceed_max_messages():
assert batch._futures == futures


@mock.patch.object(thread, "_SERVER_PUBLISH_MAX_BYTES", 1000)
def test_publish_single_message_size_exceeds_server_size_limit():
batch = create_batch(
topic="topic_foo",
max_messages=1000,
max_bytes=1000 * 1000, # way larger than (mocked) server side limit
)

big_message = types.PubsubMessage(data=b"x" * 984)

request_size = types.PublishRequest(
topic="topic_foo", messages=[big_message]
).ByteSize()
assert request_size == 1001 # sanity check, just above the (mocked) server limit

with pytest.raises(ValueError) as error:
batch.publish(big_message)

error_msg = str(error.value)
assert "too large" in error_msg
assert "1000 bytes" in error_msg


@mock.patch.object(thread, "_SERVER_PUBLISH_MAX_BYTES", 1000)
def test_publish_total_messages_size_exceeds_server_size_limit():
batch = create_batch(topic="topic_foo", max_messages=10, max_bytes=1500)

messages = (
types.PubsubMessage(data=b"x" * 500),
types.PubsubMessage(data=b"x" * 600),
)

# Sanity check - request size is still below BatchSettings.max_bytes,
# but it exceeds the server-side size limit.
request_size = types.PublishRequest(topic="topic_foo", messages=messages).ByteSize()
assert 1000 < request_size < 1500

with mock.patch.object(batch, "commit") as fake_commit:
batch.publish(messages[0])
batch.publish(messages[1])

# The server side limit should kick in and cause a commit.
fake_commit.assert_called_once()


def test_publish_dict():
batch = create_batch()
future = batch.publish({"data": b"foobarbaz", "attributes": {"spam": "eggs"}})