diff --git a/st2common/st2common/transport/connection_retry_wrapper.py b/st2common/st2common/transport/connection_retry_wrapper.py index 05eb667e1b..30c780e63c 100644 --- a/st2common/st2common/transport/connection_retry_wrapper.py +++ b/st2common/st2common/transport/connection_retry_wrapper.py @@ -99,9 +99,12 @@ def wrapped_callback(connection, channel): retry_wrapper.run(connection=connection, wrapped_callback=wrapped_callback) """ - def __init__(self, cluster_size, logger): + def __init__(self, cluster_size, logger, ensure_max_retries=3): self._retry_context = ClusterRetryContext(cluster_size=cluster_size) self._logger = logger + # How many times to try to retrying establishing a connection in a place where we are + # calling connection.ensure_connection + self._ensure_max_retries = ensure_max_retries def errback(self, exc, interval): self._logger.error('Rabbitmq connection error: %s', exc.message) @@ -146,10 +149,22 @@ def run(self, connection, wrapped_callback): # entire ConnectionPool simultaneously but that would require writing our own # ConnectionPool. If a server recovers it could happen that the same process # ends up talking to separate nodes in a cluster. - connection.ensure_connection() + def log_error_on_conn_failure(exc, interval): + self._logger.debug('Failed to re-establish connection to RabbitMQ server, ' + 'retrying in %s seconds: %s' % (interval, str(e))) + + try: + # NOTE: This function blocks and tries to restablish a connection for + # indefinetly if "max_retries" argument is not specified + connection.ensure_connection(max_retries=self._ensure_max_retries, + errback=log_error_on_conn_failure) + except Exception: + self._logger.exception('Connections to RabbitMQ cannot be re-established: %s', + str(e)) + raise except Exception as e: - self._logger.exception('Connections to rabbitmq cannot be re-established: %s', + self._logger.exception('Connections to RabbitMQ cannot be re-established: %s', str(e)) # Not being able to publish a message could be a significant issue for an app. raise