Skip to content

Commit 7cc0771

Browse files
authored
Fix queue management in jest-worker (#7934)
1 parent 2f3d557 commit 7cc0771

File tree

4 files changed

+115
-40
lines changed

4 files changed

+115
-40
lines changed

CHANGELOG.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,8 @@
1414
- `[jest-changed-files]` Improve default file selection for Mercurial repos ([#7880](https://github.com/facebook/jest/pull/7880))
1515
- `[jest-validate]` Fix validating async functions ([#7894](https://github.com/facebook/jest/issues/7894))
1616
- `[jest-circus]` Fix bug with test.only ([#7888](https://github.com/facebook/jest/pull/7888))
17-
- `[jest-transform]` Normalize config and remove unecessary checks, convert `TestUtils.js` to TypeScript ([#7801](https://github.com/facebook/jest/pull/7801)
17+
- `[jest-transform]` Normalize config and remove unecessary checks, convert `TestUtils.js` to TypeScript ([#7801](https://github.com/facebook/jest/pull/7801))
18+
- `[jest-worker]` Fix `jest-worker` when using pre-allocated jobs ([#7934](https://github.com/facebook/jest/pull/7934))
1819

1920
### Chore & Maintenance
2021

packages/jest-worker/src/Farm.ts

Lines changed: 30 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import {
99
ChildMessage,
1010
FarmOptions,
1111
QueueChildMessage,
12+
QueueItem,
1213
WorkerInterface,
1314
OnStart,
1415
OnEnd,
@@ -19,24 +20,25 @@ export default class Farm {
1920
private _computeWorkerKey: FarmOptions['computeWorkerKey'];
2021
private _cacheKeys: {[key: string]: WorkerInterface};
2122
private _callback: Function;
22-
private _last: Array<QueueChildMessage>;
23+
private _last: Array<QueueItem>;
2324
private _locks: Array<boolean>;
2425
private _numOfWorkers: number;
2526
private _offset: number;
26-
private _queue: Array<QueueChildMessage | null>;
27+
private _queue: Array<QueueItem | null>;
2728

2829
constructor(
2930
numOfWorkers: number,
3031
callback: Function,
3132
computeWorkerKey?: FarmOptions['computeWorkerKey'],
3233
) {
33-
this._callback = callback;
34-
this._numOfWorkers = numOfWorkers;
3534
this._cacheKeys = Object.create(null);
36-
this._queue = [];
35+
this._callback = callback;
3736
this._last = [];
3837
this._locks = [];
38+
this._numOfWorkers = numOfWorkers;
3939
this._offset = 0;
40+
this._queue = [];
41+
4042
if (computeWorkerKey) {
4143
this._computeWorkerKey = computeWorkerKey;
4244
}
@@ -70,6 +72,7 @@ export default class Farm {
7072
};
7173

7274
const task = {onEnd, onStart, request};
75+
7376
if (worker) {
7477
this._enqueue(task, worker.getWorkerId());
7578
} else {
@@ -78,80 +81,82 @@ export default class Farm {
7881
});
7982
}
8083

81-
private _getNextJob(workerId: number): QueueChildMessage | null {
84+
private _getNextTask(workerId: number): QueueChildMessage | null {
8285
let queueHead = this._queue[workerId];
8386

84-
while (queueHead && queueHead.request[1]) {
87+
while (queueHead && queueHead.task.request[1]) {
8588
queueHead = queueHead.next || null;
8689
}
8790

8891
this._queue[workerId] = queueHead;
8992

90-
return queueHead;
93+
return queueHead && queueHead.task;
9194
}
9295

9396
private _process(workerId: number): Farm {
94-
if (this.isLocked(workerId)) {
97+
if (this._isLocked(workerId)) {
9598
return this;
9699
}
97100

98-
const job = this._getNextJob(workerId);
101+
const task = this._getNextTask(workerId);
99102

100-
if (!job) {
103+
if (!task) {
101104
return this;
102105
}
103106

104107
const onEnd = (error: Error | null, result: unknown) => {
105-
job.onEnd(error, result);
106-
this.unlock(workerId);
108+
task.onEnd(error, result);
109+
110+
this._unlock(workerId);
107111
this._process(workerId);
108112
};
109113

110-
this.lock(workerId);
114+
task.request[1] = true;
111115

112-
this._callback(workerId, job.request, job.onStart, onEnd);
113-
114-
job.request[1] = true;
116+
this._lock(workerId);
117+
this._callback(workerId, task.request, task.onStart, onEnd);
115118

116119
return this;
117120
}
118121

119122
private _enqueue(task: QueueChildMessage, workerId: number): Farm {
123+
const item = {next: null, task};
124+
120125
if (task.request[1]) {
121126
return this;
122127
}
123128

124129
if (this._queue[workerId]) {
125-
this._last[workerId].next = task;
130+
this._last[workerId].next = item;
126131
} else {
127-
this._queue[workerId] = task;
132+
this._queue[workerId] = item;
128133
}
129134

130-
this._last[workerId] = task;
135+
this._last[workerId] = item;
131136
this._process(workerId);
132137

133138
return this;
134139
}
135140

136141
private _push(task: QueueChildMessage): Farm {
137142
for (let i = 0; i < this._numOfWorkers; i++) {
138-
const workerIdx = (this._offset + i) % this._numOfWorkers;
139-
this._enqueue(task, workerIdx);
143+
this._enqueue(task, (this._offset + i) % this._numOfWorkers);
140144
}
145+
141146
this._offset++;
142147

143148
return this;
144149
}
145150

146-
lock(workerId: number): void {
151+
private _lock(workerId: number): void {
147152
this._locks[workerId] = true;
148153
}
149154

150-
unlock(workerId: number): void {
155+
private _unlock(workerId: number): void {
151156
this._locks[workerId] = false;
152157
}
153158

154-
isLocked(workerId: number): boolean {
159+
private _isLocked(workerId: number): boolean {
155160
return this._locks[workerId];
156161
}
157162
}

packages/jest-worker/src/__tests__/Farm.test.js

Lines changed: 77 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -200,18 +200,19 @@ describe('Farm', () => {
200200
workerReply(0, null, 17);
201201
await p0;
202202

203-
// Note that the stickiness is not created by the method name or the arguments
204-
// it is solely controlled by the provided "computeWorkerKey" method, which in
205-
// the test example always returns the same key, so all calls should be
206-
// redirected to worker 1 (which is the one that resolved the first call).
203+
// Note that the stickiness is not created by the method name or the
204+
// arguments it is solely controlled by the provided "computeWorkerKey"
205+
// method, which in the test example always returns the same key, so all
206+
// calls should be redirected to worker 1 (which is the one that resolved
207+
// the first call).
207208
const p1 = farm.doWork('foo', 'bar');
208209
workerReply(1, null, 17);
209210
await p1;
210211

211-
// The first time, a call with a "1234567890abcdef" hash had never been done
212-
// earlier ("foo" call), so it got queued to all workers. Later, since the one
213-
// that resolved the call was the one in position 1, all subsequent calls are
214-
// only redirected to that worker.
212+
// The first time, a call with a "1234567890abcdef" hash had never been
213+
// done earlier ("foo" call), so it got queued to all workers. Later, since
214+
// the one that resolved the call was the one in position 1, all subsequent
215+
// calls are only redirected to that worker.
215216
expect(callback).toHaveBeenCalledTimes(2); // Only "foo".
216217
expect(callback).toHaveBeenNthCalledWith(
217218
1,
@@ -248,11 +249,11 @@ describe('Farm', () => {
248249
workerReply(1, null, 17);
249250
await p1;
250251

251-
// Both requests are send to the same worker
252-
// The first time, a call with a "1234567890abcdef" hash had never been done
253-
// earlier ("foo" call), so it got queued to all workers. Later, since the one
254-
// that resolved the call was the one in position 1, all subsequent calls are
255-
// only redirected to that worker.
252+
// Both requests are send to the same worker. The first time, a call with
253+
// a "1234567890abcdef" hash had never been done earlier ("foo" call), so
254+
// it got queued to all workers. Later, since the one that resolved the
255+
// call was the one in position 1, all subsequent calls are only redirected
256+
// to that worker.
256257
expect(callback).toHaveBeenCalledTimes(2);
257258
expect(callback).toHaveBeenNthCalledWith(
258259
1,
@@ -269,4 +270,67 @@ describe('Farm', () => {
269270
expect.any(Function),
270271
);
271272
});
273+
274+
it('checks that locking works, and jobs are never lost', async () => {
275+
const hash = jest
276+
.fn()
277+
// This will go to both queues, but picked by the first worker.
278+
.mockReturnValueOnce(0)
279+
// This will go to both queues too, but picked by the second worker.
280+
.mockReturnValueOnce(1)
281+
// This will go to worker 0, now only assigned to it.
282+
.mockReturnValueOnce(0)
283+
// This will go to worker 1, now only assigned to it.
284+
.mockReturnValueOnce(1)
285+
// This will go to both queues too, but will wait, since workers are busy.
286+
.mockReturnValueOnce(2)
287+
// This will only go to the first queue.
288+
.mockReturnValueOnce(0)
289+
// This will be gone if the queue implementation is wrong.
290+
.mockReturnValueOnce(0)
291+
// Push onto the second queue; potentially wiping the earlier job.
292+
.mockReturnValueOnce(1);
293+
294+
const farm = new Farm(2, callback, hash);
295+
296+
// First and second jobs get resolved, so that their hash is sticked to
297+
// the right worker: worker assignment is performed when workers reply, not
298+
// when the call is made.
299+
const p0 = farm.doWork('work-0');
300+
const p1 = farm.doWork('work-1');
301+
workerReply(0, null, 'response-0');
302+
await p0;
303+
workerReply(1, null, 'response-1');
304+
await p1;
305+
306+
// Now we perform the rest of the calls (7 resolves before 5 and 6, since 4
307+
// is in both queues, and as soon as you resolve 4, 7 will be picked).
308+
const p2 = farm.doWork('work-2');
309+
const p3 = farm.doWork('work-3');
310+
const p4 = farm.doWork('work-4');
311+
const p5 = farm.doWork('work-5');
312+
const p6 = farm.doWork('work-6');
313+
const p7 = farm.doWork('work-7');
314+
workerReply(2, null, 'response-2');
315+
await p2;
316+
workerReply(3, null, 'response-3');
317+
await p3;
318+
workerReply(4, null, 'response-4');
319+
await p4;
320+
workerReply(5, null, 'response-7');
321+
await p7;
322+
workerReply(6, null, 'response-5');
323+
await p5;
324+
workerReply(7, null, 'response-6');
325+
await p6;
326+
327+
await expect(p0).resolves.toBe('response-0');
328+
await expect(p1).resolves.toBe('response-1');
329+
await expect(p2).resolves.toBe('response-2');
330+
await expect(p3).resolves.toBe('response-3');
331+
await expect(p4).resolves.toBe('response-4');
332+
await expect(p5).resolves.toBe('response-5');
333+
await expect(p6).resolves.toBe('response-6');
334+
await expect(p7).resolves.toBe('response-7');
335+
});
272336
});

packages/jest-worker/src/types.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,12 +147,17 @@ export type ParentMessageError = [
147147
export type ParentMessage = ParentMessageOk | ParentMessageError;
148148

149149
// Queue types.
150+
150151
export type OnStart = (worker: WorkerInterface) => void;
151152
export type OnEnd = (err: Error | null, result: unknown) => void;
152153

153154
export type QueueChildMessage = {
154155
request: ChildMessage;
155156
onStart: OnStart;
156157
onEnd: OnEnd;
157-
next?: QueueChildMessage;
158+
};
159+
160+
export type QueueItem = {
161+
task: QueueChildMessage;
162+
next: QueueItem | null;
158163
};

0 commit comments

Comments
 (0)