-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathstreamBuffer.js
More file actions
73 lines (58 loc) · 2.6 KB
/
streamBuffer.js
File metadata and controls
73 lines (58 loc) · 2.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
const fs = require('fs');
const { Readable, Transform, Writable } = require('stream');
// Readable stream that generates ride requests
class RideRequestStream extends Readable {
constructor(options) {
super(options);
this.requests = [
{ id: 1, passenger: 'John', pickup: 'A', dropoff: 'B' },
{ id: 2, passenger: 'Alice', pickup: 'C', dropoff: 'D' },
{ id: 3, passenger: 'Bob', pickup: 'E', dropoff: 'F' },
// Add more ride requests as needed
];
this.currentIndex = 0;
}
_read() {
if (this.currentIndex === this.requests.length) {
this.push(null); // No more data
} else {
const requestData = this.requests[this.currentIndex];
const data = Buffer.from(JSON.stringify(requestData) + '\n');
this.push(data);
this.currentIndex++;
}
}
}
// Transform stream that processes ride requests
class RideRequestProcessor extends Transform {
constructor(options) {
super(options);
}
_transform(chunk, encoding, callback) {
const requestData = JSON.parse(chunk);
// Perform processing on the ride request (e.g., validate, assign driver, etc.)
const processedRequest = /* Processing logic */;
this.push(JSON.stringify(processedRequest) + '\n');
callback();
}
}
// Writable stream that stores processed ride requests in a file
class ProcessedRideRequestWritable extends Writable {
constructor(options) {
super(options);
}
_write(chunk, encoding, callback) {
fs.appendFile('processed-requests.txt', chunk, callback);
}
}
// Create instances of the streams
const rideRequestStream = new RideRequestStream();
const rideRequestProcessor = new RideRequestProcessor();
const processedRideRequestWritable = new ProcessedRideRequestWritable();
// Pipe the streams together
rideRequestStream.pipe(rideRequestProcessor).pipe(processedRideRequestWritable);
console.log('Cab booking app backend running.');
// Start generating ride requests
rideRequestStream.resume();
// In this example, we have three custom stream classes: RideRequestStream generates ride requests as a readable stream, RideRequestProcessor processes the ride requests as a transform stream, and ProcessedRideRequestWritable stores the processed ride requests in a file as a writable stream.
// We create instances of these streams and pipe them together using the pipe() method. The ride requests generated by RideRequestStream are piped to RideRequestProcessor, which processes the requests (e.g., validation, driver assignment, etc.), and then piped to ProcessedRideRequestWritable, which stores the processed ride requests in a file (processed-requests.txt).