Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 37 additions & 7 deletions src/bulk/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -197,15 +197,17 @@ export class BulkWriteResult {
* Create a new BulkWriteResult instance
* @internal
*/
constructor(bulkResult: BulkResult) {
constructor(bulkResult: BulkResult, isOrdered: boolean) {
this.result = bulkResult;
this.insertedCount = this.result.nInserted ?? 0;
this.matchedCount = this.result.nMatched ?? 0;
this.modifiedCount = this.result.nModified ?? 0;
this.deletedCount = this.result.nRemoved ?? 0;
this.upsertedCount = this.result.upserted.length ?? 0;
this.upsertedIds = BulkWriteResult.generateIdMap(this.result.upserted);
this.insertedIds = BulkWriteResult.generateIdMap(this.result.insertedIds);
this.insertedIds = BulkWriteResult.generateIdMap(
this.getCleanedInsertedIds(bulkResult, isOrdered)
);
Object.defineProperty(this, 'result', { value: this.result, enumerable: false });
}

Expand All @@ -214,6 +216,21 @@ export class BulkWriteResult {
return this.result.ok;
}

/**
 * Returns the subset of `bulkResult.insertedIds` whose inserts actually
 * succeeded, dropping entries that correspond to write errors.
 *
 * For an ordered bulk operation execution halts at the first error, so every
 * id at or after the first error's index is removed. For an unordered bulk
 * operation only the ids whose index appears in some write error are removed.
 * @internal
 */
getCleanedInsertedIds(bulkResult: BulkResult, isOrdered: boolean) {
  if (bulkResult.writeErrors.length === 0) return bulkResult.insertedIds;

  if (isOrdered) {
    // Ordered writes stop on the first failure: everything before it succeeded.
    return bulkResult.insertedIds.slice(0, bulkResult.writeErrors[0].index);
  }

  // Unordered: collect the failed operation indexes once (O(n + m)) instead of
  // re-scanning the writeErrors array for every inserted id (O(n * m)).
  const failedIndexes = new Set(bulkResult.writeErrors.map(writeError => writeError.index));
  return bulkResult.insertedIds.filter(({ index }) => !failedIndexes.has(index));
}

/** Returns the upserted id at the given index */
getUpsertedIdAt(index: number): Document | undefined {
return this.result.upserted[index];
Expand Down Expand Up @@ -479,7 +496,10 @@ function executeCommands(
callback: Callback<BulkWriteResult>
) {
if (bulkOperation.s.batches.length === 0) {
return callback(undefined, new BulkWriteResult(bulkOperation.s.bulkResult));
return callback(
undefined,
new BulkWriteResult(bulkOperation.s.bulkResult, bulkOperation.isOrdered)
);
}

const batch = bulkOperation.s.batches.shift() as Batch;
Expand All @@ -488,17 +508,26 @@ function executeCommands(
// Error is a driver related error not a bulk op error, return early
if (err && 'message' in err && !(err instanceof MongoWriteConcernError)) {
return callback(
new MongoBulkWriteError(err, new BulkWriteResult(bulkOperation.s.bulkResult))
new MongoBulkWriteError(
err,
new BulkWriteResult(bulkOperation.s.bulkResult, bulkOperation.isOrdered)
)
);
}

if (err instanceof MongoWriteConcernError) {
return handleMongoWriteConcernError(batch, bulkOperation.s.bulkResult, err, callback);
return handleMongoWriteConcernError(
batch,
bulkOperation.s.bulkResult,
bulkOperation.isOrdered,
err,
callback
);
}

// Merge the results together
mergeBatchResults(batch, bulkOperation.s.bulkResult, err, result);
const writeResult = new BulkWriteResult(bulkOperation.s.bulkResult);
const writeResult = new BulkWriteResult(bulkOperation.s.bulkResult, bulkOperation.isOrdered);
if (bulkOperation.handleWriteError(callback, writeResult)) return;

// Execute the next command in line
Expand Down Expand Up @@ -572,6 +601,7 @@ function executeCommands(
function handleMongoWriteConcernError(
batch: Batch,
bulkResult: BulkResult,
isOrdered: boolean,
err: MongoWriteConcernError,
callback: Callback<BulkWriteResult>
) {
Expand All @@ -583,7 +613,7 @@ function handleMongoWriteConcernError(
message: err.result?.writeConcernError.errmsg,
code: err.result?.writeConcernError.result
},
new BulkWriteResult(bulkResult)
new BulkWriteResult(bulkResult, isOrdered)
)
);
}
Expand Down
123 changes: 122 additions & 1 deletion test/integration/crud/bulk.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import {
type Collection,
Long,
MongoBatchReExecutionError,
MongoBulkWriteError,
type MongoClient,
MongoDriverError,
MongoInvalidArgumentError
Expand All @@ -31,7 +32,6 @@ describe('Bulk', function () {
.createCollection('test')
.catch(() => null); // make ns exist
});

afterEach(async function () {
const cleanup = this.configuration.newClient();
await cleanup
Expand Down Expand Up @@ -104,6 +104,127 @@ describe('Bulk', function () {
}
});
});
context('BulkWriteResult should not include invalid insert in insertedIds', function () {
  // Helper: creates the requested unique indexes, runs insertMany with the
  // given documents, and asserts that the thrown MongoBulkWriteError reports
  // exactly the ids of the documents that were actually inserted.
  async function assertFailsWithDuplicateFields(input, isOrdered, indices, expectedInsertedIds) {
    try {
      const collection = client.db().collection('test');
      for (const indexSpec of indices) {
        await collection.createIndex(indexSpec, { unique: true, sparse: false });
      }
      await collection.insertMany(input, { ordered: isOrdered });
      expect.fail('a MongoBulkWriteError must be thrown when write errors are encountered');
    } catch (error) {
      expect(error).to.be.instanceOf(MongoBulkWriteError);
      // insertedCount and the insertedIds map must agree on how many
      // documents made it into the collection.
      const reportedIds = error.result.insertedIds;
      expect(error.result.insertedCount).to.equal(Object.keys(reportedIds).length);
      expect(reportedIds).to.deep.equal(expectedInsertedIds);
    }
  }
  it('when passed 1 duplicate ID on an index', async function () {
    const docs = [
      { _id: 0, a: 1 },
      { _id: 1, a: 1 }
    ];
    await assertFailsWithDuplicateFields(docs, true, [{ a: 1 }], { 0: 0 });
  });
  it('when, on an ordered insert, passed multiple duplicate IDs on an index', async function () {
    const docs = [
      { _id: 0, a: 1 },
      { _id: 1, a: 1 },
      { _id: 2, a: 1 },
      { _id: 3, b: 2 }
    ];
    // Ordered: the run aborts on the first duplicate, so only _id 0 lands.
    await assertFailsWithDuplicateFields(docs, true, [{ a: 1 }], { 0: 0 });
  });
  it('when, on an unordered insert, passed multiple duplicate IDs on an index', async function () {
    const docs = [
      { _id: 0, a: 1 },
      { _id: 1, a: 1 },
      { _id: 2, a: 1 },
      { _id: 3, b: 2 }
    ];
    // Unordered: every non-conflicting document is attempted, so _id 3 also lands.
    await assertFailsWithDuplicateFields(docs, false, [{ a: 1 }], { 0: 0, 3: 3 });
  });
});
});
describe('#bulkWrite()', function () {
  context('BulkWriteResult should not include invalid insert in insertedIds', function () {
    // Helper: creates the requested unique indexes, runs bulkWrite with the
    // given operations, and asserts that the thrown MongoBulkWriteError
    // reports exactly the ids of the operations that actually inserted.
    async function assertFailsWithDuplicateFields(input, isOrdered, indices, expectedInsertedIds) {
      try {
        const collection = client.db().collection('test');
        for (const indexSpec of indices) {
          await collection.createIndex(indexSpec, { unique: true, sparse: false });
        }
        await collection.bulkWrite(input, { ordered: isOrdered });
        expect.fail('a MongoBulkWriteError must be thrown when write errors are encountered');
      } catch (error) {
        expect(error).to.be.instanceOf(MongoBulkWriteError);
        // insertedCount and the insertedIds map must agree on how many
        // documents made it into the collection.
        const reportedIds = error.result.insertedIds;
        expect(error.result.insertedCount).to.equal(Object.keys(reportedIds).length);
        expect(reportedIds).to.deep.equal(expectedInsertedIds);
      }
    }
    it('when passed 1 duplicate ID on an index', async function () {
      const ops = [{ insertOne: { _id: 0, a: 1 } }, { insertOne: { _id: 1, a: 1 } }];
      await assertFailsWithDuplicateFields(ops, true, [{ a: 1 }], { 0: 0 });
    });
    it('when, on an ordered insert, passed multiple duplicate IDs on an index', async function () {
      const ops = [
        { insertOne: { _id: 0, a: 1 } },
        { insertOne: { _id: 1, a: 1 } },
        { insertOne: { _id: 2, a: 1 } },
        { insertOne: { _id: 3, b: 2 } }
      ];
      // Ordered: the run aborts on the first duplicate, so only _id 0 lands.
      await assertFailsWithDuplicateFields(ops, true, [{ a: 1 }], { 0: 0 });
    });
    it('when, on an unordered insert, passed multiple duplicate IDs on an index', async function () {
      const ops = [
        { insertOne: { _id: 0, a: 1 } },
        { insertOne: { _id: 1, a: 1 } },
        { insertOne: { _id: 2, a: 1 } },
        { insertOne: { _id: 3, b: 2 } }
      ];
      // Unordered: every non-conflicting operation is attempted, so _id 3 also lands.
      await assertFailsWithDuplicateFields(ops, false, [{ a: 1 }], { 0: 0, 3: 3 });
    });
  });
});
});

Expand Down