WIP: t31135461 / Refine jest-worker API#6676
Conversation
Clean up types, create missing interfaes and make Flow and Jest happy
|
Please, take into account that this is a WIP PR. Currently, there are a few parts missing:
|
| this._locks = []; | ||
| this._offset = 0; | ||
|
|
||
| // If we exceeded the amount of retries, we will emulate an error reply |
There was a problem hiding this comment.
This part will be rewritten and removed from here. This a part of missing functionality I've mentioned in the first comment to the PR
| const numOfWorkers = this._workerPool.getWorkers().length; | ||
| for (let i = 0; i < numOfWorkers; i++) { | ||
| const workerIdx = (this._offset + i) % numOfWorkers; | ||
| this.enqueue(task, workerIdx); |
There was a problem hiding this comment.
Separate the initial part of the function into another one, to avoid recursivity.
There was a problem hiding this comment.
I can use enqueue(QueueChildMessage, number) for scheduling to a specific worker and push(QueueChildMessage) to schedule a job in all workers. wdyt?
There was a problem hiding this comment.
What I mean is to move the for loop into the parent implementation, where you will call enqueue always with a workerId. This way workerId won't be optional anymore. This will also allow you to remove getWorkers, because you won't need it to get the length anymore.
There was a problem hiding this comment.
From the design perspective I'd say Farm shouldn't be concerned how WorkerPool spread tasks among workers. It is fine that Farm can pin job to the worker if it has a reference to it, however it isn't cool if Farm contains queue distribution logic in it 🤔
There was a problem hiding this comment.
👍🏻 Let's separate it into a different method as discussed offline.
| _options: FarmOptions; | ||
| _stderr: Readable; | ||
| _stdout: Readable; | ||
| _workers: Array<WorkerInterface>; |
There was a problem hiding this comment.
The interface shouldn't have any of the private variables.
| send(request: ChildMessage, onProcessStart: OnStart, onProcessEnd: OnEnd) { | ||
| onProcessStart(this); | ||
| this._onProcessEnd = onProcessEnd; | ||
| // $FlowFixMe |
There was a problem hiding this comment.
¿
this._child.send(request) doesn't expect to receive a request type
| ]); | ||
| } | ||
|
|
||
| function execMethod(method: string, args: $ReadOnlyArray<any>): void { |
There was a problem hiding this comment.
Can you create a common file within the workers/ folder that abstract this functionality? It looks like child and threadChild are 90% the same.
| * non-zero exit code. | ||
| */ | ||
| process.on('message', (request: any /* Should be ChildMessage */) => { | ||
| process.on('message', (request: any) => { |
There was a problem hiding this comment.
Let's rename this file to processChild as opposed to threadChild.
| const workerOptions: WorkerOptions = { | ||
| forkOptions: options.forkOptions || {}, | ||
| maxRetries: options.maxRetries || 3, | ||
| useNodeWorkersIfPossible: options.useNodeWorkersIfPossible, |
There was a problem hiding this comment.
Let's remove this and make it true always.
There was a problem hiding this comment.
It makes sense as an option, doesn't it? Seems like an opt-in thing (beyond using a version of node where it's unflagged)
There was a problem hiding this comment.
There is no real reason why you'd like to use child_process instead of worker_threads when the second one is available. According to our benchmarks, implementation using workers works ~ twice as fast
There was a problem hiding this comment.
API stability, there might be behavioural differences. Upgrading my app from node 10 to 11 (or whenever they remove the flag) shouldn't change the underlying implementation IMO. The docs for the module says it uses multiple processes, and just silently not doing that, depending on node version, seems risky to me
FWIW I could get behind setting it to true by default so it's possible to opt-out.
There was a problem hiding this comment.
Upgrading my app from node 10 to 11 (or whenever they remove the flag) shouldn't change the underlying implementation IMO
I think that underlying implementation is not something that should concern an average library user. Public interface stays completely the same (which is crucial for all library users), while "engine" is getting faster. Same happens with the language and other libraries like React (Fiber rewrite) etc.
The docs for the module says it uses multiple processes, and just silently not doing that, depending on node version, seems risky to me
I'm completely on the same page, but we're going to cover all that by specs so there should be no issues with unexpected regression bugs.
I think @mjesun can give you more insights.
There was a problem hiding this comment.
Let's re-write the docs as "we'll choose the most-performant implementation that is possible". People can enforce one or another by explicitly passing the corresponding worker pool.
| return; | ||
| } | ||
| const task = {onEnd, onStart, request}; | ||
| const workerId = worker ? worker.getWorkerId() : undefined; |
There was a problem hiding this comment.
Let's unify the interface. Either we work with worker instances or with worker ids, but I see a mix in the API.
There was a problem hiding this comment.
Done. We use workerIds everywhere up to the pool
|
|
||
| if (job.owner) { | ||
| this._queue[workerId] = job.next ? job.next : null; | ||
| return this.run(workerId); |
There was a problem hiding this comment.
Remove recursivity here and use a while loop to find the first job available. Don't lock by worker, lock by job.
|
Ooooooh, nice! |
SimenB
left a comment
There was a problem hiding this comment.
This is also missing docs, in addition to the WIP points highlighted in a comment 🙂
Excited about this!
| @@ -0,0 +1,33 @@ | |||
| /** | |||
| * Copyright (c) 2017-present, Facebook, Inc. All rights reserved. | |||
There was a problem hiding this comment.
18? I have to admit I don't know what these should say
|
@SimenB you haven't seen the performance part yet 😉 P.S. Added "Usage docs" as a missing part. I think it should come along with this PR to avoid inconsistency |
| if (workerId != null) { | ||
| if (this._queue[workerId]) { | ||
| this._last[workerId].next = task; | ||
| this._last[workerId] = task; |
|
|
||
| this._workerPool.send(worker, job.request, job.onStart, onEnd); | ||
|
|
||
| job.owner = worker; |
There was a problem hiding this comment.
Convert owner to a boolean.
| const job = this._queue[workerId]; | ||
| const worker = this._workerPool.getWorkers()[workerId]; | ||
|
|
||
| if (!job) { |
There was a problem hiding this comment.
Convert job into an Array, as in https://github.com/facebook/jest/blob/master/packages/jest-worker/src/worker.js#L151.
|
|
||
| this.lock(workerId); | ||
|
|
||
| this._workerPool.send(worker, job.request, job.onStart, onEnd); |
There was a problem hiding this comment.
That's an example of API unification: let's use workerId.
| // $FlowFixMe: This has to be a dynamic require. | ||
| const child = require(workerPath); | ||
| this._cacheKeys = Object.create(null); | ||
| this._options = Object.assign({}, defaultFarmOptions, options); |
There was a problem hiding this comment.
Make sure to make all configuration non-optional from here down the line (e.g. your input says numWorkers: ?number, but the _options object passed here should not be optional (i.e. numWorkers: number).
| import { | ||
| CHILD_MESSAGE_INITIALIZE, | ||
| PARENT_MESSAGE_ERROR, | ||
| CHILD_MESSAGE_INITIALIZE, |
|
|
||
| const onEnd = (error: ?Error, result: mixed, worker: WorkerInterface) => { | ||
| this.unlock(workerId); | ||
| job.onEnd(error, result, worker); |
There was a problem hiding this comment.
Can we swap this with the previous line? Conceptually sounds better to keep the worker locked while finishing, just in case onEnd decides to go wild.
| } | ||
|
|
||
| getNextJob(workerId: number): ?QueueChildMessage { | ||
| if (!this._queue[workerId]) { |
There was a problem hiding this comment.
Cache the entire queue in const queue = this._queue[workerId].
| break; | ||
| } | ||
| this._queue[workerId] = this._queue[workerId].next; | ||
| } |
There was a problem hiding this comment.
| const numOfWorkers = this._workerPool.getWorkers().length; | ||
| for (let i = 0; i < numOfWorkers; i++) { | ||
| const workerIdx = (this._offset + i) % numOfWorkers; | ||
| this.enqueue(task, workerIdx); |
There was a problem hiding this comment.
What I mean is to move the for loop into the parent implementation, where you will call enqueue always with a workerId. This way workerId won't be optional anymore. This will also allow you to remove getWorkers, because you won't need it to get the length anymore.
|
|
||
| this._workers[i] = worker; | ||
|
|
||
| // $FlowFixMe |
There was a problem hiding this comment.
Please add explanation to all FlowFixMes.
You can get rid of this ones by redefining at the top the result, as a variable (class methods are covariant in Flow). For instance, putting at the top: getStderr: () => BaseWorkerPool; would do the trick.
Anyway, I'm not sure why do we need to bind them all.
There was a problem hiding this comment.
We actually don't need it anymore. Will remove them accordingly
1ad9aa5 to
29e04d2
Compare
|
Any news on this one? I'm excited! |
|
@mjesun @rubennorte anyone at FB working on this? If not, anyone able to say what works and what doesn't with this PR so we can land it? |
|
@SimenB AFAIK no one is working on this right now. I might take a look in a few weeks but I don't think it's going to be ready for the next major. Feel free to take over from here if you want to work on this (or anyone else). |
|
Might look into it if you could say some about what's missing. Just getting CI green? More tests? |
|
I merged in master and wrote tests for this on #7408 Separated the PRs to get a clean review 👍 |
|
Landed |
|
This pull request has been automatically locked since there has not been any recent activity after it was closed. Please open a new issue for related bugs. |
Summary
Node 10 shipped with a "threading API" that uses SharedBuffers to communicate between the main process and its child threads. Being jest-worker a parallelization library, we can take advantage of this API when available. At the same time, we'll decouple our scheduling logic from our communication logic for better re-usability.
Here are some key things:
Performance
Test plan
TBA