From 09d6bcd50606705bcd7229b4b0c069237ec2f451 Mon Sep 17 00:00:00 2001 From: Tim <0xtimc@gmail.com> Date: Thu, 16 Jan 2025 20:13:24 +0000 Subject: [PATCH 1/7] Add SynchronousOperations version of Position for non sendable handlers --- Sources/NIOCore/ChannelPipeline.swift | 58 +++++++++++++++++++----- Sources/NIOHTTP1/HTTPPipelineSetup.swift | 40 ++++++++-------- 2 files changed, 68 insertions(+), 30 deletions(-) diff --git a/Sources/NIOCore/ChannelPipeline.swift b/Sources/NIOCore/ChannelPipeline.swift index 7c4b0853a48..3cbb3ff3b98 100644 --- a/Sources/NIOCore/ChannelPipeline.swift +++ b/Sources/NIOCore/ChannelPipeline.swift @@ -175,11 +175,12 @@ public final class ChannelPipeline: ChannelInvoker { ) -> EventLoopFuture { let future: EventLoopFuture + let syncPosition = ChannelPipeline.SynchronousOperations.Position(position) if self.eventLoop.inEventLoop { - future = self.eventLoop.makeCompletedFuture(self.addHandlerSync(handler, name: name, position: position)) + future = self.eventLoop.makeCompletedFuture(self.addHandlerSync(handler, name: name, position: syncPosition)) } else { future = self.eventLoop.submit { - try self.addHandlerSync(handler, name: name, position: position).get() + try self.addHandlerSync(handler, name: name, position: syncPosition).get() } } @@ -198,7 +199,7 @@ public final class ChannelPipeline: ChannelInvoker { fileprivate func addHandlerSync( _ handler: ChannelHandler, name: String? = nil, - position: ChannelPipeline.Position = .last + position: ChannelPipeline.SynchronousOperations.Position = .last ) -> Result { self.eventLoop.assertInEventLoop() @@ -1122,11 +1123,12 @@ extension ChannelPipeline { _ handlers: [ChannelHandler & Sendable], position: ChannelPipeline.Position ) -> Result { - switch position { + let syncPosition = ChannelPipeline.SynchronousOperations.Position(position) + switch syncPosition { case .first, .after: - return self._addHandlersSync(handlers.reversed(), position: position) + return self._addHandlersSync(handlers.reversed(), position: syncPosition) case .last, .before: - return self._addHandlersSync(handlers, position: position) + return self._addHandlersSync(handlers, position: syncPosition) } } @@ -1143,7 +1145,7 @@ extension ChannelPipeline { /// - Returns: A result representing whether the handlers were added or not. fileprivate func addHandlersSyncNotSendable( _ handlers: [ChannelHandler], - position: ChannelPipeline.Position + position: ChannelPipeline.SynchronousOperations.Position ) -> Result { switch position { case .first, .after: @@ -1162,7 +1164,7 @@ extension ChannelPipeline { /// - Returns: A result representing whether the handlers were added or not. private func _addHandlersSync( _ handlers: Handlers, - position: ChannelPipeline.Position + position: ChannelPipeline.SynchronousOperations.Position ) -> Result where Handlers.Element == ChannelHandler & Sendable { self.eventLoop.assertInEventLoop() @@ -1191,7 +1193,7 @@ extension ChannelPipeline { /// - Returns: A result representing whether the handlers were added or not. private func _addHandlersSyncNotSendable( _ handlers: Handlers, - position: ChannelPipeline.Position + position: ChannelPipeline.SynchronousOperations.Position ) -> Result where Handlers.Element == ChannelHandler { self.eventLoop.assertInEventLoop() @@ -1238,7 +1240,7 @@ extension ChannelPipeline { public func addHandler( _ handler: ChannelHandler, name: String? = nil, - position: ChannelPipeline.Position = .last + position: ChannelPipeline.SynchronousOperations.Position = .last ) throws { try self._pipeline.addHandlerSync(handler, name: name, position: position).get() } @@ -1251,7 +1253,7 @@ extension ChannelPipeline { /// - position: The position in the `ChannelPipeline` to add `handlers`. Defaults to `.last`. public func addHandlers( _ handlers: [ChannelHandler], - position: ChannelPipeline.Position = .last + position: ChannelPipeline.SynchronousOperations.Position = .last ) throws { try self._pipeline.addHandlersSyncNotSendable(handlers, position: position).get() } @@ -1264,7 +1266,7 @@ extension ChannelPipeline { /// - position: The position in the `ChannelPipeline` to add `handlers`. Defaults to `.last`. public func addHandlers( _ handlers: ChannelHandler..., - position: ChannelPipeline.Position = .last + position: ChannelPipeline.SynchronousOperations.Position = .last ) throws { try self._pipeline.addHandlersSyncNotSendable(handlers, position: position).get() } @@ -1574,6 +1576,38 @@ extension ChannelPipeline { } } +extension ChannelPipeline.SynchronousOperations { + /// A `Position` within the `ChannelPipeline`'s `SynchronousOperations` used to insert non-sendable handlers + /// into the `ChannelPipeline` at a certain position. + @preconcurrency + public enum Position { + /// The first `ChannelHandler` -- the front of the `ChannelPipeline`. + case first + + /// The last `ChannelHandler` -- the back of the `ChannelPipeline`. + case last + + /// Before the given `ChannelHandler`. + case before(ChannelHandler) + + /// After the given `ChannelHandler`. + case after(ChannelHandler) + + package init(_ position: ChannelPipeline.Position) { + switch position { + case .first: + self = .first + case .last: + self = .last + case .before(let handler): + self = .before(handler) + case .after(let handler): + self = .after(handler) + } + } + } +} + /// Special `ChannelHandler` that forwards all events to the `Channel.Unsafe` implementation. final class HeadChannelHandler: _ChannelOutboundHandler, Sendable { diff --git a/Sources/NIOHTTP1/HTTPPipelineSetup.swift b/Sources/NIOHTTP1/HTTPPipelineSetup.swift index a789b8f6a1c..00e644954a7 100644 --- a/Sources/NIOHTTP1/HTTPPipelineSetup.swift +++ b/Sources/NIOHTTP1/HTTPPipelineSetup.swift @@ -83,10 +83,11 @@ extension ChannelPipeline { ) -> EventLoopFuture { let future: EventLoopFuture + let syncPosition = ChannelPipeline.SynchronousOperations.Position(position) if self.eventLoop.inEventLoop { let result = Result { try self.syncOperations.addHTTPClientHandlers( - position: position, + position: syncPosition, leftOverBytesStrategy: leftOverBytesStrategy, withClientUpgrade: upgrade ) @@ -95,7 +96,7 @@ extension ChannelPipeline { } else { future = self.eventLoop.submit { try self.syncOperations.addHTTPClientHandlers( - position: position, + position: syncPosition, leftOverBytesStrategy: leftOverBytesStrategy, withClientUpgrade: upgrade ) @@ -126,10 +127,11 @@ extension ChannelPipeline { ) -> EventLoopFuture { let future: EventLoopFuture + let syncPosition = ChannelPipeline.SynchronousOperations.Position(position) if self.eventLoop.inEventLoop { let result = Result { try self.syncOperations.addHTTPClientHandlers( - position: position, + position: syncPosition, leftOverBytesStrategy: leftOverBytesStrategy, enableOutboundHeaderValidation: enableOutboundHeaderValidation, withClientUpgrade: upgrade @@ -139,7 +141,7 @@ extension ChannelPipeline { } else { future = self.eventLoop.submit { try self.syncOperations.addHTTPClientHandlers( - position: position, + position: syncPosition, leftOverBytesStrategy: leftOverBytesStrategy, enableOutboundHeaderValidation: enableOutboundHeaderValidation, withClientUpgrade: upgrade @@ -173,10 +175,11 @@ extension ChannelPipeline { ) -> EventLoopFuture { let future: EventLoopFuture + let syncPosition = ChannelPipeline.SynchronousOperations.Position(position) if self.eventLoop.inEventLoop { let result = Result { try self.syncOperations.addHTTPClientHandlers( - position: position, + position: syncPosition, leftOverBytesStrategy: leftOverBytesStrategy, enableOutboundHeaderValidation: enableOutboundHeaderValidation, encoderConfiguration: encoderConfiguration, @@ -187,7 +190,7 @@ extension ChannelPipeline { } else { future = self.eventLoop.submit { try self.syncOperations.addHTTPClientHandlers( - position: position, + position: syncPosition, leftOverBytesStrategy: leftOverBytesStrategy, enableOutboundHeaderValidation: enableOutboundHeaderValidation, encoderConfiguration: encoderConfiguration, @@ -342,10 +345,11 @@ extension ChannelPipeline { ) -> EventLoopFuture { let future: EventLoopFuture + let syncPosition = ChannelPipeline.SynchronousOperations.Position(position) if self.eventLoop.inEventLoop { let result = Result { try self.syncOperations.configureHTTPServerPipeline( - position: position, + position: syncPosition, withPipeliningAssistance: pipelining, withServerUpgrade: upgrade, withErrorHandling: errorHandling, @@ -357,7 +361,7 @@ extension ChannelPipeline { } else { future = self.eventLoop.submit { try self.syncOperations.configureHTTPServerPipeline( - position: position, + position: syncPosition, withPipeliningAssistance: pipelining, withServerUpgrade: upgrade, withErrorHandling: errorHandling, @@ -386,7 +390,7 @@ extension ChannelPipeline.SynchronousOperations { /// - Throws: If the pipeline could not be configured. @preconcurrency public func addHTTPClientHandlers( - position: ChannelPipeline.Position = .last, + position: ChannelPipeline.SynchronousOperations.Position = .last, leftOverBytesStrategy: RemoveAfterUpgradeStrategy = .dropBytes, withClientUpgrade upgrade: NIOHTTPClientUpgradeConfiguration? = nil ) throws { @@ -411,7 +415,7 @@ extension ChannelPipeline.SynchronousOperations { /// for more details. /// - Throws: If the pipeline could not be configured. public func addHTTPClientHandlers( - position: ChannelPipeline.Position = .last, + position: ChannelPipeline.SynchronousOperations.Position = .last, leftOverBytesStrategy: RemoveAfterUpgradeStrategy = .dropBytes, enableOutboundHeaderValidation: Bool = true, withClientUpgrade upgrade: NIOHTTPClientUpgradeConfiguration? = nil @@ -439,7 +443,7 @@ extension ChannelPipeline.SynchronousOperations { /// for more details. /// - Throws: If the pipeline could not be configured. public func addHTTPClientHandlers( - position: ChannelPipeline.Position = .last, + position: ChannelPipeline.SynchronousOperations.Position = .last, leftOverBytesStrategy: RemoveAfterUpgradeStrategy = .dropBytes, enableOutboundHeaderValidation: Bool = true, encoderConfiguration: HTTPRequestEncoder.Configuration = .init(), @@ -455,7 +459,7 @@ extension ChannelPipeline.SynchronousOperations { } private func _addHTTPClientHandlers( - position: ChannelPipeline.Position = .last, + position: ChannelPipeline.SynchronousOperations.Position = .last, leftOverBytesStrategy: RemoveAfterUpgradeStrategy = .dropBytes, enableOutboundHeaderValidation: Bool = true, encoderConfiguration: HTTPRequestEncoder.Configuration = .init(), @@ -481,7 +485,7 @@ extension ChannelPipeline.SynchronousOperations { } private func _addHTTPClientHandlers( - position: ChannelPipeline.Position, + position: ChannelPipeline.SynchronousOperations.Position, leftOverBytesStrategy: RemoveAfterUpgradeStrategy, encoderConfiguration: HTTPRequestEncoder.Configuration ) throws { @@ -496,7 +500,7 @@ extension ChannelPipeline.SynchronousOperations { } private func _addHTTPClientHandlersFallback( - position: ChannelPipeline.Position, + position: ChannelPipeline.SynchronousOperations.Position, leftOverBytesStrategy: RemoveAfterUpgradeStrategy, enableOutboundHeaderValidation: Bool, encoderConfiguration: HTTPRequestEncoder.Configuration, @@ -550,7 +554,7 @@ extension ChannelPipeline.SynchronousOperations { /// - Throws: If the pipeline could not be configured. @preconcurrency public func configureHTTPServerPipeline( - position: ChannelPipeline.Position = .last, + position: ChannelPipeline.SynchronousOperations.Position = .last, withPipeliningAssistance pipelining: Bool = true, withServerUpgrade upgrade: NIOHTTPServerUpgradeConfiguration? = nil, withErrorHandling errorHandling: Bool = true @@ -594,7 +598,7 @@ extension ChannelPipeline.SynchronousOperations { /// spec compliance. Defaults to `true`. /// - Throws: If the pipeline could not be configured. public func configureHTTPServerPipeline( - position: ChannelPipeline.Position = .last, + position: ChannelPipeline.SynchronousOperations.Position = .last, withPipeliningAssistance pipelining: Bool = true, withServerUpgrade upgrade: NIOHTTPServerUpgradeConfiguration? = nil, withErrorHandling errorHandling: Bool = true, @@ -641,7 +645,7 @@ extension ChannelPipeline.SynchronousOperations { /// - encoderConfiguration: The configuration for the ``HTTPRequestEncoder``. /// - Throws: If the pipeline could not be configured. public func configureHTTPServerPipeline( - position: ChannelPipeline.Position = .last, + position: ChannelPipeline.SynchronousOperations.Position = .last, withPipeliningAssistance pipelining: Bool = true, withServerUpgrade upgrade: NIOHTTPServerUpgradeConfiguration? = nil, withErrorHandling errorHandling: Bool = true, @@ -659,7 +663,7 @@ extension ChannelPipeline.SynchronousOperations { } private func _configureHTTPServerPipeline( - position: ChannelPipeline.Position = .last, + position: ChannelPipeline.SynchronousOperations.Position = .last, withPipeliningAssistance pipelining: Bool = true, withServerUpgrade upgrade: NIOHTTPServerUpgradeConfiguration? = nil, withErrorHandling errorHandling: Bool = true, From 7160c77876d9357087327b0ecc412fb7f4d78761 Mon Sep 17 00:00:00 2001 From: Tim <0xtimc@gmail.com> Date: Fri, 17 Jan 2025 19:27:46 +0000 Subject: [PATCH 2/7] Add back ChannelPipeline functions and mark them as deprecated --- Sources/NIOCore/ChannelPipeline.swift | 50 +++++++++++++++++++++++++++ 1 file changed, 50 insertions(+) diff --git a/Sources/NIOCore/ChannelPipeline.swift b/Sources/NIOCore/ChannelPipeline.swift index 3cbb3ff3b98..e15c93b6e63 100644 --- a/Sources/NIOCore/ChannelPipeline.swift +++ b/Sources/NIOCore/ChannelPipeline.swift @@ -1245,6 +1245,24 @@ extension ChannelPipeline { try self._pipeline.addHandlerSync(handler, name: name, position: position).get() } + /// Add a handler to the pipeline. + /// + /// - Important: This *must* be called on the event loop. + /// - Parameters: + /// - handler: The handler to add. + /// - name: The name to use for the `ChannelHandler` when it's added. If no name is specified the one will be generated. + /// - position: The position in the `ChannelPipeline` to add `handler`. Defaults to `.last`. + @available(*, deprecated, message: "Use ChannelPipeline.SynchronousOperations.Position instead") + @_disfavoredOverload + public func addHandler( + _ handler: ChannelHandler, + name: String? = nil, + position: ChannelPipeline.Position = .last + ) throws { + let syncPosition = ChannelPipeline.SynchronousOperations.Position(position) + try self._pipeline.addHandlerSync(handler, name: name, position: syncPosition).get() + } + /// Add an array of handlers to the pipeline. /// /// - Important: This *must* be called on the event loop. @@ -1258,6 +1276,22 @@ extension ChannelPipeline { try self._pipeline.addHandlersSyncNotSendable(handlers, position: position).get() } + /// Add an array of handlers to the pipeline. + /// + /// - Important: This *must* be called on the event loop. + /// - Parameters: + /// - handlers: The handlers to add. + /// - position: The position in the `ChannelPipeline` to add `handlers`. Defaults to `.last`. + @available(*, deprecated, message: "Use ChannelPipeline.SynchronousOperations.Position instead") + @_disfavoredOverload + public func addHandlers( + _ handlers: [ChannelHandler], + position: ChannelPipeline.Position = .last + ) throws { + let syncPosition = ChannelPipeline.SynchronousOperations.Position(position) + try self._pipeline.addHandlersSyncNotSendable(handlers, position: syncPosition).get() + } + /// Add one or more handlers to the pipeline. /// /// - Important: This *must* be called on the event loop. @@ -1271,6 +1305,22 @@ extension ChannelPipeline { try self._pipeline.addHandlersSyncNotSendable(handlers, position: position).get() } + /// Add one or more handlers to the pipeline. + /// + /// - Important: This *must* be called on the event loop. + /// - Parameters: + /// - handlers: The handlers to add. + /// - position: The position in the `ChannelPipeline` to add `handlers`. Defaults to `.last`. + @available(*, deprecated, message: "Use ChannelPipeline.SynchronousOperations.Position instead") + @_disfavoredOverload + public func addHandlers( + _ handlers: ChannelHandler..., + position: ChannelPipeline.Position = .last + ) throws { + let syncPosition = ChannelPipeline.SynchronousOperations.Position(position) + try self._pipeline.addHandlersSyncNotSendable(handlers, position: syncPosition).get() + } + /// Remove a `ChannelHandler` from the `ChannelPipeline`. /// /// - Parameters: From e1c75f740ff2e29fe2569ec130217c8761a4cf41 Mon Sep 17 00:00:00 2001 From: Tim <0xtimc@gmail.com> Date: Fri, 17 Jan 2025 19:32:40 +0000 Subject: [PATCH 3/7] Add deprecated versions of the old HTTPPipelineSetup functions --- Sources/NIOHTTP1/HTTPPipelineSetup.swift | 239 ++++++++++++++++++++++- 1 file changed, 237 insertions(+), 2 deletions(-) diff --git a/Sources/NIOHTTP1/HTTPPipelineSetup.swift b/Sources/NIOHTTP1/HTTPPipelineSetup.swift index 00e644954a7..aa57656b813 100644 --- a/Sources/NIOHTTP1/HTTPPipelineSetup.swift +++ b/Sources/NIOHTTP1/HTTPPipelineSetup.swift @@ -388,7 +388,6 @@ extension ChannelPipeline.SynchronousOperations { /// the upgrade completion handler. See the documentation on `HTTPClientUpgradeHandler` /// for more details. /// - Throws: If the pipeline could not be configured. - @preconcurrency public func addHTTPClientHandlers( position: ChannelPipeline.SynchronousOperations.Position = .last, leftOverBytesStrategy: RemoveAfterUpgradeStrategy = .dropBytes, @@ -401,6 +400,34 @@ extension ChannelPipeline.SynchronousOperations { ) } + /// Configure a `ChannelPipeline` for use as a HTTP client with a client upgrader configuration. + /// + /// - important: This **must** be called on the Channel's event loop. + /// - Parameters: + /// - position: The position in the `ChannelPipeline` where to add the HTTP client handlers. Defaults to `.last`. + /// - leftOverBytesStrategy: The strategy to use when dealing with leftover bytes after removing the `HTTPDecoder` + /// from the pipeline. + /// - upgrade: Add a `HTTPClientUpgradeHandler` to the pipeline, configured for + /// HTTP upgrade. Should be a tuple of an array of `HTTPClientProtocolUpgrader` and + /// the upgrade completion handler. See the documentation on `HTTPClientUpgradeHandler` + /// for more details. + /// - Throws: If the pipeline could not be configured. + @available(*, deprecated, message: "Use ChannelPipeline.SynchronousOperations.Position instead") + @_disfavoredOverload + @preconcurrency + public func addHTTPClientHandlers( + position: ChannelPipeline.Position = .last, + leftOverBytesStrategy: RemoveAfterUpgradeStrategy = .dropBytes, + withClientUpgrade upgrade: NIOHTTPClientUpgradeConfiguration? = nil + ) throws { + let syncPosition = ChannelPipeline.SynchronousOperations.Position(position) + try self._addHTTPClientHandlers( + position: syncPosition, + leftOverBytesStrategy: leftOverBytesStrategy, + withClientUpgrade: upgrade + ) + } + /// Configure a `ChannelPipeline` for use as a HTTP client. /// /// - important: This **must** be called on the Channel's event loop. @@ -428,6 +455,36 @@ extension ChannelPipeline.SynchronousOperations { ) } + /// Configure a `ChannelPipeline` for use as a HTTP client. + /// + /// - important: This **must** be called on the Channel's event loop. + /// - Parameters: + /// - position: The position in the `ChannelPipeline` where to add the HTTP client handlers. Defaults to `.last`. + /// - leftOverBytesStrategy: The strategy to use when dealing with leftover bytes after removing the `HTTPDecoder` + /// from the pipeline. + /// - enableOutboundHeaderValidation: Whether or not request header validation is enforced. + /// - upgrade: Add a ``NIOHTTPClientUpgradeHandler`` to the pipeline, configured for + /// HTTP upgrade. Should be a tuple of an array of ``NIOHTTPClientProtocolUpgrader`` and + /// the upgrade completion handler. See the documentation on ``NIOHTTPClientUpgradeHandler`` + /// for more details. + /// - Throws: If the pipeline could not be configured. + @available(*, deprecated, message: "Use ChannelPipeline.SynchronousOperations.Position instead") + @_disfavoredOverload + public func addHTTPClientHandlers( + position: ChannelPipeline.Position = .last, + leftOverBytesStrategy: RemoveAfterUpgradeStrategy = .dropBytes, + enableOutboundHeaderValidation: Bool = true, + withClientUpgrade upgrade: NIOHTTPClientUpgradeConfiguration? = nil + ) throws { + let syncPosition = ChannelPipeline.SynchronousOperations.Position(position) + try self._addHTTPClientHandlers( + position: syncPosition, + leftOverBytesStrategy: leftOverBytesStrategy, + enableOutboundHeaderValidation: enableOutboundHeaderValidation, + withClientUpgrade: upgrade + ) + } + /// Configure a `ChannelPipeline` for use as a HTTP client. /// /// - important: This **must** be called on the Channel's event loop. @@ -458,6 +515,39 @@ extension ChannelPipeline.SynchronousOperations { ) } + /// Configure a `ChannelPipeline` for use as a HTTP client. + /// + /// - important: This **must** be called on the Channel's event loop. + /// - Parameters: + /// - position: The position in the `ChannelPipeline` where to add the HTTP client handlers. Defaults to `.last`. + /// - leftOverBytesStrategy: The strategy to use when dealing with leftover bytes after removing the `HTTPDecoder` + /// from the pipeline. + /// - enableOutboundHeaderValidation: Whether or not request header validation is enforced. + /// - encoderConfiguration: The configuration for the ``HTTPRequestEncoder``. + /// - upgrade: Add a ``NIOHTTPClientUpgradeHandler`` to the pipeline, configured for + /// HTTP upgrade. Should be a tuple of an array of ``NIOHTTPClientProtocolUpgrader`` and + /// the upgrade completion handler. See the documentation on ``NIOHTTPClientUpgradeHandler`` + /// for more details. + /// - Throws: If the pipeline could not be configured. + @available(*, deprecated, message: "Use ChannelPipeline.SynchronousOperations.Position instead") + @_disfavoredOverload + public func addHTTPClientHandlers( + position: ChannelPipeline.Position = .last, + leftOverBytesStrategy: RemoveAfterUpgradeStrategy = .dropBytes, + enableOutboundHeaderValidation: Bool = true, + encoderConfiguration: HTTPRequestEncoder.Configuration = .init(), + withClientUpgrade upgrade: NIOHTTPClientUpgradeConfiguration? = nil + ) throws { + let syncPosition = ChannelPipeline.SynchronousOperations.Position(position) + try self._addHTTPClientHandlers( + position: syncPosition, + leftOverBytesStrategy: leftOverBytesStrategy, + enableOutboundHeaderValidation: enableOutboundHeaderValidation, + encoderConfiguration: encoderConfiguration, + withClientUpgrade: upgrade + ) + } + private func _addHTTPClientHandlers( position: ChannelPipeline.SynchronousOperations.Position = .last, leftOverBytesStrategy: RemoveAfterUpgradeStrategy = .dropBytes, @@ -526,6 +616,7 @@ extension ChannelPipeline.SynchronousOperations { try self.addHandlers(handlers, position: position) } + /// Configure a `ChannelPipeline` for use as a HTTP server. /// /// This function knows how to set up all first-party HTTP channel handlers appropriately @@ -552,7 +643,6 @@ extension ChannelPipeline.SynchronousOperations { /// - errorHandling: Whether to provide assistance handling protocol errors (e.g. /// failure to parse the HTTP request) by sending 400 errors. Defaults to `true`. /// - Throws: If the pipeline could not be configured. - @preconcurrency public func configureHTTPServerPipeline( position: ChannelPipeline.SynchronousOperations.Position = .last, withPipeliningAssistance pipelining: Bool = true, @@ -567,6 +657,50 @@ extension ChannelPipeline.SynchronousOperations { ) } + /// Configure a `ChannelPipeline` for use as a HTTP server. + /// + /// This function knows how to set up all first-party HTTP channel handlers appropriately + /// for server use. It supports the following features: + /// + /// 1. Providing assistance handling clients that pipeline HTTP requests, using the + /// `HTTPServerPipelineHandler`. + /// 2. Supporting HTTP upgrade, using the `HTTPServerUpgradeHandler`. + /// + /// This method will likely be extended in future with more support for other first-party + /// features. + /// + /// - important: This **must** be called on the Channel's event loop. + /// - Parameters: + /// - position: Where in the pipeline to add the HTTP server handlers, defaults to `.last`. + /// - pipelining: Whether to provide assistance handling HTTP clients that pipeline + /// their requests. Defaults to `true`. If `false`, users will need to handle + /// clients that pipeline themselves. + /// - upgrade: Whether to add a `HTTPServerUpgradeHandler` to the pipeline, configured for + /// HTTP upgrade. Defaults to `nil`, which will not add the handler to the pipeline. If + /// provided should be a tuple of an array of `HTTPServerProtocolUpgrader` and the upgrade + /// completion handler. See the documentation on `HTTPServerUpgradeHandler` for more + /// details. + /// - errorHandling: Whether to provide assistance handling protocol errors (e.g. + /// failure to parse the HTTP request) by sending 400 errors. Defaults to `true`. + /// - Throws: If the pipeline could not be configured. + @preconcurrency + @available(*, deprecated, message: "Use ChannelPipeline.SynchronousOperations.Position instead") + @_disfavoredOverload + public func configureHTTPServerPipeline( + position: ChannelPipeline.Position = .last, + withPipeliningAssistance pipelining: Bool = true, + withServerUpgrade upgrade: NIOHTTPServerUpgradeConfiguration? = nil, + withErrorHandling errorHandling: Bool = true + ) throws { + let syncPosition = ChannelPipeline.SynchronousOperations.Position(position) + try self._configureHTTPServerPipeline( + position: syncPosition, + withPipeliningAssistance: pipelining, + withServerUpgrade: upgrade, + withErrorHandling: errorHandling + ) + } + /// Configure a `ChannelPipeline` for use as a HTTP server. /// /// This function knows how to set up all first-party HTTP channel handlers appropriately @@ -613,6 +747,55 @@ extension ChannelPipeline.SynchronousOperations { ) } + /// Configure a `ChannelPipeline` for use as a HTTP server. + /// + /// This function knows how to set up all first-party HTTP channel handlers appropriately + /// for server use. It supports the following features: + /// + /// 1. Providing assistance handling clients that pipeline HTTP requests, using the + /// `HTTPServerPipelineHandler`. + /// 2. Supporting HTTP upgrade, using the `HTTPServerUpgradeHandler`. + /// 3. Providing assistance handling protocol errors. + /// 4. Validating outbound header fields to protect against response splitting attacks. + /// + /// This method will likely be extended in future with more support for other first-party + /// features. + /// + /// - important: This **must** be called on the Channel's event loop. + /// - Parameters: + /// - position: Where in the pipeline to add the HTTP server handlers, defaults to `.last`. + /// - pipelining: Whether to provide assistance handling HTTP clients that pipeline + /// their requests. Defaults to `true`. If `false`, users will need to handle + /// clients that pipeline themselves. + /// - upgrade: Whether to add a `HTTPServerUpgradeHandler` to the pipeline, configured for + /// HTTP upgrade. Defaults to `nil`, which will not add the handler to the pipeline. If + /// provided should be a tuple of an array of `HTTPServerProtocolUpgrader` and the upgrade + /// completion handler. See the documentation on `HTTPServerUpgradeHandler` for more + /// details. + /// - errorHandling: Whether to provide assistance handling protocol errors (e.g. + /// failure to parse the HTTP request) by sending 400 errors. Defaults to `true`. + /// - headerValidation: Whether to validate outbound request headers to confirm that they meet + /// spec compliance. Defaults to `true`. + /// - Throws: If the pipeline could not be configured. + @available(*, deprecated, message: "Use ChannelPipeline.SynchronousOperations.Position instead") + @_disfavoredOverload + public func configureHTTPServerPipeline( + position: ChannelPipeline.Position = .last, + withPipeliningAssistance pipelining: Bool = true, + withServerUpgrade upgrade: NIOHTTPServerUpgradeConfiguration? = nil, + withErrorHandling errorHandling: Bool = true, + withOutboundHeaderValidation headerValidation: Bool = true + ) throws { + let syncPosition = ChannelPipeline.SynchronousOperations.Position(position) + try self._configureHTTPServerPipeline( + position: syncPosition, + withPipeliningAssistance: pipelining, + withServerUpgrade: upgrade, + withErrorHandling: errorHandling, + withOutboundHeaderValidation: headerValidation + ) + } + /// Configure a `ChannelPipeline` for use as a HTTP server. /// /// This function knows how to set up all first-party HTTP channel handlers appropriately @@ -662,6 +845,58 @@ extension ChannelPipeline.SynchronousOperations { ) } + /// Configure a `ChannelPipeline` for use as a HTTP server. + /// + /// This function knows how to set up all first-party HTTP channel handlers appropriately + /// for server use. It supports the following features: + /// + /// 1. Providing assistance handling clients that pipeline HTTP requests, using the + /// `HTTPServerPipelineHandler`. + /// 2. Supporting HTTP upgrade, using the `HTTPServerUpgradeHandler`. + /// 3. Providing assistance handling protocol errors. + /// 4. Validating outbound header fields to protect against response splitting attacks. + /// + /// This method will likely be extended in future with more support for other first-party + /// features. + /// + /// - important: This **must** be called on the Channel's event loop. + /// - Parameters: + /// - position: Where in the pipeline to add the HTTP server handlers, defaults to `.last`. + /// - pipelining: Whether to provide assistance handling HTTP clients that pipeline + /// their requests. Defaults to `true`. If `false`, users will need to handle + /// clients that pipeline themselves. + /// - upgrade: Whether to add a `HTTPServerUpgradeHandler` to the pipeline, configured for + /// HTTP upgrade. Defaults to `nil`, which will not add the handler to the pipeline. If + /// provided should be a tuple of an array of `HTTPServerProtocolUpgrader` and the upgrade + /// completion handler. See the documentation on `HTTPServerUpgradeHandler` for more + /// details. + /// - errorHandling: Whether to provide assistance handling protocol errors (e.g. + /// failure to parse the HTTP request) by sending 400 errors. Defaults to `true`. + /// - headerValidation: Whether to validate outbound request headers to confirm that they meet + /// spec compliance. Defaults to `true`. + /// - encoderConfiguration: The configuration for the ``HTTPRequestEncoder``. + /// - Throws: If the pipeline could not be configured. + @available(*, deprecated, message: "Use ChannelPipeline.SynchronousOperations.Position instead") + @_disfavoredOverload + public func configureHTTPServerPipeline( + position: ChannelPipeline.Position = .last, + withPipeliningAssistance pipelining: Bool = true, + withServerUpgrade upgrade: NIOHTTPServerUpgradeConfiguration? = nil, + withErrorHandling errorHandling: Bool = true, + withOutboundHeaderValidation headerValidation: Bool = true, + withEncoderConfiguration encoderConfiguration: HTTPResponseEncoder.Configuration + ) throws { + let syncPosition = ChannelPipeline.SynchronousOperations.Position(position) + try self._configureHTTPServerPipeline( + position: syncPosition, + withPipeliningAssistance: pipelining, + withServerUpgrade: upgrade, + withErrorHandling: errorHandling, + withOutboundHeaderValidation: headerValidation, + withEncoderConfiguration: encoderConfiguration + ) + } + private func _configureHTTPServerPipeline( position: ChannelPipeline.SynchronousOperations.Position = .last, withPipeliningAssistance pipelining: Bool = true, From ed47fb1672d9047f755b30958b3fba5227014230 Mon Sep 17 00:00:00 2001 From: Tim <0xtimc@gmail.com> Date: Sun, 2 Feb 2025 13:36:51 +0100 Subject: [PATCH 4/7] Do the conversions on the event loop to ensure we're safe --- Sources/NIOCore/ChannelPipeline.swift | 10 +++++++--- Sources/NIOHTTP1/HTTPPipelineSetup.swift | 12 ++++++++---- 2 files changed, 15 insertions(+), 7 deletions(-) diff --git a/Sources/NIOCore/ChannelPipeline.swift b/Sources/NIOCore/ChannelPipeline.swift index e15c93b6e63..bbfea54c872 100644 --- a/Sources/NIOCore/ChannelPipeline.swift +++ b/Sources/NIOCore/ChannelPipeline.swift @@ -175,11 +175,12 @@ public final class ChannelPipeline: ChannelInvoker { ) -> EventLoopFuture { let future: EventLoopFuture - let syncPosition = ChannelPipeline.SynchronousOperations.Position(position) if self.eventLoop.inEventLoop { + let syncPosition = ChannelPipeline.SynchronousOperations.Position(position) future = self.eventLoop.makeCompletedFuture(self.addHandlerSync(handler, name: name, position: syncPosition)) } else { future = self.eventLoop.submit { + let syncPosition = ChannelPipeline.SynchronousOperations.Position(position) try self.addHandlerSync(handler, name: name, position: syncPosition).get() } } @@ -1629,7 +1630,6 @@ extension ChannelPipeline { extension ChannelPipeline.SynchronousOperations { /// A `Position` within the `ChannelPipeline`'s `SynchronousOperations` used to insert non-sendable handlers /// into the `ChannelPipeline` at a certain position. - @preconcurrency public enum Position { /// The first `ChannelHandler` -- the front of the `ChannelPipeline`. case first @@ -1643,7 +1643,7 @@ extension ChannelPipeline.SynchronousOperations { /// After the given `ChannelHandler`. case after(ChannelHandler) - package init(_ position: ChannelPipeline.Position) { + public init(_ position: ChannelPipeline.Position) { switch position { case .first: self = .first @@ -1658,6 +1658,10 @@ extension ChannelPipeline.SynchronousOperations { } } +@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) +@available(*, unavailable) +extension ChannelPipeline.SynchronousOperations.Position: Sendable {} + /// Special `ChannelHandler` that forwards all events to the `Channel.Unsafe` implementation. final class HeadChannelHandler: _ChannelOutboundHandler, Sendable { diff --git a/Sources/NIOHTTP1/HTTPPipelineSetup.swift b/Sources/NIOHTTP1/HTTPPipelineSetup.swift index aa57656b813..5e935123ce5 100644 --- a/Sources/NIOHTTP1/HTTPPipelineSetup.swift +++ b/Sources/NIOHTTP1/HTTPPipelineSetup.swift @@ -83,8 +83,8 @@ extension ChannelPipeline { ) -> EventLoopFuture { let future: EventLoopFuture - let syncPosition = ChannelPipeline.SynchronousOperations.Position(position) if self.eventLoop.inEventLoop { + let syncPosition = ChannelPipeline.SynchronousOperations.Position(position) let result = Result { try self.syncOperations.addHTTPClientHandlers( position: syncPosition, @@ -95,6 +95,7 @@ extension ChannelPipeline { future = self.eventLoop.makeCompletedFuture(result) } else { future = self.eventLoop.submit { + let syncPosition = ChannelPipeline.SynchronousOperations.Position(position) try self.syncOperations.addHTTPClientHandlers( position: syncPosition, leftOverBytesStrategy: leftOverBytesStrategy, @@ -127,8 +128,8 @@ extension ChannelPipeline { ) -> EventLoopFuture { let future: EventLoopFuture - let syncPosition = ChannelPipeline.SynchronousOperations.Position(position) if self.eventLoop.inEventLoop { + let syncPosition = ChannelPipeline.SynchronousOperations.Position(position) let result = Result { try self.syncOperations.addHTTPClientHandlers( position: syncPosition, @@ -140,6 +141,7 @@ extension ChannelPipeline { future = self.eventLoop.makeCompletedFuture(result) } else { future = self.eventLoop.submit { + let syncPosition = ChannelPipeline.SynchronousOperations.Position(position) try self.syncOperations.addHTTPClientHandlers( position: syncPosition, leftOverBytesStrategy: leftOverBytesStrategy, @@ -175,8 +177,8 @@ extension ChannelPipeline { ) -> EventLoopFuture { let future: EventLoopFuture - let syncPosition = ChannelPipeline.SynchronousOperations.Position(position) if self.eventLoop.inEventLoop { + let syncPosition = ChannelPipeline.SynchronousOperations.Position(position) let result = Result { try self.syncOperations.addHTTPClientHandlers( position: syncPosition, @@ -189,6 +191,7 @@ extension ChannelPipeline { future = self.eventLoop.makeCompletedFuture(result) } else { future = self.eventLoop.submit { + let syncPosition = ChannelPipeline.SynchronousOperations.Position(position) try self.syncOperations.addHTTPClientHandlers( position: syncPosition, leftOverBytesStrategy: leftOverBytesStrategy, @@ -345,8 +348,8 @@ extension ChannelPipeline { ) -> EventLoopFuture { let future: EventLoopFuture - let syncPosition = ChannelPipeline.SynchronousOperations.Position(position) if self.eventLoop.inEventLoop { + let syncPosition = ChannelPipeline.SynchronousOperations.Position(position) let result = Result { try self.syncOperations.configureHTTPServerPipeline( position: syncPosition, @@ -360,6 +363,7 @@ extension ChannelPipeline { future = self.eventLoop.makeCompletedFuture(result) } else { future = self.eventLoop.submit { + let syncPosition = ChannelPipeline.SynchronousOperations.Position(position) try self.syncOperations.configureHTTPServerPipeline( position: syncPosition, withPipeliningAssistance: pipelining, From bb4a2d6d2e1224456942339c82575e12b3d03c73 Mon Sep 17 00:00:00 2001 From: Tim <0xtimc@gmail.com> Date: Mon, 3 Feb 2025 14:28:19 +0000 Subject: [PATCH 5/7] Add tests for sync position --- Tests/NIOPosixTests/ChannelPipelineTest.swift | 126 ++++++++++++++++++ 1 file changed, 126 insertions(+) diff --git a/Tests/NIOPosixTests/ChannelPipelineTest.swift b/Tests/NIOPosixTests/ChannelPipelineTest.swift index 130e39fd2c7..d20e96ee4c1 100644 --- a/Tests/NIOPosixTests/ChannelPipelineTest.swift +++ b/Tests/NIOPosixTests/ChannelPipelineTest.swift @@ -2693,6 +2693,132 @@ class ChannelPipelineTest: XCTestCase { XCTAssertTrue(try channel.finish().isClean) } + + func testAddAfterForSynchronousPosition() { + let channel = EmbeddedChannel() + defer { + XCTAssertNoThrow(try channel.finish()) + } + + let firstHandler = IndexWritingHandler(1) + XCTAssertNoThrow(try channel.pipeline.addHandler(firstHandler).wait()) + XCTAssertNoThrow(try channel.pipeline.addHandler(IndexWritingHandler(2)).wait()) + XCTAssertNoThrow( + try channel.pipeline.addHandler( + IndexWritingHandler(3), + position: .after(firstHandler) + ).wait() + ) + + channel.assertReadIndexOrder([1, 3, 2]) + channel.assertWriteIndexOrder([2, 3, 1]) + } + + func testAddBeforeForSynchronousPosition() { + let channel = EmbeddedChannel() + defer { + XCTAssertNoThrow(try channel.finish()) + } + + let secondHandler = IndexWritingHandler(2) + XCTAssertNoThrow(try channel.pipeline.addHandler(IndexWritingHandler(1)).wait()) + XCTAssertNoThrow(try channel.pipeline.addHandler(secondHandler).wait()) + XCTAssertNoThrow( + try channel.pipeline.addHandler( + IndexWritingHandler(3), + position: .before(secondHandler) + ).wait() + ) + + channel.assertReadIndexOrder([1, 3, 2]) + channel.assertWriteIndexOrder([2, 3, 1]) + } + + func testAddAfterLastForSynchronousPosition() { + let channel = EmbeddedChannel() + defer { + XCTAssertNoThrow(try channel.finish()) + } + + let secondHandler = IndexWritingHandler(2) + XCTAssertNoThrow(try channel.pipeline.syncOperations.addHandler(IndexWritingHandler(1))) + XCTAssertNoThrow(try channel.pipeline.syncOperations.addHandler(secondHandler)) + XCTAssertNoThrow( + try channel.pipeline.syncOperations.addHandler( + IndexWritingHandler(3), + position: .after(secondHandler) + ) + ) + + channel.assertReadIndexOrder([1, 2, 3]) + channel.assertWriteIndexOrder([3, 2, 1]) + } + + func testAddBeforeFirstForSynchronousPosition() { + let channel = EmbeddedChannel() + defer { + XCTAssertNoThrow(try channel.finish()) + } + + let firstHandler = IndexWritingHandler(1) + XCTAssertNoThrow(try channel.pipeline.syncOperations.addHandler(firstHandler)) + XCTAssertNoThrow(try channel.pipeline.syncOperations.addHandler(IndexWritingHandler(2))) + XCTAssertNoThrow( + try channel.pipeline.syncOperations.addHandler( + IndexWritingHandler(3), + position: .before(firstHandler) + ) + ) + + channel.assertReadIndexOrder([3, 1, 2]) + channel.assertWriteIndexOrder([2, 1, 3]) + } + + func testAddAfterWhileClosedForSynchronousPosition() { + let channel = EmbeddedChannel() + defer { + XCTAssertThrowsError(try channel.finish()) { error in + XCTAssertEqual(.alreadyClosed, error as? ChannelError) + } + } + + let handler = IndexWritingHandler(1) + XCTAssertNoThrow(try channel.pipeline.syncOperations.addHandler(handler)) + XCTAssertNoThrow(try channel.close().wait()) + channel.embeddedEventLoop.run() + + XCTAssertThrowsError( + try channel.pipeline.syncOperations.addHandler( + IndexWritingHandler(2), + position: .after(handler) + ) + ) { error in + XCTAssertEqual(.ioOnClosedChannel, error as? ChannelError) + } + } + + func testAddBeforeWhileClosedForSynchronousPosition() { + let channel = EmbeddedChannel() + defer { + XCTAssertThrowsError(try channel.finish()) { error in + XCTAssertEqual(.alreadyClosed, error as? ChannelError) + } + } + + let handler = IndexWritingHandler(1) + XCTAssertNoThrow(try channel.pipeline.syncOperations.addHandler(handler)) + XCTAssertNoThrow(try channel.close().wait()) + channel.embeddedEventLoop.run() + + XCTAssertThrowsError( + try channel.pipeline.syncOperations.addHandler( + IndexWritingHandler(2), + position: .before(handler) + ) + ) { error in + XCTAssertEqual(.ioOnClosedChannel, error as? ChannelError) + } + } } // this should be within `testAddMultipleHandlers` but https://bugs.swift.org/browse/SR-9956 From 1c647ded3534bf6677c9610b3cc68fd1c07b286d Mon Sep 17 00:00:00 2001 From: Tim <0xtimc@gmail.com> Date: Mon, 3 Feb 2025 15:04:00 +0000 Subject: [PATCH 6/7] =?UTF-8?q?=F0=9F=A4=A6=E2=80=8D=E2=99=82=EF=B8=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Tests/NIOPosixTests/ChannelPipelineTest.swift | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/Tests/NIOPosixTests/ChannelPipelineTest.swift b/Tests/NIOPosixTests/ChannelPipelineTest.swift index d20e96ee4c1..d00f460d05a 100644 --- a/Tests/NIOPosixTests/ChannelPipelineTest.swift +++ b/Tests/NIOPosixTests/ChannelPipelineTest.swift @@ -2701,13 +2701,13 @@ class ChannelPipelineTest: XCTestCase { } let firstHandler = IndexWritingHandler(1) - XCTAssertNoThrow(try channel.pipeline.addHandler(firstHandler).wait()) - XCTAssertNoThrow(try channel.pipeline.addHandler(IndexWritingHandler(2)).wait()) + XCTAssertNoThrow(try channel.pipeline.syncOperations.addHandler(firstHandler)) + XCTAssertNoThrow(try channel.pipeline.syncOperations.addHandler(IndexWritingHandler(2))) XCTAssertNoThrow( - try channel.pipeline.addHandler( + try channel.pipeline.syncOperations.addHandler( IndexWritingHandler(3), position: .after(firstHandler) - ).wait() + ) ) channel.assertReadIndexOrder([1, 3, 2]) @@ -2721,13 +2721,13 @@ class ChannelPipelineTest: XCTestCase { } let secondHandler = IndexWritingHandler(2) - XCTAssertNoThrow(try channel.pipeline.addHandler(IndexWritingHandler(1)).wait()) - XCTAssertNoThrow(try channel.pipeline.addHandler(secondHandler).wait()) + XCTAssertNoThrow(try channel.pipeline.syncOperations.addHandler(IndexWritingHandler(1))) + XCTAssertNoThrow(try channel.pipeline.syncOperations.addHandler(secondHandler)) XCTAssertNoThrow( - try channel.pipeline.addHandler( + try channel.pipeline.syncOperations.addHandler( IndexWritingHandler(3), position: .before(secondHandler) - ).wait() + ) ) channel.assertReadIndexOrder([1, 3, 2]) From 863e209bf55349aa0185cbca545d484302ac9dc1 Mon Sep 17 00:00:00 2001 From: Tim <0xtimc@gmail.com> Date: Mon, 3 Feb 2025 17:38:22 +0000 Subject: [PATCH 7/7] Formatting --- Sources/NIOCore/ChannelPipeline.swift | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/Sources/NIOCore/ChannelPipeline.swift b/Sources/NIOCore/ChannelPipeline.swift index bbfea54c872..6b63c5e2bb6 100644 --- a/Sources/NIOCore/ChannelPipeline.swift +++ b/Sources/NIOCore/ChannelPipeline.swift @@ -177,7 +177,9 @@ public final class ChannelPipeline: ChannelInvoker { if self.eventLoop.inEventLoop { let syncPosition = ChannelPipeline.SynchronousOperations.Position(position) - future = self.eventLoop.makeCompletedFuture(self.addHandlerSync(handler, name: name, position: syncPosition)) + future = self.eventLoop.makeCompletedFuture( + self.addHandlerSync(handler, name: name, position: syncPosition) + ) } else { future = self.eventLoop.submit { let syncPosition = ChannelPipeline.SynchronousOperations.Position(position)