import fs, { PathLike } from 'fs';
import fsp from 'fs/promises';
import moment from 'moment';
// Project-local shell helper; as used below, exec() resolves with the command's stdout as a string.
import { exec } from 'node-utils/shell';
import path from 'path';
import { HttpStatusException } from '../../common/lib/http-status.exception';
import { Logger } from '../../common/util/logger.class';

// buffer.csv holds raw samples, remains.csv holds leftover not-yet-reduced samples,
// reduced.csv holds the aggregated avg/peak/max rows.
const DATA_DIR = process.env.DATA_DIR || 'data';
const DATA_BUFFER_FILE = path.resolve(DATA_DIR, 'buffer.csv');
const DATA_BUFFER_REMAINS = path.resolve(DATA_DIR, 'remains.csv');
const DATA_REDUCED_FILE = path.resolve(DATA_DIR, 'reduced.csv');
const TIMESTAMP_FORMAT = `YYYY-MM-DD[T]HH:mm:ss.SSSZZ`;
const REDUCE_INTERVAL_MINUTES = 5;
const REDUCE_GROUP_MINUTES = 1;
// Colon-separated list of mount points to monitor, e.g. "/:/mnt/data".
const MONITOR_MOUNTS = process.env.MONITOR_MOUNTS ? process.env.MONITOR_MOUNTS.split(':') : [];

Logger.info('[INFO] Monitoring Drives:', MONITOR_MOUNTS);
// Column indices within the semicolon-separated CSV rows.
const CSV_COLS = {
  buffer: {
    time: 0,
    cpu: 1,
    ram: 2
  },
  reduced: {
    time: 0,
    cpu: {
      avg: 1,
      peak: 2
    },
    ram: {
      avg: 3,
      peak: 4,
      max: 5
    }
  }
};
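
// The record shapes below are reconstructed from how they are used in this file
// (an assumption - the originals may live in a shared types module).
interface BufferedDriveData {
  [mount: string]: { used: number; max: number };
}
interface BufferedData {
  time: Date;
  cpu: number;
  ram: { used: number; max: number };
  hdd?: BufferedDriveData;
}
interface ReducedDriveData {
  [mount: string]: { avg: number; peak: number; max: number };
}
interface ReducedData {
  time: Date;
  cpu: { avg: number; peak: number };
  ram: { avg: number; peak: number; max: number };
  hdd?: ReducedDriveData;
}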
export class Collector {
  private intervalHdl?: NodeJS.Timeout;
  constructor() {
    (async () => {
      try {
        if (!fs.existsSync(DATA_DIR)) {
          Logger.info('[INFO] DATA_DIR', DATA_DIR, 'does not exist - creating now ...');
          await fsp.mkdir(DATA_DIR);
        }
        this.startLoop();
      } catch (err) {
        Logger.error('[FATAL]', err);
        Logger.error('[EXITING]');
        process.exit(1);
      }
    })();
  }

  public startLoop() {
    this.intervalHdl = setInterval(this.loop.bind(this), 500);
  }
  private async loop() {
    try {
      const now = moment();
      const time = now.format(TIMESTAMP_FORMAT);
      const cpu = (await exec(`./cpu.sh`)).trim();
      const ram = (await exec(`./ram.sh`)).trim();
      const hdd: string[] = [];
      for (const mount of MONITOR_MOUNTS) {
        try {
          const stats = (await exec(`./hdd.sh "${mount}"`)).trim();
          if (stats?.length) hdd.push(`${mount} ${stats}`);
        } catch (err) {
          Logger.warn('[WARN] Error while getting space usage of mount', mount, ':', err);
        }
      }
      const data = `${time};${cpu};${ram}${hdd.length ? `;${hdd.join(';')}` : ''}\n`;
      // Time to reduce the buffer?
      const firstBufferTime = await this.getFirstBufferTime();
      if (moment.duration(now.diff(firstBufferTime)).abs().asMinutes() >= REDUCE_INTERVAL_MINUTES) {
        try {
          const tmpFile = await this.createTmpFile();
          process.nextTick(() => this.reduceData(tmpFile));
        } catch (err) {
          Logger.error('[ERROR] Creating Temp File for Reducing Data failed:', err);
        }
      }
      await fsp.appendFile(DATA_BUFFER_FILE, data);
    } catch (err) {
      Logger.error(err);
    }
  }
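
  // A buffer row is semicolon-separated: timestamp;cpu;ram[;mount stats ...].
  // Illustrative example (the exact cpu.sh/ram.sh/hdd.sh output formats are
  // assumptions, inferred from how parseBufferedData() reads the rows back):
  //   2024-05-01T12:00:00.000+0000;12.5;2048.00/8192.00 MiB;/mnt/data 120/500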
  private async getFirstBufferTime() {
    let dataFile = DATA_BUFFER_FILE;
    if (fs.existsSync(DATA_BUFFER_REMAINS)) dataFile = DATA_BUFFER_REMAINS;
    if (!fs.existsSync(dataFile)) return moment();
    const firstLine = await this.readFirstBufferLine(dataFile);
    const timestamp = firstLine.split(';')[CSV_COLS.buffer.time];
    return moment(timestamp, TIMESTAMP_FORMAT);
  }
  private readFirstBufferLine = (dataFile: PathLike) =>
    new Promise<string>((resolve, reject) => {
      const stream = fs.createReadStream(dataFile, { encoding: 'utf-8' });
      const chunks: string[] = [];
      stream
        .on('data', buf => {
          const chunk = buf instanceof Buffer ? buf.toString('utf-8') : buf;
          const lfIdx = chunk.indexOf('\n');
          if (lfIdx >= 0) {
            // First line is complete - keep everything up to the newline and stop reading.
            chunks.push(chunk.substring(0, lfIdx));
            stream.close();
          } else {
            chunks.push(chunk);
          }
        })
        .on('close', () => resolve(chunks.join('')))
        .on('error', reject);
    });
  private async readDataFileCSV(dataFile: PathLike): Promise<string[][]> {
    return (await fsp.readFile(dataFile, { encoding: 'utf-8' }))
      .split(/\r?\n/g)
      .filter(l => !!l)
      .map(line => line.split(';'));
  }
  private async createTmpFile() {
    const tmpFilename = `buffer.tmp_${moment().format('YYYYMMDDHHmmssSSS')}.csv`;
    await fsp.rename(DATA_BUFFER_FILE, path.resolve(DATA_DIR, tmpFilename));
    return tmpFilename;
  }
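
  // Rotation note: renaming the buffer aside is atomic on the same filesystem,
  // so the next appendFile() in loop() simply starts a fresh buffer.csv while
  // reduceData() consumes the renamed snapshot.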
  private async reduceData(tmpFilename: string) {
    const tmpFilepath = path.resolve(DATA_DIR, tmpFilename);
    Logger.info('[INFO] Reducing data in', tmpFilepath);
    try {
      const lines: string[][] = [];
      if (fs.existsSync(DATA_BUFFER_REMAINS)) {
        lines.push(...(await this.readDataFileCSV(DATA_BUFFER_REMAINS)));
      }
      lines.push(...(await this.readDataFileCSV(tmpFilepath)));
      const reduced: Array<ReducedData> = [];
      let valueBuffer: Array<BufferedData> = [];
      do {
        const line = lines.shift();
        if (!line) break;
        const data = this.parseBufferedData(line);
        Logger.debug('[DEBUG] BufferedData:', JSON.stringify(data));
        valueBuffer.push(data);
        if (valueBuffer.length <= 1) {
          // Need at least 2 datasets to check the time diff and eventually reduce
          // to avg/peak/max - skip to the next data line.
          continue;
        }
        const firstTime = moment(valueBuffer[0].time);
        const currentTime = moment(data.time);
        if (moment.duration(currentTime.diff(firstTime)).abs().asMinutes() >= REDUCE_GROUP_MINUTES) {
          type IntermediateValues = { sum: number; peak: number; max: number };
          type IntermediateDriveData = { [mount: string]: IntermediateValues };
          type IntermediateSums = { ram: IntermediateValues; cpu: IntermediateValues; hdd?: IntermediateDriveData; count: number };
          const { cpu, ram, count, hdd } = valueBuffer.reduce(
            (res, cur) => {
              res.count++;
              res.cpu.sum += cur.cpu;
              res.cpu.peak = Math.max(res.cpu.peak, cur.cpu);
              res.ram.sum += cur.ram.used;
              res.ram.peak = Math.max(res.ram.peak, cur.ram.used);
              res.ram.max = cur.ram.max;
              if (cur.hdd && Object.keys(cur.hdd).length) {
                const hdd_sums = res.hdd ?? {};
                res.hdd = Object.keys(cur.hdd).reduce((res_hdd, mount) => {
                  if (!cur.hdd) return res_hdd;
                  if (!res_hdd[mount]) {
                    res_hdd[mount] = { sum: 0, peak: 0, max: 0 };
                  }
                  res_hdd[mount].sum += cur.hdd[mount].used;
                  res_hdd[mount].peak = Math.max(res_hdd[mount].peak, cur.hdd[mount].used);
                  res_hdd[mount].max = cur.hdd[mount].max;
                  return res_hdd;
                }, hdd_sums);
              }
              return res;
            },
            // cpu.max is unused but initialized so the literal matches IntermediateValues.
            { ram: { sum: 0, peak: 0, max: 0 }, cpu: { sum: 0, peak: 0, max: 0 }, count: 0 } as IntermediateSums
          );
          reduced.push({
            time: data.time,
            cpu: {
              avg: cpu.sum / count,
              peak: cpu.peak
            },
            ram: {
              avg: ram.sum / count,
              peak: ram.peak,
              max: ram.max
            },
            hdd: hdd
              ? Object.keys(hdd).reduce((res, mount) => {
                  res[mount] = {
                    avg: hdd[mount].sum / count,
                    peak: hdd[mount].peak,
                    max: hdd[mount].max
                  };
                  return res;
                }, {} as ReducedDriveData)
              : undefined
          });
          Logger.debug('[DEBUG] ReducedData:', JSON.stringify(reduced[reduced.length - 1]));
          valueBuffer = [];
        }
      } while (lines.length > 0);
      if (valueBuffer.length > 0) {
        // Overwrite remains.csv with the not-yet-reduced tail of the buffer.
        await fsp.writeFile(DATA_BUFFER_REMAINS, valueBuffer.map(this.serializeBufferedDataCSV.bind(this)).join('\n') + '\n', { encoding: 'utf-8' });
      } else {
        // Delete remains.csv if it exists.
        if (fs.existsSync(DATA_BUFFER_REMAINS)) await fsp.unlink(DATA_BUFFER_REMAINS);
      }
      if (reduced.length > 0) {
        // Append the reduced data to reduced.csv.
        await fsp.appendFile(DATA_REDUCED_FILE, reduced.map(this.serializeReducedDataCSV.bind(this)).join('\n') + '\n', { encoding: 'utf-8' });
      }
      // Delete the temp file.
      await fsp.unlink(tmpFilepath);
    } catch (err) {
      Logger.error(`[ERROR] Reducing Data of tmpFile ${tmpFilepath} failed:`, err);
    }
  }
  private parseBufferedData(line: string[]): BufferedData {
    // TIMESTAMP
    const time = moment(line[CSV_COLS.buffer.time], TIMESTAMP_FORMAT).toDate();
    // CPU
    const cpu = Number(line[CSV_COLS.buffer.cpu]);
    // RAM
    const [stats, unit] = line[CSV_COLS.buffer.ram].split(' ');
    const [used, max] = stats.split('/');
    const factor = this.parseByteUnit(unit);
    const lastCol = CSV_COLS.buffer.ram;
    // HDD (?)
    let hdd: BufferedDriveData | undefined;
    if (MONITOR_MOUNTS.length && line.length > lastCol + 1) {
      for (let i = 1; i <= MONITOR_MOUNTS.length; i++) {
        if (lastCol + i > line.length - 1) break;
        const [mount, driveStats] = line[lastCol + i].split(' ');
        const [driveUsed, driveMax] = driveStats.split('/');
        if (!hdd) hdd = {};
        hdd[mount] = {
          used: Number(driveUsed),
          max: Number(driveMax)
        };
      }
    }
    return {
      time,
      cpu,
      ram: {
        used: Number(used) * factor,
        max: Number(max) * factor
      },
      hdd
    };
  }
  private parseReducedData(line: string[]): ReducedData {
    const lastCol = CSV_COLS.reduced.ram.max;
    // HDD (?)
    let hdd: ReducedDriveData | undefined;
    if (MONITOR_MOUNTS.length && line.length > lastCol + 1) {
      hdd = {};
      for (let i = 1; lastCol + i + 3 < line.length; i += 4) {
        hdd[line[lastCol + i]] = {
          avg: Number(line[lastCol + i + 1]),
          peak: Number(line[lastCol + i + 2]),
          max: Number(line[lastCol + i + 3])
        };
      }
    }
    return {
      time: moment(line[CSV_COLS.reduced.time], TIMESTAMP_FORMAT).toDate(),
      cpu: {
        avg: Number(line[CSV_COLS.reduced.cpu.avg]),
        peak: Number(line[CSV_COLS.reduced.cpu.peak])
      },
      ram: {
        avg: Number(line[CSV_COLS.reduced.ram.avg]),
        peak: Number(line[CSV_COLS.reduced.ram.peak]),
        max: Number(line[CSV_COLS.reduced.ram.max])
      },
      hdd
    };
  }
  private byteFactors: { [unit: string]: number } = {
    '': 1,
    K: 1024,
    M: 1024 * 1024,
    G: 1024 * 1024 * 1024,
    T: 1024 * 1024 * 1024 * 1024,
    P: 1024 * 1024 * 1024 * 1024 * 1024
  };

  private parseByteUnit(unit: string): number {
    const m = /^([KMGTP])?i?B$/.exec(unit);
    if (!m) throw new Error(`Failed to parse byte size unit '${unit}'`);
    // The prefix group is undefined for a plain 'B' - fall back to the '' entry (factor 1).
    return this.byteFactors[m[1] ?? ''];
  }
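
  // e.g. parseByteUnit('MiB') === 1048576, parseByteUnit('KB') === 1024, parseByteUnit('B') === 1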
  private serializeBufferedDataCSV(data: BufferedData) {
    return [
      moment(data.time).format(TIMESTAMP_FORMAT),
      data.cpu,
      `${(data.ram.used / this.byteFactors['M']).toFixed(2)}/${(data.ram.max / this.byteFactors['M']).toFixed(2)} MiB`,
      ...(data.hdd ? Object.keys(data.hdd).map(mount => `${mount} ${data.hdd?.[mount].used}/${data.hdd?.[mount].max}`) : [])
    ].join(';');
  }

  private serializeReducedDataCSV(data: ReducedData) {
    return [
      moment(data.time).format(TIMESTAMP_FORMAT),
      data.cpu.avg.toFixed(2),
      data.cpu.peak.toFixed(2),
      data.ram.avg.toFixed(2),
      data.ram.peak.toFixed(2),
      data.ram.max.toFixed(2),
      ...(data.hdd
        ? Object.keys(data.hdd).reduce((res, mount) => {
            res.push(
              mount,
              data.hdd?.[mount].avg.toFixed(2) || '0',
              data.hdd?.[mount].peak.toFixed(2) || '0',
              data.hdd?.[mount].max.toFixed(2) || '0'
            );
            return res;
          }, [] as string[])
        : [])
    ].join(';');
  }
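
  // Poor-man's transaction over reduced.csv, implemented via renames:
  // start() moves reduced.csv to reduced.trx_<unix>.csv and returns <unix> as
  // the handle, read() parses that snapshot, commit() deletes it, and
  // rollback() prepends it back in front of whatever accumulated since.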
  public get trx() {
    return this._trx;
  }

  private _trx: {
    file?: PathLike;
    start: () => Promise<number | null>;
    read: () => Promise<Array<ReducedData>>;
    rollback: (hdl: number) => Promise<void>;
    commit: (hdl: number) => Promise<void>;
  } = {
    start: async () => {
      if (this.trx.file) {
        Logger.warn(`[WARN] Old transaction file found - rolling back now before starting new transaction ...`);
        const m = /trx_(\d+)\.csv/.exec(this.trx.file as string);
        const hdl = Number(m?.[1] ?? '0');
        await this.trx.rollback(hdl);
        Logger.warn(`[WARN] Transaction rollback succeeded.`);
      }
      if (!fs.existsSync(DATA_REDUCED_FILE)) {
        // NO DATA
        return null;
      }
      const hdl = moment().unix();
      this.trx.file = path.resolve(DATA_DIR, `reduced.trx_${hdl.toFixed(0)}.csv`);
      await fsp.rename(DATA_REDUCED_FILE, this.trx.file);
      return hdl;
    },
    read: async () => {
      if (!this.trx.file) throw new Error('No transaction opened');
      const data = await this.readDataFileCSV(this.trx.file);
      return data.map(this.parseReducedData.bind(this));
    },
    rollback: async (hdl: number) => {
      if (this.trx.file) {
        const filename = path.resolve(DATA_DIR, `reduced.trx_${hdl.toFixed(0)}.csv`);
        if (filename !== this.trx.file) throw new HttpStatusException(`Transaction #${hdl} not found`, 404);
        if (fs.existsSync(this.trx.file)) {
          let tmpFile: string | undefined;
          if (fs.existsSync(DATA_REDUCED_FILE)) {
            // Park the data collected since start() so the transaction snapshot can be
            // restored in front of it (resolved into DATA_DIR like the other temp files).
            tmpFile = path.resolve(DATA_DIR, `reduced.tmp_${moment().unix().toFixed(0)}.csv`);
            await fsp.rename(DATA_REDUCED_FILE, tmpFile);
          }
          await fsp.rename(this.trx.file, DATA_REDUCED_FILE);
          if (tmpFile) {
            await exec(`cat "${tmpFile}" >> "${DATA_REDUCED_FILE}"`);
            await fsp.unlink(tmpFile);
          }
        }
        this.trx.file = undefined;
      }
    },
    commit: async (hdl: number) => {
      if (this.trx.file) {
        const filename = path.resolve(DATA_DIR, `reduced.trx_${hdl.toFixed(0)}.csv`);
        if (filename !== this.trx.file) throw new HttpStatusException(`Transaction #${hdl} not found`, 404);
        if (fs.existsSync(this.trx.file)) {
          await fsp.unlink(this.trx.file);
        }
        this.trx.file = undefined;
      }
    }
  };
  public stopLoop() {
    clearInterval(this.intervalHdl);
  }
}
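
// Minimal usage sketch (assumed consumer code, not part of this module):
//   const collector = new Collector();          // starts sampling immediately
//   const hdl = await collector.trx.start();    // null when no reduced data exists yet
//   if (hdl !== null) {
//     try {
//       const rows = await collector.trx.read();
//       // ... ship rows to storage/UI ...
//       await collector.trx.commit(hdl);
//     } catch (err) {
//       await collector.trx.rollback(hdl);
//     }
//   }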
|