From 5ea5afa242124d8db12e18a52b9ed6eed00baa9c Mon Sep 17 00:00:00 2001 From: Pip Build Date: Sun, 1 Mar 2026 23:35:46 -0500 Subject: [PATCH 1/2] =?UTF-8?q?feat:=20audit=20log=20improvements=20?= =?UTF-8?q?=E2=80=94=20CSV/JSON=20export,=20real-time=20WebSocket=20stream?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add GET /:id/audit-log/export endpoint (CSV and JSON, up to 10k rows) - Add /ws/audit-log WebSocket server for real-time audit entry broadcast - Refactor buildFilters() shared helper to eliminate duplication - Hook broadcastAuditEntry() into insertAuditEntry (RETURNING id+created_at) - Wire setupAuditStream/stopAuditStream into startServer/stopServer lifecycle - Add escapeCsvValue/rowsToCsv helpers with full test coverage - 30 route tests + 17 WebSocket stream tests, all green Closes #136 --- src/api/middleware/auditLog.js | 24 +- src/api/routes/auditLog.js | 201 +++++++++++++---- src/api/server.js | 12 + src/api/ws/auditStream.js | 322 +++++++++++++++++++++++++++ tests/api/routes/auditLog.test.js | 234 ++++++++++++++++++++ tests/api/ws/auditStream.test.js | 349 ++++++++++++++++++++++++++++++ 6 files changed, 1102 insertions(+), 40 deletions(-) create mode 100644 src/api/ws/auditStream.js create mode 100644 tests/api/ws/auditStream.test.js diff --git a/src/api/middleware/auditLog.js b/src/api/middleware/auditLog.js index 75c5a27a1..0fdb00fe5 100644 --- a/src/api/middleware/auditLog.js +++ b/src/api/middleware/auditLog.js @@ -7,6 +7,7 @@ import { info, error as logError } from '../../logger.js'; import { getConfig } from '../../modules/config.js'; import { maskSensitiveFields } from '../utils/configAllowlist.js'; +import { broadcastAuditEntry } from '../ws/auditStream.js'; /** HTTP methods considered mutating */ const MUTATING_METHODS = new Set(['POST', 'PUT', 'PATCH', 'DELETE']); @@ -104,7 +105,8 @@ function insertAuditEntry(pool, entry) { try { const result = pool.query( `INSERT INTO audit_logs (guild_id, user_id, action, target_type, target_id, details, ip_address) - VALUES ($1, $2, $3, $4, $5, $6, $7)`, + VALUES ($1, $2, $3, $4, $5, $6, $7) + RETURNING id, guild_id, user_id, action, target_type, target_id, details, ip_address, created_at`, [ guildId || 'global', userId, @@ -118,8 +120,26 @@ function insertAuditEntry(pool, entry) { if (result && typeof result.then === 'function') { result - .then(() => { + .then((insertResult) => { info('Audit log entry created', { action, guildId, userId }); + // Broadcast to real-time audit log WebSocket clients + const row = insertResult?.rows?.[0]; + const broadcastEntry = { + guild_id: guildId || 'global', + user_id: userId, + action, + target_type: targetType || null, + target_id: targetId || null, + details: details || null, + ip_address: ipAddress || null, + created_at: new Date().toISOString(), + ...(row || {}), + }; + try { + broadcastAuditEntry(broadcastEntry); + } catch { + // Non-critical — streaming failure must not affect audit integrity + } }) .catch((err) => { logError('Failed to insert audit log entry', { error: err.message, action, guildId }); diff --git a/src/api/routes/auditLog.js b/src/api/routes/auditLog.js index 6930b1bea..740bc0f4f 100644 --- a/src/api/routes/auditLog.js +++ b/src/api/routes/auditLog.js @@ -1,8 +1,8 @@ /** * Audit Log API Routes - * Paginated, filterable audit log retrieval for dashboard consumption. + * Paginated, filterable audit log retrieval and export for dashboard consumption. * - * @see https://github.com/VolvoxLLC/volvox-bot/issues/123 + * @see https://github.com/VolvoxLLC/volvox-bot/issues/136 */ import { Router } from 'express'; @@ -15,6 +15,9 @@ const router = Router(); /** Rate limiter for audit log endpoints — 30 req/min per IP. */ const auditRateLimit = rateLimit({ windowMs: 60 * 1000, max: 30 }); +/** Rate limiter for export endpoints — 10 req/min per IP (exports are heavier). */ +const exportRateLimit = rateLimit({ windowMs: 60 * 1000, max: 10 }); + /** * Helper to get the database pool from app.locals. * @@ -39,6 +42,94 @@ function toFilterString(value) { return trimmed.length > 0 ? trimmed : null; } +/** + * Build WHERE conditions and params from query filters. + * + * @param {string} guildId + * @param {import('express').Request['query']} query + * @returns {{ conditions: string[], params: unknown[], paramIndex: number }} + */ +function buildFilters(guildId, query) { + const conditions = ['guild_id = $1']; + const params = [guildId]; + let paramIndex = 2; + + const actionFilter = toFilterString(query.action); + if (actionFilter) { + conditions.push(`action = $${paramIndex}`); + params.push(actionFilter); + paramIndex++; + } + + const userIdFilter = toFilterString(query.userId); + if (userIdFilter) { + conditions.push(`user_id = $${paramIndex}`); + params.push(userIdFilter); + paramIndex++; + } + + if (query.startDate) { + const start = new Date(query.startDate); + if (!Number.isNaN(start.getTime())) { + conditions.push(`created_at >= $${paramIndex}`); + params.push(start.toISOString()); + paramIndex++; + } + } + + if (query.endDate) { + const end = new Date(query.endDate); + if (!Number.isNaN(end.getTime())) { + conditions.push(`created_at <= $${paramIndex}`); + params.push(end.toISOString()); + paramIndex++; + } + } + + return { conditions, params, paramIndex }; +} + +/** + * Escape a value for CSV output. + * Wraps in double quotes and escapes internal double quotes. + * + * @param {unknown} value + * @returns {string} + */ +export function escapeCsvValue(value) { + if (value === null || value === undefined) return ''; + const str = typeof value === 'object' ? JSON.stringify(value) : String(value); + if (str.includes(',') || str.includes('\n') || str.includes('"')) { + return `"${str.replace(/"/g, '""')}"`; + } + return str; +} + +/** + * Convert an array of audit log rows to CSV string. + * + * @param {Object[]} rows + * @returns {string} + */ +export function rowsToCsv(rows) { + const headers = [ + 'id', + 'guild_id', + 'user_id', + 'action', + 'target_type', + 'target_id', + 'details', + 'ip_address', + 'created_at', + ]; + const lines = [headers.join(',')]; + for (const row of rows) { + lines.push(headers.map((h) => escapeCsvValue(row[h])).join(',')); + } + return lines.join('\n'); +} + // ─── GET /:id/audit-log ────────────────────────────────────────────────────── /** @@ -61,42 +152,7 @@ router.get('/:id/audit-log', auditRateLimit, requireGuildAdmin, validateGuild, a const offset = Math.max(0, Number.parseInt(req.query.offset, 10) || 0); try { - const conditions = ['guild_id = $1']; - const params = [guildId]; - let paramIndex = 2; - - const actionFilter = toFilterString(req.query.action); - if (actionFilter) { - conditions.push(`action = $${paramIndex}`); - params.push(actionFilter); - paramIndex++; - } - - const userIdFilter = toFilterString(req.query.userId); - if (userIdFilter) { - conditions.push(`user_id = $${paramIndex}`); - params.push(userIdFilter); - paramIndex++; - } - - if (req.query.startDate) { - const start = new Date(req.query.startDate); - if (!Number.isNaN(start.getTime())) { - conditions.push(`created_at >= $${paramIndex}`); - params.push(start.toISOString()); - paramIndex++; - } - } - - if (req.query.endDate) { - const end = new Date(req.query.endDate); - if (!Number.isNaN(end.getTime())) { - conditions.push(`created_at <= $${paramIndex}`); - params.push(end.toISOString()); - paramIndex++; - } - } - + const { conditions, params, paramIndex } = buildFilters(guildId, req.query); const whereClause = conditions.join(' AND '); const [countResult, entriesResult] = await Promise.all([ @@ -123,4 +179,73 @@ router.get('/:id/audit-log', auditRateLimit, requireGuildAdmin, validateGuild, a } }); +// ─── GET /:id/audit-log/export ─────────────────────────────────────────────── + +/** + * GET /:id/audit-log/export — Export full filtered audit log as CSV or JSON. + * + * Query params: + * format — 'csv' or 'json' (default 'json') + * action — Filter by action type + * userId — Filter by admin user ID + * startDate — ISO timestamp lower bound + * endDate — ISO timestamp upper bound + * limit — Max rows to export (default 1000, max 10000) + */ +router.get( + '/:id/audit-log/export', + exportRateLimit, + requireGuildAdmin, + validateGuild, + async (req, res) => { + const { id: guildId } = req.params; + const pool = getDbPool(req); + if (!pool) return res.status(503).json({ error: 'Database not available' }); + + const format = toFilterString(req.query.format) || 'json'; + if (format !== 'csv' && format !== 'json') { + return res.status(400).json({ error: 'Invalid format. Use "csv" or "json".' }); + } + + // Allow larger exports — up to 10k rows + const limit = Math.min(10000, Math.max(1, Number.parseInt(req.query.limit, 10) || 1000)); + + try { + const { conditions, params, paramIndex } = buildFilters(guildId, req.query); + const whereClause = conditions.join(' AND '); + + const result = await pool.query( + `SELECT id, guild_id, user_id, action, target_type, target_id, details, ip_address, created_at + FROM audit_logs + WHERE ${whereClause} + ORDER BY created_at DESC + LIMIT $${paramIndex}`, + [...params, limit], + ); + + const rows = result.rows; + const timestamp = new Date().toISOString().replace(/[:.]/g, '-'); + const filename = `audit-log-${guildId}-${timestamp}`; + + if (format === 'csv') { + res.setHeader('Content-Type', 'text/csv'); + res.setHeader('Content-Disposition', `attachment; filename="${filename}.csv"`); + res.send(rowsToCsv(rows)); + } else { + res.setHeader('Content-Type', 'application/json'); + res.setHeader('Content-Disposition', `attachment; filename="${filename}.json"`); + res.json({ + guildId, + exportedAt: new Date().toISOString(), + count: rows.length, + entries: rows, + }); + } + } catch (err) { + logError('Failed to export audit log', { guildId, error: err.message }); + res.status(500).json({ error: 'Failed to export audit log' }); + } + }, +); + export default router; diff --git a/src/api/server.js b/src/api/server.js index 39049a4b0..15b3a9d52 100644 --- a/src/api/server.js +++ b/src/api/server.js @@ -10,6 +10,7 @@ import { rateLimit } from './middleware/rateLimit.js'; import { stopAuthCleanup } from './routes/auth.js'; import { swaggerSpec } from './swagger.js'; import { stopGuildCacheCleanup } from './utils/discordApi.js'; +import { setupAuditStream, stopAuditStream } from './ws/auditStream.js'; import { setupLogStream, stopLogStream } from './ws/logStream.js'; /** @type {import('node:http').Server | null} */ @@ -132,6 +133,14 @@ export async function startServer(client, dbPool, options = {}) { } } + // Attach audit log real-time WebSocket stream + try { + setupAuditStream(server); + } catch (err) { + error('Failed to setup audit log WebSocket stream', { error: err.message }); + // Non-fatal — HTTP server still works without audit WS streaming + } + resolve(server); }); server.once('error', (err) => { @@ -151,6 +160,9 @@ export async function stopServer() { // Stop WebSocket log stream before closing HTTP server await stopLogStream(); + // Stop audit log WebSocket stream + await stopAuditStream(); + stopAuthCleanup(); stopGuildCacheCleanup(); diff --git a/src/api/ws/auditStream.js b/src/api/ws/auditStream.js new file mode 100644 index 000000000..56bd958ae --- /dev/null +++ b/src/api/ws/auditStream.js @@ -0,0 +1,322 @@ +/** + * WebSocket Audit Log Stream + * + * Broadcasts real-time audit log entries to connected, authenticated dashboard clients. + * Clients connect to /ws/audit-log, authenticate with an HMAC ticket, and receive + * new audit entries as they are written. + * + * Protocol (JSON messages): + * Client → Server: + * { type: 'auth', ticket: '..' } + * { type: 'filter', guildId: '...', action: '...', userId: '...' } + * + * Server → Client: + * { type: 'auth_ok' } + * { type: 'entry', entry: { ... } } + * { type: 'error', message: '...' } + */ + +import { createHmac, timingSafeEqual } from 'node:crypto'; +import WebSocket, { WebSocketServer } from 'ws'; +import { info, error as logError, warn } from '../../logger.js'; + +/** Maximum concurrent authenticated audit stream clients */ +const MAX_CLIENTS = 10; + +/** Heartbeat interval in milliseconds */ +const HEARTBEAT_INTERVAL_MS = 30_000; + +/** Auth timeout — clients must authenticate within this window */ +const AUTH_TIMEOUT_MS = 10_000; + +/** @type {WebSocketServer | null} */ +let wss = null; + +/** @type {ReturnType | null} */ +let heartbeatTimer = null; + +/** @type {number} */ +let authenticatedCount = 0; + +/** + * Validate an HMAC ticket of the form `nonce.expiry.hmac`. + * + * @param {string} ticket + * @param {string} secret + * @returns {boolean} + */ +function validateTicket(ticket, secret) { + if (typeof ticket !== 'string' || typeof secret !== 'string') return false; + const parts = ticket.split('.'); + if (parts.length !== 3) return false; + const [nonce, expiry, hmac] = parts; + if (!nonce || !expiry || !hmac) return false; + const expiryNum = Number(expiry); + if (!Number.isFinite(expiryNum) || expiryNum <= Date.now()) return false; + const expected = createHmac('sha256', secret).update(`${nonce}.${expiry}`).digest('hex'); + try { + return timingSafeEqual(Buffer.from(expected, 'hex'), Buffer.from(hmac, 'hex')); + } catch { + return false; + } +} + +/** + * Send JSON to a WebSocket client (safe — swallows errors). + * + * @param {WebSocket} ws + * @param {Object} data + */ +function sendJson(ws, data) { + try { + if (ws.readyState === WebSocket.OPEN) { + ws.send(JSON.stringify(data)); + } + } catch { + // Ignore send errors — cleanup handled elsewhere + } +} + +/** + * Clean up a disconnecting client. + * + * @param {WebSocket} ws + */ +function cleanupClient(ws) { + if (ws.authTimeout) { + clearTimeout(ws.authTimeout); + ws.authTimeout = null; + } + if (ws.authenticated) { + ws.authenticated = false; + authenticatedCount = Math.max(0, authenticatedCount - 1); + info('Audit stream client disconnected', { totalClients: authenticatedCount }); + } +} + +/** + * Handle auth message. + * + * @param {WebSocket} ws + * @param {Object} msg + */ +function handleAuth(ws, msg) { + if (ws.authenticated) { + sendJson(ws, { type: 'error', message: 'Already authenticated' }); + return; + } + + if (!validateTicket(msg.ticket, process.env.BOT_API_SECRET)) { + warn('Audit stream auth failed', { reason: 'invalid ticket' }); + ws.close(4003, 'Authentication failed'); + return; + } + + if (authenticatedCount >= MAX_CLIENTS) { + warn('Audit stream max clients reached', { max: MAX_CLIENTS }); + ws.close(4029, 'Too many clients'); + return; + } + + ws.authenticated = true; + authenticatedCount++; + + if (ws.authTimeout) { + clearTimeout(ws.authTimeout); + ws.authTimeout = null; + } + + sendJson(ws, { type: 'auth_ok' }); + info('Audit stream client authenticated', { totalClients: authenticatedCount }); +} + +/** + * Handle filter message. + * Clients can subscribe to a specific guildId and optionally narrow by action or userId. + * + * @param {WebSocket} ws + * @param {Object} msg + */ +function handleFilter(ws, msg) { + if (!ws.authenticated) { + sendJson(ws, { type: 'error', message: 'Not authenticated' }); + return; + } + + ws.auditFilter = { + guildId: typeof msg.guildId === 'string' ? msg.guildId : null, + action: typeof msg.action === 'string' ? msg.action : null, + userId: typeof msg.userId === 'string' ? msg.userId : null, + }; + + sendJson(ws, { type: 'filter_ok', filter: ws.auditFilter }); +} + +/** + * Handle incoming message from a client. + * + * @param {WebSocket} ws + * @param {Buffer|string} data + */ +function handleMessage(ws, data) { + let msg; + try { + msg = JSON.parse(data.toString()); + } catch { + sendJson(ws, { type: 'error', message: 'Invalid JSON' }); + return; + } + + if (!msg || typeof msg.type !== 'string') { + sendJson(ws, { type: 'error', message: 'Missing message type' }); + return; + } + + switch (msg.type) { + case 'auth': + handleAuth(ws, msg); + break; + case 'filter': + handleFilter(ws, msg); + break; + default: + sendJson(ws, { type: 'error', message: `Unknown message type: ${msg.type}` }); + } +} + +/** + * Handle a new WebSocket connection. + * + * @param {WebSocket} ws + */ +function handleConnection(ws) { + ws.isAlive = true; + ws.authenticated = false; + ws.auditFilter = null; + + ws.authTimeout = setTimeout(() => { + if (!ws.authenticated) { + ws.close(4001, 'Authentication timeout'); + } + }, AUTH_TIMEOUT_MS); + + ws.on('pong', () => { + ws.isAlive = true; + }); + + ws.on('message', (data) => { + handleMessage(ws, data); + }); + + ws.on('close', () => { + cleanupClient(ws); + }); + + ws.on('error', (err) => { + logError('Audit stream client error', { error: err.message }); + cleanupClient(ws); + }); +} + +/** + * Check whether an entry matches a client's filter. + * + * @param {Object} filter - Client's active filter (may be null) + * @param {Object} entry - Audit log entry + * @returns {boolean} + */ +function matchesFilter(filter, entry) { + if (!filter) return true; // No filter → receive everything + if (filter.guildId && entry.guild_id !== filter.guildId) return false; + if (filter.action && entry.action !== filter.action) return false; + if (filter.userId && entry.user_id !== filter.userId) return false; + return true; +} + +/** + * Broadcast a new audit log entry to all connected, authenticated clients + * whose filter matches the entry. + * + * Called by the audit log middleware after a successful DB insert. + * + * @param {Object} entry - Audit log entry (same shape as DB row) + */ +export function broadcastAuditEntry(entry) { + if (!wss) return; + + for (const ws of wss.clients) { + if (ws.authenticated && matchesFilter(ws.auditFilter, entry)) { + sendJson(ws, { type: 'entry', entry }); + } + } +} + +/** + * Set up the audit log WebSocket server on the provided HTTP server. + * Attaches to path `/ws/audit-log`. + * + * @param {import('node:http').Server} httpServer + */ +export function setupAuditStream(httpServer) { + if (wss) { + warn('setupAuditStream called while already running — cleaning up previous instance'); + stopAuditStream(); + } + + wss = new WebSocketServer({ server: httpServer, path: '/ws/audit-log' }); + wss.on('connection', handleConnection); + + heartbeatTimer = setInterval(() => { + if (!wss) return; + for (const ws of wss.clients) { + if (ws.isAlive === false) { + info('Terminating dead audit stream client', { reason: 'heartbeat timeout' }); + cleanupClient(ws); + ws.terminate(); + continue; + } + ws.isAlive = false; + ws.ping(); + } + }, HEARTBEAT_INTERVAL_MS); + + if (heartbeatTimer.unref) heartbeatTimer.unref(); + + info('Audit log WebSocket stream started', { path: '/ws/audit-log' }); +} + +/** + * Stop the audit log WebSocket server and disconnect all clients. + * + * @returns {Promise} + */ +export async function stopAuditStream() { + if (heartbeatTimer) { + clearInterval(heartbeatTimer); + heartbeatTimer = null; + } + + if (wss) { + for (const ws of wss.clients) { + cleanupClient(ws); + ws.close(1001, 'Server shutting down'); + } + + await new Promise((resolve) => { + wss.close(() => resolve()); + }); + + wss = null; + authenticatedCount = 0; + info('Audit log WebSocket stream stopped'); + } +} + +/** + * Get the current count of authenticated audit stream clients. + * + * @returns {number} + */ +export function getAuditStreamClientCount() { + return authenticatedCount; +} diff --git a/tests/api/routes/auditLog.test.js b/tests/api/routes/auditLog.test.js index 0b0eee8cf..3464b99b7 100644 --- a/tests/api/routes/auditLog.test.js +++ b/tests/api/routes/auditLog.test.js @@ -263,3 +263,237 @@ describe('auditLog routes', () => { }); }); }); + +// ─── Export helpers ─────────────────────────────────────────────────────────── + +import { escapeCsvValue, rowsToCsv } from '../../../src/api/routes/auditLog.js'; + +describe('CSV helpers', () => { + describe('escapeCsvValue', () => { + it('should return empty string for null/undefined', () => { + expect(escapeCsvValue(null)).toBe(''); + expect(escapeCsvValue(undefined)).toBe(''); + }); + + it('should return plain string without special chars', () => { + expect(escapeCsvValue('config.update')).toBe('config.update'); + }); + + it('should wrap in quotes if value contains comma', () => { + expect(escapeCsvValue('a,b')).toBe('"a,b"'); + }); + + it('should escape internal double quotes', () => { + expect(escapeCsvValue('say "hello"')).toBe('"say ""hello"""'); + }); + + it('should wrap in quotes if value contains newline', () => { + expect(escapeCsvValue('line1\nline2')).toBe('"line1\nline2"'); + }); + + it('should stringify objects as JSON', () => { + const val = escapeCsvValue({ key: 'value' }); + expect(val).toBe('"{""key"":""value""}"'); + }); + }); + + describe('rowsToCsv', () => { + it('should produce header line for empty array', () => { + const csv = rowsToCsv([]); + expect(csv).toBe( + 'id,guild_id,user_id,action,target_type,target_id,details,ip_address,created_at', + ); + }); + + it('should produce correct CSV for a row', () => { + const rows = [ + { + id: 1, + guild_id: 'guild1', + user_id: 'user1', + action: 'config.update', + target_type: null, + target_id: null, + details: null, + ip_address: '127.0.0.1', + created_at: '2026-01-01T00:00:00Z', + }, + ]; + const csv = rowsToCsv(rows); + const lines = csv.split('\n'); + expect(lines).toHaveLength(2); + expect(lines[1]).toContain('config.update'); + expect(lines[1]).toContain('127.0.0.1'); + }); + }); +}); + +// ─── GET /:id/audit-log/export ──────────────────────────────────────────────── + +describe('GET /:id/audit-log/export', () => { + let app; + let mockPool; + + const mockGuild = { + id: 'guild1', + name: 'Test Server', + iconURL: () => 'https://cdn.example.com/icon.png', + memberCount: 100, + channels: { cache: new Map() }, + roles: { cache: new Map() }, + members: { cache: new Map() }, + }; + + beforeEach(() => { + vi.stubEnv('BOT_API_SECRET', 'test-audit-secret'); + + mockPool = { + query: vi.fn().mockResolvedValue({ rows: [] }), + connect: vi.fn(), + }; + + const client = { + guilds: { cache: new Map([['guild1', mockGuild]]) }, + ws: { status: 0, ping: 42 }, + user: { tag: 'Bot#1234' }, + }; + + app = createApp(client, mockPool); + }); + + afterEach(() => { + vi.clearAllMocks(); + vi.unstubAllEnvs(); + }); + + it('should return 401 without auth', async () => { + const res = await request(app).get('/api/v1/guilds/guild1/audit-log/export'); + expect(res.status).toBe(401); + }); + + it('should export JSON by default', async () => { + const mockRows = [ + { + id: 1, + guild_id: 'guild1', + user_id: 'user1', + action: 'config.update', + target_type: null, + target_id: null, + details: null, + ip_address: '127.0.0.1', + created_at: '2026-01-01T00:00:00Z', + }, + ]; + + mockPool.query.mockResolvedValueOnce({ rows: mockRows }); + + const res = await request(app) + .get('/api/v1/guilds/guild1/audit-log/export') + .set('x-api-secret', 'test-audit-secret'); + + expect(res.status).toBe(200); + expect(res.headers['content-type']).toContain('application/json'); + expect(res.headers['content-disposition']).toContain('attachment'); + expect(res.body.guildId).toBe('guild1'); + expect(res.body.entries).toHaveLength(1); + expect(res.body.count).toBe(1); + }); + + it('should export CSV when format=csv', async () => { + const mockRows = [ + { + id: 1, + guild_id: 'guild1', + user_id: 'user1', + action: 'config.update', + target_type: null, + target_id: null, + details: null, + ip_address: '127.0.0.1', + created_at: '2026-01-01T00:00:00Z', + }, + ]; + + mockPool.query.mockResolvedValueOnce({ rows: mockRows }); + + const res = await request(app) + .get('/api/v1/guilds/guild1/audit-log/export?format=csv') + .set('x-api-secret', 'test-audit-secret'); + + expect(res.status).toBe(200); + expect(res.headers['content-type']).toContain('text/csv'); + expect(res.headers['content-disposition']).toContain('attachment'); + expect(res.text).toContain('guild_id,user_id,action'); + expect(res.text).toContain('config.update'); + }); + + it('should return 400 for invalid format', async () => { + const res = await request(app) + .get('/api/v1/guilds/guild1/audit-log/export?format=xml') + .set('x-api-secret', 'test-audit-secret'); + + expect(res.status).toBe(400); + expect(res.body.error).toContain('Invalid format'); + }); + + it('should return 503 when database is unavailable', async () => { + const client = { + guilds: { cache: new Map([['guild1', mockGuild]]) }, + ws: { status: 0, ping: 42 }, + user: { tag: 'Bot#1234' }, + }; + const appNoDb = createApp(client, null); + + const res = await request(appNoDb) + .get('/api/v1/guilds/guild1/audit-log/export') + .set('x-api-secret', 'test-audit-secret'); + + expect(res.status).toBe(503); + }); + + it('should return 500 on database error', async () => { + mockPool.query.mockRejectedValue(new Error('DB error')); + + const res = await request(app) + .get('/api/v1/guilds/guild1/audit-log/export') + .set('x-api-secret', 'test-audit-secret'); + + expect(res.status).toBe(500); + expect(res.body.error).toBe('Failed to export audit log'); + }); + + it('should cap export limit at 10000', async () => { + mockPool.query.mockResolvedValueOnce({ rows: [] }); + + const res = await request(app) + .get('/api/v1/guilds/guild1/audit-log/export?limit=99999') + .set('x-api-secret', 'test-audit-secret'); + + expect(res.status).toBe(200); + // Verify the query was called with limit=10000 + const call = mockPool.query.mock.calls[0]; + expect(call[1]).toContain(10000); + }); + + it('should apply filters to export query', async () => { + mockPool.query.mockResolvedValueOnce({ rows: [] }); + + await request(app) + .get('/api/v1/guilds/guild1/audit-log/export?format=json&action=config.update&userId=user42') + .set('x-api-secret', 'test-audit-secret'); + + const call = mockPool.query.mock.calls[0]; + expect(call[0]).toContain('action = $2'); + expect(call[0]).toContain('user_id = $3'); + expect(call[1]).toContain('config.update'); + expect(call[1]).toContain('user42'); + }); + + it('should return 404 for unknown guild', async () => { + const res = await request(app) + .get('/api/v1/guilds/unknown-guild/audit-log/export') + .set('x-api-secret', 'test-audit-secret'); + expect(res.status).toBe(404); + }); +}); diff --git a/tests/api/ws/auditStream.test.js b/tests/api/ws/auditStream.test.js new file mode 100644 index 000000000..3fb0fcf11 --- /dev/null +++ b/tests/api/ws/auditStream.test.js @@ -0,0 +1,349 @@ +/** + * Tests for src/api/ws/auditStream.js + * Covers connection lifecycle, auth, filtering, and broadcast. + */ +import { createHmac, randomBytes } from 'node:crypto'; +import http from 'node:http'; +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; +import WebSocket from 'ws'; +import { + broadcastAuditEntry, + getAuditStreamClientCount, + setupAuditStream, + stopAuditStream, +} from '../../../src/api/ws/auditStream.js'; + +vi.mock('../../../src/logger.js', () => ({ + info: vi.fn(), + warn: vi.fn(), + error: vi.fn(), +})); + +const TEST_SECRET = 'audit-stream-test-secret'; + +function makeTicket(secret = TEST_SECRET, ttlMs = 60_000) { + const nonce = randomBytes(16).toString('hex'); + const expiry = String(Date.now() + ttlMs); + const hmac = createHmac('sha256', secret).update(`${nonce}.${expiry}`).digest('hex'); + return `${nonce}.${expiry}.${hmac}`; +} + +function createTestServer() { + return new Promise((resolve) => { + const server = http.createServer(); + server.listen(0, () => resolve({ server, port: server.address().port })); + }); +} + +function connectWs(port) { + return new Promise((resolve, reject) => { + const ws = new WebSocket(`ws://localhost:${port}/ws/audit-log`); + ws.on('open', () => resolve(ws)); + ws.on('error', reject); + }); +} + +function createMessageQueue(ws) { + const queue = []; + const waiters = []; + + ws.on('message', (data) => { + const msg = JSON.parse(data.toString()); + if (waiters.length > 0) { + waiters.shift().resolve(msg); + } else { + queue.push(msg); + } + }); + + return { + next(timeoutMs = 3000) { + if (queue.length > 0) return Promise.resolve(queue.shift()); + return new Promise((resolve, reject) => { + const waiter = { + resolve: (msg) => { + clearTimeout(timer); + resolve(msg); + }, + }; + const timer = setTimeout(() => { + const idx = waiters.indexOf(waiter); + if (idx >= 0) waiters.splice(idx, 1); + reject(new Error('Message timeout')); + }, timeoutMs); + waiters.push(waiter); + }); + }, + }; +} + +function waitForClose(ws, timeoutMs = 3000) { + return new Promise((resolve, reject) => { + if (ws.readyState === WebSocket.CLOSED) return resolve(1000); + const timer = setTimeout(() => reject(new Error('Close timeout')), timeoutMs); + ws.once('close', (code) => { + clearTimeout(timer); + resolve(code); + }); + }); +} + +function sendJson(ws, data) { + ws.send(JSON.stringify(data)); +} + +describe('Audit Log WebSocket Stream', () => { + let httpServer; + let port; + + beforeEach(async () => { + vi.stubEnv('BOT_API_SECRET', TEST_SECRET); + const result = await createTestServer(); + httpServer = result.server; + port = result.port; + setupAuditStream(httpServer); + }); + + afterEach(async () => { + await stopAuditStream(); + await new Promise((resolve) => httpServer.close(resolve)); + vi.unstubAllEnvs(); + vi.clearAllMocks(); + }); + + // ─── Connection lifecycle ───────────────────────────────────────────────── + + it('should accept WebSocket connections on /ws/audit-log', async () => { + const ws = await connectWs(port); + expect(ws.readyState).toBe(WebSocket.OPEN); + ws.close(); + }); + + it('should close unauthenticated clients after auth timeout', async () => { + const ws = await connectWs(port); + // We don't send auth — wait for close with code 4001 + const code = await waitForClose(ws, 12_000); + expect(code).toBe(4001); + }, 15_000); + + // ─── Authentication ─────────────────────────────────────────────────────── + + it('should authenticate with a valid ticket', async () => { + const ws = await connectWs(port); + const q = createMessageQueue(ws); + sendJson(ws, { type: 'auth', ticket: makeTicket() }); + const msg = await q.next(); + expect(msg.type).toBe('auth_ok'); + expect(getAuditStreamClientCount()).toBe(1); + ws.close(); + await waitForClose(ws); + }); + + it('should reject an invalid ticket', async () => { + const ws = await connectWs(port); + sendJson(ws, { type: 'auth', ticket: 'bad.ticket.here' }); + const code = await waitForClose(ws); + expect(code).toBe(4003); + }); + + it('should reject an expired ticket', async () => { + const ws = await connectWs(port); + const expired = makeTicket(TEST_SECRET, -1000); + sendJson(ws, { type: 'auth', ticket: expired }); + const code = await waitForClose(ws); + expect(code).toBe(4003); + }); + + it('should reject double auth', async () => { + const ws = await connectWs(port); + const q = createMessageQueue(ws); + sendJson(ws, { type: 'auth', ticket: makeTicket() }); + await q.next(); // auth_ok + sendJson(ws, { type: 'auth', ticket: makeTicket() }); + const msg = await q.next(); + expect(msg.type).toBe('error'); + ws.close(); + await waitForClose(ws); + }); + + it('should decrement client count on disconnect', async () => { + const ws = await connectWs(port); + const q = createMessageQueue(ws); + sendJson(ws, { type: 'auth', ticket: makeTicket() }); + await q.next(); // auth_ok + expect(getAuditStreamClientCount()).toBe(1); + ws.close(); + await waitForClose(ws); + await new Promise((r) => setTimeout(r, 50)); // allow cleanup + expect(getAuditStreamClientCount()).toBe(0); + }); + + // ─── Filter ─────────────────────────────────────────────────────────────── + + it('should handle filter message after auth', async () => { + const ws = await connectWs(port); + const q = createMessageQueue(ws); + sendJson(ws, { type: 'auth', ticket: makeTicket() }); + await q.next(); // auth_ok + sendJson(ws, { type: 'filter', guildId: 'guild1', action: 'config.update' }); + const msg = await q.next(); + expect(msg.type).toBe('filter_ok'); + expect(msg.filter.guildId).toBe('guild1'); + expect(msg.filter.action).toBe('config.update'); + ws.close(); + await waitForClose(ws); + }); + + it('should reject filter before auth', async () => { + const ws = await connectWs(port); + const q = createMessageQueue(ws); + sendJson(ws, { type: 'filter', guildId: 'guild1' }); + const msg = await q.next(); + expect(msg.type).toBe('error'); + ws.close(); + await waitForClose(ws); + }); + + // ─── broadcastAuditEntry ───────────────────────────────────────────────── + + it('should broadcast entry to authenticated clients with no filter', async () => { + const ws = await connectWs(port); + const q = createMessageQueue(ws); + sendJson(ws, { type: 'auth', ticket: makeTicket() }); + await q.next(); // auth_ok + + const entry = { + id: 1, + guild_id: 'guild1', + user_id: 'user1', + action: 'config.update', + created_at: new Date().toISOString(), + }; + broadcastAuditEntry(entry); + + const msg = await q.next(); + expect(msg.type).toBe('entry'); + expect(msg.entry.action).toBe('config.update'); + ws.close(); + await waitForClose(ws); + }); + + it('should broadcast to client with matching guildId filter', async () => { + const ws = await connectWs(port); + const q = createMessageQueue(ws); + sendJson(ws, { type: 'auth', ticket: makeTicket() }); + await q.next(); // auth_ok + sendJson(ws, { type: 'filter', guildId: 'guild1' }); + await q.next(); // filter_ok + + const entry = { + id: 2, + guild_id: 'guild1', + user_id: 'user1', + action: 'members.delete', + created_at: new Date().toISOString(), + }; + broadcastAuditEntry(entry); + + const msg = await q.next(); + expect(msg.type).toBe('entry'); + expect(msg.entry.guild_id).toBe('guild1'); + ws.close(); + await waitForClose(ws); + }); + + it('should NOT broadcast to client with non-matching guildId filter', async () => { + const ws = await connectWs(port); + const q = createMessageQueue(ws); + sendJson(ws, { type: 'auth', ticket: makeTicket() }); + await q.next(); // auth_ok + sendJson(ws, { type: 'filter', guildId: 'other-guild' }); + await q.next(); // filter_ok + + broadcastAuditEntry({ + id: 3, + guild_id: 'guild1', + user_id: 'user1', + action: 'config.update', + created_at: new Date().toISOString(), + }); + + // No message should arrive — timeout should fire + await expect(q.next(500)).rejects.toThrow('Message timeout'); + ws.close(); + await waitForClose(ws); + }); + + it('should filter by action', async () => { + const ws = await connectWs(port); + const q = createMessageQueue(ws); + sendJson(ws, { type: 'auth', ticket: makeTicket() }); + await q.next(); // auth_ok + sendJson(ws, { type: 'filter', action: 'moderation.create' }); + await q.next(); // filter_ok + + // This one should NOT arrive + broadcastAuditEntry({ + id: 4, + guild_id: 'guild1', + user_id: 'u', + action: 'config.update', + created_at: new Date().toISOString(), + }); + // This one SHOULD arrive + broadcastAuditEntry({ + id: 5, + guild_id: 'guild1', + user_id: 'u', + action: 'moderation.create', + created_at: new Date().toISOString(), + }); + + const msg = await q.next(); + expect(msg.entry.action).toBe('moderation.create'); + ws.close(); + await waitForClose(ws); + }); + + it('should not broadcast to unauthenticated clients', async () => { + const ws = await connectWs(port); + const q = createMessageQueue(ws); + // Don't authenticate + broadcastAuditEntry({ + id: 6, + guild_id: 'guild1', + user_id: 'u', + action: 'config.update', + created_at: new Date().toISOString(), + }); + await expect(q.next(300)).rejects.toThrow('Message timeout'); + ws.close(); + }); + + it('should handle unknown message type', async () => { + const ws = await connectWs(port); + const q = createMessageQueue(ws); + sendJson(ws, { type: 'unknown_type', data: 'test' }); + const msg = await q.next(); + expect(msg.type).toBe('error'); + ws.close(); + await waitForClose(ws); + }); + + it('should handle invalid JSON', async () => { + const ws = await connectWs(port); + const q = createMessageQueue(ws); + ws.send('not json at all'); + const msg = await q.next(); + expect(msg.type).toBe('error'); + ws.close(); + await waitForClose(ws); + }); + + // ─── broadcastAuditEntry with no wss ───────────────────────────────────── + + it('should not throw if broadcastAuditEntry called before setup', async () => { + await stopAuditStream(); // force wss to null + expect(() => broadcastAuditEntry({ id: 99, guild_id: 'x', action: 'y' })).not.toThrow(); + }); +}); From 34b4f4604bdf6ce46269e56c37ec730e57aadf16 Mon Sep 17 00:00:00 2001 From: Pip Build Date: Mon, 2 Mar 2026 00:41:28 -0500 Subject: [PATCH 2/2] fix: PR #215 review feedback - audit stream fixes - ws.ping() crash: guard with readyState check + try/catch to avoid crashing heartbeat interval when socket not OPEN - stopAuditStream race: make setupAuditStream async and await stopAuditStream() to prevent concurrent WebSocketServer creation - Query param array coercion: add typeof === 'string' checks for startDate/endDate to handle Express string|string[]|undefined - CSV CRLF quoting: add \r to RFC 4180 special-char check for proper Windows line ending handling - Test timeouts: make AUTH_TIMEOUT_MS configurable via AUDIT_STREAM_AUTH_TIMEOUT_MS env var, use 100ms in tests --- src/api/routes/auditLog.js | 8 +++++--- src/api/ws/auditStream.js | 28 ++++++++++++++++++++++------ tests/api/ws/auditStream.test.js | 10 ++++++---- 3 files changed, 33 insertions(+), 13 deletions(-) diff --git a/src/api/routes/auditLog.js b/src/api/routes/auditLog.js index 740bc0f4f..fc910bd89 100644 --- a/src/api/routes/auditLog.js +++ b/src/api/routes/auditLog.js @@ -68,7 +68,8 @@ function buildFilters(guildId, query) { paramIndex++; } - if (query.startDate) { + // Guard against array query params — Express can pass string|string[]|undefined + if (typeof query.startDate === 'string') { const start = new Date(query.startDate); if (!Number.isNaN(start.getTime())) { conditions.push(`created_at >= $${paramIndex}`); @@ -77,7 +78,7 @@ function buildFilters(guildId, query) { } } - if (query.endDate) { + if (typeof query.endDate === 'string') { const end = new Date(query.endDate); if (!Number.isNaN(end.getTime())) { conditions.push(`created_at <= $${paramIndex}`); @@ -99,7 +100,8 @@ function buildFilters(guildId, query) { export function escapeCsvValue(value) { if (value === null || value === undefined) return ''; const str = typeof value === 'object' ? JSON.stringify(value) : String(value); - if (str.includes(',') || str.includes('\n') || str.includes('"')) { + // RFC 4180: also check for \r (CRLF) to properly handle Windows line endings + if (str.includes(',') || str.includes('\n') || str.includes('\r') || str.includes('"')) { return `"${str.replace(/"/g, '""')}"`; } return str; diff --git a/src/api/ws/auditStream.js b/src/api/ws/auditStream.js index 56bd958ae..269b7fa4a 100644 --- a/src/api/ws/auditStream.js +++ b/src/api/ws/auditStream.js @@ -26,8 +26,10 @@ const MAX_CLIENTS = 10; /** Heartbeat interval in milliseconds */ const HEARTBEAT_INTERVAL_MS = 30_000; -/** Auth timeout — clients must authenticate within this window */ -const AUTH_TIMEOUT_MS = 10_000; +/** Auth timeout — clients must authenticate within this window (configurable via env) */ +function getAuthTimeoutMs() { + return Number(process.env.AUDIT_STREAM_AUTH_TIMEOUT_MS) || 10_000; +} /** @type {WebSocketServer | null} */ let wss = null; @@ -198,7 +200,7 @@ function handleConnection(ws) { if (!ws.authenticated) { ws.close(4001, 'Authentication timeout'); } - }, AUTH_TIMEOUT_MS); + }, getAuthTimeoutMs()); ws.on('pong', () => { ws.isAlive = true; @@ -256,11 +258,12 @@ export function broadcastAuditEntry(entry) { * Attaches to path `/ws/audit-log`. * * @param {import('node:http').Server} httpServer + * @returns {Promise} */ -export function setupAuditStream(httpServer) { +export async function setupAuditStream(httpServer) { if (wss) { warn('setupAuditStream called while already running — cleaning up previous instance'); - stopAuditStream(); + await stopAuditStream(); } wss = new WebSocketServer({ server: httpServer, path: '/ws/audit-log' }); @@ -276,7 +279,20 @@ export function setupAuditStream(httpServer) { continue; } ws.isAlive = false; - ws.ping(); + // Guard ping() with readyState check and try/catch to avoid crashing the interval + try { + if (ws.readyState === WebSocket.OPEN) { + ws.ping(); + } + } catch (err) { + logError('Audit stream ping failed', { error: err.message }); + cleanupClient(ws); + try { + ws.terminate(); + } catch { + // Ignore terminate errors + } + } } }, HEARTBEAT_INTERVAL_MS); diff --git a/tests/api/ws/auditStream.test.js b/tests/api/ws/auditStream.test.js index 3fb0fcf11..681b3ee80 100644 --- a/tests/api/ws/auditStream.test.js +++ b/tests/api/ws/auditStream.test.js @@ -98,10 +98,12 @@ describe('Audit Log WebSocket Stream', () => { beforeEach(async () => { vi.stubEnv('BOT_API_SECRET', TEST_SECRET); + // Use short auth timeout for faster tests (100ms instead of 10s) + vi.stubEnv('AUDIT_STREAM_AUTH_TIMEOUT_MS', '100'); const result = await createTestServer(); httpServer = result.server; port = result.port; - setupAuditStream(httpServer); + await setupAuditStream(httpServer); }); afterEach(async () => { @@ -121,10 +123,10 @@ describe('Audit Log WebSocket Stream', () => { it('should close unauthenticated clients after auth timeout', async () => { const ws = await connectWs(port); - // We don't send auth — wait for close with code 4001 - const code = await waitForClose(ws, 12_000); + // We don't send auth — wait for close with code 4001 (uses 100ms timeout from env) + const code = await waitForClose(ws, 500); expect(code).toBe(4001); - }, 15_000); + }, 2_000); // ─── Authentication ───────────────────────────────────────────────────────