diff --git a/Package.swift b/Package.swift index b4ede3caeaf..de91473761d 100644 --- a/Package.swift +++ b/Package.swift @@ -519,7 +519,8 @@ let package = Package( "NIOCore", "NIOEmbedded", "NIOWebSocket", - ] + ], + swiftSettings: strictConcurrencySettings ), .testTarget( name: "NIOTestUtilsTests", diff --git a/Sources/NIOWebSocket/NIOWebSocketClientUpgrader.swift b/Sources/NIOWebSocket/NIOWebSocketClientUpgrader.swift index 09f3c1a1232..d3a5c4b2869 100644 --- a/Sources/NIOWebSocket/NIOWebSocketClientUpgrader.swift +++ b/Sources/NIOWebSocket/NIOWebSocketClientUpgrader.swift @@ -24,7 +24,7 @@ public typealias NIOWebClientSocketUpgrader = NIOWebSocketClientUpgrader /// This upgrader assumes that the `HTTPClientUpgradeHandler` will create and send the upgrade request. /// This upgrader also assumes that the `HTTPClientUpgradeHandler` will appropriately mutate the /// pipeline to remove the HTTP `ChannelHandler`s. -public final class NIOWebSocketClientUpgrader: NIOHTTPClientProtocolUpgrader { +public final class NIOWebSocketClientUpgrader: NIOHTTPClientProtocolUpgrader, Sendable { /// RFC 6455 specs this as the required entry in the Upgrade header. public let supportedProtocol: String = "websocket" /// None of the websocket headers are actually defined as 'required'. diff --git a/Tests/NIOWebSocketTests/WebSocketClientEndToEndTests.swift b/Tests/NIOWebSocketTests/WebSocketClientEndToEndTests.swift index 1122ad61d60..f484a77f10d 100644 --- a/Tests/NIOWebSocketTests/WebSocketClientEndToEndTests.swift +++ b/Tests/NIOWebSocketTests/WebSocketClientEndToEndTests.swift @@ -12,6 +12,7 @@ // //===----------------------------------------------------------------------===// +import NIOConcurrencyHelpers import NIOEmbedded import NIOHTTP1 import XCTest @@ -54,9 +55,9 @@ extension ChannelPipeline { } private func setUpClientChannel( - clientHTTPHandler: RemovableChannelHandler, - clientUpgraders: [NIOHTTPClientProtocolUpgrader], - _ upgradeCompletionHandler: @escaping (ChannelHandlerContext) -> Void + clientHTTPHandler: RemovableChannelHandler & Sendable, + clientUpgraders: [NIOHTTPClientProtocolUpgrader & Sendable], + _ upgradeCompletionHandler: @escaping @Sendable (ChannelHandlerContext) -> Void ) throws -> EmbeddedChannel { let channel = EmbeddedChannel() @@ -64,7 +65,7 @@ private func setUpClientChannel( let config: NIOHTTPClientUpgradeSendableConfiguration = ( upgraders: clientUpgraders, completionHandler: { context in - channel.pipeline.removeHandler(clientHTTPHandler, promise: nil) + channel.pipeline.syncOperations.removeHandler(clientHTTPHandler, promise: nil) upgradeCompletionHandler(context) } ) @@ -80,7 +81,7 @@ private func setUpClientChannel( } // A HTTP handler that will send an initial request which can be augmented by the upgrade handler. -private final class BasicHTTPHandler: ChannelInboundHandler, RemovableChannelHandler { +private final class BasicHTTPHandler: ChannelInboundHandler, RemovableChannelHandler, Sendable { fileprivate typealias InboundIn = HTTPClientResponsePart fileprivate typealias OutboundOut = HTTPClientRequestPart @@ -92,7 +93,7 @@ private final class BasicHTTPHandler: ChannelInboundHandler, RemovableChannelHan // A HTTP handler that will send a request and then fail if it receives a response or an error. // It can be used when there is a successful upgrade as the handler should be removed by the upgrader. -private final class ExplodingHTTPHandler: ChannelInboundHandler, RemovableChannelHandler { +private final class ExplodingHTTPHandler: ChannelInboundHandler, RemovableChannelHandler, Sendable { fileprivate typealias InboundIn = HTTPClientResponsePart fileprivate typealias OutboundOut = HTTPClientRequestPart @@ -163,7 +164,7 @@ private func basicRequest(path: String = "/") -> String { class WebSocketClientEndToEndTests: XCTestCase { func testSimpleUpgradeSucceeds() throws { - var upgradeHandlerCallbackFired = false + let upgradeHandlerCallbackFired = NIOLockedValueBox(false) let requestKey = "OfS0wDaT5NoxF2gqm7Zj2YtetzM=" let responseKey = "yKEqitDFPE81FyIhKTm+ojBqigk=" @@ -183,7 +184,7 @@ class WebSocketClientEndToEndTests: XCTestCase { ) { _ in // This is called before the upgrader gets called. - upgradeHandlerCallbackFired = true + upgradeHandlerCallbackFired.withLockedValue { $0 = true } } // Read the server request. @@ -233,7 +234,7 @@ class WebSocketClientEndToEndTests: XCTestCase { .assertContains(handlerType: WebSocketRecorderHandler.self) ) - XCTAssert(upgradeHandlerCallbackFired) + XCTAssert(upgradeHandlerCallbackFired.withLockedValue { $0 }) // Close the pipeline. XCTAssertNoThrow(try clientChannel.close().wait()) diff --git a/Tests/NIOWebSocketTests/WebSocketFrameDecoderTest.swift b/Tests/NIOWebSocketTests/WebSocketFrameDecoderTest.swift index 1ae4c0bf9bf..594df5b99cf 100644 --- a/Tests/NIOWebSocketTests/WebSocketFrameDecoderTest.swift +++ b/Tests/NIOWebSocketTests/WebSocketFrameDecoderTest.swift @@ -24,7 +24,7 @@ private class CloseSwallower: ChannelOutboundHandler, RemovableChannelHandler { private var closePromise: EventLoopPromise? = nil private var context: ChannelHandlerContext? = nil - public func allowClose() { + func allowClose() { self.context!.close(promise: self.closePromise) self.context = nil } @@ -58,12 +58,12 @@ private final class SynchronousCloser: ChannelInboundHandler { } } -public final class WebSocketFrameDecoderTest: XCTestCase { - public var decoderChannel: EmbeddedChannel! - public var encoderChannel: EmbeddedChannel! - public var buffer: ByteBuffer! +final class WebSocketFrameDecoderTest: XCTestCase { + var decoderChannel: EmbeddedChannel! + var encoderChannel: EmbeddedChannel! + var buffer: ByteBuffer! - public override func setUp() { + override func setUp() { self.decoderChannel = EmbeddedChannel() self.encoderChannel = EmbeddedChannel() self.buffer = decoderChannel.allocator.buffer(capacity: 128) @@ -73,7 +73,7 @@ public final class WebSocketFrameDecoderTest: XCTestCase { XCTAssertNoThrow(try self.encoderChannel.pipeline.syncOperations.addHandler(WebSocketFrameEncoder())) } - public override func tearDown() { + override func tearDown() { XCTAssertNoThrow(try self.encoderChannel.finish()) _ = try? self.decoderChannel.finish() self.encoderChannel = nil @@ -114,32 +114,31 @@ public final class WebSocketFrameDecoderTest: XCTestCase { // We need to insert a decoder that doesn't do error handling. We still insert // an encoder because we want to fail gracefully if a frame is written. let f = self.decoderChannel.pipeline.context(handlerType: ByteToMessageHandler.self) - .flatMapThrowing { - if let handler = $0.handler as? RemovableChannelHandler { - return handler + .assumeIsolated() + .flatMap { context in + if let handler = context.handler as? RemovableChannelHandler { + return self.decoderChannel.pipeline.syncOperations.removeHandler(handler) } else { - throw ChannelError.unremovableHandler + return context.eventLoop.makeFailedFuture(ChannelError.unremovableHandler) } - }.flatMap { - self.decoderChannel.pipeline.removeHandler($0) } // we need to run the event loop here because removal is not synchronous (self.decoderChannel.eventLoop as! EmbeddedEventLoop).run() XCTAssertNoThrow( - try f.flatMap { - self.decoderChannel.pipeline.addHandler(handler) - }.wait() + try f.flatMapThrowing { + try self.decoderChannel.pipeline.syncOperations.addHandler(handler) + }.nonisolated().wait() ) } - public func testFramesWithoutBodies() throws { + func testFramesWithoutBodies() throws { let frame = WebSocketFrame(fin: true, opcode: .ping, data: self.buffer) assertFrameRoundTrips(frame: frame) } - public func testFramesWithExtensionDataDontRoundTrip() throws { + func testFramesWithExtensionDataDontRoundTrip() throws { // We don't know what the extensions are, so all data goes in...well...data. self.buffer.writeBytes([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) let frame = WebSocketFrame( @@ -151,7 +150,7 @@ public final class WebSocketFrameDecoderTest: XCTestCase { assertFrameDoesNotRoundTrip(frame: frame) } - public func testFramesWithExtensionDataCanBeRecovered() throws { + func testFramesWithExtensionDataCanBeRecovered() throws { self.buffer.writeBytes([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) let frame = WebSocketFrame( fin: false, @@ -165,7 +164,7 @@ public final class WebSocketFrameDecoderTest: XCTestCase { XCTAssertEqual(newFrame, frame) } - public func testFramesWithReservedBitsSetRoundTrip() throws { + func testFramesWithReservedBitsSetRoundTrip() throws { self.buffer.writeBytes([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) let frame = WebSocketFrame( fin: false, @@ -178,7 +177,7 @@ public final class WebSocketFrameDecoderTest: XCTestCase { assertFrameRoundTrips(frame: frame) } - public func testFramesWith16BitLengthsRoundTrip() throws { + func testFramesWith16BitLengthsRoundTrip() throws { self.buffer.writeBytes(Array(repeating: UInt8(4), count: 300)) let frame = WebSocketFrame( fin: true, @@ -188,7 +187,7 @@ public final class WebSocketFrameDecoderTest: XCTestCase { assertFrameRoundTrips(frame: frame) } - public func testFramesWith64BitLengthsRoundTrip() throws { + func testFramesWith64BitLengthsRoundTrip() throws { // We need a new decoder channel here, because the max length would otherwise trigger an error. _ = try! self.decoderChannel.finish() self.decoderChannel = EmbeddedChannel() @@ -207,7 +206,7 @@ public final class WebSocketFrameDecoderTest: XCTestCase { assertFrameRoundTrips(frame: frame) } - public func testMaskedFramesRoundTripWithMaskingIntact() throws { + func testMaskedFramesRoundTripWithMaskingIntact() throws { self.buffer.writeBytes([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) let frame = WebSocketFrame( fin: false, @@ -232,7 +231,7 @@ public final class WebSocketFrameDecoderTest: XCTestCase { XCTAssertEqual(producedFrame.unmaskedData, self.buffer) } - public func testMaskedFramesRoundTripWithMaskingIntactEvenWithExtensions() throws { + func testMaskedFramesRoundTripWithMaskingIntactEvenWithExtensions() throws { self.buffer.writeBytes([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) let frame = WebSocketFrame( fin: false, @@ -266,7 +265,7 @@ public final class WebSocketFrameDecoderTest: XCTestCase { ) } - public func testDecoderRejectsOverlongFrames() throws { + func testDecoderRejectsOverlongFrames() throws { XCTAssertNoThrow( try self.decoderChannel.pipeline.syncOperations.addHandler(WebSocketFrameEncoder(), position: .first) ) @@ -284,7 +283,7 @@ public final class WebSocketFrameDecoderTest: XCTestCase { XCTAssertNoThrow(XCTAssertEqual([0x88, 0x02, 0x03, 0xF1], try self.decoderChannel.readAllOutboundBytes())) } - public func testDecoderRejectsFragmentedControlFrames() throws { + func testDecoderRejectsFragmentedControlFrames() throws { XCTAssertNoThrow( try self.decoderChannel.pipeline.syncOperations.addHandler(WebSocketFrameEncoder(), position: .first) ) @@ -301,7 +300,7 @@ public final class WebSocketFrameDecoderTest: XCTestCase { XCTAssertNoThrow(XCTAssertEqual([0x88, 0x02, 0x03, 0xEA], try self.decoderChannel.readAllOutboundBytes())) } - public func testDecoderRejectsMultibyteControlFrameLengths() throws { + func testDecoderRejectsMultibyteControlFrameLengths() throws { XCTAssertNoThrow( try self.decoderChannel.pipeline.syncOperations.addHandler(WebSocketFrameEncoder(), position: .first) ) @@ -357,12 +356,12 @@ public final class WebSocketFrameDecoderTest: XCTestCase { swallower.allowClose() // Take the handler out for cleanliness. - XCTAssertNoThrow(try self.decoderChannel.pipeline.removeHandler(swallower).wait()) + XCTAssertNoThrow(try self.decoderChannel.pipeline.syncOperations.removeHandler(swallower).wait()) } - public func testClosingSynchronouslyOnChannelRead() throws { + func testClosingSynchronouslyOnChannelRead() throws { // We're going to send a connectionClose frame and confirm we only see it once. - XCTAssertNoThrow(try self.decoderChannel.pipeline.addHandler(SynchronousCloser()).wait()) + XCTAssertNoThrow(try self.decoderChannel.pipeline.syncOperations.addHandler(SynchronousCloser())) var errorCodeBuffer = self.encoderChannel.allocator.buffer(capacity: 4) errorCodeBuffer.write(webSocketErrorCode: .normalClosure) @@ -382,7 +381,7 @@ public final class WebSocketFrameDecoderTest: XCTestCase { XCTAssertNoThrow(XCTAssertNil(try self.decoderChannel.readInbound(as: WebSocketFrame.self))) } - public func testDecoderRejectsOverlongFramesWithNoAutomaticErrorHandling() { + func testDecoderRejectsOverlongFramesWithNoAutomaticErrorHandling() { // We need to insert a decoder that doesn't do error handling. We still insert // an encoder because we want to fail gracefully if a frame is written. self.swapDecoder(for: ByteToMessageHandler(WebSocketFrameDecoder())) @@ -402,7 +401,7 @@ public final class WebSocketFrameDecoderTest: XCTestCase { XCTAssertNoThrow(XCTAssertEqual([], try self.decoderChannel.readAllOutboundBytes())) } - public func testDecoderRejectsFragmentedControlFramesWithNoAutomaticErrorHandling() throws { + func testDecoderRejectsFragmentedControlFramesWithNoAutomaticErrorHandling() throws { // We need to insert a decoder that doesn't do error handling. We still insert // an encoder because we want to fail gracefully if a frame is written. self.swapDecoder(for: ByteToMessageHandler(WebSocketFrameDecoder())) @@ -421,7 +420,7 @@ public final class WebSocketFrameDecoderTest: XCTestCase { XCTAssertNoThrow(XCTAssertEqual([], try self.decoderChannel.readAllOutboundBytes())) } - public func testDecoderRejectsMultibyteControlFrameLengthsWithNoAutomaticErrorHandling() throws { + func testDecoderRejectsMultibyteControlFrameLengthsWithNoAutomaticErrorHandling() throws { // We need to insert a decoder that doesn't do error handling. We still insert // an encoder because we want to fail gracefully if a frame is written. self.swapDecoder(for: ByteToMessageHandler(WebSocketFrameDecoder())) @@ -476,7 +475,7 @@ public final class WebSocketFrameDecoderTest: XCTestCase { XCTAssertNoThrow(XCTAssertNil(try self.decoderChannel.readOutbound())) } - public func testDecoderRejectsOverlongFramesWithSeparateErrorHandling() throws { + func testDecoderRejectsOverlongFramesWithSeparateErrorHandling() throws { // We need to insert a decoder that doesn't do error handling, and then a separate error // handler. self.swapDecoder(for: ByteToMessageHandler(WebSocketFrameDecoder())) @@ -497,7 +496,7 @@ public final class WebSocketFrameDecoderTest: XCTestCase { XCTAssertNoThrow(XCTAssertEqual([0x88, 0x02, 0x03, 0xF1], try self.decoderChannel.readAllOutboundBytes())) } - public func testDecoderRejectsFragmentedControlFramesWithSeparateErrorHandling() throws { + func testDecoderRejectsFragmentedControlFramesWithSeparateErrorHandling() throws { // We need to insert a decoder that doesn't do error handling, and then a separate error // handler. self.swapDecoder(for: ByteToMessageHandler(WebSocketFrameDecoder())) @@ -517,7 +516,7 @@ public final class WebSocketFrameDecoderTest: XCTestCase { XCTAssertNoThrow(XCTAssertEqual([0x88, 0x02, 0x03, 0xEA], try self.decoderChannel.readAllOutboundBytes())) } - public func testDecoderRejectsMultibyteControlFrameLengthsWithSeparateErrorHandling() throws { + func testDecoderRejectsMultibyteControlFrameLengthsWithSeparateErrorHandling() throws { // We need to insert a decoder that doesn't do error handling, and then a separate error // handler. self.swapDecoder(for: ByteToMessageHandler(WebSocketFrameDecoder())) @@ -579,7 +578,7 @@ public final class WebSocketFrameDecoderTest: XCTestCase { swallower.allowClose() // Take the handler out for cleanliness. - XCTAssertNoThrow(try self.decoderChannel.pipeline.removeHandler(swallower).wait()) + XCTAssertNoThrow(try self.decoderChannel.pipeline.syncOperations.removeHandler(swallower).wait()) } func testErrorHandlerDoesNotSwallowRandomErrors() throws { diff --git a/Tests/NIOWebSocketTests/WebSocketServerEndToEndTests.swift b/Tests/NIOWebSocketTests/WebSocketServerEndToEndTests.swift index 39161af5c7f..c2561b7aae6 100644 --- a/Tests/NIOWebSocketTests/WebSocketServerEndToEndTests.swift +++ b/Tests/NIOWebSocketTests/WebSocketServerEndToEndTests.swift @@ -12,6 +12,7 @@ // //===----------------------------------------------------------------------===// +import NIOConcurrencyHelpers import NIOEmbedded import NIOHTTP1 import XCTest @@ -137,11 +138,12 @@ struct WebSocketServerUpgraderConfiguration { class WebSocketServerEndToEndTests: XCTestCase { func createTestFixtures( - upgraders: [WebSocketServerUpgraderConfiguration] + upgraders: [WebSocketServerUpgraderConfiguration], + loop: EmbeddedEventLoop? = nil ) -> (loop: EmbeddedEventLoop, serverChannel: EmbeddedChannel, clientChannel: EmbeddedChannel) { - let loop = EmbeddedEventLoop() + let loop = loop ?? EmbeddedEventLoop() let serverChannel = EmbeddedChannel(loop: loop) - let upgraders = upgraders.map { + let upgraders: [HTTPServerProtocolUpgrader & Sendable] = upgraders.map { NIOWebSocketServerUpgrader( maxFrameSize: $0.maxFrameSize, automaticErrorHandling: $0.automaticErrorHandling, @@ -152,7 +154,7 @@ class WebSocketServerEndToEndTests: XCTestCase { XCTAssertNoThrow( try serverChannel.pipeline.configureHTTPServerPipeline( withServerUpgrade: ( - upgraders: upgraders as [HTTPServerProtocolUpgrader], + upgraders: upgraders, completionHandler: { (context: ChannelHandlerContext) in } ) ).wait() @@ -265,16 +267,17 @@ class WebSocketServerEndToEndTests: XCTestCase { func testCanDelayAcceptingUpgrade() throws { // This accept promise is going to be written to from a callback. This is only safe because we use // embedded channels. - var acceptPromise: EventLoopPromise? = nil - var upgradeComplete = false + let acceptPromise = NIOLockedValueBox?>(nil) + let upgradeComplete = NIOLockedValueBox(false) let basicUpgrader = WebSocketServerUpgraderConfiguration( shouldUpgrade: { (channel, head) in - acceptPromise = channel.eventLoop.makePromise() - return acceptPromise!.futureResult + let promise = channel.eventLoop.makePromise(of: Optional.self) + acceptPromise.withLockedValue { $0 = promise } + return promise.futureResult }, upgradePipelineHandler: { (channel, req) in - upgradeComplete = true + upgradeComplete.withLockedValue { $0 = true } return channel.eventLoop.makeSucceededFuture(()) } ) @@ -285,7 +288,7 @@ class WebSocketServerEndToEndTests: XCTestCase { XCTAssertNoThrow(try loop.syncShutdownGracefully()) } - XCTAssertNil(acceptPromise) + XCTAssertNil(acceptPromise.withLockedValue { $0 }) let upgradeRequest = self.upgradeRequest(extraHeaders: [ "Sec-WebSocket-Version": "13", "Sec-WebSocket-Key": "AQIDBAUGBwgJCgsMDQ4PEC==", @@ -297,12 +300,12 @@ class WebSocketServerEndToEndTests: XCTestCase { // No upgrade should have occurred yet. XCTAssertNoThrow(XCTAssertNil(try client.readInbound(as: ByteBuffer.self))) - XCTAssertFalse(upgradeComplete) + XCTAssertFalse(upgradeComplete.withLockedValue { $0 }) // Satisfy the promise. This will cause the upgrade to complete. - acceptPromise?.succeed(HTTPHeaders()) + acceptPromise.withLockedValue { $0?.succeed(HTTPHeaders()) } loop.run() - XCTAssertTrue(upgradeComplete) + XCTAssertTrue(upgradeComplete.withLockedValue { $0 }) XCTAssertNoThrow(try interactInMemory(client, server, eventLoop: loop)) XCTAssertNoThrow( @@ -467,15 +470,19 @@ class WebSocketServerEndToEndTests: XCTestCase { } func testSendAFewFrames() throws { - let recorder = WebSocketRecorderHandler() + let embeddedEventLoop = EmbeddedEventLoop() + let recorder = NIOLoopBound(WebSocketRecorderHandler(), eventLoop: embeddedEventLoop) + let basicUpgrader = WebSocketServerUpgraderConfiguration( shouldUpgrade: { (channel, head) in channel.eventLoop.makeSucceededFuture(HTTPHeaders()) }, upgradePipelineHandler: { (channel, req) in - channel.pipeline.addHandler(recorder) - + channel.eventLoop.makeCompletedFuture { + try channel.pipeline.syncOperations.addHandler(recorder.value) + } } ) - let (loop, server, client) = self.createTestFixtures(upgraders: [basicUpgrader]) + + let (loop, server, client) = self.createTestFixtures(upgraders: [basicUpgrader], loop: embeddedEventLoop) defer { XCTAssertNoThrow(try client.finish()) XCTAssertNoThrow(try server.finish()) @@ -512,7 +519,7 @@ class WebSocketServerEndToEndTests: XCTestCase { XCTAssertNoThrow(try client.writeAndFlush(pingFrame).wait()) XCTAssertNoThrow(try interactInMemory(client, server, eventLoop: loop)) - XCTAssertEqual(recorder.frames, [dataFrame, pingFrame]) + XCTAssertEqual(recorder.value.frames, [dataFrame, pingFrame]) } func testMaxFrameSize() throws { @@ -553,15 +560,18 @@ class WebSocketServerEndToEndTests: XCTestCase { } func testAutomaticErrorHandling() throws { - let recorder = WebSocketRecorderHandler() + let embeddedEventLoop = EmbeddedEventLoop() + let recorder = NIOLoopBound(WebSocketRecorderHandler(), eventLoop: embeddedEventLoop) + let basicUpgrader = WebSocketServerUpgraderConfiguration( shouldUpgrade: { (channel, head) in channel.eventLoop.makeSucceededFuture(HTTPHeaders()) }, upgradePipelineHandler: { (channel, req) in - channel.pipeline.addHandler(recorder) - + channel.eventLoop.makeCompletedFuture { + try channel.pipeline.syncOperations.addHandler(recorder.value) + } } ) - let (loop, server, client) = self.createTestFixtures(upgraders: [basicUpgrader]) + let (loop, server, client) = self.createTestFixtures(upgraders: [basicUpgrader], loop: embeddedEventLoop) defer { XCTAssertNoThrow(try client.finish()) XCTAssertNoThrow(try server.finishAcceptingAlreadyClosed()) @@ -593,24 +603,27 @@ class WebSocketServerEndToEndTests: XCTestCase { XCTAssertEqual(NIOWebSocketError.multiByteControlFrameLength, error as? NIOWebSocketError) } - XCTAssertEqual(recorder.errors.count, 1) - XCTAssertEqual(recorder.errors.first as? NIOWebSocketError, .some(.multiByteControlFrameLength)) + XCTAssertEqual(recorder.value.errors.count, 1) + XCTAssertEqual(recorder.value.errors.first as? NIOWebSocketError, .some(.multiByteControlFrameLength)) // The client should have received a close frame, if we'd continued interacting. XCTAssertNoThrow(XCTAssertEqual(try server.readAllOutboundBytes(), [0x88, 0x02, 0x03, 0xEA])) } func testNoAutomaticErrorHandling() throws { - let recorder = WebSocketRecorderHandler() + let embeddedEventLoop = EmbeddedEventLoop() + let recorder = NIOLoopBound(WebSocketRecorderHandler(), eventLoop: embeddedEventLoop) + let basicUpgrader = WebSocketServerUpgraderConfiguration( automaticErrorHandling: false, shouldUpgrade: { (channel, head) in channel.eventLoop.makeSucceededFuture(HTTPHeaders()) }, upgradePipelineHandler: { (channel, req) in - channel.pipeline.addHandler(recorder) - + channel.eventLoop.makeCompletedFuture { + try channel.pipeline.syncOperations.addHandler(recorder.value) + } } ) - let (loop, server, client) = self.createTestFixtures(upgraders: [basicUpgrader]) + let (loop, server, client) = self.createTestFixtures(upgraders: [basicUpgrader], loop: embeddedEventLoop) defer { XCTAssertNoThrow(try client.finish()) XCTAssertNoThrow(try server.finishAcceptingAlreadyClosed()) @@ -642,8 +655,8 @@ class WebSocketServerEndToEndTests: XCTestCase { XCTAssertEqual(NIOWebSocketError.multiByteControlFrameLength, error as? NIOWebSocketError) } - XCTAssertEqual(recorder.errors.count, 1) - XCTAssertEqual(recorder.errors.first as? NIOWebSocketError, .some(.multiByteControlFrameLength)) + XCTAssertEqual(recorder.value.errors.count, 1) + XCTAssertEqual(recorder.value.errors.first as? NIOWebSocketError, .some(.multiByteControlFrameLength)) // The client should not have received a close frame, if we'd continued interacting. XCTAssertNoThrow(XCTAssertEqual([], try server.readAllOutboundBytes())) @@ -654,9 +667,10 @@ class WebSocketServerEndToEndTests: XCTestCase { @available(macOS 13, iOS 16, tvOS 16, watchOS 9, *) final class TypedWebSocketServerEndToEndTests: WebSocketServerEndToEndTests { override func createTestFixtures( - upgraders: [WebSocketServerUpgraderConfiguration] + upgraders: [WebSocketServerUpgraderConfiguration], + loop: EmbeddedEventLoop? = nil ) -> (loop: EmbeddedEventLoop, serverChannel: EmbeddedChannel, clientChannel: EmbeddedChannel) { - let loop = EmbeddedEventLoop() + let loop = loop ?? EmbeddedEventLoop() let serverChannel = EmbeddedChannel(loop: loop) let upgraders = upgraders.map { NIOTypedWebSocketServerUpgrader(