Skip to content
10 changes: 9 additions & 1 deletion Sources/NIOPosix/BaseSocketChannel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -595,8 +595,16 @@ class BaseSocketChannel<SocketType: BaseSocketProtocol>: SelectableChannel, Chan
switch writeResult.writeResult {
case .couldNotWriteEverything:
newWriteRegistrationState = .register
case .writtenCompletely:
case .writtenCompletely(let closeState):
newWriteRegistrationState = .unregister
switch closeState {
case .open:
()
case .readyForClose:
self.close0(error: ChannelError.outputClosed, mode: .output, promise: nil)
case .closed:
() // we can be flushed before becoming active
}
}

if !self.isOpen || !self.hasFlushedPendingWrites() {
Expand Down
39 changes: 32 additions & 7 deletions Sources/NIOPosix/BaseStreamSocketChannel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -194,13 +194,35 @@ class BaseStreamSocketChannel<Socket: SocketProtocol>: BaseSocketChannel<Socket>
self.close0(error: error, mode: .all, promise: promise)
return
}
try self.shutdownSocket(mode: mode)
// Fail all pending writes and so ensure all pending promises are notified
self.pendingWrites.failAll(error: error, close: false)
self.unregisterForWritable()
promise?.succeed(())

self.pipeline.fireUserInboundEventTriggered(ChannelEvent.outputClosed)
let result = self.pendingWrites.closeOutbound(promise)
switch result {
case .pending:
() // promise is stored in `pendingWrites` state for completing later

case .readyForClose(let closePromise):
// Shutdown the socket only when the pending writes are dealt with
do {
try self.shutdownSocket(mode: mode)
closePromise?.succeed(())
} catch let err {
closePromise?.fail(err)
}
self.unregisterForWritable()
self.pipeline.fireUserInboundEventTriggered(ChannelEvent.outputClosed)

case .closed(let closePromise):
closePromise?.succeed(())

case .errored(let err, let closePromise):
assertionFailure("Close errored: \(err)")
closePromise?.fail(err)

// Escalate to full closure
// promise is nil here because we have used the supplied promise to convey failure of the half-close
self.close0(error: err, mode: .all, promise: nil)
}

case .input:
if self.inputShutdown {
promise?.fail(ChannelError._inputClosed)
Expand All @@ -224,6 +246,7 @@ class BaseStreamSocketChannel<Socket: SocketProtocol>: BaseSocketChannel<Socket>
promise?.succeed(())

self.pipeline.fireUserInboundEventTriggered(ChannelEvent.inputClosed)

case .all:
if let timeout = self.connectTimeoutScheduled {
self.connectTimeoutScheduled = nil
Expand All @@ -247,7 +270,9 @@ class BaseStreamSocketChannel<Socket: SocketProtocol>: BaseSocketChannel<Socket>
}

final override func cancelWritesOnClose(error: Error) {
self.pendingWrites.failAll(error: error, close: true)
if let eventLoopPromise = self.pendingWrites.failAll(error: error) {
eventLoopPromise.fail(error)
}
}

@discardableResult
Expand Down
7 changes: 7 additions & 0 deletions Sources/NIOPosix/PendingDatagramWritesManager.swift
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,13 @@ final class PendingDatagramWritesManager: PendingWritesManager {
internal var publishedWritability = true
internal var writeSpinCount: UInt = 16
private(set) var isOpen = true
var outboundCloseState: CloseState {
if self.isOpen {
.open
} else {
.closed
}
}

/// Initialize with a pre-allocated array of message headers and storage references. We pass in these pre-allocated
/// objects to save allocations. They can be safely be re-used for all `Channel`s on a given `EventLoop` as an
Expand Down
151 changes: 141 additions & 10 deletions Sources/NIOPosix/PendingWritesManager.swift
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,15 @@ private struct PendingStreamWrite {
var promise: Optional<EventLoopPromise<Void>>
}

/// Write result is `.couldNotWriteEverything` but we have no more writes to perform.
public struct NIOReportedIncompleteWritesWhenNoMoreToPerform: Error {}

/// Close result is `.open`.
public struct NIOReportedOpenAfterClose: Error {}

/// There are buffered writes after it should have been cleared, `.readyForClose` or `.closed` state
public struct NIOReportedPendingWritesInInvalidState: Error {}

/// Does the setup required to issue a writev.
///
/// - Parameters:
Expand Down Expand Up @@ -97,12 +106,33 @@ internal enum OneWriteOperationResult {
/// The result of trying to write all the outstanding flushed data. That naturally includes all `ByteBuffer`s and
/// `FileRegions` and the individual writes have potentially been retried (see `WriteSpinOption`).
internal struct OverallWriteResult {
enum WriteOutcome {
enum WriteOutcome: Equatable {
/// Wrote all the data that was flushed. When receiving this result, we can unsubscribe from 'writable' notification.
case writtenCompletely
case writtenCompletely(WrittenCompletelyResult)

/// Could not write everything. Before attempting further writes the eventing system should send a 'writable' notification.
case couldNotWriteEverything

/// The resulting status of a `PendingWritesManager` after a completely-written write
///
/// This type is subtly different to `CloseState` so that it only surfaces the close promise when the caller
/// is expected to fulfill it
internal enum WrittenCompletelyResult: Equatable {
case open
case readyForClose(EventLoopPromise<Void>?)
case closed(EventLoopPromise<Void>?)

init(_ closeState: CloseState) {
switch closeState {
case .open:
self = .open
case .pending(let closePromise), .readyForClose(let closePromise):
self = .readyForClose(closePromise)
case .closed:
self = .closed(nil)
}
}
}
}

internal var writeResult: WriteOutcome
Expand Down Expand Up @@ -152,7 +182,7 @@ private struct PendingStreamWritesState {
self.subtractOutstanding(bytes: bytes)
}

/// Initialise a new, empty `PendingWritesState`.
/// Initialize a new, empty `PendingWritesState`.
public init() {}

/// Check if there are no outstanding writes.
Expand Down Expand Up @@ -310,6 +340,8 @@ final class PendingStreamWritesManager: PendingWritesManager {

private(set) var isOpen = true

private(set) var outboundCloseState: CloseState = .open

/// Mark the flush checkpoint.
func markFlushCheckpoint() {
self.state.markFlushCheckpoint()
Expand Down Expand Up @@ -337,7 +369,7 @@ final class PendingStreamWritesManager: PendingWritesManager {
/// - result: If the `Channel` is still writable after adding the write of `data`.
func add(data: IOData, promise: EventLoopPromise<Void>?) -> Bool {
assert(self.isOpen)
self.state.append(.init(data: data, promise: promise))
self.state.append(PendingStreamWrite(data: data, promise: promise))

if self.state.bytes > waterMark.high
&& channelWritabilityFlag.compareExchange(expected: true, desired: false, ordering: .relaxed).exchanged
Expand Down Expand Up @@ -463,16 +495,101 @@ final class PendingStreamWritesManager: PendingWritesManager {
return self.didWrite(itemCount: result.itemCount, result: result.writeResult)
}

/// Fail all the outstanding writes. This is useful if for example the `Channel` is closed.
func failAll(error: Error, close: Bool) {
if close {
assert(self.isOpen)
self.isOpen = false
/// Fail all the outstanding writes.
func failAll(error: Error) -> EventLoopPromise<Void>? {
assert(self.isOpen)

let promise: EventLoopPromise<Void>?
self.isOpen = false
switch self.outboundCloseState {
case .open, .closed:
self.outboundCloseState = .closed
promise = nil
case .pending(let closePromise), .readyForClose(let closePromise):
self.outboundCloseState = .closed
promise = closePromise
}

self.state.removeAll()?.fail(error)

assert(self.state.isEmpty)
return promise
}

// The result of calling `closeOutbound`
enum CloseOutboundResult {
case pending
case readyForClose(EventLoopPromise<Void>?)
case closed(EventLoopPromise<Void>?)
case errored(Error, EventLoopPromise<Void>?)

init(_ closeState: CloseState, _ isEmpty: Bool, _ promise: EventLoopPromise<Void>?) {
switch closeState {
case .open:
assertionFailure(
"We are in .open state after being asked to close. This should never happen."
)
self = .errored(NIOReportedOpenAfterClose(), promise)
case .pending:
// `promise` has already been taken care of in the pending state for later completion
self = .pending
case .readyForClose(let closePromise):
if isEmpty {
self = .readyForClose(closePromise)
} else {
assertionFailure(
"We are in .readyForClose state but we still have pending writes. This should never happen."
)
// `promise` has already been cascaded off `closePromise`
self = .errored(NIOReportedPendingWritesInInvalidState(), closePromise)
}
case .closed:
if isEmpty {
self = .closed(promise)
} else {
assertionFailure(
"We are in .closed state but we still have pending writes. This should never happen."
)
self = .errored(NIOReportedPendingWritesInInvalidState(), promise)
}
}
}
}

/// Signal the intention to close. Takes a promise which will be returned for completing when pending writes are dealt with
///
/// - Parameters:
/// - promise: Optionally an `EventLoopPromise` which is stored and is returned to be completed by the caller once
/// all outstanding writes have been dealt with or an error condition is encountered.
func closeOutbound(_ promise: EventLoopPromise<Void>?) -> CloseOutboundResult {
assert(self.isOpen)

// Update our internal state
switch self.outboundCloseState {
case .open:
if self.isEmpty {
self.outboundCloseState = .readyForClose(promise)
} else {
self.outboundCloseState = .pending(promise)
}
case .readyForClose(var closePromise):
closePromise.setOrCascade(to: promise)
self.outboundCloseState = .readyForClose(closePromise)
case .pending(var closePromise):
closePromise.setOrCascade(to: promise)
if self.isEmpty {
self.outboundCloseState = .readyForClose(closePromise)
} else {
self.outboundCloseState = .pending(closePromise)
}
case .closed:
()
}

// Decide on the result
let result = CloseOutboundResult(self.outboundCloseState, self.isEmpty, promise)

return result
}

/// Initialize with a pre-allocated array of IO vectors and storage references. We pass in these pre-allocated
Expand All @@ -496,6 +613,8 @@ internal enum WriteMechanism {

internal protocol PendingWritesManager: AnyObject {
var isOpen: Bool { get }
var isEmpty: Bool { get }
var outboundCloseState: CloseState { get }
var isFlushPending: Bool { get }
var writeSpinCount: UInt { get }
var currentBestWriteMechanism: WriteMechanism { get }
Expand All @@ -507,6 +626,18 @@ internal protocol PendingWritesManager: AnyObject {
var publishedWritability: Bool { get set }
}

/// Describes the state that a `PendingWritesManager` closure state machine will step through when instructed to close
internal enum CloseState {
/// The manager will accept new writes
case open
/// The manager has been asked to close but cannot because its write buffer is not empty
case pending(EventLoopPromise<Void>?)
/// The manager has been asked to close and is ready to be closed because its write buffer is empty
case readyForClose(EventLoopPromise<Void>?)
/// The manager is closed
case closed
}

extension PendingWritesManager {
// This is called from `Channel` API so must be thread-safe.
var isWritable: Bool {
Expand All @@ -522,7 +653,7 @@ extension PendingWritesManager {
var oneResult: OneWriteOperationResult
repeat {
guard self.isOpen && self.isFlushPending else {
result.writeResult = .writtenCompletely
result.writeResult = .writtenCompletely(.init(self.outboundCloseState))
break writeSpinLoop
}

Expand Down
Loading
Loading