Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 37 additions & 7 deletions src/hub/WorkerManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,40 @@

/** Manages communication with worker scripts using a promise interface. */
export default abstract class WorkerManager {
private static HEARTBEAT_TIMEOUT_MS = 30_000;

private static globalRequestId: number = 0;
private static workers: {
[id: number]: {
worker: Worker;
resolve: (value: any) => void;
reject: (value: any) => void;
progress?: (progress: number) => void;
heartbeatTimer: ReturnType<typeof setTimeout> | null;
};
} = {};

private static cleanupWorker(requestId: number) {
let entry = this.workers[requestId];
if (entry === undefined) return;
if (entry.heartbeatTimer !== null) clearTimeout(entry.heartbeatTimer);
entry.worker.terminate();
delete this.workers[requestId];
}

private static resetHeartbeat(requestId: number) {
let entry = this.workers[requestId];
if (entry === undefined) return;
if (entry.heartbeatTimer !== null) clearTimeout(entry.heartbeatTimer);
entry.heartbeatTimer = setTimeout(() => {
if (WorkerManager.workers[requestId] !== undefined) {
let reject = WorkerManager.workers[requestId].reject;
WorkerManager.cleanupWorker(requestId);
reject(null);
}
}, this.HEARTBEAT_TIMEOUT_MS);
}

static request(script: string, payload: any, progressCallback?: (progress: number) => void): Promise<any> {
const requestId = this.globalRequestId++;
return new Promise<any>((resolve, reject) => {
Expand All @@ -25,9 +49,18 @@ export default abstract class WorkerManager {
worker: worker,
resolve: resolve,
reject: reject,
progress: progressCallback
progress: progressCallback,
heartbeatTimer: null
};
worker.onmessage = this.handleResponse;
worker.onerror = () => {
if (WorkerManager.workers[requestId] !== undefined) {
let reject = WorkerManager.workers[requestId].reject;
WorkerManager.cleanupWorker(requestId);
reject(null);
}
};
WorkerManager.resetHeartbeat(requestId);
worker.postMessage({
id: requestId,
payload: payload
Expand All @@ -37,23 +70,20 @@ export default abstract class WorkerManager {

private static handleResponse(event: any) {
let message = event.data;
let deleteWorker = () => {
WorkerManager.workers[message.id].worker.terminate();
delete WorkerManager.workers[message.id];
};
if (message.id in WorkerManager.workers) {
if ("payload" in message) {
let resolve = WorkerManager.workers[message.id].resolve;
deleteWorker();
WorkerManager.cleanupWorker(message.id);
resolve(message.payload);
} else if ("progress" in message) {
WorkerManager.resetHeartbeat(message.id);
let progress = WorkerManager.workers[message.id].progress;
if (progress !== undefined) {
progress(message.progress as number);
}
} else {
let reject = WorkerManager.workers[message.id].reject;
deleteWorker();
WorkerManager.cleanupWorker(message.id);
reject(null);
}
}
Expand Down
101 changes: 66 additions & 35 deletions src/hub/exportWorker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,20 @@ import Log from "../shared/log/Log";
import LogFieldTree from "../shared/log/LogFieldTree";
import { AKIT_TIMESTAMP_KEYS, filterFieldByPrefixes, getLogValueText } from "../shared/log/LogUtil";
import LoggableType from "../shared/log/LoggableType";
import { cleanFloat } from "../shared/util";
import { cleanFloat, concatBuffers, upperBound } from "../shared/util";
import { WPILOGEncoder, WPILOGEncoderRecord } from "./dataSources/wpilog/WPILOGEncoder";

self.onmessage = async (event) => {
// WORKER SETUP
self.onmessage = null;
let { id, payload } = event.data;
function resolve(result: any) {
self.postMessage({ id: id, payload: result });
let message = { id: id, payload: result };
if (result instanceof Uint8Array) {
(self as unknown as DedicatedWorkerGlobalScope).postMessage(message, [result.buffer]);
} else {
self.postMessage(message);
}
}
function progress(percent: number) {
self.postMessage({ id: id, progress: percent });
Expand Down Expand Up @@ -97,53 +102,81 @@ self.onmessage = async (event) => {
resolve(await generateMCAP(log, fields, progress, timestamps));
break;
}
} catch {
} catch (e) {
// Something went wrong
console.error("[exportWorker] Export failed:", e);
reject();
}
};

/** Quotes a string for CSV output per RFC 4180. */
function csvQuote(s: string): string {
return '"' + s.replaceAll('"', '""') + '"';
}

function generateCsvTable(
log: Log,
fields: string[],
progress: (progress: number) => void,
timestamps?: number[]
): string {
): Uint8Array {
// Generate timestamps for changes
if (timestamps === undefined) {
timestamps = log.getTimestamps(fields);
}

// Record timestamps
let data: string[][] = [["Timestamp"]];
timestamps.forEach((timestamp) => {
data.push([timestamp.toString()]);
});
console.log(`[exportWorker] CSV table: ${fields.length} fields × ${timestamps.length} timestamps`);

// Retrieve data
fields.forEach((field, fieldIndex) => {
data[0].push(field);
// Pre-fetch all field data
let fieldEntries: { timestamps: number[]; values: any[]; type: LoggableType }[] = [];
for (let field of fields) {
let fieldData = log.getRange(field, -Infinity, Infinity);
let fieldType = log.getType(field);
if (fieldData !== undefined && fieldType !== null) {
fieldEntries.push({ timestamps: fieldData.timestamps, values: fieldData.values, type: fieldType });
}
}

// Build header (quote column names and escape embedded quotes)
let header = csvQuote("Timestamp") + fields.map((f) => "," + csvQuote(f)).join("") + "\n";

// Encode in batches (row-major with advancing cursors)
const encoder = new TextEncoder();
const BATCH_SIZE = 5000;
let chunks: Uint8Array[] = [encoder.encode(header)];

timestamps!.forEach((timestamp, timestampIndex) => {
if (fieldData === undefined || fieldType === null) return;
let nextIndex = fieldData.timestamps.findIndex((value) => value > timestamp);
if (nextIndex === -1) nextIndex = fieldData.timestamps.length;
let value: any = null;
if (nextIndex !== 0) {
value = fieldData.values[nextIndex - 1];
// Initialize cursors — one per field, tracking position in each field's timestamp array
let cursors = new Int32Array(fieldEntries.length);

for (let batchStart = 0; batchStart < timestamps.length; batchStart += BATCH_SIZE) {
let batchEnd = Math.min(batchStart + BATCH_SIZE, timestamps.length);
let batchStr = "";

for (let t = batchStart; t < batchEnd; t++) {
let ts = timestamps[t];
let line = ts.toString();

for (let f = 0; f < fieldEntries.length; f++) {
let entry = fieldEntries[f];
// Advance cursor to first entry > ts
while (cursors[f] < entry.timestamps.length && entry.timestamps[cursors[f]] <= ts) {
cursors[f]++;
}
if (cursors[f] > 0) {
line += "," + getLogValueText(entry.values[cursors[f] - 1], entry.type).replaceAll(",", ";");
} else {
line += ",";
}
}
data[timestampIndex + 1].push(getLogValueText(value, fieldType).replaceAll(",", ";"));

// Send progress update
progress((fieldIndex + timestampIndex / timestamps!.length) / fields.length);
});
});
batchStr += line + "\n";
}

let text = data.map((x) => x.join(",")).join("\n");
progress(1);
return text;
chunks.push(encoder.encode(batchStr));
progress(batchEnd / timestamps.length);
}

return concatBuffers(chunks);
}

function generateCsvList(
Expand All @@ -158,9 +191,10 @@ function generateCsvList(
let fieldData = log.getRange(field, -Infinity, Infinity);
let fieldType = log.getType(field);
if (fieldData === undefined) return;
let quotedField = csvQuote(field);
let addValue = (timestamp: number, value: any) => {
if (fieldType === null) return;
rows.push([timestamp, field, getLogValueText(value, fieldType).replaceAll(",", ";")]);
rows.push([timestamp, quotedField, csvQuote(getLogValueText(value, fieldType))]);
};

if (timestamps === undefined) {
Expand All @@ -174,8 +208,7 @@ function generateCsvList(
// Add samples at timestamps
timestamps.forEach((timestamp, timestampIndex) => {
if (fieldData === undefined) return;
let nextIndex = fieldData.timestamps.findIndex((value) => value > timestamp);
if (nextIndex === -1) nextIndex = fieldData.timestamps.length;
let nextIndex = upperBound(fieldData.timestamps, timestamp);
let value: any = null;
if (nextIndex !== 0) {
value = fieldData.values[nextIndex - 1];
Expand All @@ -188,7 +221,7 @@ function generateCsvList(

// Sort and add header
rows.sort((a, b) => (a[0] as number) - (b[0] as number));
rows.splice(0, 0, ["Timestamp", "Key", "Value"]);
rows.splice(0, 0, [csvQuote("Timestamp"), csvQuote("Key"), csvQuote("Value")]);
let text = rows.map((x) => x.join(",")).join("\n");
progress(1);
return text;
Expand Down Expand Up @@ -315,8 +348,7 @@ function generateWPILOG(
// Add samples at timestamps
timestamps.forEach((timestamp, timestampIndex) => {
if (fieldData === undefined) return;
let nextIndex = fieldData.timestamps.findIndex((value) => value > timestamp);
if (nextIndex === -1) nextIndex = fieldData.timestamps.length;
let nextIndex = upperBound(fieldData.timestamps, timestamp);
if (nextIndex !== 0) {
addValue(timestamp, fieldData.values[nextIndex - 1]);
}
Expand Down Expand Up @@ -411,8 +443,7 @@ async function generateMCAP(
} else {
for (let i = 0; i < timestamps.length; i++) {
let timestamp = timestamps[i];
let nextIndex = fieldData.timestamps.findIndex((value) => value > timestamp);
if (nextIndex === -1) nextIndex = fieldData.timestamps.length;
let nextIndex = upperBound(fieldData.timestamps, timestamp);
if (nextIndex !== 0) {
addValue(timestamp, fieldData.values[nextIndex - 1], i);
}
Expand Down
15 changes: 15 additions & 0 deletions src/shared/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,21 @@ export function calcAxisStepSize(dataRange: [number, number], pixelRange: number
return roundBase * multiplierLookup[Math.round(stepValueApprox / roundBase)];
}

/** Returns the index of the first element in a sorted array that is greater than the target, or the array length if none. */
export function upperBound(sortedArray: number[], target: number): number {
let lo = 0;
let hi = sortedArray.length;
while (lo < hi) {
let mid = (lo + hi) >>> 1;
if (sortedArray[mid] > target) {
hi = mid;
} else {
lo = mid + 1;
}
}
return lo;
}

export function getSpiralIndex(x: number, y: number): number {
// https://stackoverflow.com/questions/9970134/get-spiral-index-from-location

Expand Down