diff --git a/pubsub/cloud-client/publisher.py b/pubsub/cloud-client/publisher.py index 76d0589931c..4792affd6a6 100644 --- a/pubsub/cloud-client/publisher.py +++ b/pubsub/cloud-client/publisher.py @@ -22,6 +22,7 @@ """ import argparse +import concurrent.futures from google.cloud import pubsub_v1 @@ -109,6 +110,38 @@ def publish_messages_with_futures(project, topic_name): print(future.result()) +def publish_messages_with_error_handler(project, topic_name): + """Publishes multiple messages to a Pub/Sub topic with an error handler.""" + # [START pubsub_publish_messages_error_handler] + publisher = pubsub_v1.PublisherClient() + topic_path = publisher.topic_path(project, topic_name) + + # When you publish a message, the client returns a Future. This Future + # can be used to track if an error has occurred. + futures = [] + + def callback(f): + exc = f.exception() + if exc: + print('Publishing message on {} threw an Exception {}.'.format( + topic_name, exc)) + + for n in range(1, 10): + data = u'Message number {}'.format(n) + # Data must be a bytestring + data = data.encode('utf-8') + message_future = publisher.publish(topic_path, data=data) + message_future.add_done_callback(callback) + futures.append(message_future) + + # We must keep the main thread from exiting to allow it to process + # messages in the background. + concurrent.futures.wait(futures) + + print('Published messages.') + # [END pubsub_publish_messages_error_handler] + + def publish_messages_with_batch_settings(project, topic_name): """Publishes multiple messages to a Pub/Sub topic with batch settings.""" # Configure the batch to publish once there is one kilobyte of data or diff --git a/pubsub/cloud-client/requirements.txt b/pubsub/cloud-client/requirements.txt index fdea342db45..81f06995b8a 100644 --- a/pubsub/cloud-client/requirements.txt +++ b/pubsub/cloud-client/requirements.txt @@ -1 +1,2 @@ google-cloud-pubsub==0.32.1 +futures==3.1.1; python_version < '3'