Skip to content

Commit 7462b2d

Browse files
authored
Merge pull request #267 from Niccolum/main
find new master every time
2 parents fe9414f + a0da046 commit 7462b2d

File tree

1 file changed

+28
-11
lines changed

1 file changed

+28
-11
lines changed

redbeat/schedulers.py

Lines changed: 28 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
from kombu.utils.objects import cached_property
2020
from kombu.utils.url import maybe_sanitize_url
2121
from redis.client import StrictRedis
22+
from redis.sentinel import MasterNotFoundError, Sentinel
2223
from tenacity import retry, retry_if_exception_type, stop_after_delay, wait_exponential
2324

2425
from .decoder import RedBeatJSONDecoder, RedBeatJSONEncoder, to_timestamp
@@ -51,6 +52,9 @@
5152
return 1
5253
"""
5354

55+
REDBEAT_REDIS_KEY = "redbeat_redis"
56+
REDBEAT_SENTINEL_KEY = "redbeat_sentinel"
57+
5458

5559
class RetryingConnection:
5660
"""A proxy for the Redis connection that delegates all the calls to
@@ -65,6 +69,7 @@ def __init__(self, retry_period, wrapped_connection):
6569
retry=(
6670
retry_if_exception_type(redis.exceptions.ConnectionError)
6771
| retry_if_exception_type(redis.exceptions.TimeoutError)
72+
| retry_if_exception_type(MasterNotFoundError)
6873
),
6974
reraise=True,
7075
wait=wait_exponential(multiplier=1, max=self.RETRY_MAX_WAIT),
@@ -114,9 +119,10 @@ def ensure_conf(app):
114119
def get_redis(app=None):
115120
app = app_or_default(app)
116121
conf = ensure_conf(app)
117-
if not hasattr(app, 'redbeat_redis') or app.redbeat_redis is None:
118-
redis_options = conf.redbeat_redis_options
119-
retry_period = redis_options.get('retry_period')
122+
redis_options = conf.redbeat_redis_options
123+
retry_period = redis_options.get('retry_period')
124+
125+
if not hasattr(app, REDBEAT_REDIS_KEY) or getattr(app, REDBEAT_REDIS_KEY) is None:
120126
if redis_options.get('cluster', False):
121127
from redis.cluster import RedisCluster
122128

@@ -137,9 +143,8 @@ def get_redis(app=None):
137143
sentinel_kwargs=redis_options.get('sentinel_kwargs'),
138144
**connection_kwargs,
139145
)
140-
connection = sentinel.master_for(
141-
redis_options.get('service_name', 'master'), db=redis_options.get('db', 0)
142-
)
146+
_set_redbeat_connect(app, REDBEAT_SENTINEL_KEY, sentinel, retry_period)
147+
connection = None
143148
elif conf.redis_url.startswith('rediss'):
144149
ssl_options = {'ssl_cert_reqs': ssl.CERT_REQUIRED}
145150
if isinstance(conf.redis_use_ssl, dict):
@@ -154,12 +159,24 @@ def get_redis(app=None):
154159
else:
155160
connection = StrictRedis.from_url(conf.redis_url, decode_responses=True)
156161

157-
if retry_period is None:
158-
app.redbeat_redis = connection
159-
else:
160-
app.redbeat_redis = RetryingConnection(retry_period, connection)
162+
if connection:
163+
_set_redbeat_connect(app, REDBEAT_REDIS_KEY, connection, retry_period)
164+
165+
if hasattr(app, REDBEAT_SENTINEL_KEY) and isinstance(getattr(app, REDBEAT_SENTINEL_KEY), Sentinel):
166+
sentinel = getattr(app, REDBEAT_SENTINEL_KEY)
167+
connection = sentinel.master_for(
168+
redis_options.get('service_name', 'master'), db=redis_options.get('db', 0)
169+
)
170+
_set_redbeat_connect(app, REDBEAT_REDIS_KEY, connection, retry_period)
171+
172+
return getattr(app, REDBEAT_REDIS_KEY)
173+
161174

162-
return app.redbeat_redis
175+
def _set_redbeat_connect(app, connect_name, connection, retry_period):
176+
if retry_period is None:
177+
setattr(app, connect_name, connection)
178+
else:
179+
setattr(app, connect_name, RetryingConnection(retry_period, connection))
163180

164181

165182
ADD_ENTRY_ERROR = """\

0 commit comments

Comments
 (0)