1- /* eslint-disable id-length */
21import { Buffer } from 'node:buffer' ;
32import { once } from 'node:events' ;
43import { clearInterval , clearTimeout , setInterval , setTimeout } from 'node:timers' ;
54import { setTimeout as sleep } from 'node:timers/promises' ;
65import { URLSearchParams } from 'node:url' ;
76import { TextDecoder } from 'node:util' ;
8- import { inflate } from 'node:zlib' ;
7+ import type * as nativeZlib from 'node:zlib' ;
98import { Collection } from '@discordjs/collection' ;
109import { lazy , shouldUseGlobalFetchAndWebSocket } from '@discordjs/util' ;
1110import { AsyncQueue } from '@sapphire/async-queue' ;
@@ -21,13 +20,20 @@ import {
2120 type GatewaySendPayload ,
2221} from 'discord-api-types/v10' ;
2322import { WebSocket , type Data } from 'ws' ;
24- import type { Inflate } from 'zlib-sync' ;
25- import type { IContextFetchingStrategy } from '../strategies/context/IContextFetchingStrategy.js' ;
26- import { ImportantGatewayOpcodes , getInitialSendRateLimitState } from '../utils/constants.js' ;
23+ import type * as ZlibSync from 'zlib-sync' ;
24+ import type { IContextFetchingStrategy } from '../strategies/context/IContextFetchingStrategy' ;
25+ import {
26+ CompressionMethod ,
27+ CompressionParameterMap ,
28+ ImportantGatewayOpcodes ,
29+ getInitialSendRateLimitState ,
30+ } from '../utils/constants.js' ;
2731import type { SessionInfo } from './WebSocketManager.js' ;
2832
29- // eslint-disable-next-line promise/prefer-await-to-then
33+ /* eslint-disable promise/prefer-await-to-then */
3034const getZlibSync = lazy ( async ( ) => import ( 'zlib-sync' ) . then ( ( mod ) => mod . default ) . catch ( ( ) => null ) ) ;
35+ const getNativeZlib = lazy ( async ( ) => import ( 'node:zlib' ) . then ( ( mod ) => mod ) . catch ( ( ) => null ) ) ;
36+ /* eslint-enable promise/prefer-await-to-then */
3137
3238export enum WebSocketShardEvents {
3339 Closed = 'closed' ,
@@ -86,9 +92,9 @@ const WebSocketConstructor: typeof WebSocket = shouldUseGlobalFetchAndWebSocket(
8692export class WebSocketShard extends AsyncEventEmitter < WebSocketShardEventsMap > {
8793 private connection : WebSocket | null = null ;
8894
89- private useIdentifyCompress = false ;
95+ private nativeInflate : nativeZlib . Inflate | null = null ;
9096
91- private inflate : Inflate | null = null ;
97+ private zLibSyncInflate : ZlibSync . Inflate | null = null ;
9298
9399 private readonly textDecoder = new TextDecoder ( ) ;
94100
@@ -120,6 +126,18 @@ export class WebSocketShard extends AsyncEventEmitter<WebSocketShardEventsMap> {
120126
121127 #status: WebSocketShardStatus = WebSocketShardStatus . Idle ;
122128
129+ private identifyCompressionEnabled = false ;
130+
131+ /**
132+ * @privateRemarks
133+ *
134+ * This is needed because `this.strategy.options.compression` is not an actual reflection of the compression method
135+ * used, but rather the compression method that the user wants to use. This is because the libraries could just be missing.
136+ */
137+ private get transportCompressionEnabled ( ) {
138+ return this . strategy . options . compression !== null && ( this . nativeInflate ?? this . zLibSyncInflate ) !== null ;
139+ }
140+
123141 public get status ( ) : WebSocketShardStatus {
124142 return this . #status;
125143 }
@@ -161,21 +179,63 @@ export class WebSocketShard extends AsyncEventEmitter<WebSocketShardEventsMap> {
161179 throw new Error ( "Tried to connect a shard that wasn't idle" ) ;
162180 }
163181
164- const { version, encoding, compression } = this . strategy . options ;
182+ const { version, encoding, compression, useIdentifyCompression } = this . strategy . options ;
183+ this . identifyCompressionEnabled = useIdentifyCompression ;
184+
185+ // eslint-disable-next-line id-length
165186 const params = new URLSearchParams ( { v : version , encoding } ) ;
166- if ( compression ) {
167- const zlib = await getZlibSync ( ) ;
168- if ( zlib ) {
169- params . append ( 'compress' , compression ) ;
170- this . inflate = new zlib . Inflate ( {
171- chunkSize : 65_535 ,
172- to : 'string' ,
173- } ) ;
174- } else if ( ! this . useIdentifyCompress ) {
175- this . useIdentifyCompress = true ;
176- console . warn (
177- 'WebSocketShard: Compression is enabled but zlib-sync is not installed, falling back to identify compress' ,
178- ) ;
187+ if ( compression !== null ) {
188+ if ( useIdentifyCompression ) {
189+ console . warn ( 'WebSocketShard: transport compression is enabled, disabling identify compression' ) ;
190+ this . identifyCompressionEnabled = false ;
191+ }
192+
193+ params . append ( 'compress' , CompressionParameterMap [ compression ] ) ;
194+
195+ switch ( compression ) {
196+ case CompressionMethod . ZlibNative : {
197+ const zlib = await getNativeZlib ( ) ;
198+ if ( zlib ) {
199+ const inflate = zlib . createInflate ( {
200+ chunkSize : 65_535 ,
201+ flush : zlib . constants . Z_SYNC_FLUSH ,
202+ } ) ;
203+
204+ inflate . on ( 'error' , ( error ) => {
205+ this . emit ( WebSocketShardEvents . Error , { error } ) ;
206+ } ) ;
207+
208+ this . nativeInflate = inflate ;
209+ } else {
210+ console . warn ( 'WebSocketShard: Compression is set to native but node:zlib is not available.' ) ;
211+ params . delete ( 'compress' ) ;
212+ }
213+
214+ break ;
215+ }
216+
217+ case CompressionMethod . ZlibSync : {
218+ const zlib = await getZlibSync ( ) ;
219+ if ( zlib ) {
220+ this . zLibSyncInflate = new zlib . Inflate ( {
221+ chunkSize : 65_535 ,
222+ to : 'string' ,
223+ } ) ;
224+ } else {
225+ console . warn ( 'WebSocketShard: Compression is set to zlib-sync, but it is not installed.' ) ;
226+ params . delete ( 'compress' ) ;
227+ }
228+
229+ break ;
230+ }
231+ }
232+ }
233+
234+ if ( this . identifyCompressionEnabled ) {
235+ const zlib = await getNativeZlib ( ) ;
236+ if ( ! zlib ) {
237+ console . warn ( 'WebSocketShard: Identify compression is enabled, but node:zlib is not available.' ) ;
238+ this . identifyCompressionEnabled = false ;
179239 }
180240 }
181241
@@ -451,28 +511,29 @@ export class WebSocketShard extends AsyncEventEmitter<WebSocketShardEventsMap> {
451511 `shard id: ${ this . id . toString ( ) } ` ,
452512 `shard count: ${ this . strategy . options . shardCount } ` ,
453513 `intents: ${ this . strategy . options . intents } ` ,
454- `compression: ${ this . inflate ? 'zlib-stream' : this . useIdentifyCompress ? 'identify' : 'none' } ` ,
514+ `compression: ${ this . transportCompressionEnabled ? CompressionParameterMap [ this . strategy . options . compression ! ] : this . identifyCompressionEnabled ? 'identify' : 'none' } ` ,
455515 ] ) ;
456516
457- const d : GatewayIdentifyData = {
517+ const data : GatewayIdentifyData = {
458518 token : this . strategy . options . token ,
459519 properties : this . strategy . options . identifyProperties ,
460520 intents : this . strategy . options . intents ,
461- compress : this . useIdentifyCompress ,
521+ compress : this . identifyCompressionEnabled ,
462522 shard : [ this . id , this . strategy . options . shardCount ] ,
463523 } ;
464524
465525 if ( this . strategy . options . largeThreshold ) {
466- d . large_threshold = this . strategy . options . largeThreshold ;
526+ data . large_threshold = this . strategy . options . largeThreshold ;
467527 }
468528
469529 if ( this . strategy . options . initialPresence ) {
470- d . presence = this . strategy . options . initialPresence ;
530+ data . presence = this . strategy . options . initialPresence ;
471531 }
472532
473533 await this . send ( {
474534 op : GatewayOpcodes . Identify ,
475- d,
535+ // eslint-disable-next-line id-length
536+ d : data ,
476537 } ) ;
477538
478539 await this . waitForEvent ( WebSocketShardEvents . Ready , this . strategy . options . readyTimeout ) ;
@@ -490,6 +551,7 @@ export class WebSocketShard extends AsyncEventEmitter<WebSocketShardEventsMap> {
490551 this . replayedEvents = 0 ;
491552 return this . send ( {
492553 op : GatewayOpcodes . Resume ,
554+ // eslint-disable-next-line id-length
493555 d : {
494556 token : this . strategy . options . token ,
495557 seq : session . sequence ,
@@ -507,13 +569,22 @@ export class WebSocketShard extends AsyncEventEmitter<WebSocketShardEventsMap> {
507569
508570 await this . send ( {
509571 op : GatewayOpcodes . Heartbeat ,
572+ // eslint-disable-next-line id-length
510573 d : session ?. sequence ?? null ,
511574 } ) ;
512575
513576 this . lastHeartbeatAt = Date . now ( ) ;
514577 this . isAck = false ;
515578 }
516579
580+ private parseInflateResult ( result : any ) : GatewayReceivePayload | null {
581+ if ( ! result ) {
582+ return null ;
583+ }
584+
585+ return JSON . parse ( typeof result === 'string' ? result : this . textDecoder . decode ( result ) ) as GatewayReceivePayload ;
586+ }
587+
517588 private async unpackMessage ( data : Data , isBinary : boolean ) : Promise < GatewayReceivePayload | null > {
518589 // Deal with no compression
519590 if ( ! isBinary ) {
@@ -528,10 +599,12 @@ export class WebSocketShard extends AsyncEventEmitter<WebSocketShardEventsMap> {
528599 const decompressable = new Uint8Array ( data as ArrayBuffer ) ;
529600
530601 // Deal with identify compress
531- if ( this . useIdentifyCompress ) {
532- return new Promise ( ( resolve , reject ) => {
602+ if ( this . identifyCompressionEnabled ) {
603+ // eslint-disable-next-line no-async-promise-executor
604+ return new Promise ( async ( resolve , reject ) => {
605+ const zlib = ( await getNativeZlib ( ) ) ! ;
533606 // eslint-disable-next-line promise/prefer-await-to-callbacks
534- inflate ( decompressable , { chunkSize : 65_535 } , ( err , result ) => {
607+ zlib . inflate ( decompressable , { chunkSize : 65_535 } , ( err , result ) => {
535608 if ( err ) {
536609 reject ( err ) ;
537610 return ;
@@ -542,42 +615,50 @@ export class WebSocketShard extends AsyncEventEmitter<WebSocketShardEventsMap> {
542615 } ) ;
543616 }
544617
545- // Deal with gw wide zlib-stream compression
546- if ( this . inflate ) {
547- const l = decompressable . length ;
618+ // Deal with transport compression
619+ if ( this . transportCompressionEnabled ) {
548620 const flush =
549- l >= 4 &&
550- decompressable [ l - 4 ] === 0x00 &&
551- decompressable [ l - 3 ] === 0x00 &&
552- decompressable [ l - 2 ] === 0xff &&
553- decompressable [ l - 1 ] === 0xff ;
621+ decompressable . length >= 4 &&
622+ decompressable . at ( - 4 ) === 0x00 &&
623+ decompressable . at ( - 3 ) === 0x00 &&
624+ decompressable . at ( - 2 ) === 0xff &&
625+ decompressable . at ( - 1 ) === 0xff ;
554626
555- const zlib = ( await getZlibSync ( ) ) ! ;
556- this . inflate . push ( Buffer . from ( decompressable ) , flush ? zlib . Z_SYNC_FLUSH : zlib . Z_NO_FLUSH ) ;
627+ if ( this . nativeInflate ) {
628+ this . nativeInflate . write ( decompressable , 'binary' ) ;
557629
558- if ( this . inflate . err ) {
559- this . emit ( WebSocketShardEvents . Error , {
560- error : new Error ( `${ this . inflate . err } ${ this . inflate . msg ? `: ${ this . inflate . msg } ` : '' } ` ) ,
561- } ) ;
562- }
630+ if ( ! flush ) {
631+ return null ;
632+ }
563633
564- if ( ! flush ) {
565- return null ;
566- }
634+ const [ result ] = await once ( this . nativeInflate , 'data' ) ;
635+ return this . parseInflateResult ( result ) ;
636+ } else if ( this . zLibSyncInflate ) {
637+ const zLibSync = ( await getZlibSync ( ) ) ! ;
638+ this . zLibSyncInflate . push ( Buffer . from ( decompressable ) , flush ? zLibSync . Z_SYNC_FLUSH : zLibSync . Z_NO_FLUSH ) ;
639+
640+ if ( this . zLibSyncInflate . err ) {
641+ this . emit ( WebSocketShardEvents . Error , {
642+ error : new Error (
643+ `${ this . zLibSyncInflate . err } ${ this . zLibSyncInflate . msg ? `: ${ this . zLibSyncInflate . msg } ` : '' } ` ,
644+ ) ,
645+ } ) ;
646+ }
567647
568- const { result } = this . inflate ;
569- if ( ! result ) {
570- return null ;
571- }
648+ if ( ! flush ) {
649+ return null ;
650+ }
572651
573- return JSON . parse ( typeof result === 'string' ? result : this . textDecoder . decode ( result ) ) as GatewayReceivePayload ;
652+ const { result } = this . zLibSyncInflate ;
653+ return this . parseInflateResult ( result ) ;
654+ }
574655 }
575656
576657 this . debug ( [
577658 'Received a message we were unable to decompress' ,
578659 `isBinary: ${ isBinary . toString ( ) } ` ,
579- `useIdentifyCompress : ${ this . useIdentifyCompress . toString ( ) } ` ,
580- `inflate: ${ Boolean ( this . inflate ) . toString ( ) } ` ,
660+ `identifyCompressionEnabled : ${ this . identifyCompressionEnabled . toString ( ) } ` ,
661+ `inflate: ${ this . transportCompressionEnabled ? CompressionMethod [ this . strategy . options . compression ! ] : 'none' } ` ,
581662 ] ) ;
582663
583664 return null ;
@@ -838,7 +919,7 @@ export class WebSocketShard extends AsyncEventEmitter<WebSocketShardEventsMap> {
838919 messages . length > 1
839920 ? `\n${ messages
840921 . slice ( 1 )
841- . map ( ( m ) => ` ${ m } ` )
922+ . map ( ( message ) => ` ${ message } ` )
842923 . join ( '\n' ) } `
843924 : ''
844925 } `;
0 commit comments