1717 * limitations under the License.
1818 */
1919import { newError , PROTOCOL_ERROR } from '../../error'
20+ import BoltProtocol from '../bolt-protocol-v1'
2021
2122// Signature bytes for each response message type
2223const SUCCESS = 0x70 // 0111 0000 // SUCCESS <metadata>
@@ -32,16 +33,48 @@ const NO_OP_OBSERVER = {
3233 onError : NO_OP
3334}
3435
36+ /**
37+ * Treat the protocol responses and notify the observers
38+ */
3539export default class ResponseHandler {
36- constructor ( { protocol, channel, dechunker, log, connection } = { } ) {
40+ /**
41+ * Called when something went wrong with the connectio
42+ * @callback ResponseHandler~Observer~OnError
43+ * @param {any } error The error
44+ */
45+ /**
46+ * @typedef {Object } ResponseHandler~Observer
47+ * @property {ResponseHandler~Observer~OnError } onError Invoke when a connection error occurs
48+ * @property {ResponseHandler~Observer~OnError } onFailure Invoke when a protocol failure occurs
49+ */
50+ /**
51+ * Constructor
52+ * @param {Object } param The params
53+ * @param {BoltProtocol } protocol Bolt protocol used to parse message (to be removed)
54+ * @param {Channel } channel The channel used to exchange messages
55+ * @param {Logger } log The logger
56+ * @param {ResponseHandler~Observer } observer Object which will be notified about errors
57+ */
58+ constructor ( {
59+ protocol,
60+ channel,
61+ dechunker,
62+ log,
63+ observer,
64+ connection
65+ } = { } ) {
3766 this . _pendingObservers = [ ]
3867 this . _isBroken = false
3968 this . _log = log
4069 this . _protocol = protocol // should gone from here
4170 this . _connection = connection // should gone from here
71+ this . _observer = Object . assign (
72+ { onError : NO_OP , onFailure : NO_OP } ,
73+ observer
74+ )
4275
4376 // reset the error handler to just handle errors and forget about the handshake promise
44- channel . onerror = connection . _handleFatalError . bind ( connection )
77+ channel . onerror = this . _observer . onError . bind ( this . _observer )
4578
4679 // Ok, protocol running. Simply forward all messages to the dechunker
4780 channel . onmessage = buf => dechunker . write ( buf )
@@ -53,11 +86,11 @@ export default class ResponseHandler {
5386 }
5487
5588 _handleMessage ( msg ) {
56- if ( this . _connection . _isBroken ) {
57- // ignore all incoming messages when this connection is broken. all previously pending observers failed
58- // with the fatal error. all future observers will fail with same fatal error.
59- return
60- }
89+ // if (this._connection._isBroken) {
90+ // ignore all incoming messages when this connection is broken. all previously pending observers failed
91+ // with the fatal error. all future observers will fail with same fatal error.
92+ // return
93+ // }
6194
6295 const payload = msg . fields [ 0 ]
6396
@@ -93,7 +126,7 @@ export default class ResponseHandler {
93126 } finally {
94127 this . _updateCurrentObserver ( )
95128 // Things are now broken. Pending observers will get FAILURE messages routed until we are done handling this failure.
96- this . _connection . _resetOnFailure ( )
129+ this . _observer . onFailure ( this . _currentFailure )
97130 }
98131 break
99132 case IGNORED :
@@ -113,7 +146,7 @@ export default class ResponseHandler {
113146 }
114147 break
115148 default :
116- this . _connection . _handleFatalError (
149+ this . _observer . onError (
117150 newError ( 'Unknown Bolt protocol message: ' + msg )
118151 )
119152 }
@@ -128,12 +161,6 @@ export default class ResponseHandler {
128161 }
129162
130163 _queueObserver ( observer ) {
131- if ( this . _connection . _isBroken ) {
132- if ( observer && observer . onError ) {
133- observer . onError ( this . _error )
134- }
135- return false
136- }
137164 observer = observer || NO_OP_OBSERVER
138165 observer . onCompleted = observer . onCompleted || NO_OP
139166 observer . onError = observer . onError || NO_OP
0 commit comments