Skip to content

Commit 6d8774f

Browse files
committed
fix: Dont await on table resovler
1 parent b73a94a commit 6d8774f

File tree

3 files changed

+55
-18
lines changed

3 files changed

+55
-18
lines changed

package-lock.json

Lines changed: 23 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@
101101
"modern-errors": "^7.0.0",
102102
"modern-errors-bugs": "^5.0.0",
103103
"p-map": "^7.0.0",
104+
"p-queue": "^8.1.0",
104105
"p-timeout": "^6.1.2",
105106
"path-exists": "^5.0.0",
106107
"path-type": "^6.0.0",

src/scheduler/scheduler.ts

Lines changed: 31 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { Duplex } from 'node:stream';
22

33
import pMap from 'p-map';
4+
import pQueue from 'p-queue';
45
import pTimeout from 'p-timeout';
56
import type { Logger } from 'winston';
67

@@ -92,20 +93,9 @@ const resolveTable = async (
9293
) => {
9394
logger.info(`resolving table ${table.name}`);
9495
const stream = new TableResolverStream();
95-
try {
96-
await table.resolver(client, parent, stream);
97-
} catch (error) {
98-
const tableError = new SyncTableResolveError(`error resolving table ${table.name}`, {
99-
cause: error,
100-
props: { table, client },
101-
});
102-
logger.error(`error resolving table ${table.name}`, tableError);
103-
return;
104-
} finally {
105-
stream.end();
106-
}
96+
const resolverPromise = table.resolver(client, parent, stream);
10797

108-
for await (const data of stream) {
98+
const processData = async (data: unknown) => {
10999
logger.debug(`resolving resource for table ${table.name}`);
110100
const resolveResourceTimeout = 10 * 60 * 1000;
111101
const resource = new Resource(table, parent, data);
@@ -118,7 +108,7 @@ const resolveTable = async (
118108
props: { resource, table, client },
119109
});
120110
logger.error(preResolverError);
121-
continue;
111+
return;
122112
}
123113

124114
try {
@@ -128,7 +118,7 @@ const resolveTable = async (
128118
await pTimeout(allColumnsPromise, { milliseconds: resolveResourceTimeout });
129119
} catch (error) {
130120
logger.error(`error resolving columns for table ${table.name}`, error);
131-
continue;
121+
return;
132122
}
133123

134124
try {
@@ -139,7 +129,7 @@ const resolveTable = async (
139129
props: { resource, table, client },
140130
});
141131
logger.error(postResolveError);
142-
continue;
132+
return;
143133
}
144134

145135
setCQId(resource, deterministicCQId);
@@ -148,7 +138,7 @@ const resolveTable = async (
148138
validateResource(resource);
149139
} catch (error) {
150140
logger.error(error);
151-
continue;
141+
return;
152142
}
153143

154144
try {
@@ -161,14 +151,37 @@ const resolveTable = async (
161151
},
162152
});
163153
logger.error(encodeError);
164-
continue;
154+
return;
165155
}
166156

167157
logger.debug(`done resolving resource for table ${table.name}`);
168158

169159
await pMap(table.relations, (child) =>
170160
resolveTable(logger, client, child, resource, syncStream, deterministicCQId),
171161
);
162+
};
163+
164+
const queue = new pQueue({ concurrency: 5 });
165+
166+
stream.on('data', async (data) => {
167+
await queue.add(() => processData(data));
168+
});
169+
170+
stream.on('end', async () => {
171+
await queue.onIdle();
172+
});
173+
174+
try {
175+
await resolverPromise;
176+
} catch (error) {
177+
const tableError = new SyncTableResolveError(`error resolving table ${table.name}`, {
178+
cause: error,
179+
props: { table, client },
180+
});
181+
logger.error(`error resolving table ${table.name}`, tableError);
182+
return;
183+
} finally {
184+
stream.end();
172185
}
173186

174187
logger.info(`done resolving table ${table.name}`);

0 commit comments

Comments
 (0)