2727_T = TypeVar ("_T" )
2828
2929RPC_TIMEOUT_SECONDS = 30.0
30- STREAMING_PULL_TIMEOUT_SECONDS = 45 .0
30+ STREAMING_PULL_TIMEOUT_SECONDS = 55 .0
3131STREAM_ACK_TIMEOUT_SECONDS = 180
3232STREAM_ACK_FREQUENCY_SECONDS = 90
3333
@@ -53,19 +53,14 @@ def refresh_creds(creds: Credentials) -> Credentials:
5353
5454
5555def exception_handler [_T : Any ](
56- func_name : str , timeout : float = RPC_TIMEOUT_SECONDS
56+ func_name : str ,
5757) -> Callable [..., Callable [..., Awaitable [_T ]]]:
5858 """Wrap a function with exception handling."""
5959
6060 def wrapped (func : Callable [..., Awaitable [_T ]]) -> Callable [..., Awaitable [_T ]]:
6161 async def wrapped_func (* args : Any , ** kwargs : Any ) -> _T :
6262 try :
63- async with asyncio .timeout (timeout ):
64- return await func (* args , ** kwargs )
65- except asyncio .TimeoutError as err :
66- _LOGGER .debug ("Timeout in %s: %s" , func_name , err )
67- DIAGNOSTICS .increment (f"{ func_name } .timeout" )
68- raise SubscriberException (f"Timeout in { func_name } " ) from err
63+ return await func (* args , ** kwargs )
6964 except NotFound as err :
7065 _LOGGER .debug ("NotFound error in %s: %s" , func_name , err )
7166 DIAGNOSTICS .increment (f"{ func_name } .not_found_error" )
@@ -174,28 +169,26 @@ async def _async_get_client(self) -> pubsub_v1.SubscriberAsyncClient:
174169 self ._client = pubsub_v1 .SubscriberAsyncClient (credentials = self ._creds )
175170 return self ._client
176171
172+ @exception_handler ("streaming_pull" )
177173 async def streaming_pull (
178174 self ,
179175 ack_ids_generator : Callable [[], list [str ]],
180176 ) -> AsyncIterable [pubsub_v1 .types .StreamingPullResponse ]:
181177 """Start the streaming pull."""
182- stream : AsyncIterable [pubsub_v1 .types .StreamingPullResponse ] = (
183- await self ._streaming_pull_req (ack_ids_generator )
184- )
185- return aiter_exception_handler (stream )
186-
187- @exception_handler (
188- func_name = "streaming_pull" , timeout = STREAMING_PULL_TIMEOUT_SECONDS
189- )
190- async def _streaming_pull_req (
191- self ,
192- ack_ids_generator : Callable [[], list [str ]],
193- ) -> AsyncIterable [pubsub_v1 .types .StreamingPullResponse ]:
194178 client = await self ._async_get_client ()
179+ req_gen = pull_request_generator (self ._subscription_name , ack_ids_generator )
195180 _LOGGER .debug ("Sending streaming pull request for %s" , self ._subscription_name )
196- return await client .streaming_pull (
197- requests = pull_request_generator (self ._subscription_name , ack_ids_generator )
198- )
181+ try :
182+ async with asyncio .timeout (STREAMING_PULL_TIMEOUT_SECONDS ):
183+ stream : AsyncIterable [pubsub_v1 .types .StreamingPullResponse ] = (
184+ await client .streaming_pull (requests = req_gen )
185+ )
186+ except asyncio .TimeoutError as err :
187+ _LOGGER .debug ("Timeout in streaming_pull %s" , err )
188+ DIAGNOSTICS .increment ("streaming_pull.timeout" )
189+ raise SubscriberException ("Timeout in streaming_pull" ) from err
190+ _LOGGER .debug ("Streaming pull started" )
191+ return aiter_exception_handler (stream )
199192
200193 @exception_handler ("acknowledge" )
201194 async def ack_messages (self , ack_ids : list [str ]) -> None :
@@ -204,7 +197,13 @@ async def ack_messages(self, ack_ids: list[str]) -> None:
204197 return
205198 client = await self ._async_get_client ()
206199 _LOGGER .debug ("Acking %s messages" , len (ack_ids ))
207- await client .acknowledge (
208- subscription = self ._subscription_name ,
209- ack_ids = ack_ids ,
210- )
200+ try :
201+ async with asyncio .timeout (RPC_TIMEOUT_SECONDS ):
202+ await client .acknowledge (
203+ subscription = self ._subscription_name ,
204+ ack_ids = ack_ids ,
205+ )
206+ except asyncio .TimeoutError as err :
207+ _LOGGER .debug ("Timeout in acknowledge: %s" , err )
208+ DIAGNOSTICS .increment ("acknowledge.timeout" )
209+ raise SubscriberException ("Timeout in acknowledge" ) from err
0 commit comments