diff --git a/CHANGELOG.md b/CHANGELOG.md index f914a82..914930b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ # Changelog +## Unreleased + +* Add optional `$timeout` parameter to `await*()` methods. + ## 0.1.1 (2015-04-05) * `run()` the loop instead of making it `tick()`. diff --git a/src/Blocker.php b/src/Blocker.php index 63f3761..ab58cda 100644 --- a/src/Blocker.php +++ b/src/Blocker.php @@ -40,38 +40,20 @@ public function wait($time) * block waiting for the given $promise to resolve * * @param PromiseInterface $promise + * @param double $timeout maximum time to wait in seconds * @return mixed returns whatever the promise resolves to * @throws Exception when the promise is rejected + * @throws TimeoutException when the timeout is reached and the promise is not resolved */ - public function awaitOne(PromiseInterface $promise) + public function awaitOne(PromiseInterface $promise, $timeout = null) { $wait = true; - $resolved = null; - $exception = null; - $loop = $this->loop; - - $promise->then( - function ($c) use (&$resolved, &$wait, $loop) { - $resolved = $c; - $wait = false; - $loop->stop(); - }, - function ($error) use (&$exception, &$wait, $loop) { - $exception = $error; - $wait = false; - $loop->stop(); - } - ); - - while ($wait) { - $loop->run(); - } + $resolution = null; - if ($exception !== null) { - throw $exception; - } + $onComplete = $this->getOnCompleteFn($resolution, $wait, array($promise), $timeout); + $promise->then($onComplete, $onComplete); - return $resolved; + return $this->awaitResolution($wait, $resolution); } /** @@ -83,60 +65,37 @@ function ($error) use (&$exception, &$wait, $loop) { * If ALL promises fail to resolve, this will fail and throw an Exception. * * @param array $promises + * @param double $timeout maximum time to wait in seconds * @return mixed returns whatever the first promise resolves to * @throws Exception if ALL promises are rejected + * @throws TimeoutException if the timeout is reached and NO promise is resolved */ - public function awaitRace(array $promises) + public function awaitRace(array $promises, $timeout = null) { + if (!count($promises)) { + throw new UnderflowException('No promise could resolve'); + } + $wait = count($promises); - $value = null; - $success = false; - $loop = $this->loop; + $resolution = null; - foreach ($promises as $key => $promise) { + $onComplete = $this->getOnCompleteFn($resolution, $wait, $promises, $timeout); + + foreach ($promises as $promise) { /* @var $promise PromiseInterface */ $promise->then( - function ($return) use (&$value, &$wait, &$success, $promises, $loop) { - if (!$wait) { - // only store first promise value - return; - } - $value = $return; - $wait = 0; - $success = true; - - // cancel all remaining promises - foreach ($promises as $promise) { - if ($promise instanceof CancellablePromiseInterface) { - $promise->cancel(); - } - } - - $loop->stop(); - }, - function ($e) use (&$wait, $loop) { - if ($wait) { - // count number of promises to await - // cancelling promises will reject all remaining ones, ignore this + $onComplete, + function ($e) use (&$wait, $onComplete) { + if ($wait == 1) { + $onComplete(new UnderflowException('No promise could resolve')); + } elseif ($wait) { --$wait; - - if (!$wait) { - $loop->stop(); - } } } ); } - while ($wait) { - $loop->run(); - } - - if (!$success) { - throw new UnderflowException('No promise could resolve'); - } - - return $value; + return $this->awaitResolution($wait, $resolution); } /** @@ -150,56 +109,94 @@ function ($e) use (&$wait, $loop) { * remaining promises and throw an Exception. * * @param array $promises + * @param double $timeout maximum time to wait in seconds * @return array returns an array with whatever each promise resolves to * @throws Exception when ANY promise is rejected + * @throws TimeoutException if the timeout is reached and ANY promise is not resolved */ - public function awaitAll(array $promises) + public function awaitAll(array $promises, $timeout = null) { + if (!count($promises)) { + return array(); + } + $wait = count($promises); - $exception = null; + $resolution = null; $values = array(); - $loop = $this->loop; + + $onComplete = $this->getOnCompleteFn($resolution, $wait, $promises, $timeout); foreach ($promises as $key => $promise) { /* @var $promise PromiseInterface */ $promise->then( - function ($value) use (&$values, $key, &$wait, $loop) { + function ($value) use (&$wait, &$values, $key, $onComplete) { $values[$key] = $value; - --$wait; - if (!$wait) { - $loop->stop(); + if ($wait == 1) { + $onComplete($values); + } elseif ($wait) { + --$wait; } }, - function ($e) use ($promises, &$exception, &$wait, $loop) { - if (!$wait) { - // cancelling promises will reject all remaining ones, only store first error - return; - } - - $exception = $e; - $wait = 0; - - // cancel all remaining promises - foreach ($promises as $promise) { - if ($promise instanceof CancellablePromiseInterface) { - $promise->cancel(); - } - } - - $loop->stop(); - } + $onComplete ); } + return $this->awaitResolution($wait, $resolution); + } + + private function awaitResolution(&$wait, &$resolution) + { while ($wait) { - $loop->run(); + $this->loop->run(); + } + + if ($resolution instanceof Exception) { + throw $resolution; } - if ($exception !== null) { - throw $exception; + return $resolution; + } + + private function getOnCompleteFn(&$resolution, &$wait, array $promises, $timeout) + { + $loop = $this->loop; + + $onComplete = function ($valueOrError) use (&$resolution, &$wait, $promises, $loop) { + if (!$wait) { + // only store first promise value + return; + } + + $resolution = $valueOrError; + $wait = false; + + // cancel all remaining promises + foreach ($promises as $promise) { + if ($promise instanceof CancellablePromiseInterface) { + $promise->cancel(); + } + } + + $loop->stop(); + }; + + if ($timeout) { + $onComplete = $this->applyTimeout($timeout, $onComplete); } - return $values; + return $onComplete; + } + + private function applyTimeout($timeout, $onComplete) + { + $timer = $this->loop->addTimer($timeout, function () use ($onComplete) { + $onComplete(new TimeoutException('Could not resolve in the allowed time')); + }); + + return function ($valueOrError) use ($timer, $onComplete) { + $timer->cancel(); + $onComplete($valueOrError); + }; } } diff --git a/src/TimeoutException.php b/src/TimeoutException.php new file mode 100644 index 0000000..31df351 --- /dev/null +++ b/src/TimeoutException.php @@ -0,0 +1,10 @@ +assertEquals(2, $this->block->awaitOne($promise)); } + public function testAwaitOneTimedOut() + { + $promise = $this->createPromiseResolved(2, 0.02); + + $this->setExpectedException(self::TIMEOUT_EXCEPTION_CLASS); + $this->block->awaitOne($promise, 0.01); + } + + public function testAwaitOneTimeoutCleanedUp() + { + $promise = $this->createPromiseResolved(1, 0.01); + $this->assertEquals(1, $this->block->awaitOne($promise, 0.02)); + + $loop = $this->loop; + $timerInvoked = false; + $loop->addTimer(0.02, function () use (&$timerInvoked, $loop) { + $timerInvoked = true; + $loop->stop(); + }); + + $loop->run(); + $this->assertTrue($timerInvoked); + } + /** * @expectedException UnderflowException */ @@ -105,6 +131,45 @@ public function testAwaitRaceInterrupted() $this->assertEquals(2, $this->block->awaitRace(array($promise))); } + public function testAwaitRaceOneTimedOut() + { + $all = array( + $this->createPromiseResolved(1, 0.03), + $this->createPromiseResolved(2, 0.01), + $this->createPromiseResolved(3, 0.03), + ); + + $this->assertEquals(2, $this->block->awaitRace($all, 0.2)); + } + + public function testAwaitRaceAllTimedOut() + { + $all = array( + $this->createPromiseResolved(1, 0.03), + $this->createPromiseResolved(2, 0.02), + $this->createPromiseResolved(3, 0.03), + ); + + $this->setExpectedException(self::TIMEOUT_EXCEPTION_CLASS); + $this->block->awaitRace($all, 0.01); + } + + public function testAwaitRaceTimeoutCleanedUp() + { + $promise = $this->createPromiseResolved(1, 0.01); + $this->assertEquals(1, $this->block->awaitRace(array($promise), 0.02)); + + $loop = $this->loop; + $timerInvoked = false; + $loop->addTimer(0.02, function () use (&$timerInvoked, $loop) { + $timerInvoked = true; + $loop->stop(); + }); + + $loop->run(); + $this->assertTrue($timerInvoked); + } + public function testAwaitAllEmpty() { $this->assertEquals(array(), $this->block->awaitAll(array())); @@ -150,6 +215,34 @@ public function testAwaitAllInterrupted() $this->assertEquals(array(2), $this->block->awaitAll(array($promise))); } + public function testAwaitAllOneTimedOut() + { + $all = array( + $this->createPromiseResolved(1, 0.01), + $this->createPromiseResolved(2, 0.03), + $this->createPromiseResolved(3, 0.01), + ); + + $this->setExpectedException(self::TIMEOUT_EXCEPTION_CLASS); + $this->block->awaitAll($all, 0.02); + } + + public function testAwaitAllTimeoutCleanedUp() + { + $promise = $this->createPromiseResolved(1, 0.01); + $this->assertEquals(array(1), $this->block->awaitAll(array($promise), 0.02)); + + $loop = $this->loop; + $timerInvoked = false; + $loop->addTimer(0.02, function () use (&$timerInvoked, $loop) { + $timerInvoked = true; + $loop->stop(); + }); + + $loop->run(); + $this->assertTrue($timerInvoked); + } + private function createPromiseResolved($value = null, $delay = 0.01) { $deferred = new Deferred();