Skip to content

Commit 612ff28

Browse files
committed
Add backing off to auxiliary workers
1 parent 018c49f commit 612ff28

File tree

3 files changed

+71
-43
lines changed

3 files changed

+71
-43
lines changed

CHANGES/+worker_backoff.feature

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Added backing off on auxiliary workers if wrongly alarmed on pending tasks.

pulpcore/tasking/entrypoint.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@ def worker(
3434
auxiliary,
3535
):
3636
"""A Pulp worker."""
37-
3837
if reload:
3938
try:
4039
import hupper

pulpcore/tasking/worker.py

Lines changed: 70 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import functools
44
import logging
5+
import math
56
import os
67
import random
78
import select
@@ -89,9 +90,11 @@ def __init__(self, auxiliary=False):
8990
self.wakeup_unblock = False
9091
self.wakeup_handle = False
9192
self.cancel_task = False
93+
self.unblocked_count = 0
9294

9395
self.ignored_task_ids = []
9496
self.ignored_task_countdown = IGNORED_TASKS_CLEANUP_INTERVAL
97+
self.false_alarms = 0
9598

9699
self.auxiliary = auxiliary
97100
self.task = None
@@ -158,6 +161,14 @@ def _signal_handler(self, thesignal, frame):
158161
self.shutdown_requested = True
159162

160163
def _pg_notify_handler(self, notification):
164+
if notification.channel == "pulp_worker_broadcast":
165+
key, value = notification.payload.split(":", maxsplit=1)
166+
_logger.debug("broadcast message recieved: %s: %s", key, value)
167+
if key == "unblocked_count":
168+
self.unblocked_count = int(value)
169+
self.wakeup_handle = self.unblocked_count > 0
170+
elif key == "metrics_heartbeat":
171+
self.last_metric_heartbeat = datetime.fromisoformat(key)
161172
if notification.channel == "pulp_worker_wakeup":
162173
if notification.payload == TASK_WAKEUP_UNBLOCK:
163174
# Auxiliary workers don't do this.
@@ -171,6 +182,7 @@ def _pg_notify_handler(self, notification):
171182
self.wakeup_handle = True
172183

173184
elif notification.channel == "pulp_worker_metrics_heartbeat":
185+
# TODO (in one of the next releases) Remove that superseded channel.
174186
self.last_metric_heartbeat = datetime.fromisoformat(notification.payload)
175187
elif self.task and notification.channel == "pulp_worker_cancel":
176188
if notification.payload == str(self.task.pk):
@@ -257,6 +269,7 @@ def record_unblocked_waiting_tasks_metric(self, now):
257269
)
258270

259271
self.cursor.execute(f"NOTIFY pulp_worker_metrics_heartbeat, '{str(now)}'")
272+
self.broadcast("metrics_heartbeat", now)
260273

261274
def beat(self):
262275
now = timezone.now()
@@ -278,6 +291,9 @@ def beat(self):
278291
if self.otel_enabled and now > self.last_metric_heartbeat + self.heartbeat_period:
279292
self.record_unblocked_waiting_tasks_metric(now)
280293

294+
def broadcast(self, key, value):
295+
self.cursor.execute("SELECT pg_notify('pulp_worker_broadcast', %s)", (f"{key}:{value}",))
296+
281297
def notify_workers(self, reason):
282298
self.cursor.execute("SELECT pg_notify('pulp_worker_wakeup', %s)", (reason,))
283299

@@ -290,21 +306,13 @@ def cancel_abandoned_task(self, task, final_state, reason=None):
290306
# A task is considered abandoned when in running state, but no worker holds its lock
291307
domain = task.pulp_domain
292308
task.set_canceling()
293-
if reason:
294-
_logger.info(
295-
"Cleaning up task %s in domain: %s and marking as %s. Reason: %s",
296-
task.pk,
297-
domain.name,
298-
final_state,
299-
reason,
300-
)
301-
else:
302-
_logger.info(
303-
_("Cleaning up task %s in domain: %s and marking as %s."),
304-
task.pk,
305-
domain.name,
306-
final_state,
307-
)
309+
_logger.info(
310+
"Cleaning up task %s in domain: %s and marking as %s. Reason: %s",
311+
task.pk,
312+
domain.name,
313+
final_state,
314+
reason or "unknown",
315+
)
308316
delete_incomplete_resources(task)
309317
task.set_canceled(final_state=final_state, reason=reason)
310318
if task.reserved_resources_record:
@@ -345,14 +353,17 @@ def unblock_tasks(self):
345353

346354
self.wakeup_unblock = False
347355
result = self._unblock_tasks()
348-
if result is not None and (
349-
Task.objects.filter(
350-
state__in=[TASK_STATES.WAITING, TASK_STATES.CANCELING], app_lock=None
356+
if result is not None:
357+
unblocked_count = (
358+
Task.objects.filter(
359+
state__in=[TASK_STATES.WAITING, TASK_STATES.CANCELING], app_lock=None
360+
)
361+
.exclude(unblocked_at=None)
362+
.count()
351363
)
352-
.exclude(unblocked_at=None)
353-
.exists()
354-
):
355-
self.notify_workers(TASK_WAKEUP_HANDLE)
364+
if unblocked_count > 0:
365+
self.notify_workers(TASK_WAKEUP_HANDLE)
366+
self.broadcast("unblocked_count", unblocked_count)
356367
return True
357368

358369
return result
@@ -369,6 +380,7 @@ def _unblock_tasks(self):
369380
.order_by("pulp_created")
370381
.select_related("pulp_domain")
371382
):
383+
_logger.debug("Considering task %s for unblocking.", task.pk)
372384
reserved_resources_record = task.reserved_resources_record or []
373385
exclusive_resources = [
374386
resource
@@ -389,23 +401,26 @@ def _unblock_tasks(self):
389401
)
390402
task.unblock()
391403

392-
elif (
393-
task.state == TASK_STATES.WAITING
394-
and task.unblocked_at is None
395-
# No exclusive resource taken?
396-
and not any(
397-
resource in taken_exclusive_resources or resource in taken_shared_resources
398-
for resource in exclusive_resources
399-
)
400-
# No shared resource exclusively taken?
401-
and not any(resource in taken_exclusive_resources for resource in shared_resources)
402-
):
403-
_logger.debug(
404-
"Marking waiting task %s in domain: %s unblocked.",
405-
task.pk,
406-
task.pulp_domain.name,
407-
)
408-
task.unblock()
404+
elif task.state == TASK_STATES.WAITING and task.unblocked_at is None:
405+
if (
406+
# No exclusive resource taken?
407+
not any(
408+
resource in taken_exclusive_resources or resource in taken_shared_resources
409+
for resource in exclusive_resources
410+
)
411+
# No shared resource exclusively taken?
412+
and not any(
413+
resource in taken_exclusive_resources for resource in shared_resources
414+
)
415+
):
416+
_logger.debug(
417+
"Marking waiting task %s in domain: %s unblocked.",
418+
task.pk,
419+
task.pulp_domain.name,
420+
)
421+
task.unblock()
422+
else:
423+
_logger.debug("Task %s is still blocked.", task.pk)
409424
elif task.state == TASK_STATES.RUNNING and task.unblocked_at is None:
410425
# This should not happen in normal operation.
411426
# And it is only an issue if the worker running that task died, because it will
@@ -426,8 +441,8 @@ def _unblock_tasks(self):
426441
def sleep(self):
427442
"""Wait for signals on the wakeup channel while heart beating."""
428443

429-
_logger.debug(_("Worker %s entering sleep state."), self.name)
430-
while not self.shutdown_requested and not self.wakeup_handle:
444+
_logger.debug("Worker %s entering sleep state.", self.name)
445+
while not self.shutdown_requested:
431446
r, w, x = select.select(
432447
[self.sentinel, connection.connection],
433448
[],
@@ -441,7 +456,14 @@ def sleep(self):
441456
self.unblock_tasks()
442457
if self.sentinel in r:
443458
os.read(self.sentinel, 256)
444-
_logger.debug(_("Worker %s leaving sleep state."), self.name)
459+
if self.wakeup_handle:
460+
if not self.auxiliary or random.random() < math.exp(
461+
self.unblocked_count - self.false_alarms
462+
):
463+
_logger.debug("Worker %s leaving sleep state.", self.name)
464+
break
465+
else:
466+
_logger.debug("Worker %s backing off", self.name)
445467

446468
def supervise_task(self, task):
447469
"""Call and supervise the task process while heart beating.
@@ -586,8 +608,12 @@ def handle_unblocked_tasks(self):
586608
task = self.fetch_task()
587609
if task is None:
588610
# No task found
611+
self.false_alarms += 1
612+
_logger.debug("False Alarms: %s", self.false_alarms)
589613
break
590614
try:
615+
self.false_alarms //= 2
616+
_logger.debug("False Alarms: %s", self.false_alarms)
591617
if task.state == TASK_STATES.CANCELING:
592618
# No worker picked this task up before being canceled.
593619
# Or the worker disappeared before handling the canceling.
@@ -615,6 +641,7 @@ def run(self, burst=False):
615641
signal.signal(signal.SIGHUP, self._signal_handler)
616642
# Subscribe to pgsql channels
617643
connection.connection.add_notify_handler(self._pg_notify_handler)
644+
self.cursor.execute("LISTEN pulp_worker_broadcast")
618645
self.cursor.execute("LISTEN pulp_worker_cancel")
619646
self.cursor.execute("LISTEN pulp_worker_metrics_heartbeat")
620647
if burst:
@@ -638,4 +665,5 @@ def run(self, burst=False):
638665
self.cursor.execute("UNLISTEN pulp_worker_wakeup")
639666
self.cursor.execute("UNLISTEN pulp_worker_metrics_heartbeat")
640667
self.cursor.execute("UNLISTEN pulp_worker_cancel")
668+
self.cursor.execute("UNLISTEN pulp_worker_broadcast")
641669
self.shutdown()

0 commit comments

Comments
 (0)