|
1 | 1 | --- |
2 | | -id: Class |
| 2 | +id: Instance |
3 | 3 | sidebar_position: 2 |
4 | 4 | --- |
5 | 5 |
|
@@ -98,3 +98,129 @@ This class extends [`EventEmitter`](https://nodejs.org/api/events.html) from Nod |
98 | 98 | Use caution when setting resource limits. Setting limits that are too low may |
99 | 99 | result in the `Piscina` worker threads being unusable. |
100 | 100 | ::: |
| 101 | + |
| 102 | +## `PiscinaLoadBalancer` |
| 103 | + |
| 104 | +The `PiscinaLoadBalancer` interface is used to implement custom load balancing algorithm that determines which worker thread should be assigned a task. |
| 105 | + |
| 106 | +> For more information, see [Custom Load Balancers](../advanced-topics/loadbalancer.mdx). |
| 107 | +
|
| 108 | +### Interface: `PiscinaLoadBalancer` |
| 109 | + |
| 110 | +```ts |
| 111 | +type PiscinaLoadBalancer = ( |
| 112 | + task: PiscinaTask, // Task to be distributed |
| 113 | + workers: PiscinaWorker[] // Array of Worker instances |
| 114 | +) => PiscinaWorker | null; // Worker instance to be assigned the task |
| 115 | +``` |
| 116 | + |
| 117 | +If the `PiscinaLoadBalancer` returns `null`, `Piscina` will attempt to spawn a new worker, otherwise the task will be queued until a worker is available. |
| 118 | + |
| 119 | +### Interface: `PiscinaTask` |
| 120 | + |
| 121 | +```ts |
| 122 | +interface PiscinaTask { |
| 123 | + taskId: number; // Unique identifier for the task |
| 124 | + filename: string; // Filename of the worker module |
| 125 | + name: string; // Name of the worker function |
| 126 | + created: number; // Timestamp when the task was created |
| 127 | + isAbortable: boolean; // Indicates if the task can be aborted through AbortSignal |
| 128 | +} |
| 129 | +``` |
| 130 | + |
| 131 | +### Interface: `PiscinaWorker` |
| 132 | + |
| 133 | +```ts |
| 134 | +interface PiscinaWorker { |
| 135 | + id: number; // Unique identifier for the worker |
| 136 | + currentUsage: number; // Number of tasks currently running on the worker |
| 137 | + isRunningAbortableTask: boolean; // Indicates if the worker is running an abortable task |
| 138 | + histogram: HistogramSummary | null; // Worker histogram |
| 139 | + terminating: boolean; // Indicates if the worker is terminating |
| 140 | + destroyed: boolean; // Indicates if the worker has been destroyed |
| 141 | +} |
| 142 | +``` |
| 143 | + |
| 144 | +### Example: Custom Load Balancer |
| 145 | + |
| 146 | +#### JavaScript |
| 147 | +<a id="custom-load-balancer-example-js"> </a> |
| 148 | + |
| 149 | +```js |
| 150 | +const { Piscina } = require('piscina'); |
| 151 | + |
| 152 | +function LeastBusyBalancer(opts) { |
| 153 | + const { maximumUsage } = opts; |
| 154 | + |
| 155 | + return (task, workers) => { |
| 156 | + let candidate = null; |
| 157 | + let checkpoint = maximumUsage; |
| 158 | + for (const worker of workers) { |
| 159 | + if (worker.currentUsage === 0) { |
| 160 | + candidate = worker; |
| 161 | + break; |
| 162 | + } |
| 163 | + |
| 164 | + if (worker.isRunningAbortableTask) continue; |
| 165 | + |
| 166 | + if (!task.isAbortable && worker.currentUsage < checkpoint) { |
| 167 | + candidate = worker; |
| 168 | + checkpoint = worker.currentUsage; |
| 169 | + } |
| 170 | + } |
| 171 | + |
| 172 | + return candidate; |
| 173 | + }; |
| 174 | +} |
| 175 | + |
| 176 | +const piscina = new Piscina({ |
| 177 | + loadBalancer: LeastBusyBalancer({ maximumUsage: 2 }), |
| 178 | +}); |
| 179 | + |
| 180 | +piscina |
| 181 | + .runTask({ filename: 'worker.js', name: 'default' }) |
| 182 | + .then((result) => console.log(result)) |
| 183 | + .catch((err) => console.error(err)); |
| 184 | +``` |
| 185 | + |
| 186 | +#### TypeScript |
| 187 | +<a id="custom-load-balancer-example-ts"> </a> |
| 188 | + |
| 189 | +```ts |
| 190 | +import { Piscina } from 'piscina'; |
| 191 | + |
| 192 | +function LeastBusyBalancer( |
| 193 | + opts: LeastBusyBalancerOptions |
| 194 | +): PiscinaLoadBalancer { |
| 195 | + const { maximumUsage } = opts; |
| 196 | + |
| 197 | + return (task, workers) => { |
| 198 | + let candidate: PiscinaWorker | null = null; |
| 199 | + let checkpoint = maximumUsage; |
| 200 | + for (const worker of workers) { |
| 201 | + if (worker.currentUsage === 0) { |
| 202 | + candidate = worker; |
| 203 | + break; |
| 204 | + } |
| 205 | + |
| 206 | + if (worker.isRunningAbortableTask) continue; |
| 207 | + |
| 208 | + if (!task.isAbortable && worker.currentUsage < checkpoint) { |
| 209 | + candidate = worker; |
| 210 | + checkpoint = worker.currentUsage; |
| 211 | + } |
| 212 | + } |
| 213 | + |
| 214 | + return candidate; |
| 215 | + }; |
| 216 | +} |
| 217 | + |
| 218 | +const piscina = new Piscina({ |
| 219 | + loadBalancer: LeastBusyBalancer({ maximumUsage: 2 }), |
| 220 | +}); |
| 221 | + |
| 222 | +piscina |
| 223 | + .runTask({ filename: 'worker.js', name: 'default' }) |
| 224 | + .then((result) => console.log(result)) |
| 225 | + .catch((err) => console.error(err)); |
| 226 | +``` |
0 commit comments