collector.class.ts

import fs, { PathLike } from 'fs';
import fsp from 'fs/promises';
import moment from 'moment';
import { exec } from 'node-utils/shell';
import path from 'path';
import { Logger } from '../../common/util/logger.class';
import { HttpStatusException } from './lib/http-status.exception';

const DATA_DIR = process.env.DATA_DIR;
const DATA_BUFFER_FILE = path.resolve(DATA_DIR, 'buffer.csv');
const DATA_BUFFER_REMAINS = path.resolve(DATA_DIR, 'remains.csv');
const DATA_REDUCED_FILE = path.resolve(DATA_DIR, 'reduced.csv');
const TIMESTAMP_FORMAT = `YYYY-MM-DD[T]HH:mm:ss.SSSZZ`;
const REDUCE_INTERVAL_MINUTES = 5;
const REDUCE_GROUP_MINUTES = 1;
const MONITOR_MOUNTS = !!process.env.MONITOR_MOUNTS ? process.env.MONITOR_MOUNTS.split(':') : [];

Logger.info('[INFO] Monitoring Drives:', MONITOR_MOUNTS);

const CSV_COLS = {
  buffer: {
    time: 0,
    cpu: 1,
    ram: 2
  },
  reduced: {
    time: 0,
    cpu: {
      avg: 1,
      peak: 2
    },
    ram: {
      avg: 3,
      peak: 4,
      max: 5
    }
  }
};
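
/**
 * Collector samples CPU, RAM and (optionally) mounted-drive usage via the cpu.sh / ram.sh / hdd.sh
 * helper scripts, buffers the raw samples in buffer.csv and periodically reduces them into
 * avg/peak/max rows in reduced.csv.
 */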
export class Collector {
  private intervalHdl: NodeJS.Timer;

  constructor() {
    (async () => {
      try {
        if (!fs.existsSync(DATA_DIR)) {
          Logger.info('[INFO] DATA_DIR', DATA_DIR, 'does not exist - creating now ...');
          await fsp.mkdir(DATA_DIR);
        }
        this.startLoop();
      } catch (err) {
        Logger.error('[FATAL]', err);
        Logger.error('[EXITING]');
        process.exit(1);
      }
    })();
  }

  public startLoop() {
    this.intervalHdl = setInterval(this.loop.bind(this), 500);
  }
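
  /**
   * One sampling tick: collects CPU, RAM and mount usage via the helper scripts, appends a CSV line
   * to buffer.csv and, once the buffer spans REDUCE_INTERVAL_MINUTES, schedules a reduction run.
   */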
  private async loop() {
    try {
      const now = moment();
      const time = now.format(TIMESTAMP_FORMAT);
      const cpu = (await exec(`./cpu.sh`)).trim();
      const ram = (await exec(`./ram.sh`)).trim();
      const hdd: string[] = [];
      for (const mount of MONITOR_MOUNTS) {
        try {
          const stats = (await exec(`./hdd.sh "${mount}"`)).trim();
          if (stats?.length) hdd.push(`${mount} ${stats}`);
        } catch (err) {
          Logger.warn('[WARN] Error while getting space usage of mount', mount, ':', err);
        }
      }
      const data = `${time};${cpu};${ram}${hdd.length ? `;${hdd.join(';')}` : ''}\n`;
      // Time to reduce the buffer?
      const firstBufferTime = await this.getFirstBufferTime();
      if (moment.duration(now.diff(firstBufferTime)).abs().asMinutes() >= REDUCE_INTERVAL_MINUTES) {
        try {
          const tmpFile = await this.createTmpFile();
          process.nextTick(() => this.reduceData(tmpFile));
        } catch (err) {
          Logger.error('[ERROR] Creating Temp File for Reducing Data failed:', err);
        }
      }
      await fsp.appendFile(DATA_BUFFER_FILE, data);
    } catch (err) {
      Logger.error(err);
    }
  }
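
  /** Returns the timestamp of the oldest buffered sample, preferring remains.csv over buffer.csv. */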
  private async getFirstBufferTime() {
    let dataFile = DATA_BUFFER_FILE;
    if (fs.existsSync(DATA_BUFFER_REMAINS)) dataFile = DATA_BUFFER_REMAINS;
    if (!fs.existsSync(dataFile)) return moment();
    const firstLine = await this.readFirstBufferLine(dataFile);
    const timestamp = firstLine.split(';')[CSV_COLS.buffer.time];
    return moment(timestamp, TIMESTAMP_FORMAT);
  }

  private readFirstBufferLine = (dataFile: PathLike) =>
    new Promise<string>((resolve, reject) => {
      const stream = fs.createReadStream(dataFile, { encoding: 'utf-8' });
      const chunks: string[] = [];
      stream
        .on('data', buf => {
          let chunk: string;
          if (buf instanceof Buffer) chunk = buf.toString('utf-8');
          else chunk = buf;
          const lfIdx = chunk.indexOf('\n');
          if (lfIdx >= 0) {
            chunks.push(chunk.substring(0, lfIdx));
            stream.close();
          } else {
            chunks.push(chunk);
          }
        })
        .on('close', () => resolve(chunks.join('')))
        .on('error', reject);
    });

  private async readDataFileCSV(dataFile: PathLike): Promise<string[][]> {
    return (await fsp.readFile(dataFile, { encoding: 'utf-8' }))
      .split(/\r?\n/g)
      .filter(l => !!l)
      .map(line => line.split(';'));
  }
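
  /** Renames buffer.csv to a timestamped tmp file so the sampling loop can keep appending while data is reduced. */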
  private async createTmpFile() {
    const tmpFilename = `buffer.tmp_${moment().format('YYYYMMDDHHmmssSSS')}.csv`;
    await fsp.rename(DATA_BUFFER_FILE, path.resolve(DATA_DIR, tmpFilename));
    return tmpFilename;
  }
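
  /**
   * Reduces the buffered raw samples (remains.csv, if present, plus the given tmp file) into groups
   * spanning at least REDUCE_GROUP_MINUTES, appends one avg/peak/max row per group to reduced.csv,
   * writes any samples that do not yet fill a group back to remains.csv and deletes the tmp file.
   */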
  private async reduceData(tmpFilename: string) {
    const tmpFilepath = path.resolve(DATA_DIR, tmpFilename);
    Logger.info('[INFO] Reducing data in', tmpFilepath);
    try {
      const lines: string[][] = [];
      if (fs.existsSync(DATA_BUFFER_REMAINS)) {
        lines.push(...(await this.readDataFileCSV(DATA_BUFFER_REMAINS)));
      }
      lines.push(...(await this.readDataFileCSV(tmpFilepath)));
      const reduced: Array<ReducedData> = [];
      let valueBuffer: Array<BufferedData> = [];
      do {
        const line = lines.shift();
        const data = this.parseBufferedData(line);
        Logger.debug('[DEBUG] BufferedData:', JSON.stringify(data));
        valueBuffer.push(data);
        if (valueBuffer.length <= 1) {
          // Need at least 2 datasets to check the time diff and possibly reduce to avg/peak/max;
          // skip to the next data line.
          continue;
        }
        const firstTime = moment(valueBuffer[0].time);
        const currentTime = moment(data.time);
        if (moment.duration(currentTime.diff(firstTime)).abs().asMinutes() >= REDUCE_GROUP_MINUTES) {
          type IntermediateValues = { sum: number; peak: number; max: number };
          type IntermediateDriveData = { [mount: string]: IntermediateValues };
          type IntermediateSums = { ram: IntermediateValues; cpu: IntermediateValues; hdd?: IntermediateDriveData; count: number };
          const { cpu, ram, count, hdd } = valueBuffer.reduce(
            (res, cur) => {
              res.count++;
              res.cpu.sum += cur.cpu;
              res.cpu.peak = Math.max(res.cpu.peak, cur.cpu);
              res.ram.sum += cur.ram.used;
              res.ram.peak = Math.max(res.ram.peak, cur.ram.used);
              res.ram.max = cur.ram.max;
              if (cur.hdd && Object.keys(cur.hdd).length) {
                const hdd_sums = res.hdd ?? {};
                res.hdd = Object.keys(cur.hdd).reduce((res_hdd, mount) => {
                  if (!res_hdd[mount]) {
                    res_hdd[mount] = { sum: 0, peak: 0, max: 0 };
                  }
                  res_hdd[mount].sum += cur.hdd[mount].used;
                  res_hdd[mount].peak = Math.max(res_hdd[mount].peak, cur.hdd[mount].used);
                  res_hdd[mount].max = cur.hdd[mount].max;
                  return res_hdd;
                }, hdd_sums);
              }
              return res;
            },
            { ram: { sum: 0, peak: 0, max: 0 }, cpu: { sum: 0, peak: 0, max: 0 }, count: 0 } as IntermediateSums
          );
          reduced.push({
            time: data.time,
            cpu: {
              avg: cpu.sum / count,
              peak: cpu.peak
            },
            ram: {
              avg: ram.sum / count,
              peak: ram.peak,
              max: ram.max
            },
            hdd: hdd
              ? Object.keys(hdd).reduce((res, mount) => {
                  res[mount] = {
                    avg: hdd[mount].sum / count,
                    peak: hdd[mount].peak,
                    max: hdd[mount].max
                  };
                  return res;
                }, {} as ReducedDriveData)
              : undefined
          });
          Logger.debug('[DEBUG] ReducedData:', JSON.stringify(reduced[reduced.length - 1]));
          valueBuffer = [];
        }
      } while (lines.length > 0);
      if (valueBuffer.length > 0) {
        // Overwrite remains.csv with the not-yet-reduced valueBuffer
        await fsp.writeFile(DATA_BUFFER_REMAINS, valueBuffer.map(this.serializeBufferedDataCSV.bind(this)).join('\n') + '\n', { encoding: 'utf-8' });
      } else {
        // Delete remains.csv if it exists
        if (fs.existsSync(DATA_BUFFER_REMAINS)) await fsp.unlink(DATA_BUFFER_REMAINS);
      }
      if (reduced.length > 0) {
        // Append reduced data to reduced.csv
        await fsp.appendFile(DATA_REDUCED_FILE, reduced.map(this.serializeReducedDataCSV.bind(this)).join('\n') + '\n', { encoding: 'utf-8' });
      }
      // Delete tmpFile
      await fsp.unlink(tmpFilepath);
    } catch (err) {
      Logger.error(`[ERROR] Reducing Data of tmpFile ${tmpFilepath} failed:`, err);
    }
  }
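
  /** Parses one raw buffer line (`time;cpu;used/max UNIT[;mount used/max ...]`) into a BufferedData object. */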
  private parseBufferedData(line: string[]): BufferedData {
    // TIMESTAMP
    const time = moment(line[CSV_COLS.buffer.time], TIMESTAMP_FORMAT).toDate();
    // CPU
    const cpu = Number(line[CSV_COLS.buffer.cpu]);
    // RAM
    const [stats, unit] = line[CSV_COLS.buffer.ram].split(' ');
    const [used, max] = stats.split('/');
    const factor = this.parseByteUnit(unit);
    const lastCol = CSV_COLS.buffer.ram;
    // HDD (?)
    let hdd: BufferedDriveData;
    if (MONITOR_MOUNTS.length && line.length > lastCol + 1) {
      for (let i = 1; i <= MONITOR_MOUNTS.length; i++) {
        if (lastCol + i > line.length - 1) break;
        const data = line[lastCol + i];
        const [mount, stats] = data.split(' ');
        const [used, max] = stats.split('/');
        if (!hdd) hdd = {};
        hdd[mount] = {
          used: Number(used),
          max: Number(max)
        };
      }
    }
    return {
      time,
      cpu,
      ram: {
        used: Number(used) * factor,
        max: Number(max) * factor
      },
      hdd
    };
  }
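
  /** Parses one reduced.csv line back into a ReducedData object (used by trx.read()). */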
  private parseReducedData(line: string[]): ReducedData {
    const lastCol = CSV_COLS.reduced.ram.max;
    // HDD (?)
    let hdd: ReducedDriveData;
    if (MONITOR_MOUNTS.length && line.length > lastCol + 1) {
      hdd = {};
      for (let i = 1; lastCol + i + 3 < line.length; i += 4) {
        hdd[line[lastCol + i]] = {
          avg: Number(line[lastCol + i + 1]),
          peak: Number(line[lastCol + i + 2]),
          max: Number(line[lastCol + i + 3])
        };
      }
    }
    return {
      time: moment(line[CSV_COLS.reduced.time], TIMESTAMP_FORMAT).toDate(),
      cpu: {
        avg: Number(line[CSV_COLS.reduced.cpu.avg]),
        peak: Number(line[CSV_COLS.reduced.cpu.peak])
      },
      ram: {
        avg: Number(line[CSV_COLS.reduced.ram.avg]),
        peak: Number(line[CSV_COLS.reduced.ram.peak]),
        max: Number(line[CSV_COLS.reduced.ram.max])
      },
      hdd
    };
  }
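
  /** Binary (1024-based) byte-unit factors used to normalize the reported RAM figures to bytes. */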
  private byteFactors: { [unit: string]: number } = {
    '': 1,
    K: 1024,
    M: 1024 * 1024,
    G: 1024 * 1024 * 1024,
    T: 1024 * 1024 * 1024 * 1024,
    P: 1024 * 1024 * 1024 * 1024 * 1024
  };

  private parseByteUnit(unit: string): number {
    const m = /^([KMGTP])?i?B$/.exec(unit);
    if (!m) throw new Error(`Failed to parse byte size unit '${unit}'`);
    // Fall back to the factor 1 for a plain 'B' (no K/M/G/T/P prefix captured).
    return this.byteFactors[m[1] ?? ''];
  }
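
  /** Serializes a BufferedData object back into the raw buffer line format (RAM normalized to MiB). */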
  private serializeBufferedDataCSV(data: BufferedData) {
    return [
      moment(data.time).format(TIMESTAMP_FORMAT),
      data.cpu,
      `${(data.ram.used / this.byteFactors['M']).toFixed(2)}/${(data.ram.max / this.byteFactors['M']).toFixed(2)} MiB`,
      ...(data.hdd ? Object.keys(data.hdd).map(mount => `${mount} ${data.hdd[mount].used}/${data.hdd[mount].max}`) : [])
    ].join(';');
  }

  private serializeReducedDataCSV(data: ReducedData) {
    return [
      moment(data.time).format(TIMESTAMP_FORMAT),
      data.cpu.avg.toFixed(2),
      data.cpu.peak.toFixed(2),
      data.ram.avg.toFixed(2),
      data.ram.peak.toFixed(2),
      data.ram.max.toFixed(2),
      ...(data.hdd
        ? Object.keys(data.hdd).reduce((res, mount) => {
            res.push(mount, data.hdd[mount].avg.toFixed(2), data.hdd[mount].peak.toFixed(2), data.hdd[mount].max.toFixed(2));
            return res;
          }, [])
        : [])
    ].join(';');
  }
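
  /**
   * Simple file-based "transaction" over reduced.csv for consumers of the reduced data:
   * start() moves reduced.csv aside and returns a numeric handle (or null if there is no data),
   * read() parses the moved file, commit(hdl) deletes it, and rollback(hdl) restores it while
   * re-appending anything collected in the meantime. One possible usage sketch (ship() is a
   * hypothetical consumer, not part of this class):
   *   const hdl = await collector.trx.start();
   *   if (hdl !== null) {
   *     try { await ship(await collector.trx.read()); await collector.trx.commit(hdl); }
   *     catch { await collector.trx.rollback(hdl); }
   *   }
   */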
  public get trx() {
    return this._trx;
  }

  private _trx: {
    file?: PathLike;
    start: () => Promise<number>;
    read: () => Promise<Array<ReducedData>>;
    rollback: (hdl: number) => Promise<void>;
    commit: (hdl: number) => Promise<void>;
  } = {
    start: async () => {
      if (this.trx.file) {
        Logger.warn(`[WARN] Old transaction file found - rolling back now before starting new transaction ...`);
        const hdl = Number(/trx_(\d+)\.csv/.exec(this.trx.file as string)[1]);
        await this.trx.rollback(hdl);
        Logger.warn(`[WARN] Transaction rollback succeeded.`);
      }
      if (!fs.existsSync(DATA_REDUCED_FILE)) {
        // NO DATA
        return null;
      }
      const hdl = moment().unix();
      this.trx.file = path.resolve(DATA_DIR, `reduced.trx_${hdl.toFixed(0)}.csv`);
      await fsp.rename(DATA_REDUCED_FILE, this.trx.file);
      return hdl;
    },
    read: async () => {
      if (!this.trx.file) throw new Error('No transaction opened');
      const data = await this.readDataFileCSV(this.trx.file);
      return data.map(this.parseReducedData.bind(this));
    },
    rollback: async (hdl: number) => {
      if (this.trx.file) {
        const filename = path.resolve(DATA_DIR, `reduced.trx_${hdl.toFixed(0)}.csv`);
        if (filename !== this.trx.file) throw new HttpStatusException(`Transaction #${hdl} not found`, 404);
        if (fs.existsSync(this.trx.file)) {
          let tmpFile: string;
          if (fs.existsSync(DATA_REDUCED_FILE)) {
            // Park data collected since the transaction started; the tmp file lives in DATA_DIR like the other temp files.
            tmpFile = path.resolve(DATA_DIR, `reduced.tmp_${moment().unix().toFixed(0)}.csv`);
            await fsp.rename(DATA_REDUCED_FILE, tmpFile);
          }
          await fsp.rename(this.trx.file, DATA_REDUCED_FILE);
          if (tmpFile) {
            await exec(`cat "${tmpFile}" >> "${DATA_REDUCED_FILE}"`);
            await fsp.unlink(tmpFile);
          }
        }
        this.trx.file = undefined;
      }
    },
    commit: async (hdl: number) => {
      if (this.trx.file) {
        const filename = path.resolve(DATA_DIR, `reduced.trx_${hdl.toFixed(0)}.csv`);
        if (filename !== this.trx.file) throw new HttpStatusException(`Transaction #${hdl} not found`, 404);
        if (fs.existsSync(this.trx.file)) {
          await fsp.unlink(this.trx.file);
        }
        this.trx.file = undefined;
      }
    }
  };
  public stopLoop() {
    clearInterval(this.intervalHdl);
  }
}