|
30 | 30 | from pulpcore.metrics import init_otel_meter |
31 | 31 | from pulpcore.app.apps import pulp_plugin_configs |
32 | 32 | from pulpcore.app.models import Task, AppStatus |
| 33 | +from pulpcore.app.util import with_domain |
33 | 34 |
|
34 | 35 | from pulpcore.tasking.storage import WorkerDirectory |
35 | 36 | from pulpcore.tasking._util import ( |
|
38 | 39 | perform_task, |
39 | 40 | startup_hook, |
40 | 41 | ) |
| 42 | +from pulpcore.tasking.tasks import using_workdir, execute_task |
41 | 43 |
|
42 | 44 | _logger = logging.getLogger(__name__) |
43 | 45 | random.seed() |
@@ -443,6 +445,18 @@ def sleep(self): |
443 | 445 | os.read(self.sentinel, 256) |
444 | 446 | _logger.debug(_("Worker %s leaving sleep state."), self.name) |
445 | 447 |
|
| 448 | + def supervise_immediate_task(self, task): |
| 449 | + """Call and supervise the immediate async task process. |
| 450 | +
|
| 451 | + This function must only be called while holding the lock for that task.""" |
| 452 | + self.task = task |
| 453 | + with with_domain(task.domain), using_workdir(): |
| 454 | + # TODO set user, cid, ... |
| 455 | + execute_task(task) |
| 456 | + if task.reserved_resources_record: |
| 457 | + self.notify_workers(TASK_WAKEUP_UNBLOCK) |
| 458 | + self.task = None |
| 459 | + |
446 | 460 | def supervise_task(self, task): |
447 | 461 | """Call and supervise the task process while heart beating. |
448 | 462 |
|
@@ -596,7 +610,10 @@ def handle_unblocked_tasks(self): |
596 | 610 | # A running task without a lock must be abandoned. |
597 | 611 | self.cancel_abandoned_task(task, TASK_STATES.FAILED, "Worker has gone missing.") |
598 | 612 | elif task.state == TASK_STATES.WAITING and self.is_compatible(task): |
599 | | - self.supervise_task(task) |
| 613 | + if task.immediate: |
| 614 | + self.supervise_immediate_task(task) |
| 615 | + else: |
| 616 | + self.supervise_task(task) |
600 | 617 | else: |
601 | 618 | # Probably incompatible, but for whatever reason we didn't pick it up this time, |
602 | 619 | # we don't need to look at it ever again. |
|
0 commit comments