From 2fb8c8d18dadfc2359e2f5c69194d06d6c599eba Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Thu, 27 Jun 2024 16:24:14 +0100 Subject: [PATCH 01/11] Close channel during upgrade, if client closes input --- .../NIOHTTP1/NIOTypedHTTPServerUpgradeHandler.swift | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/Sources/NIOHTTP1/NIOTypedHTTPServerUpgradeHandler.swift b/Sources/NIOHTTP1/NIOTypedHTTPServerUpgradeHandler.swift index e22a57a806a..bc24341c088 100644 --- a/Sources/NIOHTTP1/NIOTypedHTTPServerUpgradeHandler.swift +++ b/Sources/NIOHTTP1/NIOTypedHTTPServerUpgradeHandler.swift @@ -154,6 +154,17 @@ public final class NIOTypedHTTPServerUpgradeHandler: Ch } } + public func userInboundEventTriggered(context: ChannelHandlerContext, event: Any) { + switch event { + case let evt as ChannelEvent where evt == ChannelEvent.inputClosed: + // The remote peer half-closed the channel during the upgrade so we should close + context.close(promise: nil) + + default: + context.fireUserInboundEventTriggered(event) + } + } + private func channelRead(context: ChannelHandlerContext, requestPart: HTTPServerRequestPart) { switch self.stateMachine.channelReadRequestPart(requestPart) { case .failUpgradePromise(let error): From bf01ecfeb96fa99b9b70f4b00c186219a7b491b9 Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Fri, 28 Jun 2024 09:38:10 +0100 Subject: [PATCH 02/11] Check state machine before closing during upgrade --- .../NIOTypedHTTPServerUpgradeHandler.swift | 10 ++++++++-- ...IOTypedHTTPServerUpgraderStateMachine.swift | 18 ++++++++++++++++++ 2 files changed, 26 insertions(+), 2 deletions(-) diff --git a/Sources/NIOHTTP1/NIOTypedHTTPServerUpgradeHandler.swift b/Sources/NIOHTTP1/NIOTypedHTTPServerUpgradeHandler.swift index bc24341c088..7f9bf380660 100644 --- a/Sources/NIOHTTP1/NIOTypedHTTPServerUpgradeHandler.swift +++ b/Sources/NIOHTTP1/NIOTypedHTTPServerUpgradeHandler.swift @@ -157,8 +157,14 @@ public final class NIOTypedHTTPServerUpgradeHandler: Ch public func userInboundEventTriggered(context: ChannelHandlerContext, event: Any) { switch event { case let evt as ChannelEvent where evt == ChannelEvent.inputClosed: - // The remote peer half-closed the channel during the upgrade so we should close - context.close(promise: nil) + // The remote peer half-closed the channel during the upgrade. Should we close the other side + switch self.stateMachine.closeInbound() { + case .close: + self.upgradeResultPromise.fail(ChannelError.ioOnClosedChannel) + context.close(promise: nil) + case .continue: + break + } default: context.fireUserInboundEventTriggered(event) diff --git a/Sources/NIOHTTP1/NIOTypedHTTPServerUpgraderStateMachine.swift b/Sources/NIOHTTP1/NIOTypedHTTPServerUpgraderStateMachine.swift index 80fd018944f..6d821781135 100644 --- a/Sources/NIOHTTP1/NIOTypedHTTPServerUpgraderStateMachine.swift +++ b/Sources/NIOHTTP1/NIOTypedHTTPServerUpgraderStateMachine.swift @@ -393,5 +393,23 @@ struct NIOTypedHTTPServerUpgraderStateMachine { } } + @usableFromInline + enum CloseInboundAction { + case close + case `continue` + } + + @inlinable + mutating func closeInbound() -> CloseInboundAction { + switch self.state { + case .initial, .awaitingUpgrader: + self.state = .finished + return .close + + default: + return .continue + } + } + } #endif From 14da3d1abd1425dba4702fa23a9e0ef821bb7e8b Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Fri, 28 Jun 2024 11:56:42 +0100 Subject: [PATCH 03/11] Add test for half closure I had to add another handler to `setUpTestWithAutoremoval` to catch upgrade errors. --- .../NIOTypedHTTPServerUpgradeHandler.swift | 2 +- .../HTTPServerUpgradeTests.swift | 35 ++++++++++++++++++- 2 files changed, 35 insertions(+), 2 deletions(-) diff --git a/Sources/NIOHTTP1/NIOTypedHTTPServerUpgradeHandler.swift b/Sources/NIOHTTP1/NIOTypedHTTPServerUpgradeHandler.swift index 7f9bf380660..1c3df66dcf8 100644 --- a/Sources/NIOHTTP1/NIOTypedHTTPServerUpgradeHandler.swift +++ b/Sources/NIOHTTP1/NIOTypedHTTPServerUpgradeHandler.swift @@ -160,7 +160,7 @@ public final class NIOTypedHTTPServerUpgradeHandler: Ch // The remote peer half-closed the channel during the upgrade. Should we close the other side switch self.stateMachine.closeInbound() { case .close: - self.upgradeResultPromise.fail(ChannelError.ioOnClosedChannel) + self.upgradeResultPromise.fail(ChannelError.inputClosed) context.close(promise: nil) case .continue: break diff --git a/Tests/NIOHTTP1Tests/HTTPServerUpgradeTests.swift b/Tests/NIOHTTP1Tests/HTTPServerUpgradeTests.swift index 166b7b0934f..50480492062 100644 --- a/Tests/NIOHTTP1Tests/HTTPServerUpgradeTests.swift +++ b/Tests/NIOHTTP1Tests/HTTPServerUpgradeTests.swift @@ -489,6 +489,7 @@ class HTTPServerUpgradeTestCase: XCTestCase { upgraders: [any TypedAndUntypedHTTPServerProtocolUpgrader], extraHandlers: [ChannelHandler], notUpgradingHandler: (@Sendable (Channel) -> EventLoopFuture)? = nil, + upgradeErrorHandler: (@Sendable (Error) -> Void)? = nil, _ upgradeCompletionHandler: @escaping UpgradeCompletionHandler ) throws -> (Channel, Channel, Channel) { let (serverChannel, connectedServerChannelFuture) = try serverHTTPChannelWithAutoremoval( @@ -1770,11 +1771,13 @@ final class TypedHTTPServerUpgradeTestCase: HTTPServerUpgradeTestCase { upgraders: [any TypedAndUntypedHTTPServerProtocolUpgrader], extraHandlers: [ChannelHandler], notUpgradingHandler: (@Sendable (Channel) -> EventLoopFuture)? = nil, + upgradeErrorHandler: (@Sendable (Error) -> Void)? = nil, _ upgradeCompletionHandler: @escaping UpgradeCompletionHandler ) throws -> (Channel, Channel, Channel) { let connectionChannelPromise = Self.eventLoop.makePromise(of: Channel.self) let serverChannelFuture = ServerBootstrap(group: Self.eventLoop) - .serverChannelOption(.socketOption(.so_reuseaddr), value: 1) + .serverChannelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1) + .childChannelOption(ChannelOptions.allowRemoteHalfClosure, value: true) .childChannelInitializer { channel in channel.eventLoop.makeCompletedFuture { connectionChannelPromise.succeed(channel) @@ -1800,6 +1803,10 @@ final class TypedHTTPServerUpgradeTestCase: HTTPServerUpgradeTestCase { return channel.eventLoop.makeSucceededVoidFuture() } } + .flatMapErrorThrowing { error in + upgradeErrorHandler?(error) + throw error + } } .flatMap { _ in let futureResults = extraHandlers.map { channel.pipeline.addHandler($0) } @@ -2313,5 +2320,31 @@ final class TypedHTTPServerUpgradeTestCase: HTTPServerUpgradeTestCase { // We also want to confirm that the upgrade handler is no longer in the pipeline. try connectedServer.pipeline.waitForUpgraderToBeRemoved() } + + func testHalfClosure() throws { + let errorCaught = UnsafeMutableTransferBox(false) + + let upgrader = SuccessfulUpgrader(forProtocol: "myproto", requiringHeaders: ["kafkaesque"]) { req in + XCTFail("Upgrade cannot be successful if we don't send any data to server") + } + let (_, client, connectedServer) = try setUpTestWithAutoremoval( + upgraders: [upgrader], + extraHandlers: [], + upgradeErrorHandler: { error in + switch error { + case ChannelError.inputClosed: + errorCaught.wrappedValue = true + default: + break + } + } + ) { (context) in + } + + try client.close().wait() + try connectedServer.closeFuture.wait() + XCTAssertEqual(errorCaught.wrappedValue, true) + } + } #endif From 47db17259e0764e559785bbad92da61b99e8c549 Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Mon, 1 Jul 2024 16:03:32 +0100 Subject: [PATCH 04/11] HTTP upgrades: store inputClosed for processing when unbuffering --- .../NIOTypedHTTPServerUpgradeHandler.swift | 2 + ...OTypedHTTPServerUpgraderStateMachine.swift | 62 +++++++++++++++---- 2 files changed, 51 insertions(+), 13 deletions(-) diff --git a/Sources/NIOHTTP1/NIOTypedHTTPServerUpgradeHandler.swift b/Sources/NIOHTTP1/NIOTypedHTTPServerUpgradeHandler.swift index 1c3df66dcf8..e013b013ebd 100644 --- a/Sources/NIOHTTP1/NIOTypedHTTPServerUpgradeHandler.swift +++ b/Sources/NIOHTTP1/NIOTypedHTTPServerUpgradeHandler.swift @@ -416,6 +416,8 @@ public final class NIOTypedHTTPServerUpgradeHandler: Ch private func unbuffer(context: ChannelHandlerContext) { while true { switch self.stateMachine.unbuffer() { + case .close: + context.close(promise: nil) case .fireChannelRead(let data): context.fireChannelRead(data) diff --git a/Sources/NIOHTTP1/NIOTypedHTTPServerUpgraderStateMachine.swift b/Sources/NIOHTTP1/NIOTypedHTTPServerUpgraderStateMachine.swift index 6d821781135..6fecbad1095 100644 --- a/Sources/NIOHTTP1/NIOTypedHTTPServerUpgraderStateMachine.swift +++ b/Sources/NIOHTTP1/NIOTypedHTTPServerUpgraderStateMachine.swift @@ -22,10 +22,15 @@ struct NIOTypedHTTPServerUpgraderStateMachine { /// The state before we received a TLSUserEvent. We are just forwarding any read at this point. case initial + enum BufferedState { + case data(NIOAny) + case inputClosed + } + @usableFromInline struct AwaitingUpgrader { var seenFirstRequest: Bool - var buffer: Deque + var buffer: Deque } /// The request head has been received. We're currently running the future chain awaiting an upgrader. @@ -37,7 +42,7 @@ struct NIOTypedHTTPServerUpgraderStateMachine { var requestHead: HTTPRequestHead var responseHeaders: HTTPHeaders var proto: String - var buffer: Deque + var buffer: Deque } /// We have an upgrader, which means we can begin upgrade we are just waiting for the request end. @@ -45,14 +50,14 @@ struct NIOTypedHTTPServerUpgraderStateMachine { @usableFromInline struct Upgrading { - var buffer: Deque + var buffer: Deque } /// We are either running the upgrading handler. case upgrading(Upgrading) @usableFromInline struct Unbuffering { - var buffer: Deque + var buffer: Deque } case unbuffering(Unbuffering) @@ -99,7 +104,7 @@ struct NIOTypedHTTPServerUpgraderStateMachine { if awaitingUpgrader.seenFirstRequest { // We should buffer the data since we have seen the full request. self.state = .modifying - awaitingUpgrader.buffer.append(data) + awaitingUpgrader.buffer.append(.data(data)) self.state = .awaitingUpgrader(awaitingUpgrader) return nil } else { @@ -114,7 +119,7 @@ struct NIOTypedHTTPServerUpgraderStateMachine { case .unbuffering(var unbuffering): self.state = .modifying - unbuffering.buffer.append(data) + unbuffering.buffer.append(.data(data)) self.state = .unbuffering(unbuffering) return nil @@ -125,7 +130,7 @@ struct NIOTypedHTTPServerUpgraderStateMachine { // We got a read while running ugprading. // We have to buffer the read to unbuffer it afterwards self.state = .modifying - upgrading.buffer.append(data) + upgrading.buffer.append(.data(data)) self.state = .upgrading(upgrading) return nil @@ -167,8 +172,8 @@ struct NIOTypedHTTPServerUpgraderStateMachine { guard requestedProtocols.count > 0 else { // We have to buffer now since we got the request head but are not upgrading. // The user is configuring the HTTP pipeline now. - var buffer = Deque() - buffer.append(NIOAny(requestPart)) + var buffer = Deque() + buffer.append(.data(NIOAny(requestPart))) self.state = .upgrading(.init(buffer: buffer)) return .runNotUpgradingInitializer } @@ -364,6 +369,7 @@ struct NIOTypedHTTPServerUpgraderStateMachine { @usableFromInline enum UnbufferAction { + case close case fireChannelRead(NIOAny) case fireChannelReadCompleteAndRemoveHandler } @@ -379,8 +385,12 @@ struct NIOTypedHTTPServerUpgraderStateMachine { if let element = unbuffering.buffer.popFirst() { self.state = .unbuffering(unbuffering) - - return .fireChannelRead(element) + switch element { + case .data(let data): + return .fireChannelRead(data) + case .inputClosed: + return .close + } } else { self.state = .finished @@ -402,11 +412,37 @@ struct NIOTypedHTTPServerUpgraderStateMachine { @inlinable mutating func closeInbound() -> CloseInboundAction { switch self.state { - case .initial, .awaitingUpgrader: + case .initial: self.state = .finished return .close - default: + case .awaitingUpgrader(var awaitingUpgrader): + if awaitingUpgrader.seenFirstRequest { + // We should buffer the input close since we have seen the full request. + awaitingUpgrader.buffer.append(.inputClosed) + self.state = .awaitingUpgrader(awaitingUpgrader) + return .continue + } else { + // We shouldn't buffer. This means we are still expecting HTTP parts. + return .close + } + + case .upgrading(var upgrading): + upgrading.buffer.append(.inputClosed) + self.state = .upgrading(upgrading) + return .continue + + case .upgraderReady(var upgraderReady): + upgraderReady.buffer.append(.inputClosed) + self.state = .upgraderReady(upgraderReady) + return .continue + + case .unbuffering(var unbuffering): + unbuffering.buffer.append(.inputClosed) + self.state = .unbuffering(unbuffering) + return .continue + + case .modifying, .finished: return .continue } } From 8ec2fb3d0d29e1714b6785c1aa849890b06e9cf1 Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Tue, 23 Jul 2024 08:58:28 +0100 Subject: [PATCH 05/11] Update Sources/NIOHTTP1/NIOTypedHTTPServerUpgraderStateMachine.swift Co-authored-by: Franz Busch --- ...OTypedHTTPServerUpgraderStateMachine.swift | 4 +- .../HTTPServerUpgradeTests.swift | 53 ++++++++++++++----- 2 files changed, 41 insertions(+), 16 deletions(-) diff --git a/Sources/NIOHTTP1/NIOTypedHTTPServerUpgraderStateMachine.swift b/Sources/NIOHTTP1/NIOTypedHTTPServerUpgraderStateMachine.swift index 6fecbad1095..5214073a044 100644 --- a/Sources/NIOHTTP1/NIOTypedHTTPServerUpgraderStateMachine.swift +++ b/Sources/NIOHTTP1/NIOTypedHTTPServerUpgraderStateMachine.swift @@ -423,7 +423,7 @@ struct NIOTypedHTTPServerUpgraderStateMachine { self.state = .awaitingUpgrader(awaitingUpgrader) return .continue } else { - // We shouldn't buffer. This means we are still expecting HTTP parts. + // We shouldn't buffer. This means we were still expecting HTTP parts. return .close } @@ -446,6 +446,6 @@ struct NIOTypedHTTPServerUpgraderStateMachine { return .continue } } - + } #endif diff --git a/Tests/NIOHTTP1Tests/HTTPServerUpgradeTests.swift b/Tests/NIOHTTP1Tests/HTTPServerUpgradeTests.swift index 50480492062..4da161d8a15 100644 --- a/Tests/NIOHTTP1Tests/HTTPServerUpgradeTests.swift +++ b/Tests/NIOHTTP1Tests/HTTPServerUpgradeTests.swift @@ -1827,14 +1827,14 @@ final class TypedHTTPServerUpgradeTestCase: HTTPServerUpgradeTestCase { let (_, client, connectedServer) = try setUpTestWithAutoremoval( upgraders: [upgrader], - extraHandlers: [] - ) { channel in - notUpgraderCbFired.wrappedValue = true - // We're closing the connection now. - channel.close(promise: nil) - return channel.eventLoop.makeSucceededFuture(true) - } _: { _ in - } + extraHandlers: [], + notUpgradingHandler: { channel in + notUpgraderCbFired.wrappedValue = true + // We're closing the connection now. + channel.close(promise: nil) + return channel.eventLoop.makeSucceededFuture(true) + } + ) { _ in } let completePromise = Self.eventLoop.makePromise(of: Void.self) let clientHandler = ArrayAccumulationHandler { buffers in @@ -2209,11 +2209,12 @@ final class TypedHTTPServerUpgradeTestCase: HTTPServerUpgradeTestCase { let upgrader = SuccessfulUpgrader( forProtocol: "myproto", - requiringHeaders: ["kafkaesque"] - ) { - // this is the wrong EL - otherELG.next().makeSucceededFuture($1) - } onUpgradeComplete: { req in + requiringHeaders: ["kafkaesque"], + buildUpgradeResponseFuture: { + // this is the wrong EL + otherELG.next().makeSucceededFuture($1) + } + ) { req in upgradeRequest.wrappedValue = req XCTAssertFalse(upgradeHandlerCbFired.wrappedValue) upgraderCbFired.wrappedValue = true @@ -2341,10 +2342,34 @@ final class TypedHTTPServerUpgradeTestCase: HTTPServerUpgradeTestCase { ) { (context) in } - try client.close().wait() + try client.close(mode: .output).wait() try connectedServer.closeFuture.wait() XCTAssertEqual(errorCaught.wrappedValue, true) } + /// Test that send a request and closing immediately performs a successful upgrade + func testSendRequestCloseImmediately() throws { + let upgradePerformed = UnsafeMutableTransferBox(false) + + let upgrader = SuccessfulUpgrader(forProtocol: "myproto", requiringHeaders: ["kafkaesque"]) { _ in + upgradePerformed.wrappedValue = true + } + let (_, client, connectedServer) = try setUpTestWithAutoremoval( + upgraders: [upgrader], + extraHandlers: [], + upgradeErrorHandler: { error in + XCTFail("Error: \(error)") + } + ) { (context) in + } + + let request = + "OPTIONS * HTTP/1.1\r\nHost: localhost\r\nUpgrade: myproto\r\nKafkaesque: yup\r\nConnection: upgrade\r\nConnection: kafkaesque\r\n\r\n" + XCTAssertNoThrow(try client.writeAndFlush(NIOAny(client.allocator.buffer(string: request))).wait()) + try client.close(mode: .output).wait() + try connectedServer.pipeline.waitForUpgraderToBeRemoved() + XCTAssertEqual(upgradePerformed.wrappedValue, true) + } + } #endif From ab437fdb78adf10a1950ac1864aedc3e0bb9ddb5 Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Tue, 23 Jul 2024 13:47:51 +0100 Subject: [PATCH 06/11] fixes after format --- .../HTTPServerUpgradeTests.swift | 27 +++++++++---------- 1 file changed, 13 insertions(+), 14 deletions(-) diff --git a/Tests/NIOHTTP1Tests/HTTPServerUpgradeTests.swift b/Tests/NIOHTTP1Tests/HTTPServerUpgradeTests.swift index 4da161d8a15..927658b5cea 100644 --- a/Tests/NIOHTTP1Tests/HTTPServerUpgradeTests.swift +++ b/Tests/NIOHTTP1Tests/HTTPServerUpgradeTests.swift @@ -1827,14 +1827,14 @@ final class TypedHTTPServerUpgradeTestCase: HTTPServerUpgradeTestCase { let (_, client, connectedServer) = try setUpTestWithAutoremoval( upgraders: [upgrader], - extraHandlers: [], - notUpgradingHandler: { channel in - notUpgraderCbFired.wrappedValue = true - // We're closing the connection now. - channel.close(promise: nil) - return channel.eventLoop.makeSucceededFuture(true) - } - ) { _ in } + extraHandlers: [] + ) { channel in + notUpgraderCbFired.wrappedValue = true + // We're closing the connection now. + channel.close(promise: nil) + return channel.eventLoop.makeSucceededFuture(true) + } _: { _ in + } let completePromise = Self.eventLoop.makePromise(of: Void.self) let clientHandler = ArrayAccumulationHandler { buffers in @@ -2209,12 +2209,11 @@ final class TypedHTTPServerUpgradeTestCase: HTTPServerUpgradeTestCase { let upgrader = SuccessfulUpgrader( forProtocol: "myproto", - requiringHeaders: ["kafkaesque"], - buildUpgradeResponseFuture: { - // this is the wrong EL - otherELG.next().makeSucceededFuture($1) - } - ) { req in + requiringHeaders: ["kafkaesque"] + ) { + // this is the wrong EL + otherELG.next().makeSucceededFuture($1) + } onUpgradeComplete: { req in upgradeRequest.wrappedValue = req XCTAssertFalse(upgradeHandlerCbFired.wrappedValue) upgraderCbFired.wrappedValue = true From 6ecde73be94f4c53483bb4881b9cc4beb05bd154 Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Tue, 23 Jul 2024 13:48:37 +0100 Subject: [PATCH 07/11] formatting --- Tests/NIOHTTP1Tests/HTTPServerUpgradeTests.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Tests/NIOHTTP1Tests/HTTPServerUpgradeTests.swift b/Tests/NIOHTTP1Tests/HTTPServerUpgradeTests.swift index 927658b5cea..5b5d14f432c 100644 --- a/Tests/NIOHTTP1Tests/HTTPServerUpgradeTests.swift +++ b/Tests/NIOHTTP1Tests/HTTPServerUpgradeTests.swift @@ -2213,7 +2213,7 @@ final class TypedHTTPServerUpgradeTestCase: HTTPServerUpgradeTestCase { ) { // this is the wrong EL otherELG.next().makeSucceededFuture($1) - } onUpgradeComplete: { req in + } onUpgradeComplete: { req in upgradeRequest.wrappedValue = req XCTAssertFalse(upgradeHandlerCbFired.wrappedValue) upgraderCbFired.wrappedValue = true From 3d4200a61b9a83ef9eb4692b2036ed422c916bc4 Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Wed, 13 Nov 2024 14:42:41 +0000 Subject: [PATCH 08/11] PR comments: State machine changes --- .../NIOTypedHTTPServerUpgradeHandler.swift | 8 +++++++- .../NIOTypedHTTPServerUpgraderStateMachine.swift | 15 ++++++++++----- Tests/NIOHTTP1Tests/HTTPServerUpgradeTests.swift | 3 +-- 3 files changed, 18 insertions(+), 8 deletions(-) diff --git a/Sources/NIOHTTP1/NIOTypedHTTPServerUpgradeHandler.swift b/Sources/NIOHTTP1/NIOTypedHTTPServerUpgradeHandler.swift index e013b013ebd..4ceb46c7052 100644 --- a/Sources/NIOHTTP1/NIOTypedHTTPServerUpgradeHandler.swift +++ b/Sources/NIOHTTP1/NIOTypedHTTPServerUpgradeHandler.swift @@ -158,12 +158,14 @@ public final class NIOTypedHTTPServerUpgradeHandler: Ch switch event { case let evt as ChannelEvent where evt == ChannelEvent.inputClosed: // The remote peer half-closed the channel during the upgrade. Should we close the other side - switch self.stateMachine.closeInbound() { + switch self.stateMachine.inputClosed() { case .close: self.upgradeResultPromise.fail(ChannelError.inputClosed) context.close(promise: nil) case .continue: break + case .fireInputClosedEvent: + context.fireUserInboundEventTriggered(event) } default: @@ -418,6 +420,7 @@ public final class NIOTypedHTTPServerUpgradeHandler: Ch switch self.stateMachine.unbuffer() { case .close: context.close(promise: nil) + case .fireChannelRead(let data): context.fireChannelRead(data) @@ -425,6 +428,9 @@ public final class NIOTypedHTTPServerUpgradeHandler: Ch context.fireChannelReadComplete() context.pipeline.removeHandler(self, promise: nil) return + + case .fireInputClosedEvent: + context.fireUserInboundEventTriggered(ChannelEvent.inputClosed) } } } diff --git a/Sources/NIOHTTP1/NIOTypedHTTPServerUpgraderStateMachine.swift b/Sources/NIOHTTP1/NIOTypedHTTPServerUpgraderStateMachine.swift index 5214073a044..dd7f22a0d16 100644 --- a/Sources/NIOHTTP1/NIOTypedHTTPServerUpgraderStateMachine.swift +++ b/Sources/NIOHTTP1/NIOTypedHTTPServerUpgraderStateMachine.swift @@ -372,6 +372,7 @@ struct NIOTypedHTTPServerUpgraderStateMachine { case close case fireChannelRead(NIOAny) case fireChannelReadCompleteAndRemoveHandler + case fireInputClosedEvent } @inlinable @@ -389,7 +390,7 @@ struct NIOTypedHTTPServerUpgraderStateMachine { case .data(let data): return .fireChannelRead(data) case .inputClosed: - return .close + return .fireInputClosedEvent } } else { self.state = .finished @@ -404,13 +405,14 @@ struct NIOTypedHTTPServerUpgraderStateMachine { } @usableFromInline - enum CloseInboundAction { + enum InputClosedAction { case close case `continue` + case fireInputClosedEvent } @inlinable - mutating func closeInbound() -> CloseInboundAction { + mutating func inputClosed() -> InputClosedAction { switch self.state { case .initial: self.state = .finished @@ -442,8 +444,11 @@ struct NIOTypedHTTPServerUpgraderStateMachine { self.state = .unbuffering(unbuffering) return .continue - case .modifying, .finished: - return .continue + case .finished: + return .fireInputClosedEvent + + case .modifying: + fatalError("Internal inconsistency in HTTPServerUpgradeStateMachine") } } diff --git a/Tests/NIOHTTP1Tests/HTTPServerUpgradeTests.swift b/Tests/NIOHTTP1Tests/HTTPServerUpgradeTests.swift index 5b5d14f432c..444fd5aee6a 100644 --- a/Tests/NIOHTTP1Tests/HTTPServerUpgradeTests.swift +++ b/Tests/NIOHTTP1Tests/HTTPServerUpgradeTests.swift @@ -2364,11 +2364,10 @@ final class TypedHTTPServerUpgradeTestCase: HTTPServerUpgradeTestCase { let request = "OPTIONS * HTTP/1.1\r\nHost: localhost\r\nUpgrade: myproto\r\nKafkaesque: yup\r\nConnection: upgrade\r\nConnection: kafkaesque\r\n\r\n" - XCTAssertNoThrow(try client.writeAndFlush(NIOAny(client.allocator.buffer(string: request))).wait()) + XCTAssertNoThrow(try client.writeAndFlush(client.allocator.buffer(string: request)).wait()) try client.close(mode: .output).wait() try connectedServer.pipeline.waitForUpgraderToBeRemoved() XCTAssertEqual(upgradePerformed.wrappedValue, true) } - } #endif From 9f0d0d4dc9704c990248507d9c86709b53649484 Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Wed, 13 Nov 2024 14:53:57 +0000 Subject: [PATCH 09/11] Close on upgraderReady if we receive a inputClosed event --- ...OTypedHTTPServerUpgraderStateMachine.swift | 10 ++++--- .../HTTPServerUpgradeTests.swift | 29 +++++++++++++++++++ 2 files changed, 35 insertions(+), 4 deletions(-) diff --git a/Sources/NIOHTTP1/NIOTypedHTTPServerUpgraderStateMachine.swift b/Sources/NIOHTTP1/NIOTypedHTTPServerUpgraderStateMachine.swift index dd7f22a0d16..ff228b9834c 100644 --- a/Sources/NIOHTTP1/NIOTypedHTTPServerUpgraderStateMachine.swift +++ b/Sources/NIOHTTP1/NIOTypedHTTPServerUpgraderStateMachine.swift @@ -434,10 +434,12 @@ struct NIOTypedHTTPServerUpgraderStateMachine { self.state = .upgrading(upgrading) return .continue - case .upgraderReady(var upgraderReady): - upgraderReady.buffer.append(.inputClosed) - self.state = .upgraderReady(upgraderReady) - return .continue + case .upgraderReady: + // if the state is `upgraderReady` we have received a `.head` but not an `.end`. + // If input is closed then there is no way to move this forward so we should + // close. + self.state = .finished + return .close case .unbuffering(var unbuffering): unbuffering.buffer.append(.inputClosed) diff --git a/Tests/NIOHTTP1Tests/HTTPServerUpgradeTests.swift b/Tests/NIOHTTP1Tests/HTTPServerUpgradeTests.swift index 444fd5aee6a..b17b2300e3e 100644 --- a/Tests/NIOHTTP1Tests/HTTPServerUpgradeTests.swift +++ b/Tests/NIOHTTP1Tests/HTTPServerUpgradeTests.swift @@ -2369,5 +2369,34 @@ final class TypedHTTPServerUpgradeTestCase: HTTPServerUpgradeTestCase { try connectedServer.pipeline.waitForUpgraderToBeRemoved() XCTAssertEqual(upgradePerformed.wrappedValue, true) } + + /// Test that sending an unfinished upgrade request and closing immediately throws + /// an input closed error + func testSendUnfinishedRequestCloseImmediately() throws { + let errorCaught = UnsafeMutableTransferBox(false) + + let upgrader = SuccessfulUpgrader(forProtocol: "myproto", requiringHeaders: ["kafkaesque"]) { _ in + } + let (_, client, connectedServer) = try setUpTestWithAutoremoval( + upgraders: [upgrader], + extraHandlers: [], + upgradeErrorHandler: { error in + switch error { + case ChannelError.inputClosed: + errorCaught.wrappedValue = true + default: + XCTFail("Error: \(error)") + } + } + ) { (context) in + } + + let request = + "OPTIONS * HTTP/1.1\r\nHost: localhost\r\ncontent-length: 10\r\nUpgrade: myproto\r\nKafkaesque: yup\r\nConnection: upgrade\r\nConnection: kafkaesque\r\n\r\n" + XCTAssertNoThrow(try client.writeAndFlush(client.allocator.buffer(string: request)).wait()) + try client.close(mode: .output).wait() + try connectedServer.pipeline.waitForUpgraderToBeRemoved() + XCTAssertEqual(errorCaught.wrappedValue, true) + } } #endif From 886140079575f2ad018ca43dffd82f1ee8d9178d Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Wed, 13 Nov 2024 15:25:40 +0000 Subject: [PATCH 10/11] Swapped close, promise.fail lines in userInboundEventTriggered --- Sources/NIOHTTP1/NIOTypedHTTPServerUpgradeHandler.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Sources/NIOHTTP1/NIOTypedHTTPServerUpgradeHandler.swift b/Sources/NIOHTTP1/NIOTypedHTTPServerUpgradeHandler.swift index 4ceb46c7052..b430cf1be44 100644 --- a/Sources/NIOHTTP1/NIOTypedHTTPServerUpgradeHandler.swift +++ b/Sources/NIOHTTP1/NIOTypedHTTPServerUpgradeHandler.swift @@ -160,8 +160,8 @@ public final class NIOTypedHTTPServerUpgradeHandler: Ch // The remote peer half-closed the channel during the upgrade. Should we close the other side switch self.stateMachine.inputClosed() { case .close: - self.upgradeResultPromise.fail(ChannelError.inputClosed) context.close(promise: nil) + self.upgradeResultPromise.fail(ChannelError.inputClosed) case .continue: break case .fireInputClosedEvent: From e4840e0833087019c285c6a07c8fb7f72645fce7 Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Wed, 13 Nov 2024 16:05:59 +0000 Subject: [PATCH 11/11] linting issues --- .../HTTPServerUpgradeTests.swift | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/Tests/NIOHTTP1Tests/HTTPServerUpgradeTests.swift b/Tests/NIOHTTP1Tests/HTTPServerUpgradeTests.swift index b17b2300e3e..3a69f806e35 100644 --- a/Tests/NIOHTTP1Tests/HTTPServerUpgradeTests.swift +++ b/Tests/NIOHTTP1Tests/HTTPServerUpgradeTests.swift @@ -2337,9 +2337,9 @@ final class TypedHTTPServerUpgradeTestCase: HTTPServerUpgradeTestCase { default: break } - } - ) { (context) in - } + }, + { _ in } + ) try client.close(mode: .output).wait() try connectedServer.closeFuture.wait() @@ -2358,9 +2358,9 @@ final class TypedHTTPServerUpgradeTestCase: HTTPServerUpgradeTestCase { extraHandlers: [], upgradeErrorHandler: { error in XCTFail("Error: \(error)") - } - ) { (context) in - } + }, + { _ in } + ) let request = "OPTIONS * HTTP/1.1\r\nHost: localhost\r\nUpgrade: myproto\r\nKafkaesque: yup\r\nConnection: upgrade\r\nConnection: kafkaesque\r\n\r\n" @@ -2370,7 +2370,7 @@ final class TypedHTTPServerUpgradeTestCase: HTTPServerUpgradeTestCase { XCTAssertEqual(upgradePerformed.wrappedValue, true) } - /// Test that sending an unfinished upgrade request and closing immediately throws + /// Test that sending an unfinished upgrade request and closing immediately throws /// an input closed error func testSendUnfinishedRequestCloseImmediately() throws { let errorCaught = UnsafeMutableTransferBox(false) @@ -2387,9 +2387,9 @@ final class TypedHTTPServerUpgradeTestCase: HTTPServerUpgradeTestCase { default: XCTFail("Error: \(error)") } - } - ) { (context) in - } + }, + { _ in } + ) let request = "OPTIONS * HTTP/1.1\r\nHost: localhost\r\ncontent-length: 10\r\nUpgrade: myproto\r\nKafkaesque: yup\r\nConnection: upgrade\r\nConnection: kafkaesque\r\n\r\n"