// collector.class.ts

import fs, { PathLike } from 'fs';
import fsp from 'fs/promises';
import moment from 'moment';
import { exec } from 'node-utils/shell';
import path from 'path';
import { format } from 'util';
import { HttpStatusException } from '../../common/lib/http-status.exception';
import { Logger } from '../../common/util/logger.class';
import { MemoryUsage, TopTTY } from './top-tty';
const DATA_DIR = process.env.DATA_DIR || 'data';
const DATA_BUFFER_FILE = path.resolve(DATA_DIR, 'buffer.csv');
const DATA_BUFFER_REMAINS = path.resolve(DATA_DIR, 'remains.csv');
const DATA_REDUCED_FILE = path.resolve(DATA_DIR, 'reduced.csv');
const TIMESTAMP_FORMAT = `YYYY-MM-DD[T]HH:mm:ss.SSSZZ`;
const WRITE_BUFFER_INTERVAL = 500; // [ms]
const READ_HDD_INTERVAL = 300; // [s]
const REDUCE_INTERVAL_MINUTES = 5;
const REDUCE_GROUP_MINUTES = 1;
const MONITOR_MOUNTS = process.env.MONITOR_MOUNTS ? process.env.MONITOR_MOUNTS.split(':') : [];

Logger.info('[INFO] Monitoring Drives:', MONITOR_MOUNTS);

const CSV_COLS = {
  buffer: {
    time: 0,
    cpu: 1,
    ram: 2
  },
  reduced: {
    time: 0,
    cpu: {
      avg: 1,
      peak: 2
    },
    ram: {
      avg: 3,
      peak: 4,
      max: 5
    }
  }
};
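
// NOTE: The following record types are referenced throughout this file but are
// neither defined nor imported in the original source. Their shapes are
// reconstructed here from how parseBufferedData()/parseReducedData() build them
// and may differ from the original declarations.
type BufferedDriveData = { [mount: string]: { used: number; max: number } };
type ReducedDriveData = { [mount: string]: { avg: number; peak: number; max: number } };
type BufferedData = {
  time: Date;
  cpu: number;
  ram: { used: number; max: number };
  hdd?: BufferedDriveData;
};
type ReducedData = {
  time: Date;
  cpu: { avg: number; peak: number };
  ram: { avg: number; peak: number; max: number };
  hdd?: ReducedDriveData;
};

/**
 * Samples CPU/RAM usage (via TopTTY) and disk usage (via hdd.sh) into a CSV
 * buffer, periodically reduces that buffer into per-minute avg/peak/max rows,
 * and exposes the reduced data through a file-rename based "transaction" API.
 */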
export class Collector {
  private intervalHdl?: NodeJS.Timeout;
  private currentRamUsage: MemoryUsage = { unit: 'B', avail: 0, used: 0 };
  private currentCpuUsage: number = 0;
  private currentHddUsage: { time: number; mounts: Array<{ mount: string; stats: string }> } = {
    time: 0,
    mounts: []
  };

  constructor() {
    (async () => {
      try {
        if (!fs.existsSync(DATA_DIR)) {
          Logger.info('[INFO] DATA_DIR', DATA_DIR, 'does not exist - creating now ...');
          await fsp.mkdir(DATA_DIR);
        }
        this.startLoop();
      } catch (err) {
        Logger.error('[FATAL]', err);
        Logger.error('[EXITING]');
        process.exit(1);
      }
    })();
  }
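
  /**
   * Starts the periodic write loop and subscribes to CPU/RAM updates from TopTTY.
   */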
  public startLoop() {
    this.intervalHdl = setInterval(this.loop.bind(this), WRITE_BUFFER_INTERVAL);
    const tty = new TopTTY();
    tty.subscribe.cpu(usage => (this.currentCpuUsage = usage));
    tty.subscribe.ram(usage => (this.currentRamUsage = usage));
    tty.runloop();
  }
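
  /**
   * One buffer-write tick: refreshes disk stats at most every READ_HDD_INTERVAL
   * seconds, appends a `time;cpu;used/avail unit[;mount used/max ...]` line to
   * buffer.csv, and kicks off a reduction pass once the buffer spans at least
   * REDUCE_INTERVAL_MINUTES.
   */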
  private async loop() {
    try {
      const now = moment();
      const time = now.format(TIMESTAMP_FORMAT);
      if (now.unix() - this.currentHddUsage.time > READ_HDD_INTERVAL) {
        this.currentHddUsage.time = now.unix();
        this.currentHddUsage.mounts = [];
        for (const mount of MONITOR_MOUNTS) {
          try {
            const stats = (await exec(`./hdd.sh "${mount}"`)).trim();
            if (stats?.length) this.currentHddUsage.mounts.push({ mount, stats });
          } catch (err) {
            Logger.warn('[WARN] Error while getting space usage of mount', mount, ':', err);
          }
        }
      }
      const hdd: string[] = this.currentHddUsage.mounts.map(({ mount, stats }) => `${mount} ${stats}`);
      if (!this.currentRamUsage.avail) return;
      const ram = format(
        '%s/%s %s',
        (Math.round(this.currentRamUsage.used * 100) / 100).toFixed(2),
        (Math.round(this.currentRamUsage.avail * 100) / 100).toFixed(2),
        this.currentRamUsage.unit
      );
      const cpu = (Math.round(this.currentCpuUsage * 100) / 100).toFixed(2);
      const data = `${time};${cpu};${ram}${hdd.length ? `;${hdd.join(';')}` : ''}\n`;
      // Time to reduce the buffer?
      const firstBufferTime = await this.getFirstBufferTime();
      if (moment.duration(now.diff(firstBufferTime)).abs().asMinutes() >= REDUCE_INTERVAL_MINUTES) {
        try {
          const tmpFile = await this.createTmpFile();
          process.nextTick(() => this.reduceData(tmpFile));
        } catch (err) {
          Logger.error('[ERROR] Creating Temp File for Reducing Data failed:', err);
        }
      }
      await fsp.appendFile(DATA_BUFFER_FILE, data);
    } catch (err) {
      Logger.error(err);
    }
  }
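
  /**
   * Returns the timestamp of the oldest buffered sample, preferring remains.csv
   * (leftovers from the last reduction) over buffer.csv; returns "now" if
   * neither file exists yet.
   */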
  private async getFirstBufferTime() {
    let dataFile = DATA_BUFFER_FILE;
    if (fs.existsSync(DATA_BUFFER_REMAINS)) dataFile = DATA_BUFFER_REMAINS;
    if (!fs.existsSync(dataFile)) return moment();
    const firstLine = await this.readFirstBufferLine(dataFile);
    const timestamp = firstLine.split(';')[CSV_COLS.buffer.time];
    return moment(timestamp, TIMESTAMP_FORMAT);
  }
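
  /**
   * Streams the file only until the first line feed, so the (potentially large)
   * buffer file is never read in full just to inspect its first timestamp.
   */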
  private readFirstBufferLine = (dataFile: PathLike) =>
    new Promise<string>((resolve, reject) => {
      const stream = fs.createReadStream(dataFile, { encoding: 'utf-8' });
      const chunks: string[] = [];
      stream
        .on('data', buf => {
          const chunk = buf instanceof Buffer ? buf.toString('utf-8') : buf;
          const lfIdx = chunk.indexOf('\n');
          if (lfIdx >= 0) {
            chunks.push(chunk.substring(0, lfIdx));
            stream.close();
          } else {
            chunks.push(chunk);
          }
        })
        .on('close', () => resolve(chunks.join('')))
        .on('error', reject);
    });
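
  /**
   * Reads a whole CSV file and splits it into non-empty rows of `;`-separated cells.
   */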
  private async readDataFileCSV(dataFile: PathLike): Promise<string[][]> {
    return (await fsp.readFile(dataFile, { encoding: 'utf-8' }))
      .split(/\r?\n/g)
      .filter(l => !!l)
      .map(line => line.split(';'));
  }
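
  /**
   * Moves buffer.csv aside under a timestamped temp name so the loop can keep
   * appending to a fresh buffer while reduceData() processes the snapshot.
   */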
  private async createTmpFile() {
    const tmpFilename = `buffer.tmp_${moment().format('YYYYMMDDHHmmssSSS')}.csv`;
    await fsp.rename(DATA_BUFFER_FILE, path.resolve(DATA_DIR, tmpFilename));
    return tmpFilename;
  }
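
  /**
   * Collapses buffered samples into one avg/peak/max row per REDUCE_GROUP_MINUTES
   * window, appends the result to reduced.csv, stores trailing samples that do
   * not yet fill a window in remains.csv, and deletes the temp buffer file.
   */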
  private async reduceData(tmpFilename: string) {
    const tmpFilepath = path.resolve(DATA_DIR, tmpFilename);
    Logger.info('[INFO] Reducing data in', tmpFilepath);
    try {
      const lines: string[][] = [];
      if (fs.existsSync(DATA_BUFFER_REMAINS)) {
        lines.push(...(await this.readDataFileCSV(DATA_BUFFER_REMAINS)));
      }
      lines.push(...(await this.readDataFileCSV(tmpFilepath)));
      const reduced: Array<ReducedData> = [];
      let valueBuffer: Array<BufferedData> = [];
      do {
        const line = lines.shift();
        if (!line) break;
        const data = this.parseBufferedData(line);
        Logger.debug('[DEBUG] BufferedData:', JSON.stringify(data));
        valueBuffer.push(data);
        if (valueBuffer.length <= 1) {
          // Need at least 2 datasets to check the time diff and eventually reduce
          // to avg/max; skip to the next data line.
          continue;
        }
        const firstTime = moment(valueBuffer[0].time);
        const currentTime = moment(data.time);
        if (moment.duration(currentTime.diff(firstTime)).abs().asMinutes() >= REDUCE_GROUP_MINUTES) {
          type IntermediateValues = { sum: number; peak: number; max: number };
          type IntermediateDriveData = { [mount: string]: IntermediateValues };
          type IntermediateSums = {
            ram: IntermediateValues;
            cpu: IntermediateValues;
            hdd?: IntermediateDriveData;
            count: number;
          };
          const { cpu, ram, count, hdd } = valueBuffer.reduce(
            (res, cur) => {
              res.count++;
              res.cpu.sum += cur.cpu;
              res.cpu.peak = Math.max(res.cpu.peak, cur.cpu);
              res.ram.sum += cur.ram.used;
              res.ram.peak = Math.max(res.ram.peak, cur.ram.used);
              res.ram.max = cur.ram.max;
              if (cur.hdd && Object.keys(cur.hdd).length) {
                const hdd_sums = res.hdd ?? {};
                res.hdd = Object.keys(cur.hdd).reduce((res_hdd, mount) => {
                  if (!cur.hdd) return res_hdd;
                  if (!res_hdd[mount]) {
                    res_hdd[mount] = { sum: 0, peak: 0, max: 0 };
                  }
                  res_hdd[mount].sum += cur.hdd[mount].used;
                  res_hdd[mount].peak = Math.max(res_hdd[mount].peak, cur.hdd[mount].used);
                  res_hdd[mount].max = cur.hdd[mount].max;
                  return res_hdd;
                }, hdd_sums);
              }
              return res;
            },
            // cpu.max is unused, but included so the initializer actually matches IntermediateValues.
            { ram: { sum: 0, peak: 0, max: 0 }, cpu: { sum: 0, peak: 0, max: 0 }, count: 0 } as IntermediateSums
          );
          reduced.push({
            time: data.time,
            cpu: {
              avg: cpu.sum / count,
              peak: cpu.peak
            },
            ram: {
              avg: ram.sum / count,
              peak: ram.peak,
              max: ram.max
            },
            hdd: hdd
              ? Object.keys(hdd).reduce((res, mount) => {
                  res[mount] = {
                    avg: hdd[mount].sum / count,
                    peak: hdd[mount].peak,
                    max: hdd[mount].max
                  };
                  return res;
                }, {} as ReducedDriveData)
              : undefined
          });
          Logger.debug('[DEBUG] ReducedData:', JSON.stringify(reduced[reduced.length - 1]));
          valueBuffer = [];
        }
      } while (lines.length > 0);
      if (valueBuffer.length > 0) {
        // Overwrite remains.csv with the leftover samples.
        await fsp.writeFile(DATA_BUFFER_REMAINS, valueBuffer.map(this.serializeBufferedDataCSV.bind(this)).join('\n') + '\n', { encoding: 'utf-8' });
      } else {
        // Delete remains.csv if it exists.
        if (fs.existsSync(DATA_BUFFER_REMAINS)) await fsp.unlink(DATA_BUFFER_REMAINS);
      }
      if (reduced.length > 0) {
        // Append the reduced rows to reduced.csv.
        await fsp.appendFile(DATA_REDUCED_FILE, reduced.map(this.serializeReducedDataCSV.bind(this)).join('\n') + '\n', { encoding: 'utf-8' });
      }
      // Delete the temp buffer file.
      await fsp.unlink(tmpFilepath);
    } catch (err) {
      Logger.error(`[ERROR] Reducing Data of tmpFile ${tmpFilepath} failed:`, err);
    }
  }
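
  /**
   * Parses one buffer.csv row (`time;cpu;used/total unit[;mount used/max ...]`)
   * into a BufferedData record, converting the RAM figures to bytes via their
   * unit suffix.
   */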
  private parseBufferedData(line: string[]): BufferedData {
    // TIMESTAMP
    const time = moment(line[CSV_COLS.buffer.time], TIMESTAMP_FORMAT).toDate();
    // CPU
    const cpu = Number(line[CSV_COLS.buffer.cpu]);
    // RAM
    const [stats, unit] = line[CSV_COLS.buffer.ram].split(' ');
    const [used, max] = stats.split('/');
    const factor = this.parseByteUnit(unit);
    const lastCol = CSV_COLS.buffer.ram;
    // HDD (?)
    let hdd: BufferedDriveData | undefined;
    if (MONITOR_MOUNTS.length && line.length > lastCol + 1) {
      for (let i = 1; i <= MONITOR_MOUNTS.length; i++) {
        if (lastCol + i > line.length - 1) break;
        const data = line[lastCol + i];
        const [mount, stats] = data.split(' ');
        const [used, max] = stats.split('/');
        if (!hdd) hdd = {};
        hdd[mount] = {
          used: Number(used),
          max: Number(max)
        };
      }
    }
    return {
      time,
      cpu,
      ram: {
        used: Number(used) * factor,
        max: Number(max) * factor
      },
      hdd
    };
  }
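
  /**
   * Parses one reduced.csv row back into a ReducedData record; any cells after
   * the fixed RAM columns are read as `mount;avg;peak;max` quadruples.
   */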
  private parseReducedData(line: string[]): ReducedData {
    const lastCol = CSV_COLS.reduced.ram.max;
    // HDD (?)
    let hdd: ReducedDriveData | undefined;
    if (MONITOR_MOUNTS.length && line.length > lastCol + 1) {
      hdd = {};
      for (let i = 1; lastCol + i + 3 < line.length; i += 4) {
        hdd[line[lastCol + i]] = {
          avg: Number(line[lastCol + i + 1]),
          peak: Number(line[lastCol + i + 2]),
          max: Number(line[lastCol + i + 3])
        };
      }
    }
    return {
      time: moment(line[CSV_COLS.reduced.time], TIMESTAMP_FORMAT).toDate(),
      cpu: {
        avg: Number(line[CSV_COLS.reduced.cpu.avg]),
        peak: Number(line[CSV_COLS.reduced.cpu.peak])
      },
      ram: {
        avg: Number(line[CSV_COLS.reduced.ram.avg]),
        peak: Number(line[CSV_COLS.reduced.ram.peak]),
        max: Number(line[CSV_COLS.reduced.ram.max])
      },
      hdd
    };
  }
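
  /** Multipliers for converting K/M/G/T/P-prefixed sizes to plain bytes (1024-based). */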
  private byteFactors: { [unit: string]: number } = {
    '': 1,
    K: 1024,
    M: 1024 * 1024,
    G: 1024 * 1024 * 1024,
    T: 1024 * 1024 * 1024 * 1024,
    P: 1024 * 1024 * 1024 * 1024 * 1024
  };

  private parseByteUnit(unit: string): number {
    const m = /^([KMGTP])?i?B$/.exec(unit);
    if (!m) throw new Error(`Failed to parse byte size unit '${unit}'`);
    // A plain 'B' has no prefix group, so fall back to the factor-1 entry
    // instead of indexing with undefined.
    return this.byteFactors[m[1] ?? ''];
  }
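
  /**
   * Serializes a BufferedData record back into a buffer-style CSV line, with
   * RAM rendered as `used/max MiB`.
   */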
  private serializeBufferedDataCSV(data: BufferedData) {
    return [
      moment(data.time).format(TIMESTAMP_FORMAT),
      data.cpu,
      `${(data.ram.used / this.byteFactors['M']).toFixed(2)}/${(data.ram.max / this.byteFactors['M']).toFixed(2)} MiB`,
      ...(data.hdd ? Object.keys(data.hdd).map(mount => `${mount} ${data.hdd?.[mount].used}/${data.hdd?.[mount].max}`) : [])
    ].join(';');
  }
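
  /**
   * Serializes a ReducedData record into a reduced.csv line: timestamp, CPU
   * avg/peak, RAM avg/peak/max, then `mount;avg;peak;max` per monitored drive.
   */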
  private serializeReducedDataCSV(data: ReducedData) {
    return [
      moment(data.time).format(TIMESTAMP_FORMAT),
      data.cpu.avg.toFixed(2),
      data.cpu.peak.toFixed(2),
      data.ram.avg.toFixed(2),
      data.ram.peak.toFixed(2),
      data.ram.max.toFixed(2),
      ...(data.hdd
        ? Object.keys(data.hdd).reduce((res, mount) => {
            res.push(
              mount,
              data.hdd?.[mount].avg.toFixed(2) || '0',
              data.hdd?.[mount].peak.toFixed(2) || '0',
              data.hdd?.[mount].max.toFixed(2) || '0'
            );
            return res;
          }, [] as string[])
        : [])
    ].join(';');
  }
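
  /**
   * File-rename based "transaction" over reduced.csv: start() snapshots the
   * file under a numeric handle, read() parses the snapshot, commit() deletes
   * it, and rollback() merges it back in front of anything collected since.
   */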
  public get trx() {
    return this._trx;
  }

  private _trx: {
    file?: PathLike;
    start: () => Promise<number | null>;
    read: () => Promise<Array<ReducedData>>;
    rollback: (hdl: number) => Promise<void>;
    commit: (hdl: number) => Promise<void>;
  } = {
    start: async () => {
      if (this.trx.file) {
        Logger.warn(`[WARN] Old transaction file found - rolling back now before starting new transaction ...`);
        const m = /trx_(\d+)\.csv/.exec(this.trx.file as string);
        const hdl = Number(m?.[1] ?? '0');
        await this.trx.rollback(hdl);
        Logger.warn(`[WARN] Transaction rollback succeeded.`);
      }
      if (!fs.existsSync(DATA_REDUCED_FILE)) {
        // NO DATA
        return null;
      }
      const hdl = moment().unix();
      this.trx.file = path.resolve(DATA_DIR, `reduced.trx_${hdl.toFixed(0)}.csv`);
      await fsp.rename(DATA_REDUCED_FILE, this.trx.file);
      return hdl;
    },
    read: async () => {
      if (!this.trx.file) throw new Error('No transaction opened');
      const data = await this.readDataFileCSV(this.trx.file);
      return data.map(this.parseReducedData.bind(this));
    },
    rollback: async (hdl: number) => {
      if (this.trx.file) {
        const filename = path.resolve(DATA_DIR, `reduced.trx_${hdl.toFixed(0)}.csv`);
        if (filename !== this.trx.file) throw new HttpStatusException(`Transaction #${hdl} not found`, 404);
        if (fs.existsSync(this.trx.file)) {
          let tmpFile: string | undefined;
          if (fs.existsSync(DATA_REDUCED_FILE)) {
            // Resolve into DATA_DIR so the temp file does not land in the process CWD.
            tmpFile = path.resolve(DATA_DIR, `reduced.tmp_${moment().unix().toFixed(0)}.csv`);
            await fsp.rename(DATA_REDUCED_FILE, tmpFile);
          }
          await fsp.rename(this.trx.file, DATA_REDUCED_FILE);
          if (tmpFile) {
            await exec(`cat "${tmpFile}" >> "${DATA_REDUCED_FILE}"`);
            await fsp.unlink(tmpFile);
          }
        }
        this.trx.file = undefined;
      }
    },
    commit: async (hdl: number) => {
      if (this.trx.file) {
        const filename = path.resolve(DATA_DIR, `reduced.trx_${hdl.toFixed(0)}.csv`);
        if (filename !== this.trx.file) throw new HttpStatusException(`Transaction #${hdl} not found`, 404);
        if (fs.existsSync(this.trx.file)) {
          await fsp.unlink(this.trx.file);
        }
        this.trx.file = undefined;
      }
    }
  };
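
  /**
   * Stops the periodic buffer writes. Note that the TopTTY runloop started in
   * startLoop() is not stopped here, since no reference to it is kept.
   */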
  public stopLoop() {
    clearInterval(this.intervalHdl);
  }
}
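
/*
 * Usage sketch (hypothetical caller, not part of the original file); a consumer
 * would drain the reduced data roughly like this:
 *
 *   const collector = new Collector();        // starts sampling immediately
 *   const hdl = await collector.trx.start();  // snapshot reduced.csv
 *   if (hdl !== null) {
 *     const rows = await collector.trx.read();
 *     try {
 *       await shipSomewhere(rows);            // hypothetical export step
 *       await collector.trx.commit(hdl);
 *     } catch {
 *       await collector.trx.rollback(hdl);    // data is merged back for retry
 *     }
 *   }
 */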