Skip to content
Merged
Changes from 9 commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 97 additions & 0 deletions Sources/NIOCore/AsyncChannel/AsyncChannel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,7 @@ public struct NIOAsyncChannel<Inbound: Sendable, Outbound: Sendable>: Sendable {
/// - Important: After this method returned the underlying ``Channel`` will be closed.
///
/// - Parameter body: A closure that gets scoped access to the inbound and outbound.
@_disfavoredOverload
public func executeThenClose<Result>(
_ body: (_ inbound: NIOAsyncChannelInboundStream<Inbound>, _ outbound: NIOAsyncChannelOutboundWriter<Outbound>)
async throws -> Result
Expand All @@ -301,6 +302,53 @@ public struct NIOAsyncChannel<Inbound: Sendable, Outbound: Sendable>: Sendable {
}
}

self._outbound.finish()
// We ignore errors from close, since all we care about is that the channel has been closed
// at this point.
self.channel.close(promise: nil)
// `closeFuture` should never be failed, so we could ignore the error. However, do an
// assertionFailure to guide bad Channel implementations that are incorrectly failing this
// future to stop failing it.
do {
try await self.channel.closeFuture.get()
} catch {
assertionFailure(
"""
The channel's closeFuture should never be failed, but it was failed with error: \(error).
This is an error in the channel's implementation.
Refer to `Channel/closeFuture`'s documentation for more information.
"""
)
}
return result
}

#if compiler(>=6.0)
/// Provides scoped access to the inbound and outbound side of the underlying ``Channel``.
///
/// - Important: After this method returned the underlying ``Channel`` will be closed.
///
/// - Parameters:
/// - actor: actor where this function should be isolated to
/// - body: A closure that gets scoped access to the inbound and outbound.
public func executeThenClose<Result>(
isolatedTo actor: isolated (any Actor)? = #isolation,
_ body: (_ inbound: NIOAsyncChannelInboundStream<Inbound>, _ outbound: NIOAsyncChannelOutboundWriter<Outbound>)
async throws -> Result
) async throws -> Result where Result: Sendable {
let result: Result
do {
result = try await body(self._inbound, self._outbound)
} catch let bodyError {
do {
self._outbound.finish()
try await self.channel.close().get()
throw bodyError
} catch {
throw bodyError
}
}

self._outbound.finish()
// We ignore errors from close, since all we care about is that the channel has been closed
// at this point.
Expand All @@ -322,6 +370,7 @@ public struct NIOAsyncChannel<Inbound: Sendable, Outbound: Sendable>: Sendable {

return result
}
#endif
}

// swift-format-ignore: AmbiguousTrailingClosureOverload
Expand All @@ -332,13 +381,61 @@ extension NIOAsyncChannel {
/// - Important: After this method returned the underlying ``Channel`` will be closed.
///
/// - Parameter body: A closure that gets scoped access to the inbound.
@_disfavoredOverload
public func executeThenClose<Result>(
_ body: (_ inbound: NIOAsyncChannelInboundStream<Inbound>) async throws -> Result
) async throws -> Result where Outbound == Never {
let result: Result
do {
result = try await body(self._inbound)
} catch let bodyError {
do {
self._outbound.finish()
try await self.channel.close().get()
throw bodyError
} catch {
throw bodyError
}
}

self._outbound.finish()
// We ignore errors from close, since all we care about is that the channel has been closed
// at this point.
self.channel.close(promise: nil)
// `closeFuture` should never be failed, so we could ignore the error. However, do an
// assertionFailure to guide bad Channel implementations that are incorrectly failing this
// future to stop failing it.
do {
try await self.channel.closeFuture.get()
} catch {
assertionFailure(
"""
The channel's closeFuture should never be failed, but it was failed with error: \(error).
This is an error in the channel's implementation.
Refer to `Channel/closeFuture`'s documentation for more information.
"""
)
}
return result
}

#if compiler(>=6.0)
/// Provides scoped access to the inbound side of the underlying ``Channel``.
///
/// - Important: After this method returned the underlying ``Channel`` will be closed.
///
/// - Parameters:
/// - actor: actor where this function should be isolated to
/// - body: A closure that gets scoped access to the inbound.
public func executeThenClose<Result>(
isolatedTo actor: isolated (any Actor)? = #isolation,
_ body: (_ inbound: NIOAsyncChannelInboundStream<Inbound>) async throws -> Result
) async throws -> Result where Outbound == Never, Result: Sendable {
try await self.executeThenClose { inbound, _ in
try await body(inbound)
}
}
#endif
}

extension Channel {
Expand Down
Loading