5959
6060MESSAGE_ACK_TIMEOUT_SECONDS = 30.0
6161
62+ NEW_SUBSCRIBER_TIMEOUT_SECONDS = 30.0
63+
6264# Note: Users of non-prod instances will have to manually configure a topic
6365TOPIC_FORMAT = "projects/sdm-prod/topics/enterprise-{project_id}"
6466
@@ -294,14 +296,17 @@ def callback_wrapper(message: pubsub_v1.subscriber.message.Message) -> None:
294296 callback_wrapper ,
295297 )
296298
299+
297300 def _new_subscriber (
298301 self ,
299302 creds : Credentials ,
300303 subscription_name : str ,
301304 callback_wrapper : Callable [[pubsub_v1 .subscriber .message .Message ], None ],
302305 ) -> pubsub_v1 .subscriber .futures .StreamingPullFuture :
303306 """Issue a command to verify subscriber creds are correct."""
307+ _LOGGER .debug ("Creating subscriber '%s'" , subscription_name )
304308 creds = refresh_creds (creds )
309+ _LOGGER .debug ("Subscriber credentials refreshed" )
305310 subscriber = pubsub_v1 .SubscriberClient (credentials = creds )
306311 subscription = subscriber .get_subscription (subscription = subscription_name )
307312 if subscription .topic :
@@ -311,6 +316,7 @@ def _new_subscriber(
311316 subscription_name ,
312317 subscription .topic ,
313318 )
319+ _LOGGER .debug ("Starting subscriber '%s'" , subscription_name )
314320 return subscriber .subscribe (subscription_name , callback_wrapper )
315321
316322
@@ -432,11 +438,17 @@ async def start_async(self) -> None:
432438 raise AuthException (f"Access token failure: { err } " ) from err
433439
434440 try :
435- self ._subscriber_future = (
436- await self ._subscriber_factory .async_new_subscriber (
437- creds , self ._subscriber_id , self ._loop , self ._async_message_callback_with_timeout
441+ async with asyncio .timeout (NEW_SUBSCRIBER_TIMEOUT_SECONDS ):
442+ self ._subscriber_future = (
443+ await self ._subscriber_factory .async_new_subscriber (
444+ creds , self ._subscriber_id , self ._loop , self ._async_message_callback_with_timeout
445+ )
438446 )
439- )
447+ except asyncio .TimeoutError as err :
448+ DIAGNOSTICS .increment ("start.timeout_error" )
449+ raise SubscriberException (
450+ f"Failed to create subscriber '{ self ._subscriber_id } ': { err } "
451+ ) from err
440452 except NotFound as err :
441453 DIAGNOSTICS .increment ("start.not_found_error" )
442454 raise ConfigurationException (
0 commit comments