@@ -136,7 +136,7 @@ export class Redis implements Extension {
136136 this . sub . on ( "messageBuffer" , this . handleIncomingMessage ) ;
137137
138138 this . redlock = new Redlock ( [ this . pub ] , {
139- driftFactor : 0.1
139+ retryCount : 0 ,
140140 } ) ;
141141
142142 const identifierBuffer = Buffer . from (
@@ -241,12 +241,23 @@ export class Redis implements Extension {
241241 // to avoid conflict with other instances storing the same document.
242242 const resource = this . lockKey ( documentName )
243243 const ttl = this . configuration . lockTimeout
244- const lock = await this . redlock . acquire ( [ resource ] , ttl )
245- const oldLock = this . locks . get ( resource )
246- if ( oldLock ) {
247- await oldLock . release
248- }
249- this . locks . set ( resource , { lock} )
244+ try {
245+ await this . redlock . acquire ( [ resource ] , ttl )
246+ const oldLock = this . locks . get ( resource )
247+ if ( oldLock ) {
248+ await oldLock . release ;
249+ }
250+ } catch ( error ) {
251+ //based on: https://github.com/sesamecare/redlock/blob/508e00dcd1e4d2bc6373ce455f4fe847e98a9aab/src/index.ts#L347-L349
252+ if ( error == 'ExecutionError: The operation was unable to achieve a quorum during its retry window.' ) {
253+ // Expected behavior: Could not acquire lock, another instance locked it already.
254+ // No further `onStoreDocument` hooks will be executed; should throw a silent error with no message.
255+ throw new Error ( '' , { cause : 'Could not acquire lock, another instance locked it already.' } ) ;
256+ }
257+ //unexpected error
258+ console . error ( "unexpected error:" , error ) ;
259+ throw error
260+ }
250261 }
251262
252263 /**
@@ -255,17 +266,16 @@ export class Redis implements Extension {
255266 async afterStoreDocument ( { documentName, socketId} : afterStoreDocumentPayload ) {
256267 const lockKey = this . lockKey ( documentName )
257268 const lock = this . locks . get ( lockKey )
258- if ( ! lock ) {
259- throw new Error ( `Lock created in onStoreDocument not found in afterStoreDocument: ${ lockKey } ` )
260- }
261- try {
262- // Always try to unlock and clean up the lock
263- lock . release = lock . lock . release ( )
264- await lock . release
265- } catch {
266- // Lock will expire on its own after timeout
267- } finally {
268- this . locks . delete ( lockKey )
269+ if ( lock ) {
270+ try {
271+ // Always try to unlock and clean up the lock
272+ lock . release = lock . lock . release ( )
273+ await lock . release
274+ } catch {
275+ // Lock will expire on its own after timeout
276+ } finally {
277+ this . locks . delete ( lockKey )
278+ }
269279 }
270280 // if the change was initiated by a directConnection, we need to delay this hook to make sure sync can finish first.
271281 // for provider connections, this usually happens in the onDisconnect hook
0 commit comments