Skip to content

Commit a17e654

Browse files
authored
feat(new-webui): Port search services from old-webui to Fastify plugins. (#912)
1 parent 6643d42 commit a17e654

File tree

11 files changed

+332
-76
lines changed

11 files changed

+332
-76
lines changed

components/log-viewer-webui/server/settings.json

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,12 @@
33
"SqlDbPort": 3306,
44
"SqlDbName": "clp-db",
55
"SqlDbQueryJobsTableName": "query_jobs",
6+
67
"MongoDbHost": "localhost",
78
"MongoDbPort": 27017,
89
"MongoDbName": "clp-query-results",
910
"MongoDbStreamFilesCollectionName": "stream-files",
11+
"MongoDbSearchResultsMetadataCollectionName": "results-metadata",
1012

1113
"ClientDir": "../../client/dist",
1214
"LogViewerDir": "../../yscope-log-viewer/dist",

components/log-viewer-webui/server/src/fastify-v2/app.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,9 +52,9 @@ export default async function serviceApp (
5252

5353
// Loads all routes.
5454
fastify.register(fastifyAutoload, {
55-
dir: path.join(import.meta.dirname, "routes"),
5655
autoHooks: true,
5756
cascadeHooks: true,
57+
dir: path.join(import.meta.dirname, "routes"),
5858
options: {...opts},
5959
});
6060

Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,170 @@
1+
import {setTimeout} from "node:timers/promises";
2+
3+
import type {MySQLPromisePool} from "@fastify/mysql";
4+
import {encode} from "@msgpack/msgpack";
5+
import {FastifyInstance} from "fastify";
6+
import fp from "fastify-plugin";
7+
import {ResultSetHeader} from "mysql2";
8+
9+
import settings from "../../../../../../settings.json" with {type: "json"};
10+
import {
11+
QUERY_JOB_STATUS,
12+
QUERY_JOB_STATUS_WAITING_STATES,
13+
QUERY_JOB_TYPE,
14+
QUERY_JOBS_TABLE_COLUMN_NAMES,
15+
QueryJob,
16+
} from "../../../../../typings/query.js";
17+
import {JOB_COMPLETION_STATUS_POLL_INTERVAL_MILLIS} from "./typings.js";
18+
19+
20+
/**
21+
* Class for submitting and monitoring query jobs in the database.
22+
*/
23+
class QueryJobsDbManager {
24+
#sqlDbConnPool: MySQLPromisePool;
25+
26+
private constructor (sqlDbConnPool: MySQLPromisePool) {
27+
this.#sqlDbConnPool = sqlDbConnPool;
28+
}
29+
30+
/**
31+
* Creates a new QueryJobsDbManager.
32+
*
33+
* @param fastify
34+
* @return
35+
*/
36+
static create (fastify: FastifyInstance): QueryJobsDbManager {
37+
const sqlDbConnPool = fastify.mysql;
38+
return new QueryJobsDbManager(sqlDbConnPool);
39+
}
40+
41+
/**
42+
* Submits a search job to the database.
43+
*
44+
* @param searchConfig The arguments for the query.
45+
* @return The job's ID.
46+
* @throws {Error} on error.
47+
*/
48+
async submitSearchJob (searchConfig: object): Promise<number> {
49+
const [queryInsertResults] = await this.#sqlDbConnPool.query<ResultSetHeader>(
50+
`
51+
INSERT INTO ${settings.SqlDbQueryJobsTableName} (
52+
${QUERY_JOBS_TABLE_COLUMN_NAMES.JOB_CONFIG},
53+
${QUERY_JOBS_TABLE_COLUMN_NAMES.TYPE}
54+
)
55+
VALUES (?, ?)
56+
`,
57+
[
58+
Buffer.from(encode(searchConfig)),
59+
QUERY_JOB_TYPE.SEARCH_OR_AGGREGATION,
60+
]
61+
);
62+
63+
return queryInsertResults.insertId;
64+
}
65+
66+
/**
67+
* Submits an aggregation job to the database.
68+
*
69+
* @param searchConfig The arguments for the query.
70+
* @param timeRangeBucketSizeMillis
71+
* @return The aggregation job's ID.
72+
* @throws {Error} on error.
73+
*/
74+
async submitAggregationJob (
75+
searchConfig: object,
76+
timeRangeBucketSizeMillis: number
77+
): Promise<number> {
78+
const searchAggregationConfig = {
79+
...searchConfig,
80+
aggregation_config: {
81+
count_by_time_bucket_size: timeRangeBucketSizeMillis,
82+
},
83+
};
84+
85+
return this.submitSearchJob(searchAggregationConfig);
86+
}
87+
88+
/**
89+
* Submits a query cancellation request to the database.
90+
*
91+
* @param jobId ID of the job to cancel.
92+
* @return
93+
* @throws {Error} on error.
94+
*/
95+
async submitQueryCancellation (jobId: number): Promise<void> {
96+
await this.#sqlDbConnPool.query(
97+
`
98+
UPDATE ${settings.SqlDbQueryJobsTableName}
99+
SET ${QUERY_JOBS_TABLE_COLUMN_NAMES.STATUS} = ${QUERY_JOB_STATUS.CANCELLING}
100+
WHERE ${QUERY_JOBS_TABLE_COLUMN_NAMES.ID} = ?
101+
AND ${QUERY_JOBS_TABLE_COLUMN_NAMES.STATUS}
102+
IN (${QUERY_JOB_STATUS.PENDING}, ${QUERY_JOB_STATUS.RUNNING})
103+
`,
104+
jobId,
105+
);
106+
}
107+
108+
/**
109+
* Waits for the job to complete.
110+
*
111+
* @param jobId
112+
* @return
113+
* @throws {Error} on MySQL error, if the job wasn't found in the database, if the job was
114+
* cancelled, or if the job completed in an unexpected state.
115+
*/
116+
async awaitJobCompletion (jobId: number): Promise<void> {
117+
// eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
118+
while (true) {
119+
let queryJob: QueryJob | undefined;
120+
try {
121+
const [queryRows] = await this.#sqlDbConnPool.query<QueryJob[]>(
122+
`
123+
SELECT ${QUERY_JOBS_TABLE_COLUMN_NAMES.STATUS}
124+
FROM ${settings.SqlDbQueryJobsTableName}
125+
WHERE ${QUERY_JOBS_TABLE_COLUMN_NAMES.ID} = ?
126+
`,
127+
jobId
128+
);
129+
130+
[queryJob] = queryRows;
131+
} catch (e: unknown) {
132+
throw new Error(`Failed to query status for job ${jobId}`, {cause: e});
133+
}
134+
if ("undefined" === typeof queryJob) {
135+
throw new Error(`Job ${jobId} not found in the database.`);
136+
}
137+
138+
const status = queryJob[QUERY_JOBS_TABLE_COLUMN_NAMES.STATUS];
139+
140+
if (false === QUERY_JOB_STATUS_WAITING_STATES.has(status)) {
141+
if (QUERY_JOB_STATUS.CANCELLED === status) {
142+
throw new Error(`Job ${jobId} was cancelled.`);
143+
} else if (QUERY_JOB_STATUS.SUCCEEDED !== status) {
144+
throw new Error(
145+
`Job ${jobId} exited with unexpected status=${status}: ` +
146+
`${Object.keys(QUERY_JOB_STATUS)[status]}.`
147+
);
148+
}
149+
break;
150+
}
151+
152+
await setTimeout(JOB_COMPLETION_STATUS_POLL_INTERVAL_MILLIS);
153+
}
154+
}
155+
}
156+
157+
declare module "fastify" {
158+
export interface FastifyInstance {
159+
QueryJobsDbManager: QueryJobsDbManager;
160+
}
161+
}
162+
163+
export default fp(
164+
(fastify) => {
165+
fastify.decorate("QueryJobsDbManager", QueryJobsDbManager.create(fastify));
166+
},
167+
{
168+
name: "QueryJobsDbManager",
169+
}
170+
);
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
/**
2+
* Interval in milliseconds for polling the completion status of a job.
3+
*/
4+
export const JOB_COMPLETION_STATUS_POLL_INTERVAL_MILLIS = 500;
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
import {FastifyInstance} from "fastify";
2+
import fp from "fastify-plugin";
3+
4+
import settings from "../../../../../../settings.json" with {type: "json"};
5+
import type {SearchResultsMetadataDocument} from "./typings.js";
6+
7+
8+
/**
9+
* Creates a MongoDB collection for search results metadata.
10+
*
11+
* @param fastify
12+
* @return MongoDB collection
13+
* @throws {Error} if the MongoDB database is not found.
14+
*/
15+
const createSearchResultsMetadataCollection = (fastify: FastifyInstance) => {
16+
if ("undefined" === typeof fastify.mongo.db) {
17+
throw new Error("MongoDB database not found");
18+
}
19+
20+
return fastify.mongo.db.collection<SearchResultsMetadataDocument>(
21+
settings.MongoDbSearchResultsMetadataCollectionName
22+
);
23+
};
24+
25+
declare module "fastify" {
26+
export interface FastifyInstance {
27+
SearchResultsMetadataCollection: ReturnType<typeof createSearchResultsMetadataCollection>;
28+
}
29+
}
30+
31+
export default fp(
32+
(fastify) => {
33+
fastify.decorate(
34+
"SearchResultsMetadataCollection",
35+
createSearchResultsMetadataCollection(fastify)
36+
);
37+
},
38+
{
39+
name: "SearchResultsMetadataCollection",
40+
}
41+
);
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
import {Nullable} from "../../../../../typings/common.js";
2+
3+
4+
/**
5+
* Enum of search-related signals.
6+
*
7+
* This includes request and response signals for various search operations and their respective
8+
* states.
9+
*/
10+
enum SEARCH_SIGNAL {
11+
NONE = "none",
12+
13+
REQ_CANCELLING = "req-cancelling",
14+
REQ_CLEARING = "req-clearing",
15+
REQ_QUERYING = "req-querying",
16+
17+
RESP_DONE = "resp-done",
18+
RESP_QUERYING = "resp-querying",
19+
}
20+
21+
/**
22+
* MongoDB document for search results metadata. `numTotalResults` is optional
23+
* since it is only set when the search job is completed.
24+
*/
25+
interface SearchResultsMetadataDocument {
26+
_id: string;
27+
errorMsg: Nullable<string>;
28+
lastSignal: SEARCH_SIGNAL;
29+
numTotalResults?: number;
30+
}
31+
32+
export type {SearchResultsMetadataDocument};
33+
export {SEARCH_SIGNAL};

components/log-viewer-webui/server/src/plugins/DbManager.ts

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,16 @@ import {
1212
import {Nullable} from "../typings/common.js";
1313
import {
1414
DbManagerOptions,
15+
StreamFileMongoDocument,
16+
StreamFilesCollection,
17+
} from "../typings/DbManager.js";
18+
import {
1519
QUERY_JOB_STATUS,
1620
QUERY_JOB_STATUS_WAITING_STATES,
1721
QUERY_JOB_TYPE,
1822
QUERY_JOBS_TABLE_COLUMN_NAMES,
1923
QueryJob,
20-
StreamFileMongoDocument,
21-
StreamFilesCollection,
22-
} from "../typings/DbManager.js";
24+
} from "../typings/query.js";
2325
import {sleep} from "../utils/time.js";
2426

2527

components/log-viewer-webui/server/src/routes/query.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,10 @@ import {FastifyPluginAsync} from "fastify";
44
import {StatusCodes} from "http-status-codes";
55

66
import settings from "../../settings.json" with {type: "json"};
7-
import {QUERY_JOB_TYPE} from "../plugins/DbManager.js";
8-
import {EXTRACT_JOB_TYPES} from "../typings/DbManager.js";
7+
import {
8+
EXTRACT_JOB_TYPES,
9+
QUERY_JOB_TYPE,
10+
} from "../typings/query.js";
911

1012

1113
/**
Lines changed: 0 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -1,67 +1,6 @@
11
import {Collection} from "mongodb";
2-
import {RowDataPacket} from "mysql2/promise";
32

43

5-
/**
6-
* Matching the `QueryJobType` class in `job_orchestration.query_scheduler.constants`.
7-
*/
8-
enum QUERY_JOB_TYPE {
9-
SEARCH_OR_AGGREGATION = 0,
10-
EXTRACT_IR,
11-
EXTRACT_JSON,
12-
}
13-
14-
/**
15-
* List of valid extract job types.
16-
*/
17-
const EXTRACT_JOB_TYPES = new Set([
18-
QUERY_JOB_TYPE.EXTRACT_IR,
19-
QUERY_JOB_TYPE.EXTRACT_JSON,
20-
]);
21-
22-
/**
23-
* Matching the `QueryJobStatus` class in
24-
* `job_orchestration.query_scheduler.constants`.
25-
*
26-
* @enum {number}
27-
*/
28-
enum QUERY_JOB_STATUS {
29-
PENDING = 0,
30-
RUNNING,
31-
SUCCEEDED,
32-
FAILED,
33-
CANCELLING,
34-
CANCELLED,
35-
}
36-
37-
/**
38-
* List of states that indicate the job is either pending or in progress.
39-
*/
40-
const QUERY_JOB_STATUS_WAITING_STATES = new Set([
41-
QUERY_JOB_STATUS.PENDING,
42-
QUERY_JOB_STATUS.RUNNING,
43-
QUERY_JOB_STATUS.CANCELLING,
44-
]);
45-
46-
/**
47-
* The `query_jobs` table's column names.
48-
*
49-
* @enum {string}
50-
*/
51-
enum QUERY_JOBS_TABLE_COLUMN_NAMES {
52-
ID = "id",
53-
TYPE = "type",
54-
STATUS = "status",
55-
JOB_CONFIG = "job_config",
56-
}
57-
58-
interface QueryJob extends RowDataPacket {
59-
[QUERY_JOBS_TABLE_COLUMN_NAMES.ID]: number;
60-
[QUERY_JOBS_TABLE_COLUMN_NAMES.TYPE]: QUERY_JOB_TYPE;
61-
[QUERY_JOBS_TABLE_COLUMN_NAMES.STATUS]: QUERY_JOB_STATUS;
62-
[QUERY_JOBS_TABLE_COLUMN_NAMES.JOB_CONFIG]: string;
63-
}
64-
654
interface DbManagerOptions {
665
mysqlConfig: {
676
user: string;
@@ -92,14 +31,6 @@ type StreamFilesCollection = Collection<StreamFileMongoDocument>;
9231

9332
export type {
9433
DbManagerOptions,
95-
QueryJob,
9634
StreamFileMongoDocument,
9735
StreamFilesCollection,
9836
};
99-
export {
100-
EXTRACT_JOB_TYPES,
101-
QUERY_JOB_STATUS,
102-
QUERY_JOB_STATUS_WAITING_STATES,
103-
QUERY_JOB_TYPE,
104-
QUERY_JOBS_TABLE_COLUMN_NAMES,
105-
};

0 commit comments

Comments
 (0)