@@ -71,37 +71,46 @@ export function createChannelConnection (
7171 const channel = new Channel ( channelConfig )
7272
7373 return Bolt . handshake ( channel )
74- . then ( ( { protocolVersion, consumeRemainingBuffer } ) => {
74+ . then ( ( { protocolVersion : version , consumeRemainingBuffer } ) => {
75+ const chunker = new Chunker ( channel )
76+ const dechunker = new Dechunker ( )
77+
7578 const connection = new ChannelConnection (
7679 channel ,
7780 errorHandler ,
7881 address ,
7982 log ,
8083 config . disableLosslessIntegers ,
81- serversideRouting
84+ serversideRouting ,
85+ {
86+ newChunker : ( ) => chunker ,
87+ newDechunker : ( ) => dechunker
88+ }
8289 )
8390
84- connection . _protocol = Bolt . create ( {
85- version : protocolVersion ,
91+ const protocol = Bolt . create ( {
92+ version,
8693 connection,
87- chunker : connection . _chunker ,
94+ chunker,
8895 disableLosslessIntegers : config . disableLosslessIntegers ,
8996 serversideRouting
9097 } )
9198
99+ connection . _protocol = protocol
100+
92101 // reset the error handler to just handle errors and forget about the handshake promise
93102 channel . onerror = connection . _handleFatalError . bind ( connection )
94103
95104 // Ok, protocol running. Simply forward all messages to the dechunker
96- channel . onmessage = buf => connection . _dechunker . write ( buf )
105+ channel . onmessage = buf => dechunker . write ( buf )
97106
98107 // setup dechunker to dechunk messages and forward them to the message handler
99- connection . _dechunker . onmessage = buf => {
100- connection . _handleMessage ( connection . _protocol . unpacker ( ) . unpack ( buf ) )
108+ dechunker . onmessage = buf => {
109+ connection . _handleMessage ( protocol . unpacker ( ) . unpack ( buf ) )
101110 }
102111
103112 // forward all pending bytes to the dechunker
104- consumeRemainingBuffer ( buffer => connection . _dechunker . write ( buffer ) )
113+ consumeRemainingBuffer ( buffer => dechunker . write ( buffer ) )
105114
106115 return connection
107116 } )
@@ -127,7 +136,11 @@ export default class ChannelConnection extends Connection {
127136 address ,
128137 log ,
129138 disableLosslessIntegers = false ,
130- serversideRouting = null
139+ serversideRouting = null ,
140+ {
141+ newChunker = channel => new Chunker ( channel ) ,
142+ newDechunker = ( ) => new Dechunker ( )
143+ } = { }
131144 ) {
132145 super ( errorHandler )
133146
@@ -139,8 +152,8 @@ export default class ChannelConnection extends Connection {
139152 this . _pendingObservers = [ ]
140153 this . _currentObserver = undefined
141154 this . _ch = channel
142- this . _dechunker = new Dechunker ( )
143- this . _chunker = new Chunker ( channel )
155+ this . _dechunker = newDechunker ( )
156+ this . _chunker = newChunker ( channel )
144157 this . _log = log
145158 this . _serversideRouting = serversideRouting
146159
0 commit comments