Skip to content
Closed
1 change: 1 addition & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ jobs:
strategy:
matrix:
php:
- 8.2
- 8.1
- 8.0
- 7.4
Expand Down
38 changes: 24 additions & 14 deletions src/Queue.php
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ class Queue implements \Countable
private $pending = 0;
private $queue = array();

/** @internal Make this private when support for PHP 5.3 is dropped. */
public $state = array();

/**
* Concurrently process all given jobs through the given `$handler`.
*
Expand Down Expand Up @@ -364,23 +367,27 @@ public function __invoke()
end($queue);
$id = key($queue);

$deferred = new Deferred(function ($_, $reject) use (&$queue, $id, &$deferred) {
$state = new State();
$deferred = new Deferred(function ($_, $reject) use (&$queue, $id, &$state) {
// forward cancellation to pending operation if it is currently executing
if (isset($deferred->pending) && $deferred->pending instanceof PromiseInterface && \method_exists($deferred->pending, 'cancel')) {
$deferred->pending->cancel();
if (isset($state->pending) && $state->pending instanceof PromiseInterface && \method_exists($state->pending, 'cancel')) {
$state->pending->cancel();
}
unset($deferred->pending);
unset($state->pending);

if (isset($deferred->args)) {
if (isset($state->args)) {
// queued promise cancelled before its handler is invoked
// remove from queue and reject explicitly
unset($queue[$id], $deferred->args);
unset($queue[$id], $state->args);
$reject(new \RuntimeException('Cancelled queued job before processing started'));
}
});

$this->state[spl_object_hash($deferred)] = $state;

// queue job to process if number of pending jobs is below concurrency limit again
$deferred->args = func_get_args();

$state->args = func_get_args();
$queue[$id] = $deferred;

return $deferred->promise();
Expand Down Expand Up @@ -428,18 +435,21 @@ public function processQueue()
// await this situation, invoke handler and await its resolution before invoking next queued job
++$this->pending;

$promise = call_user_func_array($this->handler, $deferred->args);
$deferred->pending = $promise;
unset($deferred->args);
$deferredHash = spl_object_hash($deferred);
$state = $this->state[$deferredHash];
$promise = call_user_func_array($this->handler, $state->args);
$state->pending = $promise;
unset($state->args);

$that = $this;
// invoke handler and await its resolution before invoking next queued job
$this->await($promise)->then(
function ($result) use ($deferred) {
unset($deferred->pending);
function ($result) use ($deferred, &$that, $deferredHash) {
unset($that->state[$deferredHash]);
$deferred->resolve($result);
},
function ($e) use ($deferred) {
unset($deferred->pending);
function ($e) use ($deferred, &$that, $deferredHash) {
unset($that->state[$deferredHash]);
$deferred->reject($e);
}
);
Expand Down
12 changes: 12 additions & 0 deletions src/State.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
<?php

namespace Clue\React\Mq;

/** @internal */
final class State
{

public $pending;

public $args;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: missing newline