Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@
'use strict';

const EventEmitter = require('events');
const WebSocket = require('ws');
//const WebSocket = require('ws');
//var Buffer = require('buffer');
const util = require('util');
const utils = require('../utils');
const serializer = require('../structure/io/graph-serializer');
Expand Down Expand Up @@ -117,62 +118,64 @@ class Connection extends EventEmitter {

this.emit('log', `ws open`);

this._ws = new WebSocket(this.url, {
headers: this.options.headers,
ca: this.options.ca,
cert: this.options.cert,
pfx: this.options.pfx,
rejectUnauthorized: this.options.rejectUnauthorized
});
this._ws = new WebSocket(this.url);

this._ws.on('message', (data) => this._handleMessage(data));
this._ws.on('error', (err) => this._handleError(err));
this._ws.on('close', (code, message) => this._handleClose(code, message));
this._ws.onmessage = (data => this._handleMessage(data));
this._ws.onerror = (err => this._handleError(err));
this._ws.onclose = (code, message) => this._handleClose(code, message);

this._ws.on('pong', () => {
this._ws.pong = (() => {
this.emit('log', 'ws pong received');
if (this._pongTimeout) {
clearTimeout(this._pongTimeout);
this._pongTimeout = null;
}
});
this._ws.on('ping', () => {
if (this._pongTimeout) {
clearTimeout(this._pongTimeout);
this._pongTimeout = null;
}
});
this._ws.ping = (() => {
this.emit('log', 'ws ping received');
this._ws.pong();
});
this._ws.pong();
});

return this._openPromise = new Promise((resolve, reject) => {
this._ws.on('open', () => {
this.isOpen = true;
if (this._pingEnabled) {
this._pingHeartbeat();
}
resolve();
});
});
this._ws.onopen = (() => {
this.isOpen = true;
if (this._pingEnabled) {
this._pingHeartbeat();
}
resolve();
});
});
}

/** @override */
submit(bytecode, op, args, requestId, processor) {
return this.open().then(() => new Promise((resolve, reject) => {
if (requestId === null || requestId === undefined) {
requestId = utils.getUuid();
this._responseHandlers[requestId] = {
callback: (err, result) => err ? reject(err) : resolve(result),
requestId = utils.getUuid();
this._responseHandlers[requestId] = {
callback: (err, result) => err ? reject(err) : resolve(result),
result: null
};
}
};
}

const message = Buffer.from(this._header + JSON.stringify(this._getRequest(requestId, bytecode, op, args, processor)));
this._ws.send(message);
}));
//const message = Buffer.from(this._header + JSON.stringify(this._getRequest(requestId, bytecode, op, args, processor)));
const message = this._header + JSON.stringify(this._getRequest(requestId, bytecode, op, args, processor));
var buf = new ArrayBuffer(message.length); // 2 bytes for each char
var bufView = new Uint8Array(buf);
for (var i=0, strLen=message.length; i < strLen; i++) {
bufView[i] = message.charCodeAt(i);
}
this._ws.binaryType = 'arraybuffer';
this._ws.send(bufView);
}));
}

_getRequest(id, bytecode, op, args, processor) {
if (args) {
args = this._adaptArgs(args, true);
}


return ({
'requestId': { '@type': 'g:UUID', '@value': id },
'op': op || 'bytecode',
Expand All @@ -194,20 +197,20 @@ class Connection extends EventEmitter {

this._pingInterval = setInterval(() => {
if (this.isOpen === false) {
// in case of if not open..
if (this._pingInterval) {
clearInterval(this._pingInterval);
this._pingInterval = null;
}
// in case of if not open..
if (this._pingInterval) {
clearInterval(this._pingInterval);
this._pingInterval = null;
}
}

this._pongTimeout = setTimeout(() => {
this._ws.terminate();
}, this._pongTimeoutDelay);
this._pongTimeout = setTimeout(() => {
this._ws.terminate();
}, this._pongTimeoutDelay);

this._ws.ping();
this._ws.ping();

}, this._pingIntervalDelay);
}, this._pingIntervalDelay);
}

_handleError(err) {
Expand All @@ -225,22 +228,28 @@ class Connection extends EventEmitter {
this.emit('close', code, message);
}

_handleMessage(data) {
const response = this._reader.read(JSON.parse(data.toString()));
_handleMessage(msg) {
if(msg.data instanceof ArrayBuffer ) {
//if in browser javascript, the data are sent as Uint8
var data = String.fromCharCode.apply(null, new Uint8Array(msg.data));
}else{
data = msg;
}
const response = this._reader.read(JSON.parse(data));
if (response.requestId === null || response.requestId === undefined) {
// There was a serialization issue on the server that prevented the parsing of the request id
// We invoke any of the pending handlers with an error
Object.keys(this._responseHandlers).forEach(requestId => {
const handler = this._responseHandlers[requestId];
this._clearHandler(requestId);
if (response.status !== undefined && response.status.message) {
return handler.callback(
this._clearHandler(requestId);
if (response.status !== undefined && response.status.message) {
return handler.callback(
new Error(util.format(
'Server error (no request information): %s (%d)', response.status.message, response.status.code)));
} else {
return handler.callback(new Error(util.format('Server error (no request information): %j', response)));
}
});
'Server error (no request information): %s (%d)', response.status.message, response.status.code)));
} else {
return handler.callback(new Error(util.format('Server error (no request information): %j', response)));
}
});
return;
}

Expand All @@ -255,14 +264,14 @@ class Connection extends EventEmitter {
if (response.status.code === responseStatusCode.authenticationChallenge && this._authenticator) {
this._authenticator.evaluateChallenge(response.result.data).then(res => {
return this.submit(null, 'authentication', res, response.requestId);
}).catch(handler.callback);
}).catch(handler.callback);

return;
}
else if (response.status.code >= 400) {
// callback in error
return handler.callback(
new Error(util.format('Server error: %s (%d)', response.status.message, response.status.code)));
new Error(util.format('Server error: %s (%d)', response.status.message, response.status.code)));
}
switch (response.status.code) {
case responseStatusCode.noContent:
Expand Down Expand Up @@ -327,10 +336,10 @@ class Connection extends EventEmitter {
// in another map for types like EnumValue. Could be a nicer way to do this but for now it's solving the
// problem with script submission of non JSON native types
if (protocolLevel && key === 'bindings')
newObj[key] = this._adaptArgs(args[key], false);
else
newObj[key] = this._writer.adaptObject(args[key]);
});
newObj[key] = this._adaptArgs(args[key], false);
else
newObj[key] = this._writer.adaptObject(args[key]);
});

return newObj;
}
Expand All @@ -349,8 +358,8 @@ class Connection extends EventEmitter {
if (!this._closePromise) {
this._closePromise = new Promise(resolve => {
this._closeCallback = resolve;
this._ws.close();
});
this._ws.close();
});
}
return this._closePromise;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@
*/
'use strict';

const crypto = require('crypto');
const uuidv4 = require('uuid/v4');
//const crypto = require('crypto');

exports.toLong = function toLong(value) {
return new Long(value);
Expand All @@ -37,7 +38,7 @@ const Long = exports.Long = function Long(value) {
};

exports.getUuid = function getUuid() {
const buffer = crypto.randomBytes(16);
/*const buffer = Crypto.randomBytes(16);
//clear the version
buffer[6] &= 0x0f;
//set the version 4
Expand All @@ -48,11 +49,12 @@ exports.getUuid = function getUuid() {
buffer[8] |= 0x80;
const hex = buffer.toString('hex');
return (
hex.substr(0, 8) + '-' +
hex.substr(8, 4) + '-' +
hex.substr(12, 4) + '-' +
hex.substr(16, 4) + '-' +
hex.substr(20, 12));
hex.substr(0, 8) + '-' +
hex.substr(8, 4) + '-' +
hex.substr(12, 4) + '-' +
hex.substr(16, 4) + '-' +
hex.substr(20, 12));*/
return uuidv4();
};

exports.emptyArray = Object.freeze([]);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,19 @@
],
"license": "Apache-2.0",
"dependencies": {
"ws": "^3.0.0"
"ws": "^6.0.0",
"util": "^0.11.1",
"events": "^3.0.0",
"uuid": "^3.3.2"
},
"devDependencies": {
"mocha": "~4.0.1",
"cucumber": "~3.1.0",
"chai": "~4.1.2",
"grunt": "~1.0.2",
"grunt-cli": "~1.2.0",
"grunt-jsdoc": "~2.3.0"
"mocha": "~5.2.0",
"cucumber": "~5.1.0",
"chai": "~4.2.0",
"grunt": "~1.0.3",
"grunt-cli": "~1.3.2",
"grunt-jsdoc": "~2.3.0",
"uuid": "^3.3.2"
},
"repository": {
"type": "git",
Expand Down