Skip to content

Commit 3eefa7a

Browse files
committed
fix:use pipelines
1 parent fc4cdf1 commit 3eefa7a

File tree

2 files changed

+11
-9
lines changed

2 files changed

+11
-9
lines changed

scheduler/redis_models/job.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,9 @@ def expire(self, ttl: int, connection: ConnectionType) -> None:
107107
connection.expire(self._key, ttl)
108108

109109
def persist(self, connection: ConnectionType) -> None:
110-
connection.persist(self._key)
110+
with connection.pipeline() as pipeline:
111+
pipeline.persist(self._key)
112+
pipeline.execute()
111113

112114
def prepare_for_execution(self, worker_name: str, registry: JobNamesRegistry, connection: ConnectionType) -> None:
113115
"""Prepares the job for execution, setting the worker name,

scheduler/redis_models/worker.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -50,13 +50,13 @@ class WorkerModel(HashModel):
5050
_children_key_template: ClassVar[str] = ":queue-workers:{}:"
5151
_element_key_template: ClassVar[str] = ":workers:{}"
5252

53-
def save(self, connection: ConnectionType) -> None:
54-
pipeline = connection.pipeline()
55-
super(WorkerModel, self).save(pipeline)
56-
for queue_name in self.queue_names:
57-
pipeline.sadd(self._children_key_template.format(queue_name), self.name)
58-
pipeline.expire(self._key, DEFAULT_WORKER_TTL + 60)
59-
pipeline.execute()
53+
def save(self, connection: ConnectionType, save_all: bool = False) -> None:
54+
with connection.pipeline() as pipeline:
55+
super(WorkerModel, self).save(pipeline, save_all)
56+
for queue_name in self.queue_names:
57+
pipeline.sadd(self._children_key_template.format(queue_name), self.name)
58+
pipeline.expire(self._key, DEFAULT_WORKER_TTL + 60)
59+
pipeline.execute()
6060

6161
def delete(self, connection: ConnectionType) -> None:
6262
logger.debug(f"Deleting worker {self.name}")
@@ -118,4 +118,4 @@ def _split_list(a_list: List[str], segment_size: int) -> Generator[list[str], An
118118
:returns: The list split into smaller lists
119119
"""
120120
for i in range(0, len(a_list), segment_size):
121-
yield a_list[i : i + segment_size]
121+
yield a_list[i: i + segment_size]

0 commit comments

Comments
 (0)