diff --git a/lib/commands/query.js b/lib/commands/query.js index 117e4bac70..ddefae8c01 100644 --- a/lib/commands/query.js +++ b/lib/commands/query.js @@ -295,6 +295,9 @@ class Query extends Command { stream.on('end', () => { stream.emit('close'); }); + stream.on('error', () => { + this._connection && this._connection.destroy(); + }); return stream; } diff --git a/test/integration/connection/test-stream-error-destroy-connection.test.cjs b/test/integration/connection/test-stream-error-destroy-connection.test.cjs new file mode 100644 index 0000000000..bf918fe075 --- /dev/null +++ b/test/integration/connection/test-stream-error-destroy-connection.test.cjs @@ -0,0 +1,45 @@ +'use strict'; + +const process = require('node:process'); +const { test, skip } = require('poku'); +const common = require('../../common.test.cjs'); + +if (process.env.MYSQL_USE_TLS === '1') skip('Skipping for SSL=1'); + +test('Ensure stream ends in case of error', async () => { + const connection = common.createConnection(); + + connection.query( + [ + 'CREATE TEMPORARY TABLE `items` (', + '`id` int(11) NOT NULL AUTO_INCREMENT,', + '`text` varchar(255) DEFAULT NULL,', + 'PRIMARY KEY (`id`)', + ') ENGINE=InnoDB DEFAULT CHARSET=utf8', + ].join('\n'), + (err) => { + if (err) { + throw err; + } + } + ); + + for (let i = 0; i < 100; i++) { + connection.execute('INSERT INTO items(text) VALUES(?)', ['test'], (err) => { + if (err) { + throw err; + } + }); + } + + const rows = connection.query('SELECT * FROM items').stream(); + + // eslint-disable-next-line no-unused-vars + for await (const _ of rows) break; + + setTimeout(() => { + throw new Error('Connection remains open after stream error'); + }, 1000).unref(); + + connection.end(); +});