Skip to content

Commit c5a921f

Browse files
committed
fix: Dont await on table resovler
1 parent b73a94a commit c5a921f

File tree

1 file changed

+30
-18
lines changed

1 file changed

+30
-18
lines changed

src/scheduler/scheduler.ts

Lines changed: 30 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -92,20 +92,9 @@ const resolveTable = async (
9292
) => {
9393
logger.info(`resolving table ${table.name}`);
9494
const stream = new TableResolverStream();
95-
try {
96-
await table.resolver(client, parent, stream);
97-
} catch (error) {
98-
const tableError = new SyncTableResolveError(`error resolving table ${table.name}`, {
99-
cause: error,
100-
props: { table, client },
101-
});
102-
logger.error(`error resolving table ${table.name}`, tableError);
103-
return;
104-
} finally {
105-
stream.end();
106-
}
95+
const resolverPromise = table.resolver(client, parent, stream);
10796

108-
for await (const data of stream) {
97+
const processData = async (data: unknown) => {
10998
logger.debug(`resolving resource for table ${table.name}`);
11099
const resolveResourceTimeout = 10 * 60 * 1000;
111100
const resource = new Resource(table, parent, data);
@@ -118,7 +107,7 @@ const resolveTable = async (
118107
props: { resource, table, client },
119108
});
120109
logger.error(preResolverError);
121-
continue;
110+
return;
122111
}
123112

124113
try {
@@ -128,7 +117,7 @@ const resolveTable = async (
128117
await pTimeout(allColumnsPromise, { milliseconds: resolveResourceTimeout });
129118
} catch (error) {
130119
logger.error(`error resolving columns for table ${table.name}`, error);
131-
continue;
120+
return;
132121
}
133122

134123
try {
@@ -139,7 +128,7 @@ const resolveTable = async (
139128
props: { resource, table, client },
140129
});
141130
logger.error(postResolveError);
142-
continue;
131+
return;
143132
}
144133

145134
setCQId(resource, deterministicCQId);
@@ -148,7 +137,7 @@ const resolveTable = async (
148137
validateResource(resource);
149138
} catch (error) {
150139
logger.error(error);
151-
continue;
140+
return;
152141
}
153142

154143
try {
@@ -161,14 +150,37 @@ const resolveTable = async (
161150
},
162151
});
163152
logger.error(encodeError);
164-
continue;
153+
return;
165154
}
166155

167156
logger.debug(`done resolving resource for table ${table.name}`);
168157

169158
await pMap(table.relations, (child) =>
170159
resolveTable(logger, client, child, resource, syncStream, deterministicCQId),
171160
);
161+
};
162+
163+
const processPromises: Promise<void>[] = [];
164+
stream.on('data', async (data) => {
165+
const promise = processData(data);
166+
processPromises.push(promise);
167+
});
168+
169+
stream.on('end', async () => {
170+
await Promise.all(processPromises);
171+
});
172+
173+
try {
174+
await resolverPromise;
175+
} catch (error) {
176+
const tableError = new SyncTableResolveError(`error resolving table ${table.name}`, {
177+
cause: error,
178+
props: { table, client },
179+
});
180+
logger.error(`error resolving table ${table.name}`, tableError);
181+
return;
182+
} finally {
183+
stream.end();
172184
}
173185

174186
logger.info(`done resolving table ${table.name}`);

0 commit comments

Comments
 (0)