diff --git a/Tests/NIOPosixTests/AcceptBackoffHandlerTest.swift b/Tests/NIOPosixTests/AcceptBackoffHandlerTest.swift index fa66c5300f1..f249d0d15ed 100644 --- a/Tests/NIOPosixTests/AcceptBackoffHandlerTest.swift +++ b/Tests/NIOPosixTests/AcceptBackoffHandlerTest.swift @@ -20,7 +20,11 @@ import XCTest @testable import NIOPosix -public final class AcceptBackoffHandlerTest: XCTestCase { +final class AcceptBackoffHandlerTest: XCTestCase { + @Sendable + static func defaultBackoffProvider(error: IOError) -> TimeAmount? { + AcceptBackoffHandler.defaultBackoffProvider(error: error) + } private let acceptHandlerName = "AcceptBackoffHandler" @@ -46,13 +50,17 @@ public final class AcceptBackoffHandlerTest: XCTestCase { private func assertBackoffRead(error: Int32) throws { let group = MultiThreadedEventLoopGroup(numberOfThreads: 1) + let loop = group.next() as! SelectableEventLoop defer { XCTAssertNoThrow(try group.syncShutdownGracefully()) } - let readCountHandler = ReadCountHandler() + // Only used from EL + let readCountHandler = try! loop.submit { + NIOLoopBound(ReadCountHandler(), eventLoop: loop) + }.wait() let serverChannel = try setupChannel( - group: group, + eventLoop: loop, readCountHandler: readCountHandler, backoffProvider: { _ in .milliseconds(100) }, errors: [error] @@ -62,7 +70,7 @@ public final class AcceptBackoffHandlerTest: XCTestCase { try serverChannel.eventLoop.submit { serverChannel.readable() serverChannel.read() - return readCountHandler.readCount + return readCountHandler.value.readCount }.wait() ) @@ -70,7 +78,7 @@ public final class AcceptBackoffHandlerTest: XCTestCase { XCTAssertEqual( 1, try serverChannel.eventLoop.scheduleTask(in: .milliseconds(100)) { - readCountHandler.readCount + readCountHandler.value.readCount }.futureResult.wait() ) @@ -79,7 +87,7 @@ public final class AcceptBackoffHandlerTest: XCTestCase { 2, try serverChannel.eventLoop.submit { serverChannel.read() - return readCountHandler.readCount + return readCountHandler.value.readCount }.wait() ) @@ -96,13 +104,16 @@ public final class AcceptBackoffHandlerTest: XCTestCase { private func assertRemoval(read: Bool) throws { let group = MultiThreadedEventLoopGroup(numberOfThreads: 1) + let loop = group.next() as! SelectableEventLoop defer { XCTAssertNoThrow(try group.syncShutdownGracefully()) } - let readCountHandler = ReadCountHandler() + let readCountHandler = try! loop.submit { + NIOLoopBound(ReadCountHandler(), eventLoop: loop) + }.wait() let serverChannel = try setupChannel( - group: group, + eventLoop: loop, readCountHandler: readCountHandler, backoffProvider: { err in .hours(1) @@ -116,7 +127,7 @@ public final class AcceptBackoffHandlerTest: XCTestCase { if read { serverChannel.read() } - return readCountHandler.readCount + return readCountHandler.value.readCount }.wait() ) @@ -127,7 +138,7 @@ public final class AcceptBackoffHandlerTest: XCTestCase { XCTAssertEqual( 1, try serverChannel.eventLoop.submit { - readCountHandler.readCount + readCountHandler.value.readCount }.wait() ) } else { @@ -135,7 +146,7 @@ public final class AcceptBackoffHandlerTest: XCTestCase { XCTAssertEqual( 0, try serverChannel.eventLoop.submit { - readCountHandler.readCount + readCountHandler.value.readCount }.wait() ) } @@ -144,13 +155,16 @@ public final class AcceptBackoffHandlerTest: XCTestCase { public func testNotScheduleReadIfAlreadyScheduled() throws { let group = MultiThreadedEventLoopGroup(numberOfThreads: 1) + let loop = group.next() as! SelectableEventLoop defer { XCTAssertNoThrow(try group.syncShutdownGracefully()) } - let readCountHandler = ReadCountHandler() + let readCountHandler = try! loop.submit { + NIOLoopBound(ReadCountHandler(), eventLoop: loop) + }.wait() let serverChannel = try setupChannel( - group: group, + eventLoop: loop, readCountHandler: readCountHandler, backoffProvider: { err in .milliseconds(10) @@ -163,7 +177,7 @@ public final class AcceptBackoffHandlerTest: XCTestCase { serverChannel.readable() serverChannel.read() serverChannel.read() - return readCountHandler.readCount + return readCountHandler.value.readCount }.wait() ) @@ -172,7 +186,7 @@ public final class AcceptBackoffHandlerTest: XCTestCase { XCTAssertEqual( 1, try serverChannel.eventLoop.scheduleTask(in: .milliseconds(500)) { - readCountHandler.readCount + readCountHandler.value.readCount }.futureResult.wait() ) @@ -181,7 +195,7 @@ public final class AcceptBackoffHandlerTest: XCTestCase { 2, try serverChannel.eventLoop.submit { serverChannel.read() - return readCountHandler.readCount + return readCountHandler.value.readCount }.wait() ) @@ -190,6 +204,7 @@ public final class AcceptBackoffHandlerTest: XCTestCase { public func testChannelInactiveCancelScheduled() throws { let group = MultiThreadedEventLoopGroup(numberOfThreads: 1) + let loop = group.next() as! SelectableEventLoop defer { XCTAssertNoThrow(try group.syncShutdownGracefully()) } @@ -212,9 +227,12 @@ public final class AcceptBackoffHandlerTest: XCTestCase { } } - let readCountHandler = ReadCountHandler() + let readCountHandler = try! loop.submit { + NIOLoopBound(ReadCountHandler(), eventLoop: loop) + }.wait() + let serverChannel = try setupChannel( - group: group, + eventLoop: loop, readCountHandler: readCountHandler, backoffProvider: { err in .milliseconds(10) @@ -222,8 +240,12 @@ public final class AcceptBackoffHandlerTest: XCTestCase { errors: [ENFILE] ) - let inactiveVerificationHandler = InactiveVerificationHandler(promise: serverChannel.eventLoop.makePromise()) - XCTAssertNoThrow(try serverChannel.pipeline.addHandler(inactiveVerificationHandler).wait()) + let inactivePromise = serverChannel.eventLoop.makePromise(of: Void.self) + let configured = loop.submit { + let inactiveVerificationHandler = InactiveVerificationHandler(promise: inactivePromise) + try serverChannel.pipeline.syncOperations.addHandler(inactiveVerificationHandler) + } + XCTAssertNoThrow(try configured.wait()) XCTAssertEqual( 0, @@ -232,7 +254,7 @@ public final class AcceptBackoffHandlerTest: XCTestCase { serverChannel.read() // Close the channel, this should also take care of cancel the scheduled read. serverChannel.close(promise: nil) - return readCountHandler.readCount + return readCountHandler.value.readCount }.wait() ) @@ -240,24 +262,27 @@ public final class AcceptBackoffHandlerTest: XCTestCase { XCTAssertEqual( 0, try serverChannel.eventLoop.scheduleTask(in: .milliseconds(500)) { - readCountHandler.readCount + readCountHandler.value.readCount }.futureResult.wait() ) - XCTAssertNoThrow(try inactiveVerificationHandler.waitForInactive()) + XCTAssertNoThrow(try inactivePromise.futureResult.wait()) } public func testSecondErrorUpdateScheduledRead() throws { let group = MultiThreadedEventLoopGroup(numberOfThreads: 1) + let loop = group.next() as! SelectableEventLoop defer { XCTAssertNoThrow(try group.syncShutdownGracefully()) } - let readCountHandler = ReadCountHandler() + let readCountHandler = try! loop.submit { + NIOLoopBound(ReadCountHandler(), eventLoop: loop) + }.wait() let backoffProviderCalled = ManagedAtomic(0) let serverChannel = try setupChannel( - group: group, + eventLoop: loop, readCountHandler: readCountHandler, backoffProvider: { err in if backoffProviderCalled.loadThenWrappingIncrement(ordering: .relaxed) == 0 { @@ -273,7 +298,7 @@ public final class AcceptBackoffHandlerTest: XCTestCase { try serverChannel.eventLoop.submit { serverChannel.readable() serverChannel.read() - let readCount = readCountHandler.readCount + let readCount = readCountHandler.value.readCount // Directly trigger a read again without going through the pipeline. This will allow us to use serverChannel.readable() serverChannel._channelCore.read0() serverChannel.readable() @@ -285,7 +310,7 @@ public final class AcceptBackoffHandlerTest: XCTestCase { XCTAssertEqual( 0, try serverChannel.eventLoop.scheduleTask(in: .seconds(1)) { - readCountHandler.readCount + readCountHandler.value.readCount }.futureResult.wait() ) @@ -293,7 +318,7 @@ public final class AcceptBackoffHandlerTest: XCTestCase { XCTAssertEqual( 1, try serverChannel.eventLoop.scheduleTask(in: .seconds(1)) { - readCountHandler.readCount + readCountHandler.value.readCount }.futureResult.wait() ) @@ -327,30 +352,32 @@ public final class AcceptBackoffHandlerTest: XCTestCase { } private func setupChannel( - group: EventLoopGroup, - readCountHandler: ReadCountHandler, - backoffProvider: @escaping (IOError) -> TimeAmount? = AcceptBackoffHandler.defaultBackoffProvider, + eventLoop: SelectableEventLoop, + readCountHandler: NIOLoopBound, + backoffProvider: @escaping @Sendable (IOError) -> TimeAmount? = AcceptBackoffHandlerTest.defaultBackoffProvider, errors: [Int32] ) throws -> ServerSocketChannel { - let eventLoop = group.next() as! SelectableEventLoop let socket = try NonAcceptingServerSocket(errors: errors) let serverChannel = try assertNoThrowWithValue( ServerSocketChannel( serverSocket: socket, eventLoop: eventLoop, - group: group + group: eventLoop ) ) XCTAssertNoThrow(try serverChannel.setOption(.autoRead, value: false).wait()) - XCTAssertNoThrow( - try serverChannel.pipeline.addHandler(readCountHandler).flatMapThrowing { _ in - try serverChannel.pipeline.syncOperations.addHandler( - AcceptBackoffHandler(backoffProvider: backoffProvider), - name: self.acceptHandlerName - ) - }.wait() - ) + + let configured = eventLoop.submit { [acceptHandlerName = self.acceptHandlerName] in + let sync = serverChannel.pipeline.syncOperations + try sync.addHandler(readCountHandler.value) + try sync.addHandler( + AcceptBackoffHandler(backoffProvider: backoffProvider), + name: acceptHandlerName + ) + } + + XCTAssertNoThrow(try configured.wait()) XCTAssertNoThrow( try eventLoop.flatSubmit {