diff --git a/src/EventLoop/Driver.php b/src/EventLoop/Driver.php index 5209353..0f2fcd7 100644 --- a/src/EventLoop/Driver.php +++ b/src/EventLoop/Driver.php @@ -29,6 +29,18 @@ public function run(): void; */ public function stop(): void; + /** + * Interrupts the event loop and continues with {main}. + * + * The driver MUST check for a set interrupt after invoking an event callback or microtask. If an interrupt exists, + * it must be reset, and the driver must suspend with the given callback, i.e. call \Fiber::suspend($callback); + * + * @param callable $callback Callback to run on {main} before continuing. + * + * @internal This API is only supposed to be called by the Suspension API. + */ + public function interrupt(callable $callback): void; + /** * @return bool True if the event loop is running, false if it is stopped. */ diff --git a/src/EventLoop/Driver/TracingDriver.php b/src/EventLoop/Driver/TracingDriver.php index 9ff959f..99172f6 100644 --- a/src/EventLoop/Driver/TracingDriver.php +++ b/src/EventLoop/Driver/TracingDriver.php @@ -36,6 +36,11 @@ public function stop(): void $this->driver->stop(); } + public function interrupt(callable $callback): void + { + $this->driver->interrupt($callback); + } + public function isRunning(): bool { return $this->driver->isRunning(); diff --git a/src/EventLoop/Internal/AbstractDriver.php b/src/EventLoop/Internal/AbstractDriver.php index c973c39..bec9585 100644 --- a/src/EventLoop/Internal/AbstractDriver.php +++ b/src/EventLoop/Internal/AbstractDriver.php @@ -22,7 +22,8 @@ abstract class AbstractDriver implements Driver /** @var string Next callback identifier. */ private string $nextId = "a"; - private \Fiber $fiber; + private \Fiber $callbackFiber; + private \Fiber $queueFiber; /** @var Callback[] */ private array $callbacks = []; @@ -42,11 +43,18 @@ abstract class AbstractDriver implements Driver /** @var callable(\Throwable):void|null */ private $errorHandler; + /** @var callable|null */ + private $interrupt; + private bool $running = false; + private \stdClass $internalSuspensionMarker; + public function __construct() { - $this->fiber = $this->createFiber(); + $this->internalSuspensionMarker = new \stdClass(); + $this->createCallbackFiber(); + $this->createQueueFiber(); } /** @@ -98,6 +106,11 @@ public function stop(): void $this->running = false; } + public function interrupt(callable $callback): void + { + $this->interrupt = $callback; + } + /** * @return bool True if the event loop is running, false if it is stopped. */ @@ -560,22 +573,28 @@ abstract protected function deactivate(Callback $callback): void; protected function invokeCallback(Callback $callback): void { - if ($this->fiber->isRunning()) { - $this->fiber = $this->createFiber(); + if ($this->callbackFiber->isRunning()) { + $this->createCallbackFiber(); } try { - $yielded = $this->fiber->resume($callback); - - if ($yielded !== $callback) { + $yielded = $this->callbackFiber->resume($callback); + if ($yielded !== $this->internalSuspensionMarker) { // Callback suspended. - $this->fiber = $this->createFiber(); + $this->createCallbackFiber(); } } catch (\Throwable $exception) { - $this->fiber = $this->createFiber(); + $this->createCallbackFiber(); $this->error($exception); } + if ($this->interrupt) { + $interrupt = $this->interrupt; + $this->interrupt = null; + + \Fiber::suspend($interrupt); + } + if ($this->microQueue) { $this->invokeMicrotasks(); } @@ -656,22 +675,39 @@ private function tick(): void private function invokeMicrotasks(): void { while ($this->microQueue) { - foreach ($this->microQueue as $id => [$callable, $args]) { + foreach ($this->microQueue as $id => $queueEntry) { + if ($this->queueFiber->isRunning()) { + $this->createQueueFiber(); + } + try { unset($this->microQueue[$id]); - $callable(...$args); + + $yielded = $this->queueFiber->resume($queueEntry); + if ($yielded !== $this->internalSuspensionMarker) { + $this->createQueueFiber(); + } } catch (\Throwable $exception) { + $this->createQueueFiber(); $this->error($exception); } + + if ($this->interrupt) { + $interrupt = $this->interrupt; + $this->interrupt = null; + + \Fiber::suspend($interrupt); + } } } } - private function createFiber(): \Fiber + private function createCallbackFiber(): void { - $fiber = new \Fiber(static function (): void { - $callback = null; - while ($callback = \Fiber::suspend($callback)) { + $suspensionMarker = $this->internalSuspensionMarker; + + $this->callbackFiber = new \Fiber(static function () use ($suspensionMarker): void { + while ($callback = \Fiber::suspend($suspensionMarker)) { $result = match (true) { $callback instanceof StreamCallback => ($callback->callback)($callback->id, $callback->stream), $callback instanceof SignalCallback => ($callback->callback)($callback->id, $callback->signal), @@ -681,10 +717,26 @@ private function createFiber(): \Fiber if ($result !== null) { throw InvalidCallbackError::nonNullReturn($callback->id, $callback->callback); } + + unset($callback); + } + }); + + $this->callbackFiber->start(); + } + + private function createQueueFiber(): void + { + $suspensionMarker = $this->internalSuspensionMarker; + + $this->queueFiber = new \Fiber(static function () use ($suspensionMarker): void { + while ([$callback, $args] = \Fiber::suspend($suspensionMarker)) { + $callback(...$args); + + unset($callback, $args); } }); - $fiber->start(); - return $fiber; + $this->queueFiber->start(); } } diff --git a/src/EventLoop/Suspension.php b/src/EventLoop/Suspension.php index d2e4b27..c704aa5 100644 --- a/src/EventLoop/Suspension.php +++ b/src/EventLoop/Suspension.php @@ -60,7 +60,7 @@ public function throw(\Throwable $throwable): void $this->driver->queue([$this->fiber, 'throw'], $throwable); } else { // Suspend event loop fiber to {main}. - $this->driver->queue([\Fiber::class, 'suspend'], static fn () => throw $throwable); + $this->driver->interrupt(static fn () => throw $throwable); } } @@ -76,7 +76,7 @@ public function resume(mixed $value): void $this->driver->queue([$this->fiber, 'resume'], $value); } else { // Suspend event loop fiber to {main}. - $this->driver->queue([\Fiber::class, 'suspend'], static fn () => $value); + $this->driver->interrupt(static fn () => $value); } } diff --git a/test/EventLoopTest.php b/test/EventLoopTest.php index 2deb157..71c4f18 100644 --- a/test/EventLoopTest.php +++ b/test/EventLoopTest.php @@ -24,12 +24,8 @@ public function testRepeatWithNegativeInterval(): void public function testOnReadable(): void { - if (!\class_exists(\Fiber::class, false)) { - self::markTestSkipped("Fibers required for this test"); - } - $ends = \stream_socket_pair( - \stripos(PHP_OS, "win") === 0 ? STREAM_PF_INET : STREAM_PF_UNIX, + \DIRECTORY_SEPARATOR === "\\" ? STREAM_PF_INET : STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP ); @@ -51,12 +47,8 @@ public function testOnReadable(): void self::assertSame(1, $count); } - public function testOnWritable() + public function testOnWritable(): void { - if (!\class_exists(\Fiber::class, false)) { - self::markTestSkipped("Fibers required for this test"); - } - $count = 0; $suspension = EventLoop::createSuspension(); @@ -95,12 +87,25 @@ public function testRun(): void self::assertTrue($invoked); } - public function testRunInFiber(): void + public function testFiberReuse(): void { - if (!\class_exists(\Fiber::class, false)) { - self::markTestSkipped("Fibers required for this test"); - } + EventLoop::defer(function () use (&$fiber1): void { + $fiber1 = \Fiber::getCurrent(); + }); + + EventLoop::defer(function () use (&$fiber2): void { + $fiber2 = \Fiber::getCurrent(); + }); + + EventLoop::run(); + self::assertNotNull($fiber1); + self::assertNotNull($fiber2); + self::assertSame($fiber1, $fiber2); + } + + public function testRunInFiber(): void + { launch(fn () => EventLoop::run()); $this->expectException(\Error::class); @@ -111,10 +116,6 @@ public function testRunInFiber(): void public function testRunAfterSuspension(): void { - if (!\class_exists(\Fiber::class, false)) { - self::markTestSkipped("Fibers required for this test"); - } - $suspension = EventLoop::createSuspension(); EventLoop::defer(fn () => $suspension->resume('test')); @@ -131,12 +132,8 @@ public function testRunAfterSuspension(): void self::assertTrue($invoked); } - public function testSuspensionAfter(): void + public function testSuspensionAfterRun(): void { - if (!\class_exists(\Fiber::class, false)) { - self::markTestSkipped("Fibers required for this test"); - } - $invoked = false; EventLoop::defer(function () use (&$invoked): void { $invoked = true; @@ -153,13 +150,10 @@ public function testSuspensionAfter(): void self::assertSame($suspension->suspend(), 'test'); } - public function testSuspensionWithinFiberWithinRun(): void + public function testSuspensionWithinFiber(): void { - if (!\class_exists(\Fiber::class, false)) { - self::markTestSkipped("Fibers required for this test"); - } - $invoked = false; + launch(function () use (&$invoked): void { $suspension = EventLoop::createSuspension(); @@ -177,10 +171,6 @@ public function testSuspensionWithinFiberWithinRun(): void public function testSuspensionWithinCallback(): void { - if (!\class_exists(\Fiber::class, false)) { - self::markTestSkipped("Fibers required for this test"); - } - $send = 42; EventLoop::defer(static function () use (&$received, $send): void { @@ -189,6 +179,20 @@ public function testSuspensionWithinCallback(): void $received = $suspension->suspend(); }); + EventLoop::run(); + + self::assertSame($send, $received); + } + + public function testSuspensionWithinQueue(): void + { + $send = 42; + + EventLoop::queue(static function () use (&$received, $send): void { + $suspension = EventLoop::createSuspension(); + EventLoop::defer(static fn () => $suspension->resume($send)); + $received = $suspension->suspend(); + }); EventLoop::run();