Skip to content

Commit d73364c

Browse files
committed
fix: use cheaper async mutex in place of 1-queue
1 parent 1d6135c commit d73364c

6 files changed

Lines changed: 98 additions & 23 deletions

File tree

src/adapter/ember/ezsp/ezsp.ts

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
11
/* v8 ignore start */
22

33
import EventEmitter from "node:events";
4-
5-
import {Queue} from "../../../utils";
4+
import {AsyncMutex} from "../../../utils/async-mutex";
65
import {logger} from "../../../utils/logger";
76
import * as ZSpec from "../../../zspec";
87
import type {Eui64, ExtendedPanId, NodeId, PanId} from "../../../zspec/tstypes";
@@ -239,7 +238,7 @@ export class Ezsp extends EventEmitter<EmberEzspEventMap> {
239238
private frameSequence: number;
240239
/** Sequence used for EZSP send() tagging. static uint8_t */
241240
private sendSequence: number;
242-
private readonly queue: Queue;
241+
private readonly queue: AsyncMutex;
243242
/** Awaiting response resolve/timer struct. undefined if not waiting for response. */
244243
private responseWaiter?: EzspWaiter;
245244

@@ -254,7 +253,7 @@ export class Ezsp extends EventEmitter<EmberEzspEventMap> {
254253
this.callbackFrameContents = Buffer.alloc(EZSP_MAX_FRAME_LENGTH);
255254
this.callbackBuffalo = new EzspBuffalo(this.callbackFrameContents, 0);
256255

257-
this.queue = new Queue(1);
256+
this.queue = new AsyncMutex();
258257
this.ash = new UartAsh(options);
259258

260259
this.version = 0;
@@ -535,7 +534,7 @@ export class Ezsp extends EventEmitter<EmberEzspEventMap> {
535534
return EzspStatus.ERROR_COMMAND_TOO_LONG;
536535
}
537536

538-
return await this.queue.execute<EzspStatus>(async () => {
537+
return await this.queue.run<EzspStatus>(async () => {
539538
let status: EzspStatus = EzspStatus.ASH_ERROR_TIMEOUTS; // will be overwritten below as necessary
540539
const frameId = sendBuffalo.getFrameId();
541540
const frameString = `[FRAME: ID=${frameId}:"${EzspFrameID[frameId]}" Seq=${sequence} Len=${length}]`;

src/adapter/ezsp/driver/uart.ts

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@
22

33
import {EventEmitter} from "node:events";
44
import net from "node:net";
5-
6-
import {Queue, Waitress, wait} from "../../../utils";
5+
import {Waitress, wait} from "../../../utils";
6+
import {AsyncMutex} from "../../../utils/async-mutex";
77
import {logger} from "../../../utils/logger";
88
import {SerialPort} from "../../serialPort";
99
import type {SerialPortOptions} from "../../tstype";
@@ -45,12 +45,12 @@ export class SerialDriver extends EventEmitter {
4545
private ackSeq = 0; // next number after the last accepted frame
4646
private rejectCondition = false;
4747
private waitress: Waitress<EZSPPacket, EZSPPacketMatcher>;
48-
private queue: Queue;
48+
private queue: AsyncMutex;
4949

5050
constructor() {
5151
super();
5252
this.initialized = false;
53-
this.queue = new Queue(1);
53+
this.queue = new AsyncMutex();
5454
this.waitress = new Waitress<EZSPPacket, EZSPPacketMatcher>(this.waitressValidator, this.waitressTimeoutFormatter);
5555
this.writer = new Writer();
5656
this.parser = new Parser();
@@ -332,7 +332,7 @@ export class SerialDriver extends EventEmitter {
332332
this.sendSeq = 0;
333333
this.recvSeq = 0;
334334

335-
return await this.queue.execute<void>(async (): Promise<void> => {
335+
return await this.queue.run<void>(async (): Promise<void> => {
336336
try {
337337
logger.debug("--> Write reset", NS);
338338
const waiter = this.waitFor(-1, 10000);
@@ -410,7 +410,7 @@ export class SerialDriver extends EventEmitter {
410410
const nextSeq = this.sendSeq;
411411
const ackSeq = this.recvSeq;
412412

413-
return await this.queue.execute<void>(async (): Promise<void> => {
413+
return await this.queue.run<void>(async (): Promise<void> => {
414414
const randData = NpiFrame.makeRandomizedBuffer(data);
415415

416416
try {

src/adapter/zboss/uart.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@
22

33
import EventEmitter from "node:events";
44
import {Socket} from "node:net";
5-
6-
import {Queue, Waitress, wait} from "../../utils";
5+
import {AsyncMutex} from "src/utils/async-mutex";
6+
import {Waitress, wait} from "../../utils";
77
import {logger} from "../../utils/logger";
88
import {SerialPort} from "../serialPort";
99
import type {SerialPortOptions} from "../tstype";
@@ -27,7 +27,7 @@ export class ZBOSSUart extends EventEmitter {
2727
private recvSeq = 0; // next frame number to receive
2828
private ackSeq = 0; // next number after the last accepted frame
2929
private waitress: Waitress<number, number>;
30-
private queue: Queue;
30+
private queue: AsyncMutex;
3131
public inReset = false;
3232

3333
constructor(options: SerialPortOptions) {
@@ -39,7 +39,7 @@ export class ZBOSSUart extends EventEmitter {
3939
this.writer = new ZBOSSWriter();
4040
this.reader = new ZBOSSReader();
4141
this.reader.on("data", this.onPackage.bind(this));
42-
this.queue = new Queue(1);
42+
this.queue = new AsyncMutex();
4343
this.waitress = new Waitress<number, number>(this.waitressValidator, this.waitressTimeoutFormatter);
4444
}
4545

@@ -307,7 +307,7 @@ export class ZBOSSUart extends EventEmitter {
307307
const nextSeq = this.sendSeq;
308308
const ackSeq = this.recvSeq;
309309

310-
return await this.queue.execute<void>(async (): Promise<void> => {
310+
return await this.queue.run<void>(async (): Promise<void> => {
311311
try {
312312
logger.debug(`--> DATA (${seq},${ackSeq},0): ${data.toString("hex")}`, NS);
313313
if (!isACK) {

src/adapter/zigate/driver/zigate.ts

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,8 @@
33
import assert from "node:assert";
44
import {EventEmitter} from "node:events";
55
import net from "node:net";
6-
76
import {DelimiterParser} from "@serialport/parser-delimiter";
8-
9-
import {Queue} from "../../../utils";
7+
import {AsyncMutex} from "../../../utils/async-mutex";
108
import {logger} from "../../../utils/logger";
119
import {Waitress} from "../../../utils/waitress";
1210
import * as ZSpec from "../../../zspec";
@@ -79,7 +77,7 @@ export default class ZiGate extends EventEmitter<ZiGateEventMap> {
7977
private parser?: EventEmitter;
8078
private serialPort?: SerialPort;
8179
private socketPort?: net.Socket;
82-
private queue: Queue;
80+
private queue: AsyncMutex;
8381

8482
public portWrite?: SerialPort | net.Socket;
8583
private waitress: Waitress<ZiGateObject, WaitressMatcher>;
@@ -92,7 +90,7 @@ export default class ZiGate extends EventEmitter<ZiGateEventMap> {
9290
// XXX: not used?
9391
// this.rtscts = typeof serialPortOptions.rtscts === 'boolean' ? serialPortOptions.rtscts : false;
9492
this.initialized = false;
95-
this.queue = new Queue(1);
93+
this.queue = new AsyncMutex();
9694

9795
this.waitress = new Waitress<ZiGateObject, WaitressMatcher>(this.waitressValidator, this.waitressTimeoutFormatter);
9896
this.zdoWaitress = new Waitress<ZdoWaitressPayload, ZdoWaitressMatcher>(this.zdoWaitressValidator, this.waitressTimeoutFormatter);
@@ -107,7 +105,7 @@ export default class ZiGate extends EventEmitter<ZiGateEventMap> {
107105
): Promise<ZiGateObject> {
108106
const waiters: Promise<ZiGateObject>[] = [];
109107
const waitersId: number[] = [];
110-
return await this.queue.execute(async () => {
108+
return await this.queue.run(async () => {
111109
try {
112110
logger.debug(
113111
() => `Send command \x1b[32m>>>> ${ZiGateCommandCode[code]} 0x${zeroPad(code)} <<<<\x1b[0m \nPayload: ${JSON.stringify(payload)}`,
@@ -163,7 +161,7 @@ export default class ZiGate extends EventEmitter<ZiGateEventMap> {
163161
}
164162

165163
public async requestZdo(clusterId: Zdo.ClusterId, payload: Buffer): Promise<boolean> {
166-
return await this.queue.execute(async () => {
164+
return await this.queue.run(async () => {
167165
const commandCode = ZDO_REQ_CLUSTER_ID_TO_ZIGATE_COMMAND_ID[clusterId];
168166
assert(commandCode !== undefined, `ZDO cluster ID '${clusterId}' not supported.`);
169167
const ruleStatus: ZiGateResponseMatcher = [

src/utils/async-mutex.ts

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
export class AsyncMutex {
2+
#locked = false;
3+
readonly #queue: Array<() => void> = [];
4+
5+
get count() {
6+
return this.#queue.length;
7+
}
8+
9+
async run<T>(fn: () => Promise<T>): Promise<T> {
10+
if (this.#locked) {
11+
await new Promise<void>((resolve) => this.#queue.push(resolve));
12+
}
13+
14+
this.#locked = true;
15+
16+
try {
17+
return await fn();
18+
} finally {
19+
this.#locked = false;
20+
const next = this.#queue.shift();
21+
22+
if (next) {
23+
next();
24+
}
25+
}
26+
}
27+
28+
clear() {
29+
this.#queue.length = 0;
30+
}
31+
}

test/utils.test.ts

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import {describe, expect, it, vi} from "vitest";
22
import {checkInstallCode} from "../src/controller/helpers/installCodes";
33
import {Queue, Utils, Waitress, wait} from "../src/utils";
4+
import {AsyncMutex} from "../src/utils/async-mutex";
45
import {logger, setLogger} from "../src/utils/logger";
56

67
const mockLogger = {
@@ -201,6 +202,52 @@ describe("Utils", () => {
201202
expect(queue.count()).toBe(5);
202203
});
203204

205+
it("Test async mutex", async () => {
206+
vi.useFakeTimers();
207+
208+
const queue = new AsyncMutex();
209+
210+
void queue.run(async () => {
211+
await new Promise((resolve) => setTimeout(resolve, 1000));
212+
});
213+
214+
void queue.run(async () => {
215+
await new Promise((resolve) => setTimeout(resolve, 1000));
216+
});
217+
218+
await vi.advanceTimersByTimeAsync(500);
219+
expect(queue.count).toStrictEqual(1); // first has ran but still pending return, second is queued
220+
221+
await vi.advanceTimersByTimeAsync(1000);
222+
expect(queue.count).toStrictEqual(0);
223+
224+
void queue.run(async () => {
225+
await new Promise((resolve) => setTimeout(resolve, 1000));
226+
});
227+
228+
expect(queue.count).toStrictEqual(1); // second has ran but still pending return, third is queued
229+
await vi.advanceTimersByTimeAsync(1600);
230+
expect(queue.count).toStrictEqual(0);
231+
232+
//-- clear
233+
234+
void queue.run(async () => {
235+
await new Promise((resolve) => setTimeout(resolve, 1000));
236+
});
237+
void queue.run(async () => {
238+
await new Promise((resolve) => setTimeout(resolve, 1000));
239+
});
240+
241+
expect(queue.count).toStrictEqual(1);
242+
243+
queue.clear();
244+
245+
expect(queue.count).toStrictEqual(0);
246+
await vi.runOnlyPendingTimersAsync(); // cleanup
247+
248+
vi.useRealTimers();
249+
});
250+
204251
it("Logs", () => {
205252
const debugSpy = vi.spyOn(console, "debug");
206253
const infoSpy = vi.spyOn(console, "info");

0 commit comments

Comments
 (0)