Skip to content

Commit d0f96f5

Browse files
authored
Fixes for paged iterations. (#126)
- Fix #123: introduces maxResults parameter - Fix #124: fixed the behaviors of the streaming it seems better to be back of using through2.
1 parent 90b4a09 commit d0f96f5

4 files changed

Lines changed: 136 additions & 51 deletions

File tree

packages/packages/google-gax/lib/gax.js

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,11 +51,15 @@
5151
* @property {RetryOptions=} retry - determines whether and how to retry
5252
* on transient errors. When set to null, the call will not retry.
5353
* @property {boolean=} autoPaginate - If set to false and the call is
54-
* configured for page streaming, page streaming is not performed, instead
54+
* configured for paged iteration, page unrolling is not performed, instead
5555
* the callback will be called with the response object.
5656
* @property {Object=} pageToken - If set and the call is configured for
57-
* page streaming, page streaming is not performed and requested with this
57+
* paged iteration, paged iteration is not performed and requested with this
5858
* pageToken.
59+
* @property {number} maxResults - If set and the call is configured for
60+
* paged iteration, the call will stop when the number of response elements
61+
* reaches to the specified size. By default, it will unroll the page to
62+
* the end of the list.
5963
* @property {boolean=} isBundling - If set to false and the call is configured
6064
* for bundling, bundling is not performed.
6165
* @property {BackoffSettings=} longrunning - BackoffSettings used for polling.
@@ -164,6 +168,7 @@ function CallSettings(settings) {
164168
this.autoPaginate =
165169
('autoPaginate' in settings) ? settings.autoPaginate : true;
166170
this.pageToken = settings.pageToken;
171+
this.maxResults = settings.maxResults;
167172
this.otherArgs = settings.otherArgs || {};
168173
this.bundleOptions = settings.bundleOptions;
169174
this.isBundling = ('isBundling' in settings) ? settings.isBundling : true;
@@ -189,6 +194,7 @@ CallSettings.prototype.merge = function merge(options) {
189194
var retry = this.retry;
190195
var autoPaginate = this.autoPaginate;
191196
var pageToken = this.pageToken;
197+
var maxResults = this.maxResults;
192198
var otherArgs = this.otherArgs;
193199
var isBundling = this.isBundling;
194200
var longrunning = this.longrunning;
@@ -209,6 +215,10 @@ CallSettings.prototype.merge = function merge(options) {
209215
pageToken = options.pageToken;
210216
}
211217

218+
if ('maxResults' in options) {
219+
maxResults = options.maxResults;
220+
}
221+
212222
if ('otherArgs' in options) {
213223
otherArgs = {};
214224
for (var key in this.otherArgs) {
@@ -243,6 +253,7 @@ CallSettings.prototype.merge = function merge(options) {
243253
longrunning: longrunning,
244254
autoPaginate: autoPaginate,
245255
pageToken: pageToken,
256+
maxResults: maxResults,
246257
otherArgs: otherArgs,
247258
isBundling: isBundling,
248259
promise: promise

packages/packages/google-gax/lib/paged_iteration.js

Lines changed: 40 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,8 @@ var extend = require('extend');
3434
var nextTick = require('process-nextick-args');
3535
var util = require('util');
3636
var NormalApiCaller = require('./api_callable').NormalApiCaller;
37-
var ReadableStream = require('readable-stream');
37+
var through2 = require('through2');
38+
var ended = require('is-stream-ended');
3839

3940
/**
4041
* Creates an API caller that returns a stream to performs page-streaming.
@@ -97,49 +98,31 @@ PagedIteration.prototype.call = function(
9798
return;
9899
}
99100

101+
var maxResults = settings.maxResults || -1;
100102
var allResources = [];
101103
function pushResources(err, resources, next) {
102104
if (err) {
103105
canceller.callback(err);
104106
return;
105107
}
106108

107-
allResources.push.apply(allResources, resources);
109+
for (var i = 0; i < resources.length; ++i) {
110+
allResources.push(resources[i]);
111+
if (allResources.length === maxResults) {
112+
next = null;
113+
break;
114+
}
115+
}
108116
if (!next) {
109117
canceller.callback(null, allResources);
110118
return;
111119
}
112-
nextTick(apiCall, argument, pushResources);
120+
nextTick(apiCall, next, pushResources);
113121
}
114122

115123
nextTick(apiCall, argument, pushResources);
116124
};
117125

118-
/**
119-
* An implementation of readalbe stream which fits for the usage of paged iteration.
120-
* @private
121-
* @constructor
122-
*/
123-
function PagedStream() {
124-
ReadableStream.call(this, {objectMode: true});
125-
}
126-
127-
util.inherits(PagedStream, ReadableStream);
128-
129-
PagedStream.prototype._read = function(n) {
130-
};
131-
132-
/**
133-
*/
134-
PagedStream.prototype.end = function() {
135-
// pushing a null will cause ending the stream.
136-
this.push(null);
137-
138-
// onEof callback of ReadableStream does not update 'readable' field immediately,
139-
// thus settings here explicitly.
140-
this.readable = false;
141-
};
142-
143126
/**
144127
* Describes the structure of a page-streaming call.
145128
*
@@ -174,24 +157,33 @@ exports.PageDescriptor = PageDescriptor;
174157
*/
175158
PageDescriptor.prototype.createStream = function(
176159
apiCall, request, options) {
177-
var stream = new PagedStream();
178-
options = extend({}, options);
179-
options.autoPaginate = false;
180-
function callback(response) {
181-
var resources = response[0];
160+
var stream = through2.obj();
161+
options = extend({}, options, {autoPaginate: false});
162+
var maxResults = ('maxResults' in options) ? options.maxResults : -1;
163+
var pushCount = 0;
164+
var started = false;
165+
function callback(err, resources, next) {
166+
if (err) {
167+
stream.emit('error', err);
168+
return;
169+
}
182170
for (var i = 0; i < resources.length; ++i) {
183-
if (!stream.readable) {
171+
if (ended(stream)) {
184172
return;
185173
}
186174
if (resources[i] === null) {
187175
continue;
188176
}
189177
stream.push(resources[i]);
178+
pushCount++;
179+
if (pushCount === maxResults) {
180+
stream.end();
181+
}
190182
}
191-
if (!stream.readable) {
183+
if (ended(stream)) {
192184
return;
193185
}
194-
if (!response[1]) {
186+
if (!next) {
195187
stream.end();
196188
return;
197189
}
@@ -200,17 +192,19 @@ PageDescriptor.prototype.createStream = function(
200192
if ('pageToken' in options) {
201193
delete options.pageToken;
202194
}
203-
return apiCall(response[1], options)
204-
.then(callback)
205-
.catch(function(err) {
206-
stream.emit('error', err);
207-
});
195+
if (stream.isPaused()) {
196+
request = next;
197+
started = false;
198+
} else {
199+
nextTick(apiCall, next, options, callback);
200+
}
208201
}
209-
apiCall(request, options)
210-
.then(callback)
211-
.catch(function(err) {
212-
stream.emit('error', err);
213-
});
202+
stream.on('resume', function() {
203+
if (!started) {
204+
started = true;
205+
apiCall(request, options, callback);
206+
}
207+
});
214208
return stream;
215209
};
216210

packages/packages/google-gax/package.json

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,11 @@
1111
"google-auto-auth": "^0.5.2",
1212
"google-proto-files": "^0.9.1",
1313
"grpc": "^1.1",
14+
"is-stream-ended": "^0.1.0",
1415
"lodash": "^4.17.2",
1516
"process-nextick-args": "^1.0.7",
16-
"readable-stream": "^2.2.2"
17+
"readable-stream": "^2.2.2",
18+
"through2": "^2.0.3"
1719
},
1820
"devDependencies": {
1921
"chai": "*",
@@ -24,8 +26,9 @@
2426
"jsdoc": "~3.4.0",
2527
"mocha": "~2.2.5",
2628
"pegjs": "~0.9.0",
29+
"pumpify": "^1.3.5",
2730
"sinon": "~1.17.3",
28-
"through2": "~2.0.1"
31+
"stream-events": "^1.0.1"
2932
},
3033
"scripts": {
3134
"codecov": "istanbul cover _mocha -- --reporter spec --slow 500 --timeout 5000 && codecov",

packages/packages/google-gax/test/paged_iteration.js

Lines changed: 78 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,11 @@
3535
var util = require('./utils');
3636
var PageDescriptor = require('../lib/paged_iteration').PageDescriptor;
3737
var expect = require('chai').expect;
38+
var process = require('process');
39+
var pumpify = require('pumpify');
3840
var sinon = require('sinon');
41+
var streamEvents = require('stream-events');
42+
var through2 = require('through2');
3943

4044
describe('paged iteration', function() {
4145
var pageSize = 3;
@@ -155,6 +159,22 @@ describe('paged iteration', function() {
155159
}).catch(done);
156160
});
157161

162+
it('caps the results by maxResults', function() {
163+
var spy = sinon.spy(func);
164+
var apiCall = util.createApiCall(spy, createOptions);
165+
return apiCall({}, {maxResults: pageSize * 2 + 2}).then(function(response) {
166+
expect(response).to.be.an('array');
167+
expect(response[0]).to.be.an('array');
168+
expect(response[0].length).to.eq(pageSize * 2 + 2);
169+
var expected = 0;
170+
for (var i = 0; i < response[0].length; ++i) {
171+
expect(response[0][i]).to.eq(expected);
172+
expected++;
173+
}
174+
expect(spy.callCount).to.eq(3);
175+
});
176+
});
177+
158178
describe('stream conversion', function() {
159179
var spy;
160180
var apiCall;
@@ -171,7 +191,10 @@ describe('paged iteration', function() {
171191
}).on('end', function() {
172192
onEnd();
173193
done();
174-
}).on('error', done);
194+
}).on('error', function(err) {
195+
console.error(err);
196+
done(err);
197+
});
175198
}
176199

177200
it('returns a stream', function(done) {
@@ -200,5 +223,59 @@ describe('paged iteration', function() {
200223
expect(spy.callCount).to.eq(pagesToStream);
201224
}, done, pageSize);
202225
});
226+
227+
it('caps the elements by maxResults', function(done) {
228+
var onData = sinon.spy();
229+
var stream = descriptor.createStream(
230+
apiCall, {}, {maxResults: pageSize * 2 + 2});
231+
stream.on('data', onData);
232+
streamChecker(stream, function() {
233+
expect(spy.callCount).to.eq(3);
234+
expect(onData.callCount).to.eq(pageSize * 2 + 2);
235+
}, done, 0);
236+
});
237+
238+
it('does not call API eagerly', function(done) {
239+
var stream = descriptor.createStream(apiCall, {}, null);
240+
setTimeout(function() {
241+
expect(spy.callCount).to.eq(0);
242+
streamChecker(stream, function() {
243+
expect(spy.callCount).to.eq(pagesToStream + 1);
244+
}, done, 0);
245+
}, 50);
246+
});
247+
248+
it('does not start calls when it is already started', function(done) {
249+
var stream = descriptor.createStream(apiCall, {}, null);
250+
stream.on('end', function() {
251+
expect(spy.callCount).to.eq(pagesToStream + 1);
252+
done();
253+
});
254+
stream.resume();
255+
process.nextTick(function() {
256+
stream.pause();
257+
stream.resume();
258+
});
259+
});
260+
261+
it('cooperates with google-cloud-node usage', function(done) {
262+
var stream;
263+
var output = streamEvents(pumpify.obj());
264+
output.once('reading', function() {
265+
stream = descriptor.createStream(apiCall, {}, null);
266+
output.setPipeline(stream, through2.obj());
267+
});
268+
var count = 0;
269+
output.on('data', function() {
270+
count++;
271+
if (count === pageSize + 1) {
272+
output.end();
273+
}
274+
}).on('end', function() {
275+
expect(count).to.eq(pageSize + 1);
276+
expect(spy.callCount).to.eq(2);
277+
done();
278+
}).on('error', done);
279+
});
203280
});
204281
});

0 commit comments

Comments
 (0)