diff --git a/Tests/NIOPosixTests/EventLoopTest.swift b/Tests/NIOPosixTests/EventLoopTest.swift index 947d916be00..e099be8f1b3 100644 --- a/Tests/NIOPosixTests/EventLoopTest.swift +++ b/Tests/NIOPosixTests/EventLoopTest.swift @@ -21,15 +21,15 @@ import XCTest @testable import NIOCore @testable import NIOPosix -public final class EventLoopTest: XCTestCase { +final class EventLoopTest: XCTestCase { - public func testSchedule() throws { + func testSchedule() throws { let eventLoop = EmbeddedEventLoop() let scheduled = eventLoop.scheduleTask(in: .seconds(1)) { true } var result: Bool? - scheduled.futureResult.whenSuccess { result = $0 } + scheduled.futureResult.assumeIsolated().whenSuccess { result = $0 } eventLoop.run() // run without time advancing should do nothing XCTAssertFalse(scheduled.futureResult.isFulfilled) @@ -42,7 +42,7 @@ public final class EventLoopTest: XCTestCase { XCTAssertTrue(result == true) } - public func testFlatSchedule() throws { + func testFlatSchedule() throws { let eventLoop = EmbeddedEventLoop() let scheduled = eventLoop.flatScheduleTask(in: .seconds(1)) { @@ -50,7 +50,7 @@ public final class EventLoopTest: XCTestCase { } var result: Bool? - scheduled.futureResult.whenSuccess { result = $0 } + scheduled.futureResult.assumeIsolated().whenSuccess { result = $0 } eventLoop.run() // run without time advancing should do nothing XCTAssertFalse(scheduled.futureResult.isFulfilled) @@ -63,7 +63,7 @@ public final class EventLoopTest: XCTestCase { XCTAssertTrue(result == true) } - public func testScheduleWithDelay() throws { + func testScheduleWithDelay() throws { let smallAmount: TimeAmount = .milliseconds(100) let longAmount: TimeAmount = .seconds(1) let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1) @@ -104,15 +104,15 @@ public final class EventLoopTest: XCTestCase { XCTAssertTrue(NIODeadline.now() - nanos >= longAmount) } - public func testScheduleCancelled() throws { + func testScheduleCancelled() throws { let eventLoop = EmbeddedEventLoop() let scheduled = eventLoop.scheduleTask(in: .seconds(1)) { true } var result: Bool? var error: Error? - scheduled.futureResult.whenSuccess { result = $0 } - scheduled.futureResult.whenFailure { error = $0 } + scheduled.futureResult.assumeIsolated().whenSuccess { result = $0 } + scheduled.futureResult.assumeIsolated().whenFailure { error = $0 } eventLoop.advanceTime(by: .milliseconds(500)) // advance halfway to firing time scheduled.cancel() @@ -123,7 +123,7 @@ public final class EventLoopTest: XCTestCase { XCTAssertEqual(error as? EventLoopError, .cancelled) } - public func testFlatScheduleCancelled() throws { + func testFlatScheduleCancelled() throws { let eventLoop = EmbeddedEventLoop() let scheduled = eventLoop.flatScheduleTask(in: .seconds(1)) { @@ -132,8 +132,8 @@ public final class EventLoopTest: XCTestCase { var result: Bool? var error: Error? - scheduled.futureResult.whenSuccess { result = $0 } - scheduled.futureResult.whenFailure { error = $0 } + scheduled.futureResult.assumeIsolated().whenSuccess { result = $0 } + scheduled.futureResult.assumeIsolated().whenFailure { error = $0 } eventLoop.advanceTime(by: .milliseconds(500)) // advance halfway to firing time scheduled.cancel() @@ -144,7 +144,7 @@ public final class EventLoopTest: XCTestCase { XCTAssertEqual(error as? EventLoopError, .cancelled) } - public func testScheduleRepeatedTask() throws { + func testScheduleRepeatedTask() throws { let nanos: NIODeadline = .now() let initialDelay: TimeAmount = .milliseconds(5) let delay: TimeAmount = .milliseconds(10) @@ -176,14 +176,14 @@ public final class EventLoopTest: XCTestCase { XCTAssertTrue(NIODeadline.now() - nanos >= initialDelay + Int64(count) * delay) } - public func testScheduledTaskThatIsImmediatelyCancelledNeverFires() throws { + func testScheduledTaskThatIsImmediatelyCancelledNeverFires() throws { let eventLoop = EmbeddedEventLoop() let scheduled = eventLoop.scheduleTask(in: .seconds(1)) { true } var result: Bool? var error: Error? - scheduled.futureResult.whenSuccess { result = $0 } - scheduled.futureResult.whenFailure { error = $0 } + scheduled.futureResult.assumeIsolated().whenSuccess { result = $0 } + scheduled.futureResult.assumeIsolated().whenFailure { error = $0 } scheduled.cancel() eventLoop.advanceTime(by: .seconds(1)) @@ -193,7 +193,7 @@ public final class EventLoopTest: XCTestCase { XCTAssertEqual(error as? EventLoopError, .cancelled) } - public func testScheduledTasksAreOrdered() throws { + func testScheduledTasksAreOrdered() throws { let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1) defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) @@ -202,18 +202,18 @@ public final class EventLoopTest: XCTestCase { let eventLoop = eventLoopGroup.next() let now = NIODeadline.now() - var result = [Int]() + let result = NIOLockedValueBox([Int]()) var lastScheduled: Scheduled? for i in 0...100 { lastScheduled = eventLoop.scheduleTask(deadline: now) { - result.append(i) + result.withLockedValue { $0.append(i) } } } try lastScheduled?.futureResult.wait() - XCTAssertEqual(result, Array(0...100)) + XCTAssertEqual(result.withLockedValue { $0 }, Array(0...100)) } - public func testFlatScheduledTaskThatIsImmediatelyCancelledNeverFires() throws { + func testFlatScheduledTaskThatIsImmediatelyCancelledNeverFires() throws { let eventLoop = EmbeddedEventLoop() let scheduled = eventLoop.flatScheduleTask(in: .seconds(1)) { eventLoop.makeSucceededFuture(true) @@ -221,8 +221,8 @@ public final class EventLoopTest: XCTestCase { var result: Bool? var error: Error? - scheduled.futureResult.whenSuccess { result = $0 } - scheduled.futureResult.whenFailure { error = $0 } + scheduled.futureResult.assumeIsolated().whenSuccess { result = $0 } + scheduled.futureResult.assumeIsolated().whenFailure { error = $0 } scheduled.cancel() eventLoop.advanceTime(by: .seconds(1)) @@ -232,7 +232,7 @@ public final class EventLoopTest: XCTestCase { XCTAssertEqual(error as? EventLoopError, .cancelled) } - public func testRepeatedTaskThatIsImmediatelyCancelledNeverFires() throws { + func testRepeatedTaskThatIsImmediatelyCancelledNeverFires() throws { let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1) defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) @@ -248,7 +248,7 @@ public final class EventLoopTest: XCTestCase { Thread.sleep(until: .init(timeIntervalSinceNow: 0.1)) } - public func testScheduleRepeatedTaskCancelFromDifferentThread() throws { + func testScheduleRepeatedTaskCancelFromDifferentThread() throws { let nanos: NIODeadline = .now() let initialDelay: TimeAmount = .milliseconds(5) // this will actually force the race from issue #554 to happen frequently @@ -263,24 +263,29 @@ public final class EventLoopTest: XCTestCase { let loop = eventLoopGroup.next() hasFiredGroup.enter() isCancelledGroup.enter() - var isAllowedToFire = true // read/write only on `loop` - var hasFired = false // read/write only on `loop` + + let (isAllowedToFire, hasFired) = try! loop.submit { + let isAllowedToFire = NIOLoopBoundBox(true, eventLoop: loop) + let hasFired = NIOLoopBoundBox(false, eventLoop: loop) + return (isAllowedToFire, hasFired) + }.wait() + let repeatedTask = loop.scheduleRepeatedTask(initialDelay: initialDelay, delay: delay) { (_: RepeatedTask) -> Void in XCTAssertTrue(loop.inEventLoop) - if !hasFired { + if !hasFired.value { // we can only do this once as we can only leave the DispatchGroup once but we might lose a race and // the timer might fire more than once (until `shouldNoLongerFire` becomes true). - hasFired = true + hasFired.value = true hasFiredGroup.leave() } - XCTAssertTrue(isAllowedToFire) + XCTAssertTrue(isAllowedToFire.value) } hasFiredGroup.notify(queue: DispatchQueue.global()) { repeatedTask.cancel() loop.execute { // only now do we know that the `cancel` must have gone through - isAllowedToFire = false + isAllowedToFire.value = false isCancelledGroup.leave() } } @@ -290,7 +295,7 @@ public final class EventLoopTest: XCTestCase { isCancelledGroup.wait() } - public func testScheduleRepeatedTaskToNotRetainRepeatedTask() throws { + func testScheduleRepeatedTaskToNotRetainRepeatedTask() throws { let initialDelay: TimeAmount = .milliseconds(5) let delay: TimeAmount = .milliseconds(10) let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1) @@ -308,7 +313,7 @@ public final class EventLoopTest: XCTestCase { assert(weakRepeated == nil, within: .seconds(1)) } - public func testScheduleRepeatedTaskToNotRetainEventLoop() throws { + func testScheduleRepeatedTaskToNotRetainEventLoop() throws { weak var weakEventLoop: EventLoop? = nil try { let initialDelay: TimeAmount = .milliseconds(5) @@ -327,12 +332,12 @@ public final class EventLoopTest: XCTestCase { func testScheduledRepeatedAsyncTask() { let eventLoop = EmbeddedEventLoop() - var counter = 0 + let counter = NIOLoopBoundBox(0, eventLoop: eventLoop) let repeatedTask = eventLoop.scheduleRepeatedAsyncTask( initialDelay: .milliseconds(10), delay: .milliseconds(10) ) { (_: RepeatedTask) in - counter += 1 + counter.value += 1 let p = eventLoop.makePromise(of: Void.self) eventLoop.scheduleTask(in: .milliseconds(10)) { p.succeed(()) @@ -344,59 +349,59 @@ public final class EventLoopTest: XCTestCase { eventLoop.run() } // t == 0: nothing - XCTAssertEqual(0, counter) + XCTAssertEqual(0, counter.value) // t == 5: nothing eventLoop.advanceTime(by: .milliseconds(5)) - XCTAssertEqual(0, counter) + XCTAssertEqual(0, counter.value) // t == 10: once eventLoop.advanceTime(by: .milliseconds(5)) - XCTAssertEqual(1, counter) + XCTAssertEqual(1, counter.value) // t == 15: still once eventLoop.advanceTime(by: .milliseconds(5)) - XCTAssertEqual(1, counter) + XCTAssertEqual(1, counter.value) // t == 20: still once (because the task takes 10ms to execute) eventLoop.advanceTime(by: .milliseconds(5)) - XCTAssertEqual(1, counter) + XCTAssertEqual(1, counter.value) // t == 25: still once (because the task takes 10ms to execute) eventLoop.advanceTime(by: .milliseconds(5)) - XCTAssertEqual(1, counter) + XCTAssertEqual(1, counter.value) // t == 30: twice eventLoop.advanceTime(by: .milliseconds(5)) - XCTAssertEqual(2, counter) + XCTAssertEqual(2, counter.value) // t == 40: twice eventLoop.advanceTime(by: .milliseconds(10)) - XCTAssertEqual(2, counter) + XCTAssertEqual(2, counter.value) // t == 50: three times eventLoop.advanceTime(by: .milliseconds(10)) - XCTAssertEqual(3, counter) + XCTAssertEqual(3, counter.value) // t == 60: three times eventLoop.advanceTime(by: .milliseconds(10)) - XCTAssertEqual(3, counter) + XCTAssertEqual(3, counter.value) // t == 89: four times eventLoop.advanceTime(by: .milliseconds(29)) - XCTAssertEqual(4, counter) + XCTAssertEqual(4, counter.value) // t == 90: five times eventLoop.advanceTime(by: .milliseconds(1)) - XCTAssertEqual(5, counter) + XCTAssertEqual(5, counter.value) repeatedTask.cancel() eventLoop.run() - XCTAssertEqual(5, counter) + XCTAssertEqual(5, counter.value) eventLoop.advanceTime(by: .hours(10)) - XCTAssertEqual(5, counter) + XCTAssertEqual(5, counter.value) } func testScheduledRepeatedAsyncTaskIsJittered() throws { @@ -437,7 +442,7 @@ public final class EventLoopTest: XCTestCase { XCTAssertTrue((minNumberOfExecutedTasks...maxNumberOfExecutedTasks).contains(counter.load(ordering: .relaxed))) } - public func testEventLoopGroupMakeIterator() throws { + func testEventLoopGroupMakeIterator() throws { let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: System.coreCount) defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) @@ -456,7 +461,7 @@ public final class EventLoopTest: XCTestCase { XCTAssertEqual(innerCounter, System.coreCount) } - public func testEventLoopMakeIterator() throws { + func testEventLoopMakeIterator() throws { let eventLoop = EmbeddedEventLoop() let iterator = eventLoop.makeIterator() defer { @@ -472,7 +477,7 @@ public final class EventLoopTest: XCTestCase { XCTAssertEqual(counter, 1) } - public func testMultipleShutdown() throws { + func testMultipleShutdown() throws { // This test catches a regression that causes it to intermittently fail: it reveals bugs in synchronous shutdown. // Do not ignore intermittent failures in this test! let threads = 8 @@ -507,7 +512,7 @@ public final class EventLoopTest: XCTestCase { try group.syncShutdownGracefully() } - public func testShuttingDownFailsRegistration() throws { + func testShuttingDownFailsRegistration() throws { // This test catches a regression where the selectable event loop would allow a socket registration while // it was nominally "shutting down". To do this, we take advantage of the fact that the event loop attempts // to cleanly shut down all the channels before it actually closes. We add a custom channel that we can use @@ -551,8 +556,7 @@ public final class EventLoopTest: XCTestCase { } } - let promiseQueue = DispatchQueue(label: "promiseQueue") - var promises: [EventLoopPromise] = [] + let promises = NIOLockedValueBox<[EventLoopPromise]>([]) let group = MultiThreadedEventLoopGroup(numberOfThreads: 1) defer { @@ -566,11 +570,13 @@ public final class EventLoopTest: XCTestCase { let serverChannel = try assertNoThrowWithValue( ServerBootstrap(group: group) .childChannelInitializer { channel in - channel.pipeline.addHandler( - WedgeOpenHandler(channelActivePromise: serverChannelUp) { promise in - promiseQueue.sync { promises.append(promise) } - } - ) + channel.eventLoop.makeCompletedFuture { + try channel.pipeline.syncOperations.addHandler( + WedgeOpenHandler(channelActivePromise: serverChannelUp) { promise in + promises.withLockedValue { $0.append(promise) } + } + ) + } } .bind(host: "127.0.0.1", port: 0).wait() ) @@ -580,12 +586,14 @@ public final class EventLoopTest: XCTestCase { let connectPromise = loop.makePromise(of: Void.self) // We're going to create and register a channel, but not actually attempt to do anything with it. - let wedgeHandler = WedgeOpenHandler { promise in - promiseQueue.sync { promises.append(promise) } - } let channel = try SocketChannel(eventLoop: loop, protocolFamily: .inet) try channel.eventLoop.submit { - channel.pipeline.addHandler(wedgeHandler).flatMap { + channel.eventLoop.makeCompletedFuture { + let wedgeHandler = WedgeOpenHandler { promise in + promises.withLockedValue { $0.append(promise) } + } + try channel.pipeline.syncOperations.addHandler(wedgeHandler) + }.flatMap { channel.register() }.flatMap { // connecting here to stop epoll from throwing EPOLLHUP at us @@ -621,15 +629,14 @@ public final class EventLoopTest: XCTestCase { XCTAssertEqual(.timedOut, g.wait(timeout: .now())) // Now let it close. - promiseQueue.sync { - for promise in promises { - promise.succeed(()) - } + let promisesToSucceed = promises.withLockedValue { $0 } + for promise in promisesToSucceed { + promise.succeed(()) } XCTAssertNoThrow(g.wait()) } - public func testEventLoopThreads() throws { + func testEventLoopThreads() throws { var counter = 0 let body: ThreadInitializer = { t in counter += 1 @@ -642,7 +649,7 @@ public final class EventLoopTest: XCTestCase { XCTAssertNoThrow(try group.syncShutdownGracefully()) } - public func testEventLoopPinned() throws { + func testEventLoopPinned() throws { #if os(Linux) || os(Android) let target = NIOThread.current.affinity.cpuIds.first! let body: ThreadInitializer = { t in @@ -658,7 +665,7 @@ public final class EventLoopTest: XCTestCase { #endif } - public func testEventLoopPinnedCPUIdsConstructor() throws { + func testEventLoopPinnedCPUIdsConstructor() throws { #if os(Linux) || os(Android) let target = NIOThread.current.affinity.cpuIds.first! let group = MultiThreadedEventLoopGroup(pinnedCPUIds: [target]) @@ -672,7 +679,7 @@ public final class EventLoopTest: XCTestCase { #endif } - public func testCurrentEventLoop() throws { + func testCurrentEventLoop() throws { class EventLoopHolder { weak var loop: EventLoop? init(_ loop: EventLoop) { @@ -716,21 +723,21 @@ public final class EventLoopTest: XCTestCase { } } - public func testShutdownWhileScheduledTasksNotReady() throws { + func testShutdownWhileScheduledTasksNotReady() throws { let group = MultiThreadedEventLoopGroup(numberOfThreads: 1) let eventLoop = group.next() _ = eventLoop.scheduleTask(in: .hours(1)) {} try group.syncShutdownGracefully() } - public func testCloseFutureNotifiedBeforeUnblock() throws { - class AssertHandler: ChannelInboundHandler { + func testCloseFutureNotifiedBeforeUnblock() throws { + final class AssertHandler: ChannelInboundHandler, Sendable { typealias InboundIn = Any let groupIsShutdown = ManagedAtomic(false) let removed = ManagedAtomic(false) - public func handlerRemoved(context: ChannelHandlerContext) { + func handlerRemoved(context: ChannelHandlerContext) { XCTAssertFalse(groupIsShutdown.load(ordering: .relaxed)) XCTAssertTrue(removed.compareExchange(expected: false, desired: true, ordering: .relaxed).exchanged) } @@ -774,26 +781,30 @@ public final class EventLoopTest: XCTestCase { XCTAssertTrue(closeFutureFulfilledEventually.load(ordering: .relaxed)) } - public func testScheduleMultipleTasks() throws { + func testScheduleMultipleTasks() throws { let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1) defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) } - var array = [(Int, NIODeadline)]() - let scheduled1 = eventLoopGroup.next().scheduleTask(in: .milliseconds(500)) { - array.append((1, .now())) + + let eventLoop = eventLoopGroup.next() + let array = try! eventLoop.submit { + NIOLoopBoundBox([(Int, NIODeadline)](), eventLoop: eventLoop) + }.wait() + let scheduled1 = eventLoop.scheduleTask(in: .milliseconds(500)) { + array.value.append((1, .now())) } - let scheduled2 = eventLoopGroup.next().scheduleTask(in: .milliseconds(100)) { - array.append((2, .now())) + let scheduled2 = eventLoop.scheduleTask(in: .milliseconds(100)) { + array.value.append((2, .now())) } - let scheduled3 = eventLoopGroup.next().scheduleTask(in: .milliseconds(1000)) { - array.append((3, .now())) + let scheduled3 = eventLoop.scheduleTask(in: .milliseconds(1000)) { + array.value.append((3, .now())) } - var result = try eventLoopGroup.next().scheduleTask(in: .milliseconds(1000)) { - array + var result = try eventLoop.scheduleTask(in: .milliseconds(1000)) { + array.value }.futureResult.wait() XCTAssertTrue(scheduled1.futureResult.isFulfilled) @@ -811,10 +822,9 @@ public final class EventLoopTest: XCTestCase { XCTAssertTrue(second.1 < third.1) XCTAssertTrue(result.isEmpty) - } - public func testRepeatedTaskThatIsImmediatelyCancelledNotifies() throws { + func testRepeatedTaskThatIsImmediatelyCancelledNotifies() throws { let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1) defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) @@ -842,7 +852,7 @@ public final class EventLoopTest: XCTestCase { XCTAssertEqual(res, .completed) } - public func testRepeatedTaskThatIsCancelledAfterRunningAtLeastTwiceNotifies() throws { + func testRepeatedTaskThatIsCancelledAfterRunningAtLeastTwiceNotifies() throws { let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1) defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) @@ -869,7 +879,7 @@ public final class EventLoopTest: XCTestCase { XCTAssertEqual(XCTWaiter.wait(for: [expect1, expect2], timeout: 1.0), .completed) } - public func testRepeatedTaskThatCancelsItselfNotifiesOnlyWhenFinished() throws { + func testRepeatedTaskThatCancelsItselfNotifiesOnlyWhenFinished() throws { let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1) defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) @@ -974,7 +984,7 @@ public final class EventLoopTest: XCTestCase { XCTAssertNoThrow(try group.syncShutdownGracefully()) } - class Thing { + final class Thing: Sendable { private let deallocated: ConditionLock init(_ deallocated: ConditionLock) { @@ -1258,7 +1268,7 @@ public final class EventLoopTest: XCTestCase { } func channelRead(context: ChannelHandlerContext, data: NIOAny) { - context.eventLoop.execute { + context.eventLoop.assumeIsolated().execute { self.groupToNotify.leave() } } @@ -1280,7 +1290,9 @@ public final class EventLoopTest: XCTestCase { .serverChannelOption(.autoRead, value: false) .serverChannelOption(.maxMessagesPerRead, value: 1) .childChannelInitializer { channel in - channel.pipeline.addHandler(ExecuteSomethingOnEventLoop(groupToNotify: g)) + channel.eventLoop.makeCompletedFuture { + try channel.pipeline.syncOperations.addHandler(ExecuteSomethingOnEventLoop(groupToNotify: g)) + } } .bind(to: .init(ipAddress: "127.0.0.1", port: 0)) .wait() @@ -1397,8 +1409,7 @@ public final class EventLoopTest: XCTestCase { func testTakeOverThreadAndAlsoTakeItBack() { let currentNIOThread = NIOThread.current let currentNSThread = Thread.current - let lock = NIOLock() - var hasBeenShutdown = false + let hasBeenShutdown = NIOLockedValueBox(false) let allDoneGroup = DispatchGroup() allDoneGroup.enter() MultiThreadedEventLoopGroup.withCurrentThreadAsEventLoop { loop in @@ -1407,14 +1418,14 @@ public final class EventLoopTest: XCTestCase { XCTAssert(loop === MultiThreadedEventLoopGroup.currentEventLoop) loop.shutdownGracefully(queue: DispatchQueue.global()) { error in XCTAssertNil(error) - lock.withLock { - hasBeenShutdown = error == nil + hasBeenShutdown.withLockedValue { + $0 = error == nil } allDoneGroup.leave() } } allDoneGroup.wait() - XCTAssertTrue(lock.withLock { hasBeenShutdown }) + XCTAssertTrue(hasBeenShutdown.withLockedValue { $0 }) } func testThreadTakeoverUnsetsCurrentEventLoop() { @@ -1434,8 +1445,12 @@ public final class EventLoopTest: XCTestCase { final class SaveReceivedByte: ChannelInboundHandler { typealias InboundIn = ByteBuffer + init(received: NIOLockedValueBox) { + self.received = received + } + // For once, we don't need thread-safety as we're taking the calling thread :) - var received: UInt8? = nil + let received: NIOLockedValueBox var readCalls: Int = 0 var allDonePromise: EventLoopPromise? = nil @@ -1446,8 +1461,8 @@ public final class EventLoopTest: XCTestCase { var data = Self.unwrapInboundIn(data) XCTAssertEqual(1, data.readableBytes) - XCTAssertNil(self.received) - self.received = data.readInteger() + XCTAssertNil(self.received.withLockedValue { $0 }) + self.received.withLockedValue { $0 = data.readInteger() } self.allDonePromise?.succeed(()) @@ -1455,12 +1470,17 @@ public final class EventLoopTest: XCTestCase { } } - let receiveHandler = SaveReceivedByte() // There'll be just one connection, we can share. + let received = NIOLockedValueBox(nil) MultiThreadedEventLoopGroup.withCurrentThreadAsEventLoop { loop in + // There'll be just one connection, we can share. + let receiveHandler = NIOLoopBound(SaveReceivedByte(received: received), eventLoop: loop) + ServerBootstrap(group: loop) .serverChannelOption(ChannelOptions.socket(.init(SOL_SOCKET), .init(SO_REUSEADDR)), value: 1) .childChannelInitializer { accepted in - accepted.pipeline.addHandler(receiveHandler) + accepted.eventLoop.makeCompletedFuture { + try accepted.pipeline.syncOperations.addHandler(receiveHandler.value) + } } .bind(host: "127.0.0.1", port: 0) .flatMap { serverChannel in @@ -1469,9 +1489,9 @@ public final class EventLoopTest: XCTestCase { buffer.writeString("J") return clientChannel.writeAndFlush(buffer) }.flatMap { - XCTAssertNil(receiveHandler.allDonePromise) - receiveHandler.allDonePromise = loop.makePromise() - return receiveHandler.allDonePromise!.futureResult + XCTAssertNil(receiveHandler.value.allDonePromise) + receiveHandler.value.allDonePromise = loop.makePromise() + return receiveHandler.value.allDonePromise!.futureResult }.flatMap { serverChannel.close() } @@ -1489,7 +1509,7 @@ public final class EventLoopTest: XCTestCase { } // All done, the EventLoop is terminated so we should be able to check the results. - XCTAssertEqual(UInt8(ascii: "J"), receiveHandler.received) + XCTAssertEqual(UInt8(ascii: "J"), received.withLockedValue { $0 }) } func testWeFailOutstandingScheduledTasksOnELShutdown() { @@ -1558,7 +1578,7 @@ public final class EventLoopTest: XCTestCase { // (Crashing resulted from an EINVAL/IOException thrown by the kevent // syscall when the timeout value exceeded the maximum supported by // the Darwin kernel #1056). - public func testScheduleMaximum() { + func testScheduleMaximum() { let eventLoop = EmbeddedEventLoop() let maxAmount: TimeAmount = .nanoseconds(.max) let scheduled = eventLoop.scheduleTask(in: maxAmount) { true }