Skip to content

Commit 80eb3dd

Browse files
committed
feat(TaskProcessingApiController): Add new next_batch endpoint
Signed-off-by: Marcel Klehr <mklehr@gmx.net>
1 parent ea8ab8e commit 80eb3dd

1 file changed

Lines changed: 100 additions & 23 deletions

File tree

core/Controller/TaskProcessingApiController.php

Lines changed: 100 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -532,29 +532,7 @@ public function cancelTask(int $taskId): DataResponse {
532532
#[ApiRoute(verb: 'GET', url: '/tasks_provider/next', root: '/taskprocessing')]
533533
public function getNextScheduledTask(array $providerIds, array $taskTypeIds): DataResponse {
534534
try {
535-
$providerIdsBasedOnTaskTypesWithNull = array_unique(array_map(function ($taskTypeId) {
536-
try {
537-
return $this->taskProcessingManager->getPreferredProvider($taskTypeId)->getId();
538-
} catch (Exception) {
539-
return null;
540-
}
541-
}, $taskTypeIds));
542-
543-
$providerIdsBasedOnTaskTypes = array_filter($providerIdsBasedOnTaskTypesWithNull, fn ($providerId) => $providerId !== null);
544-
545-
// restrict $providerIds to providers that are configured as preferred for the passed task types
546-
$possibleProviderIds = array_values(array_intersect($providerIdsBasedOnTaskTypes, $providerIds));
547-
548-
// restrict $taskTypeIds to task types that can actually be run by one of the now restricted providers
549-
$possibleTaskTypeIds = array_values(array_filter($taskTypeIds, function ($taskTypeId) use ($possibleProviderIds) {
550-
try {
551-
$providerForTaskType = $this->taskProcessingManager->getPreferredProvider($taskTypeId)->getId();
552-
} catch (Exception) {
553-
// no provider found for task type
554-
return false;
555-
}
556-
return in_array($providerForTaskType, $possibleProviderIds, true);
557-
}));
535+
[$possibleProviderIds, $possibleTaskTypeIds] = $this->intersectTaskTypesAndProviders($taskTypeIds, $providerIds);
558536

559537
if (count($possibleProviderIds) === 0 || count($possibleTaskTypeIds) === 0) {
560538
throw new NotFoundException();
@@ -596,6 +574,73 @@ public function getNextScheduledTask(array $providerIds, array $taskTypeIds): Da
596574
}
597575
}
598576

577+
/**
578+
* Returns the next n scheduled tasks for the specified set of taskTypes and providers
579+
*
580+
* @param list<string> $providerIds The ids of the providers
581+
* @param list<string> $taskTypeIds The ids of the task types
582+
* @param int $numberOfTasks The number of tasks to return
583+
* @return DataResponse<Http::STATUS_OK, array{tasks: list<array{task: CoreTaskProcessingTask, provider: string}>, has_more: bool}, array{}>|DataResponse<Http::STATUS_NO_CONTENT, null, array{}>|DataResponse<Http::STATUS_INTERNAL_SERVER_ERROR, array{message: string}, array{}>
584+
*
585+
* 200: Tasks returned
586+
*/
587+
#[ExAppRequired]
588+
#[ApiRoute(verb: 'GET', url: '/tasks_provider/next_batch', root: '/taskprocessing')]
589+
public function getNextScheduledTaskBatch(array $providerIds, array $taskTypeIds, int $numberOfTasks = 1): DataResponse {
590+
try {
591+
[$possibleProviderIds, $possibleTaskTypeIds] = $this->intersectTaskTypesAndProviders($taskTypeIds, $providerIds);
592+
593+
if (count($possibleProviderIds) === 0 || count($possibleTaskTypeIds) === 0) {
594+
return new DataResponse([
595+
'tasks' => [],
596+
'has_more' => false,
597+
]);
598+
}
599+
600+
/** @var list<array{task:CoreTaskProcessingTask, provider:string}> $tasks */
601+
$tasks = [];
602+
$taskIdsToIgnore = [];
603+
// Stop when $numberOfTasks is reached or the json payload is larger than 50MiB
604+
while (count($tasks) < $numberOfTasks && strlen(json_encode($tasks)) < 50 * 1024 * 1024) {
605+
// Until we find a task whose task type is set to be provided by the providers requested with this request
606+
// Or no scheduled task is found anymore (given the taskIds to ignore)
607+
try {
608+
$task = $this->taskProcessingManager->getNextScheduledTask($possibleTaskTypeIds, $taskIdsToIgnore);
609+
} catch (NotFoundException) {
610+
break;
611+
}
612+
try {
613+
$provider = $this->taskProcessingManager->getPreferredProvider($task->getTaskTypeId());
614+
if (in_array($provider->getId(), $possibleProviderIds, true)) {
615+
if ($this->taskProcessingManager->lockTask($task)) {
616+
$tasks[] = ['task' => $task->jsonSerialize(), 'provider' => $provider->getId()];
617+
continue;
618+
}
619+
}
620+
} catch (Exception) {
621+
// There is no provider set for the task type of this task
622+
// proceed to ignore this task
623+
}
624+
625+
$taskIdsToIgnore[] = (int)$task->getId();
626+
}
627+
628+
try {
629+
$this->taskProcessingManager->getNextScheduledTask($possibleTaskTypeIds, $taskIdsToIgnore);
630+
$hasMore = true;
631+
} catch (\Throwable) {
632+
$hasMore = false;
633+
}
634+
635+
return new DataResponse([
636+
'tasks' => $tasks,
637+
'has_more' => $hasMore,
638+
]);
639+
} catch (Exception) {
640+
return new DataResponse(['message' => $this->l->t('Internal error')], Http::STATUS_INTERNAL_SERVER_ERROR);
641+
}
642+
}
643+
599644
/**
600645
* @param resource $data
601646
* @return int
@@ -611,4 +656,36 @@ private function setFileContentsInternal($data): int {
611656
$file = $folder->newFile(time() . '-' . rand(1, 100000), $data);
612657
return $file->getId();
613658
}
659+
660+
/**
661+
* @param array $taskTypeIds
662+
* @param array $providerIds
663+
* @return array
664+
*/
665+
public function intersectTaskTypesAndProviders(array $taskTypeIds, array $providerIds): array {
666+
$providerIdsBasedOnTaskTypesWithNull = array_unique(array_map(function ($taskTypeId) {
667+
try {
668+
return $this->taskProcessingManager->getPreferredProvider($taskTypeId)->getId();
669+
} catch (Exception) {
670+
return null;
671+
}
672+
}, $taskTypeIds));
673+
674+
$providerIdsBasedOnTaskTypes = array_filter($providerIdsBasedOnTaskTypesWithNull, fn ($providerId) => $providerId !== null);
675+
676+
// restrict $providerIds to providers that are configured as preferred for the passed task types
677+
$possibleProviderIds = array_values(array_intersect($providerIdsBasedOnTaskTypes, $providerIds));
678+
679+
// restrict $taskTypeIds to task types that can actually be run by one of the now restricted providers
680+
$possibleTaskTypeIds = array_values(array_filter($taskTypeIds, function ($taskTypeId) use ($possibleProviderIds) {
681+
try {
682+
$providerForTaskType = $this->taskProcessingManager->getPreferredProvider($taskTypeId)->getId();
683+
} catch (Exception) {
684+
// no provider found for task type
685+
return false;
686+
}
687+
return in_array($providerForTaskType, $possibleProviderIds, true);
688+
}));
689+
return [$possibleProviderIds, $possibleTaskTypeIds];
690+
}
614691
}

0 commit comments

Comments
 (0)