Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
# Unreleased
- [FIXED] Restore issue with document values containing `\u2028` and `\u2029` on Node.js 24.
- [FIXED] Restore issue with readline pause/resume after close on Node.js 24.
- [UPGRADED] `axios` peerDependency to minimum version `1.13.1` to avoid broken `1.13.0` version.
- [NOTE] Updated Node.js version requirement statement for LTS 24.

# 2.11.11 (2025-10-20)
- [UPGRADED] `@ibm-cloud/cloudant` dependency to version `0.12.10`.
Expand Down
47 changes: 12 additions & 35 deletions includes/liner.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2017, 2025 IBM Corp. All rights reserved.
// Copyright © 2017, 2024 IBM Corp. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -13,7 +13,7 @@
// limitations under the License.

const { createInterface } = require('node:readline');
const { Duplex, PassThrough, Transform } = require('node:stream');
const { PassThrough, Duplex } = require('node:stream');
const debug = require('debug');

/**
Expand All @@ -32,40 +32,23 @@ class Liner extends Duplex {
log = debug(('couchbackup:liner'));
// Flag for whether the readline interface is running
isRunning = true;
// Flag for whether the readline interface is closed
isClosed = false;
// Line number state
lineNumber = 0;
// Buffer of processed lines
lines = [];
// Stream of bytes that will be processed to lines.
inStream = new PassThrough({ objectMode: false })
// if there is an error destroy this Duplex with it
.on('error', e => this.destroy(e));

constructor(sanitize = false) {
constructor() {
// Configuration of this Duplex:
// objectMode: false on the writable input (file chunks), true on the readable output (line objects)
// The readableHighWaterMark controls the number of lines buffered after this implementation calls
// "push". Backup lines are potentially large (default 500 documents - i.e. potentially MBs). Since
// there is additional buffering downstream and file processing is faster than the network ops
// we don't bottleneck here even without a large buffer.
super({ readableObjectMode: true, readableHighWaterMark: 0, writableObjectMode: false });
// Set up the stream of bytes that will be processed to lines.
if (sanitize) {
// Handle unescaped unicode "newlines" by escaping them before passing to readline
this.inStream = new Transform({
objectMode: false,
transform(chunk, encoding, callback) {
try {
this.push(chunk.toString('utf-8').replace(/\u2028/, '\\u2028').replace(/\u2029/, '\\u2029'));
callback();
} catch (e) {
callback(e);
}
}
});
} else {
this.inStream = new PassThrough({ objectMode: false });
}
// if there is an error destroy this Duplex with it
this.inStream.on('error', e => this.destroy(e));
// Built-in readline interface over the inStream
this.readlineInterface = createInterface({
input: this.inStream, // the writable side of Liner, passed through
Expand All @@ -77,8 +60,7 @@ class Liner extends Duplex {
const bufferedLines = this.lines.push(this.wrapLine(line));
this.log(`Liner processed line ${this.lineNumber}. Buffered lines available: ${bufferedLines}.`);
this.pushAvailable();
}).once('close', () => {
this.isClosed = true;
}).on('close', () => {
this.log('Liner readline interface closed.');
// Push null onto our lines buffer to signal EOF to downstream consumers.
this.lines.push(null);
Expand All @@ -105,16 +87,13 @@ class Liner extends Duplex {
// Check readline is running flag and whether there is content to push.
while (this.isRunning && this.lines.length > 0) {
if (!this.push(this.lines.shift())) {
this.log(`Back-pressure from push. Buffered lines available: ${this.lines.length}.`);
// Push returned false, this indicates downstream back-pressure.
// Pause the readline interface to stop pushing more lines downstream.
// Resumption is triggered by downstream calling _read which happens
// when it is ready for more data.
this.log(`Liner pausing after back-pressure from push. Buffered lines available: ${this.lines.length}.`);
this.isRunning = false;
if (!this.isClosed) {
this.log('Liner pausing.');
this.readlineInterface.pause();
}
this.readlineInterface.pause();
break;
} else {
this.log(`Liner pushed. Buffered lines available: ${this.lines.length}.`);
Expand All @@ -135,11 +114,9 @@ class Liner extends Duplex {
// is called to ensure that pushes are able to happen (and thereby trigger)
// subsequent reads.
if (!this.isRunning) {
this.log('Liner resuming after read.');
this.isRunning = true;
if (!this.isClosed) {
this.log('Liner resuming after read.');
this.readlineInterface.resume();
}
this.readlineInterface.resume();
}
this.pushAvailable();
}
Expand Down
2 changes: 1 addition & 1 deletion includes/restore.js
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ module.exports = function(dbClient, options, readstream, ee) {

const batchPreparationStreams = [
readstream, // the backup file
new Liner(true), // line by line (for Node.js 24 compatibility santize unicode line separators)
new Liner(), // line by line
new MappingStream(restore.backupLineToDocsArray), // convert line to a docs array
new BatchingStream(options.bufferSize, true), // make new arrays of the correct buffer size
new MappingStream(restore.docsToRestoreBatch) // make a restore batch
Expand Down
2 changes: 1 addition & 1 deletion package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
},
"license": "Apache-2.0",
"engines": {
"node": "^20 || ^22 || ^24"
"node": "^20 || ^22"
},
"dependencies": {
"@ibm-cloud/cloudant": "0.12.11",
Expand Down
20 changes: 1 addition & 19 deletions test/liner.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,9 @@

const assert = require('node:assert');
const fs = require('node:fs');
const { versions } = require('node:process');
const { Readable, Writable } = require('node:stream');
const { pipeline } = require('node:stream/promises');
const { Liner } = require('../includes/liner.js');
const { Writable } = require('node:stream');

describe('#unit liner', function() {
// Use a liner to make the line objects
Expand Down Expand Up @@ -56,21 +55,4 @@ describe('#unit liner', function() {
await pipeline(inputLines, liner, destination);
assert.deepStrictEqual(output, expected);
});

it('should split on unicode separators if not sanitizing', async function() {
// This test will only split on /u2028 and /u2029 in Node.js >=24
const nodeMajorVersion = parseInt(versions.node.split('.', 2)[0]);
const expectedLines = nodeMajorVersion >= 24 ? ['foo', 'bar', 'foo', 'bar', 'foo'] : ['foo', 'bar', 'foo\u2028bar\u2029foo'];
const input = 'foo\nbar\nfoo\u2028bar\u2029foo';
const expected = expectedLines.map((e, i) => { return { lineNumber: i + 1, line: e }; });
await pipeline(Readable.from(input), liner, destination);
assert.deepStrictEqual(output, expected);
});

it('should sanitize unicode separators when enabled', async function() {
const expected = ['foo', 'bar', 'foo\\u2028bar\\u2029foo'].map((e, i) => { return { lineNumber: i + 1, line: e }; });
const input = 'foo\nbar\nfoo\u2028bar\u2029foo';
await pipeline(Readable.from(input), new Liner(true), destination);
assert.deepStrictEqual(output, expected);
});
});