Skip to content

Commit 8df6ee2

Browse files
marcbachmannluin
authored andcommitted
feat: Pipeline-based script loading
1 parent bc1b168 commit 8df6ee2

File tree

9 files changed

+260
-216
lines changed

9 files changed

+260
-216
lines changed

lib/pipeline.ts

Lines changed: 1 addition & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
import * as calculateSlot from "cluster-key-slot";
2-
import * as pMap from "p-map";
32
import { exists, hasFlag } from "redis-commands";
43
import asCallback from "standard-as-callback";
54
import { deprecate } from "util";
@@ -326,71 +325,9 @@ Pipeline.prototype.exec = function (
326325
}
327326
}
328327

329-
// Check whether scripts exists
330-
const scripts = [];
331-
for (let i = 0; i < this._queue.length; ++i) {
332-
const item = this._queue[i];
333-
334-
if (item.name !== "evalsha") {
335-
continue;
336-
}
337-
338-
const script = this._shaToScript[item.args[0]];
339-
340-
if (
341-
!script ||
342-
this.redis._addedScriptHashes[script.sha] ||
343-
scripts.includes(script)
344-
) {
345-
continue;
346-
}
347-
348-
scripts.push(script);
349-
}
350-
351328
const _this = this;
352-
if (!scripts.length) {
353-
return execPipeline();
354-
}
329+
execPipeline();
355330

356-
// In cluster mode, always load scripts before running the pipeline
357-
if (this.isCluster) {
358-
pMap(scripts, (script) => _this.redis.script("load", script.lua), {
359-
concurrency: 10,
360-
})
361-
.then(function () {
362-
for (let i = 0; i < scripts.length; i++) {
363-
_this.redis._addedScriptHashes[scripts[i].sha] = true;
364-
}
365-
})
366-
.then(execPipeline, this.reject);
367-
return this.promise;
368-
}
369-
370-
this.redis
371-
.script(
372-
"exists",
373-
scripts.map(({ sha }) => sha)
374-
)
375-
.then(function (results) {
376-
const pending = [];
377-
for (let i = 0; i < results.length; ++i) {
378-
if (!results[i]) {
379-
pending.push(scripts[i]);
380-
}
381-
}
382-
return Promise.all(
383-
pending.map(function (script) {
384-
return _this.redis.script("load", script.lua);
385-
})
386-
);
387-
})
388-
.then(function () {
389-
for (let i = 0; i < scripts.length; i++) {
390-
_this.redis._addedScriptHashes[scripts[i].sha] = true;
391-
}
392-
})
393-
.then(execPipeline, this.reject);
394331
return this.promise;
395332

396333
function execPipeline() {

lib/script.ts

Lines changed: 88 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -39,23 +39,97 @@ export default class Script {
3939
options.readOnly = true;
4040
}
4141

42-
const evalsha = new Command("evalsha", [this.sha].concat(args), options);
43-
evalsha.isCustomCommand = true;
44-
evalsha.promise = evalsha.promise.catch((err: Error) => {
45-
if (err.toString().indexOf("NOSCRIPT") === -1) {
46-
throw err;
47-
}
48-
const command = new Command("eval", [this.lua].concat(args), options);
49-
if (container.isPipeline === true) container.redis.sendCommand(command);
50-
else container.sendCommand(command);
51-
return command.promise;
42+
if (container.isPipeline) {
43+
return usingPipeline(this, container, args, options, callback);
44+
} else if (container.isCluster) {
45+
return usingCluster(this, container, args, options, callback);
46+
} else {
47+
return usingStandalone(this, container, args, options, callback);
48+
}
49+
}
50+
}
51+
52+
function createEvalShaWithFallback(script, args, options, client) {
53+
const evalsha = createEvalSha(script, args, options);
54+
evalsha.promise = evalsha.promise.catch((err: Error) => {
55+
if (err.toString().indexOf("NOSCRIPT") === -1) {
56+
throw err;
57+
}
58+
59+
// Do an eval as fallback, redis will hash and load it
60+
const evalcmd = new Command("eval", [script.lua].concat(args), options);
61+
evalcmd.promise = evalcmd.promise.then((r) => {
62+
client._addedScriptHashes[script.sha] = true;
63+
return r;
5264
});
65+
return client.sendCommand(evalcmd);
66+
});
67+
return evalsha;
68+
}
69+
70+
function createEvalSha(script, args, options) {
71+
const evalsha = new Command("evalsha", [script.sha].concat(args), options);
72+
evalsha.isCustomCommand = true;
73+
return evalsha;
74+
}
75+
76+
// Pipeline mode (cluster and regular)
77+
function usingPipeline(script, pipeline, args, options, callback) {
78+
// The script was loaded explicitly in this pipeline,
79+
// so we can directly execute evalsha without loading the script again
80+
if (pipeline._addedScriptHashes && pipeline._addedScriptHashes[script.sha]) {
81+
const evalsha = createEvalSha(script, args, options);
82+
asCallback(evalsha.promise, callback);
83+
return pipeline.sendCommand(evalsha);
84+
}
5385

86+
// The script is loaded in redis already, so we try an evalsha
87+
// and fallback to loading the script if it fails
88+
if (pipeline.redis._addedScriptHashes[script.sha]) {
89+
const evalsha = createEvalShaWithFallback(
90+
script,
91+
args,
92+
options,
93+
pipeline.redis
94+
);
5495
asCallback(evalsha.promise, callback);
96+
return pipeline.sendCommand(evalsha);
97+
}
98+
99+
// If the script is not present on redis or in the pipeline,
100+
// we use eval to load the script into the pipeline
101+
if (!pipeline._addedScriptHashes) pipeline._addedScriptHashes = {};
102+
pipeline._addedScriptHashes[script.sha] = true;
103+
104+
const evalcmd = new Command("eval", [script.lua].concat(args), options);
105+
evalcmd.promise = evalcmd.promise.then((r) => {
106+
pipeline.redis._addedScriptHashes[script.sha] = true;
107+
return r;
108+
});
109+
asCallback(evalcmd.promise, callback);
110+
return pipeline.sendCommand(evalcmd);
111+
}
112+
113+
// Standalone mode (cluster)
114+
function usingCluster(script, cluster, args, options, callback) {
115+
const evalsha = createEvalShaWithFallback(script, args, options, cluster);
116+
asCallback(evalsha.promise, callback);
117+
return cluster.sendCommand(evalsha);
118+
}
55119

56-
// The result here is one of
57-
// - a Promise when executed on the redis instance
58-
// - a pipeline instance in pipeline mode
59-
return container.sendCommand(evalsha);
120+
// Standalone mode (regular)
121+
function usingStandalone(script, redis, args, options, callback) {
122+
if (redis._addedScriptHashes[script.sha]) {
123+
const evalsha = createEvalShaWithFallback(script, args, options, redis);
124+
return asCallback(redis.sendCommand(evalsha), callback);
60125
}
126+
127+
const command = new Command("eval", [script.lua].concat(args), options);
128+
command.promise = command.promise.then((r) => {
129+
redis._addedScriptHashes[script.sha] = true;
130+
return r;
131+
});
132+
133+
asCallback(command.promise, callback);
134+
return redis.sendCommand(command);
61135
}

package-lock.json

Lines changed: 0 additions & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@
3939
"lodash.defaults": "^4.2.0",
4040
"lodash.flatten": "^4.4.0",
4141
"lodash.isarguments": "^3.1.0",
42-
"p-map": "^2.1.0",
4342
"redis-commands": "1.7.0",
4443
"redis-errors": "^1.2.0",
4544
"redis-parser": "^3.0.0",

test/functional/auth.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -200,7 +200,6 @@ describe("auth", function () {
200200
let errorEmited = false;
201201
const redis = new Redis({ port: 17379, password: "pass" });
202202
redis.on("error", function () {
203-
console.log("boop");
204203
errorEmited = true;
205204
});
206205
const stub = sinon.stub(console, "warn").callsFake((warn) => {

test/functional/cluster/autopipelining.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ describe("autoPipelining for cluster", function () {
7676
return "bar1";
7777
}
7878

79-
if (argv[0] === "evalsha") {
79+
if (argv[0] === "evalsha" || argv[0] === "eval") {
8080
return argv.slice(argv.length - 4);
8181
}
8282
});

test/functional/pipeline.ts

Lines changed: 58 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import Redis from "../../lib/Redis";
22
import { expect } from "chai";
33
import * as sinon from "sinon";
4+
import { getCommandsFromMonitor } from "../helpers/util";
45

56
describe("pipeline", function () {
67
it("should return correct result", function (done) {
@@ -275,7 +276,7 @@ describe("pipeline", function () {
275276
});
276277
});
277278

278-
it("should check and load uniq scripts only", function (done) {
279+
it("should check and load uniq scripts only", async function () {
279280
const redis = new Redis();
280281
redis.defineCommand("test", {
281282
numberOfKeys: 2,
@@ -286,49 +287,43 @@ describe("pipeline", function () {
286287
lua: "return {KEYS[1],ARGV[1]}",
287288
});
288289

289-
redis.once("ready", function () {
290-
const expectedCommands = [
291-
["script", "exists"],
292-
["script", "load", "return {KEYS[1],ARGV[1]}"],
293-
["script", "load", "return {KEYS[1],KEYS[2],ARGV[1],ARGV[2]}"],
294-
["evalsha"],
295-
["evalsha"],
296-
["evalsha"],
297-
["evalsha"],
298-
["evalsha"],
299-
["evalsha"],
300-
];
301-
const expectedResults = [
302-
[null, ["a", "1"]],
303-
[null, ["b", "2"]],
304-
[null, ["k1", "k2", "v1", "v2"]],
305-
[null, ["k3", "k4", "v3", "v4"]],
306-
[null, ["c", "3"]],
307-
[null, ["k5", "k6", "v5", "v6"]],
308-
];
309-
redis.monitor(function (err, monitor) {
310-
monitor.on("monitor", function (_, command) {
311-
const expectedCommand = expectedCommands.shift();
312-
expectedCommand.forEach((arg, i) => expect(arg).to.eql(command[i]));
313-
if (!expectedCommands.length) {
314-
monitor.disconnect();
315-
redis.disconnect();
316-
done();
317-
}
290+
const expectedCommands = [
291+
["eval"],
292+
["evalsha"],
293+
["eval"],
294+
["evalsha"],
295+
["evalsha"],
296+
["evalsha"],
297+
];
298+
299+
const expectedResults = [
300+
[null, ["a", "1"]],
301+
[null, ["b", "2"]],
302+
[null, ["k1", "k2", "v1", "v2"]],
303+
[null, ["k3", "k4", "v3", "v4"]],
304+
[null, ["c", "3"]],
305+
[null, ["k5", "k6", "v5", "v6"]],
306+
];
307+
308+
const commands = await getCommandsFromMonitor(redis, 6, () => {
309+
return redis
310+
.pipeline()
311+
.echo("a", "1")
312+
.echo("b", "2")
313+
.test("k1", "k2", "v1", "v2")
314+
.test("k3", "k4", "v3", "v4")
315+
.echo("c", "3")
316+
.test("k5", "k6", "v5", "v6")
317+
.exec()
318+
.then((results) => {
319+
expect(results).to.eql(expectedResults);
318320
});
319-
const pipe = redis.pipeline();
320-
pipe
321-
.echo("a", "1")
322-
.echo("b", "2")
323-
.test("k1", "k2", "v1", "v2")
324-
.test("k3", "k4", "v3", "v4")
325-
.echo("c", "3")
326-
.test("k5", "k6", "v5", "v6")
327-
.exec(function (err, results) {
328-
expect(err).to.eql(null);
329-
expect(results).to.eql(expectedResults);
330-
});
331-
});
321+
});
322+
323+
redis.disconnect();
324+
325+
expectedCommands.forEach((expectedCommand, j) => {
326+
expectedCommand.forEach((arg, i) => expect(arg).to.eql(commands[j][i]));
332327
});
333328
});
334329

@@ -362,11 +357,10 @@ describe("pipeline", function () {
362357
lua: `return "Foo"`,
363358
});
364359

365-
const [[err, res]] = await redis
366-
.pipeline([["execafterreconnect"]])
367-
.exec();
368-
expect(err).to.equal(null);
369-
expect(res).to.equal("Foo");
360+
const preloadscript = await redis.pipeline().execafterreconnect().exec();
361+
362+
expect(preloadscript[0][0]).to.equal(null);
363+
expect(preloadscript[0][1]).to.equal("Foo");
370364

371365
const client = await redis.client("list").then((clients) => {
372366
const myInfo = clients
@@ -379,22 +373,24 @@ describe("pipeline", function () {
379373

380374
await redis2.script("flush");
381375
await redis2.client("kill", "addr", client);
382-
383-
const res2 = await redis
384-
.pipeline([
385-
["set", "foo", "bar"],
386-
["execafterreconnect"],
387-
["get", "foo"],
388-
])
389-
.exec();
390-
391-
expect(res2).to.deep.equal([
392-
[null, "OK"],
393-
[null, "Foo"],
394-
[null, "bar"],
395-
]);
376+
await redis.get("waitforready");
377+
378+
const commands = await getCommandsFromMonitor(redis2, 3, () => {
379+
return redis
380+
.pipeline([
381+
["set", "foo", "bar"],
382+
["execafterreconnect"],
383+
["get", "foo"],
384+
])
385+
.exec();
386+
});
396387
redis.disconnect();
397388
redis2.disconnect();
389+
390+
const expected = ["set", "eval", "get"];
391+
commands.forEach((c, i) => {
392+
expect(c[0]).to.equal(expected[i]);
393+
});
398394
});
399395
});
400396

0 commit comments

Comments
 (0)