@@ -0,0 +1,259 @@
+import dotenv from 'dotenv';
+import fs, { PathLike } from 'fs';
+import fsp from 'fs/promises';
+import path from 'path';
+import moment, { Moment } from 'moment';
+import { exec } from 'node-utils/shell';
+
+import { Logger, LogLevel } from './util/logger.class';
+
+dotenv.config();
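+
+// Assumed record shapes, inferred from their usage below (the types are not
+// declared in this file); drop these if the project already provides them.
+interface BufferedData {
+  time: Date;
+  cpu: number;
+  ram: { used: number; max: number };
+}
+
+interface ReducedData {
+  time: Date;
+  cpu: { avg: number; peak: number };
+  ram: { avg: number; peak: number; max: number };
+}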
+
+// Fail fast if DATA_DIR is missing; path.resolve(undefined, ...) would throw anyway.
+if (!process.env.DATA_DIR) throw new Error(`DATA_DIR environment variable is not set`);
+const DATA_DIR = process.env.DATA_DIR;
+const DATA_BUFFER_FILE = path.resolve(DATA_DIR, 'buffer.csv');
+const DATA_BUFFER_REMAINS = path.resolve(DATA_DIR, 'remains.csv');
+const DATA_REDUCED_FILE = path.resolve(DATA_DIR, 'reduced.csv');
+const TIMESTAMP_FORMAT = `YYYY-MM-DD[T]HH:mm:ss.SSSZZ`;
+const REDUCE_INTERVAL_MINUTES = 5;
+const REDUCE_GROUP_MINUTES = 1;
+
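+// Column indices of the two CSV layouts: buffer.csv / remains.csv carry raw
+// samples (time;cpu;ram), reduced.csv carries one aggregated row per group
+// (time;cpu avg;cpu peak;ram avg;ram peak;ram max).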
+const CSV_COLS = {
+  buffer: {
+    time: 0,
+    cpu: 1,
+    ram: 2
+  },
+  reduced: {
+    time: 0,
+    cpu: {
+      avg: 1,
+      peak: 2
+    },
+    ram: {
+      avg: 3,
+      peak: 4,
+      max: 5
+    }
+  }
+};
+
+const LOG_LEVEL: LogLevel = (process.env.LOG_LEVEL as LogLevel) || 'INFO';
+const log = new Logger(LOG_LEVEL);
+
+let intervalHdl: NodeJS.Timer;
+
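+// Sampling loop: every 500 ms run cpu.sh and ram.sh, append "time;cpu;ram" to
+// buffer.csv, and once the oldest buffered sample is REDUCE_INTERVAL_MINUTES old,
+// hand the buffer off to reduceData() on the next tick.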
+log.info('[INFO] Starting Monitoring Daemon, pid:', process.pid);
+(async () => {
+  process.on('SIGABRT', exitGracefully);
+  process.on('SIGQUIT', exitGracefully);
+  process.on('SIGTERM', exitGracefully);
+  try {
+    intervalHdl = setInterval(async () => {
+      const now = moment();
+      const time = now.format(TIMESTAMP_FORMAT);
+      const cpu = (await exec(`./cpu.sh`)).trim();
+      const ram = (await exec(`./ram.sh`)).trim();
+      const data = `${time};${cpu};${ram}\n`;
+
+      // Time to reduce the buffer?
+      const firstBufferTime = await getFirstBufferTime();
+      if (moment.duration(now.diff(firstBufferTime)).abs().asMinutes() >= REDUCE_INTERVAL_MINUTES) {
+        try {
+          const tmpFile = await createTmpFile();
+          process.nextTick(() => reduceData(tmpFile));
+        } catch (err) {
+          log.error('[ERROR] Creating Temp File for Reducing Data failed:', err);
+        }
+      }
+
+      await fsp.appendFile(DATA_BUFFER_FILE, data);
+    }, 500);
+  } catch (err) {
+    log.error('[FATAL]', err);
+    log.error('[EXITING]');
+    process.exit(1);
+  }
+})();
+
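+// Timestamp of the oldest buffered sample. remains.csv takes precedence over
+// buffer.csv because it holds samples carried over from the previous reduction.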
+async function getFirstBufferTime() {
+  let dataFile = DATA_BUFFER_FILE;
+  if (fs.existsSync(DATA_BUFFER_REMAINS)) dataFile = DATA_BUFFER_REMAINS;
+  // Nothing buffered yet (very first tick): treat "now" as the first timestamp.
+  if (!fs.existsSync(dataFile)) return moment();
+
+  const firstLine = await readFirstBufferLine(dataFile);
+  const timestamp = firstLine.split(';')[CSV_COLS.buffer.time];
+  return moment(timestamp, TIMESTAMP_FORMAT);
+}
+
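+// Streams the data file and resolves with just its first line, closing the
+// stream as soon as a line feed shows up instead of reading the whole file.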
+const readFirstBufferLine = (dataFile: PathLike) =>
+  new Promise<string>((resolve, reject) => {
+    const stream = fs.createReadStream(dataFile, { encoding: 'utf-8' });
+    const chunks: string[] = [];
+    stream
+      .on('data', buf => {
+        let chunk: string;
+        if (buf instanceof Buffer) chunk = buf.toString('utf-8');
+        else chunk = buf;
+
+        const lfIdx = chunk.indexOf('\n');
+        if (lfIdx >= 0) {
+          chunks.push(chunk.substring(0, lfIdx));
+          stream.close();
+        } else {
+          chunks.push(chunk);
+        }
+      })
+      .on('close', () => resolve(chunks.join('')))
+      .on('error', reject);
+  });
+
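+// Reads a whole CSV file into rows: one string[] of ';'-separated fields per
+// non-empty line.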
+async function readDataFileCSV(dataFile: PathLike): Promise<string[][]> {
+  return (await fsp.readFile(dataFile, { encoding: 'utf-8' }))
+    .split(/\r?\n/g)
+    .filter(l => !!l)
+    .map(line => line.split(';'));
+}
+
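+// Moves the current buffer aside under a timestamped temp name so sampling can
+// keep appending to a fresh buffer.csv while the old data is reduced.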
+async function createTmpFile() {
+  const tmpFilename = `buffer.tmp_${moment().format('YYYYMMDDHHmmssSSS')}.csv`;
+  await fsp.rename(DATA_BUFFER_FILE, path.resolve(DATA_DIR, tmpFilename));
+  return tmpFilename;
+}
+
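+// Reduces the renamed buffer (plus any remains.csv left over from the last run):
+// samples are grouped into windows of REDUCE_GROUP_MINUTES; per window the CPU/RAM
+// average and peak (and the RAM maximum) are appended to reduced.csv. Samples of an
+// unfinished window are written back to remains.csv, then the temp file is deleted.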
+async function reduceData(tmpFilename: string) {
+  const tmpFilepath = path.resolve(DATA_DIR, tmpFilename);
+  log.info('[INFO] Reducing data in', tmpFilepath);
+  try {
+    const lines: string[][] = [];
+    if (fs.existsSync(DATA_BUFFER_REMAINS)) {
+      lines.push(...(await readDataFileCSV(DATA_BUFFER_REMAINS)));
+    }
+    lines.push(...(await readDataFileCSV(tmpFilepath)));
+
+    const reduced: Array<ReducedData> = [];
+    let valueBuffer: Array<BufferedData> = [];
+    do {
+      const line = lines.shift();
+      if (!line) break;
+      const data = parseData(line);
+      log.debug('[DEBUG] BufferedData:', JSON.stringify(data));
+      valueBuffer.push(data);
+
+      if (valueBuffer.length <= 1) {
+        // Need at least 2 datasets to check the time diff and possibly reduce to avg/max;
+        // skip to the next data line.
+        continue;
+      }
+
+      const firstTime = moment(valueBuffer[0].time);
+      const currentTime = moment(data.time);
+      if (moment.duration(currentTime.diff(firstTime)).abs().asMinutes() >= REDUCE_GROUP_MINUTES) {
+        const { cpu, ram, count } = valueBuffer.reduce(
+          (res, cur) => {
+            res.count++;
+            res.cpu.sum += cur.cpu;
+            res.cpu.peak = Math.max(res.cpu.peak, cur.cpu);
+            res.ram.sum += cur.ram.used;
+            res.ram.peak = Math.max(res.ram.peak, cur.ram.used);
+            res.ram.max = cur.ram.max;
+            return res;
+          },
+          { ram: { sum: 0, peak: 0, max: 0 }, cpu: { sum: 0, peak: 0 }, count: 0 }
+        );
+
+        reduced.push({
+          time: data.time,
+          cpu: {
+            avg: cpu.sum / count,
+            peak: cpu.peak
+          },
+          ram: {
+            avg: ram.sum / count,
+            peak: ram.peak,
+            max: ram.max
+          }
+        });
+
+        log.debug('[DEBUG] ReducedData:', JSON.stringify(reduced[reduced.length - 1]));
+
+        valueBuffer = [];
+      }
+    } while (lines.length > 0);
+
+    if (valueBuffer.length > 0) {
+      // overwrite remains.csv with valueBuffer
+      await fsp.writeFile(DATA_BUFFER_REMAINS, valueBuffer.map(serializeBufferedDataCSV).join('\n') + '\n', { encoding: 'utf-8' });
+    } else {
+      // delete remains.csv if it exists
+      if (fs.existsSync(DATA_BUFFER_REMAINS)) await fsp.unlink(DATA_BUFFER_REMAINS);
+    }
+
+    if (reduced.length > 0) {
+      // append reduced data to reduced.csv
+      await fsp.appendFile(DATA_REDUCED_FILE, reduced.map(serializeReducedDataCSV).join('\n') + '\n', { encoding: 'utf-8' });
+    }
+
+    // Delete the temp file
+    await fsp.unlink(tmpFilepath);
+  } catch (err) {
+    log.error(`[ERROR] Reducing Data of tmpFile ${tmpFilepath} failed:`, err);
+  }
+}
+
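+// Parses one raw buffer row into numbers. The RAM column is expected in the form
+// "<used>/<max> <unit>", e.g. "512/2048 MiB" (example values only).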
+function parseData(line: string[]): BufferedData {
+  const cpu = Number(line[CSV_COLS.buffer.cpu]);
+  const time = moment(line[CSV_COLS.buffer.time], TIMESTAMP_FORMAT).toDate();
+  let ramSplit = line[CSV_COLS.buffer.ram].split(' ');
+  const unit = ramSplit[1];
+  ramSplit = ramSplit[0].split('/');
+  const [used, max] = ramSplit;
+  const factor = parseByteUnit(unit);
+
+  return {
+    time,
+    cpu,
+    ram: {
+      used: Number(used) * factor,
+      max: Number(max) * factor
+    }
+  };
+}
+
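+// Binary multipliers for converting "KiB"/"MiB"/... (or "KB"/"MB"/...) suffixes to bytes.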
+const byteFactors: { [unit: string]: number } = {
+  '': 1,
+  K: 1024,
+  M: 1024 * 1024,
+  G: 1024 * 1024 * 1024,
+  T: 1024 * 1024 * 1024 * 1024,
+  P: 1024 * 1024 * 1024 * 1024 * 1024
+};
+
+function parseByteUnit(unit: string): number {
+  const m = /^([KMGTP])?i?B$/.exec(unit);
+  if (!m) throw new Error(`Failed to parse byte size unit '${unit}'`);
+
+  // m[1] is undefined for a plain "B" unit, which maps to the '' factor of 1.
+  return byteFactors[m[1] ?? ''];
+}
+
+function serializeBufferedDataCSV(data: BufferedData) {
+  return [
+    moment(data.time).format(TIMESTAMP_FORMAT),
+    data.cpu,
+    `${(data.ram.used / byteFactors['M']).toFixed(2)}/${(data.ram.max / byteFactors['M']).toFixed(2)} MiB`
+  ].join(';');
+}
+
+function serializeReducedDataCSV(data: ReducedData) {
+  return [
+    moment(data.time).format(TIMESTAMP_FORMAT),
+    data.cpu.avg.toFixed(2),
+    data.cpu.peak.toFixed(2),
+    data.ram.avg.toFixed(2),
+    data.ram.peak.toFixed(2),
+    data.ram.max.toFixed(2)
+  ].join(';');
+}
+
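+// Signal handler: logs the received signal, stops the sampling interval and exits cleanly.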
+function exitGracefully(...args: any[]) {
+  log.info(`[EXITING] Graceful exit, received ${JSON.stringify(args)}`);
+  clearInterval(intervalHdl);
+  process.exit(0);
+}