Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 79 additions & 54 deletions Tests/NIOPosixTests/StreamChannelsTest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class StreamChannelTest: XCTestCase {
}

func testEchoBasic() throws {
class EchoHandler: ChannelInboundHandler {
final class EchoHandler: ChannelInboundHandler, Sendable {
typealias InboundIn = ByteBuffer

func channelRead(context: ChannelHandlerContext, data: NIOAny) {
Expand All @@ -48,7 +48,12 @@ class StreamChannelTest: XCTestCase {
var everythingBuffer = chan1.allocator.buffer(capacity: 300000)
let allDonePromise = chan1.eventLoop.makePromise(of: ByteBuffer.self)
XCTAssertNoThrow(try chan1.pipeline.addHandler(EchoHandler()).wait())
XCTAssertNoThrow(try chan2.pipeline.addHandler(AccumulateAllReads(allDonePromise: allDonePromise)).wait())

let configuredChannel2 = chan2.eventLoop.submit {
let handler = AccumulateAllReads(allDonePromise: allDonePromise)
try chan2.pipeline.syncOperations.addHandler(handler)
}
XCTAssertNoThrow(try configuredChannel2.wait())

for f in [1, 10, 100, 1_000, 10_000, 300_000] {
let from = everythingBuffer.writerIndex
Expand All @@ -72,7 +77,7 @@ class StreamChannelTest: XCTestCase {
}

func testSyncChannelOptions() throws {
class GetAndSetAutoReadHandler: ChannelInboundHandler {
final class GetAndSetAutoReadHandler: ChannelInboundHandler, Sendable {
typealias InboundIn = Any

func handlerAdded(context: ChannelHandlerContext) {
Expand All @@ -96,17 +101,17 @@ class StreamChannelTest: XCTestCase {
}

func testChannelReturnsNilForDefaultSyncOptionsImplementation() throws {
class TestChannel: Channel {
final class TestChannel: Channel {
var allocator: ByteBufferAllocator { fatalError() }
var closeFuture: EventLoopFuture<Void> { fatalError() }
var pipeline: ChannelPipeline { fatalError() }
var localAddress: SocketAddress? = nil
var remoteAddress: SocketAddress? = nil
var parent: Channel? = nil
let localAddress: SocketAddress? = nil
let remoteAddress: SocketAddress? = nil
let parent: Channel? = nil
var _channelCore: ChannelCore { fatalError() }
var eventLoop: EventLoop { fatalError() }
var isWritable: Bool = false
var isActive: Bool = false
let isWritable: Bool = false
let isActive: Bool = false

func setOption<Option: ChannelOption>(_ option: Option, value: Option.Value) -> EventLoopFuture<Void> {
fatalError()
Expand Down Expand Up @@ -164,11 +169,11 @@ class StreamChannelTest: XCTestCase {
XCTFail("unexpected error \(error)")
}
}
context.write(Self.wrapOutboundOut(buffer)).map {
context.write(Self.wrapOutboundOut(buffer)).assumeIsolated().map {
XCTAssertEqual(self.state, .thenTrueAgain)
}.recover { error in
XCTFail("unexpected error \(error)")
}.cascade(to: self.writeFullyDonePromise)
}.nonisolated().cascade(to: self.writeFullyDonePromise)
context.flush()
}

Expand All @@ -178,7 +183,7 @@ class StreamChannelTest: XCTestCase {
XCTAssertEqual(State(rawValue: self.channelWritabilityChangedCalls), self.state)
if let newState = State(rawValue: self.channelWritabilityChangedCalls + 1) {
if self.state == .thenFalse {
context.eventLoop.scheduleTask(in: .microseconds(100)) {
context.eventLoop.assumeIsolated().scheduleTask(in: .microseconds(100)) {
// Let's delay this a tiny little bit just so we get a higher chance to actually exhaust all
// the buffers. The delay is not necessary for this test to work but it makes the tests a
// little bit harder.
Expand All @@ -195,15 +200,21 @@ class StreamChannelTest: XCTestCase {
let writabilityFalsePromise = chan1.eventLoop.makePromise(of: Void.self)
let writeFullyDonePromise = chan1.eventLoop.makePromise(of: Void.self)
XCTAssertNoThrow(try chan2.setOption(.autoRead, value: false).wait())
XCTAssertNoThrow(try chan2.pipeline.addHandler(AccumulateAllReads(allDonePromise: allDonePromise)).wait())
XCTAssertNoThrow(
try chan1.pipeline.addHandler(

let chan2Configured = chan2.eventLoop.submit {
try chan2.pipeline.syncOperations.addHandler(AccumulateAllReads(allDonePromise: allDonePromise))
}
XCTAssertNoThrow(try chan2Configured.wait())

let chan1Configured = chan1.eventLoop.submit {
try chan1.pipeline.syncOperations.addHandler(
WritabilityTrackerStateMachine(
writabilityNowFalsePromise: writabilityFalsePromise,
writeFullyDonePromise: writeFullyDonePromise
)
).wait()
)
)
}
XCTAssertNoThrow(try chan1Configured.wait())

// Writability should turn false because we're writing lots of data and we aren't reading.
XCTAssertNoThrow(try writabilityFalsePromise.futureResult.wait())
Expand Down Expand Up @@ -329,7 +340,7 @@ class StreamChannelTest: XCTestCase {
func testLotsOfWritesWhilstOtherSideNotReading() {
// This is a regression test for a problem where we would spin on EVFILT_EXCEPT despite the fact that there
// was no EOF or any other exceptional event present. So this is a regression test for rdar://53656794 and https://github.com/apple/swift-nio/pull/526.
class FailOnReadHandler: ChannelInboundHandler {
final class FailOnReadHandler: ChannelInboundHandler, Sendable {
typealias InboundIn = ByteBuffer

let areReadsOkayNow: ManagedAtomic<Bool>
Expand Down Expand Up @@ -369,11 +380,12 @@ class StreamChannelTest: XCTestCase {
let receiveBufferSize = Int(
(try? receiver.getOption(ChannelOptions.socketOption(.so_rcvbuf)).wait()) ?? 8192
)
var buffer = sender.allocator.buffer(capacity: receiveBufferSize)
buffer.writeBytes(Array(repeating: UInt8(ascii: "X"), count: receiveBufferSize))

let buffer = sender.allocator.buffer(repeating: UInt8(ascii: "X"), count: receiveBufferSize)

XCTAssertNoThrow(
try sender.eventLoop.submit {
@Sendable
func send() {
var allBuffer = buffer
// When we run through this for the first time, we send exactly the receive buffer size, after that
Expand Down Expand Up @@ -444,11 +456,13 @@ class StreamChannelTest: XCTestCase {
func runTest(receiver: Channel, sender: Channel) throws {
let allDonePromise = receiver.eventLoop.makePromise(of: Void.self)
XCTAssertNoThrow(try sender.setOption(.writeSpin, value: 0).wait())
XCTAssertNoThrow(
try receiver.pipeline.addHandler(WaitForTwoBytesHandler(allDonePromise: allDonePromise)).wait()
)
var buffer = sender.allocator.buffer(capacity: 1)
buffer.writeString("X")
let receiverConfigured = receiver.eventLoop.submit {
try receiver.pipeline.syncOperations.addHandler(
WaitForTwoBytesHandler(allDonePromise: allDonePromise)
)
}
XCTAssertNoThrow(try receiverConfigured.wait())
let buffer = sender.allocator.buffer(string: "X")
XCTAssertNoThrow(
try sender.eventLoop.flatSubmit { () -> EventLoopFuture<Void> in
let writePromise = sender.eventLoop.makePromise(of: Void.self)
Expand Down Expand Up @@ -485,8 +499,7 @@ class StreamChannelTest: XCTestCase {

// Now, let's trigger another write which should cause flushNow to be re-entered. But first, let's
// raise the high water mark so we don't get another call straight away.
var buffer = context.channel.allocator.buffer(capacity: 5)
buffer.writeString("hello")
let buffer = context.channel.allocator.buffer(string: "hello")
let loopBoundContext = context.loopBound
context.channel.setOption(.writeBufferWaterMark, value: .init(low: 1024, high: 1024))
.flatMap {
Expand Down Expand Up @@ -535,18 +548,22 @@ class StreamChannelTest: XCTestCase {
)

let sevenBytesReceived = receiver.eventLoop.makePromise(of: Void.self)
XCTAssertNoThrow(
try receiver.pipeline.addHandler(
WaitForNumberOfBytes(
numberOfBytes: 7,
allDonePromise: sevenBytesReceived
)
).wait()
)
let receiverConfigured = receiver.eventLoop.submit {
let handler = WaitForNumberOfBytes(
numberOfBytes: 7,
allDonePromise: sevenBytesReceived
)
try receiver.pipeline.syncOperations.addHandler(handler)
}
XCTAssertNoThrow(try receiverConfigured.wait())

let eventCounterHandler = EventCounterHandler()
XCTAssertNoThrow(try sender.pipeline.addHandler(EventCounterHandler()).wait())
XCTAssertNoThrow(try sender.pipeline.addHandler(WriteWhenWritabilityGoesToTrue()).wait())
let senderConfigured = sender.eventLoop.submit {
let sync = sender.pipeline.syncOperations
try sync.addHandler(eventCounterHandler)
try sync.addHandler(WriteWhenWritabilityGoesToTrue())
}
XCTAssertNoThrow(try senderConfigured.wait())

var buffer = sender.allocator.buffer(capacity: 5)
buffer.writeString("XX") // 2 bytes, exceeds the high water mark
Expand Down Expand Up @@ -642,7 +659,7 @@ class StreamChannelTest: XCTestCase {
}

let loopBoundContext = context.loopBound
context.eventLoop.execute {
context.eventLoop.assumeIsolated().execute {
let context = loopBoundContext.value
self.kickOff(context: context)
}
Expand All @@ -667,7 +684,7 @@ class StreamChannelTest: XCTestCase {
context.writeAndFlush(Self.wrapOutboundOut(buffer.value)).whenFailure { error in
XCTFail("unexpected error \(error)")
}
context.eventLoop.scheduleTask(in: .microseconds(100)) {
context.eventLoop.assumeIsolated().scheduleTask(in: .microseconds(100)) {
let context = loopBoundContext.value
switch self.state {
case .writingUntilFull:
Expand All @@ -689,7 +706,7 @@ class StreamChannelTest: XCTestCase {
}
}
}
context.eventLoop.execute {
context.eventLoop.assumeIsolated().execute {
writeOneMore() // this kicks everything off
}
}
Expand All @@ -706,9 +723,9 @@ class StreamChannelTest: XCTestCase {
}
context.fireChannelWritabilityChanged()
let loopBoundContext = context.loopBound
self.wroteEnoughToBeStuckPromise.futureResult.whenSuccess { _ in
self.wroteEnoughToBeStuckPromise.futureResult.assumeIsolated().whenSuccess { _ in
let context = loopBoundContext.value
context.pipeline.removeHandler(self).whenFailure { error in
context.pipeline.syncOperations.removeHandler(self).whenFailure { error in
XCTFail("unexpected error \(error)")
}
}
Expand Down Expand Up @@ -816,7 +833,7 @@ class StreamChannelTest: XCTestCase {
}
}

final class FailOnError: ChannelInboundHandler {
final class FailOnError: ChannelInboundHandler, Sendable {
typealias InboundIn = Never

func errorCaught(context: ChannelHandlerContext, error: Error) {
Expand All @@ -840,29 +857,34 @@ class StreamChannelTest: XCTestCase {
// We need to not read automatically from the receiving end to be able to force writability notifications
// for the sender.
XCTAssertNoThrow(try receiver.setOption(.autoRead, value: false).wait())
let receiverConfigured = receiver.eventLoop.submit {
try receiver.pipeline.syncOperations.addHandler(ReadChunksUntilWeSee1Handler())
}
XCTAssertNoThrow(try receiverConfigured.wait())

XCTAssertNoThrow(try receiver.pipeline.addHandler(ReadChunksUntilWeSee1Handler()).wait())

XCTAssertNoThrow(
try sender.pipeline.addHandler(
let senderConfigured1 = sender.eventLoop.submit {
try sender.pipeline.syncOperations.addHandler(
WriteWhenChannelBecomesWritableAgain(
beganBigWritePromise: beganBigWritePromise,
finishedBigWritePromise: finishedBigWritePromise
)
).wait()
)
)
}
XCTAssertNoThrow(try senderConfigured1.wait())

XCTAssertNoThrow(try sender.pipeline.addHandler(FailOnError()).wait())
XCTAssertNoThrow(try receiver.pipeline.addHandler(FailOnError()).wait())

XCTAssertNoThrow(
try sender.pipeline.addHandler(
let senderConfigured2 = sender.eventLoop.submit {
try sender.pipeline.syncOperations.addHandler(
WriteUntilWriteDoesNotCompletelyInstantlyHandler(
chunkSize: chunkSize,
wroteEnoughToBeStuckPromise: wroteEnoughToBeStuckPromise
),
position: .first
).wait()
)
)
}
XCTAssertNoThrow(try senderConfigured2.wait())
var howManyBytes: Int? = nil

XCTAssertNoThrow(howManyBytes = try wroteEnoughToBeStuckPromise.futureResult.wait())
Expand Down Expand Up @@ -934,7 +956,10 @@ class StreamChannelTest: XCTestCase {
}

let amount = 2
XCTAssertNoThrow(try sender.pipeline.addHandler(CloseInWritabilityChanged(amount: amount)).wait())
let senderConfigured = sender.eventLoop.submit {
try sender.pipeline.syncOperations.addHandler(CloseInWritabilityChanged(amount: amount))
}
XCTAssertNoThrow(try senderConfigured.wait())
XCTAssertNoThrow(
try sender.setOption(
.writeBufferWaterMark,
Expand Down
Loading