-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathclient-registry.js
More file actions
153 lines (153 loc) · 6.29 KB
/
client-registry.js
File metadata and controls
153 lines (153 loc) · 6.29 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
import { ComfyApi } from "../client.js";
export class ClientRegistry {
pool;
events;
clients = new Map();
// Maps a workflow structure hash to a set of client URLs that have affinity for that workflow
workflowAffinityMap = new Map();
constructor(pool, events) {
this.pool = pool;
this.events = events;
}
addClient(clientUrl, options) {
const comfyApi = new ComfyApi(clientUrl);
const enhancedClient = {
url: clientUrl,
state: "idle",
nodeName: new URL(clientUrl).hostname,
priority: options?.priority,
api: comfyApi
};
if (options?.workflowAffinity) {
enhancedClient.workflowAffinity = new Set();
for (const workflow of options.workflowAffinity) {
let hash = workflow.structureHash;
if (!hash) {
workflow.updateHash();
hash = workflow.structureHash;
}
if (!hash) {
throw new Error("Workflow must have a valid structure hash for affinity.");
}
if (!this.workflowAffinityMap.has(hash)) {
this.workflowAffinityMap.set(hash, new Set());
}
this.workflowAffinityMap.get(hash).add(clientUrl);
enhancedClient.workflowAffinity.add(hash);
}
}
this.clients.set(clientUrl, enhancedClient);
}
removeClient(clientUrl) {
this.clients.delete(clientUrl);
}
async getQueueStatus(clientUrl) {
const comfyApi = this.clients.get(clientUrl)?.api;
if (!comfyApi) {
throw new Error(`Client ${clientUrl} not found`);
}
return comfyApi.getQueue();
}
getOptimalClient(workflow) {
let workflowHash = workflow.structureHash;
if (!workflowHash) {
workflow.updateHash();
workflowHash = workflow.structureHash;
}
if (!workflowHash) {
throw new Error("Workflow must have a valid structure hash.");
}
// Filter clients based on workflow affinity
const suitableClients = [];
for (const client of this.clients.values()) {
if (client.state !== "idle") {
continue;
}
if (client.workflowAffinity && client.workflowAffinity.has(workflowHash)) {
suitableClients.push(client);
}
}
if (suitableClients.length === 0) {
this.events.emitEvent({ type: "debug", payload: `No suitable clients found for workflow ${workflowHash}.` });
return null;
}
this.events.emitEvent({ type: "debug", payload: `Suitable clients for workflow ${workflowHash}: ${suitableClients.map(value => value.nodeName).join(",")}` });
// sort suitable clients by priority
suitableClients.sort((a, b) => {
const priorityA = a.priority ?? 0;
const priorityB = b.priority ?? 0;
return priorityB - priorityA; // higher priority first
});
return suitableClients.length > 0 ? suitableClients[0] : null;
}
hasClientsForWorkflow(workflowHash) {
const clientSet = this.workflowAffinityMap.get(workflowHash);
return clientSet !== undefined && clientSet.size > 0;
}
// Get an optimal idle client for a given workflow (used for general queue)
async getOptimalIdleClient(workflow) {
this.events.emitEvent({ type: "debug", payload: `Searching for idle clients for workflow ${workflow.structureHash}...` });
// We can infer model capabilities from workflow and try to get the best idle client, based on other workflow affinities, for now lets pick any idle client
const idleClients = [];
for (const client of this.clients.values()) {
if (client.state === "idle") {
// For the general queue, we need to check the actual queue state
await this.checkClientQueueState(client);
if (client.state === "idle") {
this.events.emitEvent({ type: "debug", payload: `Client ${client.nodeName} is idle.` });
idleClients.push(client);
}
}
}
this.events.emitEvent({ type: "debug", payload: `Idle clients available: ${idleClients.map(value => value.nodeName).join(",")}` });
// sort idle clients by priority
idleClients.sort((a, b) => {
const priorityA = a.priority ?? 0;
const priorityB = b.priority ?? 0;
return priorityB - priorityA; // higher priority first
});
return idleClients.length > 0 ? idleClients[0] : null;
}
async checkClientQueueState(client) {
try {
const queue = await this.getQueueStatus(client.url);
if (queue.queue_running.length > 0 || queue.queue_pending.length > 0) {
client.state = "busy";
}
else {
client.state = "idle";
}
}
catch (error) {
this.events.emitEvent({ type: "error", payload: { message: `Error checking queue state for client ${client.nodeName}`, error } });
client.state = "offline";
}
}
markClientIncompatibleWithWorkflow(url, structureHash) {
const client = this.clients.get(url);
if (client && structureHash && client.workflowAffinity) {
client.workflowAffinity.delete(structureHash);
const affinitySet = this.workflowAffinityMap.get(structureHash);
if (affinitySet) {
affinitySet.delete(url);
if (affinitySet.size === 0) {
this.workflowAffinityMap.delete(structureHash);
}
}
}
}
getAllEligibleClientsForWorkflow(workflow) {
const eligibleClients = [];
const workflowHash = workflow.structureHash;
if (!workflowHash) {
throw new Error("Workflow must have a valid structure hash.");
}
for (const client of this.clients.values()) {
if (client.workflowAffinity && client.workflowAffinity.has(workflowHash)) {
eligibleClients.push(client);
}
}
return eligibleClients;
}
}
//# sourceMappingURL=client-registry.js.map