@@ -28,13 +28,6 @@ export interface SessionConnectedProps<ConnType extends Connection>
2828 listeners : SessionConnectedListeners ;
2929}
3030
31- interface TrackedMsg {
32- id : string ;
33- seq : number ;
34- streamId : string ;
35- stack ?: string ;
36- }
37-
3831/*
3932 * A session that is connected and can send and receive messages.
4033 * See transitions.ts for valid transitions.
@@ -47,29 +40,18 @@ export class SessionConnected<
4740 listeners : SessionConnectedListeners ;
4841
4942 private heartbeatHandle ?: ReturnType < typeof setInterval > | undefined ;
50- private heartbeatMisses = 0 ;
51- isActivelyHeartbeating : boolean ;
52-
53- private lastConstructedMsgs : Array < TrackedMsg > = [ ] ;
54- private pushLastConstructedMsgs = ( msg : OpaqueTransportMessage ) => {
55- const trackedMsg = {
56- id : msg . id ,
57- seq : msg . seq ,
58- streamId : msg . streamId ,
59- stack : new Error ( ) . stack ,
60- } ;
61-
62- this . lastConstructedMsgs . push ( trackedMsg ) ;
63-
64- if ( this . lastConstructedMsgs . length > 10 ) {
65- this . lastConstructedMsgs . shift ( ) ;
66- }
67- } ;
43+ private heartbeatMissTimeout ?: ReturnType < typeof setTimeout > | undefined ;
44+ private isActivelyHeartbeating = false ;
6845
6946 updateBookkeeping ( ack : number , seq : number ) {
7047 this . sendBuffer = this . sendBuffer . filter ( ( unacked ) => unacked . seq >= ack ) ;
7148 this . ack = seq + 1 ;
72- this . heartbeatMisses = 0 ;
49+
50+ if ( this . heartbeatMissTimeout ) {
51+ clearTimeout ( this . heartbeatMissTimeout ) ;
52+ }
53+
54+ this . startMissingHeartbeatTimeout ( ) ;
7355 }
7456
7557 private assertSendOrdering ( constructedMsg : TransportMessage ) {
@@ -79,9 +61,6 @@ export class SessionConnected<
7961 ...this . loggingMetadata ,
8062 transportMessage : constructedMsg ,
8163 tags : [ 'invariant-violation' ] ,
82- extras : {
83- lastConstructedMsgs : this . lastConstructedMsgs ,
84- } ,
8564 } ) ;
8665
8766 throw new Error ( msg ) ;
@@ -90,7 +69,6 @@ export class SessionConnected<
9069
9170 send ( msg : PartialTransportMessage ) : string {
9271 const constructedMsg = this . constructMsg ( msg ) ;
93- this . pushLastConstructedMsgs ( constructedMsg ) ;
9472 this . assertSendOrdering ( constructedMsg ) ;
9573 this . sendBuffer . push ( constructedMsg ) ;
9674 this . conn . send ( this . options . codec . toBuffer ( constructedMsg ) ) ;
@@ -109,6 +87,8 @@ export class SessionConnected<
10987 this . conn . addErrorListener ( this . listeners . onConnectionErrored ) ;
11088
11189 // send any buffered messages
90+ // dont explicity clear the buffer, we'll just filter out old messages
91+ // when we receive an ack
11292 if ( this . sendBuffer . length > 0 ) {
11393 this . log ?. info (
11494 `sending ${
@@ -124,41 +104,7 @@ export class SessionConnected<
124104 }
125105 }
126106
127- // dont explicity clear the buffer, we'll just filter out old messages
128- // when we receive an ack
129-
130- // setup heartbeat
131- this . isActivelyHeartbeating = false ;
132- this . heartbeatHandle = setInterval ( ( ) => {
133- const misses = this . heartbeatMisses ;
134- const missDuration = misses * this . options . heartbeatIntervalMs ;
135- if ( misses >= this . options . heartbeatsUntilDead ) {
136- this . log ?. info (
137- `closing connection to ${ this . to } due to inactivity (missed ${ misses } heartbeats which is ${ missDuration } ms)` ,
138- this . loggingMetadata ,
139- ) ;
140- this . telemetry . span . addEvent ( 'closing connection due to inactivity' ) ;
141-
142- // it is OK to close this even on the client when we can't trust the client timer
143- // due to browser throttling or hibernation
144- // at worst, this interval will fire later than what the server expects and the server
145- // will have already closed the connection
146- // this just helps us in cases where we have a proxying setup where the server has closed
147- // the connection but the proxy hasn't synchronized the server-side close to the client so
148- // the client isn't stuck with a pseudo-dead connection forever
149- this . conn . close ( ) ;
150- clearInterval ( this . heartbeatHandle ) ;
151- this . heartbeatHandle = undefined ;
152-
153- return ;
154- }
155-
156- if ( this . isActivelyHeartbeating ) {
157- this . sendHeartbeat ( ) ;
158- }
159-
160- this . heartbeatMisses ++ ;
161- } , this . options . heartbeatIntervalMs ) ;
107+ this . startMissingHeartbeatTimeout ( ) ;
162108 }
163109
164110 get loggingMetadata ( ) {
@@ -168,8 +114,27 @@ export class SessionConnected<
168114 } ;
169115 }
170116
117+ startMissingHeartbeatTimeout ( ) {
118+ const maxMisses = this . options . heartbeatsUntilDead ;
119+ const missDuration = maxMisses * this . options . heartbeatIntervalMs ;
120+ this . heartbeatMissTimeout = setTimeout ( ( ) => {
121+ this . log ?. info (
122+ `closing connection to ${ this . to } due to inactivity (missed ${ maxMisses } heartbeats which is ${ missDuration } ms)` ,
123+ this . loggingMetadata ,
124+ ) ;
125+ this . telemetry . span . addEvent (
126+ 'closing connection due to missing heartbeat' ,
127+ ) ;
128+
129+ this . conn . close ( ) ;
130+ } , missDuration ) ;
131+ }
132+
171133 startActiveHeartbeat ( ) {
172134 this . isActivelyHeartbeating = true ;
135+ this . heartbeatHandle = setInterval ( ( ) => {
136+ this . sendHeartbeat ( ) ;
137+ } , this . options . heartbeatIntervalMs ) ;
173138 }
174139
175140 private sendHeartbeat ( ) {
@@ -246,13 +211,7 @@ export class SessionConnected<
246211 // if we are not actively heartbeating, we are in passive
247212 // heartbeat mode and should send a response to the ack
248213 if ( ! this . isActivelyHeartbeating ) {
249- // purposefully make this async to avoid weird browser behavior
250- // where _some_ browsers will decide that it is ok to interrupt fully
251- // synchronous code execution (e.g. an existing .send) to receive a
252- // websocket message and hit this codepath
253- void Promise . resolve ( ) . then ( ( ) => {
254- this . sendHeartbeat ( ) ;
255- } ) ;
214+ this . sendHeartbeat ( ) ;
256215 }
257216 } ;
258217
@@ -266,6 +225,11 @@ export class SessionConnected<
266225 clearInterval ( this . heartbeatHandle ) ;
267226 this . heartbeatHandle = undefined ;
268227 }
228+
229+ if ( this . heartbeatMissTimeout ) {
230+ clearTimeout ( this . heartbeatMissTimeout ) ;
231+ this . heartbeatMissTimeout = undefined ;
232+ }
269233 }
270234
271235 _handleClose ( ) : void {
0 commit comments