Skip to content

Commit 4e2b9c6

Browse files
dariacodesMatsuuu
authored andcommitted
fix: Handle collaboration push old format (#26467)
1 parent 0d829e1 commit 4e2b9c6

3 files changed

Lines changed: 80 additions & 13 deletions

File tree

packages/cli/src/collaboration/__tests__/collaboration.state.test.ts

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,5 +119,32 @@ describe('CollaborationState', () => {
119119
userId: 'user1',
120120
});
121121
});
122+
123+
it('should gracefully ignore old cache format and clean it up', async () => {
124+
// Arrange
125+
const now = new Date().toISOString();
126+
127+
mockCacheService.getHash.mockResolvedValueOnce({
128+
'old-user-uuid': '2026-02-26T21:23:36.318Z',
129+
newClientId: `new-user-uuid|${now}`,
130+
});
131+
132+
// Act
133+
const users = await collaborationState.getCollaborators(workflowId);
134+
135+
// Assert
136+
expect(users).toEqual([
137+
{
138+
clientId: 'newClientId',
139+
lastSeen: now,
140+
userId: 'new-user-uuid',
141+
},
142+
]);
143+
144+
expect(mockCacheService.deleteFromHash).toHaveBeenCalledWith(
145+
'collaboration:workflow',
146+
'old-user-uuid',
147+
);
148+
});
122149
});
123150
});

packages/cli/src/collaboration/collaboration.service.ts

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import type { PushPayload } from '@n8n/api-types';
22
import type { User } from '@n8n/db';
33
import { UserRepository } from '@n8n/db';
4+
import { Logger } from '@n8n/backend-common';
45
import { Service } from '@n8n/di';
56
import { ErrorReporter } from 'n8n-core';
67
import type { Workflow } from 'n8n-workflow';
@@ -29,6 +30,7 @@ import { AccessService } from '@/services/access.service';
2930
@Service()
3031
export class CollaborationService {
3132
constructor(
33+
private readonly logger: Logger,
3234
private readonly errorReporter: ErrorReporter,
3335
private readonly push: Push,
3436
private readonly state: CollaborationState,
@@ -41,6 +43,13 @@ export class CollaborationService {
4143
try {
4244
await this.handleUserMessage(event.userId, event.pushRef, event.msg);
4345
} catch (error) {
46+
if (this.isTransientError(error)) {
47+
this.logger.debug('Transient infrastructure error in collaboration service', {
48+
error,
49+
});
50+
return;
51+
}
52+
4453
this.errorReporter.error(
4554
new UnexpectedError('Error handling CollaborationService push message', {
4655
extra: {
@@ -55,6 +64,15 @@ export class CollaborationService {
5564
});
5665
}
5766

67+
private isTransientError(error: unknown): error is NodeJS.ErrnoException {
68+
return (
69+
error instanceof Error &&
70+
'code' in error &&
71+
typeof error.code === 'string' &&
72+
['ECONNREFUSED', 'ETIMEDOUT', 'ENOTFOUND', 'ECONNRESET'].includes(error.code)
73+
);
74+
}
75+
5876
async handleUserMessage(userId: User['id'], clientId: string, msg: unknown) {
5977
const workflowMessage = await parseWorkflowMessage(msg);
6078

packages/cli/src/collaboration/collaboration.state.ts

Lines changed: 35 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -63,11 +63,12 @@ export class CollaborationState {
6363
return [];
6464
}
6565

66-
const activeCollaborators = this.cacheHashToCollaborators(cacheValue);
67-
const [expired, stillActive] = this.splitToExpiredAndStillActive(activeCollaborators);
66+
const { valid, invalid } = this.parseCacheHashToCollaborators(cacheValue);
67+
const [expired, stillActive] = this.splitToExpiredAndStillActive(valid);
6868

69-
if (expired.length > 0) {
70-
void this.removeExpiredCollaborators(workflowId, expired);
69+
const toRemove = [...expired, ...invalid];
70+
if (toRemove.length > 0) {
71+
void this.removeExpiredCollaborators(workflowId, toRemove);
7172
}
7273

7374
// Deduplicate by userId - keep the most recent entry for each user
@@ -113,15 +114,36 @@ export class CollaborationState {
113114
);
114115
}
115116

116-
private cacheHashToCollaborators(workflowCacheEntry: WorkflowCacheHash): CacheEntry[] {
117-
return Object.entries(workflowCacheEntry).map(([clientId, value]) => {
118-
const [userId, lastSeen] = value.split('|');
119-
return {
120-
userId,
121-
lastSeen,
122-
clientId,
123-
};
124-
});
117+
private parseCacheHashToCollaborators(workflowCacheEntry: WorkflowCacheHash): {
118+
valid: CacheEntry[];
119+
invalid: CacheEntry[];
120+
} {
121+
const valid: CacheEntry[] = [];
122+
const invalid: CacheEntry[] = [];
123+
124+
for (const [clientId, value] of Object.entries(workflowCacheEntry)) {
125+
const parts = value.split('|');
126+
127+
// Handle old format (pre-tab-scoped collaboration) where value was just a timestamp
128+
// Old: { "userId": "2026-02-26T21:23:36.318Z" }
129+
// New: { "clientId": "userId|2026-02-26T21:23:36.318Z" }
130+
if (parts.length === 1) {
131+
invalid.push({
132+
clientId,
133+
userId: '', // Not needed for deletion
134+
lastSeen: value,
135+
});
136+
} else {
137+
const [userId, lastSeen] = parts;
138+
valid.push({
139+
userId,
140+
lastSeen,
141+
clientId,
142+
});
143+
}
144+
}
145+
146+
return { valid, invalid };
125147
}
126148

127149
private hasSessionExpired(lastSeenString: Iso8601DateTimeString) {

0 commit comments

Comments
 (0)