@@ -58,6 +58,8 @@ class Transaction {
5858 private readonly _lowRecordWatermak : number
5959 private readonly _highRecordWatermark : number
6060 private _bookmarks : Bookmarks
61+ private readonly _activePromise : Promise < void >
62+ private _acceptActive : ( ) => void
6163
6264 /**
6365 * @constructor
@@ -107,6 +109,9 @@ class Transaction {
107109 this . _lowRecordWatermak = lowRecordWatermark
108110 this . _highRecordWatermark = highRecordWatermark
109111 this . _bookmarks = Bookmarks . empty ( )
112+ this . _activePromise = new Promise ( ( resolve , reject ) => {
113+ this . _acceptActive = resolve
114+ } )
110115 }
111116
112117 /**
@@ -154,6 +159,10 @@ class Transaction {
154159 }
155160 this . _onError ( error ) . catch ( ( ) => { } )
156161 } )
162+ // It should make the transaction active anyway
163+ // futher errors will be treated by the exiting
164+ // observers
165+ . finally ( ( ) => this . _acceptActive ( ) )
157166 }
158167
159168 /**
@@ -178,7 +187,8 @@ class Transaction {
178187 reactive : this . _reactive ,
179188 fetchSize : this . _fetchSize ,
180189 highRecordWatermark : this . _highRecordWatermark ,
181- lowRecordWatermark : this . _lowRecordWatermak
190+ lowRecordWatermark : this . _lowRecordWatermak ,
191+ preparationJob : this . _activePromise
182192 } )
183193 this . _results . push ( result )
184194 return result
@@ -197,7 +207,8 @@ class Transaction {
197207 onError : this . _onError ,
198208 onComplete : ( meta : any ) => this . _onCompleteCallback ( meta , this . _bookmarks ) ,
199209 onConnection : this . _onConnection ,
200- pendingResults : this . _results
210+ pendingResults : this . _results ,
211+ preparationJob : this . _activePromise
201212 } )
202213 this . _state = committed . state
203214 // clean up
@@ -224,7 +235,8 @@ class Transaction {
224235 onError : this . _onError ,
225236 onComplete : this . _onComplete ,
226237 onConnection : this . _onConnection ,
227- pendingResults : this . _results
238+ pendingResults : this . _results ,
239+ preparationJob : this . _activePromise
228240 } )
229241 this . _state = rolledback . state
230242 // clean up
@@ -293,6 +305,7 @@ interface StateTransitionParams {
293305 fetchSize : number
294306 highRecordWatermark : number
295307 lowRecordWatermark : number
308+ preparationJob ?: Promise < any >
296309}
297310
298311const _states = {
@@ -303,7 +316,8 @@ const _states = {
303316 onError,
304317 onComplete,
305318 onConnection,
306- pendingResults
319+ pendingResults,
320+ preparationJob
307321 } : StateTransitionParams ) : any => {
308322 return {
309323 result : finishTransaction (
@@ -312,7 +326,8 @@ const _states = {
312326 onError ,
313327 onComplete ,
314328 onConnection ,
315- pendingResults
329+ pendingResults ,
330+ preparationJob
316331 ) ,
317332 state : _states . SUCCEEDED
318333 }
@@ -322,7 +337,8 @@ const _states = {
322337 onError,
323338 onComplete,
324339 onConnection,
325- pendingResults
340+ pendingResults,
341+ preparationJob
326342 } : StateTransitionParams ) : any => {
327343 return {
328344 result : finishTransaction (
@@ -331,7 +347,8 @@ const _states = {
331347 onError ,
332348 onComplete ,
333349 onConnection ,
334- pendingResults
350+ pendingResults ,
351+ preparationJob
335352 ) ,
336353 state : _states . ROLLED_BACK
337354 }
@@ -347,31 +364,34 @@ const _states = {
347364 reactive,
348365 fetchSize,
349366 highRecordWatermark,
350- lowRecordWatermark
367+ lowRecordWatermark,
368+ preparationJob
351369 } : StateTransitionParams
352370 ) : any => {
353371 // RUN in explicit transaction can't contain bookmarks and transaction configuration
354372 // No need to include mode and database name as it shall be inclued in begin
355- const observerPromise = connectionHolder
356- . getConnection ( )
357- . then ( conn => {
358- onConnection ( )
359- if ( conn != null ) {
360- return conn . protocol ( ) . run ( query , parameters , {
361- bookmarks : Bookmarks . empty ( ) ,
362- txConfig : TxConfig . empty ( ) ,
363- beforeError : onError ,
364- afterComplete : onComplete ,
365- reactive : reactive ,
366- fetchSize : fetchSize ,
367- highRecordWatermark : highRecordWatermark ,
368- lowRecordWatermark : lowRecordWatermark
369- } )
370- } else {
371- throw newError ( 'No connection available' )
372- }
373- } )
374- . catch ( error => new FailedObserver ( { error, onError } ) )
373+ const requirements = preparationJob ?? Promise . resolve ( )
374+
375+ const observerPromise =
376+ requirements . then ( ( ) => connectionHolder . getConnection ( ) )
377+ . then ( conn => {
378+ onConnection ( )
379+ if ( conn != null ) {
380+ return conn . protocol ( ) . run ( query , parameters , {
381+ bookmarks : Bookmarks . empty ( ) ,
382+ txConfig : TxConfig . empty ( ) ,
383+ beforeError : onError ,
384+ afterComplete : onComplete ,
385+ reactive : reactive ,
386+ fetchSize : fetchSize ,
387+ highRecordWatermark : highRecordWatermark ,
388+ lowRecordWatermark : lowRecordWatermark
389+ } )
390+ } else {
391+ throw newError ( 'No connection available' )
392+ }
393+ } )
394+ . catch ( error => new FailedObserver ( { error, onError } ) )
375395
376396 return newCompletedResult (
377397 observerPromise ,
@@ -598,32 +618,36 @@ function finishTransaction (
598618 onError : ( err : Error ) => any ,
599619 onComplete : ( metadata : any ) => any ,
600620 onConnection : ( ) => any ,
601- pendingResults : Result [ ]
621+ pendingResults : Result [ ] ,
622+ preparationJob ?: Promise < void >
602623) : Result {
603- const observerPromise = connectionHolder
604- . getConnection ( )
605- . then ( connection => {
606- onConnection ( )
607- pendingResults . forEach ( r => r . _cancel ( ) )
608- return Promise . all ( pendingResults . map ( result => result . summary ( ) ) ) . then ( results => {
609- if ( connection != null ) {
610- if ( commit ) {
611- return connection . protocol ( ) . commitTransaction ( {
612- beforeError : onError ,
613- afterComplete : onComplete
614- } )
624+ const requirements = preparationJob ?? Promise . resolve ( )
625+
626+ const observerPromise =
627+ requirements
628+ . then ( ( ) => connectionHolder . getConnection ( ) )
629+ . then ( connection => {
630+ onConnection ( )
631+ pendingResults . forEach ( r => r . _cancel ( ) )
632+ return Promise . all ( pendingResults . map ( result => result . summary ( ) ) ) . then ( results => {
633+ if ( connection != null ) {
634+ if ( commit ) {
635+ return connection . protocol ( ) . commitTransaction ( {
636+ beforeError : onError ,
637+ afterComplete : onComplete
638+ } )
639+ } else {
640+ return connection . protocol ( ) . rollbackTransaction ( {
641+ beforeError : onError ,
642+ afterComplete : onComplete
643+ } )
644+ }
615645 } else {
616- return connection . protocol ( ) . rollbackTransaction ( {
617- beforeError : onError ,
618- afterComplete : onComplete
619- } )
646+ throw newError ( 'No connection available' )
620647 }
621- } else {
622- throw newError ( 'No connection available' )
623- }
648+ } )
624649 } )
625- } )
626- . catch ( error => new FailedObserver ( { error, onError } ) )
650+ . catch ( error => new FailedObserver ( { error, onError } ) )
627651
628652 // for commit & rollback we need result that uses real connection holder and notifies it when
629653 // connection is not needed and can be safely released to the pool
0 commit comments