@@ -27,20 +27,6 @@ import { ResultStreamObserver } from './stream-observers'
2727import buffer from 'vinyl-buffer'
2828import Bolt from './bolt'
2929
30- // Signature bytes for each response message type
31- const SUCCESS = 0x70 // 0111 0000 // SUCCESS <metadata>
32- const RECORD = 0x71 // 0111 0001 // RECORD <value>
33- const IGNORED = 0x7e // 0111 1110 // IGNORED <metadata>
34- const FAILURE = 0x7f // 0111 1111 // FAILURE <metadata>
35-
36- function NO_OP ( ) { }
37-
38- const NO_OP_OBSERVER = {
39- onNext : NO_OP ,
40- onCompleted : NO_OP ,
41- onError : NO_OP
42- }
43-
4430let idGenerator = 0
4531
4632/**
@@ -79,8 +65,7 @@ export function createChannelConnection (
7965 config . disableLosslessIntegers ,
8066 serversideRouting ,
8167 {
82- newChunker : ( ) => chunker ,
83- newDechunker : ( ) => dechunker
68+ newChunker : ( ) => chunker
8469 }
8570 )
8671
@@ -94,16 +79,15 @@ export function createChannelConnection (
9479
9580 connection . _protocol = protocol
9681
97- // reset the error handler to just handle errors and forget about the handshake promise
98- channel . onerror = connection . _handleFatalError . bind ( connection )
99-
100- // Ok, protocol running. Simply forward all messages to the dechunker
101- channel . onmessage = buf => dechunker . write ( buf )
82+ const responseHandler = new Bolt . ResponseHandler ( {
83+ protocol,
84+ channel,
85+ dechunker,
86+ log,
87+ connection
88+ } )
10289
103- // setup dechunker to dechunk messages and forward them to the message handler
104- dechunker . onmessage = buf => {
105- connection . _handleMessage ( protocol . unpacker ( ) . unpack ( buf ) )
106- }
90+ connection . _responseHandler = responseHandler
10791
10892 // forward all pending bytes to the dechunker
10993 consumeRemainingBuffer ( buffer => dechunker . write ( buffer ) )
@@ -133,10 +117,7 @@ export default class ChannelConnection extends Connection {
133117 log ,
134118 disableLosslessIntegers = false ,
135119 serversideRouting = null ,
136- {
137- newChunker = channel => new Chunker ( channel ) ,
138- newDechunker = ( ) => new Dechunker ( )
139- } = { }
120+ { newChunker = channel => new Chunker ( channel ) } = { }
140121 ) {
141122 super ( errorHandler )
142123
@@ -148,7 +129,6 @@ export default class ChannelConnection extends Connection {
148129 this . _pendingObservers = [ ]
149130 this . _currentObserver = undefined
150131 this . _ch = channel
151- this . _dechunker = newDechunker ( )
152132 this . _chunker = newChunker ( channel )
153133 this . _log = log
154134 this . _serversideRouting = serversideRouting
@@ -163,6 +143,8 @@ export default class ChannelConnection extends Connection {
163143 */
164144 this . _protocol = null
165145
146+ this . _responseHandler = null
147+
166148 // error extracted from a FAILURE message
167149 this . _currentFailure = null
168150
@@ -275,7 +257,7 @@ export default class ChannelConnection extends Connection {
275257 * @param {boolean } flush `true` if flush should happen after the message is written to the buffer.
276258 */
277259 write ( message , observer , flush ) {
278- const queued = this . _queueObserver ( observer )
260+ const queued = this . _responseHandler . _queueObserver ( observer )
279261
280262 if ( queued ) {
281263 if ( this . _log . isDebugEnabled ( ) ) {
@@ -312,82 +294,7 @@ export default class ChannelConnection extends Connection {
312294 )
313295 }
314296
315- if ( this . _currentObserver && this . _currentObserver . onError ) {
316- this . _currentObserver . onError ( this . _error )
317- }
318- while ( this . _pendingObservers . length > 0 ) {
319- const observer = this . _pendingObservers . shift ( )
320- if ( observer && observer . onError ) {
321- observer . onError ( this . _error )
322- }
323- }
324- }
325-
326- _handleMessage ( msg ) {
327- if ( this . _isBroken ) {
328- // ignore all incoming messages when this connection is broken. all previously pending observers failed
329- // with the fatal error. all future observers will fail with same fatal error.
330- return
331- }
332-
333- const payload = msg . fields [ 0 ]
334-
335- switch ( msg . signature ) {
336- case RECORD :
337- if ( this . _log . isDebugEnabled ( ) ) {
338- this . _log . debug ( `${ this } S: RECORD ${ JSON . stringify ( msg ) } ` )
339- }
340- this . _currentObserver . onNext ( payload )
341- break
342- case SUCCESS :
343- if ( this . _log . isDebugEnabled ( ) ) {
344- this . _log . debug ( `${ this } S: SUCCESS ${ JSON . stringify ( msg ) } ` )
345- }
346- try {
347- const metadata = this . _protocol . transformMetadata ( payload )
348- this . _currentObserver . onCompleted ( metadata )
349- } finally {
350- this . _updateCurrentObserver ( )
351- }
352- break
353- case FAILURE :
354- if ( this . _log . isDebugEnabled ( ) ) {
355- this . _log . debug ( `${ this } S: FAILURE ${ JSON . stringify ( msg ) } ` )
356- }
357- try {
358- const error = newError ( payload . message , payload . code )
359- this . _currentFailure = this . handleAndTransformError (
360- error ,
361- this . _address
362- )
363- this . _currentObserver . onError ( this . _currentFailure )
364- } finally {
365- this . _updateCurrentObserver ( )
366- // Things are now broken. Pending observers will get FAILURE messages routed until we are done handling this failure.
367- this . _resetOnFailure ( )
368- }
369- break
370- case IGNORED :
371- if ( this . _log . isDebugEnabled ( ) ) {
372- this . _log . debug ( `${ this } S: IGNORED ${ JSON . stringify ( msg ) } ` )
373- }
374- try {
375- if ( this . _currentFailure && this . _currentObserver . onError ) {
376- this . _currentObserver . onError ( this . _currentFailure )
377- } else if ( this . _currentObserver . onError ) {
378- this . _currentObserver . onError (
379- newError ( 'Ignored either because of an error or RESET' )
380- )
381- }
382- } finally {
383- this . _updateCurrentObserver ( )
384- }
385- break
386- default :
387- this . _handleFatalError (
388- newError ( 'Unknown Bolt protocol message: ' + msg )
389- )
390- }
297+ this . _responseHandler . _notifyErrorToObservers ( this . _error )
391298 }
392299
393300 /**
@@ -427,30 +334,15 @@ export default class ChannelConnection extends Connection {
427334 }
428335
429336 _queueObserver ( observer ) {
430- if ( this . _isBroken ) {
431- if ( observer && observer . onError ) {
432- observer . onError ( this . _error )
433- }
434- return false
435- }
436- observer = observer || NO_OP_OBSERVER
437- observer . onCompleted = observer . onCompleted || NO_OP
438- observer . onError = observer . onError || NO_OP
439- observer . onNext = observer . onNext || NO_OP
440- if ( this . _currentObserver === undefined ) {
441- this . _currentObserver = observer
442- } else {
443- this . _pendingObservers . push ( observer )
444- }
445- return true
337+ return this . _responseHandler . _queueObserver ( observer )
446338 }
447339
448340 /*
449341 * Pop next pending observer form the list of observers and make it current observer.
450342 * @protected
451343 */
452344 _updateCurrentObserver ( ) {
453- this . _currentObserver = this . _pendingObservers . shift ( )
345+ this . _responseHandler . _updateCurrentObserver ( )
454346 }
455347
456348 /** Check if this connection is in working condition */
0 commit comments