Merged
107 changes: 107 additions & 0 deletions README.md
@@ -44,6 +44,7 @@ much any API that already uses Promises.
* [Promises](#promises)
* [Timeout](#timeout)
* [Streaming](#streaming)
* [all()](#all)
* [Install](#install)
* [Tests](#tests)
* [License](#license)
@@ -425,6 +426,112 @@ $uploader = new Transformer(10, function ($data) use ($http) {
});
```

#### all()

The static `all(ReadableStreamInterface $input, int $concurrency, callable $handler): PromiseInterface<int,Exception>` method can be used to
concurrently process all jobs from the input stream through the given `$handler`.

This is a convenience method which uses the `Transformer` internally to
schedule all jobs from the input stream while limiting concurrency to
ensure no more than `$concurrency` jobs ever run at once. It will return
a promise which resolves with the total number of successful jobs
on success.

```php
$loop = React\EventLoop\Factory::create();
$browser = new Clue\React\Buzz\Browser($loop);

$promise = Transformer::all($input, 3, function ($data) use ($browser, $url) {
return $browser->post($url, [], json_encode($data));
});

$promise->then(function ($count) {
echo 'All ' . $count . ' jobs successful!' . PHP_EOL;
});
```

If any of the jobs fail, it will reject the resulting promise,
`close()` the input stream and try to cancel all outstanding jobs.
Calling `cancel()` on the pending promise will `close()` the input stream
and will try to cancel all outstanding jobs. Similarly, if the `$input`
stream emits an `error` event, it will reject the resulting promise and
will try to cancel all outstanding jobs.
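
As a minimal sketch of this cancellation behavior (assuming `$loop`, `$input`
and `$handler` as in the surrounding examples), calling `cancel()` on the
pending promise closes the input stream and rejects the promise:

```php
$promise = Transformer::all($input, 3, $handler);

// cancel the pending operation after 10 seconds, which will close the
// input stream and try to cancel all outstanding jobs
$loop->addTimer(10.0, function () use ($promise) {
    $promise->cancel();
});

$promise->then(null, function (Exception $e) {
    echo 'Operation failed: ' . $e->getMessage() . PHP_EOL;
});
```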

The `$input` parameter must be a `ReadableStreamInterface` which emits
one `data` event for each job to process. Each element will be passed to
the `$handler` to start one job. The fulfillment value for each job will
be ignored, so for best performance it's recommended to not return any
excessive data structures. When the stream emits an `end` or `close`
event, this method will wait for all outstanding jobs to complete and
then resolve with the number of successful jobs. If this stream is
already closed or does not emit any `data` events, this method will
resolve with a `0` value without processing any jobs.

```php
$input = new ThroughStream();

$promise = Transformer::all($input, 2, $handler);

$input->write('a');
$input->write('b');
$input->write('c');
$input->end();
```

Because streams are one of the core abstractions of ReactPHP, a large number
of stream implementations are available for many different use cases. For
example, this allows you to use [clue/reactphp-ndjson](https://github.com/clue/reactphp-ndjson)
or [clue/reactphp-csv](https://github.com/clue/reactphp-csv) to process
large lists of structured input data. See also [streaming](#streaming) for
more details.
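
For instance, an NDJSON file can be used as the input stream like this
(a sketch along the lines of the included example script, assuming a
`users.ndjson` file with one JSON object per line and `$loop` and
`$handler` as above):

```php
// decode each line of the NDJSON file into one job for the transformer
$input = new Clue\React\NDJson\Decoder(
    new React\Stream\ReadableResourceStream(
        fopen('users.ndjson', 'r'),
        $loop
    ),
    true
);

$promise = Transformer::all($input, 3, $handler);
```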

The `$concurrency` parameter sets a new soft limit for the maximum number
of jobs to handle concurrently. Finding a good concurrency limit depends
on your particular use case. It's common to limit concurrency to a rather
small value, as doing more than a dozen things at once may easily
overwhelm the receiving side. Using a `1` value will ensure that all jobs
are processed one after another, effectively creating a "waterfall" of
jobs. Using a value less than 1 will reject with an
`InvalidArgumentException` without processing any jobs.

```php
// handle up to 10 jobs concurrently
$promise = Transformer::all($stream, 10, $handler);
```

```php
// handle jobs one after another without concurrency (waterfall)
$promise = Transformer::all($stream, 1, $handler);
```

The `$handler` parameter must be a valid callable that accepts your job
parameter (the data from the `$input` stream), invokes the appropriate
operation and returns a Promise as a placeholder for its future result.
The fulfillment value for each job will be ignored, so for best
performance it's recommended to not return any excessive data structures.
If the given argument is not a valid callable, this method will reject
with an `InvalidArgumentException` without processing any jobs.

```php
// using a Closure as handler is usually recommended
$promise = Transformer::all($stream, 10, function ($url) use ($browser) {
return $browser->get($url);
});
```

```php
// accepts any callable, so PHP's array notation is also supported
$promise = Transformer::all($stream, 10, array($browser, 'get'));
```

Note that this method returns a promise that resolves with the total
number of successful operations only if all operations succeed. This
is mostly a convenience method that uses the [`Transformer`](#transformer)
under the hood. If your input data is small enough to fit into memory
(a few dozens or hundreds of operations), you may want to use
[clue/reactphp-mq](https://github.com/clue/reactphp-mq) instead and keep
all operations in memory without using a streaming approach.

## Install

The recommended way to install this library is [through Composer](https://getcomposer.org).
2 changes: 1 addition & 1 deletion examples/example.php → examples/01-transform.php
@@ -53,4 +53,4 @@ function (ResponseInterface $response) use ($user) {
});
$transformer->on('error', 'printf');

$loop->run();
$loop->run();
52 changes: 52 additions & 0 deletions examples/02-transform-all.php
@@ -0,0 +1,52 @@
<?php

use Clue\React\Flux\Transformer;
use Psr\Http\Message\ResponseInterface;

require __DIR__ . '/../vendor/autoload.php';

$loop = React\EventLoop\Factory::create();
$browser = new Clue\React\Buzz\Browser($loop);

$concurrency = isset($argv[1]) ? $argv[1] : 3;
$url = isset($argv[2]) ? $argv[2] : 'http://httpbin.org/post';

// load a huge number of users to process from NDJSON file
$input = new Clue\React\NDJson\Decoder(
new React\Stream\ReadableResourceStream(
fopen(__DIR__ . '/users.ndjson', 'r'),
$loop
),
true
);

// each job should use the browser to POST each user object to a certain URL
// process all users through the transformer
// limit number of concurrent jobs here
$promise = Transformer::all($input, $concurrency, function ($user) use ($browser, $url) {
return $browser->post(
$url,
array('Content-Type' => 'application/json'),
json_encode($user)
)->then(function (ResponseInterface $response) {
// demo HTTP response validation
$body = json_decode($response->getBody());
if (!isset($body->json)) {
throw new RuntimeException('Unexpected response');
}
});
});

$promise->then(
function ($count) {
echo 'Successfully processed all ' . $count . ' user records' . PHP_EOL;
},
function (Exception $e) {
echo 'An error occurred: ' . $e->getMessage() . PHP_EOL;
if ($e->getPrevious()) {
echo 'Previous: ' . $e->getPrevious()->getMessage() . PHP_EOL;
}
}
);

$loop->run();
157 changes: 157 additions & 0 deletions src/Transformer.php
@@ -8,6 +8,10 @@
use React\Stream\DuplexStreamInterface;
use React\Stream\Util;
use React\Stream\WritableStreamInterface;
use React\Stream\ReadableStreamInterface;
use React\Promise;
use React\Promise\Deferred;
use React\Promise\PromiseInterface;

/**
* The `Transformer` passes all input data through its transformation handler
@@ -296,6 +300,159 @@ final class Transformer extends EventEmitter implements DuplexStreamInterface
private $promises = array();
private $queued = array();

/**
* Concurrently process all jobs from the input stream through the given `$handler`.
*
* This is a convenience method which uses the `Transformer` internally to
* schedule all jobs from the input stream while limiting concurrency to
* ensure no more than `$concurrency` jobs ever run at once. It will return
 * a promise which resolves with the total number of successful jobs
* on success.
*
* ```php
* $loop = React\EventLoop\Factory::create();
* $browser = new Clue\React\Buzz\Browser($loop);
*
* $promise = Transformer::all($input, 3, function ($data) use ($browser, $url) {
* return $browser->post($url, [], json_encode($data));
* });
*
* $promise->then(function ($count) {
* echo 'All ' . $count . ' jobs successful!' . PHP_EOL;
* });
* ```
*
 * If any of the jobs fail, it will reject the resulting promise,
 * `close()` the input stream and try to cancel all outstanding jobs.
* Calling `cancel()` on the pending promise will `close()` the input stream
* and will try to cancel all outstanding jobs. Similarly, if the `$input`
* stream emits an `error` event, it will reject the resulting promise and
* will try to cancel all outstanding jobs.
*
* The `$input` parameter must be a `ReadableStreamInterface` which emits
* one `data` event for each job to process. Each element will be passed to
* the `$handler` to start one job. The fulfillment value for each job will
* be ignored, so for best performance it's recommended to not return any
* excessive data structures. When the stream emits an `end` or `close`
* event, this method will wait for all outstanding jobs to complete and
* then resolve with the number of successful jobs. If this stream is
* already closed or does not emit any `data` events, this method will
* resolve with a `0` value without processing any jobs.
*
* ```php
* $input = new ThroughStream();
*
* $promise = Transformer::all($input, 2, $handler);
*
* $input->write('a');
* $input->write('b');
* $input->write('c');
* $input->end();
* ```
*
* Because streams are one of the core abstractions of ReactPHP, a large number
* of stream implementations are available for many different use cases. For
* example, this allows you to use [clue/reactphp-ndjson](https://github.com/clue/reactphp-ndjson)
* or [clue/reactphp-csv](https://github.com/clue/reactphp-csv) to process
* large lists of structured input data. See also [streaming](#streaming) for
* more details.
*
* The `$concurrency` parameter sets a new soft limit for the maximum number
* of jobs to handle concurrently. Finding a good concurrency limit depends
* on your particular use case. It's common to limit concurrency to a rather
 * small value, as doing more than a dozen things at once may easily
* overwhelm the receiving side. Using a `1` value will ensure that all jobs
* are processed one after another, effectively creating a "waterfall" of
* jobs. Using a value less than 1 will reject with an
* `InvalidArgumentException` without processing any jobs.
*
* ```php
* // handle up to 10 jobs concurrently
* $promise = Transformer::all($stream, 10, $handler);
* ```
*
* ```php
 * // handle jobs one after another without concurrency (waterfall)
* $promise = Transformer::all($stream, 1, $handler);
* ```
*
* The `$handler` parameter must be a valid callable that accepts your job
* parameter (the data from the `$input` stream), invokes the appropriate
* operation and returns a Promise as a placeholder for its future result.
* The fulfillment value for each job will be ignored, so for best
* performance it's recommended to not return any excessive data structures.
* If the given argument is not a valid callable, this method will reject
 * with an `InvalidArgumentException` without processing any jobs.
*
* ```php
* // using a Closure as handler is usually recommended
* $promise = Transformer::all($stream, 10, function ($url) use ($browser) {
* return $browser->get($url);
* });
* ```
*
* ```php
* // accepts any callable, so PHP's array notation is also supported
* $promise = Transformer::all($stream, 10, array($browser, 'get'));
* ```
*
* Note that this method returns a promise that resolves with the total
* number of successful operations only if all operations succeed. This
* is mostly a convenience method that uses the [`Transformer`](#transformer)
* under the hood. If your input data is small enough to fit into memory
* (a few dozens or hundreds of operations), you may want to use
* [clue/reactphp-mq](https://github.com/clue/reactphp-mq) instead and keep
* all operations in memory without using a streaming approach.
*
* @param ReadableStreamInterface $input
* @param int $concurrency
* @param callable $callback
* @return PromiseInterface Returns a Promise<int,Exception>
*/
public static function all(ReadableStreamInterface $input, $concurrency, $callback)
{
if (!$input->isReadable()) {
return Promise\resolve(0);
}

try {
$stream = new self($concurrency, $callback);
} catch (\InvalidArgumentException $e) {
return Promise\reject($e);
}

$deferred = new Deferred(function ($_, $reject) use ($input, $stream) {
$reject(new \RuntimeException('Transformer cancelled'));
$input->close();
$stream->close();
});

// forward input data through transformer until input stream ends/closes
$input->pipe($stream);
$input->on('close', array($stream, 'end'));

// count number of successful transformations and resolve with count on end
$count = 0;
$stream->on('data', function () use (&$count) {
++$count;
});
$stream->on('end', function () use (&$count, $deferred) {
$deferred->resolve($count);
});

// input error or transformation error should reject result
$input->on('error', function ($error) use ($deferred, $stream) {
$deferred->reject($error);
$stream->close();
});
$stream->on('error', function ($error) use ($deferred, $input) {
$deferred->reject($error);
$input->close();
});

return $deferred->promise();
}

/**
* Instantiates a new Transformer instance.
*