Skip to content

Commit 6575aa5

Browse files
authored
account for RangeError: Array buffer allocation failed (#303)
## Why ```ts function replicateArrayBufferError() { try { const size = 2 * 1024 * 1024 * 1024; const buf = new ArrayBuffer(size); conn.send(buf); } catch (e) { console.error(e); } } replicateArrayBufferError(); console.log('you can try catch this!!!') ``` tldr; a failed allocation can be try-caught by a top-level handler (e.g. in react) and river can pretend nothing went wrong and construct another message by the time the next message comes around, its possible theres enough memory to allocate again and whoops!!! we skipped sending a message some more context on this error: https://groups.google.com/a/chromium.org/g/blink-dev/c/pZ9kld0LehA/m/Ho-SCn8tBAAJ ## What changed - assume codecs are fallible: wrap codec in an adapter (`CodecMessageAdapter`) to try catch toBuffer and fromBuffer operations - delete the session if codec fails, this should gc the message buffer for that session which should help memory pressure - we also expose `ProtocolError.MessageSendFailure` so the consumer can decide to shut down the world in this case too - modify ws connection to not throw when sending on `CLOSING` or `CLOSED` states (safe to discard as we have the sendbuffer) - actually read the result of .send() - added test to prevent regression - delete unused `messageFraming` code (we axed UDS a while ago) ## Versioning - [ ] Breaking protocol change - [x] Breaking ts/js API change <!-- Kind reminder to add tests and updated documentation if needed -->
1 parent a8fcedd commit 6575aa5

23 files changed

+519
-323
lines changed

__tests__/allocation.test.ts

Lines changed: 177 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,177 @@
1+
import { beforeEach, describe, test, expect, vi, assert } from 'vitest';
2+
import { TestSetupHelpers, transports } from '../testUtil/fixtures/transports';
3+
import { BinaryCodec, Codec } from '../codec';
4+
import {
5+
advanceFakeTimersByHeartbeat,
6+
createPostTestCleanups,
7+
} from '../testUtil/fixtures/cleanup';
8+
import { createServer } from '../router/server';
9+
import { createClient } from '../router/client';
10+
import { TestServiceSchema } from '../testUtil/fixtures/services';
11+
import { waitFor } from '../testUtil/fixtures/cleanup';
12+
import { numberOfConnections, closeAllConnections } from '../testUtil';
13+
import { cleanupTransports } from '../testUtil/fixtures/cleanup';
14+
import { testFinishesCleanly } from '../testUtil/fixtures/cleanup';
15+
import { ProtocolError } from '../transport/events';
16+
17+
let isOom = false;
18+
// simulate RangeError: Array buffer allocation failed
19+
const OomableCodec: Codec = {
20+
toBuffer(obj) {
21+
if (isOom) {
22+
throw new RangeError('failed allocation');
23+
}
24+
25+
return BinaryCodec.toBuffer(obj);
26+
},
27+
fromBuffer: (buff: Uint8Array) => {
28+
return BinaryCodec.fromBuffer(buff);
29+
},
30+
};
31+
32+
describe.each(transports)(
33+
'failed allocation test ($name transport)',
34+
async (transport) => {
35+
const clientOpts = { codec: OomableCodec };
36+
const serverOpts = { codec: BinaryCodec };
37+
38+
const { addPostTestCleanup, postTestCleanup } = createPostTestCleanups();
39+
let getClientTransport: TestSetupHelpers['getClientTransport'];
40+
let getServerTransport: TestSetupHelpers['getServerTransport'];
41+
beforeEach(async () => {
42+
// only allow client to oom, server has sane oom handling already
43+
const setup = await transport.setup({
44+
client: clientOpts,
45+
server: serverOpts,
46+
});
47+
getClientTransport = setup.getClientTransport;
48+
getServerTransport = setup.getServerTransport;
49+
isOom = false;
50+
51+
return async () => {
52+
await postTestCleanup();
53+
await setup.cleanup();
54+
};
55+
});
56+
57+
test('oom during heartbeat kills the session, client starts new session', async () => {
58+
// setup
59+
const clientTransport = getClientTransport('client');
60+
const serverTransport = getServerTransport();
61+
const services = { test: TestServiceSchema };
62+
const server = createServer(serverTransport, services);
63+
const client = createClient<typeof services>(
64+
clientTransport,
65+
serverTransport.clientId,
66+
);
67+
68+
const errMock = vi.fn();
69+
clientTransport.addEventListener('protocolError', errMock);
70+
addPostTestCleanup(async () => {
71+
clientTransport.removeEventListener('protocolError', errMock);
72+
await cleanupTransports([clientTransport, serverTransport]);
73+
});
74+
75+
// establish initial connection
76+
const result = await client.test.add.rpc({ n: 1 });
77+
expect(result).toStrictEqual({ ok: true, payload: { result: 1 } });
78+
79+
await waitFor(() => expect(numberOfConnections(serverTransport)).toBe(1));
80+
await waitFor(() => expect(numberOfConnections(clientTransport)).toBe(1));
81+
const oldClientSession = serverTransport.sessions.get('client');
82+
const oldServerSession = clientTransport.sessions.get('SERVER');
83+
assert(oldClientSession);
84+
assert(oldServerSession);
85+
86+
// simulate some OOM during heartbeat
87+
for (let i = 0; i < 5; i++) {
88+
isOom = i % 2 === 0;
89+
await advanceFakeTimersByHeartbeat();
90+
}
91+
92+
// verify session on client is dead
93+
await waitFor(() => expect(clientTransport.sessions.size).toBe(0));
94+
95+
// verify we got MessageSendFailure errors
96+
await waitFor(() => {
97+
expect(errMock).toHaveBeenCalledWith(
98+
expect.objectContaining({
99+
type: ProtocolError.MessageSendFailure,
100+
}),
101+
);
102+
});
103+
104+
// client should be able to reconnect and make new calls
105+
isOom = false;
106+
const result2 = await client.test.add.rpc({ n: 2 });
107+
expect(result2).toStrictEqual({ ok: true, payload: { result: 3 } });
108+
109+
// verify new session IDs are different from old ones
110+
const newClientSession = serverTransport.sessions.get('client');
111+
const newServerSession = clientTransport.sessions.get('SERVER');
112+
assert(newClientSession);
113+
assert(newServerSession);
114+
expect(newClientSession.id).not.toBe(oldClientSession.id);
115+
expect(newServerSession.id).not.toBe(oldServerSession.id);
116+
117+
await testFinishesCleanly({
118+
clientTransports: [clientTransport],
119+
serverTransport,
120+
server,
121+
});
122+
});
123+
124+
test('oom during handshake kills the session, client starts new session', async () => {
125+
// setup
126+
const clientTransport = getClientTransport('client');
127+
const serverTransport = getServerTransport();
128+
const services = { test: TestServiceSchema };
129+
const server = createServer(serverTransport, services);
130+
const client = createClient<typeof services>(
131+
clientTransport,
132+
serverTransport.clientId,
133+
);
134+
const errMock = vi.fn();
135+
clientTransport.addEventListener('protocolError', errMock);
136+
addPostTestCleanup(async () => {
137+
clientTransport.removeEventListener('protocolError', errMock);
138+
await cleanupTransports([clientTransport, serverTransport]);
139+
});
140+
141+
// establish initial connection
142+
await client.test.add.rpc({ n: 1 });
143+
await waitFor(() => expect(numberOfConnections(serverTransport)).toBe(1));
144+
await waitFor(() => expect(numberOfConnections(clientTransport)).toBe(1));
145+
146+
// close connection to force reconnection
147+
closeAllConnections(clientTransport);
148+
await waitFor(() => expect(numberOfConnections(serverTransport)).toBe(0));
149+
await waitFor(() => expect(numberOfConnections(clientTransport)).toBe(0));
150+
151+
// simulate OOM during handshake
152+
isOom = true;
153+
clientTransport.connect('SERVER');
154+
await waitFor(() => expect(numberOfConnections(serverTransport)).toBe(0));
155+
await waitFor(() => expect(numberOfConnections(clientTransport)).toBe(0));
156+
157+
await waitFor(() => {
158+
expect(errMock).toHaveBeenCalledWith(
159+
expect.objectContaining({
160+
type: ProtocolError.MessageSendFailure,
161+
}),
162+
);
163+
});
164+
165+
// client should be able to reconnect and make new calls
166+
isOom = false;
167+
const result = await client.test.add.rpc({ n: 2 });
168+
expect(result).toStrictEqual({ ok: true, payload: { result: 3 } });
169+
170+
await testFinishesCleanly({
171+
clientTransports: [clientTransport],
172+
serverTransport,
173+
server,
174+
});
175+
});
176+
},
177+
);

codec/adapter.ts

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
import { Value } from '@sinclair/typebox/value';
2+
import {
3+
OpaqueTransportMessage,
4+
OpaqueTransportMessageSchema,
5+
} from '../transport';
6+
import { Codec } from './types';
7+
import { DeserializeResult, SerializeResult } from '../transport/results';
8+
import { coerceErrorString } from '../transport/stringifyError';
9+
10+
/**
11+
* Adapts a {@link Codec} to the {@link OpaqueTransportMessage} format,
12+
* accounting for fallibility of toBuffer and fromBuffer and wrapping
13+
* it with a Result type.
14+
*/
15+
export class CodecMessageAdapter {
16+
constructor(private readonly codec: Codec) {}
17+
18+
toBuffer(msg: OpaqueTransportMessage): SerializeResult {
19+
try {
20+
return {
21+
ok: true,
22+
value: this.codec.toBuffer(msg),
23+
};
24+
} catch (e) {
25+
return {
26+
ok: false,
27+
reason: coerceErrorString(e),
28+
};
29+
}
30+
}
31+
32+
fromBuffer(buf: Uint8Array): DeserializeResult {
33+
try {
34+
const parsedMsg = this.codec.fromBuffer(buf);
35+
if (!Value.Check(OpaqueTransportMessageSchema, parsedMsg)) {
36+
return {
37+
ok: false,
38+
reason: 'transport message schema mismatch',
39+
};
40+
}
41+
42+
return {
43+
ok: true,
44+
value: parsedMsg,
45+
};
46+
} catch (e) {
47+
return {
48+
ok: false,
49+
reason: coerceErrorString(e),
50+
};
51+
}
52+
}
53+
}

codec/binary.ts

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -10,15 +10,11 @@ export const BinaryCodec: Codec = {
1010
return encode(obj, { ignoreUndefined: true });
1111
},
1212
fromBuffer: (buff: Uint8Array) => {
13-
try {
14-
const res = decode(buff);
15-
if (typeof res !== 'object') {
16-
return null;
17-
}
18-
19-
return res;
20-
} catch {
21-
return null;
13+
const res = decode(buff);
14+
if (typeof res !== 'object' || res === null) {
15+
throw new Error('unpacked msg is not an object');
2216
}
17+
18+
return res;
2319
},
2420
};

codec/codec.test.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,10 +41,10 @@ describe.each(codecs)('codec -- $name', ({ codec }) => {
4141
expect(codec.fromBuffer(codec.toBuffer(msg))).toStrictEqual(msg);
4242
});
4343

44-
test('invalid json returns null', () => {
45-
expect(codec.fromBuffer(Buffer.from(''))).toBeNull();
46-
expect(codec.fromBuffer(Buffer.from('['))).toBeNull();
47-
expect(codec.fromBuffer(Buffer.from('[{}'))).toBeNull();
48-
expect(codec.fromBuffer(Buffer.from('{"a":1}[]'))).toBeNull();
44+
test('invalid json throws', () => {
45+
expect(() => codec.fromBuffer(Buffer.from(''))).toThrow();
46+
expect(() => codec.fromBuffer(Buffer.from('['))).toThrow();
47+
expect(() => codec.fromBuffer(Buffer.from('[{}'))).toThrow();
48+
expect(() => codec.fromBuffer(Buffer.from('{"a":1}[]'))).toThrow();
4949
});
5050
});

codec/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
export { BinaryCodec } from './binary';
22
export { NaiveJsonCodec } from './json';
33
export type { Codec } from './types';
4+
export { CodecMessageAdapter } from './adapter';

codec/json.ts

Lines changed: 14 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -48,23 +48,21 @@ export const NaiveJsonCodec: Codec = {
4848
);
4949
},
5050
fromBuffer: (buff: Uint8Array) => {
51-
try {
52-
const parsed = JSON.parse(
53-
decoder.decode(buff),
54-
function reviver(_key, val: unknown) {
55-
if ((val as Base64EncodedValue | undefined)?.$t) {
56-
return base64ToUint8Array((val as Base64EncodedValue).$t);
57-
} else {
58-
return val;
59-
}
60-
},
61-
) as unknown;
62-
63-
if (typeof parsed === 'object') return parsed;
51+
const parsed = JSON.parse(
52+
decoder.decode(buff),
53+
function reviver(_key, val: unknown) {
54+
if ((val as Base64EncodedValue | undefined)?.$t) {
55+
return base64ToUint8Array((val as Base64EncodedValue).$t);
56+
} else {
57+
return val;
58+
}
59+
},
60+
) as unknown;
6461

65-
return null;
66-
} catch {
67-
return null;
62+
if (typeof parsed !== 'object' || parsed === null) {
63+
throw new Error('unpacked msg is not an object');
6864
}
65+
66+
return parsed;
6967
},
7068
};

codec/types.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,5 +14,5 @@ export interface Codec {
1414
* @param buf - The Uint8 buffer to decode.
1515
* @returns The decoded object, or null if decoding failed.
1616
*/
17-
fromBuffer(buf: Uint8Array): object | null;
17+
fromBuffer(buf: Uint8Array): object;
1818
}

package-lock.json

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
{
22
"name": "@replit/river",
33
"description": "It's like tRPC but... with JSON Schema Support, duplex streaming and support for service multiplexing. Transport agnostic!",
4-
"version": "0.207.2",
4+
"version": "0.207.3",
55
"type": "module",
66
"exports": {
77
".": {

0 commit comments

Comments
 (0)