@@ -7,6 +7,7 @@ const sinon = require('sinon');
77const start = require ( './common' ) ;
88
99const assert = require ( 'assert' ) ;
10+ const { once } = require ( 'events' ) ;
1011const random = require ( './util' ) . random ;
1112const util = require ( './util' ) ;
1213
@@ -3508,6 +3509,9 @@ describe('Model', function() {
35083509 }
35093510 changeStream . removeListener ( 'change' , listener ) ;
35103511 listener = null ;
3512+ // Change stream may still emit "MongoAPIError: ChangeStream is closed" because change stream
3513+ // may still poll after close.
3514+ changeStream . on ( 'error' , ( ) => { } ) ;
35113515 changeStream . close ( ) ;
35123516 changeStream = null ;
35133517 } ) ;
@@ -3560,14 +3564,21 @@ describe('Model', function() {
35603564 it ( 'fullDocument (gh-11936)' , async function ( ) {
35613565 const MyModel = db . model ( 'Test' , new Schema ( { name : String } ) ) ;
35623566
3567+ const doc = await MyModel . create ( { name : 'Ned Stark' } ) ;
35633568 const changeStream = await MyModel . watch ( [ ] , {
35643569 fullDocument : 'updateLookup' ,
35653570 hydrate : true
35663571 } ) ;
3572+ await changeStream . $driverChangeStreamPromise ;
35673573
3568- const doc = await MyModel . create ( { name : 'Ned Stark' } ) ;
3569-
3570- const p = changeStream . next ( ) ;
3574+ const p = new Promise ( ( resolve ) => {
3575+ changeStream . once ( 'change' , change => {
3576+ resolve ( change ) ;
3577+ } ) ;
3578+ } ) ;
3579+ // Need to wait for resume token to be set after the event listener,
3580+ // otherwise change stream might not pick up the update.
3581+ await once ( changeStream . driverChangeStream , 'resumeTokenChanged' ) ;
35713582 await MyModel . updateOne ( { _id : doc . _id } , { name : 'Tony Stark' } ) ;
35723583
35733584 const changeData = await p ;
@@ -3576,22 +3587,31 @@ describe('Model', function() {
35763587 doc . _id . toHexString ( ) ) ;
35773588 assert . ok ( changeData . fullDocument . $__ ) ;
35783589 assert . equal ( changeData . fullDocument . get ( 'name' ) , 'Tony Stark' ) ;
3590+
3591+ await changeStream . close ( ) ;
35793592 } ) ;
35803593
35813594 it ( 'fullDocument with immediate watcher and hydrate (gh-14049)' , async function ( ) {
35823595 const MyModel = db . model ( 'Test' , new Schema ( { name : String } ) ) ;
35833596
35843597 const doc = await MyModel . create ( { name : 'Ned Stark' } ) ;
35853598
3599+ let changeStream = null ;
35863600 const p = new Promise ( ( resolve ) => {
3587- MyModel . watch ( [ ] , {
3601+ changeStream = MyModel . watch ( [ ] , {
35883602 fullDocument : 'updateLookup' ,
35893603 hydrate : true
3590- } ) . on ( 'change' , change => {
3604+ } ) ;
3605+
3606+ changeStream . on ( 'change' , change => {
35913607 resolve ( change ) ;
35923608 } ) ;
35933609 } ) ;
35943610
3611+ // Need to wait for cursor to be initialized and for resume token to
3612+ // be set, otherwise change stream might not pick up the update.
3613+ await changeStream . $driverChangeStreamPromise ;
3614+ await once ( changeStream . driverChangeStream , 'resumeTokenChanged' ) ;
35953615 await MyModel . updateOne ( { _id : doc . _id } , { name : 'Tony Stark' } ) ;
35963616
35973617 const changeData = await p ;
@@ -3600,6 +3620,8 @@ describe('Model', function() {
36003620 doc . _id . toHexString ( ) ) ;
36013621 assert . ok ( changeData . fullDocument . $__ ) ;
36023622 assert . equal ( changeData . fullDocument . get ( 'name' ) , 'Tony Stark' ) ;
3623+
3624+ await changeStream . close ( ) ;
36033625 } ) ;
36043626
36053627 it ( 'respects discriminators (gh-11007)' , async function ( ) {
@@ -3639,6 +3661,9 @@ describe('Model', function() {
36393661 assert . equal ( changeData . operationType , 'insert' ) ;
36403662 assert . equal ( changeData . fullDocument . name , 'Ned Stark' ) ;
36413663
3664+ // Change stream may still emit "MongoAPIError: ChangeStream is closed" because change stream
3665+ // may still poll after close.
3666+ changeStream . on ( 'error' , ( ) => { } ) ;
36423667 await changeStream . close ( ) ;
36433668 await db . close ( ) ;
36443669 } ) ;
@@ -3654,11 +3679,16 @@ describe('Model', function() {
36543679 setTimeout ( resolve , 500 , false ) ;
36553680 } ) ;
36563681
3657- changeStream . close ( ) ;
3658- await db ;
3682+ // Change stream may still emit "MongoAPIError: ChangeStream is closed" because change stream
3683+ // may still poll after close.
3684+ changeStream . on ( 'error' , ( ) => { } ) ;
3685+
3686+ const close = changeStream . close ( ) ;
3687+ await db . asPromise ( ) ;
36593688 const readyCalled = await ready ;
36603689 assert . strictEqual ( readyCalled , false ) ;
36613690
3691+ await close ;
36623692 await db . close ( ) ;
36633693 } ) ;
36643694
@@ -3675,6 +3705,10 @@ describe('Model', function() {
36753705
36763706 await MyModel . create ( { name : 'Hodor' } ) ;
36773707
3708+ // Change stream may still emit "MongoAPIError: ChangeStream is closed" because change stream
3709+ // may still poll after close.
3710+ changeStream . on ( 'error' , ( ) => { } ) ;
3711+
36783712 changeStream . close ( ) ;
36793713 const closedData = await closed ;
36803714 assert . strictEqual ( closedData , true ) ;
0 commit comments