Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 33 additions & 6 deletions lib/extension.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
const { Extension } = require('./messages')

// const MAX_ACTIVE = 32
const MAX_ACTIVE = 32
const FLUSH_BATCH = 128
const MAX_PASSIVE_BATCH = 2048
const MAX_ACTIVE_BATCH = MAX_PASSIVE_BATCH + FLUSH_BATCH
Expand Down Expand Up @@ -53,15 +53,33 @@ class HyperbeeExtension {
if (snapshot.limit === -1) snapshot.limit = 0
this.outgoing.broadcast(Extension.encode({ iterator: snapshot }))
}

// can decide whether to expose this or not, can pass as callback function also
finishActive(){
this.active--
if (this.active < 0) {
this.active = 0
}
}

onmessage (buf, from) {
// TODO: handle max active extension messages
// this.active++
// drop or ignore if too many active
if (this.active >= MAX_ACTIVE) {
return;
}
this.active++

const message = decode(buf)
if (!message) return
if (!message){
this.active--;
return;
}

if (message.cache) this.oncache(message.cache, from)
if (message.cache){
this.oncache(message.cache, from)
this.finishActive();
}
if (message.get) this.onget(message.get, from)
if (message.iterator) this.oniterator(message.iterator, from)
}
Expand All @@ -72,16 +90,21 @@ class HyperbeeExtension {
}

onget (message, from) {
if (!message.version || message.version > this.db.version) return
if (!message.version || message.version > this.db.version){
this.finishActive();
return
}

const b = new Batch(this.outgoing, from)
const db = this.db.checkout(message.version)
const self = this

db.get(message.key, { extension: false, wait: false, update: false, onseq }).then(done, done)

function done () {
db.close().catch(noop)
b.send()
self.finishActive();
}

function onseq (seq) {
Expand All @@ -90,7 +113,10 @@ class HyperbeeExtension {
}

async oniterator (message, from) {
if (!message.version || message.version > this.db.version) return
if (!message.version || message.version > this.db.version) {
this.finishActive();
return
}

const b = new Batch(this.outgoing, from)
const seqs = new Set()
Expand Down Expand Up @@ -126,6 +152,7 @@ class HyperbeeExtension {
ite.close().catch(noop)
db.close().catch(noop)
b.send()
this.finishActive();
}
}

Expand Down