Skip to content

Commit 130e88d

Browse files
authored
chore: proxy improvements (#3121)
* introduce global interceptors * move proxy stuff to new folder * implement resp framer * properly handle request/response and push * add global interceptor
1 parent 96a8a84 commit 130e88d

File tree

7 files changed

+1347
-201
lines changed

7 files changed

+1347
-201
lines changed

packages/test-utils/lib/index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ import { hideBin } from 'yargs/helpers';
2626
import * as fs from 'node:fs';
2727
import * as os from 'node:os';
2828
import * as path from 'node:path';
29-
import { RedisProxy, getFreePortNumber } from './redis-proxy';
29+
import { RedisProxy, getFreePortNumber } from './proxy/redis-proxy';
3030

3131
interface TestUtilsConfig {
3232
/**
Lines changed: 315 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,315 @@
1+
import { strict as assert } from 'node:assert';
2+
import { Buffer } from 'node:buffer';
3+
import { testUtils, GLOBAL } from '../test-utils';
4+
import { InterceptorDescription, RedisProxy } from './redis-proxy';
5+
import type { RedisClientType } from '@redis/client/lib/client/index.js';
6+
7+
describe('RedisSocketProxy', function () {
8+
testUtils.testWithClient('basic proxy functionality', async (client: RedisClientType<any, any, any, any, any>) => {
9+
const socketOptions = client?.options?.socket;
10+
//@ts-ignore
11+
assert(socketOptions?.port, 'Test requires a TCP connection to Redis');
12+
13+
const proxyPort = 50000 + Math.floor(Math.random() * 10000);
14+
const proxy = new RedisProxy({
15+
listenHost: '127.0.0.1',
16+
listenPort: proxyPort,
17+
//@ts-ignore
18+
targetPort: socketOptions.port,
19+
//@ts-ignore
20+
targetHost: socketOptions.host || '127.0.0.1',
21+
enableLogging: true
22+
});
23+
24+
const proxyEvents = {
25+
connections: [] as any[],
26+
dataTransfers: [] as any[]
27+
};
28+
29+
proxy.on('connection', (connectionInfo) => {
30+
proxyEvents.connections.push(connectionInfo);
31+
});
32+
33+
proxy.on('data', (connectionId, direction, data) => {
34+
proxyEvents.dataTransfers.push({ connectionId, direction, dataLength: data.length });
35+
});
36+
37+
try {
38+
await proxy.start();
39+
40+
const proxyClient = client.duplicate({
41+
socket: {
42+
port: proxyPort,
43+
host: '127.0.0.1'
44+
},
45+
});
46+
47+
await proxyClient.connect();
48+
49+
const stats = proxy.getStats();
50+
assert.equal(stats.activeConnections, 1, 'Should have one active connection');
51+
assert.equal(proxyEvents.connections.length, 1, 'Should have recorded one connection event');
52+
53+
const pingResult = await proxyClient.ping();
54+
assert.equal(pingResult, 'PONG', 'Client should be able to communicate with Redis through the proxy');
55+
56+
const clientToServerTransfers = proxyEvents.dataTransfers.filter(t => t.direction === 'client->server');
57+
const serverToClientTransfers = proxyEvents.dataTransfers.filter(t => t.direction === 'server->client');
58+
59+
assert(clientToServerTransfers.length > 0, 'Should have client->server data transfers');
60+
assert(serverToClientTransfers.length > 0, 'Should have server->client data transfers');
61+
62+
const testKey = `test:proxy:${Date.now()}`;
63+
const testValue = 'proxy-test-value';
64+
65+
await proxyClient.set(testKey, testValue);
66+
const retrievedValue = await proxyClient.get(testKey);
67+
assert.equal(retrievedValue, testValue, 'Should be able to set and get values through proxy');
68+
69+
proxyClient.destroy();
70+
71+
72+
} finally {
73+
await proxy.stop();
74+
}
75+
}, GLOBAL.SERVERS.OPEN_RESP_3);
76+
77+
testUtils.testWithProxiedClient('custom message injection via proxy client',
78+
async (proxiedClient: RedisClientType<any, any, any, any, any>, proxy: RedisProxy) => {
79+
const customMessageTransfers: any[] = [];
80+
81+
proxy.on('data', (connectionId, direction, data) => {
82+
if (direction === 'server->client') {
83+
customMessageTransfers.push({ connectionId, dataLength: data.length, data });
84+
}
85+
});
86+
87+
88+
const stats = proxy.getStats();
89+
assert.equal(stats.activeConnections, 1, 'Should have one active connection');
90+
91+
// Send a resp3 push
92+
const customMessage = Buffer.from('>4\r\n$6\r\nMOVING\r\n:1\r\n:2\r\n$6\r\nhost:3\r\n');
93+
94+
const sendResults = proxy.sendToAllClients(customMessage);
95+
assert.equal(sendResults.length, 1, 'Should send to one client');
96+
assert.equal(sendResults[0].success, true, 'Custom message send should succeed');
97+
98+
99+
const customMessageFound = customMessageTransfers.find(transfer =>
100+
transfer.dataLength === customMessage.length
101+
);
102+
assert(customMessageFound, 'Should have recorded the custom message transfer');
103+
104+
assert.equal(customMessageFound.dataLength, customMessage.length,
105+
'Custom message length should match');
106+
107+
const pingResult = await proxiedClient.ping();
108+
assert.equal(pingResult, 'PONG', 'Client should be able to communicate with Redis through the proxy');
109+
110+
}, GLOBAL.SERVERS.OPEN_RESP_3);
111+
112+
describe("Middleware", () => {
113+
testUtils.testWithProxiedClient(
114+
"Modify request/response via middleware",
115+
async (
116+
proxiedClient: RedisClientType<any, any, any, any, any>,
117+
proxy: RedisProxy,
118+
) => {
119+
120+
// Intercept PING commands and modify the response
121+
const pingInterceptor: InterceptorDescription = {
122+
name: `ping`,
123+
fn: async (data, next) => {
124+
if (data.includes('PING')) {
125+
return Buffer.from("+PINGINTERCEPTED\r\n");
126+
}
127+
return next(data);
128+
}
129+
};
130+
131+
// Only intercept GET responses and double numeric values
132+
// Does not modify other commands or non-numeric GET responses
133+
const doubleNumberGetInterceptor: InterceptorDescription = {
134+
name: `double-number-get`,
135+
fn: async (data, next) => {
136+
const response = await next(data);
137+
138+
// Not a GET command, return original response
139+
if (!data.includes("GET")) return response;
140+
141+
const value = (response.toString().split("\r\n"))[1];
142+
const number = Number(value);
143+
// Not a number, return original response
144+
if(isNaN(number)) return response;
145+
146+
const doubled = String(number * 2);
147+
return Buffer.from(`$${doubled.length}\r\n${doubled}\r\n`);
148+
}
149+
};
150+
151+
proxy.setGlobalInterceptors([ pingInterceptor, doubleNumberGetInterceptor ])
152+
153+
const pingResponse = await proxiedClient.ping();
154+
assert.equal(pingResponse, 'PINGINTERCEPTED', 'Response should be modified by middleware');
155+
156+
await proxiedClient.set('foo', 1);
157+
const getResponse1 = await proxiedClient.get('foo');
158+
assert.equal(getResponse1, '2', 'GET response should be doubled for numbers by middleware');
159+
160+
await proxiedClient.set('bar', 'Hi');
161+
const getResponse2 = await proxiedClient.get('bar');
162+
assert.equal(getResponse2, 'Hi', 'GET response should not be modified for strings by middleware');
163+
164+
await proxiedClient.hSet('baz', 'foo', 'dictvalue');
165+
const hgetResponse = await proxiedClient.hGet('baz', 'foo');
166+
assert.equal(hgetResponse, 'dictvalue', 'HGET response should not be modified by middleware');
167+
168+
},
169+
GLOBAL.SERVERS.OPEN_RESP_3,
170+
);
171+
172+
testUtils.testWithProxiedClient(
173+
"Stats reflect middleware activity",
174+
async (
175+
proxiedClient: RedisClientType<any, any, any, any, any>,
176+
proxy: RedisProxy,
177+
) => {
178+
const PING = `ping`;
179+
const SKIPPED = `skipped`;
180+
proxy.setGlobalInterceptors([
181+
{
182+
name: PING,
183+
matchLimit: 3,
184+
fn: async (data, next, state) => {
185+
state.invokeCount++;
186+
if(state.matchCount === state.matchLimit) return next(data);
187+
if (data.includes("PING")) {
188+
state.matchCount++;
189+
return Buffer.from("+PINGINTERCEPTED\r\n");
190+
}
191+
return next(data);
192+
},
193+
},
194+
{
195+
name: SKIPPED,
196+
fn: async (data, next, state) => {
197+
state.invokeCount++;
198+
state.matchCount++;
199+
// This interceptor does not match anything
200+
return next(data);
201+
},
202+
},
203+
]);
204+
205+
await proxiedClient.ping();
206+
await proxiedClient.ping();
207+
await proxiedClient.ping();
208+
209+
let stats = proxy.getStats();
210+
let pingInterceptor = stats.globalInterceptors.find(
211+
(i) => i.name === PING,
212+
);
213+
assert.ok(pingInterceptor, "PING interceptor stats should be present");
214+
assert.equal(pingInterceptor.invokeCount, 3);
215+
assert.equal(pingInterceptor.matchCount, 3);
216+
217+
let skipInterceptor = stats.globalInterceptors.find(
218+
(i) => i.name === SKIPPED,
219+
);
220+
assert.ok(skipInterceptor, "SKIPPED interceptor stats should be present");
221+
assert.equal(skipInterceptor.invokeCount, 0);
222+
assert.equal(skipInterceptor.matchCount, 0);
223+
224+
await proxiedClient.set("foo", "bar");
225+
await proxiedClient.get("foo");
226+
227+
stats = proxy.getStats();
228+
pingInterceptor = stats.globalInterceptors.find(
229+
(i) => i.name === PING,
230+
);
231+
assert.ok(pingInterceptor, "PING interceptor stats should be present");
232+
assert.equal(pingInterceptor.invokeCount, 5);
233+
assert.equal(pingInterceptor.matchCount, 3);
234+
235+
await proxiedClient.ping();
236+
237+
stats = proxy.getStats();
238+
pingInterceptor = stats.globalInterceptors.find(
239+
(i) => i.name === PING,
240+
);
241+
assert.ok(pingInterceptor, "PING interceptor stats should be present");
242+
assert.equal(pingInterceptor.invokeCount, 6);
243+
assert.equal(pingInterceptor.matchCount, 3, 'Should not match more than limit');
244+
245+
skipInterceptor = stats.globalInterceptors.find(
246+
(i) => i.name === SKIPPED,
247+
);
248+
assert.ok(skipInterceptor, "PING interceptor stats should be present");
249+
assert.equal(skipInterceptor.invokeCount, 3);
250+
assert.equal(skipInterceptor.matchCount, 3);
251+
},
252+
GLOBAL.SERVERS.OPEN_RESP_3,
253+
);
254+
255+
testUtils.testWithProxiedClient(
256+
"Middleware is given exactly one RESP message at a time",
257+
async (
258+
proxiedClient: RedisClientType<any, any, any, any, any>,
259+
proxy: RedisProxy,
260+
) => {
261+
proxy.setGlobalInterceptors([
262+
{
263+
name: `ping`,
264+
fn: async (data, next, state) => {
265+
state.invokeCount++;
266+
if (data.equals(Buffer.from("*1\r\n$4\r\nPING\r\n"))) {
267+
state.matchCount++;
268+
}
269+
return next(data);
270+
},
271+
},
272+
]);
273+
274+
await Promise.all([proxiedClient.ping(), proxiedClient.ping()]);
275+
276+
const stats = proxy.getStats();
277+
const pingInterceptor = stats.globalInterceptors.find(
278+
(i) => i.name === `ping`,
279+
);
280+
assert.ok(pingInterceptor, "PING interceptor stats should be present");
281+
assert.equal(pingInterceptor.invokeCount, 2);
282+
assert.equal(pingInterceptor.matchCount, 2);
283+
},
284+
GLOBAL.SERVERS.OPEN_RESP_3,
285+
);
286+
287+
testUtils.testWithProxiedClient(
288+
"Proxy passes through push messages",
289+
async (
290+
proxiedClient: RedisClientType<any, any, any, any, any>,
291+
proxy: RedisProxy,
292+
) => {
293+
let resolve: (value: string) => void;
294+
const promise = new Promise((rs) => { resolve = rs; });
295+
await proxiedClient.subscribe("test-push-channel", (message) => {
296+
resolve(message);
297+
});
298+
299+
await proxiedClient.publish("test-push-channel", "hello");
300+
const result = await promise;
301+
assert.equal(result, "hello", "Should receive push message through proxy");
302+
},
303+
{
304+
...GLOBAL.SERVERS.OPEN_RESP_3,
305+
clientOptions: {
306+
maintNotifications: 'disabled',
307+
disableClientInfo: true,
308+
RESP: 3
309+
}
310+
},
311+
);
312+
});
313+
314+
315+
});

0 commit comments

Comments
 (0)