Skip to content

Commit 912041a

Browse files
authored
Use promise API for abstract-level 2 compatibility (#18)
Both abstract-level 1 and abstract-level 2 are now supported.
1 parent 6767696 commit 912041a

4 files changed

Lines changed: 119 additions & 65 deletions

File tree

.github/workflows/test.yml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,8 @@ jobs:
77
runs-on: ubuntu-latest
88
strategy:
99
matrix:
10-
node: [12, 14, 16]
10+
# TODO: drop < 18 in next major
11+
node: [12, 14, 16, 18, 20, 22]
1112
name: Node ${{ matrix.node }}
1213
steps:
1314
- name: Checkout

index.js

Lines changed: 55 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,10 @@
33
const { Readable } = require('readable-stream')
44

55
const kIterator = Symbol('iterator')
6+
const kPromises = Symbol('promises')
67
const kNextv = Symbol('nextv')
8+
const kNextvLegacy = Symbol('nextvLegacy')
9+
const kDestroy = Symbol('destroy')
710

811
class LevelReadStream extends Readable {
912
constructor (db, method, options) {
@@ -16,6 +19,13 @@ class LevelReadStream extends Readable {
1619

1720
this[kIterator] = db[method](rest)
1821
this[kNextv] = this[kNextv].bind(this)
22+
this[kNextvLegacy] = this[kNextvLegacy].bind(this)
23+
this[kDestroy] = this.destroy.bind(this)
24+
25+
// Detect abstract-level 2 by the presence of hooks. Version 2 doesn't
26+
// support callbacks anymore. Version 1 does also support promises but
27+
// that would be slower because it works by wrapping the callback API.
28+
this[kPromises] = db.hooks !== undefined
1929

2030
// NOTE: use autoDestroy option when it lands in readable-stream
2131
this.once('end', this.destroy.bind(this, null, null))
@@ -27,10 +37,18 @@ class LevelReadStream extends Readable {
2737

2838
_read (size) {
2939
if (this.destroyed) return
30-
this[kIterator].nextv(size, this[kNextv])
40+
41+
if (this[kPromises]) {
42+
this[kIterator].nextv(size).then(
43+
this[kNextv],
44+
this[kDestroy]
45+
)
46+
} else {
47+
this[kIterator].nextv(size, this[kNextvLegacy])
48+
}
3149
}
3250

33-
[kNextv] (err, items) {
51+
[kNextvLegacy] (err, items) {
3452
if (this.destroyed) return
3553
if (err) return this.destroy(err)
3654

@@ -43,10 +61,29 @@ class LevelReadStream extends Readable {
4361
}
4462
}
4563

64+
[kNextv] (items) {
65+
if (this.destroyed) return
66+
67+
if (items.length === 0) {
68+
this.push(null)
69+
} else {
70+
for (const item of items) {
71+
this.push(item)
72+
}
73+
}
74+
}
75+
4676
_destroy (err, callback) {
47-
this[kIterator].close(function (err2) {
48-
callback(err || err2)
49-
})
77+
if (this[kPromises]) {
78+
this[kIterator].close().then(
79+
err ? () => callback(err) : callback,
80+
callback
81+
)
82+
} else {
83+
this[kIterator].close(function (err2) {
84+
callback(err || err2)
85+
})
86+
}
5087
}
5188
}
5289

@@ -55,7 +92,7 @@ class EntryStream extends LevelReadStream {
5592
super(db, 'iterator', { ...options, keys: true, values: true })
5693
}
5794

58-
[kNextv] (err, entries) {
95+
[kNextvLegacy] (err, entries) {
5996
if (this.destroyed) return
6097
if (err) return this.destroy(err)
6198

@@ -67,6 +104,18 @@ class EntryStream extends LevelReadStream {
67104
}
68105
}
69106
}
107+
108+
[kNextv] (entries) {
109+
if (this.destroyed) return
110+
111+
if (entries.length === 0) {
112+
this.push(null)
113+
} else {
114+
for (const [key, value] of entries) {
115+
this.push({ key, value })
116+
}
117+
}
118+
}
70119
}
71120

72121
class KeyStream extends LevelReadStream {

package.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
"readable-stream": "^3.4.0"
2222
},
2323
"peerDependencies": {
24-
"abstract-level": "^1.0.0"
24+
"abstract-level": ">=1.0.0"
2525
},
2626
"peerDependenciesMeta": {
2727
"abstract-level": {
@@ -35,7 +35,7 @@
3535
"airtap-playwright": "^1.0.1",
3636
"faucet": "^0.0.3",
3737
"hallmark": "^4.0.0",
38-
"memory-level": "^1.0.0",
38+
"memory-level": "^2.0.0",
3939
"nyc": "^15.1.0",
4040
"secret-event-listener": "^1.0.0",
4141
"standard": "^16.0.3",

test.js

Lines changed: 60 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,11 @@ const { EntryStream, KeyStream, ValueStream } = require('.')
66
const { Writable, pipeline } = require('readable-stream')
77
const addSecretListener = require('secret-event-listener')
88

9+
const delayedPipeline = async (...args) => {
10+
await pipeline(...args)
11+
await new Promise(setImmediate)
12+
}
13+
914
let db
1015
const kLastIterator = Symbol('lastIterator')
1116
const data = [
@@ -14,7 +19,7 @@ const data = [
1419
{ key: 'c', value: '3' }
1520
]
1621

17-
test('setup', function (t) {
22+
test('setup', async function (t) {
1823
db = new MemoryLevel()
1924

2025
// Keep track of last created iterator for test purposes
@@ -27,37 +32,33 @@ test('setup', function (t) {
2732
}
2833
}
2934

30-
db.open(function (err) {
31-
t.error(err, 'no error')
32-
db.batch(data.map(x => ({ type: 'put', ...x })), function (err) {
33-
t.error(err, 'no error')
34-
t.end()
35-
})
36-
})
35+
await db.open()
36+
await db.batch(data.map(x => ({ type: 'put', ...x })))
3737
})
3838

39-
test('EntryStream', function (t) {
40-
t.plan(2)
39+
test('EntryStream', async function (t) {
40+
t.plan(1)
4141

42-
pipeline(new EntryStream(db), new Concat((acc) => {
42+
// TODO: pipeline returns before Concat calls the callback
43+
await delayedPipeline(new EntryStream(db), new Concat((acc) => {
4344
t.same(acc, data)
44-
}), t.ifError.bind(t))
45+
}))
4546
})
4647

47-
test('KeyStream', function (t) {
48-
t.plan(2)
48+
test('KeyStream', async function (t) {
49+
t.plan(1)
4950

50-
pipeline(new KeyStream(db), new Concat((acc) => {
51+
await delayedPipeline(new KeyStream(db), new Concat((acc) => {
5152
t.same(acc, data.map(x => x.key))
52-
}), t.ifError.bind(t))
53+
}))
5354
})
5455

55-
test('ValueStream', function (t) {
56-
t.plan(2)
56+
test('ValueStream', async function (t) {
57+
t.plan(1)
5758

58-
pipeline(new ValueStream(db), new Concat((acc) => {
59+
await delayedPipeline(new ValueStream(db), new Concat((acc) => {
5960
t.same(acc, data.map(x => x.value))
60-
}), t.ifError.bind(t))
61+
}))
6162
})
6263

6364
for (const Ctor of [EntryStream, KeyStream, ValueStream]) {
@@ -82,8 +83,8 @@ for (const Ctor of [EntryStream, KeyStream, ValueStream]) {
8283
t.end()
8384
})
8485

85-
db[kLastIterator]._nextv = function (size, options, cb) {
86-
process.nextTick(cb, new Error('nextv'))
86+
db[kLastIterator]._nextv = async function (size, options) {
87+
throw new Error('nextv')
8788
}
8889

8990
stream.resume()
@@ -142,7 +143,7 @@ for (const Ctor of [EntryStream, KeyStream, ValueStream]) {
142143
t.end()
143144
})
144145

145-
db[kLastIterator].nextv = function () {
146+
db[kLastIterator].nextv = async function () {
146147
stream.destroy()
147148
}
148149

@@ -156,7 +157,7 @@ for (const Ctor of [EntryStream, KeyStream, ValueStream]) {
156157
t.end()
157158
})
158159

159-
db[kLastIterator].nextv = function (size, options, cb) {
160+
db[kLastIterator].nextv = async function (size, options) {
160161
stream.destroy(new Error('user'))
161162
}
162163

@@ -171,10 +172,13 @@ for (const Ctor of [EntryStream, KeyStream, ValueStream]) {
171172
t.end()
172173
})
173174

174-
db[kLastIterator].nextv = function (size, options, cb) {
175-
stream.destroy(new Error('user'), function (err) {
176-
order.push('callback')
177-
t.is(err.message, 'user', 'got error')
175+
db[kLastIterator].nextv = async function (size, options) {
176+
return new Promise((resolve) => {
177+
stream.destroy(new Error('user'), function (err) {
178+
order.push('callback')
179+
t.is(err.message, 'user', 'got error')
180+
resolve()
181+
})
178182
})
179183
}
180184

@@ -189,10 +193,13 @@ for (const Ctor of [EntryStream, KeyStream, ValueStream]) {
189193
t.end()
190194
})
191195

192-
db[kLastIterator].nextv = function (size, options, cb) {
193-
stream.destroy(null, function (err) {
194-
order.push('callback')
195-
t.ifError(err, 'no error')
196+
db[kLastIterator].nextv = async function (size, options) {
197+
return new Promise((resolve) => {
198+
stream.destroy(null, function (err) {
199+
order.push('callback')
200+
t.ifError(err, 'no error')
201+
resolve()
202+
})
196203
})
197204
}
198205

@@ -203,10 +210,11 @@ for (const Ctor of [EntryStream, KeyStream, ValueStream]) {
203210
const stream = new Ctor(db)
204211
const iterator = db[kLastIterator]
205212
const nextv = iterator.nextv.bind(iterator)
206-
iterator.nextv = function (size, cb) {
213+
iterator.nextv = async function (size) {
207214
t.pass('should be called once')
208-
nextv(size, cb)
215+
const promise = nextv(size)
209216
stream.destroy()
217+
return promise
210218
}
211219
stream.on('data', function (data) {
212220
t.fail('should not be called')
@@ -219,12 +227,13 @@ for (const Ctor of [EntryStream, KeyStream, ValueStream]) {
219227
const iterator = db[kLastIterator]
220228
const nextv = iterator.nextv.bind(iterator)
221229
let count = 0
222-
iterator.nextv = function (size, cb) {
230+
iterator.nextv = async function (size) {
223231
t.pass('should be called')
224-
nextv(size, cb)
232+
const promise = nextv(size)
225233
if (++count === 2) {
226234
stream.destroy()
227235
}
236+
return promise
228237
}
229238
stream.on('data', function (data) {
230239
t.pass('should be called')
@@ -236,12 +245,11 @@ for (const Ctor of [EntryStream, KeyStream, ValueStream]) {
236245
const stream = new Ctor(db)
237246
const iterator = db[kLastIterator]
238247
const nextv = iterator.nextv.bind(iterator)
239-
iterator.nextv = function (size, cb) {
240-
nextv(size, function (err, key, value) {
241-
stream.destroy()
242-
cb(err, key, value)
243-
t.pass('should be called')
244-
})
248+
iterator.nextv = async function (size) {
249+
const result = await nextv(size)
250+
stream.destroy()
251+
t.pass('should be called')
252+
return result
245253
}
246254
stream.on('data', function (data) {
247255
t.fail('should not be called')
@@ -254,14 +262,13 @@ for (const Ctor of [EntryStream, KeyStream, ValueStream]) {
254262
const iterator = db[kLastIterator]
255263
const nextv = iterator.nextv.bind(iterator)
256264
let count = 0
257-
iterator.nextv = function (size, cb) {
258-
nextv(size, function (err, key, value) {
259-
if (++count === 2) {
260-
stream.destroy()
261-
}
262-
cb(err, key, value)
263-
t.pass('should be called')
264-
})
265+
iterator.nextv = async function (size) {
266+
const result = await nextv(size)
267+
if (++count === 2) {
268+
stream.destroy()
269+
}
270+
t.pass('should be called')
271+
return result
265272
}
266273
stream.on('data', function (data) {
267274
t.pass('should be called')
@@ -299,10 +306,7 @@ test('it is safe to close db on end of stream', function (t) {
299306
// Although the underlying iterator is still alive at this point (before
300307
// the 'close' event has been emitted) it's safe to close the db because
301308
// leveldown (v5) ends any open iterators before closing.
302-
db.close(function (err) {
303-
t.ifError(err, 'no error')
304-
t.end()
305-
})
309+
db.close().then(t.end.bind(t), t.fail.bind(t))
306310
})
307311

308312
stream.resume()
@@ -314,9 +318,9 @@ function monitor (stream, onClose) {
314318
;['_next', '_nextv', '_close'].forEach(function (method) {
315319
const original = db[kLastIterator][method]
316320

317-
db[kLastIterator][method] = function () {
321+
db[kLastIterator][method] = async function () {
318322
order.push(method)
319-
original.apply(this, arguments)
323+
return original.apply(this, arguments)
320324
}
321325
})
322326

0 commit comments

Comments
 (0)