Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 70 additions & 0 deletions migrations/004_performance_indexes.cjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/**
* Migration 004: Performance Indexes
*
* Adds missing composite indexes and a pg_trgm GIN index to resolve:
*
* 1. ai_feedback trend queries — getFeedbackTrend() filters by guild_id AND
* created_at but only had a single-column guild_id index, forcing a full
* guild scan + sort for every trend call.
*
* 2. conversations ILIKE search — content ILIKE '%...%' is a seq-scan
* without pg_trgm. Installing the extension + GIN index reduces search from
* O(n) to O(log n * trigram matches).
*
* 3. conversations(guild_id, created_at) — The default 30-day listing query
* (WHERE guild_id = $1 AND created_at >= $2 ORDER BY created_at DESC)
* benefits from a dedicated 2-column index over the existing 3-column
* (guild_id, channel_id, created_at) composite when channel_id is not filtered.
*
* 4. flagged_messages(guild_id, message_id) — POST /flag and the detail
* endpoint both do WHERE guild_id = $1 AND message_id = ANY($2) which
* the existing (guild_id, status) index cannot serve efficiently.
*/

/** @param {import('node-pg-migrate').MigrationBuilder} pgm */
exports.up = (pgm) => {
// ai_feedback: composite for trend + recent queries
// getFeedbackTrend: WHERE guild_id = $1 AND created_at >= NOW() - INTERVAL ...
// getRecentFeedback: WHERE guild_id = $1 ORDER BY created_at DESC LIMIT $2
pgm.sql(`
CREATE INDEX IF NOT EXISTS idx_ai_feedback_guild_created
ON ai_feedback(guild_id, created_at DESC)
`);

// conversations: pg_trgm for ILIKE searches
// Enable the extension first (idempotent)
pgm.sql(`CREATE EXTENSION IF NOT EXISTS pg_trgm`);

// GIN index over content column -- supports col ILIKE '%term%' and col ~ 'pattern'
pgm.sql(`
CREATE INDEX IF NOT EXISTS idx_conversations_content_trgm
ON conversations USING gin(content gin_trgm_ops)
`);

// conversations: (guild_id, created_at) for default 30-day listing
// The existing idx_conversations_guild_channel_created covers (guild_id, channel_id, created_at)
// but queries that filter only by guild_id + date range skip the channel_id column,
// making this 2-column index cheaper to scan.
pgm.sql(`
CREATE INDEX IF NOT EXISTS idx_conversations_guild_created
ON conversations(guild_id, created_at DESC)
`);

// flagged_messages: (guild_id, message_id) for detail + flag endpoints
// Used by:
// GET /:conversationId -> WHERE guild_id = $1 AND message_id = ANY($2)
// POST /:conversationId/flag -> msgCheck + anchorCheck in parallel
pgm.sql(`
CREATE INDEX IF NOT EXISTS idx_flagged_messages_guild_message
ON flagged_messages(guild_id, message_id)
`);
};

/** @param {import('node-pg-migrate').MigrationBuilder} pgm */
exports.down = (pgm) => {
pgm.sql(`DROP INDEX IF EXISTS idx_flagged_messages_guild_message`);
pgm.sql(`DROP INDEX IF EXISTS idx_conversations_guild_created`);
pgm.sql(`DROP INDEX IF EXISTS idx_conversations_content_trgm`);
pgm.sql(`DROP INDEX IF EXISTS idx_ai_feedback_guild_created`);
// Note: do NOT drop pg_trgm extension on down -- it may be used elsewhere.
};
161 changes: 95 additions & 66 deletions src/api/routes/conversations.js
Original file line number Diff line number Diff line change
Expand Up @@ -90,41 +90,6 @@ export function groupMessagesIntoConversations(rows) {
return conversations;
}

/**
 * Build a conversation summary from grouped messages.
 *
 * Participants are de-duplicated by (username, role); a missing or empty
 * username is reported as 'unknown'. The preview is the first message's
 * content, cut to 100 characters and followed by a single ellipsis character
 * (U+2026) when it was truncated.
 *
 * @param {Object} convo - Grouped conversation object; reads `id`, `channelId`,
 *   `messages`, `firstTime` and `lastTime`
 * @param {import('discord.js').Guild} [guild] - Optional guild for channel name resolution
 * @returns {Object} Conversation summary with `id`, `channelId`, `channelName`
 *   (null when it cannot be resolved), `participants`, `messageCount`,
 *   `firstMessageAt`, `lastMessageAt` (ISO 8601 strings) and `preview`
 */
function buildConversationSummary(convo, guild) {
  const participantMap = new Map();
  for (const msg of convo.messages) {
    const username = msg.username || 'unknown';
    const key = `${username}-${msg.role}`;
    if (!participantMap.has(key)) {
      participantMap.set(key, { username, role: msg.role });
    }
  }

  // The truncation marker is written as an escape sequence so it cannot be
  // corrupted when the file is re-encoded; previously the literal had decayed
  // into three mojibake characters that were appended to every long preview.
  const firstContent = convo.messages[0]?.content;
  const preview = firstContent
    ? firstContent.slice(0, 100) + (firstContent.length > 100 ? '\u2026' : '')
    : '';

  const channelName = guild?.channels?.cache?.get(convo.channelId)?.name || null;

  return {
    id: convo.id,
    channelId: convo.channelId,
    channelName,
    participants: Array.from(participantMap.values()),
    messageCount: convo.messages.length,
    firstMessageAt: new Date(convo.firstTime).toISOString(),
    lastMessageAt: new Date(convo.lastTime).toISOString(),
    preview,
  };
}

// ─── GET / β€” List conversations (grouped) ─────────────────────────────────────

/**
Expand Down Expand Up @@ -245,7 +210,7 @@ router.get('/', conversationsRateLimit, requireGuildAdmin, validateGuild, async
return res.status(503).json({ error: 'Database not available' });
}

const { page, limit } = parsePagination(req.query);
const { page, limit, offset } = parsePagination(req.query);
const guildId = req.params.id;

try {
Expand All @@ -256,6 +221,7 @@ router.get('/', conversationsRateLimit, requireGuildAdmin, validateGuild, async

if (req.query.search && typeof req.query.search === 'string') {
paramIndex++;
// Uses idx_conversations_content_trgm (GIN/trgm) added in migration 004
whereParts.push(`content ILIKE $${paramIndex}`);
values.push(`%${escapeIlike(req.query.search)}%`);
}
Expand Down Expand Up @@ -301,30 +267,94 @@ router.get('/', conversationsRateLimit, requireGuildAdmin, validateGuild, async

const whereClause = whereParts.join(' AND ');

// Fetch matching messages for grouping (capped at 10000 rows to prevent memory exhaustion)
// Time-based grouping requires sorted rows; paginate after grouping
// Add pagination params after all WHERE params
const limitParam = paramIndex + 1;
const offsetParam = paramIndex + 2;
values.push(limit, offset);

// SQL-based conversation grouping via window functions.
// Eliminates the previous approach of fetching up to 10,000 rows into Node
// memory and grouping/paginating in JavaScript.
//
// CTE breakdown:
// lag_step — compute gap from previous message in same channel
// numbered — assign cumulative conversation number per channel
// summaries — aggregate each (channel, conv_num) into a summary row
//
// COUNT(*) OVER () gives total conversation count without a second query.
// Pagination happens at the DB level via LIMIT/OFFSET on the summary rows.
const result = await dbPool.query(
`SELECT id, channel_id, role, content, username, created_at
`WITH lag_step AS (
SELECT
id, channel_id, username, role, content, created_at,
CASE
WHEN LAG(created_at) OVER (PARTITION BY channel_id ORDER BY created_at) IS NULL
OR EXTRACT(EPOCH FROM (
created_at
- LAG(created_at) OVER (PARTITION BY channel_id ORDER BY created_at)
)) > ${CONVERSATION_GAP_MINUTES * 60}
THEN 1 ELSE 0
END AS is_conv_start
FROM conversations
WHERE ${whereClause}
ORDER BY created_at DESC
LIMIT 10000 -- capped to prevent runaway memory; 30-day default window keeps this reasonable`,
),
numbered AS (
SELECT *,
SUM(is_conv_start)
OVER (PARTITION BY channel_id ORDER BY created_at) AS conv_num
FROM lag_step
),
summaries AS (
SELECT
channel_id,
conv_num,
MIN(id)::int AS id,
MIN(created_at) AS first_msg_time,
MAX(created_at) AS last_msg_time,
COUNT(*)::int AS message_count,
(ARRAY_AGG(content ORDER BY created_at))[1] AS preview_content,
ARRAY_AGG(DISTINCT
COALESCE(username, 'unknown') || ':::' || role
) AS participant_pairs
FROM numbered
GROUP BY channel_id, conv_num
)
SELECT
id, channel_id, first_msg_time, last_msg_time,
message_count, preview_content, participant_pairs,
COUNT(*) OVER ()::int AS total_conversations
FROM summaries
ORDER BY last_msg_time DESC
LIMIT $${limitParam} OFFSET $${offsetParam}`,
values,
);

// Reverse to ASC order so groupMessagesIntoConversations sees chronological messages.
// Fetching DESC first ensures we get the most recent 10k rows, not the oldest.
result.rows.reverse();
const allConversations = groupMessagesIntoConversations(result.rows);
const total = allConversations.length;
const total = result.rows[0]?.total_conversations ?? 0;

// Paginate grouped conversations
const startIdx = (page - 1) * limit;
const paginatedConversations = allConversations.slice(startIdx, startIdx + limit);
const conversations = result.rows.map((row) => {
const content = row.preview_content || '';
const preview = content.slice(0, 100) + (content.length > 100 ? '\u2026' : '');
const channelName = req.guild?.channels?.cache?.get(row.channel_id)?.name || null;

const conversations = paginatedConversations.map((convo) =>
buildConversationSummary(convo, req.guild),
);
// Parse participant_pairs encoded as "username:::role"
const participants = (row.participant_pairs || []).map((p) => {
const sepIdx = p.lastIndexOf(':::');
return sepIdx === -1
? { username: p, role: 'unknown' }
: { username: p.slice(0, sepIdx), role: p.slice(sepIdx + 3) };
});

return {
id: row.id,
channelId: row.channel_id,
channelName,
participants,
messageCount: row.message_count,
firstMessageAt: new Date(row.first_msg_time).toISOString(),
lastMessageAt: new Date(row.last_msg_time).toISOString(),
preview,
};
});

res.json({ conversations, total, page });
} catch (err) {
Expand Down Expand Up @@ -975,25 +1005,24 @@ router.post(
}

try {
// Verify the message exists and belongs to this guild
const msgCheck = await dbPool.query(
'SELECT id, channel_id, created_at FROM conversations WHERE id = $1 AND guild_id = $2',
[messageId, guildId],
);
// Run both verification lookups in parallel — they are independent queries.
// msgCheck verifies the target message exists in this guild.
// anchorCheck verifies the conversation anchor exists in this guild.
const [msgCheck, anchorCheck] = await Promise.all([
dbPool.query(
'SELECT id, channel_id, created_at FROM conversations WHERE id = $1 AND guild_id = $2',
[messageId, guildId],
),
dbPool.query(
'SELECT id, channel_id, created_at FROM conversations WHERE id = $1 AND guild_id = $2',
[conversationId, guildId],
),
]);

if (msgCheck.rows.length === 0) {
return res.status(404).json({ error: 'Message not found' });
}

// Verify that the message belongs to the specified conversation.
// We check by confirming that the anchor message (conversationId) and
// the flagged message share the same channel and that the flagged
// message falls within the 2-hour fetch window used by the detail endpoint.
const anchorCheck = await dbPool.query(
'SELECT id, channel_id, created_at FROM conversations WHERE id = $1 AND guild_id = $2',
[conversationId, guildId],
);

if (anchorCheck.rows.length === 0) {
return res.status(404).json({ error: 'Conversation not found' });
}
Expand Down
27 changes: 11 additions & 16 deletions tests/api/routes/conversations.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -298,25 +298,19 @@ describe('conversations routes', () => {

it('should return paginated conversations', async () => {
const baseTime = new Date('2024-01-15T10:00:00Z');
// Mock returns rows in DESC order (newest first), matching ORDER BY created_at DESC.
// The route reverses them before grouping so the conversation anchor is still the oldest message.
// The new SQL CTE returns pre-aggregated conversation summary rows.
// Each row represents a single conversation (not individual messages).
mockPool.query.mockResolvedValueOnce({
rows: [
{
id: 2,
channel_id: 'ch1',
role: 'assistant',
content: 'Hi there!',
username: 'bot',
created_at: new Date(baseTime.getTime() + 60000).toISOString(),
},
{
id: 1,
channel_id: 'ch1',
role: 'user',
content: 'Hello world',
username: 'alice',
created_at: baseTime.toISOString(),
first_msg_time: baseTime.toISOString(),
last_msg_time: new Date(baseTime.getTime() + 60000).toISOString(),
message_count: 2,
preview_content: 'Hello world',
participant_pairs: ['alice:::user', 'bot:::assistant'],
total_conversations: 1,
},
],
});
Expand All @@ -325,15 +319,16 @@ describe('conversations routes', () => {

expect(res.status).toBe(200);
expect(res.body).toHaveProperty('conversations');
expect(res.body).toHaveProperty('total');
expect(res.body).toHaveProperty('total', 1);
expect(res.body).toHaveProperty('page');
expect(res.body.conversations).toHaveLength(1);
expect(res.body.conversations[0]).toHaveProperty('id', 1);
expect(res.body.conversations[0]).toHaveProperty('channelId', 'ch1');
expect(res.body.conversations[0]).toHaveProperty('channelName', 'general');
expect(res.body.conversations[0]).toHaveProperty('messageCount', 2);
expect(res.body.conversations[0]).toHaveProperty('preview');
expect(res.body.conversations[0]).toHaveProperty('preview', 'Hello world');
expect(res.body.conversations[0].participants).toBeInstanceOf(Array);
expect(res.body.conversations[0].participants).toHaveLength(2);
});

it('should support search query', async () => {
Expand Down
Loading