Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 80 additions & 15 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ much any API that already uses Promises.
* [Promises](#promises)
* [Cancellation](#cancellation)
* [Timeout](#timeout)
* [all()](#all)
* [Blocking](#blocking)
* [Install](#install)
* [Tests](#tests)
Expand Down Expand Up @@ -256,6 +257,81 @@ $promise = Timer\timeout($q($url), 2.0, $loop);
Please refer to [react/promise-timer](https://github.com/reactphp/promise-timer)
for more details.

#### all()

The static `all(int $concurrency, array $jobs, callable $handler): PromiseInterface<mixed[]>` method can be used to
concurrently process all given jobs through the given `$handler`.

This is a convenience method which uses the `Queue` internally to
schedule all jobs while limiting concurrency to ensure no more than
`$concurrency` jobs ever run at once. It will return a promise which
resolves with the results of all jobs on success.

```php
$loop = React\EventLoop\Factory::create();
$browser = new Clue\React\Buzz\Browser($loop);

$promise = Queue:all(3, $urls, function ($url) use ($browser) {
return $browser->get($url);
});

$promise->then(function (array $responses) {
echo 'All ' . count($responses) . ' successful!' . PHP_EOL;
});
```

If either of the jobs fail, it will reject the resulting promise and will
try to cancel all outstanding jobs. Similarly, calling `cancel()` on the
resulting promise will try to cancel all outstanding jobs. See
[promises](#promises) and [cancellation](#cancellation) for details.

The `$concurrency` parameter sets a new soft limit for the maximum number
of jobs to handle concurrently. Finding a good concurrency limit depends
on your particular use case. It's common to limit concurrency to a rather
small value, as doing more than a dozen of things at once may easily
overwhelm the receiving side. Using a `1` value will ensure that all jobs
are processed one after another, effectively creating a "waterfall" of
jobs. Using a value less than 1 will reject with an
`InvalidArgumentException` without processing any jobs.

```php
// handle up to 10 jobs concurrently
$promise = Queue:all(10, $jobs, $handler);
```

```php
// handle each job after another without concurrency (waterfall)
$promise = Queue:all(1, $jobs, $handler);
```

The `$jobs` parameter must be an array with all jobs to process. Each
value in this array will be passed to the `$handler` to start one job.
The array keys will be preserved in the resulting array, while the array
values will be replaced with the job results as returned by the
`$handler`. If this array is empty, this method will resolve with an
empty array without processing any jobs.

The `$handler` parameter must be a valid callable that accepts your job
parameters, invokes the appropriate operation and returns a Promise as a
placeholder for its future result. If the given argument is not a valid
callable, this method will reject with an `InvalidArgumentExceptionn`
without processing any jobs.

```php
// using a Closure as handler is usually recommended
$promise = Queue::all(10, $jobs, function ($url) use ($browser) {
return $browser->get($url);
});
```

```php
// accepts any callable, so PHP's array notation is also supported
$promise = Queue:all(10, $jobs, array($browser, 'get'));
```

> Keep in mind that returning an array of response messages means that
the whole response body has to be kept in memory.

#### Blocking

As stated above, this library provides you a powerful, async API by default.
Expand All @@ -272,18 +348,12 @@ use Clue\React\Block;
$loop = React\EventLoop\Factory::create();
$browser = new Clue\React\Buzz\Browser($loop);

$q = new Queue(10, null, function ($url) use ($browser) {
$promise = Queue:all(3, $urls, function ($url) use ($browser) {
return $browser->get($url);
});

$promises = array(
$q('http://example.com/'),
$q('http://www.example.org/'),
$q('http://example.net/'),
);

try {
$responses = Block\awaitAll($promises, $loop);
$responses = Block\await($promise, $loop);
// responses successfully received
} catch (Exception $e) {
// an error occured while performing the requests
Expand All @@ -306,16 +376,11 @@ function download(array $uris)
$loop = React\EventLoop\Factory::create();
$browser = new Clue\React\Buzz\Browser($loop);

$q = new Queue(10, null, function ($uri) use ($browser) {
$promise = Queue::all(3, $uris, function ($uri) use ($browser) {
return $browser->get($uri);
});

$promises = array();
foreach ($uris as $uri) {
$promises[$uri] = $q($uri);
}

return Clue\React\Block\awaitAll($promises, $loop);
return Clue\React\Block\await($promise, $loop);
}
```

Expand Down
42 changes: 42 additions & 0 deletions examples/02-http-all.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
<?php

use Clue\React\Buzz\Browser;
use Clue\React\Mq\Queue;
use Psr\Http\Message\ResponseInterface;
use React\EventLoop\Factory;

require __DIR__ . '/../vendor/autoload.php';

// list of all URLs you want to download
// this list may potentially contain hundreds or thousands of entries
$urls = array(
'http://www.github.com/',
'http://www.yahoo.com/',
'http://www.bing.com/',
'http://www.google.com/',
//'http://httpbin.org/delay/2',
);

$loop = Factory::create();
$browser = new Browser($loop);

// each job should use the browser to GET a certain URL
// limit number of concurrent jobs here to avoid using excessive network resources
$promise = Queue::all(3, array_combine($urls, $urls), function ($url) use ($browser) {
return $browser->get($url);
});

$promise->then(
function ($responses) {
/* @var $responses ResponseInterface[] */
echo 'All URLs succeeded!' . PHP_EOL;
foreach ($responses as $url => $response) {
echo $url . ' has ' . $response->getBody()->getSize() . ' bytes' . PHP_EOL;
}
},
function ($e) {
echo 'An error occured: ' . $e->getMessage() . PHP_EOL;
}
);

$loop->run();
114 changes: 114 additions & 0 deletions src/Queue.php
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,120 @@ class Queue implements \Countable
private $pending = 0;
private $queue = array();

/**
* Concurrently process all given jobs through the given `$handler`.
*
* This is a convenience method which uses the `Queue` internally to
* schedule all jobs while limiting concurrency to ensure no more than
* `$concurrency` jobs ever run at once. It will return a promise which
* resolves with the results of all jobs on success.
*
* ```php
* $loop = React\EventLoop\Factory::create();
* $browser = new Clue\React\Buzz\Browser($loop);
*
* $promise = Queue:all(3, $urls, function ($url) use ($browser) {
* return $browser->get($url);
* });
*
* $promise->then(function (array $responses) {
* echo 'All ' . count($responses) . ' successful!' . PHP_EOL;
* });
* ```
*
* If either of the jobs fail, it will reject the resulting promise and will
* try to cancel all outstanding jobs. Similarly, calling `cancel()` on the
* resulting promise will try to cancel all outstanding jobs. See
* [promises](#promises) and [cancellation](#cancellation) for details.
*
* The `$concurrency` parameter sets a new soft limit for the maximum number
* of jobs to handle concurrently. Finding a good concurrency limit depends
* on your particular use case. It's common to limit concurrency to a rather
* small value, as doing more than a dozen of things at once may easily
* overwhelm the receiving side. Using a `1` value will ensure that all jobs
* are processed one after another, effectively creating a "waterfall" of
* jobs. Using a value less than 1 will reject with an
* `InvalidArgumentException` without processing any jobs.
*
* ```php
* // handle up to 10 jobs concurrently
* $promise = Queue:all(10, $jobs, $handler);
* ```
*
* ```php
* // handle each job after another without concurrency (waterfall)
* $promise = Queue:all(1, $jobs, $handler);
* ```
*
* The `$jobs` parameter must be an array with all jobs to process. Each
* value in this array will be passed to the `$handler` to start one job.
* The array keys will be preserved in the resulting array, while the array
* values will be replaced with the job results as returned by the
* `$handler`. If this array is empty, this method will resolve with an
* empty array without processing any jobs.
*
* The `$handler` parameter must be a valid callable that accepts your job
* parameters, invokes the appropriate operation and returns a Promise as a
* placeholder for its future result. If the given argument is not a valid
* callable, this method will reject with an `InvalidArgumentExceptionn`
* without processing any jobs.
*
* ```php
* // using a Closure as handler is usually recommended
* $promise = Queue::all(10, $jobs, function ($url) use ($browser) {
* return $browser->get($url);
* });
* ```
*
* ```php
* // accepts any callable, so PHP's array notation is also supported
* $promise = Queue:all(10, $jobs, array($browser, 'get'));
* ```
*
* > Keep in mind that returning an array of response messages means that
* the whole response body has to be kept in memory.
*
* @param int $concurrency concurrency soft limit
* @param array $jobs
* @param callable $handler
* @return PromiseInterface Returns a Promise<mixed[]> which resolves with an array of all resolution values
* or rejects when any of the operations reject.
*/
public static function all($concurrency, array $jobs, $handler)
{
try {
// limit number of concurrent operations
$q = new self($concurrency, null, $handler);
} catch (\InvalidArgumentException $e) {
// reject if $concurrency or $handler is invalid
return Promise\reject($e);
}

// try invoking all operations and automatically queue excessive ones
$promises = array_map($q, $jobs);

return new Promise\Promise(function ($resolve, $reject) use ($promises) {
Promise\all($promises)->then($resolve, function ($e) use ($promises, $reject) {
// cancel all pending promises if a single promise fails
foreach (array_reverse($promises) as $promise) {
if ($promise instanceof CancellablePromiseInterface) {
$promise->cancel();
}
}

// reject with original rejection message
$reject($e);
});
}, function () use ($promises) {
// cancel all pending promises on cancellation
foreach (array_reverse($promises) as $promise) {
if ($promise instanceof CancellablePromiseInterface) {
$promise->cancel();
}
}
});
}

/**
* Instantiates a new queue object.
*
Expand Down
Loading