From 32f937add9b551c691fe7ba27f377372c28c34dc Mon Sep 17 00:00:00 2001 From: Vera Xia Date: Fri, 8 Aug 2025 09:49:51 -0700 Subject: [PATCH 1/5] add canary test application --- Package.swift | 9 + Source/Canary/Mqtt5Canary/Mqtt5Canary.swift | 535 ++++++++++++++++++++ 2 files changed, 544 insertions(+) create mode 100644 Source/Canary/Mqtt5Canary/Mqtt5Canary.swift diff --git a/Package.swift b/Package.swift index d0decfcc..9a564122 100644 --- a/Package.swift +++ b/Package.swift @@ -14,6 +14,7 @@ var package = Package( products: [ .library(name: "AwsCommonRuntimeKit", targets: ["AwsCommonRuntimeKit"]), .executable(name: "Elasticurl", targets: ["Elasticurl"]), + .executable(name: "Mqtt5Canary", targets: ["Mqtt5Canary"]), ], dependencies: [ // Arugment Parser Dependency for ElasticCurl @@ -371,5 +372,13 @@ packageTargets.append(contentsOf: [ ], path: "Source/Elasticurl" ), + .executableTarget( + name: "Mqtt5Canary", + dependencies: [ + "AwsCommonRuntimeKit", + .product(name: "ArgumentParser", package: "swift-argument-parser"), + ], + path: "Source/Canary/Mqtt5Canary" + ), ]) package.targets = packageTargets diff --git a/Source/Canary/Mqtt5Canary/Mqtt5Canary.swift b/Source/Canary/Mqtt5Canary/Mqtt5Canary.swift new file mode 100644 index 00000000..b81b58b6 --- /dev/null +++ b/Source/Canary/Mqtt5Canary/Mqtt5Canary.swift @@ -0,0 +1,535 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0. + +import ArgumentParser +import AwsCommonRuntimeKit +import Foundation +import _Concurrency + + +let NANO_SECOND: UInt64 = 1_000_000_000 +let SLEEP_OVERHEAD: UInt64 = 2_000_000 // 2ms overhead +let UNSUBSRIBE_TESTING_TOPIC = "testTopic1" + +typealias OnMqtt5CanaryTestFunction = (Mqtt5CanaryTestContext) -> Void + +enum CanaryTestError: Error { + case InvalidArgument +} + +class Mqtt5CanaryTestContext { + var mqtt5Clients: [Mqtt5CanaryClient] = [] + var statistic: Mqtt5CanaryStatistic + + init() { + statistic = Mqtt5CanaryStatistic() + } + + /// Setup Test Context + func appendClient(client: Mqtt5CanaryClient){ + mqtt5Clients.append(client) + } + + func setClientConnection(index: Int, connected: Bool){ + mqtt5Clients[index].setConnected(connected: connected) + } + + /// Client Operation Help Function + func mqtt5CanaryRunOperation(_ operation: Mqtt5CanaryOperation, _ index: Int) async throws{ + switch operation { + case Mqtt5CanaryOperation.START: + try await mqtt5CanaryOperationStart(clientIndex: index) + case Mqtt5CanaryOperation.STOP: + try await mqtt5CanaryOperationStop(clientIndex: index) + case Mqtt5CanaryOperation.SUBSSCRIBE: + try await mqtt5CanaryOperationSubscribe(clientIndex: index) + case Mqtt5CanaryOperation.UNSUBSCRIBE: + if (mqtt5Clients[index].subscriptionCount > 0) { + try await mqtt5CanaryOperationUnsubscribe(clientIndex: index, testTopic: mqtt5Clients[index].getUnsubscribedTopic()) + } else { + // If there is topic subscribed, fallthrough to UNSUBSSCRIBE_BAD operation + fallthrough + } + case Mqtt5CanaryOperation.UNSUBSSCRIBE_BAD: + try await mqtt5CanaryOperationUnsubscribe(clientIndex: index) + case Mqtt5CanaryOperation.PUBLISH_QOS0: + try await mqtt5CanaryOperationPublish(clientIndex: index, qos: QoS.atMostOnce) + case Mqtt5CanaryOperation.PUBLISH_QOS1: + try await mqtt5CanaryOperationPublish(clientIndex: index, qos: QoS.atLeastOnce) + case Mqtt5CanaryOperation.PUBLISH_TO_SUBSCRIBED_TOPIC_QOS0: + try await mqtt5CanaryOperationPublish(clientIndex: index, topic: mqtt5Clients[index].getSubscribedTopic(), qos: QoS.atMostOnce) + case Mqtt5CanaryOperation.PUBLISH_TO_SUBSCRIBED_TOPIC_QOS1: + try await mqtt5CanaryOperationPublish(clientIndex: index, topic: mqtt5Clients[index].getSubscribedTopic(), qos: QoS.atLeastOnce) + case Mqtt5CanaryOperation.PUBLISH_TO_SHARED_TOPIC_QOS0: + try await mqtt5CanaryOperationPublish(clientIndex: index, topic: mqtt5Clients[index].shared_topic, qos: QoS.atMostOnce) + case Mqtt5CanaryOperation.PUBLISH_TO_SHARED_TOPIC_QOS1: + try await mqtt5CanaryOperationPublish(clientIndex: index, topic: mqtt5Clients[index].shared_topic, qos: QoS.atLeastOnce) + + case Mqtt5CanaryOperation.NULL: + fallthrough + case Mqtt5CanaryOperation.DESTROY: + fallthrough + case Mqtt5CanaryOperation.OPERATION_COUNT: + throw CanaryTestError.InvalidArgument + } + + } + + /// Client Operations + func mqtt5CanaryOperationStart(clientIndex: Int) async throws{ + if !mqtt5Clients[clientIndex].is_connected { + await statistic.incrementTotalOperation() + try mqtt5Clients[clientIndex].client.start() + } + } + + func mqtt5CanaryOperationStop(clientIndex: Int) async throws { + if mqtt5Clients[clientIndex].is_connected { + await statistic.incrementTotalOperation() + try mqtt5Clients[clientIndex].client.stop() + // clean up the subscription count + mqtt5Clients[clientIndex].subscriptionCount = 0 + } + } + + func mqtt5CanaryOperationSubscribe(clientIndex: Int) async throws{ + if !mqtt5Clients[clientIndex].is_connected { + return try await mqtt5CanaryOperationStart(clientIndex: clientIndex) + } + + do { + let canaryClient = mqtt5Clients[clientIndex] + let sub = Subscription(topicFilter: canaryClient.getNextSubTopic(), qos: QoS.atLeastOnce) + var subscriptions = [sub] + + await self.statistic.incrementSubscribeAttempts() + await self.statistic.incrementTotalOperation() + + // If this is the first subscription of the client, subscribe to the shared topic + if(canaryClient.subscriptionCount == 1){ + subscriptions.append(Subscription(topicFilter: canaryClient.shared_topic, qos: QoS.atLeastOnce)) + } + + let suback = try await canaryClient.client.subscribe(subscribePacket: SubscribePacket(subscriptions: subscriptions)) + + if suback.reasonCodes[0] == SubackReasonCode.grantedQos1 { + await self.statistic.incrementSubscribeSucceed() + }else{ + await self.statistic.incrementSubscribeFailed() + } + }catch{ + await self.statistic.incrementSubscribeFailed() + } + } + + func mqtt5CanaryOperationUnsubscribe(clientIndex: Int, testTopic : String = UNSUBSRIBE_TESTING_TOPIC) async throws{ + if !mqtt5Clients[clientIndex].is_connected { + return try await mqtt5CanaryOperationStart(clientIndex: clientIndex) + } + + do { + let canaryClient = mqtt5Clients[clientIndex] + let unsub = UnsubscribePacket(topicFilter: testTopic) + + await self.statistic.incrementUnsubscribeAttempts() + await self.statistic.incrementTotalOperation() + + let unsubAck = try await canaryClient.client.unsubscribe(unsubscribePacket: unsub) + + if unsubAck.reasonCodes[0] == UnsubackReasonCode.success{ + await self.statistic.incrementUnsubscribeSucceed() + }else{ + await self.statistic.incrementUnsubscribeFailed() + print("Unsubscribe to topic \(testTopic), failed with reason \(unsubAck.reasonCodes[0])") + } + }catch{ + await self.statistic.incrementUnsubscribeFailed() + print("Unsubscribe to topic \(testTopic), operation failed") + } + } + + func mqtt5CanaryOperationPublish(clientIndex: Int, topic: String = UNSUBSRIBE_TESTING_TOPIC, qos: QoS) async throws{ + if !mqtt5Clients[clientIndex].is_connected { + return try await mqtt5CanaryOperationStart(clientIndex: clientIndex) + } + + do { + let canaryClient = mqtt5Clients[clientIndex] + let pub = PublishPacket(qos: qos, topic: topic) + + await self.statistic.incrementPublishAttempts() + await self.statistic.incrementTotalOperation() + let puback = try await canaryClient.client.publish(publishPacket: pub) + + if qos == QoS.atMostOnce || + puback.puback?.reasonCode == PubackReasonCode.success || + puback.puback?.reasonCode == PubackReasonCode.noMatchingSubscribers { + await self.statistic.incrementPublishSucceed() + }else{ + await self.statistic.incrementPublishFailed() + } + }catch{ + await self.statistic.incrementPublishFailed() + } + } + +} + +class Mqtt5CanaryClient { + fileprivate let client : Mqtt5Client + fileprivate let clientId : String + fileprivate let shared_topic: String + fileprivate var subscriptionCount: Int = 0 + fileprivate var is_connected: Bool = false + + + init(client: Mqtt5Client, clientId: String, sharedTopic: String) { + self.clientId = clientId + self.client = client + self.shared_topic = sharedTopic + } + + func getNextSubTopic() -> String { + self.subscriptionCount += 1 + return self.clientId + "_" + String(self.subscriptionCount); + } + + func getSubscribedTopic() -> String { + return self.clientId + "_" + String(self.subscriptionCount); + } + + func getUnsubscribedTopic() -> String { + let topic = self.clientId + "_" + String(self.subscriptionCount); + if(self.subscriptionCount > 0) { + self.subscriptionCount -= 1 + } + return topic + } + + func setConnected(connected: Bool){ + self.is_connected = connected + } + +} + +actor Mqtt5CanaryStatistic { + var totalOperation: Int = 0 + + var subscribeAttempt: Int = 0 + var subscribeSucceed: Int = 0 + var subscribeFailed: Int = 0 + + var publishAttempt: Int = 0 + var publishSucceed: Int = 0 + var publishFailed: Int = 0 + + var unsubAttempt: Int = 0 + var unsubSucceed: Int = 0 + var unsubFailed: Int = 0 + + init(){ + } + + func incrementTotalOperation(){ + self.totalOperation += 1; + } + + func incrementPublishAttempts(){ + self.publishAttempt += 1; + } + + func incrementPublishSucceed(){ + self.publishSucceed += 1; + } + + func incrementPublishFailed(){ + self.publishFailed += 1; + } + + func incrementSubscribeAttempts(){ + self.subscribeAttempt += 1; + } + + func incrementSubscribeSucceed(){ + self.subscribeSucceed += 1; + } + + func incrementSubscribeFailed(){ + self.subscribeFailed += 1; + } + + + func incrementUnsubscribeAttempts(){ + self.unsubAttempt += 1; + } + + func incrementUnsubscribeSucceed(){ + self.unsubSucceed += 1; + } + + func incrementUnsubscribeFailed(){ + self.unsubFailed += 1; + } + + func printStatistics(duration: TimeInterval) { + let subscribeSuccessRate = subscribeAttempt > 0 ? Double(subscribeSucceed) / Double(subscribeAttempt) * 100 : 0 + let publishSuccessRate = publishAttempt > 0 ? Double(publishSucceed) / Double(publishAttempt) * 100 : 0 + let unsubscribeSuccessRate = unsubAttempt > 0 ? Double(unsubSucceed) / Double(unsubAttempt) * 100 : 0 + let totalTPS = duration > 0 ? Double(totalOperation) / duration : 0 + + print("=== MQTT5 Canary Test Statistics ===") + print("Test Duration: \(String(format: "%.2f", duration)) seconds") + print("Total Operations: \(totalOperation)") + print("Total TPS: \(String(format: "%.2f", totalTPS))") + print("Subscribe - Attempts: \(subscribeAttempt), Success: \(subscribeSucceed), Failed: \(subscribeFailed), Success Rate: \(String(format: "%.2f", subscribeSuccessRate))%") + print("Publish - Attempts: \(publishAttempt), Success: \(publishSucceed), Failed: \(publishFailed), Success Rate: \(String(format: "%.2f", publishSuccessRate))%") + print("Unsubscribe - Attempts: \(unsubAttempt), Success: \(unsubSucceed), Failed: \(unsubFailed), Success Rate: \(String(format: "%.2f", unsubscribeSuccessRate))%") + } + +} + +enum Mqtt5CanaryOperation: Int{ + case NULL = 0 + case START + case STOP + case DESTROY + case SUBSSCRIBE + case UNSUBSCRIBE + case UNSUBSSCRIBE_BAD + case PUBLISH_QOS0 + case PUBLISH_QOS1 + case PUBLISH_TO_SUBSCRIBED_TOPIC_QOS0 + case PUBLISH_TO_SUBSCRIBED_TOPIC_QOS1 + case PUBLISH_TO_SHARED_TOPIC_QOS0 + case PUBLISH_TO_SHARED_TOPIC_QOS1 + case OPERATION_COUNT +} + +struct Mqtt5CanaryTestOptions { + // Client options + let shared_topic: String + var elg: EventLoopGroup + var boostrap: ClientBootstrap + var tlsctx: TLSContext? = nil + // Test options + var tpsSleepTime: UInt64 + var onWebsocketTransform:OnWebSocketHandshakeIntercept? = nil + var operationDistribution: [Mqtt5CanaryOperation] = [] + + init(testApp: Mqtt5Canary) throws{ + shared_topic = "shared_topic_" + UUID().uuidString + self.tpsSleepTime = testApp.tps > 0 ? (UInt64(NANO_SECOND / testApp.tps)) : 0 + print("tps sleep time: \(self.tpsSleepTime)") + // Initialize elg first + self.elg = try EventLoopGroup(threadCount: testApp.threads) + let resolver = try HostResolver( + eventLoopGroup: self.elg, + maxHosts: 8, + maxTTL: 30) + self.boostrap = try ClientBootstrap( + eventLoopGroup: self.elg, + hostResolver: resolver) + + try self.setupClientOption(testApp: testApp) + + // Setup operation distribution + Mqtt5CanaryOperationDistributionSetup(&self.operationDistribution) + } + + + mutating func setupClientOption(testApp: Mqtt5Canary) throws{ + if (testApp.useTls && self.tlsctx == nil){ + guard let _cert = testApp.cert, let _key = testApp.key else { + throw CanaryTestError.InvalidArgument + } + let tlsOptions = try TLSContextOptions.makeMTLS( + certificatePath: _cert, + privateKeyPath: _key + ) + + if let _caPath = testApp.caPath { + try tlsOptions.overrideDefaultTrustStoreWithFile(caFile: _caPath) + } + self.tlsctx = try TLSContext(options: tlsOptions, mode: .client) + } + + if (testApp.useWebsocket){ + self.onWebsocketTransform = { request, complete in + // Simply complete the request with success + complete(request, 0) + } + } + } +} + +func Mqtt5CanaryOperationDistributionSetup(_ distributionDataSet : inout [Mqtt5CanaryOperation]){ + let operationDistribution = [ + (Mqtt5CanaryOperation.STOP, 1), + (Mqtt5CanaryOperation.SUBSSCRIBE, 200), + (Mqtt5CanaryOperation.UNSUBSCRIBE, 200), + (Mqtt5CanaryOperation.UNSUBSSCRIBE_BAD, 100), + (Mqtt5CanaryOperation.PUBLISH_QOS0, 300), + (Mqtt5CanaryOperation.PUBLISH_QOS1, 150), + (Mqtt5CanaryOperation.PUBLISH_TO_SUBSCRIBED_TOPIC_QOS0, 100), + (Mqtt5CanaryOperation.PUBLISH_TO_SUBSCRIBED_TOPIC_QOS1, 50), + (Mqtt5CanaryOperation.PUBLISH_TO_SHARED_TOPIC_QOS0, 50), + (Mqtt5CanaryOperation.PUBLISH_TO_SHARED_TOPIC_QOS1, 50) + ] + + + for distribution in operationDistribution { + for _ in 0.. Void { + let operationIndex = Int.random(in : 0.. SLEEP_OVERHEAD { + try await Task.sleep(nanoseconds: testOptions.tpsSleepTime - SLEEP_OVERHEAD) + } + let targetTime = Date().addingTimeInterval(TimeInterval(testOptions.tpsSleepTime / NANO_SECOND)) + while Date() < targetTime { + // Busy-wait for precision sleep + } + } + + // Close and clean up the clients + for index: Int in 0.. Date: Fri, 8 Aug 2025 09:53:40 -0700 Subject: [PATCH 2/5] remove prints & lint --- Source/Canary/Mqtt5Canary/Mqtt5Canary.swift | 972 ++++++++++---------- 1 file changed, 497 insertions(+), 475 deletions(-) diff --git a/Source/Canary/Mqtt5Canary/Mqtt5Canary.swift b/Source/Canary/Mqtt5Canary/Mqtt5Canary.swift index b81b58b6..30280269 100644 --- a/Source/Canary/Mqtt5Canary/Mqtt5Canary.swift +++ b/Source/Canary/Mqtt5Canary/Mqtt5Canary.swift @@ -6,530 +6,552 @@ import AwsCommonRuntimeKit import Foundation import _Concurrency - let NANO_SECOND: UInt64 = 1_000_000_000 -let SLEEP_OVERHEAD: UInt64 = 2_000_000 // 2ms overhead +let SLEEP_OVERHEAD: UInt64 = 2_000_000 // 2ms overhead let UNSUBSRIBE_TESTING_TOPIC = "testTopic1" typealias OnMqtt5CanaryTestFunction = (Mqtt5CanaryTestContext) -> Void enum CanaryTestError: Error { - case InvalidArgument + case InvalidArgument } class Mqtt5CanaryTestContext { - var mqtt5Clients: [Mqtt5CanaryClient] = [] - var statistic: Mqtt5CanaryStatistic - - init() { - statistic = Mqtt5CanaryStatistic() - } - - /// Setup Test Context - func appendClient(client: Mqtt5CanaryClient){ - mqtt5Clients.append(client) - } - - func setClientConnection(index: Int, connected: Bool){ - mqtt5Clients[index].setConnected(connected: connected) - } + var mqtt5Clients: [Mqtt5CanaryClient] = [] + var statistic: Mqtt5CanaryStatistic - /// Client Operation Help Function - func mqtt5CanaryRunOperation(_ operation: Mqtt5CanaryOperation, _ index: Int) async throws{ - switch operation { - case Mqtt5CanaryOperation.START: - try await mqtt5CanaryOperationStart(clientIndex: index) - case Mqtt5CanaryOperation.STOP: - try await mqtt5CanaryOperationStop(clientIndex: index) - case Mqtt5CanaryOperation.SUBSSCRIBE: - try await mqtt5CanaryOperationSubscribe(clientIndex: index) - case Mqtt5CanaryOperation.UNSUBSCRIBE: - if (mqtt5Clients[index].subscriptionCount > 0) { - try await mqtt5CanaryOperationUnsubscribe(clientIndex: index, testTopic: mqtt5Clients[index].getUnsubscribedTopic()) - } else { - // If there is topic subscribed, fallthrough to UNSUBSSCRIBE_BAD operation - fallthrough - } - case Mqtt5CanaryOperation.UNSUBSSCRIBE_BAD: - try await mqtt5CanaryOperationUnsubscribe(clientIndex: index) - case Mqtt5CanaryOperation.PUBLISH_QOS0: - try await mqtt5CanaryOperationPublish(clientIndex: index, qos: QoS.atMostOnce) - case Mqtt5CanaryOperation.PUBLISH_QOS1: - try await mqtt5CanaryOperationPublish(clientIndex: index, qos: QoS.atLeastOnce) - case Mqtt5CanaryOperation.PUBLISH_TO_SUBSCRIBED_TOPIC_QOS0: - try await mqtt5CanaryOperationPublish(clientIndex: index, topic: mqtt5Clients[index].getSubscribedTopic(), qos: QoS.atMostOnce) - case Mqtt5CanaryOperation.PUBLISH_TO_SUBSCRIBED_TOPIC_QOS1: - try await mqtt5CanaryOperationPublish(clientIndex: index, topic: mqtt5Clients[index].getSubscribedTopic(), qos: QoS.atLeastOnce) - case Mqtt5CanaryOperation.PUBLISH_TO_SHARED_TOPIC_QOS0: - try await mqtt5CanaryOperationPublish(clientIndex: index, topic: mqtt5Clients[index].shared_topic, qos: QoS.atMostOnce) - case Mqtt5CanaryOperation.PUBLISH_TO_SHARED_TOPIC_QOS1: - try await mqtt5CanaryOperationPublish(clientIndex: index, topic: mqtt5Clients[index].shared_topic, qos: QoS.atLeastOnce) - - case Mqtt5CanaryOperation.NULL: - fallthrough - case Mqtt5CanaryOperation.DESTROY: - fallthrough - case Mqtt5CanaryOperation.OPERATION_COUNT: - throw CanaryTestError.InvalidArgument - } - - } - - /// Client Operations - func mqtt5CanaryOperationStart(clientIndex: Int) async throws{ - if !mqtt5Clients[clientIndex].is_connected { - await statistic.incrementTotalOperation() - try mqtt5Clients[clientIndex].client.start() - } + init() { + statistic = Mqtt5CanaryStatistic() + } + + /// Setup Test Context + func appendClient(client: Mqtt5CanaryClient) { + mqtt5Clients.append(client) + } + + func setClientConnection(index: Int, connected: Bool) { + mqtt5Clients[index].setConnected(connected: connected) + } + + /// Client Operation Help Function + func mqtt5CanaryRunOperation(_ operation: Mqtt5CanaryOperation, _ index: Int) async throws { + switch operation { + case Mqtt5CanaryOperation.START: + try await mqtt5CanaryOperationStart(clientIndex: index) + case Mqtt5CanaryOperation.STOP: + try await mqtt5CanaryOperationStop(clientIndex: index) + case Mqtt5CanaryOperation.SUBSSCRIBE: + try await mqtt5CanaryOperationSubscribe(clientIndex: index) + case Mqtt5CanaryOperation.UNSUBSCRIBE: + if (mqtt5Clients[index].subscriptionCount > 0) { + try await mqtt5CanaryOperationUnsubscribe( + clientIndex: index, testTopic: mqtt5Clients[index].getUnsubscribedTopic()) + } else { + // If there is topic subscribed, fallthrough to UNSUBSSCRIBE_BAD operation + fallthrough + } + case Mqtt5CanaryOperation.UNSUBSSCRIBE_BAD: + try await mqtt5CanaryOperationUnsubscribe(clientIndex: index) + case Mqtt5CanaryOperation.PUBLISH_QOS0: + try await mqtt5CanaryOperationPublish(clientIndex: index, qos: QoS.atMostOnce) + case Mqtt5CanaryOperation.PUBLISH_QOS1: + try await mqtt5CanaryOperationPublish(clientIndex: index, qos: QoS.atLeastOnce) + case Mqtt5CanaryOperation.PUBLISH_TO_SUBSCRIBED_TOPIC_QOS0: + try await mqtt5CanaryOperationPublish( + clientIndex: index, topic: mqtt5Clients[index].getSubscribedTopic(), qos: QoS.atMostOnce) + case Mqtt5CanaryOperation.PUBLISH_TO_SUBSCRIBED_TOPIC_QOS1: + try await mqtt5CanaryOperationPublish( + clientIndex: index, topic: mqtt5Clients[index].getSubscribedTopic(), qos: QoS.atLeastOnce) + case Mqtt5CanaryOperation.PUBLISH_TO_SHARED_TOPIC_QOS0: + try await mqtt5CanaryOperationPublish( + clientIndex: index, topic: mqtt5Clients[index].shared_topic, qos: QoS.atMostOnce) + case Mqtt5CanaryOperation.PUBLISH_TO_SHARED_TOPIC_QOS1: + try await mqtt5CanaryOperationPublish( + clientIndex: index, topic: mqtt5Clients[index].shared_topic, qos: QoS.atLeastOnce) + + case Mqtt5CanaryOperation.NULL: + fallthrough + case Mqtt5CanaryOperation.DESTROY: + fallthrough + case Mqtt5CanaryOperation.OPERATION_COUNT: + throw CanaryTestError.InvalidArgument } - - func mqtt5CanaryOperationStop(clientIndex: Int) async throws { - if mqtt5Clients[clientIndex].is_connected { - await statistic.incrementTotalOperation() - try mqtt5Clients[clientIndex].client.stop() - // clean up the subscription count - mqtt5Clients[clientIndex].subscriptionCount = 0 - } + + } + + /// Client Operations + func mqtt5CanaryOperationStart(clientIndex: Int) async throws { + if !mqtt5Clients[clientIndex].is_connected { + await statistic.incrementTotalOperation() + try mqtt5Clients[clientIndex].client.start() } + } + + func mqtt5CanaryOperationStop(clientIndex: Int) async throws { + if mqtt5Clients[clientIndex].is_connected { + await statistic.incrementTotalOperation() + try mqtt5Clients[clientIndex].client.stop() + // clean up the subscription count + mqtt5Clients[clientIndex].subscriptionCount = 0 + } + } - func mqtt5CanaryOperationSubscribe(clientIndex: Int) async throws{ - if !mqtt5Clients[clientIndex].is_connected { - return try await mqtt5CanaryOperationStart(clientIndex: clientIndex) - } - - do { - let canaryClient = mqtt5Clients[clientIndex] - let sub = Subscription(topicFilter: canaryClient.getNextSubTopic(), qos: QoS.atLeastOnce) - var subscriptions = [sub] - - await self.statistic.incrementSubscribeAttempts() - await self.statistic.incrementTotalOperation() - - // If this is the first subscription of the client, subscribe to the shared topic - if(canaryClient.subscriptionCount == 1){ - subscriptions.append(Subscription(topicFilter: canaryClient.shared_topic, qos: QoS.atLeastOnce)) - } - - let suback = try await canaryClient.client.subscribe(subscribePacket: SubscribePacket(subscriptions: subscriptions)) - - if suback.reasonCodes[0] == SubackReasonCode.grantedQos1 { - await self.statistic.incrementSubscribeSucceed() - }else{ - await self.statistic.incrementSubscribeFailed() - } - }catch{ - await self.statistic.incrementSubscribeFailed() - } + func mqtt5CanaryOperationSubscribe(clientIndex: Int) async throws { + if !mqtt5Clients[clientIndex].is_connected { + return try await mqtt5CanaryOperationStart(clientIndex: clientIndex) } - - func mqtt5CanaryOperationUnsubscribe(clientIndex: Int, testTopic : String = UNSUBSRIBE_TESTING_TOPIC) async throws{ - if !mqtt5Clients[clientIndex].is_connected { - return try await mqtt5CanaryOperationStart(clientIndex: clientIndex) - } - - do { - let canaryClient = mqtt5Clients[clientIndex] - let unsub = UnsubscribePacket(topicFilter: testTopic) - - await self.statistic.incrementUnsubscribeAttempts() - await self.statistic.incrementTotalOperation() - - let unsubAck = try await canaryClient.client.unsubscribe(unsubscribePacket: unsub) - - if unsubAck.reasonCodes[0] == UnsubackReasonCode.success{ - await self.statistic.incrementUnsubscribeSucceed() - }else{ - await self.statistic.incrementUnsubscribeFailed() - print("Unsubscribe to topic \(testTopic), failed with reason \(unsubAck.reasonCodes[0])") - } - }catch{ - await self.statistic.incrementUnsubscribeFailed() - print("Unsubscribe to topic \(testTopic), operation failed") - } + + do { + let canaryClient = mqtt5Clients[clientIndex] + let sub = Subscription(topicFilter: canaryClient.getNextSubTopic(), qos: QoS.atLeastOnce) + var subscriptions = [sub] + + await self.statistic.incrementSubscribeAttempts() + await self.statistic.incrementTotalOperation() + + // If this is the first subscription of the client, subscribe to the shared topic + if (canaryClient.subscriptionCount == 1) { + subscriptions.append( + Subscription(topicFilter: canaryClient.shared_topic, qos: QoS.atLeastOnce)) + } + + let suback = try await canaryClient.client.subscribe( + subscribePacket: SubscribePacket(subscriptions: subscriptions)) + + if suback.reasonCodes[0] == SubackReasonCode.grantedQos1 { + await self.statistic.incrementSubscribeSucceed() + } else { + await self.statistic.incrementSubscribeFailed() + } + } catch { + await self.statistic.incrementSubscribeFailed() + } + } + + func mqtt5CanaryOperationUnsubscribe( + clientIndex: Int, testTopic: String = UNSUBSRIBE_TESTING_TOPIC + ) async throws { + if !mqtt5Clients[clientIndex].is_connected { + return try await mqtt5CanaryOperationStart(clientIndex: clientIndex) } - - func mqtt5CanaryOperationPublish(clientIndex: Int, topic: String = UNSUBSRIBE_TESTING_TOPIC, qos: QoS) async throws{ - if !mqtt5Clients[clientIndex].is_connected { - return try await mqtt5CanaryOperationStart(clientIndex: clientIndex) - } - - do { - let canaryClient = mqtt5Clients[clientIndex] - let pub = PublishPacket(qos: qos, topic: topic) - - await self.statistic.incrementPublishAttempts() - await self.statistic.incrementTotalOperation() - let puback = try await canaryClient.client.publish(publishPacket: pub) - - if qos == QoS.atMostOnce || - puback.puback?.reasonCode == PubackReasonCode.success || - puback.puback?.reasonCode == PubackReasonCode.noMatchingSubscribers { - await self.statistic.incrementPublishSucceed() - }else{ - await self.statistic.incrementPublishFailed() - } - }catch{ - await self.statistic.incrementPublishFailed() - } + + do { + let canaryClient = mqtt5Clients[clientIndex] + let unsub = UnsubscribePacket(topicFilter: testTopic) + + await self.statistic.incrementUnsubscribeAttempts() + await self.statistic.incrementTotalOperation() + + let unsubAck = try await canaryClient.client.unsubscribe(unsubscribePacket: unsub) + + if unsubAck.reasonCodes[0] == UnsubackReasonCode.success { + await self.statistic.incrementUnsubscribeSucceed() + } else { + await self.statistic.incrementUnsubscribeFailed() + } + } catch { + await self.statistic.incrementUnsubscribeFailed() + } + } + + func mqtt5CanaryOperationPublish( + clientIndex: Int, topic: String = UNSUBSRIBE_TESTING_TOPIC, qos: QoS + ) async throws { + if !mqtt5Clients[clientIndex].is_connected { + return try await mqtt5CanaryOperationStart(clientIndex: clientIndex) } - + + do { + let canaryClient = mqtt5Clients[clientIndex] + let pub = PublishPacket(qos: qos, topic: topic) + + await self.statistic.incrementPublishAttempts() + await self.statistic.incrementTotalOperation() + let puback = try await canaryClient.client.publish(publishPacket: pub) + + if qos == QoS.atMostOnce || puback.puback?.reasonCode == PubackReasonCode.success + || puback.puback?.reasonCode == PubackReasonCode.noMatchingSubscribers + { + await self.statistic.incrementPublishSucceed() + } else { + await self.statistic.incrementPublishFailed() + } + } catch { + await self.statistic.incrementPublishFailed() + } + } + } class Mqtt5CanaryClient { - fileprivate let client : Mqtt5Client - fileprivate let clientId : String - fileprivate let shared_topic: String - fileprivate var subscriptionCount: Int = 0 - fileprivate var is_connected: Bool = false - - - init(client: Mqtt5Client, clientId: String, sharedTopic: String) { - self.clientId = clientId - self.client = client - self.shared_topic = sharedTopic - } - - func getNextSubTopic() -> String { - self.subscriptionCount += 1 - return self.clientId + "_" + String(self.subscriptionCount); - } - - func getSubscribedTopic() -> String { - return self.clientId + "_" + String(self.subscriptionCount); - } - - func getUnsubscribedTopic() -> String { - let topic = self.clientId + "_" + String(self.subscriptionCount); - if(self.subscriptionCount > 0) { - self.subscriptionCount -= 1 - } - return topic - } - - func setConnected(connected: Bool){ - self.is_connected = connected + fileprivate let client: Mqtt5Client + fileprivate let clientId: String + fileprivate let shared_topic: String + fileprivate var subscriptionCount: Int = 0 + fileprivate var is_connected: Bool = false + + init(client: Mqtt5Client, clientId: String, sharedTopic: String) { + self.clientId = clientId + self.client = client + self.shared_topic = sharedTopic + } + + func getNextSubTopic() -> String { + self.subscriptionCount += 1 + return self.clientId + "_" + String(self.subscriptionCount); + } + + func getSubscribedTopic() -> String { + return self.clientId + "_" + String(self.subscriptionCount); + } + + func getUnsubscribedTopic() -> String { + let topic = self.clientId + "_" + String(self.subscriptionCount); + if (self.subscriptionCount > 0) { + self.subscriptionCount -= 1 } - + return topic + } + + func setConnected(connected: Bool) { + self.is_connected = connected + } + } actor Mqtt5CanaryStatistic { - var totalOperation: Int = 0 - - var subscribeAttempt: Int = 0 - var subscribeSucceed: Int = 0 - var subscribeFailed: Int = 0 - - var publishAttempt: Int = 0 - var publishSucceed: Int = 0 - var publishFailed: Int = 0 - - var unsubAttempt: Int = 0 - var unsubSucceed: Int = 0 - var unsubFailed: Int = 0 - - init(){ - } - - func incrementTotalOperation(){ - self.totalOperation += 1; - } - - func incrementPublishAttempts(){ - self.publishAttempt += 1; - } - - func incrementPublishSucceed(){ - self.publishSucceed += 1; - } + var totalOperation: Int = 0 - func incrementPublishFailed(){ - self.publishFailed += 1; - } - - func incrementSubscribeAttempts(){ - self.subscribeAttempt += 1; - } - - func incrementSubscribeSucceed(){ - self.subscribeSucceed += 1; - } + var subscribeAttempt: Int = 0 + var subscribeSucceed: Int = 0 + var subscribeFailed: Int = 0 - func incrementSubscribeFailed(){ - self.subscribeFailed += 1; - } - - - func incrementUnsubscribeAttempts(){ - self.unsubAttempt += 1; - } - - func incrementUnsubscribeSucceed(){ - self.unsubSucceed += 1; - } + var publishAttempt: Int = 0 + var publishSucceed: Int = 0 + var publishFailed: Int = 0 + + var unsubAttempt: Int = 0 + var unsubSucceed: Int = 0 + var unsubFailed: Int = 0 + + init() { + } + + func incrementTotalOperation() { + self.totalOperation += 1; + } + + func incrementPublishAttempts() { + self.publishAttempt += 1; + } + + func incrementPublishSucceed() { + self.publishSucceed += 1; + } + + func incrementPublishFailed() { + self.publishFailed += 1; + } + + func incrementSubscribeAttempts() { + self.subscribeAttempt += 1; + } + + func incrementSubscribeSucceed() { + self.subscribeSucceed += 1; + } + + func incrementSubscribeFailed() { + self.subscribeFailed += 1; + } + + func incrementUnsubscribeAttempts() { + self.unsubAttempt += 1; + } + + func incrementUnsubscribeSucceed() { + self.unsubSucceed += 1; + } + + func incrementUnsubscribeFailed() { + self.unsubFailed += 1; + } + + func printStatistics(duration: TimeInterval) { + let subscribeSuccessRate = + subscribeAttempt > 0 ? Double(subscribeSucceed) / Double(subscribeAttempt) * 100 : 0 + let publishSuccessRate = + publishAttempt > 0 ? Double(publishSucceed) / Double(publishAttempt) * 100 : 0 + let unsubscribeSuccessRate = + unsubAttempt > 0 ? Double(unsubSucceed) / Double(unsubAttempt) * 100 : 0 + let totalTPS = duration > 0 ? Double(totalOperation) / duration : 0 + + print("=== MQTT5 Canary Test Statistics ===") + print("Test Duration: \(String(format: "%.2f", duration)) seconds") + print("Total Operations: \(totalOperation)") + print("Total TPS: \(String(format: "%.2f", totalTPS))") + print( + "Subscribe - Attempts: \(subscribeAttempt), Success: \(subscribeSucceed), Failed: \(subscribeFailed), Success Rate: \(String(format: "%.2f", subscribeSuccessRate))%" + ) + print( + "Publish - Attempts: \(publishAttempt), Success: \(publishSucceed), Failed: \(publishFailed), Success Rate: \(String(format: "%.2f", publishSuccessRate))%" + ) + print( + "Unsubscribe - Attempts: \(unsubAttempt), Success: \(unsubSucceed), Failed: \(unsubFailed), Success Rate: \(String(format: "%.2f", unsubscribeSuccessRate))%" + ) + } - func incrementUnsubscribeFailed(){ - self.unsubFailed += 1; - } - - func printStatistics(duration: TimeInterval) { - let subscribeSuccessRate = subscribeAttempt > 0 ? Double(subscribeSucceed) / Double(subscribeAttempt) * 100 : 0 - let publishSuccessRate = publishAttempt > 0 ? Double(publishSucceed) / Double(publishAttempt) * 100 : 0 - let unsubscribeSuccessRate = unsubAttempt > 0 ? Double(unsubSucceed) / Double(unsubAttempt) * 100 : 0 - let totalTPS = duration > 0 ? Double(totalOperation) / duration : 0 - - print("=== MQTT5 Canary Test Statistics ===") - print("Test Duration: \(String(format: "%.2f", duration)) seconds") - print("Total Operations: \(totalOperation)") - print("Total TPS: \(String(format: "%.2f", totalTPS))") - print("Subscribe - Attempts: \(subscribeAttempt), Success: \(subscribeSucceed), Failed: \(subscribeFailed), Success Rate: \(String(format: "%.2f", subscribeSuccessRate))%") - print("Publish - Attempts: \(publishAttempt), Success: \(publishSucceed), Failed: \(publishFailed), Success Rate: \(String(format: "%.2f", publishSuccessRate))%") - print("Unsubscribe - Attempts: \(unsubAttempt), Success: \(unsubSucceed), Failed: \(unsubFailed), Success Rate: \(String(format: "%.2f", unsubscribeSuccessRate))%") - } - } -enum Mqtt5CanaryOperation: Int{ - case NULL = 0 - case START - case STOP - case DESTROY - case SUBSSCRIBE - case UNSUBSCRIBE - case UNSUBSSCRIBE_BAD - case PUBLISH_QOS0 - case PUBLISH_QOS1 - case PUBLISH_TO_SUBSCRIBED_TOPIC_QOS0 - case PUBLISH_TO_SUBSCRIBED_TOPIC_QOS1 - case PUBLISH_TO_SHARED_TOPIC_QOS0 - case PUBLISH_TO_SHARED_TOPIC_QOS1 - case OPERATION_COUNT +enum Mqtt5CanaryOperation: Int { + case NULL = 0 + case START + case STOP + case DESTROY + case SUBSSCRIBE + case UNSUBSCRIBE + case UNSUBSSCRIBE_BAD + case PUBLISH_QOS0 + case PUBLISH_QOS1 + case PUBLISH_TO_SUBSCRIBED_TOPIC_QOS0 + case PUBLISH_TO_SUBSCRIBED_TOPIC_QOS1 + case PUBLISH_TO_SHARED_TOPIC_QOS0 + case PUBLISH_TO_SHARED_TOPIC_QOS1 + case OPERATION_COUNT } struct Mqtt5CanaryTestOptions { - // Client options - let shared_topic: String - var elg: EventLoopGroup - var boostrap: ClientBootstrap - var tlsctx: TLSContext? = nil - // Test options - var tpsSleepTime: UInt64 - var onWebsocketTransform:OnWebSocketHandshakeIntercept? = nil - var operationDistribution: [Mqtt5CanaryOperation] = [] - - init(testApp: Mqtt5Canary) throws{ - shared_topic = "shared_topic_" + UUID().uuidString - self.tpsSleepTime = testApp.tps > 0 ? (UInt64(NANO_SECOND / testApp.tps)) : 0 - print("tps sleep time: \(self.tpsSleepTime)") - // Initialize elg first - self.elg = try EventLoopGroup(threadCount: testApp.threads) - let resolver = try HostResolver( - eventLoopGroup: self.elg, - maxHosts: 8, - maxTTL: 30) - self.boostrap = try ClientBootstrap( - eventLoopGroup: self.elg, - hostResolver: resolver) - - try self.setupClientOption(testApp: testApp) - - // Setup operation distribution - Mqtt5CanaryOperationDistributionSetup(&self.operationDistribution) - } + // Client options + let shared_topic: String + var elg: EventLoopGroup + var boostrap: ClientBootstrap + var tlsctx: TLSContext? = nil + // Test options + var tpsSleepTime: UInt64 + var onWebsocketTransform: OnWebSocketHandshakeIntercept? = nil + var operationDistribution: [Mqtt5CanaryOperation] = [] + + init(testApp: Mqtt5Canary) throws { + shared_topic = "shared_topic_" + UUID().uuidString + self.tpsSleepTime = testApp.tps > 0 ? (UInt64(NANO_SECOND / testApp.tps)) : 0 + // Initialize elg first + self.elg = try EventLoopGroup(threadCount: testApp.threads) + let resolver = try HostResolver( + eventLoopGroup: self.elg, + maxHosts: 8, + maxTTL: 30) + self.boostrap = try ClientBootstrap( + eventLoopGroup: self.elg, + hostResolver: resolver) + + try self.setupClientOption(testApp: testApp) + // Setup operation distribution + Mqtt5CanaryOperationDistributionSetup(&self.operationDistribution) + } - mutating func setupClientOption(testApp: Mqtt5Canary) throws{ - if (testApp.useTls && self.tlsctx == nil){ - guard let _cert = testApp.cert, let _key = testApp.key else { - throw CanaryTestError.InvalidArgument - } - let tlsOptions = try TLSContextOptions.makeMTLS( - certificatePath: _cert, - privateKeyPath: _key - ) - - if let _caPath = testApp.caPath { - try tlsOptions.overrideDefaultTrustStoreWithFile(caFile: _caPath) - } - self.tlsctx = try TLSContext(options: tlsOptions, mode: .client) - } - - if (testApp.useWebsocket){ - self.onWebsocketTransform = { request, complete in - // Simply complete the request with success - complete(request, 0) - } - } + mutating func setupClientOption(testApp: Mqtt5Canary) throws { + if (testApp.useTls && self.tlsctx == nil) { + guard let _cert = testApp.cert, let _key = testApp.key else { + throw CanaryTestError.InvalidArgument + } + let tlsOptions = try TLSContextOptions.makeMTLS( + certificatePath: _cert, + privateKeyPath: _key + ) + + if let _caPath = testApp.caPath { + try tlsOptions.overrideDefaultTrustStoreWithFile(caFile: _caPath) + } + self.tlsctx = try TLSContext(options: tlsOptions, mode: .client) + } + + if (testApp.useWebsocket) { + self.onWebsocketTransform = { request, complete in + // Simply complete the request with success + complete(request, 0) + } } + } } -func Mqtt5CanaryOperationDistributionSetup(_ distributionDataSet : inout [Mqtt5CanaryOperation]){ - let operationDistribution = [ - (Mqtt5CanaryOperation.STOP, 1), - (Mqtt5CanaryOperation.SUBSSCRIBE, 200), - (Mqtt5CanaryOperation.UNSUBSCRIBE, 200), - (Mqtt5CanaryOperation.UNSUBSSCRIBE_BAD, 100), - (Mqtt5CanaryOperation.PUBLISH_QOS0, 300), - (Mqtt5CanaryOperation.PUBLISH_QOS1, 150), - (Mqtt5CanaryOperation.PUBLISH_TO_SUBSCRIBED_TOPIC_QOS0, 100), - (Mqtt5CanaryOperation.PUBLISH_TO_SUBSCRIBED_TOPIC_QOS1, 50), - (Mqtt5CanaryOperation.PUBLISH_TO_SHARED_TOPIC_QOS0, 50), - (Mqtt5CanaryOperation.PUBLISH_TO_SHARED_TOPIC_QOS1, 50) - ] - - - for distribution in operationDistribution { - for _ in 0.. Void { - let operationIndex = Int.random(in : 0.. Void { + let operationIndex = Int.random(in: 0.. SLEEP_OVERHEAD { + try await Task.sleep(nanoseconds: testOptions.tpsSleepTime - SLEEP_OVERHEAD) + } + let targetTime = Date().addingTimeInterval( + TimeInterval(testOptions.tpsSleepTime / NANO_SECOND)) + while Date() < targetTime { + // Busy-wait for precision sleep + } } - - mutating func run() async throws { - CommonRuntimeKit.initialize() - // Enable logging - let logLevel = LogLevel.fromString(string: self.verbose) - if let logFile = self.logFile { - print("enable logging with trace file") - try? Logger.initialize(target: .filePath(logFile), level: logLevel) - } else { - print("enable logging with stdout") - try? Logger.initialize(target: .standardOutput, level: logLevel) - } - - // Setup test options - let testOptions = try Mqtt5CanaryTestOptions(testApp: self) - // Setup test clients - let context: Mqtt5CanaryTestContext = Mqtt5CanaryTestContext() - for index: Int in 0.. SLEEP_OVERHEAD { - try await Task.sleep(nanoseconds: testOptions.tpsSleepTime - SLEEP_OVERHEAD) - } - let targetTime = Date().addingTimeInterval(TimeInterval(testOptions.tpsSleepTime / NANO_SECOND)) - while Date() < targetTime { - // Busy-wait for precision sleep - } - } - - // Close and clean up the clients - for index: Int in 0.. Date: Fri, 8 Aug 2025 11:50:42 -0700 Subject: [PATCH 3/5] fix typo and swift6 --- Source/Canary/Mqtt5Canary/Mqtt5Canary.swift | 22 +++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/Source/Canary/Mqtt5Canary/Mqtt5Canary.swift b/Source/Canary/Mqtt5Canary/Mqtt5Canary.swift index 30280269..eeea94c7 100644 --- a/Source/Canary/Mqtt5Canary/Mqtt5Canary.swift +++ b/Source/Canary/Mqtt5Canary/Mqtt5Canary.swift @@ -16,7 +16,7 @@ enum CanaryTestError: Error { case InvalidArgument } -class Mqtt5CanaryTestContext { +class Mqtt5CanaryTestContext: @unchecked Sendable { var mqtt5Clients: [Mqtt5CanaryClient] = [] var statistic: Mqtt5CanaryStatistic @@ -40,7 +40,7 @@ class Mqtt5CanaryTestContext { try await mqtt5CanaryOperationStart(clientIndex: index) case Mqtt5CanaryOperation.STOP: try await mqtt5CanaryOperationStop(clientIndex: index) - case Mqtt5CanaryOperation.SUBSSCRIBE: + case Mqtt5CanaryOperation.SUBSCRIBE: try await mqtt5CanaryOperationSubscribe(clientIndex: index) case Mqtt5CanaryOperation.UNSUBSCRIBE: if (mqtt5Clients[index].subscriptionCount > 0) { @@ -50,7 +50,7 @@ class Mqtt5CanaryTestContext { // If there is topic subscribed, fallthrough to UNSUBSSCRIBE_BAD operation fallthrough } - case Mqtt5CanaryOperation.UNSUBSSCRIBE_BAD: + case Mqtt5CanaryOperation.UNSUBSCRIBE_BAD: try await mqtt5CanaryOperationUnsubscribe(clientIndex: index) case Mqtt5CanaryOperation.PUBLISH_QOS0: try await mqtt5CanaryOperationPublish(clientIndex: index, qos: QoS.atMostOnce) @@ -308,9 +308,9 @@ enum Mqtt5CanaryOperation: Int { case START case STOP case DESTROY - case SUBSSCRIBE + case SUBSCRIBE case UNSUBSCRIBE - case UNSUBSSCRIBE_BAD + case UNSUBSCRIBE_BAD case PUBLISH_QOS0 case PUBLISH_QOS1 case PUBLISH_TO_SUBSCRIBED_TOPIC_QOS0 @@ -320,14 +320,16 @@ enum Mqtt5CanaryOperation: Int { case OPERATION_COUNT } -struct Mqtt5CanaryTestOptions { +// This struct holds the canary options +// Adjustification of "@uncheck Sendable": The memebers in this struct will not be modified once setup. +struct Mqtt5CanaryTestOptions: @unchecked Sendable { // Client options let shared_topic: String var elg: EventLoopGroup var boostrap: ClientBootstrap var tlsctx: TLSContext? = nil // Test options - var tpsSleepTime: UInt64 + let tpsSleepTime: UInt64 var onWebsocketTransform: OnWebSocketHandshakeIntercept? = nil var operationDistribution: [Mqtt5CanaryOperation] = [] @@ -378,9 +380,9 @@ struct Mqtt5CanaryTestOptions { func Mqtt5CanaryOperationDistributionSetup(_ distributionDataSet: inout [Mqtt5CanaryOperation]) { let operationDistribution = [ (Mqtt5CanaryOperation.STOP, 1), - (Mqtt5CanaryOperation.SUBSSCRIBE, 200), + (Mqtt5CanaryOperation.SUBSCRIBE, 200), (Mqtt5CanaryOperation.UNSUBSCRIBE, 200), - (Mqtt5CanaryOperation.UNSUBSSCRIBE_BAD, 100), + (Mqtt5CanaryOperation.UNSUBSCRIBE_BAD, 100), (Mqtt5CanaryOperation.PUBLISH_QOS0, 300), (Mqtt5CanaryOperation.PUBLISH_QOS1, 150), (Mqtt5CanaryOperation.PUBLISH_TO_SUBSCRIBED_TOPIC_QOS0, 100), @@ -397,7 +399,7 @@ func Mqtt5CanaryOperationDistributionSetup(_ distributionDataSet: inout [Mqtt5Ca } -func Mqtt5CanaryTestRunIteration( +@Sendable func Mqtt5CanaryTestRunIteration( _ context: Mqtt5CanaryTestContext, _ options: Mqtt5CanaryTestOptions ) async throws -> Void { let operationIndex = Int.random(in: 0.. Date: Tue, 12 Aug 2025 09:56:51 -0700 Subject: [PATCH 4/5] update crt --- Source/Canary/Mqtt5Canary/Mqtt5Canary.swift | 71 ++++++++++----------- 1 file changed, 35 insertions(+), 36 deletions(-) diff --git a/Source/Canary/Mqtt5Canary/Mqtt5Canary.swift b/Source/Canary/Mqtt5Canary/Mqtt5Canary.swift index eeea94c7..9c3617ed 100644 --- a/Source/Canary/Mqtt5Canary/Mqtt5Canary.swift +++ b/Source/Canary/Mqtt5Canary/Mqtt5Canary.swift @@ -6,9 +6,9 @@ import AwsCommonRuntimeKit import Foundation import _Concurrency -let NANO_SECOND: UInt64 = 1_000_000_000 -let SLEEP_OVERHEAD: UInt64 = 2_000_000 // 2ms overhead -let UNSUBSRIBE_TESTING_TOPIC = "testTopic1" +let ONE_NANO_SECOND: UInt64 = 1_000_000_000 +let SLEEP_OVERHEAD_NS: UInt64 = 2_000_000 // 2ms overhead +let UNSUBSCRIBED_TEST_TOPIC = "SwiftCanary/Unsubscribed/" + UUID().uuidString typealias OnMqtt5CanaryTestFunction = (Mqtt5CanaryTestContext) -> Void @@ -17,7 +17,7 @@ enum CanaryTestError: Error { } class Mqtt5CanaryTestContext: @unchecked Sendable { - var mqtt5Clients: [Mqtt5CanaryClient] = [] + var mqtt5CanaryClients: [Mqtt5CanaryClient] = [] var statistic: Mqtt5CanaryStatistic init() { @@ -25,12 +25,12 @@ class Mqtt5CanaryTestContext: @unchecked Sendable { } /// Setup Test Context - func appendClient(client: Mqtt5CanaryClient) { - mqtt5Clients.append(client) + func appendCanaryClient(client: Mqtt5CanaryClient) { + mqtt5CanaryClients.append(client) } func setClientConnection(index: Int, connected: Bool) { - mqtt5Clients[index].setConnected(connected: connected) + mqtt5CanaryClients[index].setConnected(connected: connected) } /// Client Operation Help Function @@ -43,9 +43,9 @@ class Mqtt5CanaryTestContext: @unchecked Sendable { case Mqtt5CanaryOperation.SUBSCRIBE: try await mqtt5CanaryOperationSubscribe(clientIndex: index) case Mqtt5CanaryOperation.UNSUBSCRIBE: - if (mqtt5Clients[index].subscriptionCount > 0) { + if (mqtt5CanaryClients[index].subscriptionCount > 0) { try await mqtt5CanaryOperationUnsubscribe( - clientIndex: index, testTopic: mqtt5Clients[index].getUnsubscribedTopic()) + clientIndex: index, testTopic: mqtt5CanaryClients[index].getUnsubscribedTopic()) } else { // If there is topic subscribed, fallthrough to UNSUBSSCRIBE_BAD operation fallthrough @@ -58,21 +58,21 @@ class Mqtt5CanaryTestContext: @unchecked Sendable { try await mqtt5CanaryOperationPublish(clientIndex: index, qos: QoS.atLeastOnce) case Mqtt5CanaryOperation.PUBLISH_TO_SUBSCRIBED_TOPIC_QOS0: try await mqtt5CanaryOperationPublish( - clientIndex: index, topic: mqtt5Clients[index].getSubscribedTopic(), qos: QoS.atMostOnce) + clientIndex: index, topic: mqtt5CanaryClients[index].getSubscribedTopic(), + qos: QoS.atMostOnce) case Mqtt5CanaryOperation.PUBLISH_TO_SUBSCRIBED_TOPIC_QOS1: try await mqtt5CanaryOperationPublish( - clientIndex: index, topic: mqtt5Clients[index].getSubscribedTopic(), qos: QoS.atLeastOnce) + clientIndex: index, topic: mqtt5CanaryClients[index].getSubscribedTopic(), + qos: QoS.atLeastOnce) case Mqtt5CanaryOperation.PUBLISH_TO_SHARED_TOPIC_QOS0: try await mqtt5CanaryOperationPublish( - clientIndex: index, topic: mqtt5Clients[index].shared_topic, qos: QoS.atMostOnce) + clientIndex: index, topic: mqtt5CanaryClients[index].shared_topic, qos: QoS.atMostOnce) case Mqtt5CanaryOperation.PUBLISH_TO_SHARED_TOPIC_QOS1: try await mqtt5CanaryOperationPublish( - clientIndex: index, topic: mqtt5Clients[index].shared_topic, qos: QoS.atLeastOnce) + clientIndex: index, topic: mqtt5CanaryClients[index].shared_topic, qos: QoS.atLeastOnce) case Mqtt5CanaryOperation.NULL: fallthrough - case Mqtt5CanaryOperation.DESTROY: - fallthrough case Mqtt5CanaryOperation.OPERATION_COUNT: throw CanaryTestError.InvalidArgument } @@ -81,28 +81,28 @@ class Mqtt5CanaryTestContext: @unchecked Sendable { /// Client Operations func mqtt5CanaryOperationStart(clientIndex: Int) async throws { - if !mqtt5Clients[clientIndex].is_connected { + if !mqtt5CanaryClients[clientIndex].is_connected { await statistic.incrementTotalOperation() - try mqtt5Clients[clientIndex].client.start() + try mqtt5CanaryClients[clientIndex].client.start() } } func mqtt5CanaryOperationStop(clientIndex: Int) async throws { - if mqtt5Clients[clientIndex].is_connected { + if mqtt5CanaryClients[clientIndex].is_connected { await statistic.incrementTotalOperation() - try mqtt5Clients[clientIndex].client.stop() + try mqtt5CanaryClients[clientIndex].client.stop() // clean up the subscription count - mqtt5Clients[clientIndex].subscriptionCount = 0 + mqtt5CanaryClients[clientIndex].subscriptionCount = 0 } } func mqtt5CanaryOperationSubscribe(clientIndex: Int) async throws { - if !mqtt5Clients[clientIndex].is_connected { + if !mqtt5CanaryClients[clientIndex].is_connected { return try await mqtt5CanaryOperationStart(clientIndex: clientIndex) } do { - let canaryClient = mqtt5Clients[clientIndex] + let canaryClient = mqtt5CanaryClients[clientIndex] let sub = Subscription(topicFilter: canaryClient.getNextSubTopic(), qos: QoS.atLeastOnce) var subscriptions = [sub] @@ -129,14 +129,14 @@ class Mqtt5CanaryTestContext: @unchecked Sendable { } func mqtt5CanaryOperationUnsubscribe( - clientIndex: Int, testTopic: String = UNSUBSRIBE_TESTING_TOPIC + clientIndex: Int, testTopic: String = UNSUBSCRIBED_TEST_TOPIC ) async throws { - if !mqtt5Clients[clientIndex].is_connected { + if !mqtt5CanaryClients[clientIndex].is_connected { return try await mqtt5CanaryOperationStart(clientIndex: clientIndex) } do { - let canaryClient = mqtt5Clients[clientIndex] + let canaryClient = mqtt5CanaryClients[clientIndex] let unsub = UnsubscribePacket(topicFilter: testTopic) await self.statistic.incrementUnsubscribeAttempts() @@ -155,14 +155,14 @@ class Mqtt5CanaryTestContext: @unchecked Sendable { } func mqtt5CanaryOperationPublish( - clientIndex: Int, topic: String = UNSUBSRIBE_TESTING_TOPIC, qos: QoS + clientIndex: Int, topic: String = UNSUBSCRIBED_TEST_TOPIC, qos: QoS ) async throws { - if !mqtt5Clients[clientIndex].is_connected { + if !mqtt5CanaryClients[clientIndex].is_connected { return try await mqtt5CanaryOperationStart(clientIndex: clientIndex) } do { - let canaryClient = mqtt5Clients[clientIndex] + let canaryClient = mqtt5CanaryClients[clientIndex] let pub = PublishPacket(qos: qos, topic: topic) await self.statistic.incrementPublishAttempts() @@ -307,7 +307,6 @@ enum Mqtt5CanaryOperation: Int { case NULL = 0 case START case STOP - case DESTROY case SUBSCRIBE case UNSUBSCRIBE case UNSUBSCRIBE_BAD @@ -335,7 +334,7 @@ struct Mqtt5CanaryTestOptions: @unchecked Sendable { init(testApp: Mqtt5Canary) throws { shared_topic = "shared_topic_" + UUID().uuidString - self.tpsSleepTime = testApp.tps > 0 ? (UInt64(NANO_SECOND / testApp.tps)) : 0 + self.tpsSleepTime = testApp.tps > 0 ? (UInt64(ONE_NANO_SECOND / testApp.tps)) : 0 // Initialize elg first self.elg = try EventLoopGroup(threadCount: testApp.threads) let resolver = try HostResolver( @@ -382,7 +381,7 @@ func Mqtt5CanaryOperationDistributionSetup(_ distributionDataSet: inout [Mqtt5Ca (Mqtt5CanaryOperation.STOP, 1), (Mqtt5CanaryOperation.SUBSCRIBE, 200), (Mqtt5CanaryOperation.UNSUBSCRIBE, 200), - (Mqtt5CanaryOperation.UNSUBSCRIBE_BAD, 100), + (Mqtt5CanaryOperation.UNSUBSCRIBE_BAD, 50), (Mqtt5CanaryOperation.PUBLISH_QOS0, 300), (Mqtt5CanaryOperation.PUBLISH_QOS1, 150), (Mqtt5CanaryOperation.PUBLISH_TO_SUBSCRIBED_TOPIC_QOS0, 100), @@ -403,7 +402,7 @@ func Mqtt5CanaryOperationDistributionSetup(_ distributionDataSet: inout [Mqtt5Ca _ context: Mqtt5CanaryTestContext, _ options: Mqtt5CanaryTestOptions ) async throws -> Void { let operationIndex = Int.random(in: 0.. SLEEP_OVERHEAD { - try await Task.sleep(nanoseconds: testOptions.tpsSleepTime - SLEEP_OVERHEAD) + if testOptions.tpsSleepTime > SLEEP_OVERHEAD_NS { + try await Task.sleep(nanoseconds: testOptions.tpsSleepTime - SLEEP_OVERHEAD_NS) } let targetTime = Date().addingTimeInterval( - TimeInterval(testOptions.tpsSleepTime / NANO_SECOND)) + TimeInterval(testOptions.tpsSleepTime / ONE_NANO_SECOND)) while Date() < targetTime { // Busy-wait for precision sleep } From e740943b42e7505233f3e427b4393ba90c89c349 Mon Sep 17 00:00:00 2001 From: Vera Xia Date: Tue, 19 Aug 2025 09:45:23 -0700 Subject: [PATCH 5/5] update canary with create/destroy client --- Source/Canary/Mqtt5Canary/Mqtt5Canary.swift | 242 +++++++++++++------- 1 file changed, 159 insertions(+), 83 deletions(-) diff --git a/Source/Canary/Mqtt5Canary/Mqtt5Canary.swift b/Source/Canary/Mqtt5Canary/Mqtt5Canary.swift index 9c3617ed..54ed74c0 100644 --- a/Source/Canary/Mqtt5Canary/Mqtt5Canary.swift +++ b/Source/Canary/Mqtt5Canary/Mqtt5Canary.swift @@ -17,7 +17,7 @@ enum CanaryTestError: Error { } class Mqtt5CanaryTestContext: @unchecked Sendable { - var mqtt5CanaryClients: [Mqtt5CanaryClient] = [] + var mqtt5CanaryClients: [String: Mqtt5CanaryClient] = [:] var statistic: Mqtt5CanaryStatistic init() { @@ -25,52 +25,94 @@ class Mqtt5CanaryTestContext: @unchecked Sendable { } /// Setup Test Context - func appendCanaryClient(client: Mqtt5CanaryClient) { - mqtt5CanaryClients.append(client) + func appendCanaryClient(clientId: String, client: Mqtt5CanaryClient) { + mqtt5CanaryClients[clientId] = client } - func setClientConnection(index: Int, connected: Bool) { - mqtt5CanaryClients[index].setConnected(connected: connected) + func setClientConnection(clientId: String, connected: Bool) throws { + guard let client = mqtt5CanaryClients[clientId] else { + throw CanaryTestError.InvalidArgument + } + client.setConnected(connected: connected) + } + + func getCanaryClient(_ index: Int) throws -> Mqtt5CanaryClient { + let index = mqtt5CanaryClients.index(mqtt5CanaryClients.startIndex, offsetBy: index) + let key = mqtt5CanaryClients.keys[index] + guard let result = mqtt5CanaryClients[key] else { + throw CanaryTestError.InvalidArgument + } + return result + } + + func createClient( + testOptions: Mqtt5CanaryTestOptions + ) async throws { + let clientId = "Mqtt5CanaryTest-" + UUID().uuidString + let connectionOption = MqttConnectOptions(clientId: clientId) + let clientOption = MqttClientOptions( + hostName: testOptions.endpoint, port: testOptions.port, bootstrap: testOptions.boostrap, + tlsCtx: testOptions.tlsctx, + onWebsocketTransform: testOptions.onWebsocketTransform, + connectOptions: connectionOption, + onLifecycleEventStoppedFn: { _ in + if let client = self.mqtt5CanaryClients[clientId] { + client.setConnected(connected: false) + } + }, + onLifecycleEventConnectionSuccessFn: { _ in + if let client = self.mqtt5CanaryClients[clientId] { + client.setConnected(connected: true) + } + }, + onLifecycleEventConnectionFailureFn: { _ in + if let client = self.mqtt5CanaryClients[clientId] { + client.setConnected(connected: false) + } + }, + onLifecycleEventDisconnectionFn: { _ in + if let client = self.mqtt5CanaryClients[clientId] { + client.setConnected(connected: false) + } + } + ) + let client = try Mqtt5Client(clientOptions: clientOption) + let canaryClient = Mqtt5CanaryClient( + client: client, clientId: clientId, sharedTopic: testOptions.shared_topic) + appendCanaryClient(clientId: clientId, client: canaryClient) } /// Client Operation Help Function - func mqtt5CanaryRunOperation(_ operation: Mqtt5CanaryOperation, _ index: Int) async throws { + func mqtt5CanaryRunOperation( + _ operation: Mqtt5CanaryOperation, _ index: Int, _ options: Mqtt5CanaryTestOptions + ) async throws { switch operation { - case Mqtt5CanaryOperation.START: - try await mqtt5CanaryOperationStart(clientIndex: index) case Mqtt5CanaryOperation.STOP: try await mqtt5CanaryOperationStop(clientIndex: index) case Mqtt5CanaryOperation.SUBSCRIBE: try await mqtt5CanaryOperationSubscribe(clientIndex: index) case Mqtt5CanaryOperation.UNSUBSCRIBE: - if (mqtt5CanaryClients[index].subscriptionCount > 0) { - try await mqtt5CanaryOperationUnsubscribe( - clientIndex: index, testTopic: mqtt5CanaryClients[index].getUnsubscribedTopic()) - } else { - // If there is topic subscribed, fallthrough to UNSUBSSCRIBE_BAD operation - fallthrough - } - case Mqtt5CanaryOperation.UNSUBSCRIBE_BAD: try await mqtt5CanaryOperationUnsubscribe(clientIndex: index) + case Mqtt5CanaryOperation.UNSUBSCRIBE_BAD: + try await mqtt5CanaryOperationUnsubscribeBad(clientIndex: index) case Mqtt5CanaryOperation.PUBLISH_QOS0: - try await mqtt5CanaryOperationPublish(clientIndex: index, qos: QoS.atMostOnce) + try await mqtt5CanaryOperationPublishUnsubscribed(clientIndex: index, qos: QoS.atMostOnce) case Mqtt5CanaryOperation.PUBLISH_QOS1: - try await mqtt5CanaryOperationPublish(clientIndex: index, qos: QoS.atLeastOnce) + try await mqtt5CanaryOperationPublishUnsubscribed(clientIndex: index, qos: QoS.atLeastOnce) case Mqtt5CanaryOperation.PUBLISH_TO_SUBSCRIBED_TOPIC_QOS0: try await mqtt5CanaryOperationPublish( - clientIndex: index, topic: mqtt5CanaryClients[index].getSubscribedTopic(), - qos: QoS.atMostOnce) + clientIndex: index, qos: QoS.atMostOnce) case Mqtt5CanaryOperation.PUBLISH_TO_SUBSCRIBED_TOPIC_QOS1: try await mqtt5CanaryOperationPublish( - clientIndex: index, topic: mqtt5CanaryClients[index].getSubscribedTopic(), - qos: QoS.atLeastOnce) + clientIndex: index, qos: QoS.atLeastOnce) case Mqtt5CanaryOperation.PUBLISH_TO_SHARED_TOPIC_QOS0: - try await mqtt5CanaryOperationPublish( - clientIndex: index, topic: mqtt5CanaryClients[index].shared_topic, qos: QoS.atMostOnce) + try await mqtt5CanaryOperationPublishShared( + clientIndex: index, qos: QoS.atMostOnce) case Mqtt5CanaryOperation.PUBLISH_TO_SHARED_TOPIC_QOS1: - try await mqtt5CanaryOperationPublish( - clientIndex: index, topic: mqtt5CanaryClients[index].shared_topic, qos: QoS.atLeastOnce) - + try await mqtt5CanaryOperationPublishShared( + clientIndex: index, qos: QoS.atLeastOnce) + case Mqtt5CanaryOperation.DESTROY_AND_CREATE: + try await mqtt5CanaryOperationDestroyAndCreate(clientIndex: index, option: options) case Mqtt5CanaryOperation.NULL: fallthrough case Mqtt5CanaryOperation.OPERATION_COUNT: @@ -81,28 +123,37 @@ class Mqtt5CanaryTestContext: @unchecked Sendable { /// Client Operations func mqtt5CanaryOperationStart(clientIndex: Int) async throws { - if !mqtt5CanaryClients[clientIndex].is_connected { + let canaryClient = try getCanaryClient(clientIndex) + if !canaryClient.is_connected { + await statistic.incrementTotalOperation() + try canaryClient.client!.start() + } + } + + func mqtt5CanaryOperationStart(canaryClient: Mqtt5CanaryClient) async throws { + if !canaryClient.is_connected { await statistic.incrementTotalOperation() - try mqtt5CanaryClients[clientIndex].client.start() + try canaryClient.client!.start() } } func mqtt5CanaryOperationStop(clientIndex: Int) async throws { - if mqtt5CanaryClients[clientIndex].is_connected { + let canaryClient = try getCanaryClient(clientIndex) + if !canaryClient.is_connected { await statistic.incrementTotalOperation() - try mqtt5CanaryClients[clientIndex].client.stop() + try canaryClient.client!.stop() // clean up the subscription count - mqtt5CanaryClients[clientIndex].subscriptionCount = 0 + canaryClient.subscriptionCount = 0 } } func mqtt5CanaryOperationSubscribe(clientIndex: Int) async throws { - if !mqtt5CanaryClients[clientIndex].is_connected { + let canaryClient = try getCanaryClient(clientIndex) + if !canaryClient.is_connected { return try await mqtt5CanaryOperationStart(clientIndex: clientIndex) } do { - let canaryClient = mqtt5CanaryClients[clientIndex] let sub = Subscription(topicFilter: canaryClient.getNextSubTopic(), qos: QoS.atLeastOnce) var subscriptions = [sub] @@ -115,7 +166,7 @@ class Mqtt5CanaryTestContext: @unchecked Sendable { Subscription(topicFilter: canaryClient.shared_topic, qos: QoS.atLeastOnce)) } - let suback = try await canaryClient.client.subscribe( + let suback = try await canaryClient.client!.subscribe( subscribePacket: SubscribePacket(subscriptions: subscriptions)) if suback.reasonCodes[0] == SubackReasonCode.grantedQos1 { @@ -128,21 +179,31 @@ class Mqtt5CanaryTestContext: @unchecked Sendable { } } + func mqtt5CanaryOperationUnsubscribe(clientIndex: Int) async throws { + let canaryClient = try getCanaryClient(clientIndex) + try await mqtt5CanaryOperationUnsubscribe( + canaryClient: canaryClient, testTopic: canaryClient.getUnsubscribedTopic()) + } + + func mqtt5CanaryOperationUnsubscribeBad(clientIndex: Int) async throws { + let canaryClient = try getCanaryClient(clientIndex) + try await mqtt5CanaryOperationUnsubscribe(canaryClient: canaryClient) + } + func mqtt5CanaryOperationUnsubscribe( - clientIndex: Int, testTopic: String = UNSUBSCRIBED_TEST_TOPIC + canaryClient: Mqtt5CanaryClient, testTopic: String = UNSUBSCRIBED_TEST_TOPIC ) async throws { - if !mqtt5CanaryClients[clientIndex].is_connected { - return try await mqtt5CanaryOperationStart(clientIndex: clientIndex) + if !canaryClient.is_connected { + return try await mqtt5CanaryOperationStart(canaryClient: canaryClient) } do { - let canaryClient = mqtt5CanaryClients[clientIndex] let unsub = UnsubscribePacket(topicFilter: testTopic) await self.statistic.incrementUnsubscribeAttempts() await self.statistic.incrementTotalOperation() - let unsubAck = try await canaryClient.client.unsubscribe(unsubscribePacket: unsub) + let unsubAck = try await canaryClient.client!.unsubscribe(unsubscribePacket: unsub) if unsubAck.reasonCodes[0] == UnsubackReasonCode.success { await self.statistic.incrementUnsubscribeSucceed() @@ -154,20 +215,35 @@ class Mqtt5CanaryTestContext: @unchecked Sendable { } } + func mqtt5CanaryOperationPublish(clientIndex: Int, qos: QoS) async throws { + let canaryClient = try getCanaryClient(clientIndex) + try await mqtt5CanaryOperationPublish( + canaryClient: canaryClient, topic: canaryClient.getSubscribedTopic(), qos: qos) + } + + func mqtt5CanaryOperationPublishUnsubscribed(clientIndex: Int, qos: QoS) async throws { + let canaryClient = try getCanaryClient(clientIndex) + try await mqtt5CanaryOperationPublish(canaryClient: canaryClient, qos: qos) + } + + func mqtt5CanaryOperationPublishShared(clientIndex: Int, qos: QoS) async throws { + let canaryClient = try getCanaryClient(clientIndex) + try await mqtt5CanaryOperationPublish( + canaryClient: canaryClient, topic: canaryClient.shared_topic, qos: qos) + } + func mqtt5CanaryOperationPublish( - clientIndex: Int, topic: String = UNSUBSCRIBED_TEST_TOPIC, qos: QoS + canaryClient: Mqtt5CanaryClient, topic: String = UNSUBSCRIBED_TEST_TOPIC, qos: QoS ) async throws { - if !mqtt5CanaryClients[clientIndex].is_connected { - return try await mqtt5CanaryOperationStart(clientIndex: clientIndex) + if !canaryClient.is_connected { + return try await mqtt5CanaryOperationStart(canaryClient: canaryClient) } - do { - let canaryClient = mqtt5CanaryClients[clientIndex] let pub = PublishPacket(qos: qos, topic: topic) await self.statistic.incrementPublishAttempts() await self.statistic.incrementTotalOperation() - let puback = try await canaryClient.client.publish(publishPacket: pub) + let puback = try await canaryClient.client!.publish(publishPacket: pub) if qos == QoS.atMostOnce || puback.puback?.reasonCode == PubackReasonCode.success || puback.puback?.reasonCode == PubackReasonCode.noMatchingSubscribers @@ -181,10 +257,31 @@ class Mqtt5CanaryTestContext: @unchecked Sendable { } } + func mqtt5CanaryOperationCreation(option: Mqtt5CanaryTestOptions) async throws { + if mqtt5CanaryClients.count < option.clientCount { + try await createClient(testOptions: option) + } + } + + func mqtt5CanaryOperationDestroyAndCreate(clientIndex: Int, option: Mqtt5CanaryTestOptions) + async throws + { + // If there is one client left + if mqtt5CanaryClients.count > 1 { + let canaryClient = try getCanaryClient(clientIndex) + try canaryClient.client!.stop() + await statistic.incrementTotalOperation() + canaryClient.client = nil + mqtt5CanaryClients.remove( + at: mqtt5CanaryClients.index(mqtt5CanaryClients.startIndex, offsetBy: clientIndex)) + try await mqtt5CanaryOperationCreation(option: option) + } + } + } class Mqtt5CanaryClient { - fileprivate let client: Mqtt5Client + fileprivate var client: Mqtt5Client? fileprivate let clientId: String fileprivate let shared_topic: String fileprivate var subscriptionCount: Int = 0 @@ -305,7 +402,7 @@ actor Mqtt5CanaryStatistic { enum Mqtt5CanaryOperation: Int { case NULL = 0 - case START + case DESTROY_AND_CREATE case STOP case SUBSCRIBE case UNSUBSCRIBE @@ -320,10 +417,13 @@ enum Mqtt5CanaryOperation: Int { } // This struct holds the canary options -// Adjustification of "@uncheck Sendable": The memebers in this struct will not be modified once setup. +// Justification of "@unchecked Sendable": The members in this struct will not be modified once setup. struct Mqtt5CanaryTestOptions: @unchecked Sendable { // Client options let shared_topic: String + let endpoint: String + let port: UInt32 + let clientCount: Int var elg: EventLoopGroup var boostrap: ClientBootstrap var tlsctx: TLSContext? = nil @@ -334,7 +434,10 @@ struct Mqtt5CanaryTestOptions: @unchecked Sendable { init(testApp: Mqtt5Canary) throws { shared_topic = "shared_topic_" + UUID().uuidString + self.endpoint = testApp.endpoint + self.port = testApp.port self.tpsSleepTime = testApp.tps > 0 ? (UInt64(ONE_NANO_SECOND / testApp.tps)) : 0 + self.clientCount = testApp.clients // Initialize elg first self.elg = try EventLoopGroup(threadCount: testApp.threads) let resolver = try HostResolver( @@ -378,6 +481,7 @@ struct Mqtt5CanaryTestOptions: @unchecked Sendable { func Mqtt5CanaryOperationDistributionSetup(_ distributionDataSet: inout [Mqtt5CanaryOperation]) { let operationDistribution = [ + (Mqtt5CanaryOperation.DESTROY_AND_CREATE, 10), (Mqtt5CanaryOperation.STOP, 1), (Mqtt5CanaryOperation.SUBSCRIBE, 200), (Mqtt5CanaryOperation.UNSUBSCRIBE, 200), @@ -404,7 +508,7 @@ func Mqtt5CanaryOperationDistributionSetup(_ distributionDataSet: inout [Mqtt5Ca let operationIndex = Int.random(in: 0.. SLEEP_OVERHEAD_NS { try await Task.sleep(nanoseconds: testOptions.tpsSleepTime - SLEEP_OVERHEAD_NS) } - let targetTime = Date().addingTimeInterval( + let targetTime = iterationStartTime.addingTimeInterval( TimeInterval(testOptions.tpsSleepTime / ONE_NANO_SECOND)) while Date() < targetTime { // Busy-wait for precision sleep @@ -546,7 +622,7 @@ struct Mqtt5Canary: AsyncParsableCommand { // Close and clean up the clients for index: Int in 0..