4 changes: 4 additions & 0 deletions CHANGES.md
@@ -1,5 +1,9 @@
# Unreleased
- [FIXED] Restore issue with document values containing `\u2028` and `\u2029` on Node.js 24.
- [FIXED] Restore issue with readline pause/resume after close on Node.js 24.
- [FIXED] Write after destroy errors on Node.js 24.
- [UPGRADED] `axios` peerDependency to minimum version `1.13.1` to avoid the broken `1.13.0` release.
- [NOTE] Updated Node.js version requirement statement for LTS 24.

# 2.11.11 (2025-10-20)
- [UPGRADED] `@ibm-cloud/cloudant` dependency to version `0.12.10`.
2 changes: 1 addition & 1 deletion Jenkinsfile
@@ -134,7 +134,7 @@ def runTest(version, filter=null, testSuite='test') {
pipeline {
agent {
kubernetes {
yaml kubePodTemplate(name: 'couchbackup.yaml', full_jnlp: 'sdks-pinned-agent:node-22')
yaml kubePodTemplate(name: 'couchbackup.yaml')
}
}
options {
50 changes: 36 additions & 14 deletions includes/liner.js
@@ -1,4 +1,4 @@
// Copyright © 2017, 2024 IBM Corp. All rights reserved.
// Copyright © 2017, 2025 IBM Corp. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -13,9 +13,8 @@
// limitations under the License.

const { createInterface } = require('node:readline');
const { PassThrough, Duplex } = require('node:stream');
const { Duplex, PassThrough, Transform } = require('node:stream');
const debug = require('debug');

/**
* A Duplex stream that converts the input stream to a stream
* of line objects using the built-in readline interface.
@@ -29,26 +28,43 @@ const debug = require('debug');
*/
class Liner extends Duplex {
// Configure logging
log = debug(('couchbackup:liner'));
log = debug('couchbackup:liner');
// Flag for whether the readline interface is running
isRunning = true;
// Flag for whether the readline interface is closed
isClosed = false;
// Line number state
lineNumber = 0;
// Buffer of processed lines
lines = [];
// Stream of bytes that will be processed to lines.
inStream = new PassThrough({ objectMode: false })
// if there is an error destroy this Duplex with it
.on('error', e => this.destroy(e));

constructor() {
constructor(sanitize = false) {
// Configuration of this Duplex:
// objectMode: false on the writable input (file chunks), true on the readable output (line objects)
// The readableHighWaterMark controls the number of lines buffered after this implementation calls
// "push". Backup lines are potentially large (default 500 documents - i.e. potentially MBs). Since
// there is additional buffering downstream and file processing is faster than the network ops
// we don't bottleneck here even without a large buffer.
super({ readableObjectMode: true, readableHighWaterMark: 0, writableObjectMode: false });
// Set up the stream of bytes that will be processed to lines.
if (sanitize) {
// Handle unescaped unicode "newlines" by escaping them before passing to readline
this.inStream = new Transform({
objectMode: false,
transform(chunk, encoding, callback) {
try {
this.push(chunk.toString('utf-8').replaceAll('\u2028', '\\u2028').replaceAll('\u2029', '\\u2029'), 'utf-8');
callback();
} catch (e) {
callback(e);
}
}
});
} else {
this.inStream = new PassThrough({ objectMode: false });
}
// if there is an error destroy this Duplex with it
this.inStream.on('error', e => this.destroy(e));
// Built-in readline interface over the inStream
this.readlineInterface = createInterface({
input: this.inStream, // the writable side of Liner, passed through
@@ -60,7 +76,8 @@
const bufferedLines = this.lines.push(this.wrapLine(line));
this.log(`Liner processed line ${this.lineNumber}. Buffered lines available: ${bufferedLines}.`);
this.pushAvailable();
}).on('close', () => {
}).once('close', () => {
this.isClosed = true;
this.log('Liner readline interface closed.');
// Push null onto our lines buffer to signal EOF to downstream consumers.
this.lines.push(null);
@@ -87,13 +104,16 @@
// Check readline is running flag and whether there is content to push.
while (this.isRunning && this.lines.length > 0) {
if (!this.push(this.lines.shift())) {
this.log(`Back-pressure from push. Buffered lines available: ${this.lines.length}.`);
// Push returned false, this indicates downstream back-pressure.
// Pause the readline interface to stop pushing more lines downstream.
// Resumption is triggered by downstream calling _read which happens
// when it is ready for more data.
this.log(`Liner pausing after back-pressure from push. Buffered lines available: ${this.lines.length}.`);
this.isRunning = false;
this.readlineInterface.pause();
if (!this.isClosed) {
this.log('Liner pausing.');
this.readlineInterface.pause();
}
break;
} else {
this.log(`Liner pushed. Buffered lines available: ${this.lines.length}.`);
@@ -114,9 +134,11 @@
// is called to ensure that pushes are able to happen (and thereby trigger)
// subsequent reads.
if (!this.isRunning) {
this.log('Liner resuming after read.');
this.isRunning = true;
this.readlineInterface.resume();
if (!this.isClosed) {
this.log('Liner resuming after read.');
this.readlineInterface.resume();
}
}
this.pushAvailable();
}
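The sanitize branch above escapes raw U+2028/U+2029 characters before the bytes reach readline, which on Node.js 24 treats those characters as line breaks. A minimal standalone sketch (not part of this diff) of why the escaping round-trips losslessly:

```js
'use strict';
const assert = require('node:assert');

// A document value containing a raw U+2028. JSON.stringify emits it
// unescaped (it is legal inside a JSON string), so it lands verbatim in
// a backup line, where Node.js 24 readline would split it in two.
const doc = { note: 'line one\u2028line two' };
const backupLine = JSON.stringify([doc]);
assert.ok(backupLine.includes('\u2028'));

// The sanitizing Transform rewrites the raw separator as its JSON escape
// sequence: readline no longer splits there, and JSON.parse recovers the
// identical document value.
const sanitized = backupLine
  .replaceAll('\u2028', '\\u2028')
  .replaceAll('\u2029', '\\u2029');
assert.ok(!sanitized.includes('\u2028'));
assert.deepStrictEqual(JSON.parse(sanitized), [doc]);
```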
2 changes: 1 addition & 1 deletion includes/restore.js
@@ -50,7 +50,7 @@ module.exports = function(dbClient, options, readstream, ee) {

const batchPreparationStreams = [
readstream, // the backup file
new Liner(), // line by line
new Liner(true), // line by line (sanitize unicode line separators for Node.js 24 compatibility)
new MappingStream(restore.backupLineToDocsArray), // convert line to a docs array
new BatchingStream(options.bufferSize, true), // make new arrays of the correct buffer size
new MappingStream(restore.docsToRestoreBatch) // make a restore batch
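For context, a hedged sketch of driving the sanitizing `Liner` outside the restore pipeline above; the require path and the counting logic are illustrative only. Each chunk `Liner` emits is an object with `lineNumber` and `line` properties, as the unit tests later in this diff exercise.

```js
'use strict';
const fs = require('node:fs');
const { Writable } = require('node:stream');
const { pipeline } = require('node:stream/promises');
const { Liner } = require('./includes/liner.js'); // path assumed, relative to repo root

// Count the lines of a backup file using the sanitizing Liner.
async function countBackupLines(fileName) {
  let lastLineNumber = 0;
  await pipeline(
    fs.createReadStream(fileName), // raw backup bytes
    new Liner(true), // escape U+2028/U+2029, then split into line objects
    new Writable({
      objectMode: true, // Liner's readable side emits objects
      write(lineObject, encoding, callback) {
        lastLineNumber = lineObject.lineNumber;
        callback();
      }
    })
  );
  return lastLineNumber;
}
```

`countBackupLines('backup.txt')` then resolves to the total line count once the file is drained.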
66 changes: 39 additions & 27 deletions includes/transforms.js
@@ -1,4 +1,4 @@
// Copyright © 2023, 2024 IBM Corp. All rights reserved.
// Copyright © 2023, 2025 IBM Corp. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -187,45 +187,57 @@ class DelegateWritable extends Writable {

_write(chunk, encoding, callback) {
const toWrite = (this.chunkMapFn) ? this.chunkMapFn(chunk) : chunk;
this.targetWritable.write(toWrite, encoding, (err) => {
if (!err) {
this.log('completed target chunk write');
if (this.postWriteFn) {
this.postWriteFn(chunk);
if (!this.targetWritable.destroyed) {
this.targetWritable.write(toWrite, encoding, (err) => {
if (!err) {
this.log('completed target chunk write');
if (this.postWriteFn) {
this.postWriteFn(chunk);
}
}
}
callback(err);
});
callback(err);
});
} else {
// Avoid write after destroy errors
this.log('suppressing write after destroy error');
callback();
}
}

_final(callback) {
this.log('Finalizing');
const lastChunk = (this.lastChunkFn && this.lastChunkFn()) || null;
// We can't 'end' stdout, so use a final write instead for that case
if (this.targetWritable === process.stdout) {
// we can't 'write' null, so don't do anything if there is no last chunk
if (lastChunk) {
this.targetWritable.write(lastChunk, 'utf-8', (err) => {
if (!this.targetWritable.destroyed) {
// We can't 'end' stdout, so use a final write instead for that case
if (this.targetWritable === process.stdout) {
// we can't 'write' null, so don't do anything if there is no last chunk
if (lastChunk) {
this.targetWritable.write(lastChunk, 'utf-8', (err) => {
if (!err) {
this.log('wrote last chunk to stdout');
} else {
this.log('error writing last chunk to stdout');
}
callback(err);
});
} else {
this.log('no last chunk to write to stdout');
callback();
}
} else {
this.targetWritable.end(lastChunk, 'utf-8', (err) => {
if (!err) {
this.log('wrote last chunk to stdout');
this.log('wrote last chunk and ended target writable');
} else {
this.log('error writing last chunk to stdout');
this.log('error ending target writable');
}
callback(err);
});
} else {
this.log('no last chunk to write to stdout');
callback();
}
} else {
this.targetWritable.end(lastChunk, 'utf-8', (err) => {
if (!err) {
this.log('wrote last chunk and ended target writable');
} else {
this.log('error ending target writable');
}
callback(err);
});
// Avoid write after destroy errors
this.log('suppressing write after destroy error');
callback();
}
}
}
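The new `destroyed` guards exist because on Node.js 24 a write issued to an already-destroyed stream surfaces an `ERR_STREAM_DESTROYED` error through its callback and can fail the whole pipeline. A small illustration of the pattern (not from this PR; the target stream is a throwaway):

```js
'use strict';
const { Writable } = require('node:stream');

// A throwaway target, destroyed before any write is attempted.
const target = new Writable({ write(chunk, encoding, callback) { callback(); } });
target.destroy();

// Writing now would report ERR_STREAM_DESTROYED via the write callback,
// so, like the patched DelegateWritable, check first and complete our own
// callback without delegating.
if (!target.destroyed) {
  target.write('chunk');
} else {
  console.log('suppressing write after destroy');
}
```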
2 changes: 1 addition & 1 deletion package-lock.json

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion package.json
@@ -20,7 +20,7 @@
},
"license": "Apache-2.0",
"engines": {
"node": "^20 || ^22"
"node": "^20 || ^22 || ^24"
},
"dependencies": {
"@ibm-cloud/cloudant": "0.12.11",
40 changes: 13 additions & 27 deletions test/ci_concurrent_backups.js
@@ -1,4 +1,4 @@
// Copyright © 2018, 2023 IBM Corp. All rights reserved.
// Copyright © 2018, 2025 IBM Corp. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -14,11 +14,12 @@

/* global describe it */

const fs = require('fs');
const assert = require('node:assert');
const fs = require('node:fs');
const { once } = require('node:events');
const readline = require('readline');
const u = require('./citestutils.js');
const uuid = require('uuid').v4;
const u = require('./citestutils.js');
const { Liner } = require('../includes/liner.js');

const params = { useApi: true };

@@ -28,29 +29,14 @@ describe(u.scenario('Concurrent database backups', params), function() {
u.setTimeout(this, 900);

const checkForEmptyBatches = async function(fileName) {
let foundEmptyBatch = false;

const rd = readline.createInterface({
input: fs.createReadStream(fileName),
output: fs.createWriteStream('/dev/null'),
terminal: false
});

rd.on('line', function(line) {
if (JSON.parse(line).length === 0) {
// Note: Empty batch arrays indicate that the running backup is
// incorrectly sharing a log file with another ongoing backup job.
foundEmptyBatch = true;
}
});

rd.on('close', function() {
if (foundEmptyBatch) {
return Promise.reject(new Error(`Log file '${fileName}' contains empty batches`));
} else {
return Promise.resolve();
}
});
assert.ok(await fs.createReadStream(fileName) // backup file
.pipe(new Liner(true)) // split to lines
.map(linerLine => JSON.parse(linerLine.line)) // parse JSON
.filter(parsedJson => Array.isArray(parsedJson)) // we want batches so filter to arrays
// Note: Empty batch arrays indicate that the running backup is
// incorrectly sharing a log file with another ongoing backup job.
.every(batch => batch.length > 0),
`Backup file ${fileName} contains empty batches.`);
};

const backupPromise = async function() {
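The rewritten assertion leans on the Readable helper methods `map`, `filter`, and `every`; `every` consumes the stream and resolves to a boolean. A tiny self-contained sketch of that chain (sample data invented for illustration):

```js
'use strict';
const assert = require('node:assert');
const { Readable } = require('node:stream');

async function demo() {
  // map/filter lazily transform each chunk; every() drains the stream and
  // resolves true only if the predicate holds for every surviving chunk.
  const allBatchesNonEmpty = await Readable.from(['[1]', '[2]', '"not a batch"'])
    .map(line => JSON.parse(line))
    .filter(parsed => Array.isArray(parsed))
    .every(batch => batch.length > 0);
  assert.strictEqual(allBatchesNonEmpty, true);
}

demo();
```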
20 changes: 19 additions & 1 deletion test/liner.js
@@ -16,9 +16,10 @@

const assert = require('node:assert');
const fs = require('node:fs');
const { versions } = require('node:process');
const { Readable, Writable } = require('node:stream');
const { pipeline } = require('node:stream/promises');
const { Liner } = require('../includes/liner.js');
const { Writable } = require('node:stream');

describe('#unit liner', function() {
// Use a liner to make the line objects
@@ -55,4 +56,21 @@
await pipeline(inputLines, liner, destination);
assert.deepStrictEqual(output, expected);
});

it('should split on unicode separators if not sanitizing', async function() {
// readline only splits on \u2028 and \u2029 in Node.js >= 24
const nodeMajorVersion = parseInt(versions.node.split('.', 2)[0]);
const expectedLines = nodeMajorVersion >= 24 ? ['foo', 'bar', 'foo', 'bar', 'foo'] : ['foo', 'bar', 'foo\u2028bar\u2029foo'];
const input = 'foo\nbar\nfoo\u2028bar\u2029foo';
const expected = expectedLines.map((e, i) => { return { lineNumber: i + 1, line: e }; });
await pipeline(Readable.from(input), liner, destination);
assert.deepStrictEqual(output, expected);
});

it('should sanitize unicode separators when enabled', async function() {
const expected = ['foo', 'bar', 'foo\\u2028bar\\u2029foo'].map((e, i) => { return { lineNumber: i + 1, line: e }; });
const input = 'foo\nbar\nfoo\u2028bar\u2029foo';
await pipeline(Readable.from(input), new Liner(true), destination);
assert.deepStrictEqual(output, expected);
});
});