diff --git a/README.md b/README.md index 37d718f..c22ff6d 100644 --- a/README.md +++ b/README.md @@ -8,14 +8,73 @@ built on top of [ReactPHP](https://reactphp.org/). **Table of contents** * [Quickstart example](#quickstart-example) +* [Usage](#usage) + * [EventSource](#eventsource) * [Install](#install) * [Tests](#tests) * [License](#license) ## Quickstart example +Once [installed](#install), you can use the following code to stream messages +from any Server-Sent Events (SSE) server endpoint: + +```php +$loop = Factory::create(); +$es = new EventSource('https://example.com/stream.php', $loop); + +$es->on('message', function (MessageEvent $message) { + //$data = json_decode($message->data); + var_dump($message); +}); + +$loop->run(); +``` + See the [examples](examples). +## Usage + +### EventSource + +The `EventSource` class is responsible for communication with the remote Server-Sent Events (SSE) endpoint. + +The `EventSource` object works very similar to the one found in common +web browsers. Unless otherwise noted, it follows the same semantics as defined +under https://html.spec.whatwg.org/multipage/server-sent-events.html + +It requires the URL to the remote Server-Sent Events (SSE) endpoint and also +registers everything with the main [`EventLoop`](https://github.com/reactphp/event-loop#usage) +in order to handle async HTTP requests. + +```php +$loop = \React\EventLoop\Factory::create(); + +$es = new \Clue\React\EventSource\EventSource('https://example.com/stream.php', $loop); +``` + +If you need custom connector settings (DNS resolution, TLS parameters, timeouts, +proxy servers etc.), you can explicitly pass a custom instance of the +[`ConnectorInterface`](https://github.com/reactphp/socket#connectorinterface) +to the [`Browser`](https://github.com/clue/reactphp-buzz#browser) instance +and pass it as an additional argument to the `EventSource` like this: + +```php +$connector = new \React\Socket\Connector($loop, array( + 'dns' => '127.0.0.1', + 'tcp' => array( + 'bindto' => '192.168.10.1:0' + ), + 'tls' => array( + 'verify_peer' => false, + 'verify_peer_name' => false + ) +)); +$browser = new \Clue\React\Buzz\Browser($loop, $connector); + +$es = new \Clue\React\EventSource\EventSource('https://example.com/stream.php', $loop, $browser); +``` + ## Install The recommended way to install this library is [through Composer](https://getcomposer.org). diff --git a/src/EventSource.php b/src/EventSource.php index f07f349..6ec3c88 100644 --- a/src/EventSource.php +++ b/src/EventSource.php @@ -2,13 +2,51 @@ namespace Clue\React\EventSource; -use React\EventLoop\LoopInterface; -use Psr\Http\Message\ResponseInterface; -use React\Stream\ReadableStreamInterface; use Clue\React\Buzz\Browser; use Evenement\EventEmitter; -use React\Socket\ConnectorInterface; +use Psr\Http\Message\ResponseInterface; +use React\EventLoop\LoopInterface; +use React\Stream\ReadableStreamInterface; +/** + * The `EventSource` class is responsible for communication with the remote Server-Sent Events (SSE) endpoint. + * + * The `EventSource` object works very similar to the one found in common + * web browsers. Unless otherwise noted, it follows the same semantics as defined + * under https://html.spec.whatwg.org/multipage/server-sent-events.html + * + * It requires the URL to the remote Server-Sent Events (SSE) endpoint and also + * registers everything with the main [`EventLoop`](https://github.com/reactphp/event-loop#usage) + * in order to handle async HTTP requests. + * + * ```php + * $loop = \React\EventLoop\Factory::create(); + * + * $es = new \Clue\React\EventSource\EventSource('https://example.com/stream.php', $loop); + * ``` + * + * If you need custom connector settings (DNS resolution, TLS parameters, timeouts, + * proxy servers etc.), you can explicitly pass a custom instance of the + * [`ConnectorInterface`](https://github.com/reactphp/socket#connectorinterface) + * to the [`Browser`](https://github.com/clue/reactphp-buzz#browser) instance + * and pass it as an additional argument to the `EventSource` like this: + * + * ```php + * $connector = new \React\Socket\Connector($loop, array( + * 'dns' => '127.0.0.1', + * 'tcp' => array( + * 'bindto' => '192.168.10.1:0' + * ), + * 'tls' => array( + * 'verify_peer' => false, + * 'verify_peer_name' => false + * ) + * )); + * $browser = new \Clue\React\Buzz\Browser($loop, $connector); + * + * $es = new \Clue\React\EventSource\EventSource('https://example.com/stream.php', $loop, $browser); + * ``` + */ class EventSource extends EventEmitter { /** @@ -40,27 +78,25 @@ class EventSource extends EventEmitter private $timer; private $reconnectTime = 3.0; - public function __construct($url, LoopInterface $loop, ConnectorInterface $connector = null) + public function __construct($url, LoopInterface $loop, Browser $browser = null) { $parts = parse_url($url); if (!isset($parts['scheme'], $parts['host']) || !in_array($parts['scheme'], array('http', 'https'))) { throw new \InvalidArgumentException(); } - $browser = new Browser($loop, $connector); + if ($browser === null) { + $browser = new Browser($loop); + } $this->browser = $browser->withOptions(array('streaming' => true, 'obeySuccessCode' => false)); $this->loop = $loop; $this->url = $url; $this->readyState = self::CONNECTING; - - $this->timer = $loop->addTimer(0, function () { - $this->timer = null; - $this->send(); - }); + $this->request(); } - private function send() + private function request() { $headers = array( 'Accept' => 'text/event-stream', @@ -120,7 +156,7 @@ private function send() $this->readyState = self::CONNECTING; $this->timer = $this->loop->addTimer($this->reconnectTime, function () { $this->timer = null; - $this->send(); + $this->request(); }); } }); @@ -140,7 +176,7 @@ private function send() $this->timer = $this->loop->addTimer($this->reconnectTime, function () { $this->timer = null; - $this->send(); + $this->request(); }); }); } diff --git a/tests/EventSourceTest.php b/tests/EventSourceTest.php index f0621f7..e698a61 100644 --- a/tests/EventSourceTest.php +++ b/tests/EventSourceTest.php @@ -7,6 +7,7 @@ use RingCentral\Psr7\Response; use React\Stream\ThroughStream; use Clue\React\Buzz\Message\ReadableBodyStream; +use Clue\React\Buzz\Browser; class EventSourceTest extends TestCase { @@ -37,152 +38,123 @@ public function testConstructorThrowsIfUriArgumentIncludesInvalidScheme() new EventSource('ftp://example.com', $loop); } - /** - * @doesNotPerformAssertions - */ - public function testConstructorCanBeCalledWithoutConnector() + public function testConstructorCanBeCalledWithoutBrowser() { $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); - new EventSource('http://example.com', $loop); - } - public function testConstructorWillStartTimerToStartConnection() - { - $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); - $loop->expects($this->once())->method('addTimer')->with(0, $this->isType('callable')); + $es = new EventSource('http://example.invalid', $loop); - $connector = $this->getMockBuilder('React\Socket\ConnectorInterface')->getMock(); - $connector->expects($this->never())->method('connect'); + $ref = new ReflectionProperty($es, 'browser'); + $ref->setAccessible(true); + $browser = $ref->getValue($es); - new EventSource('http://example.com', $loop, $connector); + $this->assertInstanceOf('Clue\React\Buzz\Browser', $browser); } - - public function testConstructorWillConnectThroughGivenConnectorAfterTimer() + public function testConstructorWillSendGetRequestThroughGivenBrowser() { $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); - $timer = null; - $loop->expects($this->at(0))->method('addTimer')->with(0, $this->callback(function ($cb) use (&$timer) { - $timer = $cb; - return true; - })); $pending = new Promise(function () { }); - $connector = $this->getMockBuilder('React\Socket\ConnectorInterface')->getMock(); - $connector->expects($this->once())->method('connect')->with('example.com:80')->willReturn($pending); - - $es = new EventSource('http://example.com', $loop, $connector); + $browser = $this->getMockBuilder('Clue\React\Buzz\Browser')->disableOriginalConstructor()->getMock(); + $browser->expects($this->once())->method('withOptions')->willReturnSelf(); + $browser->expects($this->once())->method('get')->with('http://example.com')->willReturn($pending); - $this->assertNotNull($timer); - $timer(); + $es = new EventSource('http://example.com', $loop, $browser); } - public function testConstructorWillConnectThroughGivenConnectorWithTlsSchemeForHttpsAfterTimer() + public function testConstructorWillSendGetRequestThroughGivenBrowserWithHttpsScheme() { $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); - $timer = null; - $loop->expects($this->at(0))->method('addTimer')->with(0, $this->callback(function ($cb) use (&$timer) { - $timer = $cb; - return true; - })); $pending = new Promise(function () { }); - $connector = $this->getMockBuilder('React\Socket\ConnectorInterface')->getMock(); - $connector->expects($this->once())->method('connect')->with('tls://example.com:443')->willReturn($pending); - - $es = new EventSource('https://example.com', $loop, $connector); + $browser = $this->getMockBuilder('Clue\React\Buzz\Browser')->disableOriginalConstructor()->getMock(); + $browser->expects($this->once())->method('withOptions')->willReturnSelf(); + $browser->expects($this->once())->method('get')->with('https://example.com')->willReturn($pending); - $this->assertNotNull($timer); - $timer(); + $es = new EventSource('https://example.com', $loop, $browser); } - public function testCloseWillCancelPendingConnectionTimerWhenCalledDirectlyAfterConstruction() + public function testCloseWillCancelPendingGetRequest() { - $timer = $this->getMockBuilder('React\EventLoop\TimerInterface')->getMock(); $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); - $loop->expects($this->once())->method('addTimer')->willReturn($timer); - $loop->expects($this->once())->method('cancelTimer')->with($timer); - $connector = $this->getMockBuilder('React\Socket\ConnectorInterface')->getMock(); - $connector->expects($this->never())->method('connect'); + $cancelled = null; + $pending = new Promise(function () { }, function () use (&$cancelled) { + ++$cancelled; + }); + $browser = $this->getMockBuilder('Clue\React\Buzz\Browser')->disableOriginalConstructor()->getMock(); + $browser->expects($this->once())->method('withOptions')->willReturnSelf(); + $browser->expects($this->once())->method('get')->with('http://example.com')->willReturn($pending); - $es = new EventSource('http://example.com', $loop, $connector); + $es = new EventSource('http://example.com', $loop, $browser); $es->close(); + + $this->assertEquals(1, $cancelled); } - public function testCloseWillCancelPendingConnectionAttemptAfterTimer() + public function testCloseWillNotEmitErrorEventWhenGetRequestCancellationHandlerRejects() { $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); - $timer = null; - $loop->expects($this->at(0))->method('addTimer')->with(0, $this->callback(function ($cb) use (&$timer) { - $timer = $cb; - return true; - })); - $cancelled = null; - $pending = new Promise(function () { }, function () use (&$cancelled) { - ++$cancelled; + $pending = new Promise(function () { }, function () { + throw new RuntimeException(); }); - $connector = $this->getMockBuilder('React\Socket\ConnectorInterface')->getMock(); - $connector->expects($this->once())->method('connect')->with('example.com:80')->willReturn($pending); + $browser = $this->getMockBuilder('Clue\React\Buzz\Browser')->disableOriginalConstructor()->getMock(); + $browser->expects($this->once())->method('withOptions')->willReturnSelf(); + $browser->expects($this->once())->method('get')->with('http://example.com')->willReturn($pending); - $es = new EventSource('http://example.com', $loop, $connector); + $es = new EventSource('http://example.com', $loop, $browser); - $this->assertNotNull($timer); - $timer(); + $error = null; + $es->on('error', function ($e) use (&$error) { + $error = $e; + }); $es->close(); - $this->assertEquals(1, $cancelled); + $this->assertNull($error); } - public function testConstructorWillStartConnectionThatWillStartRetryTimerWhenConnectionFails() + public function testConstructorWillStartGetRequestThatWillStartRetryTimerWhenGetRequestRejects() { $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); - $timer = null; - $loop->expects($this->at(0))->method('addTimer')->with(0, $this->callback(function ($cb) use (&$timer) { - $timer = $cb; - return true; - })); - $loop->expects($this->at(2))->method('addTimer')->with(3.0, $this->isType('callable')); + $loop->expects($this->once())->method('addTimer')->with( + 3.0, + $this->isType('callable') + ); $deferred = new Deferred(); - $connector = $this->getMockBuilder('React\Socket\ConnectorInterface')->getMock(); - $connector->expects($this->once())->method('connect')->willReturn($deferred->promise()); - - $es = new EventSource('http://example.com', $loop, $connector); + $browser = $this->getMockBuilder('Clue\React\Buzz\Browser')->disableOriginalConstructor()->getMock(); + $browser->expects($this->once())->method('withOptions')->willReturnSelf(); + $browser->expects($this->once())->method('get')->willReturn($deferred->promise()); - $this->assertNotNull($timer); - $timer(); + $es = new EventSource('http://example.com', $loop, $browser); $deferred->reject(new RuntimeException()); } - public function testConstructorWillStartConnectionThatWillStartRetryTimerThatWillRetryConnectionWhenConnectionFails() + public function testConstructorWillStartGetRequestThatWillStartRetryTimerThatWillRetryGetRequestWhenInitialGetRequestRejects() { $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); - $timerStart = null; - $loop->expects($this->at(0))->method('addTimer')->with(0, $this->callback(function ($cb) use (&$timerStart) { - $timerStart = $cb; - return true; - })); $timerRetry = null; - $loop->expects($this->at(2))->method('addTimer')->with(3, $this->callback(function ($cb) use (&$timerRetry) { - $timerRetry = $cb; - return true; - })); + $loop->expects($this->once())->method('addTimer')->with( + 3.0, + $this->callback(function ($cb) use (&$timerRetry) { + $timerRetry = $cb; + return true; + }) + ); $deferred = new Deferred(); - $connector = $this->getMockBuilder('React\Socket\ConnectorInterface')->getMock(); - $connector->expects($this->exactly(2))->method('connect')->willReturnOnConsecutiveCalls( + $browser = $this->getMockBuilder('Clue\React\Buzz\Browser')->disableOriginalConstructor()->getMock(); + $browser->expects($this->once())->method('withOptions')->willReturnSelf(); + $browser->expects($this->exactly(2))->method('get')->willReturnOnConsecutiveCalls( $deferred->promise(), new Promise(function () { }) ); - $es = new EventSource('http://example.com', $loop, $connector); - - $this->assertNotNull($timerStart); - $timerStart(); + $es = new EventSource('http://example.com', $loop, $browser); $deferred->reject(new RuntimeException()); @@ -190,23 +162,20 @@ public function testConstructorWillStartConnectionThatWillStartRetryTimerThatWil $timerRetry(); } - public function testConstructorWillStartConnectionThatWillEmitErrorWhenConnectionFails() + public function testConstructorWillStartGetRequestThatWillEmitErrorWhenGetRequestRejects() { $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); - $timer = null; - $loop->expects($this->at(0))->method('addTimer')->with(0, $this->callback(function ($cb) use (&$timer) { - $timer = $cb; - return true; - })); + $loop->expects($this->once())->method('addTimer')->with( + 3.0, + $this->isType('callable') + ); $deferred = new Deferred(); - $connector = $this->getMockBuilder('React\Socket\ConnectorInterface')->getMock(); - $connector->expects($this->once())->method('connect')->willReturn($deferred->promise()); - - $es = new EventSource('http://example.com', $loop, $connector); + $browser = $this->getMockBuilder('Clue\React\Buzz\Browser')->disableOriginalConstructor()->getMock(); + $browser->expects($this->once())->method('withOptions')->willReturnSelf(); + $browser->expects($this->once())->method('get')->willReturn($deferred->promise()); - $this->assertNotNull($timer); - $timer(); + $es = new EventSource('http://example.com', $loop, $browser); $caught = null; $es->on('error', function ($e) use (&$caught) { @@ -217,23 +186,16 @@ public function testConstructorWillStartConnectionThatWillEmitErrorWhenConnectio $this->assertSame($expected, $caught); } - public function testConstructorWillStartConnectionThatWillNotStartRetryTimerWhenConnectionFailsAndErrorHandlerClosesExplicitly() + public function testConstructorWillStartGetRequestThatWillNotStartRetryTimerWhenGetRequestRejectAndErrorHandlerClosesExplicitly() { $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); - $timer = null; - $loop->expects($this->at(0))->method('addTimer')->with(0, $this->callback(function ($cb) use (&$timer) { - $timer = $cb; - return true; - })); $deferred = new Deferred(); - $connector = $this->getMockBuilder('React\Socket\ConnectorInterface')->getMock(); - $connector->expects($this->once())->method('connect')->willReturn($deferred->promise()); - - $es = new EventSource('http://example.com', $loop, $connector); + $browser = $this->getMockBuilder('Clue\React\Buzz\Browser')->disableOriginalConstructor()->getMock(); + $browser->expects($this->once())->method('withOptions')->willReturnSelf(); + $browser->expects($this->once())->method('get')->with('http://example.com')->willReturn($deferred->promise()); - $this->assertNotNull($timer); - $timer(); + $es = new EventSource('http://example.com', $loop, $browser); $es->on('error', function () use ($es) { $es->close(); @@ -241,106 +203,38 @@ public function testConstructorWillStartConnectionThatWillNotStartRetryTimerWhen $deferred->reject(new RuntimeException()); } - public function testCloseAfterConnectionFromConstructorFailsWillCancelPendingRetryTimer() + public function testCloseAfterGetRequestFromConstructorFailsWillCancelPendingRetryTimer() { - $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); - - $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); - $timerStart = null; - $loop->expects($this->at(0))->method('addTimer')->with(0, $this->callback(function ($cb) use (&$timerStart) { - $timerStart = $cb; - return true; - })); - $timer = $this->getMockBuilder('React\EventLoop\TimerInterface')->getMock(); - $loop->expects($this->at(2))->method('addTimer')->with(3.0, $this->isType('callable'))->willReturn($timer); + $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); + $loop->expects($this->once())->method('addTimer')->with( + 3.0, + $this->isType('callable') + )->willReturn($timer); $loop->expects($this->once())->method('cancelTimer')->with($timer); $deferred = new Deferred(); - $connector = $this->getMockBuilder('React\Socket\ConnectorInterface')->getMock(); - $connector->expects($this->once())->method('connect')->willReturn($deferred->promise()); - - $es = new EventSource('http://example.com', $loop, $connector); - - $this->assertNotNull($timerStart); - $timerStart(); - - $deferred->reject(new RuntimeException()); - - $es->close(); - } - - public function testConstructorWillSendRequestThroughInjectedBrowserAfterTimer() - { - $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); - $timer = null; - $loop->expects($this->at(0))->method('addTimer')->with(0, $this->callback(function ($cb) use (&$timer) { - $timer = $cb; - return true; - })); - - $pending = new Promise(function () { }); $browser = $this->getMockBuilder('Clue\React\Buzz\Browser')->disableOriginalConstructor()->getMock(); - $browser->expects($this->once())->method('get')->with('http://example.com')->willReturn($pending); - - $es = new EventSource('http://example.com', $loop); - - $ref = new ReflectionProperty($es, 'browser'); - $ref->setAccessible(true); - $ref->setValue($es, $browser); - - $this->assertNotNull($timer); - $timer(); - } + $browser->expects($this->once())->method('withOptions')->willReturnSelf(); + $browser->expects($this->once())->method('get')->with('http://example.com')->willReturn($deferred->promise()); - public function testCloseWillCancelPendingGetRequestAfterTimer() - { - $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); - $timer = null; - $loop->expects($this->at(0))->method('addTimer')->with(0, $this->callback(function ($cb) use (&$timer) { - $timer = $cb; - return true; - })); + $es = new EventSource('http://example.com', $loop, $browser); - $cancelled = null; - $pending = new Promise(function () { }, function () use (&$cancelled) { - ++$cancelled; - }); - $browser = $this->getMockBuilder('Clue\React\Buzz\Browser')->disableOriginalConstructor()->getMock(); - $browser->expects($this->once())->method('get')->willReturn($pending); - - $es = new EventSource('http://example.com', $loop); - - $ref = new ReflectionProperty($es, 'browser'); - $ref->setAccessible(true); - $ref->setValue($es, $browser); - - $this->assertNotNull($timer); - $timer(); + $deferred->reject(new RuntimeException()); $es->close(); - - $this->assertEquals(1, $cancelled); } - public function testConstructorWillReportFatalErrorWhenGetResponseResolvesWithInvalidStatusCodeAfterTimer() + public function testConstructorWillReportFatalErrorWhenGetResponseResolvesWithInvalidStatusCode() { $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); - $timer = null; - $loop->expects($this->at(0))->method('addTimer')->with(0, $this->callback(function ($cb) use (&$timer) { - $timer = $cb; - return true; - })); - $response = new Response(400, array('Content-Type' => 'text/event-stream'), ''); + $deferred = new Deferred(); $browser = $this->getMockBuilder('Clue\React\Buzz\Browser')->disableOriginalConstructor()->getMock(); - $browser->expects($this->once())->method('get')->with('http://example.com')->willReturn(\React\Promise\resolve($response)); + $browser->expects($this->once())->method('withOptions')->willReturnSelf(); + $browser->expects($this->once())->method('get')->with('http://example.com')->willReturn($deferred->promise()); - $es = new EventSource('http://example.com', $loop); - - $ref = new ReflectionProperty($es, 'browser'); - $ref->setAccessible(true); - $ref->setValue($es, $browser); + $es = new EventSource('http://example.com', $loop, $browser); $readyState = null; $caught = null; @@ -349,31 +243,23 @@ public function testConstructorWillReportFatalErrorWhenGetResponseResolvesWithIn $caught = $e; }); - $this->assertNotNull($timer); - $timer(); + $response = new Response(400, array('Content-Type' => 'text/event-stream'), ''); + $deferred->resolve($response); $this->assertEquals(EventSource::CLOSED, $readyState); $this->assertInstanceOf('UnexpectedValueException', $caught); } - public function testConstructorWillReportFatalErrorWhenGetResponseResolvesWithInvalidContentTypeAfterTimer() + public function testConstructorWillReportFatalErrorWhenGetResponseResolvesWithInvalidContentType() { $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); - $timer = null; - $loop->expects($this->at(0))->method('addTimer')->with(0, $this->callback(function ($cb) use (&$timer) { - $timer = $cb; - return true; - })); - $response = new Response(200, array(), ''); + $deferred = new Deferred(); $browser = $this->getMockBuilder('Clue\React\Buzz\Browser')->disableOriginalConstructor()->getMock(); - $browser->expects($this->once())->method('get')->with('http://example.com')->willReturn(\React\Promise\resolve($response)); + $browser->expects($this->once())->method('withOptions')->willReturnSelf(); + $browser->expects($this->once())->method('get')->with('http://example.com')->willReturn($deferred->promise()); - $es = new EventSource('http://example.com', $loop); - - $ref = new ReflectionProperty($es, 'browser'); - $ref->setAccessible(true); - $ref->setValue($es, $browser); + $es = new EventSource('http://example.com', $loop, $browser); $readyState = null; $caught = null; @@ -382,71 +268,55 @@ public function testConstructorWillReportFatalErrorWhenGetResponseResolvesWithIn $caught = $e; }); - $this->assertNotNull($timer); - $timer(); + $response = new Response(200, array(), ''); + $deferred->resolve($response); $this->assertEquals(EventSource::CLOSED, $readyState); $this->assertInstanceOf('UnexpectedValueException', $caught); } - public function testConstructorWillReportOpenWhenGetResponseResolvesWithValidResponseAfterTimer() + public function testConstructorWillReportOpenWhenGetResponseResolvesWithValidResponse() { $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); - $timer = null; - $loop->expects($this->at(0))->method('addTimer')->with(0, $this->callback(function ($cb) use (&$timer) { - $timer = $cb; - return true; - })); - $stream = new ThroughStream(); - $response = new Response(200, array('Content-Type' => 'text/event-stream'), new ReadableBodyStream($stream)); + $deferred = new Deferred(); $browser = $this->getMockBuilder('Clue\React\Buzz\Browser')->disableOriginalConstructor()->getMock(); - $browser->expects($this->once())->method('get')->with('http://example.com')->willReturn(\React\Promise\resolve($response)); - - $es = new EventSource('http://example.com', $loop); + $browser->expects($this->once())->method('withOptions')->willReturnSelf(); + $browser->expects($this->once())->method('get')->with('http://example.com')->willReturn($deferred->promise()); - $ref = new ReflectionProperty($es, 'browser'); - $ref->setAccessible(true); - $ref->setValue($es, $browser); + $es = new EventSource('http://example.com', $loop, $browser); $readyState = null; $es->on('open', function () use ($es, &$readyState) { $readyState = $es->readyState; }); - $this->assertNotNull($timer); - $timer(); + $stream = new ThroughStream(); + $response = new Response(200, array('Content-Type' => 'text/event-stream'), new ReadableBodyStream($stream)); + $deferred->resolve($response); $this->assertEquals(EventSource::OPEN, $readyState); } - public function testConstructorWillReportOpenWhenGetResponseResolvesWithValidResponseWithCaseInsensitiveContentTypeAfterTimer() + public function testConstructorWillReportOpenWhenGetResponseResolvesWithValidResponseWithCaseInsensitiveContentType() { $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); - $timer = null; - $loop->expects($this->at(0))->method('addTimer')->with(0, $this->callback(function ($cb) use (&$timer) { - $timer = $cb; - return true; - })); - $stream = new ThroughStream(); - $response = new Response(200, array('CONTENT-type' => 'TEXT/Event-Stream'), new ReadableBodyStream($stream)); + $deferred = new Deferred(); $browser = $this->getMockBuilder('Clue\React\Buzz\Browser')->disableOriginalConstructor()->getMock(); - $browser->expects($this->once())->method('get')->with('http://example.com')->willReturn(\React\Promise\resolve($response)); - - $es = new EventSource('http://example.com', $loop); + $browser->expects($this->once())->method('withOptions')->willReturnSelf(); + $browser->expects($this->once())->method('get')->with('http://example.com')->willReturn($deferred->promise()); - $ref = new ReflectionProperty($es, 'browser'); - $ref->setAccessible(true); - $ref->setValue($es, $browser); + $es = new EventSource('http://example.com', $loop, $browser); $readyState = null; $es->on('open', function () use ($es, &$readyState) { $readyState = $es->readyState; }); - $this->assertNotNull($timer); - $timer(); + $stream = new ThroughStream(); + $response = new Response(200, array('CONTENT-type' => 'TEXT/Event-Stream'), new ReadableBodyStream($stream)); + $deferred->resolve($response); $this->assertEquals(EventSource::OPEN, $readyState); } @@ -454,57 +324,44 @@ public function testConstructorWillReportOpenWhenGetResponseResolvesWithValidRes public function testConstructorWillReportOpenWhenGetResponseResolvesWithValidResponseAndSuperfluousParametersAfterTimer() { $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); - $timer = null; - $loop->expects($this->at(0))->method('addTimer')->with(0, $this->callback(function ($cb) use (&$timer) { - $timer = $cb; - return true; - })); - $stream = new ThroughStream(); - $response = new Response(200, array('Content-Type' => 'text/event-stream;charset=utf-8;foo=bar'), new ReadableBodyStream($stream)); + $deferred = new Deferred(); $browser = $this->getMockBuilder('Clue\React\Buzz\Browser')->disableOriginalConstructor()->getMock(); - $browser->expects($this->once())->method('get')->with('http://example.com')->willReturn(\React\Promise\resolve($response)); + $browser->expects($this->once())->method('withOptions')->willReturnSelf(); + $browser->expects($this->once())->method('get')->with('http://example.com')->willReturn($deferred->promise()); - $es = new EventSource('http://example.com', $loop); - - $ref = new ReflectionProperty($es, 'browser'); - $ref->setAccessible(true); - $ref->setValue($es, $browser); + $es = new EventSource('http://example.com', $loop, $browser); $readyState = null; $es->on('open', function () use ($es, &$readyState) { $readyState = $es->readyState; }); - $this->assertNotNull($timer); - $timer(); + $stream = new ThroughStream(); + $response = new Response(200, array('Content-Type' => 'text/event-stream;charset=utf-8;foo=bar'), new ReadableBodyStream($stream)); + $deferred->resolve($response); $this->assertEquals(EventSource::OPEN, $readyState); } - public function testCloseResponseStreamWillReturnToStartTimerToReconnectWithoutErrorEvent() + public function testCloseResponseStreamWillStartRetryTimerWithoutErrorEvent() { $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); - $timer = null; - $loop->expects($this->at(0))->method('addTimer')->with(0, $this->callback(function ($cb) use (&$timer) { - $timer = $cb; - return true; - })); - $loop->expects($this->at(1))->method('addTimer')->with(3.0, $this->isType('callable')); + $loop->expects($this->once())->method('addTimer')->with( + 3.0, + $this->isType('callable') + ); - $stream = new ThroughStream(); - $response = new Response(200, array('Content-Type' => 'text/event-stream'), new ReadableBodyStream($stream)); + $deferred = new Deferred(); $browser = $this->getMockBuilder('Clue\React\Buzz\Browser')->disableOriginalConstructor()->getMock(); - $browser->expects($this->once())->method('get')->with('http://example.com')->willReturn(\React\Promise\resolve($response)); + $browser->expects($this->once())->method('withOptions')->willReturnSelf(); + $browser->expects($this->once())->method('get')->with('http://example.com')->willReturn($deferred->promise()); - $es = new EventSource('http://example.com', $loop); + $es = new EventSource('http://example.com', $loop, $browser); - $ref = new ReflectionProperty($es, 'browser'); - $ref->setAccessible(true); - $ref->setValue($es, $browser); - - $this->assertNotNull($timer); - $timer(); + $stream = new ThroughStream(); + $response = new Response(200, array('Content-Type' => 'text/event-stream'), new ReadableBodyStream($stream)); + $deferred->resolve($response); $error = null; $es->on('error', function ($e) use (&$error) { @@ -520,29 +377,21 @@ public function testCloseResponseStreamWillReturnToStartTimerToReconnectWithoutE public function testCloseFromOpenEventWillCloseResponseStreamAndCloseEventSource() { $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); - $timer = null; - $loop->expects($this->at(0))->method('addTimer')->with(0, $this->callback(function ($cb) use (&$timer) { - $timer = $cb; - return true; - })); - $stream = new ThroughStream(); - $response = new Response(200, array('Content-Type' => 'text/event-stream'), new ReadableBodyStream($stream)); + $deferred = new Deferred(); $browser = $this->getMockBuilder('Clue\React\Buzz\Browser')->disableOriginalConstructor()->getMock(); - $browser->expects($this->once())->method('get')->with('http://example.com')->willReturn(\React\Promise\resolve($response)); - - $es = new EventSource('http://example.com', $loop); + $browser->expects($this->once())->method('withOptions')->willReturnSelf(); + $browser->expects($this->once())->method('get')->with('http://example.com')->willReturn($deferred->promise()); - $ref = new ReflectionProperty($es, 'browser'); - $ref->setAccessible(true); - $ref->setValue($es, $browser); + $es = new EventSource('http://example.com', $loop, $browser); $es->on('open', function () use ($es) { $es->close(); }); - $this->assertNotNull($timer); - $timer(); + $stream = new ThroughStream(); + $response = new Response(200, array('Content-Type' => 'text/event-stream'), new ReadableBodyStream($stream)); + $deferred->resolve($response); $this->assertEquals(EventSource::CLOSED, $es->readyState); $this->assertFalse($stream->isReadable()); @@ -551,25 +400,17 @@ public function testCloseFromOpenEventWillCloseResponseStreamAndCloseEventSource public function testEmitMessageWithParsedDataFromEventStream() { $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); - $timer = null; - $loop->expects($this->at(0))->method('addTimer')->with(0, $this->callback(function ($cb) use (&$timer) { - $timer = $cb; - return true; - })); - $stream = new ThroughStream(); - $response = new Response(200, array('Content-Type' => 'text/event-stream'), new ReadableBodyStream($stream)); + $deferred = new Deferred(); $browser = $this->getMockBuilder('Clue\React\Buzz\Browser')->disableOriginalConstructor()->getMock(); - $browser->expects($this->once())->method('get')->with('http://example.com')->willReturn(\React\Promise\resolve($response)); - - $es = new EventSource('http://example.com', $loop); + $browser->expects($this->once())->method('withOptions')->willReturnSelf(); + $browser->expects($this->once())->method('get')->with('http://example.com')->willReturn($deferred->promise()); - $ref = new ReflectionProperty($es, 'browser'); - $ref->setAccessible(true); - $ref->setValue($es, $browser); + $es = new EventSource('http://example.com', $loop, $browser); - $this->assertNotNull($timer); - $timer(); + $stream = new ThroughStream(); + $response = new Response(200, array('Content-Type' => 'text/event-stream'), new ReadableBodyStream($stream)); + $deferred->resolve($response); $message = null; $es->on('message', function ($m) use (&$message) { @@ -586,25 +427,17 @@ public function testEmitMessageWithParsedDataFromEventStream() public function testEmitMessageWithParsedIdAndDataOverMultipleRowsFromEventStream() { $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); - $timer = null; - $loop->expects($this->at(0))->method('addTimer')->with(0, $this->callback(function ($cb) use (&$timer) { - $timer = $cb; - return true; - })); - $stream = new ThroughStream(); - $response = new Response(200, array('Content-Type' => 'text/event-stream'), new ReadableBodyStream($stream)); + $deferred = new Deferred(); $browser = $this->getMockBuilder('Clue\React\Buzz\Browser')->disableOriginalConstructor()->getMock(); - $browser->expects($this->once())->method('get')->with('http://example.com')->willReturn(\React\Promise\resolve($response)); - - $es = new EventSource('http://example.com', $loop); + $browser->expects($this->once())->method('withOptions')->willReturnSelf(); + $browser->expects($this->once())->method('get')->with('http://example.com')->willReturn($deferred->promise()); - $ref = new ReflectionProperty($es, 'browser'); - $ref->setAccessible(true); - $ref->setValue($es, $browser); + $es = new EventSource('http://example.com', $loop, $browser); - $this->assertNotNull($timer); - $timer(); + $stream = new ThroughStream(); + $response = new Response(200, array('Content-Type' => 'text/event-stream'), new ReadableBodyStream($stream)); + $deferred->resolve($response); $message = null; $es->on('message', function ($m) use (&$message) { @@ -621,25 +454,17 @@ public function testEmitMessageWithParsedIdAndDataOverMultipleRowsFromEventStrea public function testEmitMessageWithParsedEventTypeAndDataWithTrailingWhitespaceFromEventStream() { $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); - $timer = null; - $loop->expects($this->at(0))->method('addTimer')->with(0, $this->callback(function ($cb) use (&$timer) { - $timer = $cb; - return true; - })); - $stream = new ThroughStream(); - $response = new Response(200, array('Content-Type' => 'text/event-stream'), new ReadableBodyStream($stream)); + $deferred = new Deferred(); $browser = $this->getMockBuilder('Clue\React\Buzz\Browser')->disableOriginalConstructor()->getMock(); - $browser->expects($this->once())->method('get')->with('http://example.com')->willReturn(\React\Promise\resolve($response)); + $browser->expects($this->once())->method('withOptions')->willReturnSelf(); + $browser->expects($this->once())->method('get')->with('http://example.com')->willReturn($deferred->promise()); - $es = new EventSource('http://example.com', $loop); - - $ref = new ReflectionProperty($es, 'browser'); - $ref->setAccessible(true); - $ref->setValue($es, $browser); + $es = new EventSource('http://example.com', $loop, $browser); - $this->assertNotNull($timer); - $timer(); + $stream = new ThroughStream(); + $response = new Response(200, array('Content-Type' => 'text/event-stream'), new ReadableBodyStream($stream)); + $deferred->resolve($response); $message = null; $es->on('patch', function ($m) use (&$message) { @@ -655,25 +480,17 @@ public function testEmitMessageWithParsedEventTypeAndDataWithTrailingWhitespaceF public function testDoesNotEmitMessageWhenParsedEventStreamHasNoData() { $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); - $timer = null; - $loop->expects($this->at(0))->method('addTimer')->with(0, $this->callback(function ($cb) use (&$timer) { - $timer = $cb; - return true; - })); - $stream = new ThroughStream(); - $response = new Response(200, array('Content-Type' => 'text/event-stream'), new ReadableBodyStream($stream)); + $deferred = new Deferred(); $browser = $this->getMockBuilder('Clue\React\Buzz\Browser')->disableOriginalConstructor()->getMock(); - $browser->expects($this->once())->method('get')->with('http://example.com')->willReturn(\React\Promise\resolve($response)); + $browser->expects($this->once())->method('withOptions')->willReturnSelf(); + $browser->expects($this->once())->method('get')->with('http://example.com')->willReturn($deferred->promise()); - $es = new EventSource('http://example.com', $loop); + $es = new EventSource('http://example.com', $loop, $browser); - $ref = new ReflectionProperty($es, 'browser'); - $ref->setAccessible(true); - $ref->setValue($es, $browser); - - $this->assertNotNull($timer); - $timer(); + $stream = new ThroughStream(); + $response = new Response(200, array('Content-Type' => 'text/event-stream'), new ReadableBodyStream($stream)); + $deferred->resolve($response); $message = null; $es->on('message', function ($m) use (&$message) { @@ -688,25 +505,17 @@ public function testDoesNotEmitMessageWhenParsedEventStreamHasNoData() public function testEmitMessageWithParsedDataAndPreviousIdWhenNotGivenAgainFromEventStream() { $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); - $timer = null; - $loop->expects($this->at(0))->method('addTimer')->with(0, $this->callback(function ($cb) use (&$timer) { - $timer = $cb; - return true; - })); - $stream = new ThroughStream(); - $response = new Response(200, array('Content-Type' => 'text/event-stream'), new ReadableBodyStream($stream)); + $deferred = new Deferred(); $browser = $this->getMockBuilder('Clue\React\Buzz\Browser')->disableOriginalConstructor()->getMock(); - $browser->expects($this->once())->method('get')->with('http://example.com')->willReturn(\React\Promise\resolve($response)); - - $es = new EventSource('http://example.com', $loop); + $browser->expects($this->once())->method('withOptions')->willReturnSelf(); + $browser->expects($this->once())->method('get')->with('http://example.com')->willReturn($deferred->promise()); - $ref = new ReflectionProperty($es, 'browser'); - $ref->setAccessible(true); - $ref->setValue($es, $browser); + $es = new EventSource('http://example.com', $loop, $browser); - $this->assertNotNull($timer); - $timer(); + $stream = new ThroughStream(); + $response = new Response(200, array('Content-Type' => 'text/event-stream'), new ReadableBodyStream($stream)); + $deferred->resolve($response); $message = null; $es->on('message', function ($m) use (&$message) { @@ -723,37 +532,31 @@ public function testEmitMessageWithParsedDataAndPreviousIdWhenNotGivenAgainFromE public function testReconnectAfterStreamClosesUsesLastEventIdFromParsedEventStreamForNextRequest() { $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); - $timerStart = null; - $loop->expects($this->at(0))->method('addTimer')->with(0, $this->callback(function ($cb) use (&$timerStart) { - $timerStart = $cb; - return true; - })); $timerReconnect = null; - $loop->expects($this->at(1))->method('addTimer')->with(3, $this->callback(function ($cb) use (&$timerReconnect) { - $timerReconnect = $cb; - return true; - })); - + $loop->expects($this->once())->method('addTimer')->with( + 3.0, + $this->callback(function ($cb) use (&$timerReconnect) { + $timerReconnect = $cb; + return true; + }) + ); - $stream = new ThroughStream(); - $response = new Response(200, array('Content-Type' => 'text/event-stream'), new ReadableBodyStream($stream)); + $deferred = new Deferred(); $browser = $this->getMockBuilder('Clue\React\Buzz\Browser')->disableOriginalConstructor()->getMock(); + $browser->expects($this->once())->method('withOptions')->willReturnSelf(); $browser->expects($this->exactly(2))->method('get')->withConsecutive( ['http://example.com', ['Accept' => 'text/event-stream', 'Cache-Control' => 'no-cache']], ['http://example.com', ['Accept' => 'text/event-stream', 'Cache-Control' => 'no-cache', 'Last-Event-ID' => '123']] )->willReturnOnConsecutiveCalls( - \React\Promise\resolve($response), + $deferred->promise(), new Promise(function () { }) ); - $es = new EventSource('http://example.com', $loop); + $es = new EventSource('http://example.com', $loop, $browser); - $ref = new ReflectionProperty($es, 'browser'); - $ref->setAccessible(true); - $ref->setValue($es, $browser); - - $this->assertNotNull($timerStart); - $timerStart(); + $stream = new ThroughStream(); + $response = new Response(200, array('Content-Type' => 'text/event-stream'), new ReadableBodyStream($stream)); + $deferred->resolve($response); $stream->write("id:123\n\n"); $stream->end();