11'use strict'
22
33const { Writable } = require ( 'node:stream' )
4+ const assert = require ( 'node:assert' )
45const { parserStates, opcodes, states, emptyBuffer, sentCloseFrameState } = require ( './constants' )
56const { kReadyState, kSentClose, kResponse, kReceivedClose } = require ( './symbols' )
67const { channels } = require ( '../../core/diagnostics' )
7- const { isValidStatusCode, failWebsocketConnection, websocketMessageReceived, utf8Decode } = require ( './util' )
8+ const { isValidStatusCode, failWebsocketConnection, websocketMessageReceived, utf8Decode, isControlFrame } = require ( './util' )
89const { WebsocketFrameSend } = require ( './frame' )
910const { CloseEvent } = require ( './events' )
1011
@@ -53,30 +54,39 @@ class ByteParser extends Writable {
5354 }
5455
5556 const buffer = this . consume ( 2 )
57+ const fin = ( buffer [ 0 ] & 0x80 ) !== 0
58+ const opcode = buffer [ 0 ] & 0x0F
59+ const masked = ( buffer [ 1 ] & 0x80 ) === 0x80
5660
57- this . #info. fin = ( buffer [ 0 ] & 0x80 ) !== 0
58- this . #info. opcode = buffer [ 0 ] & 0x0F
59- this . #info. masked = ( buffer [ 1 ] & 0x80 ) === 0x80
60-
61- if ( this . #info. masked ) {
61+ if ( masked ) {
6262 failWebsocketConnection ( this . ws , 'Frame cannot be masked' )
6363 return callback ( )
6464 }
6565
66- // If we receive a fragmented message, we use the type of the first
67- // frame to parse the full message as binary/text, when it's terminated
68- this . #info. originalOpcode ??= this . #info. opcode
69-
70- this . #info. fragmented = ! this . #info. fin && this . #info. opcode !== opcodes . CONTINUATION
66+ const fragmented = ! fin && opcode !== opcodes . CONTINUATION
7167
72- if ( this . #info . fragmented && this . #info . opcode !== opcodes . BINARY && this . #info . opcode !== opcodes . TEXT ) {
68+ if ( fragmented && opcode !== opcodes . BINARY && opcode !== opcodes . TEXT ) {
7369 // Only text and binary frames can be fragmented
7470 failWebsocketConnection ( this . ws , 'Invalid frame type was fragmented.' )
7571 return
7672 }
7773
7874 const payloadLength = buffer [ 1 ] & 0x7F
7975
76+ if ( isControlFrame ( opcode ) ) {
77+ const loop = this . parseControlFrame ( callback , {
78+ opcode,
79+ fragmented,
80+ payloadLength
81+ } )
82+
83+ if ( loop ) {
84+ continue
85+ } else {
86+ return
87+ }
88+ }
89+
8090 if ( payloadLength <= 125 ) {
8191 this . #info. payloadLength = payloadLength
8292 this . #state = parserStates . READ_DATA
@@ -86,114 +96,18 @@ class ByteParser extends Writable {
8696 this . #state = parserStates . PAYLOADLENGTH_64
8797 }
8898
99+ // TODO(@KhafraDev): handle continuation frames separately as their
100+ // semantics are different from TEXT/BINARY frames.
101+ this . #info. originalOpcode ??= opcode
102+ this . #info. opcode = opcode
103+ this . #info. masked = masked
104+ this . #info. fin = fin
105+ this . #info. fragmented = fragmented
106+
89107 if ( this . #info. fragmented && payloadLength > 125 ) {
90108 // A fragmented frame can't be fragmented itself
91109 failWebsocketConnection ( this . ws , 'Fragmented frame exceeded 125 bytes.' )
92110 return
93- } else if (
94- ( this . #info. opcode === opcodes . PING ||
95- this . #info. opcode === opcodes . PONG ||
96- this . #info. opcode === opcodes . CLOSE ) &&
97- payloadLength > 125
98- ) {
99- // Control frames can have a payload length of 125 bytes MAX
100- failWebsocketConnection ( this . ws , 'Payload length for control frame exceeded 125 bytes.' )
101- return
102- } else if ( this . #info. opcode === opcodes . CLOSE ) {
103- if ( payloadLength === 1 ) {
104- failWebsocketConnection ( this . ws , 'Received close frame with a 1-byte body.' )
105- return
106- }
107-
108- const body = this . consume ( payloadLength )
109-
110- this . #info. closeInfo = this . parseCloseBody ( body )
111-
112- if ( this . #info. closeInfo . error ) {
113- const { code, reason } = this . #info. closeInfo
114-
115- callback ( new CloseEvent ( 'close' , { wasClean : false , reason, code } ) )
116- return
117- }
118-
119- if ( this . ws [ kSentClose ] !== sentCloseFrameState . SENT ) {
120- // If an endpoint receives a Close frame and did not previously send a
121- // Close frame, the endpoint MUST send a Close frame in response. (When
122- // sending a Close frame in response, the endpoint typically echos the
123- // status code it received.)
124- let body = emptyBuffer
125- if ( this . #info. closeInfo . code ) {
126- body = Buffer . allocUnsafe ( 2 )
127- body . writeUInt16BE ( this . #info. closeInfo . code , 0 )
128- }
129- const closeFrame = new WebsocketFrameSend ( body )
130-
131- this . ws [ kResponse ] . socket . write (
132- closeFrame . createFrame ( opcodes . CLOSE ) ,
133- ( err ) => {
134- if ( ! err ) {
135- this . ws [ kSentClose ] = sentCloseFrameState . SENT
136- }
137- }
138- )
139- }
140-
141- // Upon either sending or receiving a Close control frame, it is said
142- // that _The WebSocket Closing Handshake is Started_ and that the
143- // WebSocket connection is in the CLOSING state.
144- this . ws [ kReadyState ] = states . CLOSING
145- this . ws [ kReceivedClose ] = true
146-
147- this . end ( )
148-
149- return
150- } else if ( this . #info. opcode === opcodes . PING ) {
151- // Upon receipt of a Ping frame, an endpoint MUST send a Pong frame in
152- // response, unless it already received a Close frame.
153- // A Pong frame sent in response to a Ping frame must have identical
154- // "Application data"
155-
156- const body = this . consume ( payloadLength )
157-
158- if ( ! this . ws [ kReceivedClose ] ) {
159- const frame = new WebsocketFrameSend ( body )
160-
161- this . ws [ kResponse ] . socket . write ( frame . createFrame ( opcodes . PONG ) )
162-
163- if ( channels . ping . hasSubscribers ) {
164- channels . ping . publish ( {
165- payload : body
166- } )
167- }
168- }
169-
170- this . #state = parserStates . INFO
171-
172- if ( this . #byteOffset > 0 ) {
173- continue
174- } else {
175- callback ( )
176- return
177- }
178- } else if ( this . #info. opcode === opcodes . PONG ) {
179- // A Pong frame MAY be sent unsolicited. This serves as a
180- // unidirectional heartbeat. A response to an unsolicited Pong frame is
181- // not expected.
182-
183- const body = this . consume ( payloadLength )
184-
185- if ( channels . pong . hasSubscribers ) {
186- channels . pong . publish ( {
187- payload : body
188- } )
189- }
190-
191- if ( this . #byteOffset > 0 ) {
192- continue
193- } else {
194- callback ( )
195- return
196- }
197111 }
198112 } else if ( this . #state === parserStates . PAYLOADLENGTH_16 ) {
199113 if ( this . #byteOffset < 2 ) {
@@ -303,6 +217,8 @@ class ByteParser extends Writable {
303217 }
304218
305219 parseCloseBody ( data ) {
220+ assert ( data . length !== 1 )
221+
306222 // https://datatracker.ietf.org/doc/html/rfc6455#section-7.1.5
307223 /** @type {number|undefined } */
308224 let code
@@ -336,6 +252,111 @@ class ByteParser extends Writable {
336252 return { code, reason, error : false }
337253 }
338254
255+ /**
256+ * Parses control frames.
257+ * @param {Buffer } data
258+ * @param {(err?: Error) => void } callback
259+ * @param {{ opcode: number, fragmented: boolean, payloadLength: number } } info
260+ */
261+ parseControlFrame ( callback , info ) {
262+ assert ( ! info . fragmented )
263+
264+ if ( info . payloadLength > 125 ) {
265+ // Control frames can have a payload length of 125 bytes MAX
266+ callback ( new Error ( 'Payload length for control frame exceeded 125 bytes.' ) )
267+ return false
268+ }
269+
270+ const body = this . consume ( info . payloadLength )
271+
272+ if ( info . opcode === opcodes . CLOSE ) {
273+ if ( info . payloadLength === 1 ) {
274+ failWebsocketConnection ( this . ws , 'Received close frame with a 1-byte body.' )
275+ return
276+ }
277+
278+ this . #info. closeInfo = this . parseCloseBody ( body )
279+
280+ if ( this . #info. closeInfo . error ) {
281+ const { code, reason } = this . #info. closeInfo
282+
283+ callback ( new CloseEvent ( 'close' , { wasClean : false , reason, code } ) )
284+ return
285+ }
286+
287+ if ( this . ws [ kSentClose ] !== sentCloseFrameState . SENT ) {
288+ // If an endpoint receives a Close frame and did not previously send a
289+ // Close frame, the endpoint MUST send a Close frame in response. (When
290+ // sending a Close frame in response, the endpoint typically echos the
291+ // status code it received.)
292+ let body = emptyBuffer
293+ if ( this . #info. closeInfo . code ) {
294+ body = Buffer . allocUnsafe ( 2 )
295+ body . writeUInt16BE ( this . #info. closeInfo . code , 0 )
296+ }
297+ const closeFrame = new WebsocketFrameSend ( body )
298+
299+ this . ws [ kResponse ] . socket . write (
300+ closeFrame . createFrame ( opcodes . CLOSE ) ,
301+ ( err ) => {
302+ if ( ! err ) {
303+ this . ws [ kSentClose ] = sentCloseFrameState . SENT
304+ }
305+ }
306+ )
307+ }
308+
309+ // Upon either sending or receiving a Close control frame, it is said
310+ // that _The WebSocket Closing Handshake is Started_ and that the
311+ // WebSocket connection is in the CLOSING state.
312+ this . ws [ kReadyState ] = states . CLOSING
313+ this . ws [ kReceivedClose ] = true
314+
315+ this . end ( )
316+
317+ return
318+ } else if ( info . opcode === opcodes . PING ) {
319+ // Upon receipt of a Ping frame, an endpoint MUST send a Pong frame in
320+ // response, unless it already received a Close frame.
321+ // A Pong frame sent in response to a Ping frame must have identical
322+ // "Application data"
323+
324+ if ( ! this . ws [ kReceivedClose ] ) {
325+ const frame = new WebsocketFrameSend ( body )
326+
327+ this . ws [ kResponse ] . socket . write ( frame . createFrame ( opcodes . PONG ) )
328+
329+ if ( channels . ping . hasSubscribers ) {
330+ channels . ping . publish ( {
331+ payload : body
332+ } )
333+ }
334+ }
335+
336+ if ( this . #byteOffset <= 0 ) {
337+ callback ( )
338+ return false
339+ }
340+ } else if ( info . opcode === opcodes . PONG ) {
341+ // A Pong frame MAY be sent unsolicited. This serves as a
342+ // unidirectional heartbeat. A response to an unsolicited Pong frame is
343+ // not expected.
344+
345+ if ( channels . pong . hasSubscribers ) {
346+ channels . pong . publish ( {
347+ payload : body
348+ } )
349+ }
350+
351+ if ( this . #byteOffset <= 0 ) {
352+ callback ( )
353+ return false
354+ }
355+ }
356+
357+ return true
358+ }
359+
339360 get closingInfo ( ) {
340361 return this . #info. closeInfo
341362 }
0 commit comments