diff --git a/components/log-viewer-webui/server/settings.json b/components/log-viewer-webui/server/settings.json index 086f29905a..ac3e244fa3 100644 --- a/components/log-viewer-webui/server/settings.json +++ b/components/log-viewer-webui/server/settings.json @@ -3,10 +3,12 @@ "SqlDbPort": 3306, "SqlDbName": "clp-db", "SqlDbQueryJobsTableName": "query_jobs", + "MongoDbHost": "localhost", "MongoDbPort": 27017, "MongoDbName": "clp-query-results", "MongoDbStreamFilesCollectionName": "stream-files", + "MongoDbSearchResultsMetadataCollectionName": "results-metadata", "ClientDir": "../../client/dist", "LogViewerDir": "../../yscope-log-viewer/dist", diff --git a/components/log-viewer-webui/server/src/fastify-v2/app.ts b/components/log-viewer-webui/server/src/fastify-v2/app.ts index c5b6420bb4..6187d6dbb1 100644 --- a/components/log-viewer-webui/server/src/fastify-v2/app.ts +++ b/components/log-viewer-webui/server/src/fastify-v2/app.ts @@ -52,9 +52,9 @@ export default async function serviceApp ( // Loads all routes. fastify.register(fastifyAutoload, { - dir: path.join(import.meta.dirname, "routes"), autoHooks: true, cascadeHooks: true, + dir: path.join(import.meta.dirname, "routes"), options: {...opts}, }); diff --git a/components/log-viewer-webui/server/src/fastify-v2/plugins/app/search/QueryJobsDbManager/index.ts b/components/log-viewer-webui/server/src/fastify-v2/plugins/app/search/QueryJobsDbManager/index.ts new file mode 100644 index 0000000000..1f29ef92af --- /dev/null +++ b/components/log-viewer-webui/server/src/fastify-v2/plugins/app/search/QueryJobsDbManager/index.ts @@ -0,0 +1,170 @@ +import {setTimeout} from "node:timers/promises"; + +import type {MySQLPromisePool} from "@fastify/mysql"; +import {encode} from "@msgpack/msgpack"; +import {FastifyInstance} from "fastify"; +import fp from "fastify-plugin"; +import {ResultSetHeader} from "mysql2"; + +import settings from "../../../../../../settings.json" with {type: "json"}; +import { + QUERY_JOB_STATUS, + QUERY_JOB_STATUS_WAITING_STATES, + QUERY_JOB_TYPE, + QUERY_JOBS_TABLE_COLUMN_NAMES, + QueryJob, +} from "../../../../../typings/query.js"; +import {JOB_COMPLETION_STATUS_POLL_INTERVAL_MILLIS} from "./typings.js"; + + +/** + * Class for submitting and monitoring query jobs in the database. + */ +class QueryJobsDbManager { + #sqlDbConnPool: MySQLPromisePool; + + private constructor (sqlDbConnPool: MySQLPromisePool) { + this.#sqlDbConnPool = sqlDbConnPool; + } + + /** + * Creates a new QueryJobsDbManager. + * + * @param fastify + * @return + */ + static create (fastify: FastifyInstance): QueryJobsDbManager { + const sqlDbConnPool = fastify.mysql; + return new QueryJobsDbManager(sqlDbConnPool); + } + + /** + * Submits a search job to the database. + * + * @param searchConfig The arguments for the query. + * @return The job's ID. + * @throws {Error} on error. + */ + async submitSearchJob (searchConfig: object): Promise { + const [queryInsertResults] = await this.#sqlDbConnPool.query( + ` + INSERT INTO ${settings.SqlDbQueryJobsTableName} ( + ${QUERY_JOBS_TABLE_COLUMN_NAMES.JOB_CONFIG}, + ${QUERY_JOBS_TABLE_COLUMN_NAMES.TYPE} + ) + VALUES (?, ?) + `, + [ + Buffer.from(encode(searchConfig)), + QUERY_JOB_TYPE.SEARCH_OR_AGGREGATION, + ] + ); + + return queryInsertResults.insertId; + } + + /** + * Submits an aggregation job to the database. + * + * @param searchConfig The arguments for the query. + * @param timeRangeBucketSizeMillis + * @return The aggregation job's ID. + * @throws {Error} on error. + */ + async submitAggregationJob ( + searchConfig: object, + timeRangeBucketSizeMillis: number + ): Promise { + const searchAggregationConfig = { + ...searchConfig, + aggregation_config: { + count_by_time_bucket_size: timeRangeBucketSizeMillis, + }, + }; + + return this.submitSearchJob(searchAggregationConfig); + } + + /** + * Submits a query cancellation request to the database. + * + * @param jobId ID of the job to cancel. + * @return + * @throws {Error} on error. + */ + async submitQueryCancellation (jobId: number): Promise { + await this.#sqlDbConnPool.query( + ` + UPDATE ${settings.SqlDbQueryJobsTableName} + SET ${QUERY_JOBS_TABLE_COLUMN_NAMES.STATUS} = ${QUERY_JOB_STATUS.CANCELLING} + WHERE ${QUERY_JOBS_TABLE_COLUMN_NAMES.ID} = ? + AND ${QUERY_JOBS_TABLE_COLUMN_NAMES.STATUS} + IN (${QUERY_JOB_STATUS.PENDING}, ${QUERY_JOB_STATUS.RUNNING}) + `, + jobId, + ); + } + + /** + * Waits for the job to complete. + * + * @param jobId + * @return + * @throws {Error} on MySQL error, if the job wasn't found in the database, if the job was + * cancelled, or if the job completed in an unexpected state. + */ + async awaitJobCompletion (jobId: number): Promise { + // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition + while (true) { + let queryJob: QueryJob | undefined; + try { + const [queryRows] = await this.#sqlDbConnPool.query( + ` + SELECT ${QUERY_JOBS_TABLE_COLUMN_NAMES.STATUS} + FROM ${settings.SqlDbQueryJobsTableName} + WHERE ${QUERY_JOBS_TABLE_COLUMN_NAMES.ID} = ? + `, + jobId + ); + + [queryJob] = queryRows; + } catch (e: unknown) { + throw new Error(`Failed to query status for job ${jobId}`, {cause: e}); + } + if ("undefined" === typeof queryJob) { + throw new Error(`Job ${jobId} not found in the database.`); + } + + const status = queryJob[QUERY_JOBS_TABLE_COLUMN_NAMES.STATUS]; + + if (false === QUERY_JOB_STATUS_WAITING_STATES.has(status)) { + if (QUERY_JOB_STATUS.CANCELLED === status) { + throw new Error(`Job ${jobId} was cancelled.`); + } else if (QUERY_JOB_STATUS.SUCCEEDED !== status) { + throw new Error( + `Job ${jobId} exited with unexpected status=${status}: ` + + `${Object.keys(QUERY_JOB_STATUS)[status]}.` + ); + } + break; + } + + await setTimeout(JOB_COMPLETION_STATUS_POLL_INTERVAL_MILLIS); + } + } +} + +declare module "fastify" { + export interface FastifyInstance { + QueryJobsDbManager: QueryJobsDbManager; + } +} + +export default fp( + (fastify) => { + fastify.decorate("QueryJobsDbManager", QueryJobsDbManager.create(fastify)); + }, + { + name: "QueryJobsDbManager", + } +); diff --git a/components/log-viewer-webui/server/src/fastify-v2/plugins/app/search/QueryJobsDbManager/typings.ts b/components/log-viewer-webui/server/src/fastify-v2/plugins/app/search/QueryJobsDbManager/typings.ts new file mode 100644 index 0000000000..9ce73c0f4d --- /dev/null +++ b/components/log-viewer-webui/server/src/fastify-v2/plugins/app/search/QueryJobsDbManager/typings.ts @@ -0,0 +1,4 @@ +/** + * Interval in milliseconds for polling the completion status of a job. + */ +export const JOB_COMPLETION_STATUS_POLL_INTERVAL_MILLIS = 500; diff --git a/components/log-viewer-webui/server/src/fastify-v2/plugins/app/search/SearchResultsMetadataCollection/index.ts b/components/log-viewer-webui/server/src/fastify-v2/plugins/app/search/SearchResultsMetadataCollection/index.ts new file mode 100644 index 0000000000..75e39b2fb6 --- /dev/null +++ b/components/log-viewer-webui/server/src/fastify-v2/plugins/app/search/SearchResultsMetadataCollection/index.ts @@ -0,0 +1,41 @@ +import {FastifyInstance} from "fastify"; +import fp from "fastify-plugin"; + +import settings from "../../../../../../settings.json" with {type: "json"}; +import type {SearchResultsMetadataDocument} from "./typings.js"; + + +/** + * Creates a MongoDB collection for search results metadata. + * + * @param fastify + * @return MongoDB collection + * @throws {Error} if the MongoDB database is not found. + */ +const createSearchResultsMetadataCollection = (fastify: FastifyInstance) => { + if ("undefined" === typeof fastify.mongo.db) { + throw new Error("MongoDB database not found"); + } + + return fastify.mongo.db.collection( + settings.MongoDbSearchResultsMetadataCollectionName + ); +}; + +declare module "fastify" { + export interface FastifyInstance { + SearchResultsMetadataCollection: ReturnType; + } +} + +export default fp( + (fastify) => { + fastify.decorate( + "SearchResultsMetadataCollection", + createSearchResultsMetadataCollection(fastify) + ); + }, + { + name: "SearchResultsMetadataCollection", + } +); diff --git a/components/log-viewer-webui/server/src/fastify-v2/plugins/app/search/SearchResultsMetadataCollection/typings.ts b/components/log-viewer-webui/server/src/fastify-v2/plugins/app/search/SearchResultsMetadataCollection/typings.ts new file mode 100644 index 0000000000..b4998d4045 --- /dev/null +++ b/components/log-viewer-webui/server/src/fastify-v2/plugins/app/search/SearchResultsMetadataCollection/typings.ts @@ -0,0 +1,33 @@ +import {Nullable} from "../../../../../typings/common.js"; + + +/** + * Enum of search-related signals. + * + * This includes request and response signals for various search operations and their respective + * states. + */ +enum SEARCH_SIGNAL { + NONE = "none", + + REQ_CANCELLING = "req-cancelling", + REQ_CLEARING = "req-clearing", + REQ_QUERYING = "req-querying", + + RESP_DONE = "resp-done", + RESP_QUERYING = "resp-querying", +} + +/** + * MongoDB document for search results metadata. `numTotalResults` is optional + * since it is only set when the search job is completed. + */ +interface SearchResultsMetadataDocument { + _id: string; + errorMsg: Nullable; + lastSignal: SEARCH_SIGNAL; + numTotalResults?: number; +} + +export type {SearchResultsMetadataDocument}; +export {SEARCH_SIGNAL}; diff --git a/components/log-viewer-webui/server/src/plugins/DbManager.ts b/components/log-viewer-webui/server/src/plugins/DbManager.ts index 28a39f6651..a1f863c21e 100644 --- a/components/log-viewer-webui/server/src/plugins/DbManager.ts +++ b/components/log-viewer-webui/server/src/plugins/DbManager.ts @@ -12,14 +12,16 @@ import { import {Nullable} from "../typings/common.js"; import { DbManagerOptions, + StreamFileMongoDocument, + StreamFilesCollection, +} from "../typings/DbManager.js"; +import { QUERY_JOB_STATUS, QUERY_JOB_STATUS_WAITING_STATES, QUERY_JOB_TYPE, QUERY_JOBS_TABLE_COLUMN_NAMES, QueryJob, - StreamFileMongoDocument, - StreamFilesCollection, -} from "../typings/DbManager.js"; +} from "../typings/query.js"; import {sleep} from "../utils/time.js"; diff --git a/components/log-viewer-webui/server/src/routes/query.ts b/components/log-viewer-webui/server/src/routes/query.ts index f43a724b07..a8500e53c5 100644 --- a/components/log-viewer-webui/server/src/routes/query.ts +++ b/components/log-viewer-webui/server/src/routes/query.ts @@ -4,8 +4,10 @@ import {FastifyPluginAsync} from "fastify"; import {StatusCodes} from "http-status-codes"; import settings from "../../settings.json" with {type: "json"}; -import {QUERY_JOB_TYPE} from "../plugins/DbManager.js"; -import {EXTRACT_JOB_TYPES} from "../typings/DbManager.js"; +import { + EXTRACT_JOB_TYPES, + QUERY_JOB_TYPE, +} from "../typings/query.js"; /** diff --git a/components/log-viewer-webui/server/src/typings/DbManager.ts b/components/log-viewer-webui/server/src/typings/DbManager.ts index 6464ccd4d3..c294c782bf 100644 --- a/components/log-viewer-webui/server/src/typings/DbManager.ts +++ b/components/log-viewer-webui/server/src/typings/DbManager.ts @@ -1,67 +1,6 @@ import {Collection} from "mongodb"; -import {RowDataPacket} from "mysql2/promise"; -/** - * Matching the `QueryJobType` class in `job_orchestration.query_scheduler.constants`. - */ -enum QUERY_JOB_TYPE { - SEARCH_OR_AGGREGATION = 0, - EXTRACT_IR, - EXTRACT_JSON, -} - -/** - * List of valid extract job types. - */ -const EXTRACT_JOB_TYPES = new Set([ - QUERY_JOB_TYPE.EXTRACT_IR, - QUERY_JOB_TYPE.EXTRACT_JSON, -]); - -/** - * Matching the `QueryJobStatus` class in - * `job_orchestration.query_scheduler.constants`. - * - * @enum {number} - */ -enum QUERY_JOB_STATUS { - PENDING = 0, - RUNNING, - SUCCEEDED, - FAILED, - CANCELLING, - CANCELLED, -} - -/** - * List of states that indicate the job is either pending or in progress. - */ -const QUERY_JOB_STATUS_WAITING_STATES = new Set([ - QUERY_JOB_STATUS.PENDING, - QUERY_JOB_STATUS.RUNNING, - QUERY_JOB_STATUS.CANCELLING, -]); - -/** - * The `query_jobs` table's column names. - * - * @enum {string} - */ -enum QUERY_JOBS_TABLE_COLUMN_NAMES { - ID = "id", - TYPE = "type", - STATUS = "status", - JOB_CONFIG = "job_config", -} - -interface QueryJob extends RowDataPacket { - [QUERY_JOBS_TABLE_COLUMN_NAMES.ID]: number; - [QUERY_JOBS_TABLE_COLUMN_NAMES.TYPE]: QUERY_JOB_TYPE; - [QUERY_JOBS_TABLE_COLUMN_NAMES.STATUS]: QUERY_JOB_STATUS; - [QUERY_JOBS_TABLE_COLUMN_NAMES.JOB_CONFIG]: string; -} - interface DbManagerOptions { mysqlConfig: { user: string; @@ -92,14 +31,6 @@ type StreamFilesCollection = Collection; export type { DbManagerOptions, - QueryJob, StreamFileMongoDocument, StreamFilesCollection, }; -export { - EXTRACT_JOB_TYPES, - QUERY_JOB_STATUS, - QUERY_JOB_STATUS_WAITING_STATES, - QUERY_JOB_TYPE, - QUERY_JOBS_TABLE_COLUMN_NAMES, -}; diff --git a/components/log-viewer-webui/server/src/typings/query.ts b/components/log-viewer-webui/server/src/typings/query.ts new file mode 100644 index 0000000000..1b000699fb --- /dev/null +++ b/components/log-viewer-webui/server/src/typings/query.ts @@ -0,0 +1,71 @@ +import {RowDataPacket} from "mysql2/promise"; + + +/** + * Matching the `QueryJobType` class in `job_orchestration.query_scheduler.constants`. + */ +enum QUERY_JOB_TYPE { + SEARCH_OR_AGGREGATION = 0, + EXTRACT_IR, + EXTRACT_JSON, +} + +/** + * List of valid extract job types. + */ +const EXTRACT_JOB_TYPES = new Set([ + QUERY_JOB_TYPE.EXTRACT_IR, + QUERY_JOB_TYPE.EXTRACT_JSON, +]); + +/** + * Matching the `QueryJobStatus` class in + * `job_orchestration.query_scheduler.constants`. + * + * @enum {number} + */ +enum QUERY_JOB_STATUS { + PENDING = 0, + RUNNING, + SUCCEEDED, + FAILED, + CANCELLING, + CANCELLED, +} + +/** + * List of states that indicate the job is either pending or in progress. + */ +const QUERY_JOB_STATUS_WAITING_STATES = new Set([ + QUERY_JOB_STATUS.PENDING, + QUERY_JOB_STATUS.RUNNING, + QUERY_JOB_STATUS.CANCELLING, +]); + +/** + * The `query_jobs` table's column names. + * + * @enum {string} + */ +enum QUERY_JOBS_TABLE_COLUMN_NAMES { + ID = "id", + TYPE = "type", + STATUS = "status", + JOB_CONFIG = "job_config", +} + +interface QueryJob extends RowDataPacket { + [QUERY_JOBS_TABLE_COLUMN_NAMES.ID]: number; + [QUERY_JOBS_TABLE_COLUMN_NAMES.TYPE]: QUERY_JOB_TYPE; + [QUERY_JOBS_TABLE_COLUMN_NAMES.STATUS]: QUERY_JOB_STATUS; + [QUERY_JOBS_TABLE_COLUMN_NAMES.JOB_CONFIG]: string; +} + +export type {QueryJob}; +export { + EXTRACT_JOB_TYPES, + QUERY_JOB_STATUS, + QUERY_JOB_STATUS_WAITING_STATES, + QUERY_JOB_TYPE, + QUERY_JOBS_TABLE_COLUMN_NAMES, +}; diff --git a/components/log-viewer-webui/server/tsconfig.json b/components/log-viewer-webui/server/tsconfig.json index f21550a911..b843124917 100644 --- a/components/log-viewer-webui/server/tsconfig.json +++ b/components/log-viewer-webui/server/tsconfig.json @@ -18,7 +18,7 @@ "baseUrl": ".", "paths": { // Map imports from "@common/*" to the common server-client folder - "@common/*": ["../common/*"] + "@common/*": ["../common/*"], }, "alwaysStrict": true,