Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 68 additions & 41 deletions Tests/NIOPosixTests/AcceptBackoffHandlerTest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,11 @@ import XCTest

@testable import NIOPosix

public final class AcceptBackoffHandlerTest: XCTestCase {
final class AcceptBackoffHandlerTest: XCTestCase {
@Sendable
static func defaultBackoffProvider(error: IOError) -> TimeAmount? {
AcceptBackoffHandler.defaultBackoffProvider(error: error)
}

private let acceptHandlerName = "AcceptBackoffHandler"

Expand All @@ -46,13 +50,17 @@ public final class AcceptBackoffHandlerTest: XCTestCase {

private func assertBackoffRead(error: Int32) throws {
let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
let loop = group.next() as! SelectableEventLoop
defer {
XCTAssertNoThrow(try group.syncShutdownGracefully())
}

let readCountHandler = ReadCountHandler()
// Only used from EL
let readCountHandler = try! loop.submit {
NIOLoopBound(ReadCountHandler(), eventLoop: loop)
}.wait()
let serverChannel = try setupChannel(
group: group,
eventLoop: loop,
readCountHandler: readCountHandler,
backoffProvider: { _ in .milliseconds(100) },
errors: [error]
Expand All @@ -62,15 +70,15 @@ public final class AcceptBackoffHandlerTest: XCTestCase {
try serverChannel.eventLoop.submit {
serverChannel.readable()
serverChannel.read()
return readCountHandler.readCount
return readCountHandler.value.readCount
}.wait()
)

// Inspect the read count after our scheduled backoff elapsed.
XCTAssertEqual(
1,
try serverChannel.eventLoop.scheduleTask(in: .milliseconds(100)) {
readCountHandler.readCount
readCountHandler.value.readCount
}.futureResult.wait()
)

Expand All @@ -79,7 +87,7 @@ public final class AcceptBackoffHandlerTest: XCTestCase {
2,
try serverChannel.eventLoop.submit {
serverChannel.read()
return readCountHandler.readCount
return readCountHandler.value.readCount
}.wait()
)

Expand All @@ -96,13 +104,16 @@ public final class AcceptBackoffHandlerTest: XCTestCase {

private func assertRemoval(read: Bool) throws {
let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
let loop = group.next() as! SelectableEventLoop
defer {
XCTAssertNoThrow(try group.syncShutdownGracefully())
}

let readCountHandler = ReadCountHandler()
let readCountHandler = try! loop.submit {
NIOLoopBound(ReadCountHandler(), eventLoop: loop)
}.wait()
let serverChannel = try setupChannel(
group: group,
eventLoop: loop,
readCountHandler: readCountHandler,
backoffProvider: { err in
.hours(1)
Expand All @@ -116,7 +127,7 @@ public final class AcceptBackoffHandlerTest: XCTestCase {
if read {
serverChannel.read()
}
return readCountHandler.readCount
return readCountHandler.value.readCount
}.wait()
)

Expand All @@ -127,15 +138,15 @@ public final class AcceptBackoffHandlerTest: XCTestCase {
XCTAssertEqual(
1,
try serverChannel.eventLoop.submit {
readCountHandler.readCount
readCountHandler.value.readCount
}.wait()
)
} else {
// Removal should have triggered no read.
XCTAssertEqual(
0,
try serverChannel.eventLoop.submit {
readCountHandler.readCount
readCountHandler.value.readCount
}.wait()
)
}
Expand All @@ -144,13 +155,16 @@ public final class AcceptBackoffHandlerTest: XCTestCase {

public func testNotScheduleReadIfAlreadyScheduled() throws {
let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
let loop = group.next() as! SelectableEventLoop
defer {
XCTAssertNoThrow(try group.syncShutdownGracefully())
}

let readCountHandler = ReadCountHandler()
let readCountHandler = try! loop.submit {
NIOLoopBound(ReadCountHandler(), eventLoop: loop)
}.wait()
let serverChannel = try setupChannel(
group: group,
eventLoop: loop,
readCountHandler: readCountHandler,
backoffProvider: { err in
.milliseconds(10)
Expand All @@ -163,7 +177,7 @@ public final class AcceptBackoffHandlerTest: XCTestCase {
serverChannel.readable()
serverChannel.read()
serverChannel.read()
return readCountHandler.readCount
return readCountHandler.value.readCount
}.wait()
)

Expand All @@ -172,7 +186,7 @@ public final class AcceptBackoffHandlerTest: XCTestCase {
XCTAssertEqual(
1,
try serverChannel.eventLoop.scheduleTask(in: .milliseconds(500)) {
readCountHandler.readCount
readCountHandler.value.readCount
}.futureResult.wait()
)

Expand All @@ -181,7 +195,7 @@ public final class AcceptBackoffHandlerTest: XCTestCase {
2,
try serverChannel.eventLoop.submit {
serverChannel.read()
return readCountHandler.readCount
return readCountHandler.value.readCount
}.wait()
)

Expand All @@ -190,6 +204,7 @@ public final class AcceptBackoffHandlerTest: XCTestCase {

public func testChannelInactiveCancelScheduled() throws {
let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
let loop = group.next() as! SelectableEventLoop
defer {
XCTAssertNoThrow(try group.syncShutdownGracefully())
}
Expand All @@ -212,18 +227,25 @@ public final class AcceptBackoffHandlerTest: XCTestCase {
}
}

let readCountHandler = ReadCountHandler()
let readCountHandler = try! loop.submit {
NIOLoopBound(ReadCountHandler(), eventLoop: loop)
}.wait()

let serverChannel = try setupChannel(
group: group,
eventLoop: loop,
readCountHandler: readCountHandler,
backoffProvider: { err in
.milliseconds(10)
},
errors: [ENFILE]
)

let inactiveVerificationHandler = InactiveVerificationHandler(promise: serverChannel.eventLoop.makePromise())
XCTAssertNoThrow(try serverChannel.pipeline.addHandler(inactiveVerificationHandler).wait())
let inactivePromise = serverChannel.eventLoop.makePromise(of: Void.self)
let configured = loop.submit {
let inactiveVerificationHandler = InactiveVerificationHandler(promise: inactivePromise)
try serverChannel.pipeline.syncOperations.addHandler(inactiveVerificationHandler)
}
XCTAssertNoThrow(try configured.wait())

XCTAssertEqual(
0,
Expand All @@ -232,32 +254,35 @@ public final class AcceptBackoffHandlerTest: XCTestCase {
serverChannel.read()
// Close the channel, this should also take care of cancel the scheduled read.
serverChannel.close(promise: nil)
return readCountHandler.readCount
return readCountHandler.value.readCount
}.wait()
)

// Inspect the read count after our scheduled backoff elapsed multiple times. This should have triggered no read as the channel was closed.
XCTAssertEqual(
0,
try serverChannel.eventLoop.scheduleTask(in: .milliseconds(500)) {
readCountHandler.readCount
readCountHandler.value.readCount
}.futureResult.wait()
)

XCTAssertNoThrow(try inactiveVerificationHandler.waitForInactive())
XCTAssertNoThrow(try inactivePromise.futureResult.wait())
}

public func testSecondErrorUpdateScheduledRead() throws {
let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
let loop = group.next() as! SelectableEventLoop
defer {
XCTAssertNoThrow(try group.syncShutdownGracefully())
}

let readCountHandler = ReadCountHandler()
let readCountHandler = try! loop.submit {
NIOLoopBound(ReadCountHandler(), eventLoop: loop)
}.wait()

let backoffProviderCalled = ManagedAtomic(0)
let serverChannel = try setupChannel(
group: group,
eventLoop: loop,
readCountHandler: readCountHandler,
backoffProvider: { err in
if backoffProviderCalled.loadThenWrappingIncrement(ordering: .relaxed) == 0 {
Expand All @@ -273,7 +298,7 @@ public final class AcceptBackoffHandlerTest: XCTestCase {
try serverChannel.eventLoop.submit {
serverChannel.readable()
serverChannel.read()
let readCount = readCountHandler.readCount
let readCount = readCountHandler.value.readCount
// Directly trigger a read again without going through the pipeline. This will allow us to use serverChannel.readable()
serverChannel._channelCore.read0()
serverChannel.readable()
Expand All @@ -285,15 +310,15 @@ public final class AcceptBackoffHandlerTest: XCTestCase {
XCTAssertEqual(
0,
try serverChannel.eventLoop.scheduleTask(in: .seconds(1)) {
readCountHandler.readCount
readCountHandler.value.readCount
}.futureResult.wait()
)

// This should have fired now as the updated scheduled read task should have been complete by now
XCTAssertEqual(
1,
try serverChannel.eventLoop.scheduleTask(in: .seconds(1)) {
readCountHandler.readCount
readCountHandler.value.readCount
}.futureResult.wait()
)

Expand Down Expand Up @@ -327,30 +352,32 @@ public final class AcceptBackoffHandlerTest: XCTestCase {
}

private func setupChannel(
group: EventLoopGroup,
readCountHandler: ReadCountHandler,
backoffProvider: @escaping (IOError) -> TimeAmount? = AcceptBackoffHandler.defaultBackoffProvider,
eventLoop: SelectableEventLoop,
readCountHandler: NIOLoopBound<ReadCountHandler>,
backoffProvider: @escaping @Sendable (IOError) -> TimeAmount? = AcceptBackoffHandlerTest.defaultBackoffProvider,
errors: [Int32]
) throws -> ServerSocketChannel {
let eventLoop = group.next() as! SelectableEventLoop
let socket = try NonAcceptingServerSocket(errors: errors)
let serverChannel = try assertNoThrowWithValue(
ServerSocketChannel(
serverSocket: socket,
eventLoop: eventLoop,
group: group
group: eventLoop
)
)

XCTAssertNoThrow(try serverChannel.setOption(.autoRead, value: false).wait())
XCTAssertNoThrow(
try serverChannel.pipeline.addHandler(readCountHandler).flatMapThrowing { _ in
try serverChannel.pipeline.syncOperations.addHandler(
AcceptBackoffHandler(backoffProvider: backoffProvider),
name: self.acceptHandlerName
)
}.wait()
)

let configured = eventLoop.submit { [acceptHandlerName = self.acceptHandlerName] in
let sync = serverChannel.pipeline.syncOperations
try sync.addHandler(readCountHandler.value)
try sync.addHandler(
AcceptBackoffHandler(backoffProvider: backoffProvider),
name: acceptHandlerName
)
}

XCTAssertNoThrow(try configured.wait())

XCTAssertNoThrow(
try eventLoop.flatSubmit {
Expand Down
Loading