Skip to content

Commit 7528b8f

Browse files
authored
fix: pause request stream on backpressure (#936)
* fix: pause request stream on backpressure The request stream should be paused if the downstream is indicating that it cannot handle any more data at the moment. The request stream should be resumed once the downstream does accept data again. This reduces memory consumption and potentially out of memory errors when a result stream is piped into a slow writer. Fixes #934 * fix: do not retry stream indefinitely PartialResultSetStream should stop retrying to push data into the stream after a configurable number of retries have failed. * fix: process review comments * fix: remove unused code * tests: add test for pause/resume * fix: return after giving up retrying + add test
1 parent adf65c4 commit 7528b8f

7 files changed

Lines changed: 246 additions & 23 deletions

File tree

handwritten/spanner/.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,3 +12,4 @@ system-test/*key.json
1212
.DS_Store
1313
package-lock.json
1414
__pycache__
15+
.idea

handwritten/spanner/src/partial-result-stream.ts

Lines changed: 67 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -43,10 +43,16 @@ interface RequestFunction {
4343
* @property {boolean} [json=false] Indicates if the Row objects should be
4444
* formatted into JSON.
4545
* @property {JSONOptions} [jsonOptions] JSON options.
46+
* @property {number} [maxResumeRetries=20] The maximum number of times that the
47+
* stream will retry to push data downstream, when the downstream indicates
48+
* that it is not ready for any more data. Increase this value if you
49+
* experience 'Stream is still not ready to receive data' errors as a
50+
* result of a slow writer in your receiving stream.
4651
*/
4752
export interface RowOptions {
4853
json?: boolean;
4954
jsonOptions?: JSONOptions;
55+
maxResumeRetries?: number;
5056
}
5157

5258
/**
@@ -136,11 +142,12 @@ export class PartialResultStream extends Transform implements ResultEvents {
136142
private _options: RowOptions;
137143
private _pendingValue?: p.IValue;
138144
private _values: p.IValue[];
145+
private _numPushFailed = 0;
139146
constructor(options = {}) {
140147
super({objectMode: true});
141148

142149
this._destroyed = false;
143-
this._options = options;
150+
this._options = Object.assign({maxResumeRetries: 20}, options);
144151
this._values = [];
145152
}
146153
/**
@@ -187,20 +194,60 @@ export class PartialResultStream extends Transform implements ResultEvents {
187194
.fields as google.spanner.v1.StructType.Field[];
188195
}
189196

197+
let res = true;
190198
if (!is.empty(chunk.values)) {
191-
this._addChunk(chunk);
199+
res = this._addChunk(chunk);
192200
}
193201

194-
next();
202+
if (res) {
203+
next();
204+
} else {
205+
// Wait a little before we push any more data into the pipeline as a
206+
// component downstream has indicated that a break is needed. Pause the
207+
// request stream to prevent it from filling up the buffer while we are
208+
// waiting.
209+
// The stream will initially pause for 2ms, and then double the pause time
210+
// for each new pause.
211+
const initialPauseMs = 2;
212+
setTimeout(() => {
213+
this._tryResume(next, 2 * initialPauseMs);
214+
}, initialPauseMs);
215+
}
216+
}
217+
218+
private _tryResume(next: Function, timeout: number) {
219+
// Try to push an empty chunk to check whether more data can be accepted.
220+
if (this.push(undefined)) {
221+
this._numPushFailed = 0;
222+
this.emit('resumed');
223+
next();
224+
} else {
225+
// Downstream returned false indicating that it is still not ready for
226+
// more data.
227+
this._numPushFailed++;
228+
if (this._numPushFailed === this._options.maxResumeRetries) {
229+
this.destroy(
230+
new Error(
231+
`Stream is still not ready to receive data after ${this._numPushFailed} attempts to resume.`
232+
)
233+
);
234+
return;
235+
}
236+
setTimeout(() => {
237+
const nextTimeout = Math.min(timeout * 2, 1024);
238+
this._tryResume(next, nextTimeout);
239+
}, timeout);
240+
}
195241
}
242+
196243
/**
197244
* Manages any chunked values.
198245
*
199246
* @private
200247
*
201248
* @param {object} chunk The partial result set.
202249
*/
203-
private _addChunk(chunk: google.spanner.v1.PartialResultSet): void {
250+
private _addChunk(chunk: google.spanner.v1.PartialResultSet): boolean {
204251
const values: Value[] = chunk.values.map(GrpcService.decodeValue_);
205252

206253
// If we have a chunk to merge, merge the values now.
@@ -223,7 +270,14 @@ export class PartialResultStream extends Transform implements ResultEvents {
223270
this._pendingValue = values.pop();
224271
}
225272

226-
values.forEach(value => this._addValue(value));
273+
let res = true;
274+
values.forEach(value => {
275+
res = this._addValue(value) && res;
276+
if (!res) {
277+
this.emit('paused');
278+
}
279+
});
280+
return res;
227281
}
228282
/**
229283
* Manages complete values, pushing a completed row into the stream once all
@@ -233,25 +287,24 @@ export class PartialResultStream extends Transform implements ResultEvents {
233287
*
234288
* @param {*} value The complete value.
235289
*/
236-
private _addValue(value: Value): void {
290+
private _addValue(value: Value): boolean {
237291
const values = this._values;
238292

239293
values.push(value);
240294

241295
if (values.length !== this._fields.length) {
242-
return;
296+
return true;
243297
}
244298

245299
this._values = [];
246300

247301
const row: Row = this._createRow(values);
248302

249303
if (this._options.json) {
250-
this.push(row.toJSON(this._options.jsonOptions));
251-
return;
304+
return this.push(row.toJSON(this._options.jsonOptions));
252305
}
253306

254-
this.push(row);
307+
return this.push(row);
255308
}
256309
/**
257310
* Converts an array of values into a row.
@@ -373,7 +426,8 @@ export function partialResultStream(
373426
// mergeStream allows multiple streams to be connected into one. This is good;
374427
// if we need to retry a request and pipe more data to the user's stream.
375428
const requestsStream = mergeStream();
376-
const userStream = streamEvents(new PartialResultStream(options));
429+
const partialRSStream = new PartialResultStream(options);
430+
const userStream = streamEvents(partialRSStream);
377431
const batchAndSplitOnTokenStream = checkpointStream.obj({
378432
maxQueued: 10,
379433
isCheckpointFn: (row: google.spanner.v1.PartialResultSet): boolean => {
@@ -433,5 +487,7 @@ export function partialResultStream(
433487
lastResumeToken = row.resumeToken;
434488
})
435489
.pipe(userStream)
490+
.on('paused', () => requestsStream.pause())
491+
.on('resumed', () => requestsStream.resume())
436492
);
437493
}

handwritten/spanner/src/transaction.ts

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ export interface RequestOptions {
5555
json?: boolean;
5656
jsonOptions?: JSONOptions;
5757
gaxOptions?: CallOptions;
58+
maxResumeRetries?: number;
5859
}
5960

6061
export interface Statement {
@@ -484,7 +485,7 @@ export class Snapshot extends EventEmitter {
484485
table: string,
485486
request = {} as ReadRequest
486487
): PartialResultStream {
487-
const {gaxOptions, json, jsonOptions} = request;
488+
const {gaxOptions, json, jsonOptions, maxResumeRetries} = request;
488489
const keySet = Snapshot.encodeKeySet(request);
489490
const transaction: spannerClient.spanner.v1.ITransactionSelector = {};
490491

@@ -499,6 +500,7 @@ export class Snapshot extends EventEmitter {
499500
delete request.gaxOptions;
500501
delete request.json;
501502
delete request.jsonOptions;
503+
delete request.maxResumeRetries;
502504
delete request.keys;
503505
delete request.ranges;
504506

@@ -518,7 +520,11 @@ export class Snapshot extends EventEmitter {
518520
});
519521
};
520522

521-
return partialResultStream(makeRequest, {json, jsonOptions});
523+
return partialResultStream(makeRequest, {
524+
json,
525+
jsonOptions,
526+
maxResumeRetries,
527+
});
522528
}
523529

524530
/**
@@ -879,7 +885,7 @@ export class Snapshot extends EventEmitter {
879885
query.queryOptions
880886
);
881887

882-
const {gaxOptions, json, jsonOptions} = query;
888+
const {gaxOptions, json, jsonOptions, maxResumeRetries} = query;
883889
const {params, paramTypes} = Snapshot.encodeParams(query);
884890
const transaction: spannerClient.spanner.v1.ITransactionSelector = {};
885891

@@ -892,6 +898,7 @@ export class Snapshot extends EventEmitter {
892898
delete query.gaxOptions;
893899
delete query.json;
894900
delete query.jsonOptions;
901+
delete query.maxResumeRetries;
895902
delete query.types;
896903

897904
const reqOpts: ExecuteSqlRequest = Object.assign(query, {
@@ -911,7 +918,11 @@ export class Snapshot extends EventEmitter {
911918
});
912919
};
913920

914-
return partialResultStream(makeRequest, {json, jsonOptions});
921+
return partialResultStream(makeRequest, {
922+
json,
923+
jsonOptions,
924+
maxResumeRetries,
925+
});
915926
}
916927

917928
/**

handwritten/spanner/test/mockserver/mockspanner.ts

Lines changed: 55 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -583,16 +583,24 @@ export class MockSpanner {
583583
resultSet: protobuf.ResultSet,
584584
queryMode:
585585
| google.spanner.v1.ExecuteSqlRequest.QueryMode
586-
| keyof typeof google.spanner.v1.ExecuteSqlRequest.QueryMode
586+
| keyof typeof google.spanner.v1.ExecuteSqlRequest.QueryMode,
587+
rowsPerPartialResultSet = 1
587588
): protobuf.PartialResultSet[] {
588589
const res: protobuf.PartialResultSet[] = [];
589590
let first = true;
590-
for (let i = 0; i < resultSet.rows.length; i++) {
591+
for (let i = 0; i < resultSet.rows.length; i += rowsPerPartialResultSet) {
591592
const token = i.toString().padStart(8, '0');
592593
const partial = protobuf.PartialResultSet.create({
593594
resumeToken: Buffer.from(token),
594-
values: resultSet.rows[i].values,
595+
values: [],
595596
});
597+
for (
598+
let row = i;
599+
row < Math.min(i + rowsPerPartialResultSet, resultSet.rows.length);
600+
row++
601+
) {
602+
partial.values.push(...resultSet.rows[row].values!);
603+
}
596604
if (first) {
597605
partial.metadata = resultSet.metadata;
598606
first = false;
@@ -937,6 +945,50 @@ export function createSimpleResultSet(): protobuf.ResultSet {
937945
});
938946
}
939947

948+
export const NUM_ROWS_LARGE_RESULT_SET = 100;
949+
950+
export function createLargeResultSet(): protobuf.ResultSet {
951+
const fields = [
952+
protobuf.StructType.Field.create({
953+
name: 'NUM',
954+
type: protobuf.Type.create({code: protobuf.TypeCode.INT64}),
955+
}),
956+
protobuf.StructType.Field.create({
957+
name: 'NAME',
958+
type: protobuf.Type.create({code: protobuf.TypeCode.STRING}),
959+
}),
960+
];
961+
const metadata = new protobuf.ResultSetMetadata({
962+
rowType: new protobuf.StructType({
963+
fields,
964+
}),
965+
});
966+
const rows: google.protobuf.IListValue[] = [];
967+
for (let num = 1; num <= NUM_ROWS_LARGE_RESULT_SET; num++) {
968+
rows.push({
969+
values: [
970+
{stringValue: `${num}`},
971+
{stringValue: generateRandomString(100)},
972+
],
973+
});
974+
}
975+
return protobuf.ResultSet.create({
976+
metadata,
977+
rows,
978+
});
979+
}
980+
981+
function generateRandomString(length: number) {
982+
let result = '';
983+
const characters =
984+
'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789';
985+
const charactersLength = characters.length;
986+
for (let i = 0; i < length; i++) {
987+
result += characters.charAt(Math.floor(Math.random() * charactersLength));
988+
}
989+
return result;
990+
}
991+
940992
/**
941993
* Returns a protobuf Timestamp containing the current local system time.
942994
*/

handwritten/spanner/test/partial-result-stream.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ describe('PartialResultStream', () => {
8383
it(`should pass acceptance test: ${test.name}`, done => {
8484
// eslint-disable-next-line @typescript-eslint/no-explicit-any
8585
const values: any[] = [];
86-
const stream = new PartialResultStream();
86+
const stream = new PartialResultStream({});
8787

8888
stream
8989
.on('error', done)
@@ -113,13 +113,13 @@ describe('PartialResultStream', () => {
113113
let stream: prs.PartialResultStream;
114114

115115
beforeEach(() => {
116-
stream = new PartialResultStream();
116+
stream = new PartialResultStream({});
117117
});
118118

119119
afterEach(() => stream.destroy());
120120

121121
it('should emit the response', done => {
122-
const stream = new PartialResultStream();
122+
const stream = new PartialResultStream({});
123123

124124
stream.on('error', done).on('response', response => {
125125
assert.strictEqual(response, RESULT);

0 commit comments

Comments
 (0)