-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtail.js
More file actions
108 lines (98 loc) · 2.68 KB
/
tail.js
File metadata and controls
108 lines (98 loc) · 2.68 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
import {
openReadOnly,
closeFD,
getReadStream,
getStat
} from './fsUtil.js';
import {
OUT_FILE as accessLog,
SLEEP_TIME,
MAPPING_FIELDS,
ENABLE_FAST_COLLECT,
FAST_FIELDS,
FAST_FIELD_POSITIONS
} from './setup_env.js';
import {
createCollector,
convertObj,
convertObjFast,
splitLine,
classifyStatusCode
} from './readlineUtil.js';
// Choose the line converter once at module load: the "fast" variant is built
// from FAST_FIELDS / FAST_FIELD_POSITIONS when ENABLE_FAST_COLLECT is truthy,
// otherwise the converter built from MAPPING_FIELDS is used.
const makeObj = (() => {
  if (ENABLE_FAST_COLLECT) {
    return convertObjFast(FAST_FIELDS, FAST_FIELD_POSITIONS);
  }
  return convertObj(MAPPING_FIELDS);
})();
import debug from 'debug';
// Build one `debug` instance per level; each instance uses the level name as
// its namespace and is exposed under the same key on `logger`.
const logger = Object.fromEntries(
  ['info', 'error', 'debug', 'trace'].map((level) => [level, debug(level)])
);

// Record which converter mode is active at startup.
logger.info(`ENABLE_FAST_COLLECT = ${ENABLE_FAST_COLLECT}`);
/**
 * Read the lines appended to `fname` since byte offset `fromBytes`.
 *
 * The file is opened read-only and its current size is obtained with
 * `getStat(fd, 'size')`:
 *  - size equal to `fromBytes`: nothing new, the descriptor is closed;
 *  - `fromBytes` greater than size: the file is read again from byte 0;
 *  - otherwise the range `fromBytes`..size is read and split into lines.
 *
 * @param {string} fname - file to tail.
 * @param {number} fromBytes - offset reached by the previous read.
 * @returns {Promise<Array>} `[offset, lines]`. `offset` is the size seen by
 *   this call; `lines` is whatever `splitLine` produced for the new range.
 *   On failure the error message is logged and `[fromBytes, []]` is returned,
 *   so the caller can always destructure the result and retry later from the
 *   same offset.
 */
const tailLine = async (fname, fromBytes) => {
  // Descriptor this function is still responsible for closing, if any.
  let pendingFd;
  try {
    logger.debug('start tail!');
    const fd = await openReadOnly(fname);
    pendingFd = fd;
    const lastSize = await getStat(fd, 'size');
    if (fromBytes === lastSize) {
      console.log(`same size: ${lastSize}`);
      pendingFd = undefined;
      await closeFD(fd);
      return [lastSize, []];
    }
    // file truncated or new access log created:
    // read from first bytes.
    const startAt = fromBytes > lastSize ? 0 : fromBytes;
    console.log(`read file [ fromBytes: ${startAt}, size: ${lastSize - startAt}, read to ${lastSize}]...`);
    const rStream = getReadStream(fd, startAt, lastSize);
    // From here on the descriptor is left to the stream, as in the original
    // code. NOTE(review): presumably getReadStream closes it when the stream
    // ends - confirm in fsUtil.js.
    pendingFd = undefined;
    const lines = await splitLine(rStream);
    return [lastSize, lines];
  } catch (err) {
    console.log(err.message);
    if (pendingFd !== undefined) {
      // The failure happened before the descriptor was handed to a stream,
      // so nobody else will release it.
      try {
        await closeFD(pendingFd);
      } catch (closeErr) {
        console.log(closeErr.message);
      }
    }
    return [fromBytes, []];
  }
};
/**
 * Feed a batch of parsed records into the collector.
 *
 * Records whose `httpCode` is `undefined` are skipped. The `time` of the
 * first record that is NOT skipped becomes `collector.startTime`; previously
 * this was only done when that record happened to be at index 0, so a batch
 * starting with an unparsable line kept a stale start time.
 *
 * @param {Array<Object>} records - objects produced from the tailed lines.
 * @param {Object} collector - must provide `getMatchedCode(record)` and
 *   `increaseCount(code)`; its `startTime` property is written here.
 * @returns {void}
 */
const classifyRecords = (records, collector) => {
  let startTimeSet = false;
  for (const record of records) {
    logger.trace('line = %j', record);
    if (record.httpCode === undefined) {
      continue;
    }
    if (!startTimeSet) {
      collector.startTime = record.time;
      startTimeSet = true;
    }
    const matchedCode = collector.getMatchedCode(record);
    collector.increaseCount(matchedCode);
  }
};
/**
 * Run one polling cycle: tail the access log from `offset`, classify the new
 * records into `collector`, and publish the collector through `postMessage`.
 *
 * `collector.updated` is stamped with `Date.now()` at the start of the cycle.
 * When no new lines were read, `collector.startTime` is set to the bracketed
 * ISO timestamp of `updated - SLEEP_TIME` before publishing.
 *
 * @param {Object} collector - accumulator that is updated and published.
 * @param {number} offset - byte offset reached by the previous cycle.
 * @param {Function} [postMessage] - called with the collector after a
 *   successful cycle; defaults to a no-op.
 * @returns {Promise<number>} the offset to use for the next cycle. If the
 *   cycle fails, the error message is logged and the incoming `offset` is
 *   returned unchanged (previously `undefined` was returned, which made the
 *   caller lose its position in the file).
 */
const loop = async (collector, offset, postMessage = () => {}) => {
  try {
    logger.debug('start loop');
    collector.updated = Date.now();
    const tailed = await tailLine(accessLog, offset);
    if (!Array.isArray(tailed)) {
      // Defensive: tailLine may resolve without an [offset, lines] pair when
      // a read fails. Nothing was read, so keep the current offset.
      return offset;
    }
    const [lastSize, lines] = tailed;
    if (lines.length === 0) {
      collector.startTime = `[${(new Date(collector.updated - SLEEP_TIME)).toISOString()}]`;
      postMessage(collector);
      return lastSize;
    }
    // Wrap makeObj so it only ever receives the line, not map's index/array.
    const records = lines.map((line) => makeObj(line));
    classifyRecords(records, collector);
    logger.debug('end loop');
    postMessage(collector);
    return lastSize;
  } catch (err) {
    console.log(err.message);
    return offset;
  }
};
/**
 * Default publisher: print the collector's start time, counts and last-update
 * stamp to stdout.
 *
 * @param {Object} collector - object exposing `startTime`, `counts`, `updated`.
 * @returns {void}
 */
const postMessage = (collector) => {
  const { startTime, counts, updated } = collector;
  console.log('collected: ', startTime, counts, updated);
};
/**
 * Entry point: start tailing the access log from its current end.
 *
 * The initial offset is the value of `getStat(fd, 'size')` at startup, so
 * only data appended afterwards is processed. Every SLEEP_TIME milliseconds
 * one `loop` cycle runs, the offset is advanced, and the collector is reset.
 *
 * @returns {Promise<void>} resolves once the interval timer is installed;
 *   rejects if the log cannot be opened or stat-ed.
 */
const main = async () => {
  const collector = createCollector();
  const fd = await openReadOnly(accessLog);
  let offset;
  try {
    offset = await getStat(fd, 'size');
  } finally {
    // Release the descriptor even when the stat fails.
    await closeFD(fd);
  }
  setInterval(async () => {
    const newOffset = await loop(collector, offset, postMessage);
    // A failed cycle may yield no offset; keep the last known position
    // instead of overwriting it with undefined.
    if (newOffset !== undefined) {
      offset = newOffset;
    }
    collector.reset();
  }, SLEEP_TIME);
};
// Start the tailer. A startup failure (main rejects) is reported the same
// way as the other errors in this file and flagged through the exit code,
// instead of surfacing as an unhandled promise rejection.
main().catch((err) => {
  console.log(err.message);
  process.exitCode = 1;
});