-
Notifications
You must be signed in to change notification settings - Fork 235
Expand file tree
/
Copy pathStreamUpload.ts
More file actions
148 lines (131 loc) · 5.13 KB
/
StreamUpload.ts
File metadata and controls
148 lines (131 loc) · 5.13 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
import { Readable } from "stream";
import { GraphClientError } from "../../../GraphClientError";
import { FileObject, SliceType } from "../../LargeFileUploadTask";
import { Range } from "../Range";
/**
 * @interface
 * Pairs a chunk of bytes taken from the stream with the range that chunk covers.
 * @property {Range} range - The range spanned by the chunk
 * @property {Buffer} fileSlice - The bytes of the chunk
 */
interface SliceRecord {
	range: Range;
	fileSlice: Buffer;
}
/**
 * @class
 * FileObject class for Readable Stream upload
 */
export class StreamUpload implements FileObject<Readable> {
	/**
	 * @private
	 * Cache of the last slice handed out by sliceFile together with the range it covers.
	 * Bytes consumed from the stream cannot be read again, so a retried (or partially
	 * retried) range has to be served from this cache. Undefined until the first slice is read.
	 */
	private previousSlice?: SliceRecord;

	/**
	 * @public
	 * @constructor
	 * @param {Readable} content - The readable stream holding the file content
	 * @param {string} name - The name of the file
	 * @param {number} size - The size of the file
	 * @throws GraphClientError if any of the three arguments is missing (falsy)
	 */
	public constructor(public content: Readable, public name: string, public size: number) {
		if (!content || !name || !size) {
			throw new GraphClientError("Please provide the Readable Stream content, name of the file and size of the file");
		}
	}

	/**
	 * @public
	 * Slices the file content to the given range
	 *
	 * The stream is forward-only, so the bytes of the most recent slice are cached in `previousSlice`.
	 * Suppose sliceFile read bytes `10 - 20` but the upload of that slice failed; the stream is now
	 * positioned at byte 21. When the upload resumes, the requested range is compared with the cached
	 * range and any overlapping bytes are taken from the cache; only the bytes beyond the cached range
	 * are read from the stream.
	 * @param {Range} range - The range value
	 * @returns The sliced file part
	 * @throws GraphClientError if the range starts before the cached slice or the stream is not readable
	 */
	public async sliceFile(range: Range): Promise<SliceType> {
		let rangeSize = range.maxValue - range.minValue + 1;
		const bufs: Buffer[] = [];

		if (this.previousSlice) {
			const cachedSlice = this.previousSlice.fileSlice;
			const previousRangeMin = this.previousSlice.range.minValue;
			const previousRangeMax = this.previousSlice.range.maxValue;

			// Bytes before the cached slice are gone from the stream and cannot be recovered.
			if (range.minValue < previousRangeMin) {
				throw new GraphClientError("An error occurred while uploading the stream. Please restart the stream upload from the first byte of the file.");
			}

			// `<=` so that a request starting exactly on the last cached byte still uses the cache.
			if (range.minValue <= previousRangeMax) {
				// Range values are positions in the file; the cached buffer starts at previousRangeMin,
				// so buffer indexes must be made relative to it.
				const startInCache = range.minValue - previousRangeMin;

				if (range.maxValue <= previousRangeMax) {
					// The requested range is the same as the previously sliced range.
					if (startInCache === 0 && range.maxValue === previousRangeMax) {
						return cachedSlice;
					}
					/**
					 * The upload failed after some of the bytes of the previous slice were uploaded.
					 * Example - Previous slice range - `10 - 20`. Current requested range is `15 - 20`.
					 * The cache is left untouched because it still covers the requested bytes.
					 */
					return cachedSlice.subarray(startInCache, range.maxValue - previousRangeMin + 1);
				}

				/**
				 * The requested range starts inside the previous slice and extends beyond it.
				 * Example - Previous slice range - `10 - 20`. Current requested range is `15 - 25`:
				 * take bytes 15 to 20 from the cache and read bytes 21 to 25 from the stream.
				 */
				bufs.push(cachedSlice.subarray(startInCache));
				rangeSize = range.maxValue - previousRangeMax;
			}
		}

		/* readable.readable is true if it is safe to call readable.read(),
		 * which means the stream has not been destroyed or emitted 'error' or 'end'
		 */
		if (!this.content || !this.content.readable) {
			throw new GraphClientError("Stream is not readable.");
		}

		if (this.content.readableLength >= rangeSize) {
			// Enough bytes are already buffered; read them synchronously.
			bufs.push(this.content.read(rangeSize));
		} else {
			bufs.push(await this.readNBytesFromStream(rangeSize));
		}

		const slicedChunk = Buffer.concat(bufs);
		this.previousSlice = { fileSlice: slicedChunk, range };
		return slicedChunk;
	}

	/**
	 * @private
	 * Reads the specified byte size from the stream, waiting for data to arrive if necessary.
	 * The listeners registered on the stream are removed as soon as the promise settles so that
	 * repeated calls do not accumulate listeners on the same stream.
	 * @param {number} size - The size of bytes to be read
	 * @returns Buffer with the given length of data. The promise is rejected with a GraphClientError
	 * if the stream ends, errors or becomes unreadable before `size` bytes were read.
	 */
	private readNBytesFromStream(size: number): Promise<Buffer> {
		return new Promise<Buffer>((resolve, reject) => {
			const stream = this.content;
			const chunks: Buffer[] = [];
			let length = 0;

			function cleanup(): void {
				stream.removeListener("readable", onReadable);
				stream.removeListener("end", onEnd);
				stream.removeListener("error", onError);
			}

			function fail(error: Error): void {
				cleanup();
				reject(error);
			}

			function onEnd(): void {
				if (length < size) {
					fail(new GraphClientError("Stream ended before reading required range size"));
				}
			}

			function onError(error: Error): void {
				// Without this handler a stream error would leave the promise pending forever.
				const detail = error && error.message ? `: ${error.message}` : "";
				fail(new GraphClientError(`Error encountered while reading the stream during the upload${detail}`));
			}

			function onReadable(): void {
				/**
				 * stream.read(n) can return null if fewer than n bytes are buffered and the stream has not ended.
				 * Read the remaining number of bytes from the stream iteratively as they become available.
				 */
				let chunk: Buffer | null;
				while (length < size && (chunk = stream.read(size - length)) !== null) {
					length += chunk.length;
					chunks.push(chunk);
				}
				if (length === size) {
					cleanup();
					resolve(Buffer.concat(chunks));
					return;
				}
				if (!stream.readable) {
					fail(new GraphClientError("Error encountered while reading the stream during the upload"));
				}
			}

			stream.on("end", onEnd);
			stream.on("error", onError);
			stream.on("readable", onReadable);
		});
	}
}