Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions Sources/NIOCore/AsyncAwaitSupport.swift
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,11 @@ extension ChannelPipeline {
}

@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
@available(
*,
deprecated,
message: "Use .syncOperations.removeHandler(context:) instead, this method is not Sendable-safe."
)
public func removeHandler(context: ChannelHandlerContext) async throws {
try await self.removeHandler(context: context).get()
}
Expand Down
154 changes: 125 additions & 29 deletions Sources/NIOCore/ChannelPipeline.swift
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,9 @@ public final class ChannelPipeline: ChannelInvoker {
/// - handler: the `ChannelHandler` to add
/// - position: The position in the `ChannelPipeline` to add `handler`. Defaults to `.last`.
/// - returns: the `EventLoopFuture` which will be notified once the `ChannelHandler` was added.
@preconcurrency
public func addHandler(
_ handler: ChannelHandler,
_ handler: ChannelHandler & Sendable,
name: String? = nil,
position: ChannelPipeline.Position = .last
) -> EventLoopFuture<Void> {
Expand Down Expand Up @@ -349,7 +350,8 @@ public final class ChannelPipeline: ChannelInvoker {
/// - parameters:
/// - handler: the `ChannelHandler` to remove.
/// - returns: the `EventLoopFuture` which will be notified once the `ChannelHandler` was removed.
public func removeHandler(_ handler: RemovableChannelHandler) -> EventLoopFuture<Void> {
@preconcurrency
public func removeHandler(_ handler: RemovableChannelHandler & Sendable) -> EventLoopFuture<Void> {
let promise = self.eventLoop.makePromise(of: Void.self)
self.removeHandler(handler, promise: promise)
return promise.futureResult
Expand All @@ -371,6 +373,11 @@ public final class ChannelPipeline: ChannelInvoker {
/// - parameters:
/// - context: the `ChannelHandlerContext` that belongs to `ChannelHandler` that should be removed.
/// - returns: the `EventLoopFuture` which will be notified once the `ChannelHandler` was removed.
@available(
*,
deprecated,
message: "Use .syncOperations.removeHandler(context:) instead, this method is not Sendable-safe."
)
public func removeHandler(context: ChannelHandlerContext) -> EventLoopFuture<Void> {
let promise = self.eventLoop.makePromise(of: Void.self)
self.removeHandler(context: context, promise: promise)
Expand All @@ -382,14 +389,11 @@ public final class ChannelPipeline: ChannelInvoker {
/// - parameters:
/// - handler: the `ChannelHandler` to remove.
/// - promise: An `EventLoopPromise` that will complete when the `ChannelHandler` is removed.
public func removeHandler(_ handler: RemovableChannelHandler, promise: EventLoopPromise<Void>?) {
@preconcurrency
public func removeHandler(_ handler: RemovableChannelHandler & Sendable, promise: EventLoopPromise<Void>?) {
@Sendable
func removeHandler0() {
switch self.contextSync(handler: handler) {
case .success(let context):
self.removeHandler(context: context, promise: promise)
case .failure(let error):
promise?.fail(error)
}
self.syncOperations.removeHandler(handler, promise: promise)
}

if self.eventLoop.inEventLoop {
Expand All @@ -407,13 +411,9 @@ public final class ChannelPipeline: ChannelInvoker {
/// - name: the name that was used to add the `ChannelHandler` to the `ChannelPipeline` before.
/// - promise: An `EventLoopPromise` that will complete when the `ChannelHandler` is removed.
public func removeHandler(name: String, promise: EventLoopPromise<Void>?) {
@Sendable
func removeHandler0() {
switch self.contextSync(name: name) {
case .success(let context):
self.removeHandler(context: context, promise: promise)
case .failure(let error):
promise?.fail(error)
}
self.syncOperations.removeHandler(name: name, promise: promise)
}

if self.eventLoop.inEventLoop {
Expand All @@ -430,13 +430,22 @@ public final class ChannelPipeline: ChannelInvoker {
/// - parameters:
/// - context: the `ChannelHandlerContext` that belongs to `ChannelHandler` that should be removed.
/// - promise: An `EventLoopPromise` that will complete when the `ChannelHandler` is removed.
@available(
*,
deprecated,
message: "Use .syncOperations.removeHandler(context:) instead, this method is not Sendable-safe."
)
public func removeHandler(context: ChannelHandlerContext, promise: EventLoopPromise<Void>?) {
guard context.handler is RemovableChannelHandler else {
let sendableView = context.sendableView

guard sendableView.channelHandlerIsRemovable else {
promise?.fail(ChannelError._unremovableHandler)
return
}

@Sendable
func removeHandler0() {
context.startUserTriggeredRemoval(promise: promise)
sendableView.wrappedValue.startUserTriggeredRemoval(promise: promise)
}

if self.eventLoop.inEventLoop {
Expand All @@ -453,7 +462,13 @@ public final class ChannelPipeline: ChannelInvoker {
/// - parameters:
/// - handler: the `ChannelHandler` for which the `ChannelHandlerContext` should be returned
/// - returns: the `EventLoopFuture` which will be notified once the the operation completes.
public func context(handler: ChannelHandler) -> EventLoopFuture<ChannelHandlerContext> {
@available(
*,
deprecated,
message: "This method is not strict concurrency safe. Prefer .syncOperations.context(handler:)"
)
@preconcurrency
public func context(handler: ChannelHandler & Sendable) -> EventLoopFuture<ChannelHandlerContext> {
let promise = self.eventLoop.makePromise(of: ChannelHandlerContext.self)

if self.eventLoop.inEventLoop {
Expand Down Expand Up @@ -1005,8 +1020,9 @@ extension ChannelPipeline {
/// - position: The position in the `ChannelPipeline` to add `handlers`. Defaults to `.last`.
///
/// - returns: A future that will be completed when all of the supplied `ChannelHandler`s were added.
@preconcurrency
public func addHandlers(
_ handlers: [ChannelHandler],
_ handlers: [ChannelHandler & Sendable],
position: ChannelPipeline.Position = .last
) -> EventLoopFuture<Void> {
let future: EventLoopFuture<Void>
Expand All @@ -1030,8 +1046,9 @@ extension ChannelPipeline {
/// - position: The position in the `ChannelPipeline` to add `handlers`. Defaults to `.last`.
///
/// - returns: A future that will be completed when all of the supplied `ChannelHandler`s were added.
@preconcurrency
public func addHandlers(
_ handlers: ChannelHandler...,
_ handlers: (ChannelHandler & Sendable)...,
position: ChannelPipeline.Position = .last
) -> EventLoopFuture<Void> {
self.addHandlers(handlers, position: position)
Expand Down Expand Up @@ -1149,29 +1166,71 @@ extension ChannelPipeline {
/// - parameters:
/// - handler: the `ChannelHandler` to remove.
/// - returns: the `EventLoopFuture` which will be notified once the `ChannelHandler` was removed.
@preconcurrency
public func removeHandler(_ handler: RemovableChannelHandler) -> EventLoopFuture<Void> {
let promise = self.eventLoop.makePromise(of: Void.self)
self.removeHandler(handler, promise: promise)
return promise.futureResult
}

/// Remove a ``ChannelHandler`` from the ``ChannelPipeline``.
///
/// - parameters:
/// - handler: the ``ChannelHandler`` to remove.
/// - promise: an ``EventLoopPromise`` to notify when the ``ChannelHandler`` was removed.
public func removeHandler(_ handler: RemovableChannelHandler, promise: EventLoopPromise<Void>?) {
switch self._pipeline.contextSync(handler: handler) {
case .success(let context):
self._pipeline.removeHandler(context: context, promise: promise)
self.removeHandler(context: context, promise: promise)
case .failure(let error):
promise.fail(error)
promise?.fail(error)
}
}

/// Remove a `ChannelHandler` from the `ChannelPipeline`.
///
/// - parameters:
/// - name: the name that was used to add the `ChannelHandler` to the `ChannelPipeline` before.
/// - returns: the `EventLoopFuture` which will be notified once the `ChannelHandler` was removed.
public func removeHandler(name: String) -> EventLoopFuture<Void> {
let promise = self.eventLoop.makePromise(of: Void.self)
self.removeHandler(name: name, promise: promise)
return promise.futureResult
}

/// Remove a ``ChannelHandler`` from the ``ChannelPipeline``.
///
/// - parameters:
/// - name: the name that was used to add the `ChannelHandler` to the `ChannelPipeline` before.
/// - promise: an ``EventLoopPromise`` to notify when the ``ChannelHandler`` was removed.
public func removeHandler(name: String, promise: EventLoopPromise<Void>?) {
switch self._pipeline.contextSync(name: name) {
case .success(let context):
self.removeHandler(context: context, promise: promise)
case .failure(let error):
promise?.fail(error)
}
}

/// Remove a `ChannelHandler` from the `ChannelPipeline`.
///
/// - parameters:
/// - context: the `ChannelHandlerContext` that belongs to `ChannelHandler` that should be removed.
/// - returns: the `EventLoopFuture` which will be notified once the `ChannelHandler` was removed.
public func removeHandler(context: ChannelHandlerContext) -> EventLoopFuture<Void> {
let promise = self.eventLoop.makePromise(of: Void.self)
self._pipeline.removeHandler(context: context, promise: promise)
self.removeHandler(context: context, promise: promise)
return promise.futureResult
}

/// Remove a `ChannelHandler` from the `ChannelPipeline`.
///
/// - parameters:
/// - context: the `ChannelHandlerContext` that belongs to `ChannelHandler` that should be removed.
/// - promise: an ``EventLoopPromise`` to notify when the ``ChannelHandler`` was removed.
public func removeHandler(context: ChannelHandlerContext, promise: EventLoopPromise<Void>?) {
context.startUserTriggeredRemoval(promise: promise)
}

/// Returns the `ChannelHandlerContext` for the given handler instance if it is in
/// the `ChannelPipeline`, if it exists.
///
Expand Down Expand Up @@ -1367,26 +1426,27 @@ extension ChannelPipeline.SynchronousOperations: Sendable {}

extension ChannelPipeline {
/// A `Position` within the `ChannelPipeline` used to insert handlers into the `ChannelPipeline`.
public enum Position {
@preconcurrency
public enum Position: Sendable {
/// The first `ChannelHandler` -- the front of the `ChannelPipeline`.
case first

/// The last `ChannelHandler` -- the back of the `ChannelPipeline`.
case last

/// Before the given `ChannelHandler`.
case before(ChannelHandler)
case before(ChannelHandler & Sendable)

/// After the given `ChannelHandler`.
case after(ChannelHandler)
case after(ChannelHandler & Sendable)
}
}

@available(*, unavailable)
extension ChannelPipeline.Position: Sendable {}

/// Special `ChannelHandler` that forwards all events to the `Channel.Unsafe` implementation.
final class HeadChannelHandler: _ChannelOutboundHandler {
final class HeadChannelHandler: _ChannelOutboundHandler, Sendable {

static let name = "head"
static let sharedInstance = HeadChannelHandler()
Expand Down Expand Up @@ -1442,7 +1502,7 @@ extension CloseMode {
}

/// Special `ChannelInboundHandler` which will consume all inbound events.
final class TailChannelHandler: _ChannelInboundHandler {
final class TailChannelHandler: _ChannelInboundHandler, Sendable {

static let name = "tail"
static let sharedInstance = TailChannelHandler()
Expand Down Expand Up @@ -1977,6 +2037,42 @@ extension ChannelHandlerContext {
}
}

extension ChannelHandlerContext {
var sendableView: SendableView {
SendableView(wrapping: self)
}

/// A wrapper over ``ChannelHandlerContext`` that allows access to the thread-safe API
/// surface on the type.
///
/// Very little of ``ChannelHandlerContext`` is thread-safe, but in a rare few places
/// there are things we can access. This type makes those available.
struct SendableView: @unchecked Sendable {
private let context: ChannelHandlerContext

fileprivate init(wrapping context: ChannelHandlerContext) {
self.context = context
}

/// Whether the ``ChannelHandler`` associated with this context conforms to
/// ``RemovableChannelHandler``.
var channelHandlerIsRemovable: Bool {
// `context.handler` is not mutable, and set at construction, so this access is
// acceptable. The protocol conformance check is also safe.
self.context.handler is RemovableChannelHandler
}

/// Grabs the underlying ``ChannelHandlerContext``. May only be called on the
/// event loop.
var wrappedValue: ChannelHandlerContext {
// The event loop lookup here is also thread-safe, so we can grab the value out
// and use it.
self.context.eventLoop.preconditionInEventLoop()
return self.context
}
}
}

extension ChannelPipeline: CustomDebugStringConvertible {
public var debugDescription: String {
// This method forms output in the following format:
Expand Down