diff --git a/src/hub/WorkerManager.ts b/src/hub/WorkerManager.ts index 2d13e10a..95f29b62 100644 --- a/src/hub/WorkerManager.ts +++ b/src/hub/WorkerManager.ts @@ -7,6 +7,8 @@ /** Manages communication with worker scripts using a promise interface. */ export default abstract class WorkerManager { + private static HEARTBEAT_TIMEOUT_MS = 30_000; + private static globalRequestId: number = 0; private static workers: { [id: number]: { @@ -14,9 +16,31 @@ export default abstract class WorkerManager { resolve: (value: any) => void; reject: (value: any) => void; progress?: (progress: number) => void; + heartbeatTimer: ReturnType | null; }; } = {}; + private static cleanupWorker(requestId: number) { + let entry = this.workers[requestId]; + if (entry === undefined) return; + if (entry.heartbeatTimer !== null) clearTimeout(entry.heartbeatTimer); + entry.worker.terminate(); + delete this.workers[requestId]; + } + + private static resetHeartbeat(requestId: number) { + let entry = this.workers[requestId]; + if (entry === undefined) return; + if (entry.heartbeatTimer !== null) clearTimeout(entry.heartbeatTimer); + entry.heartbeatTimer = setTimeout(() => { + if (WorkerManager.workers[requestId] !== undefined) { + let reject = WorkerManager.workers[requestId].reject; + WorkerManager.cleanupWorker(requestId); + reject(null); + } + }, this.HEARTBEAT_TIMEOUT_MS); + } + static request(script: string, payload: any, progressCallback?: (progress: number) => void): Promise { const requestId = this.globalRequestId++; return new Promise((resolve, reject) => { @@ -25,9 +49,18 @@ export default abstract class WorkerManager { worker: worker, resolve: resolve, reject: reject, - progress: progressCallback + progress: progressCallback, + heartbeatTimer: null }; worker.onmessage = this.handleResponse; + worker.onerror = () => { + if (WorkerManager.workers[requestId] !== undefined) { + let reject = WorkerManager.workers[requestId].reject; + WorkerManager.cleanupWorker(requestId); + reject(null); + } + }; + WorkerManager.resetHeartbeat(requestId); worker.postMessage({ id: requestId, payload: payload @@ -37,23 +70,20 @@ export default abstract class WorkerManager { private static handleResponse(event: any) { let message = event.data; - let deleteWorker = () => { - WorkerManager.workers[message.id].worker.terminate(); - delete WorkerManager.workers[message.id]; - }; if (message.id in WorkerManager.workers) { if ("payload" in message) { let resolve = WorkerManager.workers[message.id].resolve; - deleteWorker(); + WorkerManager.cleanupWorker(message.id); resolve(message.payload); } else if ("progress" in message) { + WorkerManager.resetHeartbeat(message.id); let progress = WorkerManager.workers[message.id].progress; if (progress !== undefined) { progress(message.progress as number); } } else { let reject = WorkerManager.workers[message.id].reject; - deleteWorker(); + WorkerManager.cleanupWorker(message.id); reject(null); } } diff --git a/src/hub/exportWorker.ts b/src/hub/exportWorker.ts index 09851e2c..f4e91206 100644 --- a/src/hub/exportWorker.ts +++ b/src/hub/exportWorker.ts @@ -12,7 +12,7 @@ import Log from "../shared/log/Log"; import LogFieldTree from "../shared/log/LogFieldTree"; import { AKIT_TIMESTAMP_KEYS, filterFieldByPrefixes, getLogValueText } from "../shared/log/LogUtil"; import LoggableType from "../shared/log/LoggableType"; -import { cleanFloat } from "../shared/util"; +import { cleanFloat, concatBuffers, upperBound } from "../shared/util"; import { WPILOGEncoder, WPILOGEncoderRecord } from "./dataSources/wpilog/WPILOGEncoder"; self.onmessage = async (event) => { @@ -20,7 +20,12 @@ self.onmessage = async (event) => { self.onmessage = null; let { id, payload } = event.data; function resolve(result: any) { - self.postMessage({ id: id, payload: result }); + let message = { id: id, payload: result }; + if (result instanceof Uint8Array) { + (self as unknown as DedicatedWorkerGlobalScope).postMessage(message, [result.buffer]); + } else { + self.postMessage(message); + } } function progress(percent: number) { self.postMessage({ id: id, progress: percent }); @@ -97,53 +102,81 @@ self.onmessage = async (event) => { resolve(await generateMCAP(log, fields, progress, timestamps)); break; } - } catch { + } catch (e) { // Something went wrong + console.error("[exportWorker] Export failed:", e); reject(); } }; +/** Quotes a string for CSV output per RFC 4180. */ +function csvQuote(s: string): string { + return '"' + s.replaceAll('"', '""') + '"'; +} + function generateCsvTable( log: Log, fields: string[], progress: (progress: number) => void, timestamps?: number[] -): string { +): Uint8Array { // Generate timestamps for changes if (timestamps === undefined) { timestamps = log.getTimestamps(fields); } - // Record timestamps - let data: string[][] = [["Timestamp"]]; - timestamps.forEach((timestamp) => { - data.push([timestamp.toString()]); - }); + console.log(`[exportWorker] CSV table: ${fields.length} fields × ${timestamps.length} timestamps`); - // Retrieve data - fields.forEach((field, fieldIndex) => { - data[0].push(field); + // Pre-fetch all field data + let fieldEntries: { timestamps: number[]; values: any[]; type: LoggableType }[] = []; + for (let field of fields) { let fieldData = log.getRange(field, -Infinity, Infinity); let fieldType = log.getType(field); + if (fieldData !== undefined && fieldType !== null) { + fieldEntries.push({ timestamps: fieldData.timestamps, values: fieldData.values, type: fieldType }); + } + } + + // Build header (quote column names and escape embedded quotes) + let header = csvQuote("Timestamp") + fields.map((f) => "," + csvQuote(f)).join("") + "\n"; + + // Encode in batches (row-major with advancing cursors) + const encoder = new TextEncoder(); + const BATCH_SIZE = 5000; + let chunks: Uint8Array[] = [encoder.encode(header)]; - timestamps!.forEach((timestamp, timestampIndex) => { - if (fieldData === undefined || fieldType === null) return; - let nextIndex = fieldData.timestamps.findIndex((value) => value > timestamp); - if (nextIndex === -1) nextIndex = fieldData.timestamps.length; - let value: any = null; - if (nextIndex !== 0) { - value = fieldData.values[nextIndex - 1]; + // Initialize cursors — one per field, tracking position in each field's timestamp array + let cursors = new Int32Array(fieldEntries.length); + + for (let batchStart = 0; batchStart < timestamps.length; batchStart += BATCH_SIZE) { + let batchEnd = Math.min(batchStart + BATCH_SIZE, timestamps.length); + let batchStr = ""; + + for (let t = batchStart; t < batchEnd; t++) { + let ts = timestamps[t]; + let line = ts.toString(); + + for (let f = 0; f < fieldEntries.length; f++) { + let entry = fieldEntries[f]; + // Advance cursor to first entry > ts + while (cursors[f] < entry.timestamps.length && entry.timestamps[cursors[f]] <= ts) { + cursors[f]++; + } + if (cursors[f] > 0) { + line += "," + getLogValueText(entry.values[cursors[f] - 1], entry.type).replaceAll(",", ";"); + } else { + line += ","; + } } - data[timestampIndex + 1].push(getLogValueText(value, fieldType).replaceAll(",", ";")); - // Send progress update - progress((fieldIndex + timestampIndex / timestamps!.length) / fields.length); - }); - }); + batchStr += line + "\n"; + } - let text = data.map((x) => x.join(",")).join("\n"); - progress(1); - return text; + chunks.push(encoder.encode(batchStr)); + progress(batchEnd / timestamps.length); + } + + return concatBuffers(chunks); } function generateCsvList( @@ -158,9 +191,10 @@ function generateCsvList( let fieldData = log.getRange(field, -Infinity, Infinity); let fieldType = log.getType(field); if (fieldData === undefined) return; + let quotedField = csvQuote(field); let addValue = (timestamp: number, value: any) => { if (fieldType === null) return; - rows.push([timestamp, field, getLogValueText(value, fieldType).replaceAll(",", ";")]); + rows.push([timestamp, quotedField, csvQuote(getLogValueText(value, fieldType))]); }; if (timestamps === undefined) { @@ -174,8 +208,7 @@ function generateCsvList( // Add samples at timestamps timestamps.forEach((timestamp, timestampIndex) => { if (fieldData === undefined) return; - let nextIndex = fieldData.timestamps.findIndex((value) => value > timestamp); - if (nextIndex === -1) nextIndex = fieldData.timestamps.length; + let nextIndex = upperBound(fieldData.timestamps, timestamp); let value: any = null; if (nextIndex !== 0) { value = fieldData.values[nextIndex - 1]; @@ -188,7 +221,7 @@ function generateCsvList( // Sort and add header rows.sort((a, b) => (a[0] as number) - (b[0] as number)); - rows.splice(0, 0, ["Timestamp", "Key", "Value"]); + rows.splice(0, 0, [csvQuote("Timestamp"), csvQuote("Key"), csvQuote("Value")]); let text = rows.map((x) => x.join(",")).join("\n"); progress(1); return text; @@ -315,8 +348,7 @@ function generateWPILOG( // Add samples at timestamps timestamps.forEach((timestamp, timestampIndex) => { if (fieldData === undefined) return; - let nextIndex = fieldData.timestamps.findIndex((value) => value > timestamp); - if (nextIndex === -1) nextIndex = fieldData.timestamps.length; + let nextIndex = upperBound(fieldData.timestamps, timestamp); if (nextIndex !== 0) { addValue(timestamp, fieldData.values[nextIndex - 1]); } @@ -411,8 +443,7 @@ async function generateMCAP( } else { for (let i = 0; i < timestamps.length; i++) { let timestamp = timestamps[i]; - let nextIndex = fieldData.timestamps.findIndex((value) => value > timestamp); - if (nextIndex === -1) nextIndex = fieldData.timestamps.length; + let nextIndex = upperBound(fieldData.timestamps, timestamp); if (nextIndex !== 0) { addValue(timestamp, fieldData.values[nextIndex - 1], i); } diff --git a/src/shared/util.ts b/src/shared/util.ts index 7a7e3c90..84fa12ac 100644 --- a/src/shared/util.ts +++ b/src/shared/util.ts @@ -191,6 +191,21 @@ export function calcAxisStepSize(dataRange: [number, number], pixelRange: number return roundBase * multiplierLookup[Math.round(stepValueApprox / roundBase)]; } +/** Returns the index of the first element in a sorted array that is greater than the target, or the array length if none. */ +export function upperBound(sortedArray: number[], target: number): number { + let lo = 0; + let hi = sortedArray.length; + while (lo < hi) { + let mid = (lo + hi) >>> 1; + if (sortedArray[mid] > target) { + hi = mid; + } else { + lo = mid + 1; + } + } + return lo; +} + export function getSpiralIndex(x: number, y: number): number { // https://stackoverflow.com/questions/9970134/get-spiral-index-from-location