collector.class.ts

import fs, { PathLike } from 'fs';
import fsp from 'fs/promises';
import moment from 'moment';
import { exec } from 'node-utils/shell';
import path from 'path';
import { HttpStatusException } from '../../common/lib/http-status.exception';
import { Logger } from '../../common/util/logger.class';

const DATA_DIR = process.env.DATA_DIR || 'data';
const DATA_BUFFER_FILE = path.resolve(DATA_DIR, 'buffer.csv');
const DATA_BUFFER_REMAINS = path.resolve(DATA_DIR, 'remains.csv');
const DATA_REDUCED_FILE = path.resolve(DATA_DIR, 'reduced.csv');
const TIMESTAMP_FORMAT = `YYYY-MM-DD[T]HH:mm:ss.SSSZZ`;
const REDUCE_INTERVAL_MINUTES = 5;
const REDUCE_GROUP_MINUTES = 1;
const MONITOR_MOUNTS = !!process.env.MONITOR_MOUNTS ? process.env.MONITOR_MOUNTS.split(':') : [];

Logger.info('[INFO] Monitoring Drives:', MONITOR_MOUNTS);

const CSV_COLS = {
  buffer: {
    time: 0,
    cpu: 1,
    ram: 2
  },
  reduced: {
    time: 0,
    cpu: {
      avg: 1,
      peak: 2
    },
    ram: {
      avg: 3,
      peak: 4,
      max: 5
    }
  }
};
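
// Illustrative CSV line layouts, derived from CSV_COLS and the serialize*/parse* helpers
// below. The concrete values are made up; the ram/hdd field format ("used/max UNIT",
// "mount used/max") is assumed to match the output of ram.sh / hdd.sh.
//
//   buffer.csv / remains.csv:
//     2024-01-01T12:00:00.000+0000;12.5;2048.00/8192.00 MiB;/mnt/data 120/500
//     -> time;cpu;ram(used/max unit)[;mount used/max ...]
//
//   reduced.csv:
//     2024-01-01T12:01:00.000+0000;10.20;14.80;2000.00;2100.00;8192.00;/mnt/data;118.00;121.00;500.00
//     -> time;cpu.avg;cpu.peak;ram.avg;ram.peak;ram.max[;mount;avg;peak;max ...]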

/**
 * Samples CPU, RAM and (optionally) disk usage every 500 ms via the cpu.sh / ram.sh /
 * hdd.sh helper scripts, buffers the raw values in buffer.csv and periodically reduces
 * them into avg/peak rows per REDUCE_GROUP_MINUTES window in reduced.csv.
 */
export class Collector {
  private intervalHdl?: NodeJS.Timer;

  constructor() {
    (async () => {
      try {
        if (!fs.existsSync(DATA_DIR)) {
          Logger.info('[INFO] DATA_DIR', DATA_DIR, 'does not exist - creating now ...');
          await fsp.mkdir(DATA_DIR);
        }
        this.startLoop();
      } catch (err) {
        Logger.error('[FATAL]', err);
        Logger.error('[EXITING]');
        process.exit(1);
      }
    })();
  }

  public startLoop() {
    this.intervalHdl = setInterval(this.loop.bind(this), 500);
  }

  // Takes one sample and appends it to buffer.csv; once the oldest buffered sample is
  // at least REDUCE_INTERVAL_MINUTES old, the buffer is rotated and reduced.
  private async loop() {
    try {
      const now = moment();
      const time = now.format(TIMESTAMP_FORMAT);
      const cpu = (await exec(`./cpu.sh`)).trim();
      const ram = (await exec(`./ram.sh`)).trim();
      const hdd: string[] = [];
      for (const mount of MONITOR_MOUNTS) {
        try {
          const stats = (await exec(`./hdd.sh "${mount}"`)).trim();
          if (stats?.length) hdd.push(`${mount} ${stats}`);
        } catch (err) {
          Logger.warn('[WARN] Error while getting space usage of mount', mount, ':', err);
        }
      }
      const data = `${time};${cpu};${ram}${hdd.length ? `;${hdd.join(';')}` : ''}\n`;
      // Time to reduce buffer?
      const firstBufferTime = await this.getFirstBufferTime();
      if (moment.duration(now.diff(firstBufferTime)).abs().asMinutes() >= REDUCE_INTERVAL_MINUTES) {
        try {
          const tmpFile = await this.createTmpFile();
          process.nextTick(() => this.reduceData(tmpFile));
        } catch (err) {
          Logger.error('[ERROR] Creating Temp File for Reducing Data failed:', err);
        }
      }
      await fsp.appendFile(DATA_BUFFER_FILE, data);
    } catch (err) {
      Logger.error(err);
    }
  }

  private async getFirstBufferTime() {
    let dataFile = DATA_BUFFER_FILE;
    if (fs.existsSync(DATA_BUFFER_REMAINS)) dataFile = DATA_BUFFER_REMAINS;
    if (!fs.existsSync(dataFile)) return moment();
    const firstLine = await this.readFirstBufferLine(dataFile);
    const timestamp = firstLine.split(';')[CSV_COLS.buffer.time];
    return moment(timestamp, TIMESTAMP_FORMAT);
  }

  // Streams the file only up to the first line break instead of loading it completely.
  private readFirstBufferLine = (dataFile: PathLike) =>
    new Promise<string>((resolve, reject) => {
      const stream = fs.createReadStream(dataFile, { encoding: 'utf-8' });
      const chunks: string[] = [];
      stream
        .on('data', buf => {
          let chunk: string;
          if (buf instanceof Buffer) chunk = buf.toString('utf-8');
          else chunk = buf;
          const lfIdx = chunk.indexOf('\n');
          if (lfIdx >= 0) {
            chunks.push(chunk.substring(0, lfIdx));
            stream.close();
          } else {
            chunks.push(chunk);
          }
        })
        .on('close', () => resolve(chunks.join('')))
        .on('error', reject);
    });

  private async readDataFileCSV(dataFile: PathLike): Promise<string[][]> {
    return (await fsp.readFile(dataFile, { encoding: 'utf-8' }))
      .split(/\r?\n/g)
      .filter(l => !!l)
      .map(line => line.split(';'));
  }

  private async createTmpFile() {
    const tmpFilename = `buffer.tmp_${moment().format('YYYYMMDDHHmmssSSS')}.csv`;
    await fsp.rename(DATA_BUFFER_FILE, path.resolve(DATA_DIR, tmpFilename));
    return tmpFilename;
  }

  // Groups the buffered samples into windows of REDUCE_GROUP_MINUTES, collapses each
  // window into avg/peak values and appends the result to reduced.csv. Leftover samples
  // that do not yet fill a whole window are written to remains.csv and are prepended on
  // the next reduction run.
  private async reduceData(tmpFilename: string) {
    const tmpFilepath = path.resolve(DATA_DIR, tmpFilename);
    Logger.info('[INFO] Reducing data in', tmpFilepath);
    try {
      const lines: string[][] = [];
      if (fs.existsSync(DATA_BUFFER_REMAINS)) {
        lines.push(...(await this.readDataFileCSV(DATA_BUFFER_REMAINS)));
      }
      lines.push(...(await this.readDataFileCSV(tmpFilepath)));
      const reduced: Array<ReducedData> = [];
      let valueBuffer: Array<BufferedData> = [];
      do {
        const line = lines.shift();
        if (!line) break;
        const data = this.parseBufferedData(line);
        Logger.debug('[DEBUG] BufferedData:', JSON.stringify(data));
        valueBuffer.push(data);
        if (valueBuffer.length <= 1) {
          // Need at least 2 datasets to check for time diff and eventually reduce to avg/max.
          // Skip to next data line.
          continue;
        }
        const firstTime = moment(valueBuffer[0].time);
        const currentTime = moment(data.time);
        if (moment.duration(currentTime.diff(firstTime)).abs().asMinutes() >= REDUCE_GROUP_MINUTES) {
          type IntermediateValues = { sum: number; peak: number; max: number };
          type IntermediateDriveData = { [mount: string]: IntermediateValues };
          type IntermediateSums = { ram: IntermediateValues; cpu: IntermediateValues; hdd?: IntermediateDriveData; count: number };
          const { cpu, ram, count, hdd } = valueBuffer.reduce(
            (res, cur) => {
              res.count++;
              res.cpu.sum += cur.cpu;
              res.cpu.peak = Math.max(res.cpu.peak, cur.cpu);
              res.ram.sum += cur.ram.used;
              res.ram.peak = Math.max(res.ram.peak, cur.ram.used);
              res.ram.max = cur.ram.max;
              if (cur.hdd && Object.keys(cur.hdd).length) {
                const hdd_sums = res.hdd ?? {};
                res.hdd = Object.keys(cur.hdd).reduce((res_hdd, mount) => {
                  if (!cur.hdd) return res_hdd;
                  if (!res_hdd[mount]) {
                    res_hdd[mount] = { sum: 0, peak: 0, max: 0 };
                  }
                  res_hdd[mount].sum += cur.hdd[mount].used;
                  res_hdd[mount].peak = Math.max(res_hdd[mount].peak, cur.hdd[mount].used);
                  res_hdd[mount].max = cur.hdd[mount].max;
                  return res_hdd;
                }, hdd_sums);
              }
              return res;
            },
            { ram: { sum: 0, peak: 0, max: 0 }, cpu: { sum: 0, peak: 0 }, count: 0 } as IntermediateSums
          );
          reduced.push({
            time: data.time,
            cpu: {
              avg: cpu.sum / count,
              peak: cpu.peak
            },
            ram: {
              avg: ram.sum / count,
              peak: ram.peak,
              max: ram.max
            },
            hdd: hdd
              ? Object.keys(hdd).reduce((res, mount) => {
                  res[mount] = {
                    avg: hdd[mount].sum / count,
                    peak: hdd[mount].peak,
                    max: hdd[mount].max
                  };
                  return res;
                }, {} as ReducedDriveData)
              : undefined
          });
          Logger.debug('[DEBUG] ReducedData:', JSON.stringify(reduced[reduced.length - 1]));
          valueBuffer = [];
        }
      } while (lines.length > 0);
      if (valueBuffer.length > 0) {
        // Overwrite remains.csv with valueBuffer.
        await fsp.writeFile(DATA_BUFFER_REMAINS, valueBuffer.map(this.serializeBufferedDataCSV.bind(this)).join('\n') + '\n', { encoding: 'utf-8' });
      } else {
        // Delete remains.csv if it exists.
        if (fs.existsSync(DATA_BUFFER_REMAINS)) await fsp.unlink(DATA_BUFFER_REMAINS);
      }
      if (reduced.length > 0) {
        // Append reduced data to reduced.csv.
        await fsp.appendFile(DATA_REDUCED_FILE, reduced.map(this.serializeReducedDataCSV.bind(this)).join('\n') + '\n', { encoding: 'utf-8' });
      }
      // Delete tmpFile.
      await fsp.unlink(tmpFilepath);
    } catch (err) {
      Logger.error(`[ERROR] Reducing Data of tmpFile ${tmpFilepath} failed:`, err);
    }
  }

  private parseBufferedData(line: string[]): BufferedData {
    // TIMESTAMP
    const time = moment(line[CSV_COLS.buffer.time], TIMESTAMP_FORMAT).toDate();
    // CPU
    const cpu = Number(line[CSV_COLS.buffer.cpu]);
    // RAM
    const [stats, unit] = line[CSV_COLS.buffer.ram].split(' ');
    const [used, max] = stats.split('/');
    const factor = this.parseByteUnit(unit);
    const lastCol = CSV_COLS.buffer.ram;
    // HDD (?)
    let hdd: BufferedDriveData | undefined;
    if (MONITOR_MOUNTS.length && line.length > lastCol + 1) {
      for (let i = 1; i <= MONITOR_MOUNTS.length; i++) {
        if (lastCol + i > line.length - 1) break;
        const data = line[lastCol + i];
        const [mount, stats] = data.split(' ');
        const [used, max] = stats.split('/');
        if (!hdd) hdd = {};
        hdd[mount] = {
          used: Number(used),
          max: Number(max)
        };
      }
    }
    return {
      time,
      cpu,
      ram: {
        used: Number(used) * factor,
        max: Number(max) * factor
      },
      hdd
    };
  }

  private parseReducedData(line: string[]): ReducedData {
    const lastCol = CSV_COLS.reduced.ram.max;
    // HDD (?)
    let hdd: ReducedDriveData | undefined;
    if (MONITOR_MOUNTS.length && line.length > lastCol + 1) {
      hdd = {};
      for (let i = 1; lastCol + i + 3 < line.length; i += 4) {
        hdd[line[lastCol + i]] = {
          avg: Number(line[lastCol + i + 1]),
          peak: Number(line[lastCol + i + 2]),
          max: Number(line[lastCol + i + 3])
        };
      }
    }
    return {
      time: moment(line[CSV_COLS.reduced.time], TIMESTAMP_FORMAT).toDate(),
      cpu: {
        avg: Number(line[CSV_COLS.reduced.cpu.avg]),
        peak: Number(line[CSV_COLS.reduced.cpu.peak])
      },
      ram: {
        avg: Number(line[CSV_COLS.reduced.ram.avg]),
        peak: Number(line[CSV_COLS.reduced.ram.peak]),
        max: Number(line[CSV_COLS.reduced.ram.max])
      },
      hdd
    };
  }

  private byteFactors: { [unit: string]: number } = {
    '': 1,
    K: 1024,
    M: 1024 * 1024,
    G: 1024 * 1024 * 1024,
    T: 1024 * 1024 * 1024 * 1024,
    P: 1024 * 1024 * 1024 * 1024 * 1024
  };

  private parseByteUnit(unit: string): number {
    const m = /^([KMGTP])?i?B$/.exec(unit);
    if (!m) throw new Error(`Failed to parse byte size unit '${unit}'`);
    // m[1] is undefined for a plain 'B' unit - fall back to the factor 1 entry.
    return this.byteFactors[m[1] ?? ''];
  }

  private serializeBufferedDataCSV(data: BufferedData) {
    return [
      moment(data.time).format(TIMESTAMP_FORMAT),
      data.cpu,
      `${(data.ram.used / this.byteFactors['M']).toFixed(2)}/${(data.ram.max / this.byteFactors['M']).toFixed(2)} MiB`,
      ...(data.hdd ? Object.keys(data.hdd).map(mount => `${mount} ${data.hdd?.[mount].used}/${data.hdd?.[mount].max}`) : [])
    ].join(';');
  }

  private serializeReducedDataCSV(data: ReducedData) {
    return [
      moment(data.time).format(TIMESTAMP_FORMAT),
      data.cpu.avg.toFixed(2),
      data.cpu.peak.toFixed(2),
      data.ram.avg.toFixed(2),
      data.ram.peak.toFixed(2),
      data.ram.max.toFixed(2),
      ...(data.hdd
        ? Object.keys(data.hdd).reduce((res, mount) => {
            res.push(
              mount,
              data.hdd?.[mount].avg.toFixed(2) || '0',
              data.hdd?.[mount].peak.toFixed(2) || '0',
              data.hdd?.[mount].max.toFixed(2) || '0'
            );
            return res;
          }, [] as string[])
        : [])
    ].join(';');
  }

  public get trx() {
    return this._trx;
  }

  // Simple file-based "transaction" over reduced.csv: start() moves the file aside and
  // returns a handle, read() parses the moved file, commit() deletes it, and rollback()
  // merges it back in front of any data that was reduced in the meantime.
  private _trx: {
    file?: PathLike;
    start: () => Promise<number | null>;
    read: () => Promise<Array<ReducedData>>;
    rollback: (hdl: number) => Promise<void>;
    commit: (hdl: number) => Promise<void>;
  } = {
    start: async () => {
      if (this.trx.file) {
        Logger.warn(`[WARN] Old transaction file found - rolling back now before starting new transaction ...`);
        const m = /trx_(\d+)\.csv/.exec(this.trx.file as string);
        const hdl = Number(m?.[1] ?? '0');
        await this.trx.rollback(hdl);
        Logger.warn(`[WARN] Transaction rollback succeeded.`);
      }
      if (!fs.existsSync(DATA_REDUCED_FILE)) {
        // NO DATA
        return null;
      }
      const hdl = moment().unix();
      this.trx.file = path.resolve(DATA_DIR, `reduced.trx_${hdl.toFixed(0)}.csv`);
      await fsp.rename(DATA_REDUCED_FILE, this.trx.file);
      return hdl;
    },
    read: async () => {
      if (!this.trx.file) throw new Error('No transaction opened');
      const data = await this.readDataFileCSV(this.trx.file);
      return data.map(this.parseReducedData.bind(this));
    },
    rollback: async (hdl: number) => {
      if (this.trx.file) {
        const filename = path.resolve(DATA_DIR, `reduced.trx_${hdl.toFixed(0)}.csv`);
        if (filename !== this.trx.file) throw new HttpStatusException(`Transaction #${hdl} not found`, 404);
        if (fs.existsSync(this.trx.file)) {
          let tmpFile: string | undefined;
          if (fs.existsSync(DATA_REDUCED_FILE)) {
            // Resolve against DATA_DIR so the temp file does not end up in the process CWD.
            tmpFile = path.resolve(DATA_DIR, `reduced.tmp_${moment().unix().toFixed(0)}.csv`);
            await fsp.rename(DATA_REDUCED_FILE, tmpFile);
          }
          await fsp.rename(this.trx.file, DATA_REDUCED_FILE);
          if (tmpFile) {
            await exec(`cat "${tmpFile}" >> "${DATA_REDUCED_FILE}"`);
            await fsp.unlink(tmpFile);
          }
        }
        this.trx.file = undefined;
      }
    },
    commit: async (hdl: number) => {
      if (this.trx.file) {
        const filename = path.resolve(DATA_DIR, `reduced.trx_${hdl.toFixed(0)}.csv`);
        if (filename !== this.trx.file) throw new HttpStatusException(`Transaction #${hdl} not found`, 404);
        if (fs.existsSync(this.trx.file)) {
          await fsp.unlink(this.trx.file);
        }
        this.trx.file = undefined;
      }
    }
  };

  public stopLoop() {
    clearInterval(this.intervalHdl);
  }
}
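
// The data shapes below are referenced throughout the class but are not declared in this
// file. The following declarations are an assumed sketch, inferred from how the
// parse*/serialize* helpers use them; in the original project they may live in a
// separate module.
interface BufferedData {
  time: Date;
  cpu: number;
  ram: { used: number; max: number };
  hdd?: BufferedDriveData;
}

interface BufferedDriveData {
  [mount: string]: { used: number; max: number };
}

interface ReducedData {
  time: Date;
  cpu: { avg: number; peak: number };
  ram: { avg: number; peak: number; max: number };
  hdd?: ReducedDriveData;
}

interface ReducedDriveData {
  [mount: string]: { avg: number; peak: number; max: number };
}

// Hypothetical consumer sketch (not part of this file): how the trx API above might be
// driven by an exporter that ships reduced rows elsewhere.
//
//   const collector = new Collector();
//   const hdl = await collector.trx.start();
//   if (hdl !== null) {
//     try {
//       const rows = await collector.trx.read();
//       // ... ship rows somewhere ...
//       await collector.trx.commit(hdl);
//     } catch (err) {
//       await collector.trx.rollback(hdl);
//     }
//   }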