Skip to content
2 changes: 2 additions & 0 deletions components/log-viewer-webui/server/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@
"SqlDbPort": 3306,
"SqlDbName": "clp-db",
"SqlDbQueryJobsTableName": "query_jobs",

"MongoDbHost": "localhost",
"MongoDbPort": 27017,
"MongoDbName": "clp-query-results",
"MongoDbStreamFilesCollectionName": "stream-files",
"MongoDbSearchResultsMetadataCollectionName": "results-metadata",

"ClientDir": "../../client/dist",
"LogViewerDir": "../../yscope-log-viewer/dist",
Expand Down
2 changes: 1 addition & 1 deletion components/log-viewer-webui/server/src/fastify-v2/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ export default async function serviceApp (

// Loads all routes.
fastify.register(fastifyAutoload, {
dir: path.join(import.meta.dirname, "routes"),
autoHooks: true,
cascadeHooks: true,
dir: path.join(import.meta.dirname, "routes"),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same as plugins

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This also creates nice routes like api/search/query. Like it will add the folder structure to the route, so i like it

options: {...opts},
});

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
import {setTimeout} from "node:timers/promises";

import type {MySQLPromisePool} from "@fastify/mysql";
import {encode} from "@msgpack/msgpack";
import {FastifyInstance} from "fastify";
import fp from "fastify-plugin";
import {ResultSetHeader} from "mysql2";

import settings from "../../../../../../settings.json" with {type: "json"};
import {
QUERY_JOB_STATUS,
QUERY_JOB_STATUS_WAITING_STATES,
QUERY_JOB_TYPE,
QUERY_JOBS_TABLE_COLUMN_NAMES,
QueryJob,
} from "../../../../../typings/query.js";
import {JOB_COMPLETION_STATUS_POLL_INTERVAL_MILLIS} from "./typings.js";


/**
* Class for submitting and monitoring query jobs in the database.
*/
class QueryJobsDbManager {
#sqlDbConnPool: MySQLPromisePool;

private constructor (sqlDbConnPool: MySQLPromisePool) {
this.#sqlDbConnPool = sqlDbConnPool;
}

/**
* Creates a new QueryJobsDbManager.
*
* @param fastify
* @return
*/
static create (fastify: FastifyInstance): QueryJobsDbManager {
const sqlDbConnPool = fastify.mysql;
return new QueryJobsDbManager(sqlDbConnPool);
}

/**
* Submits a search job to the database.
*
* @param searchConfig The arguments for the query.
* @return The job's ID.
* @throws {Error} on error.
*/
async submitSearchJob (searchConfig: object): Promise<number> {
const [queryInsertResults] = await this.#sqlDbConnPool.query<ResultSetHeader>(
`
INSERT INTO ${settings.SqlDbQueryJobsTableName} (
${QUERY_JOBS_TABLE_COLUMN_NAMES.JOB_CONFIG},
${QUERY_JOBS_TABLE_COLUMN_NAMES.TYPE}
)
VALUES (?, ?)
`,
[
Buffer.from(encode(searchConfig)),
QUERY_JOB_TYPE.SEARCH_OR_AGGREGATION,
]
);

return queryInsertResults.insertId;
}

/**
* Submits an aggregation job to the database.
*
* @param searchConfig The arguments for the query.
* @param timeRangeBucketSizeMillis
* @return The aggregation job's ID.
* @throws {Error} on error.
*/
async submitAggregationJob (
searchConfig: object,
timeRangeBucketSizeMillis: number
): Promise<number> {
const searchAggregationConfig = {
...searchConfig,
aggregation_config: {
count_by_time_bucket_size: timeRangeBucketSizeMillis,
},
};

return this.submitSearchJob(searchAggregationConfig);
}

/**
* Submits a query cancellation request to the database.
*
* @param jobId ID of the job to cancel.
* @return
* @throws {Error} on error.
*/
async submitQueryCancellation (jobId: number): Promise<void> {
await this.#sqlDbConnPool.query(
`
UPDATE ${settings.SqlDbQueryJobsTableName}
SET ${QUERY_JOBS_TABLE_COLUMN_NAMES.STATUS} = ${QUERY_JOB_STATUS.CANCELLING}
WHERE ${QUERY_JOBS_TABLE_COLUMN_NAMES.ID} = ?
AND ${QUERY_JOBS_TABLE_COLUMN_NAMES.STATUS}
IN (${QUERY_JOB_STATUS.PENDING}, ${QUERY_JOB_STATUS.RUNNING})
`,
jobId,
);
Comment on lines +96 to +105
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick (assertive)

Prefer array wrapper for parameter binding

mysql2 (and the underlying Fastify MySQL plugin) expect the second argument to be an array (or object) of parameters.
Passing a naked scalar works, but is an undocumented convenience; should the driver change its typing in the future this could break compilation or – worse – mis-bind parameters.

-            `,
-            jobId,
+            `,
+            [jobId],

Sticking to the documented array form keeps the call site consistent with the rest of the codebase and avoids surprises.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
await this.#sqlDbConnPool.query(
`
UPDATE ${settings.SqlDbQueryJobsTableName}
SET ${QUERY_JOBS_TABLE_COLUMN_NAMES.STATUS} = ${QUERY_JOB_STATUS.CANCELLING}
WHERE ${QUERY_JOBS_TABLE_COLUMN_NAMES.ID} = ?
AND ${QUERY_JOBS_TABLE_COLUMN_NAMES.STATUS}
IN (${QUERY_JOB_STATUS.PENDING}, ${QUERY_JOB_STATUS.RUNNING})
`,
jobId,
);
await this.#sqlDbConnPool.query(
`
UPDATE ${settings.SqlDbQueryJobsTableName}
SET ${QUERY_JOBS_TABLE_COLUMN_NAMES.STATUS} = ${QUERY_JOB_STATUS.CANCELLING}
WHERE ${QUERY_JOBS_TABLE_COLUMN_NAMES.ID} = ?
AND ${QUERY_JOBS_TABLE_COLUMN_NAMES.STATUS}
IN (${QUERY_JOB_STATUS.PENDING}, ${QUERY_JOB_STATUS.RUNNING})
`,
[jobId],
);
🤖 Prompt for AI Agents
In
components/log-viewer-webui/server/src/fastify-v2/plugins/app/search/QueryJobsDbManager/index.ts
around lines 96 to 105, the query parameter jobId is passed as a scalar instead
of an array. To fix this, wrap jobId in an array when passing it as the second
argument to the query method, ensuring consistent and documented parameter
binding with mysql2 and Fastify MySQL plugin.

}

/**
* Waits for the job to complete.
*
* @param jobId
* @return
* @throws {Error} on MySQL error, if the job wasn't found in the database, if the job was
* cancelled, or if the job completed in an unexpected state.
*/
async awaitJobCompletion (jobId: number): Promise<void> {
// eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
while (true) {
Comment on lines +117 to +118
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick (assertive)

Consider using a condition in the while loop.

The infinite loop with a break statement could be replaced with a conditional while loop for better readability.

-        // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
-        while (true) {
+        let jobCompleted = false;
+        while (!jobCompleted) {

And then replace the break; on line 151 with:

-                break;
+                jobCompleted = true;
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
while (true) {
- // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
- while (true) {
+ let jobCompleted = false;
+ while (!jobCompleted) {
@@
- break;
+ jobCompleted = true;
🤖 Prompt for AI Agents
In
components/log-viewer-webui/server/src/fastify-v2/plugins/app/search/QueryJobsDbManager/index.ts
around lines 111 to 112, replace the infinite while(true) loop with a while loop
that uses an explicit condition to control the loop execution. Identify the
condition that currently leads to the break statement on line 151 and use that
condition directly in the while loop. Remove the break statement on line 151
after updating the loop condition to improve readability and maintainability.

let queryJob: QueryJob | undefined;
try {
const [queryRows] = await this.#sqlDbConnPool.query<QueryJob[]>(
`
SELECT ${QUERY_JOBS_TABLE_COLUMN_NAMES.STATUS}
FROM ${settings.SqlDbQueryJobsTableName}
WHERE ${QUERY_JOBS_TABLE_COLUMN_NAMES.ID} = ?
`,
jobId
);

[queryJob] = queryRows;
} catch (e: unknown) {
throw new Error(`Failed to query status for job ${jobId}`, {cause: e});
}
if ("undefined" === typeof queryJob) {
throw new Error(`Job ${jobId} not found in the database.`);
}

const status = queryJob[QUERY_JOBS_TABLE_COLUMN_NAMES.STATUS];

if (false === QUERY_JOB_STATUS_WAITING_STATES.has(status)) {
if (QUERY_JOB_STATUS.CANCELLED === status) {
throw new Error(`Job ${jobId} was cancelled.`);
} else if (QUERY_JOB_STATUS.SUCCEEDED !== status) {
throw new Error(
`Job ${jobId} exited with unexpected status=${status}: ` +
`${Object.keys(QUERY_JOB_STATUS)[status]}.`
);
}
break;
}

await setTimeout(JOB_COMPLETION_STATUS_POLL_INTERVAL_MILLIS);
}
}
}

declare module "fastify" {
export interface FastifyInstance {
QueryJobsDbManager: QueryJobsDbManager;
}
}

export default fp(
(fastify) => {
fastify.decorate("QueryJobsDbManager", QueryJobsDbManager.create(fastify));
},
{
name: "QueryJobsDbManager",
}
);
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
/**
* Interval in milliseconds for polling the completion status of a job.
*/
export const JOB_COMPLETION_STATUS_POLL_INTERVAL_MILLIS = 500;
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import {FastifyInstance} from "fastify";
import fp from "fastify-plugin";

import settings from "../../../../../../settings.json" with {type: "json"};
import type {SearchResultsMetadataDocument} from "./typings.js";


/**
* Creates a MongoDB collection for search results metadata.
*
* @param fastify
* @return MongoDB collection
* @throws {Error} if the MongoDB database is not found.
Comment on lines +12 to +13
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick (assertive)

Enhance JSDoc return type documentation.

The return type in the JSDoc comment could be more specific to help developers understand what's being returned.

- * @return MongoDB collection
+ * @return {Collection<SearchResultsMetadataDocument>} MongoDB collection for search results metadata
🤖 Prompt for AI Agents
In
components/log-viewer-webui/server/src/fastify-v2/plugins/app/search/SearchResultsMetadataCollection/index.ts
around lines 12 to 13, the JSDoc return type is currently generic. Update the
@return tag to specify the exact MongoDB collection type being returned, such as
including the document interface or schema type, to provide clearer and more
precise documentation for developers.

*/
const createSearchResultsMetadataCollection = (fastify: FastifyInstance) => {
if ("undefined" === typeof fastify.mongo.db) {
throw new Error("MongoDB database not found");
}

return fastify.mongo.db.collection<SearchResultsMetadataDocument>(
settings.MongoDbSearchResultsMetadataCollectionName
);
};

declare module "fastify" {
export interface FastifyInstance {
SearchResultsMetadataCollection: ReturnType<typeof createSearchResultsMetadataCollection>;
}
}

export default fp(
(fastify) => {
fastify.decorate(
"SearchResultsMetadataCollection",
createSearchResultsMetadataCollection(fastify)
);
},
{
name: "SearchResultsMetadataCollection",
}
);
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import {Nullable} from "../../../../../typings/common.js";


/**
* Enum of search-related signals.
*
* This includes request and response signals for various search operations and their respective
* states.
*/
enum SEARCH_SIGNAL {
NONE = "none",

REQ_CANCELLING = "req-cancelling",
REQ_CLEARING = "req-clearing",
REQ_QUERYING = "req-querying",

RESP_DONE = "resp-done",
RESP_QUERYING = "resp-querying",
}

/**
* MongoDB document for search results metadata. `numTotalResults` is optional
* since it is only set when the search job is completed.
*/
interface SearchResultsMetadataDocument {
_id: string;
errorMsg: Nullable<string>;
lastSignal: SEARCH_SIGNAL;
numTotalResults?: number;
}

export type {SearchResultsMetadataDocument};
export {SEARCH_SIGNAL};
8 changes: 5 additions & 3 deletions components/log-viewer-webui/server/src/plugins/DbManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,16 @@ import {
import {Nullable} from "../typings/common.js";
import {
DbManagerOptions,
StreamFileMongoDocument,
StreamFilesCollection,
} from "../typings/DbManager.js";
import {
QUERY_JOB_STATUS,
QUERY_JOB_STATUS_WAITING_STATES,
QUERY_JOB_TYPE,
QUERY_JOBS_TABLE_COLUMN_NAMES,
QueryJob,
StreamFileMongoDocument,
StreamFilesCollection,
} from "../typings/DbManager.js";
} from "../typings/query.js";
import {sleep} from "../utils/time.js";


Expand Down
6 changes: 4 additions & 2 deletions components/log-viewer-webui/server/src/routes/query.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ import {FastifyPluginAsync} from "fastify";
import {StatusCodes} from "http-status-codes";

import settings from "../../settings.json" with {type: "json"};
import {QUERY_JOB_TYPE} from "../plugins/DbManager.js";
import {EXTRACT_JOB_TYPES} from "../typings/DbManager.js";
import {
EXTRACT_JOB_TYPES,
QUERY_JOB_TYPE,
} from "../typings/query.js";


/**
Expand Down
69 changes: 0 additions & 69 deletions components/log-viewer-webui/server/src/typings/DbManager.ts
Original file line number Diff line number Diff line change
@@ -1,67 +1,6 @@
import {Collection} from "mongodb";
import {RowDataPacket} from "mysql2/promise";


/**
* Matching the `QueryJobType` class in `job_orchestration.query_scheduler.constants`.
*/
enum QUERY_JOB_TYPE {
SEARCH_OR_AGGREGATION = 0,
EXTRACT_IR,
EXTRACT_JSON,
}

/**
* List of valid extract job types.
*/
const EXTRACT_JOB_TYPES = new Set([
QUERY_JOB_TYPE.EXTRACT_IR,
QUERY_JOB_TYPE.EXTRACT_JSON,
]);

/**
* Matching the `QueryJobStatus` class in
* `job_orchestration.query_scheduler.constants`.
*
* @enum {number}
*/
enum QUERY_JOB_STATUS {
PENDING = 0,
RUNNING,
SUCCEEDED,
FAILED,
CANCELLING,
CANCELLED,
}

/**
* List of states that indicate the job is either pending or in progress.
*/
const QUERY_JOB_STATUS_WAITING_STATES = new Set([
QUERY_JOB_STATUS.PENDING,
QUERY_JOB_STATUS.RUNNING,
QUERY_JOB_STATUS.CANCELLING,
]);

/**
* The `query_jobs` table's column names.
*
* @enum {string}
*/
enum QUERY_JOBS_TABLE_COLUMN_NAMES {
ID = "id",
TYPE = "type",
STATUS = "status",
JOB_CONFIG = "job_config",
}

interface QueryJob extends RowDataPacket {
[QUERY_JOBS_TABLE_COLUMN_NAMES.ID]: number;
[QUERY_JOBS_TABLE_COLUMN_NAMES.TYPE]: QUERY_JOB_TYPE;
[QUERY_JOBS_TABLE_COLUMN_NAMES.STATUS]: QUERY_JOB_STATUS;
[QUERY_JOBS_TABLE_COLUMN_NAMES.JOB_CONFIG]: string;
}

interface DbManagerOptions {
mysqlConfig: {
user: string;
Expand Down Expand Up @@ -92,14 +31,6 @@ type StreamFilesCollection = Collection<StreamFileMongoDocument>;

export type {
DbManagerOptions,
QueryJob,
StreamFileMongoDocument,
StreamFilesCollection,
};
export {
EXTRACT_JOB_TYPES,
QUERY_JOB_STATUS,
QUERY_JOB_STATUS_WAITING_STATES,
QUERY_JOB_TYPE,
QUERY_JOBS_TABLE_COLUMN_NAMES,
};
Loading