Skip to content
Closed
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changelog

## Unreleased

* Add optional `$timeout` parameter to `await*()` methods.

## 0.1.1 (2015-04-05)

* `run()` the loop instead of making it `tick()`.
Expand Down
185 changes: 91 additions & 94 deletions src/Blocker.php
Original file line number Diff line number Diff line change
Expand Up @@ -40,38 +40,20 @@ public function wait($time)
* block waiting for the given $promise to resolve
*
* @param PromiseInterface $promise
* @param double $timeout maximum time to wait in seconds
* @return mixed returns whatever the promise resolves to
* @throws Exception when the promise is rejected
* @throws TimeoutException when the timeout is reached and the promise is not resolved
*/
public function awaitOne(PromiseInterface $promise)
public function awaitOne(PromiseInterface $promise, $timeout = null)
{
$wait = true;
$resolved = null;
$exception = null;
$loop = $this->loop;

$promise->then(
function ($c) use (&$resolved, &$wait, $loop) {
$resolved = $c;
$wait = false;
$loop->stop();
},
function ($error) use (&$exception, &$wait, $loop) {
$exception = $error;
$wait = false;
$loop->stop();
}
);

while ($wait) {
$loop->run();
}
$resolution = null;

if ($exception !== null) {
throw $exception;
}
$onComplete = $this->getOnCompleteFn($resolution, $wait, array($promise), $timeout);
$promise->then($onComplete, $onComplete);

return $resolved;
return $this->awaitResolution($wait, $resolution);
}

/**
Expand All @@ -83,60 +65,37 @@ function ($error) use (&$exception, &$wait, $loop) {
* If ALL promises fail to resolve, this will fail and throw an Exception.
*
* @param array $promises
* @param double $timeout maximum time to wait in seconds
* @return mixed returns whatever the first promise resolves to
* @throws Exception if ALL promises are rejected
* @throws TimeoutException if the timeout is reached and NO promise is resolved
*/
public function awaitRace(array $promises)
public function awaitRace(array $promises, $timeout = null)
{
if (!count($promises)) {
throw new UnderflowException('No promise could resolve');
}

$wait = count($promises);
$value = null;
$success = false;
$loop = $this->loop;
$resolution = null;

foreach ($promises as $key => $promise) {
$onComplete = $this->getOnCompleteFn($resolution, $wait, $promises, $timeout);

foreach ($promises as $promise) {
/* @var $promise PromiseInterface */
$promise->then(
function ($return) use (&$value, &$wait, &$success, $promises, $loop) {
if (!$wait) {
// only store first promise value
return;
}
$value = $return;
$wait = 0;
$success = true;

// cancel all remaining promises
foreach ($promises as $promise) {
if ($promise instanceof CancellablePromiseInterface) {
$promise->cancel();
}
}

$loop->stop();
},
function ($e) use (&$wait, $loop) {
if ($wait) {
// count number of promises to await
// cancelling promises will reject all remaining ones, ignore this
$onComplete,
function ($e) use (&$wait, $onComplete) {
if ($wait == 1) {
$onComplete(new UnderflowException('No promise could resolve'));
} elseif ($wait) {
--$wait;

if (!$wait) {
$loop->stop();
}
}
}
);
}

while ($wait) {
$loop->run();
}

if (!$success) {
throw new UnderflowException('No promise could resolve');
}

return $value;
return $this->awaitResolution($wait, $resolution);
}

/**
Expand All @@ -150,56 +109,94 @@ function ($e) use (&$wait, $loop) {
* remaining promises and throw an Exception.
*
* @param array $promises
* @param double $timeout maximum time to wait in seconds
* @return array returns an array with whatever each promise resolves to
* @throws Exception when ANY promise is rejected
* @throws TimeoutException if the timeout is reached and ANY promise is not resolved
*/
public function awaitAll(array $promises)
public function awaitAll(array $promises, $timeout = null)
{
if (!count($promises)) {
return array();
}

$wait = count($promises);
$exception = null;
$resolution = null;
$values = array();
$loop = $this->loop;

$onComplete = $this->getOnCompleteFn($resolution, $wait, $promises, $timeout);

foreach ($promises as $key => $promise) {
/* @var $promise PromiseInterface */
$promise->then(
function ($value) use (&$values, $key, &$wait, $loop) {
function ($value) use (&$wait, &$values, $key, $onComplete) {
$values[$key] = $value;
--$wait;

if (!$wait) {
$loop->stop();
if ($wait == 1) {
$onComplete($values);
} elseif ($wait) {
--$wait;
}
},
function ($e) use ($promises, &$exception, &$wait, $loop) {
if (!$wait) {
// cancelling promises will reject all remaining ones, only store first error
return;
}

$exception = $e;
$wait = 0;

// cancel all remaining promises
foreach ($promises as $promise) {
if ($promise instanceof CancellablePromiseInterface) {
$promise->cancel();
}
}

$loop->stop();
}
$onComplete
);
}

return $this->awaitResolution($wait, $resolution);
}

private function awaitResolution(&$wait, &$resolution)
{
while ($wait) {
$loop->run();
$this->loop->run();
}

if ($resolution instanceof Exception) {
throw $resolution;
}

if ($exception !== null) {
throw $exception;
return $resolution;
}

private function getOnCompleteFn(&$resolution, &$wait, array $promises, $timeout)
{
$loop = $this->loop;

$onComplete = function ($valueOrError) use (&$resolution, &$wait, $promises, $loop) {
if (!$wait) {
// only store first promise value
return;
}

$resolution = $valueOrError;
$wait = false;

// cancel all remaining promises
foreach ($promises as $promise) {
if ($promise instanceof CancellablePromiseInterface) {
$promise->cancel();
}
}

$loop->stop();
};

if ($timeout) {
$onComplete = $this->applyTimeout($timeout, $onComplete);
}

return $values;
return $onComplete;
}

private function applyTimeout($timeout, $onComplete)
{
$timer = $this->loop->addTimer($timeout, function () use ($onComplete) {
$onComplete(new TimeoutException('Could not resolve in the allowed time'));
});

return function ($valueOrError) use ($timer, $onComplete) {
$timer->cancel();
$onComplete($valueOrError);
};
}
}
10 changes: 10 additions & 0 deletions src/TimeoutException.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
<?php

namespace Clue\React\Block;

use RuntimeException;

class TimeoutException extends RuntimeException
{

}
93 changes: 93 additions & 0 deletions tests/BlockerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

class BlockerTest extends TestCase
{
const TIMEOUT_EXCEPTION_CLASS = 'Clue\React\Block\TimeoutException';

private $loop;
private $block;

Expand Down Expand Up @@ -46,6 +48,30 @@ public function testAwaitOneInterrupted()
$this->assertEquals(2, $this->block->awaitOne($promise));
}

public function testAwaitOneTimedOut()
{
$promise = $this->createPromiseResolved(2, 0.02);

$this->setExpectedException(self::TIMEOUT_EXCEPTION_CLASS);
$this->block->awaitOne($promise, 0.01);
}

public function testAwaitOneTimeoutCleanedUp()
{
$promise = $this->createPromiseResolved(1, 0.01);
$this->assertEquals(1, $this->block->awaitOne($promise, 0.02));

$loop = $this->loop;
$timerInvoked = false;
$loop->addTimer(0.02, function () use (&$timerInvoked, $loop) {
$timerInvoked = true;
$loop->stop();
});

$loop->run();
$this->assertTrue($timerInvoked);
}

/**
* @expectedException UnderflowException
*/
Expand Down Expand Up @@ -105,6 +131,45 @@ public function testAwaitRaceInterrupted()
$this->assertEquals(2, $this->block->awaitRace(array($promise)));
}

public function testAwaitRaceOneTimedOut()
{
$all = array(
$this->createPromiseResolved(1, 0.03),
$this->createPromiseResolved(2, 0.01),
$this->createPromiseResolved(3, 0.03),
);

$this->assertEquals(2, $this->block->awaitRace($all, 0.2));
}

public function testAwaitRaceAllTimedOut()
{
$all = array(
$this->createPromiseResolved(1, 0.03),
$this->createPromiseResolved(2, 0.02),
$this->createPromiseResolved(3, 0.03),
);

$this->setExpectedException(self::TIMEOUT_EXCEPTION_CLASS);
$this->block->awaitRace($all, 0.01);
}

public function testAwaitRaceTimeoutCleanedUp()
{
$promise = $this->createPromiseResolved(1, 0.01);
$this->assertEquals(1, $this->block->awaitRace(array($promise), 0.02));

$loop = $this->loop;
$timerInvoked = false;
$loop->addTimer(0.02, function () use (&$timerInvoked, $loop) {
$timerInvoked = true;
$loop->stop();
});

$loop->run();
$this->assertTrue($timerInvoked);
}

public function testAwaitAllEmpty()
{
$this->assertEquals(array(), $this->block->awaitAll(array()));
Expand Down Expand Up @@ -150,6 +215,34 @@ public function testAwaitAllInterrupted()
$this->assertEquals(array(2), $this->block->awaitAll(array($promise)));
}

public function testAwaitAllOneTimedOut()
{
$all = array(
$this->createPromiseResolved(1, 0.01),
$this->createPromiseResolved(2, 0.03),
$this->createPromiseResolved(3, 0.01),
);

$this->setExpectedException(self::TIMEOUT_EXCEPTION_CLASS);
$this->block->awaitAll($all, 0.02);
}

public function testAwaitAllTimeoutCleanedUp()
{
$promise = $this->createPromiseResolved(1, 0.01);
$this->assertEquals(array(1), $this->block->awaitAll(array($promise), 0.02));

$loop = $this->loop;
$timerInvoked = false;
$loop->addTimer(0.02, function () use (&$timerInvoked, $loop) {
$timerInvoked = true;
$loop->stop();
});

$loop->run();
$this->assertTrue($timerInvoked);
}

private function createPromiseResolved($value = null, $delay = 0.01)
{
$deferred = new Deferred();
Expand Down