import fs, { PathLike } from 'fs';
import fsp from 'fs/promises';
import moment from 'moment';
import { exec } from 'node-utils/shell';
import path from 'path';
import { format } from 'util';
import { HttpStatusException } from '../../common/lib/http-status.exception';
import { Logger } from '../../common/util/logger.class';
import { MemoryUsage, TopTTY } from './top-tty';

const DATA_DIR = process.env.DATA_DIR || 'data';
const DATA_BUFFER_FILE = path.resolve(DATA_DIR, 'buffer.csv');
const DATA_BUFFER_REMAINS = path.resolve(DATA_DIR, 'remains.csv');
const DATA_REDUCED_FILE = path.resolve(DATA_DIR, 'reduced.csv');

const TIMESTAMP_FORMAT = `YYYY-MM-DD[T]HH:mm:ss.SSSZZ`;

const WRITE_BUFFER_INTERVAL = 500; // [ms]
const READ_HDD_INTERVAL = 300; // [s]
const REDUCE_INTERVAL_MINUTES = 5;
const REDUCE_GROUP_MINUTES = 1;

const MONITOR_MOUNTS = !!process.env.MONITOR_MOUNTS ? process.env.MONITOR_MOUNTS.split(':') : [];
Logger.info('[INFO] Monitoring Drives:', MONITOR_MOUNTS);

const CSV_COLS = {
  buffer: { time: 0, cpu: 1, ram: 2 },
  reduced: { time: 0, cpu: { avg: 1, peak: 2 }, ram: { avg: 3, peak: 4, max: 5 } }
};

export class Collector {
  private intervalHdl?: NodeJS.Timeout;

  private currentRamUsage: MemoryUsage = { unit: 'B', avail: 0, used: 0 };
  private currentCpuUsage: number = 0;
  private currentHddUsage: { time: number; mounts: Array<{ mount: string; stats: string }> } = { time: 0, mounts: [] };

  constructor() {
    (async () => {
      try {
        if (!fs.existsSync(DATA_DIR)) {
          Logger.info('[INFO] DATA_DIR', DATA_DIR, 'does not exist - creating now ...');
          await fsp.mkdir(DATA_DIR);
        }
        this.startLoop();
      } catch (err) {
        Logger.error('[FATAL]', err);
        Logger.error('[EXITING]');
        process.exit(1);
      }
    })();
  }

  public startLoop() {
    this.intervalHdl = setInterval(this.loop.bind(this), WRITE_BUFFER_INTERVAL);
    const tty = new TopTTY();
    tty.subscribe.cpu(usage => (this.currentCpuUsage = usage));
    tty.subscribe.ram(usage => (this.currentRamUsage = usage));
    tty.runloop();
  }

  private async loop() {
    try {
      const now = moment();
      const time = now.format(TIMESTAMP_FORMAT);

      if (now.unix() - this.currentHddUsage.time > READ_HDD_INTERVAL) {
        this.currentHddUsage.time = now.unix();
        this.currentHddUsage.mounts = [];
        for (const mount of MONITOR_MOUNTS) {
          try {
            const stats = (await exec(`./hdd.sh "${mount}"`)).trim();
            if (stats?.length) this.currentHddUsage.mounts.push({ mount, stats });
          } catch (err) {
            Logger.warn('[WARN] Error while getting space usage of mount', mount, ':', err);
          }
        }
      }
      const hdd: string[] = this.currentHddUsage.mounts.map(({ mount, stats }) => `${mount} ${stats}`);

      if (!this.currentRamUsage.avail) return;
      const ram = format(
        '%s/%s %s',
        (Math.round(this.currentRamUsage.used * 100) / 100).toFixed(2),
        (Math.round(this.currentRamUsage.avail * 100) / 100).toFixed(2),
        this.currentRamUsage.unit
      );
      const cpu = (Math.round(this.currentCpuUsage * 100) / 100).toFixed(2);
      const data = `${time};${cpu};${ram}${hdd.length ? `;${hdd.join(';')}` : ''}\n`;

      // Time to reduce buffer?
      const firstBufferTime = await this.getFirstBufferTime();
      if (moment.duration(now.diff(firstBufferTime)).abs().asMinutes() >= REDUCE_INTERVAL_MINUTES) {
        try {
          const tmpFile = await this.createTmpFile();
          process.nextTick(() => this.reduceData(tmpFile));
        } catch (err) {
          Logger.error('[ERROR] Creating Temp File for Reducing Data failed:', err);
        }
      }

      await fsp.appendFile(DATA_BUFFER_FILE, data);
    } catch (err) {
      Logger.error(err);
    }
  }

  private async getFirstBufferTime() {
    let dataFile = DATA_BUFFER_FILE;
    if (fs.existsSync(DATA_BUFFER_REMAINS)) dataFile = DATA_BUFFER_REMAINS;
    if (!fs.existsSync(dataFile)) return moment();
    const firstLine = await this.readFirstBufferLine(dataFile);
    const timestamp = firstLine.split(';')[CSV_COLS.buffer.time];
    return moment(timestamp, TIMESTAMP_FORMAT);
  }

  private readFirstBufferLine = (dataFile: PathLike) =>
    new Promise<string>((resolve, reject) => {
      const stream = fs.createReadStream(dataFile, { encoding: 'utf-8' });
      const chunks: string[] = [];
      stream
        .on('data', buf => {
          let chunk: string;
          if (buf instanceof Buffer) chunk = buf.toString('utf-8');
          else chunk = buf;
          const lfIdx = chunk.indexOf('\n');
          if (lfIdx >= 0) {
            chunks.push(chunk.substring(0, lfIdx));
            stream.close();
          } else {
            chunks.push(chunk);
          }
        })
        .on('close', () => resolve(chunks.join('')))
        .on('error', reject);
    });

  private async readDataFileCSV(dataFile: PathLike): Promise<string[][]> {
    return (await fsp.readFile(dataFile, { encoding: 'utf-8' }))
      .split(/\r?\n/g)
      .filter(l => !!l)
      .map(line => line.split(';'));
  }

  private async createTmpFile() {
    const tmpFilename = `buffer.tmp_${moment().format('YYYYMMDDHHmmssSSS')}.csv`;
    await fsp.rename(DATA_BUFFER_FILE, path.resolve(DATA_DIR, tmpFilename));
    return tmpFilename;
  }

  private async reduceData(tmpFilename: string) {
    const tmpFilepath = path.resolve(DATA_DIR, tmpFilename);
    Logger.info('[INFO] Reducing data in', tmpFilepath);
    try {
      const lines: string[][] = [];
      if (fs.existsSync(DATA_BUFFER_REMAINS)) {
        lines.push(...(await this.readDataFileCSV(DATA_BUFFER_REMAINS)));
      }
      lines.push(...(await this.readDataFileCSV(tmpFilepath)));

      const reduced: Array<ReducedData> = [];
      let valueBuffer: Array<BufferedData> = [];
      // Walk the buffered samples in order and collapse each window of REDUCE_GROUP_MINUTES into one avg/peak entry.
      do {
        const line = lines.shift();
        if (!line) break;
        const data = this.parseBufferedData(line);
        Logger.debug('[DEBUG] BufferedData:', JSON.stringify(data));
        valueBuffer.push(data);
        if (valueBuffer.length <= 1) {
          // Need at least 2 datasets to check for time diff and eventually reduce to avg/max.
          // skip to next data line
          continue;
        }

        const firstTime = moment(valueBuffer[0].time);
        const currentTime = moment(data.time);
        if (moment.duration(currentTime.diff(firstTime)).abs().asMinutes() >= REDUCE_GROUP_MINUTES) {
          type IntermediateValues = { sum: number; peak: number; max: number };
          type IntermediateDriveData = { [mount: string]: IntermediateValues };
          type IntermediateSums = { ram: IntermediateValues; cpu: IntermediateValues; hdd?: IntermediateDriveData; count: number };
          const { cpu, ram, count, hdd } = valueBuffer.reduce(
            (res, cur) => {
              res.count++;
              res.cpu.sum += cur.cpu;
              res.cpu.peak = Math.max(res.cpu.peak, cur.cpu);
              res.ram.sum += cur.ram.used;
              res.ram.peak = Math.max(res.ram.peak, cur.ram.used);
              res.ram.max = cur.ram.max;
              if (cur.hdd && Object.keys(cur.hdd).length) {
                const hdd_sums: IntermediateDriveData = res.hdd ?? {};
                res.hdd = Object.keys(cur.hdd).reduce((res_hdd, mount) => {
                  if (!cur.hdd) return res_hdd;
                  if (!res_hdd[mount]) {
                    res_hdd[mount] = { sum: 0, peak: 0, max: 0 };
                  }
                  res_hdd[mount].sum += cur.hdd[mount].used;
                  res_hdd[mount].peak = Math.max(res_hdd[mount].peak, cur.hdd[mount].used);
                  res_hdd[mount].max = cur.hdd[mount].max;
                  return res_hdd;
                }, hdd_sums);
              }
              return res;
            },
            { ram: { sum: 0, peak: 0, max: 0 }, cpu: { sum: 0, peak: 0 }, count: 0 } as IntermediateSums
          );
          reduced.push({
            time: data.time,
            cpu: { avg: cpu.sum / count, peak: cpu.peak },
            ram: { avg: ram.sum / count, peak: ram.peak, max: ram.max },
            hdd: hdd
              ? Object.keys(hdd).reduce((res, mount) => {
                  res[mount] = { avg: hdd[mount].sum / count, peak: hdd[mount].peak, max: hdd[mount].max };
                  return res;
                }, {} as ReducedDriveData)
              : undefined
          });
          Logger.debug('[DEBUG] ReducedData:', JSON.stringify(reduced[reduced.length - 1]));
          valueBuffer = [];
        }
      } while (lines.length > 0);

      if (valueBuffer.length > 0) {
        // overwrite remains.csv with valueBuffer
        await fsp.writeFile(DATA_BUFFER_REMAINS, valueBuffer.map(this.serializeBufferedDataCSV.bind(this)).join('\n') + '\n', { encoding: 'utf-8' });
      } else {
        // delete remains.csv if exists
        if (fs.existsSync(DATA_BUFFER_REMAINS)) await fsp.unlink(DATA_BUFFER_REMAINS);
      }

      if (reduced.length > 0) {
        // append reduced data to reduced.csv
        await fsp.appendFile(DATA_REDUCED_FILE, reduced.map(this.serializeReducedDataCSV.bind(this)).join('\n') + '\n', { encoding: 'utf-8' });
      }

      // Delete tmpFile
      await fsp.unlink(tmpFilepath);
    } catch (err) {
      Logger.error(`[ERROR] Reducing Data of tmpFile ${tmpFilepath} failed:`, err);
    }
  }

  private parseBufferedData(line: string[]): BufferedData {
    // TIMESTAMP
    const time = moment(line[CSV_COLS.buffer.time], TIMESTAMP_FORMAT).toDate();
    // CPU
    const cpu = Number(line[CSV_COLS.buffer.cpu]);
    // RAM
    let [stats, unit] = line[CSV_COLS.buffer.ram].split(' ');
    const [used, max] = stats.split('/');
    const factor = this.parseByteUnit(unit);
    const lastCol = CSV_COLS.buffer.ram;
    // HDD (?)
    let hdd: BufferedDriveData | undefined;
    if (MONITOR_MOUNTS.length && line.length > lastCol + 1) {
      for (let i = 1; i <= MONITOR_MOUNTS.length; i++) {
        if (lastCol + i > line.length - 1) break;
        const data = line[lastCol + i];
        const [mount, stats] = data.split(' ');
        const [used, max] = stats.split('/');
        if (!hdd) hdd = {};
        hdd[mount] = { used: Number(used), max: Number(max) };
      }
    }
    return { time, cpu, ram: { used: Number(used) * factor, max: Number(max) * factor }, hdd };
  }

  private parseReducedData(line: string[]): ReducedData {
    const lastCol = CSV_COLS.reduced.ram.max;
    // HDD (?)
    let hdd: ReducedDriveData | undefined;
    if (MONITOR_MOUNTS.length && line.length > lastCol + 1) {
      hdd = {};
      for (let i = 1; lastCol + i + 3 < line.length; i += 4) {
        hdd[line[lastCol + i]] = {
          avg: Number(line[lastCol + i + 1]),
          peak: Number(line[lastCol + i + 2]),
          max: Number(line[lastCol + i + 3])
        };
      }
    }
    return {
      time: moment(line[CSV_COLS.reduced.time], TIMESTAMP_FORMAT).toDate(),
      cpu: { avg: Number(line[CSV_COLS.reduced.cpu.avg]), peak: Number(line[CSV_COLS.reduced.cpu.peak]) },
      ram: {
        avg: Number(line[CSV_COLS.reduced.ram.avg]),
        peak: Number(line[CSV_COLS.reduced.ram.peak]),
        max: Number(line[CSV_COLS.reduced.ram.max])
      },
      hdd
    };
  }

  private byteFactors: { [unit: string]: number } = {
    '': 1,
    K: 1024,
    M: 1024 * 1024,
    G: 1024 * 1024 * 1024,
    T: 1024 * 1024 * 1024 * 1024,
    P: 1024 * 1024 * 1024 * 1024 * 1024
  };

  private parseByteUnit(unit: string): number {
    const m = /^([KMGTP])?i?B$/.exec(unit);
    if (!m) throw new Error(`Failed to parse byte size unit '${unit}'`);
    // A plain 'B' has no prefix group, so fall back to the '' factor (1).
    return this.byteFactors[m[1] ?? ''];
  }

  private serializeBufferedDataCSV(data: BufferedData) {
    return [
      moment(data.time).format(TIMESTAMP_FORMAT),
      data.cpu,
      `${(data.ram.used / this.byteFactors['M']).toFixed(2)}/${(data.ram.max / this.byteFactors['M']).toFixed(2)} MiB`,
      ...(data.hdd ? Object.keys(data.hdd).map(mount => `${mount} ${data.hdd?.[mount].used}/${data.hdd?.[mount].max}`) : [])
    ].join(';');
  }

  private serializeReducedDataCSV(data: ReducedData) {
    return [
      moment(data.time).format(TIMESTAMP_FORMAT),
      data.cpu.avg.toFixed(2),
      data.cpu.peak.toFixed(2),
      data.ram.avg.toFixed(2),
      data.ram.peak.toFixed(2),
      data.ram.max.toFixed(2),
      ...(data.hdd
        ? Object.keys(data.hdd).reduce((res, mount) => {
            res.push(
              mount,
              data.hdd?.[mount].avg.toFixed(2) || '0',
              data.hdd?.[mount].peak.toFixed(2) || '0',
              data.hdd?.[mount].max.toFixed(2) || '0'
            );
            return res;
          }, [] as string[])
        : [])
    ].join(';');
  }

  public get trx() {
    return this._trx;
  }

  // File-based "transaction" over reduced.csv: start() moves the file aside and returns a handle,
  // commit() deletes the moved file, rollback() restores it (re-appending anything reduced in the meantime).
  private _trx: {
    file?: PathLike;
    start: () => Promise<number | null>;
    read: () => Promise<Array<ReducedData>>;
    rollback: (hdl: number) => Promise<void>;
    commit: (hdl: number) => Promise<void>;
  } = {
    start: async () => {
      if (this.trx.file) {
        Logger.warn(`[WARN] Old transaction file found - rolling back now before starting new transaction ...`);
        const m = /trx_(\d+)\.csv/.exec(this.trx.file as string);
        const hdl = Number(m?.[1] ?? '0');
        await this.trx.rollback(hdl);
        Logger.warn(`[WARN] Transaction rollback succeeded.`);
      }
      if (!fs.existsSync(DATA_REDUCED_FILE)) {
        // NO DATA
        return null;
      }
      const hdl = moment().unix();
      this.trx.file = path.resolve(DATA_DIR, `reduced.trx_${hdl.toFixed(0)}.csv`);
      await fsp.rename(DATA_REDUCED_FILE, this.trx.file);
      return hdl;
    },
    read: async () => {
      if (!this.trx.file) throw new Error('No transaction opened');
      const data = await this.readDataFileCSV(this.trx.file);
      return data.map(this.parseReducedData.bind(this));
    },
    rollback: async (hdl: number) => {
      if (this.trx.file) {
        const filename = path.resolve(DATA_DIR, `reduced.trx_${hdl.toFixed(0)}.csv`);
        if (filename !== this.trx.file) throw new HttpStatusException(`Transaction #${hdl} not found`, 404);
        if (fs.existsSync(this.trx.file)) {
          let tmpFile: string | undefined;
          if (fs.existsSync(DATA_REDUCED_FILE)) {
            // reduced.csv may have been re-created since start(): park it, restore the
            // transaction file, then re-append the newer data behind it.
            tmpFile = path.resolve(DATA_DIR, `reduced.tmp_${moment().unix().toFixed(0)}.csv`);
            await fsp.rename(DATA_REDUCED_FILE, tmpFile);
          }
          await fsp.rename(this.trx.file, DATA_REDUCED_FILE);
          if (tmpFile) {
            await exec(`cat "${tmpFile}" >> "${DATA_REDUCED_FILE}"`);
            await fsp.unlink(tmpFile);
          }
        }
        this.trx.file = undefined;
      }
    },
    commit: async (hdl: number) => {
      if (this.trx.file) {
        const filename = path.resolve(DATA_DIR, `reduced.trx_${hdl.toFixed(0)}.csv`);
        if (filename !== this.trx.file) throw new HttpStatusException(`Transaction #${hdl} not found`, 404);
        if (fs.existsSync(this.trx.file)) {
          await fsp.unlink(this.trx.file);
        }
        this.trx.file = undefined;
      }
    }
  };

  public stopLoop() {
    clearInterval(this.intervalHdl);
  }
}
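
// NOTE: BufferedData, BufferedDriveData, ReducedData and ReducedDriveData are referenced above but are
// neither imported nor declared in this file as shown. The declarations below are a minimal sketch
// inferred from how the values are built and consumed (parseBufferedData, parseReducedData, the
// serialize* methods); if the project already declares these types elsewhere, use those instead.
type BufferedDriveData = { [mount: string]: { used: number; max: number } };
type BufferedData = {
  time: Date;
  cpu: number;
  ram: { used: number; max: number };
  hdd?: BufferedDriveData;
};
type ReducedDriveData = { [mount: string]: { avg: number; peak: number; max: number } };
type ReducedData = {
  time: Date;
  cpu: { avg: number; peak: number };
  ram: { avg: number; peak: number; max: number };
  hdd?: ReducedDriveData;
};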