Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 45 additions & 2 deletions lib/cursor/aggregationCursor.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ const MongooseError = require('../error/mongooseError');
const Readable = require('stream').Readable;
const eachAsync = require('../helpers/cursor/eachAsync');
const immediate = require('../helpers/immediate');
const kareem = require('kareem');
const util = require('util');

/**
Expand Down Expand Up @@ -62,7 +63,11 @@ util.inherits(AggregationCursor, Readable);

function _init(model, c, agg) {
if (!model.collection.buffer) {
model.hooks.execPre('aggregate', agg, function() {
model.hooks.execPre('aggregate', agg, function(err) {
if (err != null) {
_handlePreHookError(c, err);
return;
}
if (typeof agg.options?.cursor?.transform === 'function') {
c._transforms.push(agg.options.cursor.transform);
}
Expand All @@ -72,7 +77,12 @@ function _init(model, c, agg) {
});
} else {
model.collection.emitter.once('queue', function() {
model.hooks.execPre('aggregate', agg, function() {
model.hooks.execPre('aggregate', agg, function(err) {
if (err != null) {
_handlePreHookError(c, err);
return;
}

if (typeof agg.options?.cursor?.transform === 'function') {
c._transforms.push(agg.options.cursor.transform);
}
Expand All @@ -84,6 +94,38 @@ function _init(model, c, agg) {
}
}

/**
* Handles error emitted from pre middleware. In particular, checks for `skipWrappedFunction`, which allows skipping
* the actual aggregation and overwriting the function's return value. Because aggregation cursors don't return a value,
* we need to make sure the user doesn't accidentally set a value in skipWrappedFunction.
*
* @param {QueryCursor} queryCursor
* @param {Error} err
* @returns
*/

function _handlePreHookError(queryCursor, err) {
if (err instanceof kareem.skipWrappedFunction) {
const resultValue = err.args[0];
if (resultValue != null && (!Array.isArray(resultValue) || resultValue.length)) {
const err = new MongooseError(
'Cannot `skipMiddlewareFunction()` with a value when using ' +
'`.aggregate().cursor()`, value must be nullish or empty array, got "' +
util.inspect(resultValue) +
'".'
);
queryCursor._markError(err);
queryCursor.listeners('error').length > 0 && queryCursor.emit('error', err);
return;
}
queryCursor.emit('cursor', null);
return;
}
queryCursor._markError(err);
queryCursor.listeners('error').length > 0 && queryCursor.emit('error', err);
}


/**
* Necessary to satisfy the Readable API
* @method _read
Expand Down Expand Up @@ -424,6 +466,7 @@ function _next(ctx, cb) {
err => callback(err)
);
} else {
ctx.once('error', cb);
ctx.once('cursor', function() {
_next(ctx, cb);
});
Expand Down
30 changes: 30 additions & 0 deletions test/aggregate.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -1300,4 +1300,34 @@ describe('aggregate: ', function() {
});
}, /Aggregate `near\(\)` argument has invalid coordinates, got ""/);
});

it('cursor() errors out if schema pre aggregate hook throws an error (gh-15279)', async function() {
const schema = new Schema({ name: String });

schema.pre('aggregate', function(next) {
if (!this.options.allowed) {
throw new Error('Unauthorized aggregate operation: only allowed operations are permitted');
}
next();
});

const Test = db.model('Test', schema);

await Test.create({ name: 'test1' });

await assert.rejects(
async() => {
await Test.aggregate([{ $limit: 1 }], { allowed: false }).exec();
},
err => err.message === 'Unauthorized aggregate operation: only allowed operations are permitted'
);

const cursor = Test.aggregate([{ $limit: 1 }], { allowed: false }).cursor();
await assert.rejects(
async() => {
await cursor.next();
},
err => err.message === 'Unauthorized aggregate operation: only allowed operations are permitted'
);
});
});
Loading