Skip to content

Commit 8c47b83

Browse files
authored
Support more gRPC streaming events. (#68)
- 'metadata' and 'status' events are meaningful and should be forwarded. - gRPC streaming results have 'cancel' method, this method call needs to be forwarded as well. Fixes #67
1 parent 1c6b453 commit 8c47b83

3 files changed

Lines changed: 114 additions & 2 deletions

File tree

packages/packages/google-gax/lib/streaming.js

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ function StreamProxy(type, callback) {
7070
this._callback = callback;
7171
this._writeQueue = [];
7272
this._isEndCalled = false;
73+
this._isCancelCalled = false;
7374
this.on('finish', this._onFinish.bind(this));
7475
}
7576

@@ -102,6 +103,14 @@ StreamProxy.prototype._onFinish = function() {
102103
}
103104
};
104105

106+
StreamProxy.prototype.cancel = function() {
107+
if (this.stream) {
108+
this.stream.cancel();
109+
} else {
110+
this._isCancelCalled = true;
111+
}
112+
};
113+
105114
/**
106115
* Specifies the target stream.
107116
* @param {ApiCall} apiCall - the API function to be called.
@@ -110,11 +119,30 @@ StreamProxy.prototype._onFinish = function() {
110119
StreamProxy.prototype.setStream = function(apiCall, argument) {
111120
var stream = apiCall(argument, this._callback);
112121
this.stream = stream;
113-
stream.on('error', this.emit.bind(this, 'error'));
122+
(['error', 'metadata', 'status']).forEach(function(event) {
123+
stream.on(event, this.emit.bind(this, event));
124+
}.bind(this));
125+
// We also want to supply the status data as 'response' event to support
126+
// the behavior of google-cloud-node expects.
127+
// see: https://github.com/GoogleCloudPlatform/google-cloud-node/pull/1775#issuecomment-259141029
128+
// https://github.com/GoogleCloudPlatform/google-cloud-node/blob/116436fa789d8b0f7fc5100b19b424e3ec63e6bf/packages/common/src/grpc-service.js#L355
129+
stream.on('metadata', function(metadata) {
130+
// Create a response object with succeeds.
131+
// TODO: unify this logic with the decoration of gRPC response when it's added.
132+
// see: https://github.com/googleapis/gax-nodejs/issues/65
133+
this.emit('response', {
134+
code: 200,
135+
details: '',
136+
message: 'OK',
137+
metadata: metadata
138+
});
139+
}.bind(this));
114140
if (this.type !== StreamType.CLIENT_STREAMING) {
115141
stream.on('data', this.emit.bind(this, 'data'));
116142
// Pushing null causes an ending process of the readable stream.
117143
stream.on('end', this.push.bind(this, null));
144+
// This is required in case no 'data' handler exists.
145+
this.resume();
118146
}
119147
if (this.type !== StreamType.SERVER_STREAMING) {
120148
this._writeQueue.forEach(function(data) {
@@ -124,6 +152,9 @@ StreamProxy.prototype.setStream = function(apiCall, argument) {
124152
if (this._isEndCalled) {
125153
stream.end();
126154
}
155+
if (this._isCancelCalled) {
156+
stream.cancel();
157+
}
127158
}
128159
};
129160

packages/packages/google-gax/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "google-gax",
3-
"version": "0.9.0",
3+
"version": "0.9.1",
44
"description": "Google API Extensions",
55
"main": "index.js",
66
"files": [

packages/packages/google-gax/test/api_callable.js

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -669,4 +669,85 @@ describe('streaming', function() {
669669
s.write(arg);
670670
s.end();
671671
});
672+
673+
it('forwards metadata and status', function(done) {
674+
var responseMetadata = {metadata: true};
675+
var status = {code: 0, metadata: responseMetadata};
676+
var expectedResponse = {
677+
code: 200,
678+
message: 'OK',
679+
details: '',
680+
metadata: responseMetadata
681+
};
682+
function func(metadata, options) {
683+
var s = through2.obj();
684+
setTimeout(s.emit.bind(s, 'metadata', responseMetadata), 10);
685+
s.on('end', s.emit.bind(s, 'status', status));
686+
return s;
687+
}
688+
var apiCall = createStreamingCall(
689+
func, streaming.StreamType.BIDI_STREAMING);
690+
var s = apiCall(null, null);
691+
var receivedMetadata;
692+
var receivedStatus;
693+
var receivedResponse;
694+
s.on('metadata', function(data) {
695+
receivedMetadata = data;
696+
});
697+
s.on('status', function(data) {
698+
receivedStatus = data;
699+
});
700+
s.on('response', function(data) {
701+
receivedResponse = data;
702+
});
703+
s.on('end', function() {
704+
expect(receivedMetadata).to.deep.eq(responseMetadata);
705+
expect(receivedStatus).to.deep.eq(status);
706+
expect(receivedResponse).to.deep.eq(expectedResponse);
707+
done();
708+
});
709+
expect(s.readable).to.be.true;
710+
expect(s.writable).to.be.true;
711+
setTimeout(s.end.bind(s), 50);
712+
});
713+
714+
it('cancels in the middle', function(done) {
715+
function schedulePush(s, c) {
716+
if (!s.readable) {
717+
return;
718+
}
719+
setTimeout(function() {
720+
s.push(c);
721+
schedulePush(s, c + 1);
722+
}, 10);
723+
}
724+
var cancelError = new Error('cancelled');
725+
function func(metadata, options) {
726+
var s = through2.obj();
727+
schedulePush(s, 0);
728+
s.cancel = function() {
729+
s.end();
730+
s.emit('error', cancelError);
731+
};
732+
return s;
733+
}
734+
var apiCall = createStreamingCall(
735+
func, streaming.StreamType.SERVER_STREAMING);
736+
var s = apiCall(null, null);
737+
var counter = 0;
738+
var expectedCount = 5;
739+
s.on('data', function(data) {
740+
expect(data).to.eq(counter);
741+
counter++;
742+
if (counter === expectedCount) {
743+
s.cancel();
744+
} else if (counter > expectedCount) {
745+
done(new Error('should not reach'));
746+
}
747+
});
748+
s.on('error', function(err) {
749+
expect(err).to.eq(cancelError);
750+
done();
751+
});
752+
});
672753
});

0 commit comments

Comments
 (0)