diff --git a/Tests/NIOPosixTests/SALChannelTests.swift b/Tests/NIOPosixTests/SALChannelTests.swift index 80a2337f8c1..ebdfc5fe1ba 100644 --- a/Tests/NIOPosixTests/SALChannelTests.swift +++ b/Tests/NIOPosixTests/SALChannelTests.swift @@ -18,62 +18,55 @@ import XCTest @testable import NIOPosix -final class SALChannelTest: XCTestCase, SALTest { - var group: MultiThreadedEventLoopGroup! - var kernelToUserBox: LockedBox! - var userToKernelBox: LockedBox! - var wakeups: LockedBox<()>! - - override func setUp() { - self.setUpSAL() - } - - override func tearDown() { - self.tearDownSAL() - } - +final class SALChannelTest: XCTestCase { func testBasicConnectedChannel() throws { - let localAddress = try! SocketAddress(ipAddress: "0.1.2.3", port: 4) - let serverAddress = try! SocketAddress(ipAddress: "9.8.7.6", port: 5) - let buffer = ByteBuffer(string: "xxx") - - let channel = try self.makeConnectedSocketChannel( - localAddress: localAddress, - remoteAddress: serverAddress - ) - - try channel.eventLoop.runSAL(syscallAssertions: { - try self.assertWrite(expectedFD: .max, expectedBytes: buffer, return: .processed(buffer.readableBytes)) - try self.assertWritev( - expectedFD: .max, - expectedBytes: [buffer, buffer], - return: .processed(2 * buffer.readableBytes) + try withSALContext { context in + let localAddress = try! SocketAddress(ipAddress: "0.1.2.3", port: 4) + let serverAddress = try! SocketAddress(ipAddress: "9.8.7.6", port: 5) + let buffer = ByteBuffer(string: "xxx") + + let channel = try context.makeConnectedSocketChannel( + localAddress: localAddress, + remoteAddress: serverAddress ) - try self.assertDeregister { selectable in - try selectable.withUnsafeHandle { - XCTAssertEqual(.max, $0) + + try context.runSALOnEventLoopAndWait { _, _, _ in + channel.writeAndFlush(buffer).flatMap { + channel.write(buffer, promise: nil) + return channel.writeAndFlush(buffer) + }.flatMap { + channel.close() } - return true - } - try self.assertClose(expectedFD: .max) - }) { - channel.writeAndFlush(buffer).flatMap { - channel.write(buffer, promise: nil) - return channel.writeAndFlush(buffer) - }.flatMap { - channel.close() + } syscallAssertions: { assertions in + try assertions.assertWrite( + expectedFD: .max, + expectedBytes: buffer, + return: .processed(buffer.readableBytes) + ) + try assertions.assertWritev( + expectedFD: .max, + expectedBytes: [buffer, buffer], + return: .processed(2 * buffer.readableBytes) + ) + try assertions.assertDeregister { selectable in + try selectable.withUnsafeHandle { + XCTAssertEqual(.max, $0) + } + return true + } + try assertions.assertClose(expectedFD: .max) } - }.salWait() + } } - func testWritesFromWritabilityNotificationsDoNotGetLostIfWePreviouslyWroteEverything() { + func testWritesFromWritabilityNotificationsDoNotGetLostIfWePreviouslyWroteEverything() throws { // This is a unit test, doing what // testWriteAndFlushFromReentrantFlushNowTriggeredOutOfWritabilityWhereOuterSaysAllWrittenAndInnerDoesNot // does but in a deterministic way, without having to send actual bytes. let localAddress = try! SocketAddress(ipAddress: "0.1.2.3", port: 4) let serverAddress = try! SocketAddress(ipAddress: "9.8.7.6", port: 5) - var buffer = ByteBuffer(string: "12") + let buffer = ByteBuffer(string: "12") let writableNotificationStepExpectation = ManagedAtomic(0) @@ -129,25 +122,36 @@ final class SALChannelTest: XCTestCase, SALTest { } } - var maybeChannel: SocketChannel? = nil - XCTAssertNoThrow( - maybeChannel = try self.makeConnectedSocketChannel( + try withSALContext { context in + let channel = try context.makeConnectedSocketChannel( localAddress: localAddress, remoteAddress: serverAddress ) - ) - guard let channel = maybeChannel else { - XCTFail("couldn't construct channel") - return - } - XCTAssertNoThrow( - try channel.eventLoop.runSAL(syscallAssertions: { + try context.runSALOnEventLoopAndWait { _, _, _ in + channel.setOption(.writeSpin, value: 0).flatMap { + channel.setOption(.writeBufferWaterMark, value: .init(low: 1, high: 1)) + }.flatMapThrowing { + let sync = channel.pipeline.syncOperations + try sync.addHandler( + DoWriteFromWritabilityChangedNotification( + writableNotificationStepExpectation: writableNotificationStepExpectation + ) + ) + }.flatMap { + // This write should cause a Channel writability change. + XCTAssertTrue( + writableNotificationStepExpectation.compareExchange(expected: 0, desired: 1, ordering: .relaxed) + .exchanged + ) + return channel.writeAndFlush(buffer) + } + } syscallAssertions: { assertions in // We get in a write of 2 bytes, and we claim we wrote 1 bytes of that. - try self.assertWrite(expectedFD: .max, expectedBytes: buffer, return: .processed(1)) + try assertions.assertWrite(expectedFD: .max, expectedBytes: buffer, return: .processed(1)) // Next, we expect a reregistration which adds the `.write` notification - try self.assertReregister { selectable, eventSet in + try assertions.assertReregister { selectable, eventSet in XCTAssert(selectable as? Socket === channel.socket) XCTAssertEqual([.read, .reset, .error, .readEOF, .write], eventSet) return true @@ -166,17 +170,18 @@ final class SALChannelTest: XCTestCase, SALTest { registrationID: .initialRegistrationID ) ) - try self.assertWaitingForNotification(result: writableEvent) - try self.assertWrite( + try assertions.assertWaitingForNotification(result: writableEvent) + try assertions.assertWrite( expectedFD: .max, expectedBytes: buffer.getSlice(at: 1, length: 1)!, return: .processed(1) ) + var buffer = buffer buffer.clear() buffer.writeString("ABC") // expected // This time, the write again, just writes one byte, so we should remain registered for writable. - try self.assertWrite( + try assertions.assertWrite( expectedFD: .max, expectedBytes: buffer, return: .processed(1) @@ -184,7 +189,7 @@ final class SALChannelTest: XCTestCase, SALTest { buffer.moveReaderIndex(forwardBy: 1) // Let's send them another 'writable' notification: - try self.assertWaitingForNotification(result: writableEvent) + try assertions.assertWaitingForNotification(result: writableEvent) // This time, we'll make the write write everything which should also lead to a final channelWritability // change. @@ -192,54 +197,36 @@ final class SALChannelTest: XCTestCase, SALTest { writableNotificationStepExpectation.compareExchange(expected: 3, desired: 4, ordering: .relaxed) .exchanged ) - try self.assertWrite( + try assertions.assertWrite( expectedFD: .max, expectedBytes: buffer, return: .processed(2) ) // And lastly, after having written everything, we'd expect to unregister for write - try self.assertReregister { selectable, eventSet in + try assertions.assertReregister { selectable, eventSet in XCTAssert(selectable as? Socket === channel.socket) XCTAssertEqual([.read, .reset, .error, .readEOF], eventSet) return true } - try self.assertParkedRightNow() - }) { () -> EventLoopFuture in - channel.setOption(.writeSpin, value: 0).flatMap { - channel.setOption(.writeBufferWaterMark, value: .init(low: 1, high: 1)) - }.flatMap { - channel.pipeline.addHandler( - DoWriteFromWritabilityChangedNotification( - writableNotificationStepExpectation: writableNotificationStepExpectation - ) - ) - }.flatMap { - // This write should cause a Channel writability change. - XCTAssertTrue( - writableNotificationStepExpectation.compareExchange(expected: 0, desired: 1, ordering: .relaxed) - .exchanged - ) - return channel.writeAndFlush(buffer) - } - }.salWait() - ) + try assertions.assertParkedRightNow() + } + } } - func testWeSurviveIfIgnoringSIGPIPEFails() { - // We know this sometimes happens on Darwin, so let's test it. - let expectedError = IOError(errnoCode: EINVAL, reason: "bad") - XCTAssertThrowsError(try self.makeSocketChannelInjectingFailures(disableSIGPIPEFailure: expectedError)) { - error in - XCTAssertEqual(expectedError.errnoCode, (error as? IOError)?.errnoCode) + func testWeSurviveIfIgnoringSIGPIPEFails() throws { + try withSALContext { context in + // We know this sometimes happens on Darwin, so let's test it. + let expectedError = IOError(errnoCode: EINVAL, reason: "bad") + XCTAssertThrowsError(try context.makeSocketChannelInjectingFailures(disableSIGPIPEFailure: expectedError)) { + error in + XCTAssertEqual(expectedError.errnoCode, (error as? IOError)?.errnoCode) + } } } - func testBasicRead() { - let localAddress = try! SocketAddress(ipAddress: "0.1.2.3", port: 4) - let serverAddress = try! SocketAddress(ipAddress: "9.8.7.6", port: 5) - + func testBasicRead() throws { final class SignalGroupOnRead: ChannelInboundHandler { typealias InboundIn = ByteBuffer @@ -262,25 +249,24 @@ final class SALChannelTest: XCTestCase, SALTest { } } - var maybeChannel: SocketChannel? = nil - XCTAssertNoThrow( - maybeChannel = try self.makeConnectedSocketChannel( + let g = DispatchGroup() + g.enter() + + try withSALContext { context in + let localAddress = try! SocketAddress(ipAddress: "0.1.2.3", port: 4) + let serverAddress = try! SocketAddress(ipAddress: "9.8.7.6", port: 5) + let buffer = ByteBuffer(string: "hello") + + let channel = try context.makeConnectedSocketChannel( localAddress: localAddress, remoteAddress: serverAddress ) - ) - guard let channel = maybeChannel else { - XCTFail("couldn't construct channel") - return - } - - let buffer = ByteBuffer(string: "hello") - let g = DispatchGroup() - g.enter() - - XCTAssertNoThrow( - try channel.eventLoop.runSAL(syscallAssertions: { + try context.runSALOnEventLoopAndWait { _, _, _ in + channel.eventLoop.makeCompletedFuture { + try channel.pipeline.syncOperations.addHandler(SignalGroupOnRead(group: g)) + } + } syscallAssertions: { assertions in let readEvent = SelectorEvent( io: [.read], registration: NIORegistration( @@ -289,103 +275,97 @@ final class SALChannelTest: XCTestCase, SALTest { registrationID: .initialRegistrationID ) ) - try self.assertWaitingForNotification(result: readEvent) - try self.assertRead(expectedFD: .max, expectedBufferSpace: 2048, return: buffer) - }) { - channel.pipeline.addHandler(SignalGroupOnRead(group: g)) + try assertions.assertWaitingForNotification(result: readEvent) + try assertions.assertRead(expectedFD: .max, expectedBufferSpace: 2048, return: buffer) } - ) + } g.wait() } - func testBasicConnectWithClientBootstrap() { - guard let channel = try? self.makeSocketChannel() else { - XCTFail("couldn't make a channel") - return - } - let localAddress = try! SocketAddress(ipAddress: "1.2.3.4", port: 5) - let serverAddress = try! SocketAddress(ipAddress: "9.8.7.6", port: 5) - XCTAssertNoThrow( - try channel.eventLoop.runSAL(syscallAssertions: { - try self.assertSetOption(expectedLevel: .tcp, expectedOption: .tcp_nodelay) { value in + func testBasicConnectWithClientBootstrap() throws { + try withSALContext { context in + let channel = try context.makeSocketChannel() + let localAddress = try! SocketAddress(ipAddress: "1.2.3.4", port: 5) + let serverAddress = try! SocketAddress(ipAddress: "9.8.7.6", port: 5) + + try context.runSALOnEventLoopAndWait { _, _, _ in + ClientBootstrap(group: channel.eventLoop) + .channelOption(.autoRead, value: false) + .testOnly_connect(injectedChannel: channel, to: serverAddress) + .flatMap { channel in + channel.close() + } + } syscallAssertions: { assertions in + try assertions.assertSetOption(expectedLevel: .tcp, expectedOption: .tcp_nodelay) { value in (value as? SocketOptionValue) == 1 } - try self.assertConnect(expectedAddress: serverAddress, result: true) - try self.assertLocalAddress(address: localAddress) - try self.assertRemoteAddress(address: localAddress) - try self.assertRegister { selectable, event, Registration in + try assertions.assertConnect(expectedAddress: serverAddress, result: true) + try assertions.assertLocalAddress(address: localAddress) + try assertions.assertRemoteAddress(address: localAddress) + try assertions.assertRegister { selectable, event, Registration in XCTAssertEqual([.reset, .error], event) return true } - try self.assertReregister { selectable, event in + try assertions.assertReregister { selectable, event in XCTAssertEqual([.reset, .error, .readEOF], event) return true } - try self.assertDeregister { selectable in + try assertions.assertDeregister { selectable in true } - try self.assertClose(expectedFD: .max) - }) { + try assertions.assertClose(expectedFD: .max) + } + } + } + + func testClientBootstrapBindIsDoneAfterSocketOptions() throws { + try withSALContext { context in + let channel = try context.makeSocketChannel() + let localAddress = try! SocketAddress(ipAddress: "1.2.3.4", port: 5) + let serverAddress = try! SocketAddress(ipAddress: "9.8.7.6", port: 5) + + try context.runSALOnEventLoopAndWait { _, _, _ in ClientBootstrap(group: channel.eventLoop) + .channelOption(.socketOption(.so_reuseaddr), value: 1) .channelOption(.autoRead, value: false) + .bind(to: localAddress) .testOnly_connect(injectedChannel: channel, to: serverAddress) .flatMap { channel in channel.close() } - }.salWait() - ) - } - - func testClientBootstrapBindIsDoneAfterSocketOptions() { - guard let channel = try? self.makeSocketChannel() else { - XCTFail("couldn't make a channel") - return - } - let localAddress = try! SocketAddress(ipAddress: "1.2.3.4", port: 5) - let serverAddress = try! SocketAddress(ipAddress: "9.8.7.6", port: 5) - XCTAssertNoThrow( - try channel.eventLoop.runSAL(syscallAssertions: { - try self.assertSetOption(expectedLevel: .tcp, expectedOption: .tcp_nodelay) { value in + } syscallAssertions: { assertions in + try assertions.assertSetOption(expectedLevel: .tcp, expectedOption: .tcp_nodelay) { value in (value as? SocketOptionValue) == 1 } // This is the important bit: We need to apply the socket options _before_ ... - try self.assertSetOption(expectedLevel: .socket, expectedOption: .so_reuseaddr) { value in + try assertions.assertSetOption(expectedLevel: .socket, expectedOption: .so_reuseaddr) { value in (value as? SocketOptionValue) == 1 } // ... we call bind. - try self.assertBind(expectedAddress: localAddress) - try self.assertLocalAddress(address: nil) // this is an inefficiency in `bind0`. - try self.assertConnect(expectedAddress: serverAddress, result: true) - try self.assertLocalAddress(address: localAddress) - try self.assertRemoteAddress(address: localAddress) - try self.assertRegister { selectable, event, Registration in + try assertions.assertBind(expectedAddress: localAddress) + try assertions.assertLocalAddress(address: nil) // this is an inefficiency in `bind0`. + try assertions.assertConnect(expectedAddress: serverAddress, result: true) + try assertions.assertLocalAddress(address: localAddress) + try assertions.assertRemoteAddress(address: localAddress) + try assertions.assertRegister { selectable, event, Registration in XCTAssertEqual([.reset, .error], event) return true } - try self.assertReregister { selectable, event in + try assertions.assertReregister { selectable, event in XCTAssertEqual([.reset, .error, .readEOF], event) return true } - try self.assertDeregister { selectable in + try assertions.assertDeregister { selectable in true } - try self.assertClose(expectedFD: .max) - }) { - ClientBootstrap(group: channel.eventLoop) - .channelOption(.socketOption(.so_reuseaddr), value: 1) - .channelOption(.autoRead, value: false) - .bind(to: localAddress) - .testOnly_connect(injectedChannel: channel, to: serverAddress) - .flatMap { channel in - channel.close() - } - }.salWait() - ) + try assertions.assertClose(expectedFD: .max) + } + } } func testAcceptingInboundConnections() throws { - final class ConnectionRecorder: ChannelInboundHandler { + final class ConnectionRecorder: ChannelInboundHandler, Sendable { typealias InboundIn = Any typealias InboundOut = Any @@ -397,15 +377,17 @@ final class SALChannelTest: XCTestCase, SALTest { } } - let localAddress = try! SocketAddress(ipAddress: "1.2.3.4", port: 5) - let remoteAddress = try! SocketAddress(ipAddress: "5.6.7.8", port: 10) - let channel = try self.makeBoundServerSocketChannel(localAddress: localAddress) + try withSALContext { context in + let localAddress = try! SocketAddress(ipAddress: "1.2.3.4", port: 5) + let remoteAddress = try! SocketAddress(ipAddress: "5.6.7.8", port: 10) + let channel = try context.makeBoundServerSocketChannel(localAddress: localAddress) + let unsafeTransferSocket = try context.makeSocket() - let socket = try self.makeSocket() + let readRecorder = ConnectionRecorder() - let readRecorder = ConnectionRecorder() - XCTAssertNoThrow( - try channel.eventLoop.runSAL(syscallAssertions: { + try context.runSALOnEventLoopAndWait { _, _, _ in + channel.pipeline.addHandler(readRecorder) + } syscallAssertions: { assertions in let readEvent = SelectorEvent( io: [.read], registration: NIORegistration( @@ -414,16 +396,20 @@ final class SALChannelTest: XCTestCase, SALTest { registrationID: .initialRegistrationID ) ) - try self.assertWaitingForNotification(result: readEvent) - try self.assertAccept(expectedFD: .max, expectedNonBlocking: true, return: socket) - try self.assertLocalAddress(address: localAddress) - try self.assertRemoteAddress(address: remoteAddress) + try assertions.assertWaitingForNotification(result: readEvent) + try assertions.assertAccept( + expectedFD: .max, + expectedNonBlocking: true, + return: unsafeTransferSocket.wrappedValue + ) + try assertions.assertLocalAddress(address: localAddress) + try assertions.assertRemoteAddress(address: remoteAddress) // This accept is expected: we delay inbound channel registration by one EL tick. - try self.assertAccept(expectedFD: .max, expectedNonBlocking: true, return: nil) + try assertions.assertAccept(expectedFD: .max, expectedNonBlocking: true, return: nil) // Then we register the inbound channel. - try self.assertRegister { selectable, eventSet, registration in + try assertions.assertRegister { selectable, eventSet, registration in if case (.socketChannel(let channel), let registrationEventSet) = (registration.channel, registration.interested) { @@ -437,27 +423,25 @@ final class SALChannelTest: XCTestCase, SALTest { return false } } - try self.assertReregister { selectable, eventSet in + try assertions.assertReregister { selectable, eventSet in XCTAssertEqual([.reset, .error, .readEOF], eventSet) return true } // because autoRead is on by default - try self.assertReregister { selectable, eventSet in + try assertions.assertReregister { selectable, eventSet in XCTAssertEqual([.reset, .error, .readEOF, .read], eventSet) return true } - try self.assertParkedRightNow() - }) { - channel.pipeline.addHandler(readRecorder) + try assertions.assertParkedRightNow() } - ) - XCTAssertEqual(readRecorder.readCount.load(ordering: .sequentiallyConsistent), 1) + XCTAssertEqual(readRecorder.readCount.load(ordering: .sequentiallyConsistent), 1) + } } func testAcceptingInboundConnectionsDoesntUnregisterForReadIfTheSecondAcceptErrors() throws { - final class ConnectionRecorder: ChannelInboundHandler { + final class ConnectionRecorder: ChannelInboundHandler, Sendable { typealias InboundIn = Any typealias InboundOut = Any @@ -469,15 +453,17 @@ final class SALChannelTest: XCTestCase, SALTest { } } - let localAddress = try! SocketAddress(ipAddress: "1.2.3.4", port: 5) - let remoteAddress = try! SocketAddress(ipAddress: "5.6.7.8", port: 10) - let channel = try self.makeBoundServerSocketChannel(localAddress: localAddress) + try withSALContext { context in + let localAddress = try! SocketAddress(ipAddress: "1.2.3.4", port: 5) + let remoteAddress = try! SocketAddress(ipAddress: "5.6.7.8", port: 10) + let channel = try context.makeBoundServerSocketChannel(localAddress: localAddress) + let unsafeTransferSocket = try context.makeSocket() - let socket = try self.makeSocket() + let readRecorder = ConnectionRecorder() - let readRecorder = ConnectionRecorder() - XCTAssertNoThrow( - try channel.eventLoop.runSAL(syscallAssertions: { + try context.runSALOnEventLoopAndWait { _, _, _ in + channel.pipeline.addHandler(readRecorder) + } syscallAssertions: { assertions in let readEvent = SelectorEvent( io: [.read], registration: NIORegistration( @@ -486,17 +472,25 @@ final class SALChannelTest: XCTestCase, SALTest { registrationID: .initialRegistrationID ) ) - try self.assertWaitingForNotification(result: readEvent) - try self.assertAccept(expectedFD: .max, expectedNonBlocking: true, return: socket) - try self.assertLocalAddress(address: localAddress) - try self.assertRemoteAddress(address: remoteAddress) + try assertions.assertWaitingForNotification(result: readEvent) + try assertions.assertAccept( + expectedFD: .max, + expectedNonBlocking: true, + return: unsafeTransferSocket.wrappedValue + ) + try assertions.assertLocalAddress(address: localAddress) + try assertions.assertRemoteAddress(address: remoteAddress) // This accept is expected: we delay inbound channel registration by one EL tick. This one throws. // We throw a deliberate error here: this one hits the buggy codepath. - try self.assertAccept(expectedFD: .max, expectedNonBlocking: true, throwing: NIOFcntlFailedError()) + try assertions.assertAccept( + expectedFD: .max, + expectedNonBlocking: true, + throwing: NIOFcntlFailedError() + ) // Then we register the inbound channel from the first accept. - try self.assertRegister { selectable, eventSet, registration in + try assertions.assertRegister { selectable, eventSet, registration in if case (.socketChannel(let channel), let registrationEventSet) = (registration.channel, registration.interested) { @@ -510,49 +504,59 @@ final class SALChannelTest: XCTestCase, SALTest { return false } } - try self.assertReregister { selectable, eventSet in + try assertions.assertReregister { selectable, eventSet in XCTAssertEqual([.reset, .error, .readEOF], eventSet) return true } // because autoRead is on by default - try self.assertReregister { selectable, eventSet in + try assertions.assertReregister { selectable, eventSet in XCTAssertEqual([.reset, .error, .readEOF, .read], eventSet) return true } // Importantly, we should now be _parked_. This test is mostly testing in the absence: // we expect not to see a reregister that removes readable. - try self.assertParkedRightNow() - }) { - channel.pipeline.addHandler(readRecorder) + try assertions.assertParkedRightNow() } - ) - XCTAssertEqual(readRecorder.readCount.load(ordering: .sequentiallyConsistent), 1) + XCTAssertEqual(readRecorder.readCount.load(ordering: .sequentiallyConsistent), 1) + } } - func testWriteBeforeChannelActiveClientStreamDelayedConnect() { - guard let channel = try? self.makeSocketChannel() else { - XCTFail("couldn't make a channel") - return - } - let localAddress = try! SocketAddress(ipAddress: "1.2.3.4", port: 5) - let serverAddress = try! SocketAddress(ipAddress: "9.8.7.6", port: 5) - let firstWrite = ByteBuffer(string: "foo") - let secondWrite = ByteBuffer(string: "bar") + func testWriteBeforeChannelActiveClientStreamDelayedConnect() throws { + try withSALContext { context in + let channel = try context.makeSocketChannel() + let localAddress = try! SocketAddress(ipAddress: "1.2.3.4", port: 5) + let serverAddress = try! SocketAddress(ipAddress: "9.8.7.6", port: 5) + let firstWrite = ByteBuffer(string: "foo") + let secondWrite = ByteBuffer(string: "bar") - XCTAssertNoThrow( - try channel.eventLoop.runSAL(syscallAssertions: { - try self.assertSetOption(expectedLevel: .tcp, expectedOption: .tcp_nodelay) { value in + try context.runSALOnEventLoopAndWait { _, _, _ in + ClientBootstrap(group: channel.eventLoop) + .channelOption(.autoRead, value: false) + .channelInitializer { channel in + channel.write(firstWrite, promise: nil) + channel.write(secondWrite).whenComplete { _ in + channel.close(promise: nil) + } + channel.flush() + return channel.eventLoop.makeSucceededVoidFuture() + } + .testOnly_connect(injectedChannel: channel, to: serverAddress) + .flatMap { + $0.closeFuture + } + } syscallAssertions: { assertions in + try assertions.assertSetOption(expectedLevel: .tcp, expectedOption: .tcp_nodelay) { value in (value as? SocketOptionValue) == 1 } - try self.assertConnect(expectedAddress: serverAddress, result: false) - try self.assertLocalAddress(address: localAddress) - try self.assertRegister { selectable, event, Registration in + try assertions.assertConnect(expectedAddress: serverAddress, result: false) + try assertions.assertLocalAddress(address: localAddress) + try assertions.assertRegister { selectable, event, Registration in XCTAssertEqual([.reset, .error], event) return true } - try self.assertReregister { selectable, event in + try assertions.assertReregister { selectable, event in XCTAssertEqual([.reset, .error, .write], event) return true } @@ -565,25 +569,37 @@ final class SALChannelTest: XCTestCase, SALTest { registrationID: .initialRegistrationID ) ) - try self.assertWaitingForNotification(result: writeEvent) - try self.assertGetOption(expectedLevel: .socket, expectedOption: .so_error, value: CInt(0)) - try self.assertRemoteAddress(address: serverAddress) + try assertions.assertWaitingForNotification(result: writeEvent) + try assertions.assertGetOption(expectedLevel: .socket, expectedOption: .so_error, value: CInt(0)) + try assertions.assertRemoteAddress(address: serverAddress) - try self.assertReregister { selectable, event in + try assertions.assertReregister { selectable, event in XCTAssertEqual([.reset, .error, .readEOF, .write], event) return true } - try self.assertWritev( + try assertions.assertWritev( expectedFD: .max, expectedBytes: [firstWrite, secondWrite], return: .processed(6) ) - try self.assertDeregister { selectable in + try assertions.assertDeregister { selectable in true } - try self.assertClose(expectedFD: .max) - }) { + try assertions.assertClose(expectedFD: .max) + } + } + } + + func testWriteBeforeChannelActiveClientStreamInstantConnect() throws { + try withSALContext { context in + let channel = try context.makeSocketChannel() + let localAddress = try! SocketAddress(ipAddress: "1.2.3.4", port: 5) + let serverAddress = try! SocketAddress(ipAddress: "9.8.7.6", port: 5) + let firstWrite = ByteBuffer(string: "foo") + let secondWrite = ByteBuffer(string: "bar") + + try context.runSALOnEventLoopAndWait { _, _, _ in ClientBootstrap(group: channel.eventLoop) .channelOption(.autoRead, value: false) .channelInitializer { channel in @@ -598,54 +614,56 @@ final class SALChannelTest: XCTestCase, SALTest { .flatMap { $0.closeFuture } - }.salWait() - ) - } - - func testWriteBeforeChannelActiveClientStreamInstantConnect() { - guard let channel = try? self.makeSocketChannel() else { - XCTFail("couldn't make a channel") - return - } - let localAddress = try! SocketAddress(ipAddress: "1.2.3.4", port: 5) - let serverAddress = try! SocketAddress(ipAddress: "9.8.7.6", port: 5) - let firstWrite = ByteBuffer(string: "foo") - let secondWrite = ByteBuffer(string: "bar") - - XCTAssertNoThrow( - try channel.eventLoop.runSAL(syscallAssertions: { - try self.assertSetOption(expectedLevel: .tcp, expectedOption: .tcp_nodelay) { value in + } syscallAssertions: { assertions in + try assertions.assertSetOption(expectedLevel: .tcp, expectedOption: .tcp_nodelay) { value in (value as? SocketOptionValue) == 1 } - try self.assertConnect(expectedAddress: serverAddress, result: true) - try self.assertLocalAddress(address: localAddress) - try self.assertRemoteAddress(address: serverAddress) - try self.assertRegister { selectable, event, Registration in + try assertions.assertConnect(expectedAddress: serverAddress, result: true) + try assertions.assertLocalAddress(address: localAddress) + try assertions.assertRemoteAddress(address: serverAddress) + try assertions.assertRegister { selectable, event, Registration in XCTAssertEqual([.reset, .error], event) return true } - try self.assertReregister { selectable, event in + try assertions.assertReregister { selectable, event in XCTAssertEqual([.reset, .error, .readEOF], event) return true } - try self.assertWritev( + try assertions.assertWritev( expectedFD: .max, expectedBytes: [firstWrite, secondWrite], return: .processed(6) ) - try self.assertDeregister { selectable in + try assertions.assertDeregister { selectable in true } - try self.assertClose(expectedFD: .max) - }) { + try assertions.assertClose(expectedFD: .max) + } + } + } + + func testWriteBeforeChannelActiveClientStreamInstantConnect_shortWriteLeadsToWritable() throws { + try withSALContext { context in + let channel = try context.makeSocketChannel() + let localAddress = try! SocketAddress(ipAddress: "1.2.3.4", port: 5) + let serverAddress = try! SocketAddress(ipAddress: "9.8.7.6", port: 5) + let firstWrite = ByteBuffer(string: "foo") + let secondWrite = ByteBuffer(string: "bar") + + try context.runSALOnEventLoopAndWait { _, _, _ in ClientBootstrap(group: channel.eventLoop) .channelOption(.autoRead, value: false) + .channelOption(.writeSpin, value: 1) .channelInitializer { channel in - channel.write(firstWrite, promise: nil) - channel.write(secondWrite).whenComplete { _ in - channel.close(promise: nil) + channel.write(firstWrite).whenComplete { _ in + // An extra EL spin here to ensure that the close doesn't + // beat the writable + channel.eventLoop.execute { + channel.close(promise: nil) + } } + channel.write(secondWrite, promise: nil) channel.flush() return channel.eventLoop.makeSucceededVoidFuture() } @@ -653,62 +671,56 @@ final class SALChannelTest: XCTestCase, SALTest { .flatMap { $0.closeFuture } - }.salWait() - ) - } - - func testWriteBeforeChannelActiveClientStreamInstantConnect_shortWriteLeadsToWritable() { - guard let channel = try? self.makeSocketChannel() else { - XCTFail("couldn't make a channel") - return - } - let localAddress = try! SocketAddress(ipAddress: "1.2.3.4", port: 5) - let serverAddress = try! SocketAddress(ipAddress: "9.8.7.6", port: 5) - let firstWrite = ByteBuffer(string: "foo") - let secondWrite = ByteBuffer(string: "bar") - - XCTAssertNoThrow( - try channel.eventLoop.runSAL(syscallAssertions: { - try self.assertSetOption(expectedLevel: .tcp, expectedOption: .tcp_nodelay) { value in + } syscallAssertions: { assertions in + try assertions.assertSetOption(expectedLevel: .tcp, expectedOption: .tcp_nodelay) { value in (value as? SocketOptionValue) == 1 } - try self.assertConnect(expectedAddress: serverAddress, result: true) - try self.assertLocalAddress(address: localAddress) - try self.assertRemoteAddress(address: serverAddress) - try self.assertRegister { selectable, event, Registration in + try assertions.assertConnect(expectedAddress: serverAddress, result: true) + try assertions.assertLocalAddress(address: localAddress) + try assertions.assertRemoteAddress(address: serverAddress) + try assertions.assertRegister { selectable, event, Registration in XCTAssertEqual([.reset, .error], event) return true } - try self.assertReregister { selectable, event in + try assertions.assertReregister { selectable, event in XCTAssertEqual([.reset, .error, .readEOF], event) return true } - try self.assertWritev( + try assertions.assertWritev( expectedFD: .max, expectedBytes: [firstWrite, secondWrite], return: .processed(3) ) - try self.assertWrite(expectedFD: .max, expectedBytes: secondWrite, return: .wouldBlock(0)) - try self.assertReregister { selectable, event in + try assertions.assertWrite(expectedFD: .max, expectedBytes: secondWrite, return: .wouldBlock(0)) + try assertions.assertReregister { selectable, event in XCTAssertEqual([.reset, .error, .readEOF, .write], event) return true } - try self.assertDeregister { selectable in + try assertions.assertDeregister { selectable in true } - try self.assertClose(expectedFD: .max) - }) { + try assertions.assertClose(expectedFD: .max) + } + } + } + + func testWriteBeforeChannelActiveClientStreamInstantConnect_shortWriteLeadsToWritable_instantClose() throws { + try withSALContext { context in + let channel = try context.makeSocketChannel() + let localAddress = try! SocketAddress(ipAddress: "1.2.3.4", port: 5) + let serverAddress = try! SocketAddress(ipAddress: "9.8.7.6", port: 5) + let firstWrite = ByteBuffer(string: "foo") + let secondWrite = ByteBuffer(string: "bar") + + try context.runSALOnEventLoopAndWait { _, _, _ in ClientBootstrap(group: channel.eventLoop) .channelOption(.autoRead, value: false) .channelOption(.writeSpin, value: 1) .channelInitializer { channel in channel.write(firstWrite).whenComplete { _ in - // An extra EL spin here to ensure that the close doesn't - // beat the writable - channel.eventLoop.execute { - channel.close(promise: nil) - } + // No EL spin here so the close happens in the middle of the write spin. + channel.close(promise: nil) } channel.write(secondWrite, promise: nil) channel.flush() @@ -718,78 +730,60 @@ final class SALChannelTest: XCTestCase, SALTest { .flatMap { $0.closeFuture } - }.salWait() - ) - } - - func testWriteBeforeChannelActiveClientStreamInstantConnect_shortWriteLeadsToWritable_instantClose() { - guard let channel = try? self.makeSocketChannel() else { - XCTFail("couldn't make a channel") - return - } - let localAddress = try! SocketAddress(ipAddress: "1.2.3.4", port: 5) - let serverAddress = try! SocketAddress(ipAddress: "9.8.7.6", port: 5) - let firstWrite = ByteBuffer(string: "foo") - let secondWrite = ByteBuffer(string: "bar") - - XCTAssertNoThrow( - try channel.eventLoop.runSAL(syscallAssertions: { - try self.assertSetOption(expectedLevel: .tcp, expectedOption: .tcp_nodelay) { value in + } syscallAssertions: { assertions in + try assertions.assertSetOption(expectedLevel: .tcp, expectedOption: .tcp_nodelay) { value in (value as? SocketOptionValue) == 1 } - try self.assertConnect(expectedAddress: serverAddress, result: true) - try self.assertLocalAddress(address: localAddress) - try self.assertRemoteAddress(address: serverAddress) - try self.assertRegister { selectable, event, Registration in + try assertions.assertConnect(expectedAddress: serverAddress, result: true) + try assertions.assertLocalAddress(address: localAddress) + try assertions.assertRemoteAddress(address: serverAddress) + try assertions.assertRegister { selectable, event, Registration in XCTAssertEqual([.reset, .error], event) return true } - try self.assertReregister { selectable, event in + try assertions.assertReregister { selectable, event in XCTAssertEqual([.reset, .error, .readEOF], event) return true } - try self.assertWritev( + try assertions.assertWritev( expectedFD: .max, expectedBytes: [firstWrite, secondWrite], return: .processed(3) ) - try self.assertDeregister { selectable in + try assertions.assertDeregister { selectable in true } - try self.assertClose(expectedFD: .max) - }) { - ClientBootstrap(group: channel.eventLoop) - .channelOption(.autoRead, value: false) - .channelOption(.writeSpin, value: 1) - .channelInitializer { channel in - channel.write(firstWrite).whenComplete { _ in - // No EL spin here so the close happens in the middle of the write spin. - channel.close(promise: nil) - } - channel.write(secondWrite, promise: nil) - channel.flush() - return channel.eventLoop.makeSucceededVoidFuture() - } - .testOnly_connect(injectedChannel: channel, to: serverAddress) - .flatMap { - $0.closeFuture - } - }.salWait() - ) + try assertions.assertClose(expectedFD: .max) + } + } } func testWriteBeforeChannelActiveServerStream() throws { - let localAddress = try! SocketAddress(ipAddress: "1.2.3.4", port: 5) - let remoteAddress = try! SocketAddress(ipAddress: "5.6.7.8", port: 10) - let channel = try self.makeBoundServerSocketChannel(localAddress: localAddress) + try withSALContext { context in + let localAddress = try! SocketAddress(ipAddress: "1.2.3.4", port: 5) + let remoteAddress = try! SocketAddress(ipAddress: "5.6.7.8", port: 10) + let channel = try context.makeBoundServerSocketChannel(localAddress: localAddress) - let socket = try self.makeSocket() - let firstWrite = ByteBuffer(string: "foo") - let secondWrite = ByteBuffer(string: "bar") + let unsafeTransferSocket = try context.makeSocket() + let firstWrite = ByteBuffer(string: "foo") + let secondWrite = ByteBuffer(string: "bar") - XCTAssertNoThrow( - try channel.eventLoop.runSAL(syscallAssertions: { + try context.runSALOnEventLoop { _, _, _ in + try channel.pipeline.syncOperations.addHandler( + ServerBootstrap.AcceptHandler( + childChannelInitializer: { channel in + channel.write(firstWrite, promise: nil) + channel.write(secondWrite).whenComplete { _ in + channel.close(promise: nil) + } + channel.flush() + return channel.eventLoop.makeSucceededVoidFuture() + }, + childChannelOptions: .init() + ) + ) + } syscallAssertions: { assertions in let readEvent = SelectorEvent( io: [.read], registration: NIORegistration( @@ -798,16 +792,20 @@ final class SALChannelTest: XCTestCase, SALTest { registrationID: .initialRegistrationID ) ) - try self.assertWaitingForNotification(result: readEvent) - try self.assertAccept(expectedFD: .max, expectedNonBlocking: true, return: socket) - try self.assertLocalAddress(address: localAddress) - try self.assertRemoteAddress(address: remoteAddress) + try assertions.assertWaitingForNotification(result: readEvent) + try assertions.assertAccept( + expectedFD: .max, + expectedNonBlocking: true, + return: unsafeTransferSocket.wrappedValue + ) + try assertions.assertLocalAddress(address: localAddress) + try assertions.assertRemoteAddress(address: remoteAddress) // This accept is expected: we delay inbound channel registration by one EL tick. - try self.assertAccept(expectedFD: .max, expectedNonBlocking: true, return: nil) + try assertions.assertAccept(expectedFD: .max, expectedNonBlocking: true, return: nil) // Then we register the inbound channel. - try self.assertRegister { selectable, eventSet, registration in + try assertions.assertRegister { selectable, eventSet, registration in if case (.socketChannel(let channel), let registrationEventSet) = (registration.channel, registration.interested) { @@ -821,55 +819,59 @@ final class SALChannelTest: XCTestCase, SALTest { return false } } - try self.assertReregister { selectable, event in + try assertions.assertReregister { selectable, event in XCTAssertEqual([.reset, .error, .readEOF], event) return true } // We then get an immediate write which completes, then a close. - try self.assertWritev( + try assertions.assertWritev( expectedFD: .max, expectedBytes: [firstWrite, secondWrite], return: .processed(6) ) - try self.assertDeregister { selectable in + try assertions.assertDeregister { selectable in true } - try self.assertClose(expectedFD: .max) + try assertions.assertClose(expectedFD: .max) - try self.assertParkedRightNow() - }) { - channel.pipeline.addHandler( + try assertions.assertParkedRightNow() + } + } + } + + func testWriteBeforeChannelActiveServerStream_shortWriteLeadsToWritable() throws { + try withSALContext { context in + let localAddress = try! SocketAddress(ipAddress: "1.2.3.4", port: 5) + let remoteAddress = try! SocketAddress(ipAddress: "5.6.7.8", port: 10) + let channel = try context.makeBoundServerSocketChannel(localAddress: localAddress) + + let unsafeTransferSocket = try context.makeSocket() + let firstWrite = ByteBuffer(string: "foo") + let secondWrite = ByteBuffer(string: "bar") + var childChannelOptions = ChannelOptions.Storage() + childChannelOptions.append(key: .autoRead, value: false) + + try context.runSALOnEventLoop { [childChannelOptions] _, _, _ in + try channel.pipeline.syncOperations.addHandler( ServerBootstrap.AcceptHandler( childChannelInitializer: { channel in - channel.write(firstWrite, promise: nil) - channel.write(secondWrite).whenComplete { _ in - channel.close(promise: nil) + channel.write(firstWrite).whenComplete { _ in + // An extra EL spin here to ensure that the close doesn't + // beat the writable + channel.eventLoop.execute { + channel.close(promise: nil) + } } + channel.write(secondWrite, promise: nil) channel.flush() return channel.eventLoop.makeSucceededVoidFuture() }, - childChannelOptions: .init() + childChannelOptions: childChannelOptions ) ) - } - ) - } - - func testWriteBeforeChannelActiveServerStream_shortWriteLeadsToWritable() throws { - let localAddress = try! SocketAddress(ipAddress: "1.2.3.4", port: 5) - let remoteAddress = try! SocketAddress(ipAddress: "5.6.7.8", port: 10) - let channel = try self.makeBoundServerSocketChannel(localAddress: localAddress) - - let socket = try self.makeSocket() - let firstWrite = ByteBuffer(string: "foo") - let secondWrite = ByteBuffer(string: "bar") - var childChannelOptions = ChannelOptions.Storage() - childChannelOptions.append(key: .autoRead, value: false) - - XCTAssertNoThrow( - try channel.eventLoop.runSAL(syscallAssertions: { + } syscallAssertions: { assertions in let readEvent = SelectorEvent( io: [.read], registration: NIORegistration( @@ -878,16 +880,20 @@ final class SALChannelTest: XCTestCase, SALTest { registrationID: .initialRegistrationID ) ) - try self.assertWaitingForNotification(result: readEvent) - try self.assertAccept(expectedFD: .max, expectedNonBlocking: true, return: socket) - try self.assertLocalAddress(address: localAddress) - try self.assertRemoteAddress(address: remoteAddress) + try assertions.assertWaitingForNotification(result: readEvent) + try assertions.assertAccept( + expectedFD: .max, + expectedNonBlocking: true, + return: unsafeTransferSocket.wrappedValue + ) + try assertions.assertLocalAddress(address: localAddress) + try assertions.assertRemoteAddress(address: remoteAddress) // This accept is expected: we delay inbound channel registration by one EL tick. - try self.assertAccept(expectedFD: .max, expectedNonBlocking: true, return: nil) + try assertions.assertAccept(expectedFD: .max, expectedNonBlocking: true, return: nil) // Then we register the inbound channel. - try self.assertRegister { selectable, eventSet, registration in + try assertions.assertRegister { selectable, eventSet, registration in if case (.socketChannel(let channel), let registrationEventSet) = (registration.channel, registration.interested) { @@ -901,38 +907,50 @@ final class SALChannelTest: XCTestCase, SALTest { return false } } - try self.assertReregister { selectable, event in + try assertions.assertReregister { selectable, event in XCTAssertEqual([.reset, .error, .readEOF], event) return true } - try self.assertWritev( + try assertions.assertWritev( expectedFD: .max, expectedBytes: [firstWrite, secondWrite], return: .processed(3) ) - try self.assertWrite(expectedFD: .max, expectedBytes: secondWrite, return: .wouldBlock(0)) - try self.assertReregister { selectable, event in + try assertions.assertWrite(expectedFD: .max, expectedBytes: secondWrite, return: .wouldBlock(0)) + try assertions.assertReregister { selectable, event in XCTAssertEqual([.reset, .error, .readEOF, .write], event) return true } - try self.assertDeregister { selectable in + try assertions.assertDeregister { selectable in true } - try self.assertClose(expectedFD: .max) + try assertions.assertClose(expectedFD: .max) - try self.assertParkedRightNow() - }) { - channel.pipeline.addHandler( + try assertions.assertParkedRightNow() + } + } + } + + func testWriteBeforeChannelActiveServerStream_shortWriteLeadsToWritable_instantClose() throws { + try withSALContext { context in + let localAddress = try! SocketAddress(ipAddress: "1.2.3.4", port: 5) + let remoteAddress = try! SocketAddress(ipAddress: "5.6.7.8", port: 10) + let channel = try context.makeBoundServerSocketChannel(localAddress: localAddress) + + let unsafeTransferSocket = try context.makeSocket() + let firstWrite = ByteBuffer(string: "foo") + let secondWrite = ByteBuffer(string: "bar") + var childChannelOptions = ChannelOptions.Storage() + childChannelOptions.append(key: .autoRead, value: false) + + try context.runSALOnEventLoop { [childChannelOptions] _, _, _ in + try channel.pipeline.syncOperations.addHandler( ServerBootstrap.AcceptHandler( childChannelInitializer: { channel in channel.write(firstWrite).whenComplete { _ in - // An extra EL spin here to ensure that the close doesn't - // beat the writable - channel.eventLoop.execute { - channel.close(promise: nil) - } + channel.close(promise: nil) } channel.write(secondWrite, promise: nil) channel.flush() @@ -941,23 +959,7 @@ final class SALChannelTest: XCTestCase, SALTest { childChannelOptions: childChannelOptions ) ) - } - ) - } - - func testWriteBeforeChannelActiveServerStream_shortWriteLeadsToWritable_instantClose() throws { - let localAddress = try! SocketAddress(ipAddress: "1.2.3.4", port: 5) - let remoteAddress = try! SocketAddress(ipAddress: "5.6.7.8", port: 10) - let channel = try self.makeBoundServerSocketChannel(localAddress: localAddress) - - let socket = try self.makeSocket() - let firstWrite = ByteBuffer(string: "foo") - let secondWrite = ByteBuffer(string: "bar") - var childChannelOptions = ChannelOptions.Storage() - childChannelOptions.append(key: .autoRead, value: false) - - XCTAssertNoThrow( - try channel.eventLoop.runSAL(syscallAssertions: { + } syscallAssertions: { assertions in let readEvent = SelectorEvent( io: [.read], registration: NIORegistration( @@ -966,16 +968,20 @@ final class SALChannelTest: XCTestCase, SALTest { registrationID: .initialRegistrationID ) ) - try self.assertWaitingForNotification(result: readEvent) - try self.assertAccept(expectedFD: .max, expectedNonBlocking: true, return: socket) - try self.assertLocalAddress(address: localAddress) - try self.assertRemoteAddress(address: remoteAddress) + try assertions.assertWaitingForNotification(result: readEvent) + try assertions.assertAccept( + expectedFD: .max, + expectedNonBlocking: true, + return: unsafeTransferSocket.wrappedValue + ) + try assertions.assertLocalAddress(address: localAddress) + try assertions.assertRemoteAddress(address: remoteAddress) // This accept is expected: we delay inbound channel registration by one EL tick. - try self.assertAccept(expectedFD: .max, expectedNonBlocking: true, return: nil) + try assertions.assertAccept(expectedFD: .max, expectedNonBlocking: true, return: nil) // Then we register the inbound channel. - try self.assertRegister { selectable, eventSet, registration in + try assertions.assertRegister { selectable, eventSet, registration in if case (.socketChannel(let channel), let registrationEventSet) = (registration.channel, registration.interested) { @@ -989,41 +995,27 @@ final class SALChannelTest: XCTestCase, SALTest { return false } } - try self.assertReregister { selectable, event in + try assertions.assertReregister { selectable, event in XCTAssertEqual([.reset, .error, .readEOF], event) return true } - try self.assertWritev( + try assertions.assertWritev( expectedFD: .max, expectedBytes: [firstWrite, secondWrite], return: .processed(3) ) - try self.assertDeregister { selectable in + try assertions.assertDeregister { selectable in true } - try self.assertClose(expectedFD: .max) + try assertions.assertClose(expectedFD: .max) - try self.assertParkedRightNow() - }) { - channel.pipeline.addHandler( - ServerBootstrap.AcceptHandler( - childChannelInitializer: { channel in - channel.write(firstWrite).whenComplete { _ in - channel.close(promise: nil) - } - channel.write(secondWrite, promise: nil) - channel.flush() - return channel.eventLoop.makeSucceededVoidFuture() - }, - childChannelOptions: childChannelOptions - ) - ) + try assertions.assertParkedRightNow() } - ) + } } - func testBaseSocketChannelFlushNowReentrancyCrash() { + func testBaseSocketChannelFlushNowReentrancyCrash() throws { final class TestHandler: ChannelInboundHandler { typealias InboundIn = Any typealias OutboundOut = ByteBuffer @@ -1048,26 +1040,40 @@ final class SALChannelTest: XCTestCase, SALTest { context.fireChannelWritabilityChanged() } } - guard let channel = try? self.makeSocketChannel() else { - XCTFail("couldn't make a channel") - return - } - let localAddress = try! SocketAddress(ipAddress: "1.2.3.4", port: 5) - let serverAddress = try! SocketAddress(ipAddress: "9.8.7.6", port: 5) - let buffer = ByteBuffer(repeating: 0, count: 1024) - XCTAssertNoThrow( - try channel.eventLoop.runSAL(syscallAssertions: { - try self.assertSetOption(expectedLevel: .tcp, expectedOption: .tcp_nodelay) { value in + try withSALContext { context in + let channel = try context.makeSocketChannel() + let localAddress = try! SocketAddress(ipAddress: "1.2.3.4", port: 5) + let serverAddress = try! SocketAddress(ipAddress: "9.8.7.6", port: 5) + let buffer = ByteBuffer(repeating: 0, count: 1024) + + try context.runSALOnEventLoopAndWait { _, _, _ in + ClientBootstrap(group: channel.eventLoop) + .channelOption(.autoRead, value: false) + .channelOption(.writeSpin, value: 0) + .channelOption( + .writeBufferWaterMark, + value: .init(low: buffer.readableBytes + 1, high: buffer.readableBytes + 1) + ) + .channelInitializer { channel in + try! channel.pipeline.syncOperations.addHandler(TestHandler(buffer)) + return channel.eventLoop.makeSucceededVoidFuture() + } + .testOnly_connect(injectedChannel: channel, to: serverAddress) + .flatMap { + $0.closeFuture + } + } syscallAssertions: { assertions in + try assertions.assertSetOption(expectedLevel: .tcp, expectedOption: .tcp_nodelay) { value in (value as? SocketOptionValue) == 1 } - try self.assertConnect(expectedAddress: serverAddress, result: false) - try self.assertLocalAddress(address: localAddress) - try self.assertRegister { selectable, event, Registration in + try assertions.assertConnect(expectedAddress: serverAddress, result: false) + try assertions.assertLocalAddress(address: localAddress) + try assertions.assertRegister { selectable, event, Registration in XCTAssertEqual([.reset, .error], event) return true } - try self.assertReregister { selectable, event in + try assertions.assertReregister { selectable, event in XCTAssertEqual([.reset, .error, .write], event) return true } @@ -1080,20 +1086,20 @@ final class SALChannelTest: XCTestCase, SALTest { registrationID: .initialRegistrationID ) ) - try self.assertWaitingForNotification(result: writeEvent) - try self.assertGetOption(expectedLevel: .socket, expectedOption: .so_error, value: CInt(0)) - try self.assertRemoteAddress(address: serverAddress) + try assertions.assertWaitingForNotification(result: writeEvent) + try assertions.assertGetOption(expectedLevel: .socket, expectedOption: .so_error, value: CInt(0)) + try assertions.assertRemoteAddress(address: serverAddress) - try self.assertReregister { selectable, event in + try assertions.assertReregister { selectable, event in XCTAssertEqual([.reset, .error, .readEOF, .write], event) return true } - try self.assertWritev( + try assertions.assertWritev( expectedFD: .max, expectedBytes: [buffer, buffer], return: .wouldBlock(0) ) - try self.assertWritev( + try assertions.assertWritev( expectedFD: .max, expectedBytes: [buffer, buffer], return: .wouldBlock(0) @@ -1107,34 +1113,18 @@ final class SALChannelTest: XCTestCase, SALTest { registrationID: .initialRegistrationID ) ) - try self.assertWaitingForNotification(result: canWriteEvent) - try self.assertWritev( + try assertions.assertWaitingForNotification(result: canWriteEvent) + try assertions.assertWritev( expectedFD: .max, expectedBytes: [buffer, buffer], return: .processed(buffer.readableBytes) ) - try self.assertDeregister { selectable in + try assertions.assertDeregister { selectable in true } - try self.assertClose(expectedFD: .max) - }) { - ClientBootstrap(group: channel.eventLoop) - .channelOption(.autoRead, value: false) - .channelOption(.writeSpin, value: 0) - .channelOption( - .writeBufferWaterMark, - value: .init(low: buffer.readableBytes + 1, high: buffer.readableBytes + 1) - ) - .channelInitializer { channel in - try! channel.pipeline.syncOperations.addHandler(TestHandler(buffer)) - return channel.eventLoop.makeSucceededVoidFuture() - } - .testOnly_connect(injectedChannel: channel, to: serverAddress) - .flatMap { - $0.closeFuture - } - }.salWait() - ) + try assertions.assertClose(expectedFD: .max) + } + } } } diff --git a/Tests/NIOPosixTests/SALEventLoopTests.swift b/Tests/NIOPosixTests/SALEventLoopTests.swift index ad3656475db..ddd75a8c648 100644 --- a/Tests/NIOPosixTests/SALEventLoopTests.swift +++ b/Tests/NIOPosixTests/SALEventLoopTests.swift @@ -12,67 +12,48 @@ // //===----------------------------------------------------------------------===// -import Dispatch -import NIOConcurrencyHelpers import NIOCore import XCTest -@testable import NIOPosix - -final class SALEventLoopTests: XCTestCase, SALTest { - var group: MultiThreadedEventLoopGroup! - var kernelToUserBox: LockedBox! - var userToKernelBox: LockedBox! - var wakeups: LockedBox<()>! - - override func setUp() { - self.setUpSAL() - } - - override func tearDown() { - self.tearDownSAL() - } - +final class SALEventLoopTests: XCTestCase { func testSchedulingTaskOnSleepingLoopWakesUpOnce() throws { - let thisLoop = self.group.next() - - try thisLoop.runSAL(syscallAssertions: { - try self.assertParkedRightNow() - - try self.assertWakeup() - - // We actually need to wait for the inner code to exit, as the optimisation we're testing here will remove a signal that the - // SAL is actually going to wait for in salWait(). - try self.assertParkedRightNow() - }) { () -> EventLoopFuture in - // We're going to execute some tasks on the loop. This will force a single wakeup, as the first task will wedge the loop open. - // However, we're currently _on_ the loop so the first thing we have to do is give it up. - let promise = thisLoop.makePromise(of: Void.self) - - DispatchQueue(label: "background").asyncAfter(deadline: .now() + .milliseconds(100)) { - let semaphore = DispatchSemaphore(value: 0) - - thisLoop.execute { - // Wedge the loop open. This will also _wake_ the loop. - XCTAssertEqual(semaphore.wait(timeout: .now() + .milliseconds(500)), .success) - print("Unblocking wedged task") + try withSALContext { context in + try context.runSALOnEventLoopAndWait { thisLoop, _, _ in + // We're going to execute some tasks on the loop. This will force a single wakeup, as the first task will wedge the loop open. + // However, we're currently _on_ the loop so the first thing we have to do is give it up. + let promise = thisLoop.makePromise(of: Void.self) + + DispatchQueue(label: "background").asyncAfter(deadline: .now() + .milliseconds(100)) { + let semaphore = DispatchSemaphore(value: 0) + + thisLoop.execute { + // Wedge the loop open. This will also _wake_ the loop. + XCTAssertEqual(semaphore.wait(timeout: .now() + .milliseconds(500)), .success) + } + + // Now execute 10 tasks. + for _ in 0..<10 { + thisLoop.execute {} + } + + // Now enqueue a "last" task. + thisLoop.execute { + promise.succeed(()) + } + + // Now we can unblock the semaphore. + semaphore.signal() } - // Now execute 10 tasks. - for _ in 0..<10 { - thisLoop.execute {} - } + return promise.futureResult + } syscallAssertions: { assertions in + try assertions.assertParkedRightNow() + try assertions.assertWakeup() - // Now enqueue a "last" task. - thisLoop.execute { - promise.succeed(()) - } - - // Now we can unblock the semaphore. - semaphore.signal() + // We actually need to wait for the inner code to exit, as the optimisation we're testing here will remove a signal that the + // SAL is actually going to wait for in salWait(). + try assertions.assertParkedRightNow() } - - return promise.futureResult - }.salWait() + } } } diff --git a/Tests/NIOPosixTests/SyscallAbstractionLayer.swift b/Tests/NIOPosixTests/SyscallAbstractionLayer.swift index 36e774e2d25..7e6921be9a6 100644 --- a/Tests/NIOPosixTests/SyscallAbstractionLayer.swift +++ b/Tests/NIOPosixTests/SyscallAbstractionLayer.swift @@ -14,6 +14,39 @@ // This file contains a syscall abstraction layer (SAL) which hooks the Selector and the Socket in a way that we can // play the kernel whilst NIO thinks it's running on a real OS. +// +// The SAL tests are pretty darn awkward with respect to sendability. +// +// Each SAL test is setup such that it can run some work on an event loop whose +// selector has been hooked such that it signals various events to locked +// containers. Another closure in each test can make assertions on the values of +// these containers, waiting for them to happen. These run in lock step. It +// means that various very much not sendable values must be transferred between +// isoaltion domains (the event loop and the calling thread). In general this +// isn't safe as non-sendable values could be escaped. These tests are quite +// careful to avoid that. This does however, mean that annotating types +// appropriately in order for the compiler to catch thread safety warnings is +// difficult. +// +// The current setup requires a 'SALContext' which holds the hooked event +// loop and various locked containers. It's also responsible for the lifecycle +// of these objects. +// +// The context also provides you with various methods for running a SAL test on +// the hooked event loop. Within that closure you're provided with the event +// loop and the user-to-kernel and kernel-to-user locked boxes. Another closure +// is also provided with an assertions object to wait for various events to +// happen. +// +// There are some sendability holes here: +// +// - LockedBox is unconditionally Sendable. It should only be Sendable when the +// value it stores is Sendable. +// - Instances of LockedBox hold KernelToUser and UserToKernel types which are +// *not* Sendable. These are shared between the event loop and the calling +// thread. +// - HookedSocket isn't Sendable but is transferred between the EventLoop and +// the calling thread in an UnsafeTransfer. import CNIOLinux import NIOConcurrencyHelpers @@ -25,14 +58,17 @@ import XCTest internal enum SAL { fileprivate static let defaultTimeout: Double = 5 private static let debugTests: Bool = false - fileprivate static func printIfDebug(_ item: Any) { + static func printIfDebug(_ item: Any) { if debugTests { print(item) } } } -final class LockedBox { +final class LockedBox: @unchecked Sendable { + // @unchecked: _value is protected by a condition lock. + // TODO: a condition-locked-value-box could hide some of the unchecked-ness here. + struct TimeoutError: Error { var description: String init(_ description: String) { @@ -43,7 +79,7 @@ final class LockedBox { private let condition = ConditionLock(value: 0) private let description: String - private let didSet: (T?) -> Void + private let didSet: @Sendable (T?) -> Void private var _value: T? { didSet { self.didSet(self._value) @@ -55,7 +91,7 @@ final class LockedBox { description: String? = nil, file: StaticString = #filePath, line: UInt = #line, - didSet: @escaping (T?) -> Void = { _ in } + didSet: @escaping @Sendable (T?) -> Void = { _ in } ) { self._value = value self.didSet = didSet @@ -107,12 +143,12 @@ final class LockedBox { } } - func waitForValue() throws -> T { + func waitForLockedValue(_ body: (T) throws -> R) throws -> R { if self.condition.lock(whenValue: 1, timeoutSeconds: SAL.defaultTimeout) { defer { self.condition.unlock(withValue: 1) } - return self._value! + return try body(self._value!) } else { throw TimeoutError(self.description) } @@ -122,11 +158,12 @@ final class LockedBox { extension LockedBox where T == UserToKernel { func assertParkedRightNow(file: StaticString = #filePath, line: UInt = #line) throws { SAL.printIfDebug("\(#function)") - let syscall = try self.waitForValue() - if case .whenReady(.block) = syscall { - return - } else { - XCTFail("unexpected syscall \(syscall)", file: (file), line: line) + try self.waitForLockedValue { syscall in + if case .whenReady(.block) = syscall { + return + } else { + XCTFail("unexpected syscall \(syscall)", file: (file), line: line) + } } } } @@ -164,18 +201,18 @@ enum KernelToUser { } struct UnexpectedKernelReturn: Error { - private var ret: KernelToUser + private var ret: String init(_ ret: KernelToUser) { - self.ret = ret + self.ret = String(describing: ret) } } struct UnexpectedSyscall: Error { - private var syscall: UserToKernel + private var syscall: String init(_ syscall: UserToKernel) { - self.syscall = syscall + self.syscall = String(describing: syscall) } } @@ -371,7 +408,7 @@ class HookedServerSocket: ServerSocket, UserKernelInterface { } } -class HookedSocket: Socket, UserKernelInterface { +final class HookedSocket: Socket, UserKernelInterface { fileprivate let userToKernel: LockedBox fileprivate let kernelToUser: LockedBox @@ -556,152 +593,91 @@ extension HookedSelector { } } -extension EventLoop { - internal func runSAL( - syscallAssertions: () throws -> Void = {}, - file: StaticString = #filePath, - line: UInt = #line, - _ body: @escaping () throws -> T - ) throws -> T { - let hookedSelector = ((self as! SelectableEventLoop)._selector as! HookedSelector) - let box = LockedBox>() - - // To prevent races between the test driver thread (this thread) and the EventLoop (another thread), we need - // to wait for the EventLoop to finish its tick and park itself. That makes sure both threads are synchronised - // so we know exactly what the EventLoop thread is currently up to (nothing at all, waiting for a wakeup). - try hookedSelector.userToKernel.assertParkedRightNow() - - self.execute { - do { - try box.waitForEmptyAndSet(.init(catching: body)) - } catch { - box.value = .failure(error) - } - } - try hookedSelector.assertWakeup(file: (file), line: line) - try syscallAssertions() - - // Here as well, we need to synchronise and wait for the EventLoop to finish running its tick. - try hookedSelector.userToKernel.assertParkedRightNow() - return try box.takeValue().get() - } -} - -extension EventLoopFuture { +extension EventLoopFuture where Value: Sendable { /// This works like `EventLoopFuture.wait()` but can be used together with the SAL. /// /// Using a plain `EventLoopFuture.wait()` together with the SAL would require you to spin the `EventLoop` manually /// which is error prone and hard. - internal func salWait() throws -> Value { + func salWait(context: SALContext) throws -> Value { + precondition(context.eventLoop === self.eventLoop) let box = LockedBox>() XCTAssertNoThrow( - try self.eventLoop.runSAL { + try context.runSALOnEventLoop { _, _, _ in self.whenComplete { value in // We can bang this because the LockedBox is empty so it'll immediately succeed. try! box.waitForEmptyAndSet(value) } } ) - return try box.waitForValue().get() - } -} - -protocol SALTest: AnyObject { - var group: MultiThreadedEventLoopGroup! { get set } - var wakeups: LockedBox<()>! { get set } - var userToKernelBox: LockedBox! { get set } - var kernelToUserBox: LockedBox! { get set } -} - -extension SALTest { - private var selector: HookedSelector { - precondition(Array(self.group.makeIterator()).count == 1) - return self.loop._selector as! HookedSelector - } - private var loop: SelectableEventLoop { - precondition(Array(self.group.makeIterator()).count == 1) - return ((self.group!.next()) as! SelectableEventLoop) - } - - func setUpSAL() { - XCTAssertNil(self.group) - XCTAssertNil(self.kernelToUserBox) - XCTAssertNil(self.userToKernelBox) - XCTAssertNil(self.wakeups) - self.kernelToUserBox = .init(description: "k2u") { newValue in - if let newValue = newValue { - SAL.printIfDebug("K --> U: \(newValue)") - } - } - self.userToKernelBox = .init(description: "u2k") { newValue in - if let newValue = newValue { - SAL.printIfDebug("U --> K: \(newValue)") - } - } - self.wakeups = .init(description: "wakeups") - self.group = MultiThreadedEventLoopGroup(numberOfThreads: 1, metricsDelegate: nil) { - try HookedSelector( - userToKernel: self.userToKernelBox, - kernelToUser: self.kernelToUserBox, - wakeups: self.wakeups - ) - } + return try box.waitForLockedValue { try $0.get() } } +} +extension SALContext { private func makeSocketChannel( eventLoop: SelectableEventLoop, file: StaticString = #filePath, line: UInt = #line ) throws -> SocketChannel { - let channel = try eventLoop.runSAL { - try self.assertdisableSIGPIPE(expectedFD: .max, result: .success(())) - try self.assertLocalAddress(address: nil) - try self.assertRemoteAddress(address: nil) - } _: { + let channel = try self.runSALOnEventLoop { eventLoop, kernelToUser, userToKernel in try SocketChannel( socket: HookedSocket( - userToKernel: self.userToKernelBox, - kernelToUser: self.kernelToUserBox, + userToKernel: userToKernel, + kernelToUser: kernelToUser, socket: .max ), eventLoop: eventLoop ) + } syscallAssertions: { assertions in + try assertions.assertdisableSIGPIPE(expectedFD: .max, result: .success(())) + try assertions.assertLocalAddress(address: nil) + try assertions.assertRemoteAddress(address: nil) + try assertions.assertParkedRightNow() } - try self.assertParkedRightNow() + + try self.selector.assertParkedRightNow() return channel } private func makeServerSocketChannel( eventLoop: SelectableEventLoop, - group: MultiThreadedEventLoopGroup, file: StaticString = #filePath, line: UInt = #line ) throws -> ServerSocketChannel { - let channel = try eventLoop.runSAL { - try self.assertdisableSIGPIPE(expectedFD: .max, result: .success(())) - try self.assertLocalAddress(address: nil) - try self.assertRemoteAddress(address: nil) - } _: { + let channel = try self.runSALOnEventLoop { eventLoop, kernelToUser, userToKernel in try ServerSocketChannel( serverSocket: HookedServerSocket( - userToKernel: self.userToKernelBox, - kernelToUser: self.kernelToUserBox, + userToKernel: userToKernel, + kernelToUser: kernelToUser, socket: .max ), eventLoop: eventLoop, - group: group + group: eventLoop ) + } syscallAssertions: { assertions in + try assertions.assertdisableSIGPIPE(expectedFD: .max, result: .success(())) + try assertions.assertLocalAddress(address: nil) + try assertions.assertRemoteAddress(address: nil) } - try self.assertParkedRightNow() + try self.selector.assertParkedRightNow() return channel } func makeSocketChannelInjectingFailures(disableSIGPIPEFailure: IOError?) throws -> SocketChannel { - let channel = try self.loop.runSAL { - try self.assertdisableSIGPIPE( + let channel = try self.runSALOnEventLoop { eventLoop, kernelToUser, userToKernel in + try SocketChannel( + socket: HookedSocket( + userToKernel: userToKernel, + kernelToUser: kernelToUser, + socket: .max + ), + eventLoop: eventLoop + ) + } syscallAssertions: { assertions in + try assertions.assertdisableSIGPIPE( expectedFD: .max, result: disableSIGPIPEFailure.map { Result.failure($0) @@ -711,28 +687,20 @@ extension SALTest { // if F_NOSIGPIPE failed, we shouldn't see other syscalls. return } - try self.assertLocalAddress(address: nil) - try self.assertRemoteAddress(address: nil) - } _: { - try SocketChannel( - socket: HookedSocket( - userToKernel: self.userToKernelBox, - kernelToUser: self.kernelToUserBox, - socket: .max - ), - eventLoop: self.loop - ) + try assertions.assertLocalAddress(address: nil) + try assertions.assertRemoteAddress(address: nil) } - try self.assertParkedRightNow() + + try self.selector.assertParkedRightNow() return channel } func makeSocketChannel(file: StaticString = #filePath, line: UInt = #line) throws -> SocketChannel { - try self.makeSocketChannel(eventLoop: self.loop, file: (file), line: line) + try self.makeSocketChannel(eventLoop: self.eventLoop, file: file, line: line) } func makeServerSocketChannel(file: StaticString = #filePath, line: UInt = #line) throws -> ServerSocketChannel { - try self.makeServerSocketChannel(eventLoop: self.loop, group: self.group, file: (file), line: line) + try self.makeServerSocketChannel(eventLoop: self.eventLoop, file: file, line: line) } func makeConnectedSocketChannel( @@ -741,12 +709,16 @@ extension SALTest { file: StaticString = #filePath, line: UInt = #line ) throws -> SocketChannel { - let channel = try self.makeSocketChannel(eventLoop: self.loop) - let connectFuture = try channel.eventLoop.runSAL { - try self.assertConnect(expectedAddress: remoteAddress, result: true) - try self.assertLocalAddress(address: localAddress) - try self.assertRemoteAddress(address: remoteAddress) - try self.assertRegister { selectable, eventSet, registration in + let channel = try self.makeSocketChannel(eventLoop: self.eventLoop) + try self.runSALOnEventLoopAndWait { _, _, _ in + channel.register().flatMap { + channel.connect(to: remoteAddress) + } + } syscallAssertions: { assertions in + try assertions.assertConnect(expectedAddress: remoteAddress, result: true) + try assertions.assertLocalAddress(address: localAddress) + try assertions.assertRemoteAddress(address: remoteAddress) + try assertions.assertRegister { selectable, eventSet, registration in if case (.socketChannel(let channel), let registrationEventSet) = (registration.channel, registration.interested) { @@ -760,21 +732,16 @@ extension SALTest { return false } } - try self.assertReregister { selectable, eventSet in + try assertions.assertReregister { selectable, eventSet in XCTAssertEqual([.reset, .error, .readEOF], eventSet) return true } // because autoRead is on by default - try self.assertReregister { selectable, eventSet in + try assertions.assertReregister { selectable, eventSet in XCTAssertEqual([.reset, .error, .readEOF, .read], eventSet) return true } - } _: { - channel.register().flatMap { - channel.connect(to: remoteAddress) - } } - XCTAssertNoThrow(try connectFuture.salWait()) return channel } @@ -783,12 +750,16 @@ extension SALTest { file: StaticString = #filePath, line: UInt = #line ) throws -> ServerSocketChannel { - let channel = try self.makeServerSocketChannel(eventLoop: self.loop, group: self.group) - let bindFuture = try channel.eventLoop.runSAL { - try self.assertBind(expectedAddress: localAddress) - try self.assertLocalAddress(address: localAddress) - try self.assertListen(expectedFD: .max, expectedBacklog: 128) - try self.assertRegister { selectable, eventSet, registration in + let channel = try self.makeServerSocketChannel(eventLoop: self.eventLoop) + try self.runSALOnEventLoopAndWait { _, _, _ in + channel.register().flatMap { + channel.bind(to: localAddress) + } + } syscallAssertions: { assertions in + try assertions.assertBind(expectedAddress: localAddress) + try assertions.assertLocalAddress(address: localAddress) + try assertions.assertListen(expectedFD: .max, expectedBacklog: 128) + try assertions.assertRegister { selectable, eventSet, registration in if case (.serverSocketChannel(let channel), let registrationEventSet) = (registration.channel, registration.interested) { @@ -802,61 +773,38 @@ extension SALTest { return false } } - try self.assertReregister { selectable, eventSet in + try assertions.assertReregister { selectable, eventSet in XCTAssertEqual([.reset, .error, .readEOF], eventSet) return true } // because autoRead is on by default - try self.assertReregister { selectable, eventSet in + try assertions.assertReregister { selectable, eventSet in XCTAssertEqual([.reset, .error, .readEOF, .read], eventSet) return true } - } _: { - channel.register().flatMap { - channel.bind(to: localAddress) - } } - XCTAssertNoThrow(try bindFuture.salWait()) return channel } - func makeSocket() throws -> HookedSocket { - try self.loop.runSAL { - try self.assertdisableSIGPIPE(expectedFD: .max, result: .success(())) - } _: { - try HookedSocket(userToKernel: self.userToKernelBox, kernelToUser: self.kernelToUserBox, socket: .max) + func makeSocket() throws -> UnsafeTransfer { + try self.runSALOnEventLoop { _, kernelToUser, userToKernel in + let socket = try HookedSocket(userToKernel: userToKernel, kernelToUser: kernelToUser, socket: .max) + return UnsafeTransfer(socket) + } syscallAssertions: { assertions in + try assertions.assertdisableSIGPIPE(expectedFD: .max, result: .success(())) } } - func tearDownSAL() { - SAL.printIfDebug("=== TEAR DOWN ===") - XCTAssertNotNil(self.kernelToUserBox) - XCTAssertNotNil(self.userToKernelBox) - XCTAssertNotNil(self.wakeups) - XCTAssertNotNil(self.group) - - let group = DispatchGroup() - group.enter() - XCTAssertNoThrow( - self.group.shutdownGracefully(queue: DispatchQueue.global()) { error in - XCTAssertNil(error, "unexpected error: \(error!)") - group.leave() - } - ) - // We're in a slightly tricky situation here. We don't know if the EventLoop thread enters `whenReady` again - // or not. If it has, we have to wake it up, so let's just put a return value in the 'kernel to user' box, just - // in case :) - XCTAssertNoThrow(try self.kernelToUserBox.waitForEmptyAndSet(.returnSelectorEvent(nil))) - group.wait() +} - self.group = nil - self.kernelToUserBox = nil - self.userToKernelBox = nil - self.wakeups = nil - } +struct SyscallAssertions: @unchecked Sendable { + // The HookedSelector _isn't_ Sendable and holds locked value boxes for types which also + // aren't Sendable. However, these assertions are safe; they effectively wait on a locked + // value and make assertions against them. + private let selector: HookedSelector - func assertParkedRightNow(file: StaticString = #filePath, line: UInt = #line) throws { - try self.userToKernelBox.assertParkedRightNow(file: file, line: line) + init(selector: HookedSelector) { + self.selector = selector } func assertWaitingForNotification( @@ -1180,7 +1128,16 @@ extension SALTest { } } - func waitForNextSyscall() throws -> UserToKernel { - try self.userToKernelBox.waitForValue() + func assertSyscallAndReturn( + _ result: KernelToUser, + file: StaticString = #filePath, + line: UInt = #line, + matcher: (UserToKernel) throws -> Bool + ) throws { + try self.selector.assertSyscallAndReturn(result, file: file, line: line, matcher: matcher) + } + + func assertParkedRightNow(file: StaticString = #filePath, line: UInt = #line) throws { + try self.selector.assertParkedRightNow(file: file, line: line) } } diff --git a/Tests/NIOPosixTests/SyscallAbstractionLayerContext.swift b/Tests/NIOPosixTests/SyscallAbstractionLayerContext.swift new file mode 100644 index 00000000000..3259f871662 --- /dev/null +++ b/Tests/NIOPosixTests/SyscallAbstractionLayerContext.swift @@ -0,0 +1,162 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftNIO open source project +// +// Copyright (c) 2025 Apple Inc. and the SwiftNIO project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftNIO project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import Dispatch +import NIOCore +import XCTest + +@testable import NIOPosix + +struct SALContext { + let eventLoop: SelectableEventLoop + private let wakeups: LockedBox + private let unchecked: Unchecked + + // KernelToUser and UserToKernel are *not* Sendable but we need an escape hatch so that + // they can be moved from the testing thread to the event-loop and back. + struct Unchecked: @unchecked Sendable { + let kernelToUserBox: LockedBox + let userToKernelBox: LockedBox + } + + var selector: HookedSelector { + self.eventLoop._selector as! HookedSelector + } + + fileprivate init( + eventLoop: SelectableEventLoop, + wakeups: LockedBox, + unchecked: Unchecked + ) { + self.eventLoop = eventLoop + self.wakeups = wakeups + self.unchecked = unchecked + } + + func runSALOnEventLoop( + file: StaticString = #filePath, + line: UInt = #line, + body: @escaping @Sendable ( + _ eventLoop: SelectableEventLoop, + _ kernelToUser: LockedBox, + _ userToKernal: LockedBox + ) throws -> Result, + syscallAssertions: (SyscallAssertions) throws -> Void + ) throws -> Result { + let box = LockedBox>() + let hookedSelector = self.eventLoop._selector as! HookedSelector + + // To prevent races between the test driver thread (this thread) and the EventLoop (another thread), we need + // to wait for the EventLoop to finish its tick and park itself. That makes sure both threads are synchronised + // so we know exactly what the EventLoop thread is currently up to (nothing at all, waiting for a wakeup). + try self.unchecked.userToKernelBox.assertParkedRightNow() + + self.eventLoop.execute { + do { + let result = try body(self.eventLoop, self.unchecked.kernelToUserBox, self.unchecked.userToKernelBox) + try box.waitForEmptyAndSet(.success(result)) + } catch { + box.value = .failure(error) + } + } + try hookedSelector.assertWakeup(file: file, line: line) + try syscallAssertions(SyscallAssertions(selector: hookedSelector)) + + // Here as well, we need to synchronise and wait for the EventLoop to finish running its tick. + try self.unchecked.userToKernelBox.assertParkedRightNow() + return try box.takeValue().get() + } + + func runSALOnEventLoopAndWait( + file: StaticString = #filePath, + line: UInt = #line, + body: @escaping @Sendable ( + _ eventLoop: SelectableEventLoop, + _ kernelToUser: LockedBox, + _ userToKernal: LockedBox + ) throws -> EventLoopFuture, + syscallAssertions: (SyscallAssertions) throws -> Void + ) throws -> Result { + let result = try self.runSALOnEventLoop(body: body, syscallAssertions: syscallAssertions) + return try result.salWait(context: self) + } + + func runSALOnEventLoop( + file: StaticString = #filePath, + line: UInt = #line, + body: @escaping @Sendable ( + _ eventLoop: SelectableEventLoop, + _ kernelToUser: LockedBox, + _ userToKernal: LockedBox + ) throws -> Result + ) throws -> Result { + try self.runSALOnEventLoop(file: file, line: line, body: body) { _ in } + } +} + +@available(*, unavailable) +extension SALContext: Sendable {} + +func withSALContext(body: (SALContext) throws -> R) throws -> R { + let kernelToUserBox = LockedBox(description: "k2u") { newValue in + if let newValue = newValue { + SAL.printIfDebug("K --> U: \(newValue)") + } + } + + let userToKernelBox = LockedBox(description: "u2k") { newValue in + if let newValue = newValue { + SAL.printIfDebug("U --> K: \(newValue)") + } + } + + let wakeups = LockedBox(description: "wakeups") + let group = MultiThreadedEventLoopGroup(numberOfThreads: 1, metricsDelegate: nil) { + try HookedSelector( + userToKernel: userToKernelBox, + kernelToUser: kernelToUserBox, + wakeups: wakeups + ) + } + + let context = SALContext( + eventLoop: group.next() as! SelectableEventLoop, + wakeups: wakeups, + unchecked: SALContext.Unchecked( + kernelToUserBox: kernelToUserBox, + userToKernelBox: userToKernelBox + ) + ) + + defer { + SAL.printIfDebug("=== TEAR DOWN ===") + + let dispatchGroup = DispatchGroup() + dispatchGroup.enter() + XCTAssertNoThrow( + group.shutdownGracefully(queue: DispatchQueue.global()) { error in + XCTAssertNil(error, "unexpected error: \(error!)") + dispatchGroup.leave() + } + ) + // We're in a slightly tricky situation here. We don't know if the EventLoop thread enters `whenReady` again + // or not. If it has, we have to wake it up, so let's just put a return value in the 'kernel to user' box, just + // in case :) + XCTAssertNoThrow(try kernelToUserBox.waitForEmptyAndSet(.returnSelectorEvent(nil))) + dispatchGroup.wait() + } + + let result = try body(context) + return result +}