From 0ce5195d848b589a462ef140b54ccd5d8fbf73b7 Mon Sep 17 00:00:00 2001 From: George Barnett Date: Thu, 3 Apr 2025 18:09:50 +0100 Subject: [PATCH 1/3] Strict concurrency for NIOPosix AcceptBackoffHandlerTests --- .../AcceptBackoffHandlerTest.swift | 94 +++++++++++-------- 1 file changed, 53 insertions(+), 41 deletions(-) diff --git a/Tests/NIOPosixTests/AcceptBackoffHandlerTest.swift b/Tests/NIOPosixTests/AcceptBackoffHandlerTest.swift index fa66c5300f1..d2e77f999c3 100644 --- a/Tests/NIOPosixTests/AcceptBackoffHandlerTest.swift +++ b/Tests/NIOPosixTests/AcceptBackoffHandlerTest.swift @@ -20,7 +20,7 @@ import XCTest @testable import NIOPosix -public final class AcceptBackoffHandlerTest: XCTestCase { +final class AcceptBackoffHandlerTest: XCTestCase { private let acceptHandlerName = "AcceptBackoffHandler" @@ -46,13 +46,15 @@ public final class AcceptBackoffHandlerTest: XCTestCase { private func assertBackoffRead(error: Int32) throws { let group = MultiThreadedEventLoopGroup(numberOfThreads: 1) + let loop = group.next() as! SelectableEventLoop defer { XCTAssertNoThrow(try group.syncShutdownGracefully()) } - let readCountHandler = ReadCountHandler() + // Only used from EL + let readCountHandler = NIOLoopBound(ReadCountHandler(), eventLoop: loop) let serverChannel = try setupChannel( - group: group, + eventLoop: loop, readCountHandler: readCountHandler, backoffProvider: { _ in .milliseconds(100) }, errors: [error] @@ -62,7 +64,7 @@ public final class AcceptBackoffHandlerTest: XCTestCase { try serverChannel.eventLoop.submit { serverChannel.readable() serverChannel.read() - return readCountHandler.readCount + return readCountHandler.value.readCount }.wait() ) @@ -70,7 +72,7 @@ public final class AcceptBackoffHandlerTest: XCTestCase { XCTAssertEqual( 1, try serverChannel.eventLoop.scheduleTask(in: .milliseconds(100)) { - readCountHandler.readCount + readCountHandler.value.readCount }.futureResult.wait() ) @@ -79,7 +81,7 @@ public final class AcceptBackoffHandlerTest: XCTestCase { 2, try serverChannel.eventLoop.submit { serverChannel.read() - return readCountHandler.readCount + return readCountHandler.value.readCount }.wait() ) @@ -96,13 +98,14 @@ public final class AcceptBackoffHandlerTest: XCTestCase { private func assertRemoval(read: Bool) throws { let group = MultiThreadedEventLoopGroup(numberOfThreads: 1) + let loop = group.next() as! SelectableEventLoop defer { XCTAssertNoThrow(try group.syncShutdownGracefully()) } - let readCountHandler = ReadCountHandler() + let readCountHandler = NIOLoopBound(ReadCountHandler(), eventLoop: loop) let serverChannel = try setupChannel( - group: group, + eventLoop: loop, readCountHandler: readCountHandler, backoffProvider: { err in .hours(1) @@ -116,7 +119,7 @@ public final class AcceptBackoffHandlerTest: XCTestCase { if read { serverChannel.read() } - return readCountHandler.readCount + return readCountHandler.value.readCount }.wait() ) @@ -127,7 +130,7 @@ public final class AcceptBackoffHandlerTest: XCTestCase { XCTAssertEqual( 1, try serverChannel.eventLoop.submit { - readCountHandler.readCount + readCountHandler.value.readCount }.wait() ) } else { @@ -135,7 +138,7 @@ public final class AcceptBackoffHandlerTest: XCTestCase { XCTAssertEqual( 0, try serverChannel.eventLoop.submit { - readCountHandler.readCount + readCountHandler.value.readCount }.wait() ) } @@ -144,13 +147,14 @@ public final class AcceptBackoffHandlerTest: XCTestCase { public func testNotScheduleReadIfAlreadyScheduled() throws { let group = MultiThreadedEventLoopGroup(numberOfThreads: 1) + let loop = group.next() as! SelectableEventLoop defer { XCTAssertNoThrow(try group.syncShutdownGracefully()) } - let readCountHandler = ReadCountHandler() + let readCountHandler = NIOLoopBound(ReadCountHandler(), eventLoop: loop) let serverChannel = try setupChannel( - group: group, + eventLoop: loop, readCountHandler: readCountHandler, backoffProvider: { err in .milliseconds(10) @@ -163,7 +167,7 @@ public final class AcceptBackoffHandlerTest: XCTestCase { serverChannel.readable() serverChannel.read() serverChannel.read() - return readCountHandler.readCount + return readCountHandler.value.readCount }.wait() ) @@ -172,7 +176,7 @@ public final class AcceptBackoffHandlerTest: XCTestCase { XCTAssertEqual( 1, try serverChannel.eventLoop.scheduleTask(in: .milliseconds(500)) { - readCountHandler.readCount + readCountHandler.value.readCount }.futureResult.wait() ) @@ -181,7 +185,7 @@ public final class AcceptBackoffHandlerTest: XCTestCase { 2, try serverChannel.eventLoop.submit { serverChannel.read() - return readCountHandler.readCount + return readCountHandler.value.readCount }.wait() ) @@ -190,6 +194,7 @@ public final class AcceptBackoffHandlerTest: XCTestCase { public func testChannelInactiveCancelScheduled() throws { let group = MultiThreadedEventLoopGroup(numberOfThreads: 1) + let loop = group.next() as! SelectableEventLoop defer { XCTAssertNoThrow(try group.syncShutdownGracefully()) } @@ -212,9 +217,9 @@ public final class AcceptBackoffHandlerTest: XCTestCase { } } - let readCountHandler = ReadCountHandler() + let readCountHandler = NIOLoopBound(ReadCountHandler(), eventLoop: loop) let serverChannel = try setupChannel( - group: group, + eventLoop: loop, readCountHandler: readCountHandler, backoffProvider: { err in .milliseconds(10) @@ -222,8 +227,12 @@ public final class AcceptBackoffHandlerTest: XCTestCase { errors: [ENFILE] ) - let inactiveVerificationHandler = InactiveVerificationHandler(promise: serverChannel.eventLoop.makePromise()) - XCTAssertNoThrow(try serverChannel.pipeline.addHandler(inactiveVerificationHandler).wait()) + let inactivePromise = serverChannel.eventLoop.makePromise(of: Void.self) + let configured = loop.submit { + let inactiveVerificationHandler = InactiveVerificationHandler(promise: inactivePromise) + try serverChannel.pipeline.syncOperations.addHandler(inactiveVerificationHandler) + } + XCTAssertNoThrow(try configured.wait()) XCTAssertEqual( 0, @@ -232,7 +241,7 @@ public final class AcceptBackoffHandlerTest: XCTestCase { serverChannel.read() // Close the channel, this should also take care of cancel the scheduled read. serverChannel.close(promise: nil) - return readCountHandler.readCount + return readCountHandler.value.readCount }.wait() ) @@ -240,24 +249,25 @@ public final class AcceptBackoffHandlerTest: XCTestCase { XCTAssertEqual( 0, try serverChannel.eventLoop.scheduleTask(in: .milliseconds(500)) { - readCountHandler.readCount + readCountHandler.value.readCount }.futureResult.wait() ) - XCTAssertNoThrow(try inactiveVerificationHandler.waitForInactive()) + XCTAssertNoThrow(try inactivePromise.futureResult.wait()) } public func testSecondErrorUpdateScheduledRead() throws { let group = MultiThreadedEventLoopGroup(numberOfThreads: 1) + let loop = group.next() as! SelectableEventLoop defer { XCTAssertNoThrow(try group.syncShutdownGracefully()) } - let readCountHandler = ReadCountHandler() + let readCountHandler = NIOLoopBound(ReadCountHandler(), eventLoop: loop) let backoffProviderCalled = ManagedAtomic(0) let serverChannel = try setupChannel( - group: group, + eventLoop: loop, readCountHandler: readCountHandler, backoffProvider: { err in if backoffProviderCalled.loadThenWrappingIncrement(ordering: .relaxed) == 0 { @@ -273,7 +283,7 @@ public final class AcceptBackoffHandlerTest: XCTestCase { try serverChannel.eventLoop.submit { serverChannel.readable() serverChannel.read() - let readCount = readCountHandler.readCount + let readCount = readCountHandler.value.readCount // Directly trigger a read again without going through the pipeline. This will allow us to use serverChannel.readable() serverChannel._channelCore.read0() serverChannel.readable() @@ -285,7 +295,7 @@ public final class AcceptBackoffHandlerTest: XCTestCase { XCTAssertEqual( 0, try serverChannel.eventLoop.scheduleTask(in: .seconds(1)) { - readCountHandler.readCount + readCountHandler.value.readCount }.futureResult.wait() ) @@ -293,7 +303,7 @@ public final class AcceptBackoffHandlerTest: XCTestCase { XCTAssertEqual( 1, try serverChannel.eventLoop.scheduleTask(in: .seconds(1)) { - readCountHandler.readCount + readCountHandler.value.readCount }.futureResult.wait() ) @@ -327,30 +337,32 @@ public final class AcceptBackoffHandlerTest: XCTestCase { } private func setupChannel( - group: EventLoopGroup, - readCountHandler: ReadCountHandler, - backoffProvider: @escaping (IOError) -> TimeAmount? = AcceptBackoffHandler.defaultBackoffProvider, + eventLoop: SelectableEventLoop, + readCountHandler: NIOLoopBound, + backoffProvider: @escaping @Sendable (IOError) -> TimeAmount? = AcceptBackoffHandler.defaultBackoffProvider, errors: [Int32] ) throws -> ServerSocketChannel { - let eventLoop = group.next() as! SelectableEventLoop let socket = try NonAcceptingServerSocket(errors: errors) let serverChannel = try assertNoThrowWithValue( ServerSocketChannel( serverSocket: socket, eventLoop: eventLoop, - group: group + group: eventLoop ) ) XCTAssertNoThrow(try serverChannel.setOption(.autoRead, value: false).wait()) - XCTAssertNoThrow( - try serverChannel.pipeline.addHandler(readCountHandler).flatMapThrowing { _ in - try serverChannel.pipeline.syncOperations.addHandler( - AcceptBackoffHandler(backoffProvider: backoffProvider), - name: self.acceptHandlerName - ) - }.wait() - ) + + let configured = eventLoop.submit { [acceptHandlerName = self.acceptHandlerName] in + let sync = serverChannel.pipeline.syncOperations + try sync.addHandler(readCountHandler.value) + try sync.addHandler( + AcceptBackoffHandler(backoffProvider: backoffProvider), + name: acceptHandlerName + ) + } + + XCTAssertNoThrow(try configured.wait()) XCTAssertNoThrow( try eventLoop.flatSubmit { From e0b2858ed34cf7f30f389d9aadafdce07d1916ed Mon Sep 17 00:00:00 2001 From: George Barnett Date: Tue, 8 Apr 2025 09:32:39 +0100 Subject: [PATCH 2/3] fixup --- Tests/NIOPosixTests/AcceptBackoffHandlerTest.swift | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/Tests/NIOPosixTests/AcceptBackoffHandlerTest.swift b/Tests/NIOPosixTests/AcceptBackoffHandlerTest.swift index d2e77f999c3..5c28f9cd8b6 100644 --- a/Tests/NIOPosixTests/AcceptBackoffHandlerTest.swift +++ b/Tests/NIOPosixTests/AcceptBackoffHandlerTest.swift @@ -21,6 +21,10 @@ import XCTest @testable import NIOPosix final class AcceptBackoffHandlerTest: XCTestCase { + @Sendable + static func defaultBackoffProvider(error: IOError) -> TimeAmount? { + AcceptBackoffHandler.defaultBackoffProvider(error: error) + } private let acceptHandlerName = "AcceptBackoffHandler" @@ -339,7 +343,7 @@ final class AcceptBackoffHandlerTest: XCTestCase { private func setupChannel( eventLoop: SelectableEventLoop, readCountHandler: NIOLoopBound, - backoffProvider: @escaping @Sendable (IOError) -> TimeAmount? = AcceptBackoffHandler.defaultBackoffProvider, + backoffProvider: @escaping @Sendable (IOError) -> TimeAmount? = AcceptBackoffHandlerTest.defaultBackoffProvider, errors: [Int32] ) throws -> ServerSocketChannel { let socket = try NonAcceptingServerSocket(errors: errors) From 8ff3d9283ed2f11654562e26b9d7c13898ec0592 Mon Sep 17 00:00:00 2001 From: George Barnett Date: Tue, 8 Apr 2025 09:39:22 +0100 Subject: [PATCH 3/3] create box on loop --- .../AcceptBackoffHandlerTest.swift | 21 ++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/Tests/NIOPosixTests/AcceptBackoffHandlerTest.swift b/Tests/NIOPosixTests/AcceptBackoffHandlerTest.swift index 5c28f9cd8b6..f249d0d15ed 100644 --- a/Tests/NIOPosixTests/AcceptBackoffHandlerTest.swift +++ b/Tests/NIOPosixTests/AcceptBackoffHandlerTest.swift @@ -56,7 +56,9 @@ final class AcceptBackoffHandlerTest: XCTestCase { } // Only used from EL - let readCountHandler = NIOLoopBound(ReadCountHandler(), eventLoop: loop) + let readCountHandler = try! loop.submit { + NIOLoopBound(ReadCountHandler(), eventLoop: loop) + }.wait() let serverChannel = try setupChannel( eventLoop: loop, readCountHandler: readCountHandler, @@ -107,7 +109,9 @@ final class AcceptBackoffHandlerTest: XCTestCase { XCTAssertNoThrow(try group.syncShutdownGracefully()) } - let readCountHandler = NIOLoopBound(ReadCountHandler(), eventLoop: loop) + let readCountHandler = try! loop.submit { + NIOLoopBound(ReadCountHandler(), eventLoop: loop) + }.wait() let serverChannel = try setupChannel( eventLoop: loop, readCountHandler: readCountHandler, @@ -156,7 +160,9 @@ final class AcceptBackoffHandlerTest: XCTestCase { XCTAssertNoThrow(try group.syncShutdownGracefully()) } - let readCountHandler = NIOLoopBound(ReadCountHandler(), eventLoop: loop) + let readCountHandler = try! loop.submit { + NIOLoopBound(ReadCountHandler(), eventLoop: loop) + }.wait() let serverChannel = try setupChannel( eventLoop: loop, readCountHandler: readCountHandler, @@ -221,7 +227,10 @@ final class AcceptBackoffHandlerTest: XCTestCase { } } - let readCountHandler = NIOLoopBound(ReadCountHandler(), eventLoop: loop) + let readCountHandler = try! loop.submit { + NIOLoopBound(ReadCountHandler(), eventLoop: loop) + }.wait() + let serverChannel = try setupChannel( eventLoop: loop, readCountHandler: readCountHandler, @@ -267,7 +276,9 @@ final class AcceptBackoffHandlerTest: XCTestCase { XCTAssertNoThrow(try group.syncShutdownGracefully()) } - let readCountHandler = NIOLoopBound(ReadCountHandler(), eventLoop: loop) + let readCountHandler = try! loop.submit { + NIOLoopBound(ReadCountHandler(), eventLoop: loop) + }.wait() let backoffProviderCalled = ManagedAtomic(0) let serverChannel = try setupChannel(