diff --git a/Tests/NIOPosixTests/StreamChannelsTest.swift b/Tests/NIOPosixTests/StreamChannelsTest.swift index 668dd1d331b..dfbe8a943f5 100644 --- a/Tests/NIOPosixTests/StreamChannelsTest.swift +++ b/Tests/NIOPosixTests/StreamChannelsTest.swift @@ -32,7 +32,7 @@ class StreamChannelTest: XCTestCase { } func testEchoBasic() throws { - class EchoHandler: ChannelInboundHandler { + final class EchoHandler: ChannelInboundHandler, Sendable { typealias InboundIn = ByteBuffer func channelRead(context: ChannelHandlerContext, data: NIOAny) { @@ -48,7 +48,12 @@ class StreamChannelTest: XCTestCase { var everythingBuffer = chan1.allocator.buffer(capacity: 300000) let allDonePromise = chan1.eventLoop.makePromise(of: ByteBuffer.self) XCTAssertNoThrow(try chan1.pipeline.addHandler(EchoHandler()).wait()) - XCTAssertNoThrow(try chan2.pipeline.addHandler(AccumulateAllReads(allDonePromise: allDonePromise)).wait()) + + let configuredChannel2 = chan2.eventLoop.submit { + let handler = AccumulateAllReads(allDonePromise: allDonePromise) + try chan2.pipeline.syncOperations.addHandler(handler) + } + XCTAssertNoThrow(try configuredChannel2.wait()) for f in [1, 10, 100, 1_000, 10_000, 300_000] { let from = everythingBuffer.writerIndex @@ -72,7 +77,7 @@ class StreamChannelTest: XCTestCase { } func testSyncChannelOptions() throws { - class GetAndSetAutoReadHandler: ChannelInboundHandler { + final class GetAndSetAutoReadHandler: ChannelInboundHandler, Sendable { typealias InboundIn = Any func handlerAdded(context: ChannelHandlerContext) { @@ -96,17 +101,17 @@ class StreamChannelTest: XCTestCase { } func testChannelReturnsNilForDefaultSyncOptionsImplementation() throws { - class TestChannel: Channel { + final class TestChannel: Channel { var allocator: ByteBufferAllocator { fatalError() } var closeFuture: EventLoopFuture { fatalError() } var pipeline: ChannelPipeline { fatalError() } - var localAddress: SocketAddress? = nil - var remoteAddress: SocketAddress? = nil - var parent: Channel? = nil + let localAddress: SocketAddress? = nil + let remoteAddress: SocketAddress? = nil + let parent: Channel? = nil var _channelCore: ChannelCore { fatalError() } var eventLoop: EventLoop { fatalError() } - var isWritable: Bool = false - var isActive: Bool = false + let isWritable: Bool = false + let isActive: Bool = false func setOption(_ option: Option, value: Option.Value) -> EventLoopFuture { fatalError() @@ -164,11 +169,11 @@ class StreamChannelTest: XCTestCase { XCTFail("unexpected error \(error)") } } - context.write(Self.wrapOutboundOut(buffer)).map { + context.write(Self.wrapOutboundOut(buffer)).assumeIsolated().map { XCTAssertEqual(self.state, .thenTrueAgain) }.recover { error in XCTFail("unexpected error \(error)") - }.cascade(to: self.writeFullyDonePromise) + }.nonisolated().cascade(to: self.writeFullyDonePromise) context.flush() } @@ -178,7 +183,7 @@ class StreamChannelTest: XCTestCase { XCTAssertEqual(State(rawValue: self.channelWritabilityChangedCalls), self.state) if let newState = State(rawValue: self.channelWritabilityChangedCalls + 1) { if self.state == .thenFalse { - context.eventLoop.scheduleTask(in: .microseconds(100)) { + context.eventLoop.assumeIsolated().scheduleTask(in: .microseconds(100)) { // Let's delay this a tiny little bit just so we get a higher chance to actually exhaust all // the buffers. The delay is not necessary for this test to work but it makes the tests a // little bit harder. @@ -195,15 +200,21 @@ class StreamChannelTest: XCTestCase { let writabilityFalsePromise = chan1.eventLoop.makePromise(of: Void.self) let writeFullyDonePromise = chan1.eventLoop.makePromise(of: Void.self) XCTAssertNoThrow(try chan2.setOption(.autoRead, value: false).wait()) - XCTAssertNoThrow(try chan2.pipeline.addHandler(AccumulateAllReads(allDonePromise: allDonePromise)).wait()) - XCTAssertNoThrow( - try chan1.pipeline.addHandler( + + let chan2Configured = chan2.eventLoop.submit { + try chan2.pipeline.syncOperations.addHandler(AccumulateAllReads(allDonePromise: allDonePromise)) + } + XCTAssertNoThrow(try chan2Configured.wait()) + + let chan1Configured = chan1.eventLoop.submit { + try chan1.pipeline.syncOperations.addHandler( WritabilityTrackerStateMachine( writabilityNowFalsePromise: writabilityFalsePromise, writeFullyDonePromise: writeFullyDonePromise ) - ).wait() - ) + ) + } + XCTAssertNoThrow(try chan1Configured.wait()) // Writability should turn false because we're writing lots of data and we aren't reading. XCTAssertNoThrow(try writabilityFalsePromise.futureResult.wait()) @@ -329,7 +340,7 @@ class StreamChannelTest: XCTestCase { func testLotsOfWritesWhilstOtherSideNotReading() { // This is a regression test for a problem where we would spin on EVFILT_EXCEPT despite the fact that there // was no EOF or any other exceptional event present. So this is a regression test for rdar://53656794 and https://github.com/apple/swift-nio/pull/526. - class FailOnReadHandler: ChannelInboundHandler { + final class FailOnReadHandler: ChannelInboundHandler, Sendable { typealias InboundIn = ByteBuffer let areReadsOkayNow: ManagedAtomic @@ -369,11 +380,12 @@ class StreamChannelTest: XCTestCase { let receiveBufferSize = Int( (try? receiver.getOption(ChannelOptions.socketOption(.so_rcvbuf)).wait()) ?? 8192 ) - var buffer = sender.allocator.buffer(capacity: receiveBufferSize) - buffer.writeBytes(Array(repeating: UInt8(ascii: "X"), count: receiveBufferSize)) + + let buffer = sender.allocator.buffer(repeating: UInt8(ascii: "X"), count: receiveBufferSize) XCTAssertNoThrow( try sender.eventLoop.submit { + @Sendable func send() { var allBuffer = buffer // When we run through this for the first time, we send exactly the receive buffer size, after that @@ -444,11 +456,13 @@ class StreamChannelTest: XCTestCase { func runTest(receiver: Channel, sender: Channel) throws { let allDonePromise = receiver.eventLoop.makePromise(of: Void.self) XCTAssertNoThrow(try sender.setOption(.writeSpin, value: 0).wait()) - XCTAssertNoThrow( - try receiver.pipeline.addHandler(WaitForTwoBytesHandler(allDonePromise: allDonePromise)).wait() - ) - var buffer = sender.allocator.buffer(capacity: 1) - buffer.writeString("X") + let receiverConfigured = receiver.eventLoop.submit { + try receiver.pipeline.syncOperations.addHandler( + WaitForTwoBytesHandler(allDonePromise: allDonePromise) + ) + } + XCTAssertNoThrow(try receiverConfigured.wait()) + let buffer = sender.allocator.buffer(string: "X") XCTAssertNoThrow( try sender.eventLoop.flatSubmit { () -> EventLoopFuture in let writePromise = sender.eventLoop.makePromise(of: Void.self) @@ -485,8 +499,7 @@ class StreamChannelTest: XCTestCase { // Now, let's trigger another write which should cause flushNow to be re-entered. But first, let's // raise the high water mark so we don't get another call straight away. - var buffer = context.channel.allocator.buffer(capacity: 5) - buffer.writeString("hello") + let buffer = context.channel.allocator.buffer(string: "hello") let loopBoundContext = context.loopBound context.channel.setOption(.writeBufferWaterMark, value: .init(low: 1024, high: 1024)) .flatMap { @@ -535,18 +548,22 @@ class StreamChannelTest: XCTestCase { ) let sevenBytesReceived = receiver.eventLoop.makePromise(of: Void.self) - XCTAssertNoThrow( - try receiver.pipeline.addHandler( - WaitForNumberOfBytes( - numberOfBytes: 7, - allDonePromise: sevenBytesReceived - ) - ).wait() - ) + let receiverConfigured = receiver.eventLoop.submit { + let handler = WaitForNumberOfBytes( + numberOfBytes: 7, + allDonePromise: sevenBytesReceived + ) + try receiver.pipeline.syncOperations.addHandler(handler) + } + XCTAssertNoThrow(try receiverConfigured.wait()) let eventCounterHandler = EventCounterHandler() - XCTAssertNoThrow(try sender.pipeline.addHandler(EventCounterHandler()).wait()) - XCTAssertNoThrow(try sender.pipeline.addHandler(WriteWhenWritabilityGoesToTrue()).wait()) + let senderConfigured = sender.eventLoop.submit { + let sync = sender.pipeline.syncOperations + try sync.addHandler(eventCounterHandler) + try sync.addHandler(WriteWhenWritabilityGoesToTrue()) + } + XCTAssertNoThrow(try senderConfigured.wait()) var buffer = sender.allocator.buffer(capacity: 5) buffer.writeString("XX") // 2 bytes, exceeds the high water mark @@ -642,7 +659,7 @@ class StreamChannelTest: XCTestCase { } let loopBoundContext = context.loopBound - context.eventLoop.execute { + context.eventLoop.assumeIsolated().execute { let context = loopBoundContext.value self.kickOff(context: context) } @@ -667,7 +684,7 @@ class StreamChannelTest: XCTestCase { context.writeAndFlush(Self.wrapOutboundOut(buffer.value)).whenFailure { error in XCTFail("unexpected error \(error)") } - context.eventLoop.scheduleTask(in: .microseconds(100)) { + context.eventLoop.assumeIsolated().scheduleTask(in: .microseconds(100)) { let context = loopBoundContext.value switch self.state { case .writingUntilFull: @@ -689,7 +706,7 @@ class StreamChannelTest: XCTestCase { } } } - context.eventLoop.execute { + context.eventLoop.assumeIsolated().execute { writeOneMore() // this kicks everything off } } @@ -706,9 +723,9 @@ class StreamChannelTest: XCTestCase { } context.fireChannelWritabilityChanged() let loopBoundContext = context.loopBound - self.wroteEnoughToBeStuckPromise.futureResult.whenSuccess { _ in + self.wroteEnoughToBeStuckPromise.futureResult.assumeIsolated().whenSuccess { _ in let context = loopBoundContext.value - context.pipeline.removeHandler(self).whenFailure { error in + context.pipeline.syncOperations.removeHandler(self).whenFailure { error in XCTFail("unexpected error \(error)") } } @@ -816,7 +833,7 @@ class StreamChannelTest: XCTestCase { } } - final class FailOnError: ChannelInboundHandler { + final class FailOnError: ChannelInboundHandler, Sendable { typealias InboundIn = Never func errorCaught(context: ChannelHandlerContext, error: Error) { @@ -840,29 +857,34 @@ class StreamChannelTest: XCTestCase { // We need to not read automatically from the receiving end to be able to force writability notifications // for the sender. XCTAssertNoThrow(try receiver.setOption(.autoRead, value: false).wait()) + let receiverConfigured = receiver.eventLoop.submit { + try receiver.pipeline.syncOperations.addHandler(ReadChunksUntilWeSee1Handler()) + } + XCTAssertNoThrow(try receiverConfigured.wait()) - XCTAssertNoThrow(try receiver.pipeline.addHandler(ReadChunksUntilWeSee1Handler()).wait()) - - XCTAssertNoThrow( - try sender.pipeline.addHandler( + let senderConfigured1 = sender.eventLoop.submit { + try sender.pipeline.syncOperations.addHandler( WriteWhenChannelBecomesWritableAgain( beganBigWritePromise: beganBigWritePromise, finishedBigWritePromise: finishedBigWritePromise ) - ).wait() - ) + ) + } + XCTAssertNoThrow(try senderConfigured1.wait()) + XCTAssertNoThrow(try sender.pipeline.addHandler(FailOnError()).wait()) XCTAssertNoThrow(try receiver.pipeline.addHandler(FailOnError()).wait()) - XCTAssertNoThrow( - try sender.pipeline.addHandler( + let senderConfigured2 = sender.eventLoop.submit { + try sender.pipeline.syncOperations.addHandler( WriteUntilWriteDoesNotCompletelyInstantlyHandler( chunkSize: chunkSize, wroteEnoughToBeStuckPromise: wroteEnoughToBeStuckPromise ), position: .first - ).wait() - ) + ) + } + XCTAssertNoThrow(try senderConfigured2.wait()) var howManyBytes: Int? = nil XCTAssertNoThrow(howManyBytes = try wroteEnoughToBeStuckPromise.futureResult.wait()) @@ -934,7 +956,10 @@ class StreamChannelTest: XCTestCase { } let amount = 2 - XCTAssertNoThrow(try sender.pipeline.addHandler(CloseInWritabilityChanged(amount: amount)).wait()) + let senderConfigured = sender.eventLoop.submit { + try sender.pipeline.syncOperations.addHandler(CloseInWritabilityChanged(amount: amount)) + } + XCTAssertNoThrow(try senderConfigured.wait()) XCTAssertNoThrow( try sender.setOption( .writeBufferWaterMark,