-
Notifications
You must be signed in to change notification settings - Fork 57
refactor(FileListener): Process file events asynchronously #1429
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
e40dd2a
c0d40dd
8ca821e
d6e4f25
0e5f2e5
a8fb342
19f4aeb
200a469
6e442b2
69429c3
82f7850
3b2f850
4411763
82d555b
693994f
8de39a0
88fc3ce
bac8ee2
8684d31
e1b1377
481b0d1
d710e38
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,61 @@ | ||
| <?php | ||
|
|
||
| /* | ||
| * Copyright (c) 2021-2022 The Recognize contributors. | ||
| * This file is licensed under the Affero General Public License version 3 or later. See the COPYING file. | ||
| */ | ||
| declare(strict_types=1); | ||
| namespace OCA\Recognize\BackgroundJobs; | ||
|
|
||
| use OCA\Recognize\Classifiers\Audio\MusicnnClassifier; | ||
| use OCA\Recognize\Classifiers\Images\ClusteringFaceClassifier; | ||
| use OCA\Recognize\Classifiers\Images\ImagenetClassifier; | ||
| use OCA\Recognize\Classifiers\Images\LandmarksClassifier; | ||
| use OCA\Recognize\Classifiers\Video\MovinetClassifier; | ||
| use OCA\Recognize\Db\AccessUpdateMapper; | ||
| use OCA\Recognize\Db\QueueFile; | ||
| use OCA\Recognize\Service\AccessUpdateService; | ||
| use OCA\Recognize\Service\Logger; | ||
| use OCA\Recognize\Service\QueueService; | ||
| use OCA\Recognize\Service\StorageService; | ||
| use OCA\Recognize\Service\TagManager; | ||
| use OCP\AppFramework\Utility\ITimeFactory; | ||
| use OCP\BackgroundJob\IJobList; | ||
| use OCP\BackgroundJob\QueuedJob; | ||
| use OCP\DB\Exception; | ||
| use Psr\Log\LoggerInterface; | ||
|
|
||
| final class ProcessAccessUpdatesJob extends QueuedJob { | ||
|
|
||
| public function __construct( | ||
| ITimeFactory $timeFactory, | ||
| private AccessUpdateService $accessUpdateService, | ||
marcelklehr marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| private IJobList $jobList, | ||
| private AccessUpdateMapper $accessUpdateMapper, | ||
| private LoggerInterface $logger, | ||
| ) { | ||
| parent::__construct($timeFactory); | ||
| } | ||
|
|
||
| /** | ||
| * @param array{storage_id:int} $argument | ||
| * @return void | ||
| */ | ||
| protected function run($argument): void { | ||
| $storageId = $argument['storage_id']; | ||
|
|
||
| $this->accessUpdateService->processAccessUpdates($storageId); | ||
| try { | ||
| $count = $this->accessUpdateMapper->countByStorageId($storageId); | ||
| } catch (Exception $e) { | ||
| $this->logger->error('Failed to count access updates' . $e->getMessage(), ['exception' => $e]); | ||
marcelklehr marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| $count = 1; | ||
| } | ||
| if ($count > 0) { | ||
| // Schedule next iteration | ||
| $this->jobList->add(self::class, [ | ||
| 'storage_id' => $storageId, | ||
| ]); | ||
| } | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,49 @@ | ||
| <?php | ||
|
|
||
| /* | ||
| * Copyright (c) 2022 The Recognize contributors. | ||
| * This file is licensed under the Affero General Public License version 3 or later. See the COPYING file. | ||
| */ | ||
| declare(strict_types=1); | ||
| namespace OCA\Recognize\Db; | ||
|
|
||
| use OCP\AppFramework\Db\Entity; | ||
|
|
||
| /** | ||
| * Class AccessUpdate | ||
| * | ||
| * @package OCA\Recognize\Db | ||
| * @method int getStorageId() | ||
| * @method setStorageId(int $storageId) | ||
| * @method int getRootId() | ||
| * @method setRootId(int $rootId) | ||
| */ | ||
| final class AccessUpdate extends Entity { | ||
| protected $storageId; | ||
|
Check failure on line 22 in lib/Db/AccessUpdate.php
|
||
| protected $rootId; | ||
|
Check failure on line 23 in lib/Db/AccessUpdate.php
|
||
|
|
||
| /** | ||
| * @var string[] | ||
| */ | ||
| public static array $columns = ['id', 'storage_id', 'root_id']; | ||
|
|
||
| /** | ||
| * @var string[] | ||
| */ | ||
| public static array $fields = ['id', 'storageId', 'rootId']; | ||
|
|
||
| public function __construct() { | ||
| // add types in constructor | ||
| $this->addType('id', 'integer'); | ||
| $this->addType('storageId', 'integer'); | ||
| $this->addType('rootId', 'integer'); | ||
| } | ||
|
|
||
| public function toArray(): array { | ||
| $array = []; | ||
| foreach (self::$fields as $field) { | ||
| $array[$field] = $this->{$field}; | ||
| } | ||
| return $array; | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,103 @@ | ||
| <?php | ||
|
|
||
| /* | ||
| * Copyright (c) 2022 The Recognize contributors. | ||
| * This file is licensed under the Affero General Public License version 3 or later. See the COPYING file. | ||
| */ | ||
| declare(strict_types=1); | ||
| namespace OCA\Recognize\Db; | ||
|
|
||
| use OCA\Recognize\BackgroundJobs\ProcessAccessUpdatesJob; | ||
| use OCP\AppFramework\Db\DoesNotExistException; | ||
| use OCP\AppFramework\Db\MultipleObjectsReturnedException; | ||
| use OCP\AppFramework\Db\QBMapper; | ||
| use OCP\BackgroundJob\IJobList; | ||
| use OCP\DB\Exception; | ||
| use OCP\DB\QueryBuilder\IQueryBuilder; | ||
| use OCP\IDBConnection; | ||
|
|
||
| /** | ||
| * @psalm-extends QBMapper<AccessUpdate> | ||
| */ | ||
| final class AccessUpdateMapper extends QBMapper { | ||
|
Check failure on line 22 in lib/Db/AccessUpdateMapper.php
|
||
| public function __construct( | ||
| IDBConnection $db, | ||
| private IJobList $jobList, | ||
| ) { | ||
| parent::__construct($db, 'recognize_access_updates', AccessUpdate::class); | ||
| $this->db = $db; | ||
| } | ||
|
|
||
| /** | ||
| * @throws \OCP\DB\Exception | ||
| * @return list<\OCA\Recognize\Db\AccessUpdate> | ||
|
Check failure on line 33 in lib/Db/AccessUpdateMapper.php
|
||
| */ | ||
| public function findByStorageId(int $storageId, int $limit = 0): array { | ||
| $qb = $this->db->getQueryBuilder(); | ||
| $qb->selectDistinct(AccessUpdate::$columns) | ||
| ->from($this->getTableName()) | ||
| ->where($qb->expr()->eq('storage_id', $qb->createPositionalParameter($storageId, IQueryBuilder::PARAM_INT))); | ||
| if ($limit > 0) { | ||
| $qb->setMaxResults($limit); | ||
| } | ||
| return $this->findEntities($qb); | ||
| } | ||
|
|
||
| /** | ||
| * @param int $storageId | ||
| * @return int | ||
| * @throws Exception | ||
| */ | ||
| public function countByStorageId(int $storageId): int { | ||
| $qb = $this->db->getQueryBuilder(); | ||
| $qb->select($qb->func()->count('id')) | ||
| ->from($this->getTableName()) | ||
| ->where($qb->expr()->eq('storage_id', $qb->createPositionalParameter($storageId, IQueryBuilder::PARAM_INT))); | ||
| $result = $qb->executeQuery(); | ||
| $count = $result->fetchOne(); | ||
| $result->closeCursor(); | ||
| if ($count === false) { | ||
| return 0; | ||
| } | ||
| return $count; | ||
| } | ||
|
|
||
| /** | ||
| * @param int $storageId | ||
| * @param int $rootId | ||
| * @return AccessUpdate | ||
| * @throws DoesNotExistException | ||
| * @throws MultipleObjectsReturnedException | ||
| * @throws Exception | ||
| */ | ||
| public function findByStorageIdAndRootId(int $storageId, int $rootId): AccessUpdate { | ||
| $qb = $this->db->getQueryBuilder(); | ||
| $qb->selectDistinct(AccessUpdate::$columns) | ||
| ->from($this->getTableName()) | ||
| ->where($qb->expr()->eq('storage_id', $qb->createPositionalParameter($storageId, IQueryBuilder::PARAM_INT))) | ||
| ->andWhere($qb->expr()->eq('root_id', $qb->createPositionalParameter($rootId, IQueryBuilder::PARAM_INT))); | ||
| return $this->findEntity($qb); | ||
| } | ||
|
|
||
| /** | ||
| * @param int $storageId | ||
| * @param int $rootId | ||
| * @return AccessUpdate | ||
| * @throws Exception | ||
| * @throws MultipleObjectsReturnedException | ||
| */ | ||
| public function insertAccessUpdate(int $storageId, int $rootId): AccessUpdate { | ||
| try { | ||
| $accessUpdate = $this->findByStorageIdAndRootId($storageId, $rootId); | ||
| } catch (DoesNotExistException $e) { | ||
| $accessUpdate = new AccessUpdate(); | ||
| $accessUpdate->setStorageId($storageId); | ||
| $accessUpdate->setRootId($rootId); | ||
| $this->insert($accessUpdate); | ||
| if (!$this->jobList->has(ProcessAccessUpdatesJob::class, [ 'storage_id' => $storageId ])) { | ||
| $this->jobList->add(self::class, [ 'storage_id' => $storageId ]); | ||
marcelklehr marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| } | ||
| } | ||
| return $accessUpdate; | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,42 @@ | ||
| <?php | ||
|
|
||
| /* | ||
| * Copyright (c) 2020-2025 The Recognize contributors. | ||
| * This file is licensed under the Affero General Public License version 3 or later. See the COPYING file. | ||
| */ | ||
| declare(strict_types=1); | ||
| namespace OCA\Recognize\Migration; | ||
|
|
||
| use Closure; | ||
| use Doctrine\DBAL\Schema\SchemaException; | ||
| use OCP\DB\ISchemaWrapper; | ||
| use OCP\DB\Types; | ||
| use OCP\Migration\IOutput; | ||
| use OCP\Migration\SimpleMigrationStep; | ||
|
|
||
| final class Version011000001Date20251215094821 extends SimpleMigrationStep { | ||
|
|
||
| /** | ||
| * @param IOutput $output | ||
| * @param Closure $schemaClosure The `\Closure` returns a `ISchemaWrapper` | ||
| * @param array $options | ||
| * | ||
| * @return ?ISchemaWrapper | ||
| * @throws SchemaException | ||
| */ | ||
| public function changeSchema(IOutput $output, Closure $schemaClosure, array $options) { | ||
| /** @var ISchemaWrapper $schema */ | ||
| $schema = $schemaClosure(); | ||
|
|
||
| $changed = false; | ||
| if (!$schema->hasTable('recognize_access_updates')) { | ||
| $table = $schema->createTable('recognize_access_updates'); | ||
| $table->addColumn('id', Types::BIGINT, ['autoincrement' => true]); | ||
| $table->addColumn('storage_id', Types::BIGINT, ['notnull' => true]); | ||
| $table->addColumn('root_id', Types::BIGINT, ['notnull' => true]); | ||
marcelklehr marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| $table->addUniqueIndex(['storage_id', 'root_id'], 'recognize_au_unique'); | ||
| $changed = true; | ||
| } | ||
| return $changed ? $schema : null; | ||
| } | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.