Skip to content

Commit e6ce02e

Browse files
committed
Improve promise cancellation and clean up any garbage references
1 parent 469b8ee commit e6ce02e

3 files changed

Lines changed: 130 additions & 32 deletions

File tree

composer.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
"require": {
1717
"php": ">=5.3",
1818
"react/promise": "^2.1 || ^1.2",
19-
"react/socket": "^1.0 || ^0.8.6"
19+
"react/socket": "^1.1"
2020
},
2121
"require-dev": {
2222
"phpunit/phpunit": "^6.0 || ^5.7 || ^4.8.35",

src/Client.php

Lines changed: 54 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -157,19 +157,53 @@ public function connect($uri)
157157
$socksUri .= '#' . $parts['fragment'];
158158
}
159159

160-
$that = $this;
160+
// start TCP/IP connection to SOCKS server
161+
$connecting = $this->connector->connect($socksUri);
162+
163+
$deferred = new Deferred(function ($_, $reject) use ($connecting) {
164+
$reject(new RuntimeException(
165+
'Connection cancelled while waiting for proxy (ECONNABORTED)',
166+
defined('SOCKET_ECONNABORTED') ? SOCKET_ECONNABORTED : 103
167+
));
168+
169+
// either close active connection or cancel pending connection attempt
170+
$connecting->then(function (ConnectionInterface $stream) {
171+
$stream->close();
172+
});
173+
$connecting->cancel();
174+
});
161175

162-
// start TCP/IP connection to SOCKS server and then
163176
// handle SOCKS protocol once connection is ready
164177
// resolve plain connection once SOCKS protocol is completed
165-
return $this->connector->connect($socksUri)->then(
166-
function (ConnectionInterface $stream) use ($that, $host, $port) {
167-
return $that->handleConnectedSocks($stream, $host, $port);
178+
$that = $this;
179+
$connecting->then(
180+
function (ConnectionInterface $stream) use ($that, $host, $port, $deferred) {
181+
$that->handleConnectedSocks($stream, $host, $port, $deferred);
168182
},
169-
function (Exception $e) {
170-
throw new RuntimeException('Unable to connect to proxy (ECONNREFUSED)', defined('SOCKET_ECONNREFUSED') ? SOCKET_ECONNREFUSED : 111, $e);
183+
function (Exception $e) use ($deferred) {
184+
$deferred->reject($e = new RuntimeException(
185+
'Connection failed because connection to proxy failed (ECONNREFUSED)',
186+
defined('SOCKET_ECONNREFUSED') ? SOCKET_ECONNREFUSED : 111,
187+
$e
188+
));
189+
190+
// avoid garbage references by replacing all closures in call stack.
191+
// what a lovely piece of code!
192+
$r = new \ReflectionProperty('Exception', 'trace');
193+
$r->setAccessible(true);
194+
$trace = $r->getValue($e);
195+
foreach ($trace as &$one) {
196+
foreach ($one['args'] as &$arg) {
197+
if ($arg instanceof \Closure) {
198+
$arg = 'Object(' . get_class($arg) . ')';
199+
}
200+
}
201+
}
202+
$r->setValue($e, $trace);
171203
}
172204
);
205+
206+
return $deferred->promise();
173207
}
174208

175209
/**
@@ -178,14 +212,14 @@ function (Exception $e) {
178212
* @param ConnectionInterface $stream
179213
* @param string $host
180214
* @param int $port
181-
* @return Promise Promise<ConnectionInterface, Exception>
215+
* @param Deferred $deferred
216+
* @return void
182217
* @internal
183218
*/
184-
public function handleConnectedSocks(ConnectionInterface $stream, $host, $port)
219+
public function handleConnectedSocks(ConnectionInterface $stream, $host, $port, Deferred $d)
185220
{
186-
$deferred = new Deferred(function ($_, $reject) {
187-
$reject(new RuntimeException('Connection canceled while establishing SOCKS session (ECONNABORTED)', defined('SOCKET_ECONNABORTED') ? SOCKET_ECONNABORTED : 103));
188-
});
221+
$deferred = $d;
222+
$d = null;
189223

190224
$reader = new StreamReader();
191225
$stream->on('data', array($reader, 'write'));
@@ -203,32 +237,22 @@ public function handleConnectedSocks(ConnectionInterface $stream, $host, $port)
203237
} else {
204238
$promise = $this->handleSocks4($stream, $host, $port, $reader);
205239
}
206-
$promise->then(function () use ($deferred, $stream) {
240+
241+
$promise->then(function () use ($deferred, $stream, $reader, $onError, $onClose) {
242+
$stream->removeListener('data', array($reader, 'write'));
243+
$stream->removeListener('error', $onError);
244+
$stream->removeListener('close', $onClose);
245+
207246
$deferred->resolve($stream);
208-
}, function (Exception $error) use ($deferred) {
247+
}, function (Exception $error) use ($deferred, $stream) {
209248
// pass custom RuntimeException through as-is, otherwise wrap in protocol error
210249
if (!$error instanceof RuntimeException) {
211250
$error = new RuntimeException('Invalid response received from proxy (EBADMSG)', defined('SOCKET_EBADMSG') ? SOCKET_EBADMSG: 71, $error);
212251
}
213252

214253
$deferred->reject($error);
254+
$stream->close();
215255
});
216-
217-
return $deferred->promise()->then(
218-
function (ConnectionInterface $stream) use ($reader, $onError, $onClose) {
219-
$stream->removeListener('data', array($reader, 'write'));
220-
$stream->removeListener('error', $onError);
221-
$stream->removeListener('close', $onClose);
222-
223-
return $stream;
224-
},
225-
function ($error) use ($stream, $onClose) {
226-
$stream->removeListener('close', $onClose);
227-
$stream->close();
228-
229-
throw $error;
230-
}
231-
);
232256
}
233257

234258
private function handleSocks4(ConnectionInterface $stream, $host, $port, StreamReader $reader)

tests/ClientTest.php

Lines changed: 75 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
use Clue\React\Socks\Client;
44
use React\Promise\Promise;
55
use Clue\React\Socks\Server;
6+
use React\Promise\Deferred;
67

78
class ClientTest extends TestCase
89
{
@@ -199,6 +200,22 @@ public function testCancelConnectionDuringSessionWillCloseStream()
199200
$promise->then(null, $this->expectCallableOnceWithExceptionCode(SOCKET_ECONNABORTED));
200201
}
201202

203+
public function testCancelConnectionDuringDeferredSessionWillCloseStream()
204+
{
205+
$stream = $this->getMockBuilder('React\Socket\Connection')->disableOriginalConstructor()->getMock();
206+
$stream->expects($this->once())->method('close');
207+
208+
$deferred = new Deferred();
209+
210+
$this->connector->expects($this->once())->method('connect')->with('127.0.0.1:1080?hostname=google.com')->willReturn($deferred->promise());
211+
212+
$promise = $this->client->connect('google.com:80');
213+
$deferred->resolve($stream);
214+
$promise->cancel();
215+
216+
$promise->then(null, $this->expectCallableOnceWithExceptionCode(SOCKET_ECONNABORTED));
217+
}
218+
202219
public function testEmitConnectionCloseDuringSessionWillRejectConnection()
203220
{
204221
$stream = $this->getMockBuilder('React\Socket\Connection')->disableOriginalConstructor()->setMethods(array('write', 'close'))->getMock();
@@ -217,7 +234,6 @@ public function testEmitConnectionCloseDuringSessionWillRejectConnection()
217234
public function testEmitConnectionErrorDuringSessionWillRejectConnection()
218235
{
219236
$stream = $this->getMockBuilder('React\Socket\Connection')->disableOriginalConstructor()->setMethods(array('write', 'close'))->getMock();
220-
$stream->expects($this->once())->method('close');
221237

222238
$promise = \React\Promise\resolve($stream);
223239

@@ -400,4 +416,62 @@ public function testEmitSocks5DataErrorMapsToExceptionCode($error, $expectedCode
400416

401417
$promise->then(null, $this->expectCallableOnceWithExceptionCode($expectedCode));
402418
}
419+
420+
public function testConnectionErrorShouldNotCreateGarbageCycles()
421+
{
422+
if (class_exists('React\Promise\When')) {
423+
$this->markTestSkipped('Not supported on legacy Promise v1 API');
424+
}
425+
426+
$deferred = new Deferred();
427+
$this->connector->expects($this->once())->method('connect')->willReturn($deferred->promise());
428+
429+
gc_collect_cycles();
430+
431+
$promise = $this->client->connect('google.com:80');
432+
$deferred->reject(new RuntimeException());
433+
unset($deferred, $promise);
434+
435+
$this->assertEquals(0, gc_collect_cycles());
436+
}
437+
438+
public function testCancelConnectionDuringConnectionShouldNotCreateGarbageCycles()
439+
{
440+
if (class_exists('React\Promise\When')) {
441+
$this->markTestSkipped('Not supported on legacy Promise v1 API');
442+
}
443+
444+
$promise = new Promise(function () { }, function () {
445+
throw new \RuntimeException();
446+
});
447+
$this->connector->expects($this->once())->method('connect')->willReturn($promise);
448+
449+
gc_collect_cycles();
450+
451+
$promise = $this->client->connect('google.com:80');
452+
$promise->cancel();
453+
unset($promise);
454+
455+
$this->assertEquals(0, gc_collect_cycles());
456+
}
457+
458+
public function testCancelConnectionDuringSessionShouldNotCreateGarbageCycles()
459+
{
460+
if (class_exists('React\Promise\When')) {
461+
$this->markTestSkipped('Not supported on legacy Promise v1 API');
462+
}
463+
464+
$stream = $this->getMockBuilder('React\Socket\Connection')->disableOriginalConstructor()->setMethods(array('write'))->getMock();
465+
466+
$promise = new Promise(function ($resolve) use ($stream) { $resolve($stream); });
467+
$this->connector->expects($this->once())->method('connect')->willReturn($promise);
468+
469+
gc_collect_cycles();
470+
471+
$promise = $this->client->connect('google.com:80');
472+
$promise->cancel();
473+
unset($promise);
474+
475+
$this->assertEquals(0, gc_collect_cycles());
476+
}
403477
}

0 commit comments

Comments
 (0)