collector.class.ts

import fs, { PathLike } from 'fs';
import fsp from 'fs/promises';
import moment from 'moment';
import { exec } from 'node-utils/shell';
import path from 'path';
import { Logger } from '../../common/util/logger.class';
import { HttpStatusException } from './lib/http-status.exception';

// DATA_DIR must be provided via the environment; all data files live inside it.
const DATA_DIR = process.env.DATA_DIR;
const DATA_BUFFER_FILE = path.resolve(DATA_DIR, 'buffer.csv');
const DATA_BUFFER_REMAINS = path.resolve(DATA_DIR, 'remains.csv');
const DATA_REDUCED_FILE = path.resolve(DATA_DIR, 'reduced.csv');

const TIMESTAMP_FORMAT = `YYYY-MM-DD[T]HH:mm:ss.SSSZZ`;
// Buffer samples for this long before a reduction run is triggered.
const REDUCE_INTERVAL_MINUTES = 5;
// Window size over which buffered samples are collapsed into one reduced row.
const REDUCE_GROUP_MINUTES = 1;
const CSV_COLS = {
  buffer: {
    time: 0,
    cpu: 1,
    ram: 2
  },
  reduced: {
    time: 0,
    cpu: {
      avg: 1,
      peak: 2
    },
    ram: {
      avg: 3,
      peak: 4,
      max: 5
    }
  }
};
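
// Data shapes inferred from how this file parses and serializes CSV rows
// (they may be declared in a shared typings file in the original project):
interface BufferedData {
  time: Date;
  cpu: number;
  ram: {
    used: number;
    max: number;
  };
}

interface ReducedData {
  time: Date;
  cpu: {
    avg: number;
    peak: number;
  };
  ram: {
    avg: number;
    peak: number;
    max: number;
  };
}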
export class Collector {
  private intervalHdl: NodeJS.Timer;

  constructor() {
    (async () => {
      try {
        if (!fs.existsSync(DATA_DIR)) {
          Logger.info('[INFO] DATA_DIR', DATA_DIR, 'does not exist - creating now ...');
          await fsp.mkdir(DATA_DIR);
        }
        this.startLoop();
      } catch (err) {
        Logger.error('[FATAL]', err);
        Logger.error('[EXITING]');
        process.exit(1);
      }
    })();
  }
  public startLoop() {
    this.intervalHdl = setInterval(this.loop.bind(this), 500);
  }
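
  /**
   * One sampling tick (scheduled every 500 ms by startLoop): reads current CPU
   * and RAM usage via the cpu.sh / ram.sh shell scripts, appends a
   * `time;cpu;ram` line to buffer.csv, and - once the buffer spans
   * REDUCE_INTERVAL_MINUTES - moves it aside and schedules a reduction run.
   */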
  private async loop() {
    try {
      const now = moment();
      const time = now.format(TIMESTAMP_FORMAT);
      const cpu = (await exec(`./cpu.sh`)).trim();
      const ram = (await exec(`./ram.sh`)).trim();
      const data = `${time};${cpu};${ram}\n`;
      // Time to reduce buffer?
      const firstBufferTime = await this.getFirstBufferTime();
      if (moment.duration(now.diff(firstBufferTime)).abs().asMinutes() >= REDUCE_INTERVAL_MINUTES) {
        try {
          const tmpFile = await this.createTmpFile();
          process.nextTick(() => this.reduceData(tmpFile));
        } catch (err) {
          Logger.error('[ERROR] Creating Temp File for Reducing Data failed:', err);
        }
      }
      await fsp.appendFile(DATA_BUFFER_FILE, data);
    } catch (err) {
      Logger.error(err);
    }
  }
  private async getFirstBufferTime() {
    let dataFile = DATA_BUFFER_FILE;
    if (fs.existsSync(DATA_BUFFER_REMAINS)) dataFile = DATA_BUFFER_REMAINS;
    if (!fs.existsSync(dataFile)) return moment();
    const firstLine = await this.readFirstBufferLine(dataFile);
    const timestamp = firstLine.split(';')[CSV_COLS.buffer.time];
    return moment(timestamp, TIMESTAMP_FORMAT);
  }
  private readFirstBufferLine = (dataFile: PathLike) =>
    new Promise<string>((resolve, reject) => {
      const stream = fs.createReadStream(dataFile, { encoding: 'utf-8' });
      const chunks: string[] = [];
      stream
        .on('data', buf => {
          let chunk: string;
          if (buf instanceof Buffer) chunk = buf.toString('utf-8');
          else chunk = buf;
          const lfIdx = chunk.indexOf('\n');
          if (lfIdx >= 0) {
            chunks.push(chunk.substring(0, lfIdx));
            stream.close();
          } else {
            chunks.push(chunk);
          }
        })
        .on('close', () => resolve(chunks.join('')))
        .on('error', reject);
    });
  private async readDataFileCSV(dataFile: PathLike): Promise<string[][]> {
    return (await fsp.readFile(dataFile, { encoding: 'utf-8' }))
      .split(/\r?\n/g)
      .filter(l => !!l)
      .map(line => line.split(';'));
  }
  private async createTmpFile() {
    const tmpFilename = `buffer.tmp_${moment().format('YYYYMMDDHHmmssSSS')}.csv`;
    await fsp.rename(DATA_BUFFER_FILE, path.resolve(DATA_DIR, tmpFilename));
    return tmpFilename;
  }
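
  /**
   * Reduces a rotated buffer file (plus any leftover remains.csv rows):
   * samples are grouped into windows of REDUCE_GROUP_MINUTES, each window is
   * collapsed into average/peak values and appended to reduced.csv, and any
   * samples that do not yet fill a window are written back to remains.csv.
   */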
  private async reduceData(tmpFilename: string) {
    const tmpFilepath = path.resolve(DATA_DIR, tmpFilename);
    Logger.info('[INFO] Reducing data in', tmpFilepath);
    try {
      const lines: string[][] = [];
      if (fs.existsSync(DATA_BUFFER_REMAINS)) {
        lines.push(...(await this.readDataFileCSV(DATA_BUFFER_REMAINS)));
      }
      lines.push(...(await this.readDataFileCSV(tmpFilepath)));
      const reduced: Array<ReducedData> = [];
      let valueBuffer: Array<BufferedData> = [];
      do {
        const line = lines.shift();
        const data = this.parseBufferedData(line);
        Logger.debug('[DEBUG] BufferedData:', JSON.stringify(data));
        valueBuffer.push(data);
        if (valueBuffer.length <= 1) {
          // Need at least 2 datasets to check the time diff and eventually reduce to avg/peak;
          // skip to the next data line.
          continue;
        }
        const firstTime = moment(valueBuffer[0].time);
        const currentTime = moment(data.time);
        if (moment.duration(currentTime.diff(firstTime)).abs().asMinutes() >= REDUCE_GROUP_MINUTES) {
          const { cpu, ram, count } = valueBuffer.reduce(
            (res, cur) => {
              res.count++;
              res.cpu.sum += cur.cpu;
              res.cpu.peak = Math.max(res.cpu.peak, cur.cpu);
              res.ram.sum += cur.ram.used;
              res.ram.peak = Math.max(res.ram.peak, cur.ram.used);
              res.ram.max = cur.ram.max;
              return res;
            },
            { ram: { sum: 0, peak: 0, max: 0 }, cpu: { sum: 0, peak: 0 }, count: 0 }
          );
          reduced.push({
            time: data.time,
            cpu: {
              avg: cpu.sum / count,
              peak: cpu.peak
            },
            ram: {
              avg: ram.sum / count,
              peak: ram.peak,
              max: ram.max
            }
          });
          Logger.debug('[DEBUG] ReducedData:', JSON.stringify(reduced[reduced.length - 1]));
          valueBuffer = [];
        }
      } while (lines.length > 0);
      if (valueBuffer.length > 0) {
        // Overwrite remains.csv with the samples that did not fill a full group yet.
        await fsp.writeFile(
          DATA_BUFFER_REMAINS,
          valueBuffer.map(this.serializeBufferedDataCSV.bind(this)).join('\n') + '\n',
          { encoding: 'utf-8' }
        );
      } else {
        // Delete remains.csv if it exists.
        if (fs.existsSync(DATA_BUFFER_REMAINS)) await fsp.unlink(DATA_BUFFER_REMAINS);
      }
      if (reduced.length > 0) {
        // Append the reduced rows to reduced.csv.
        await fsp.appendFile(
          DATA_REDUCED_FILE,
          reduced.map(this.serializeReducedDataCSV.bind(this)).join('\n') + '\n',
          { encoding: 'utf-8' }
        );
      }
      // Delete the temp file.
      await fsp.unlink(tmpFilepath);
    } catch (err) {
      Logger.error(`[ERROR] Reducing Data of tmpFile ${tmpFilepath} failed:`, err);
    }
  }
  private parseBufferedData(line: string[]): BufferedData {
    const cpu = Number(line[CSV_COLS.buffer.cpu]);
    const time = moment(line[CSV_COLS.buffer.time], TIMESTAMP_FORMAT).toDate();
    // RAM column looks like "used/max UNIT", e.g. "512.00/2048.00 MiB".
    let ramSplit = line[CSV_COLS.buffer.ram].split(' ');
    const unit = ramSplit[1];
    ramSplit = ramSplit[0].split('/');
    const [used, max] = ramSplit;
    const factor = this.parseByteUnit(unit);
    return {
      time,
      cpu,
      ram: {
        used: Number(used) * factor,
        max: Number(max) * factor
      }
    };
  }
  private parseReducedData(line: string[]): ReducedData {
    return {
      time: moment(line[CSV_COLS.reduced.time], TIMESTAMP_FORMAT).toDate(),
      cpu: {
        avg: Number(line[CSV_COLS.reduced.cpu.avg]),
        peak: Number(line[CSV_COLS.reduced.cpu.peak])
      },
      ram: {
        avg: Number(line[CSV_COLS.reduced.ram.avg]),
        peak: Number(line[CSV_COLS.reduced.ram.peak]),
        max: Number(line[CSV_COLS.reduced.ram.max])
      }
    };
  }
  private byteFactors: { [unit: string]: number } = {
    '': 1,
    K: 1024,
    M: 1024 * 1024,
    G: 1024 * 1024 * 1024,
    T: 1024 * 1024 * 1024 * 1024,
    P: 1024 * 1024 * 1024 * 1024 * 1024
  };

  private parseByteUnit(unit: string): number {
    const m = /^([KMGTP])?i?B$/.exec(unit);
    if (!m) throw new Error(`Failed to parse byte size unit '${unit}'`);
    // A plain "B" has no prefix group, so fall back to '' (factor 1).
    return this.byteFactors[m[1] ?? ''];
  }
  private serializeBufferedDataCSV(data: BufferedData) {
    return [
      moment(data.time).format(TIMESTAMP_FORMAT),
      data.cpu,
      `${(data.ram.used / this.byteFactors['M']).toFixed(2)}/${(data.ram.max / this.byteFactors['M']).toFixed(2)} MiB`
    ].join(';');
  }

  private serializeReducedDataCSV(data: ReducedData) {
    return [
      moment(data.time).format(TIMESTAMP_FORMAT),
      data.cpu.avg.toFixed(2),
      data.cpu.peak.toFixed(2),
      data.ram.avg.toFixed(2),
      data.ram.peak.toFixed(2),
      data.ram.max.toFixed(2)
    ].join(';');
  }
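
  /**
   * Simple file-based "transaction" around reduced.csv: start() moves the file
   * aside under a numeric handle, read() parses its rows, commit() deletes it,
   * and rollback() restores it (re-appending any rows collected in the meantime).
   */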
  public get trx() {
    return this._trx;
  }

  private _trx: {
    file?: PathLike;
    start: () => Promise<number>;
    read: () => Promise<Array<ReducedData>>;
    rollback: (hdl: number) => Promise<void>;
    commit: (hdl: number) => Promise<void>;
  } = {
    start: async () => {
      if (this.trx.file) {
        Logger.warn(`[WARN] Old transaction file found - rolling back now before starting new transaction ...`);
        const hdl = Number(/trx_(\d+)\.csv/.exec(this.trx.file as string)[1]);
        await this.trx.rollback(hdl);
        Logger.warn(`[WARN] Transaction rollback succeeded.`);
      }
      const hdl = moment().unix();
      this.trx.file = path.resolve(DATA_DIR, `reduced.trx_${hdl.toFixed(0)}.csv`);
      await fsp.rename(DATA_REDUCED_FILE, this.trx.file);
      return hdl;
    },
    read: async () => {
      if (!this.trx.file) throw new Error('No transaction opened');
      const data = await this.readDataFileCSV(this.trx.file);
      return data.map(this.parseReducedData.bind(this));
    },
    rollback: async (hdl: number) => {
      if (this.trx.file) {
        const filename = path.resolve(DATA_DIR, `reduced.trx_${hdl.toFixed(0)}.csv`);
        if (filename !== this.trx.file) throw new HttpStatusException(`Transaction #${hdl} not found`, 404);
        if (fs.existsSync(this.trx.file)) {
          let tmpFile: string | undefined;
          if (fs.existsSync(DATA_REDUCED_FILE)) {
            // Move aside any data collected while the transaction was open.
            tmpFile = path.resolve(DATA_DIR, `reduced.tmp_${moment().unix().toFixed(0)}.csv`);
            await fsp.rename(DATA_REDUCED_FILE, tmpFile);
          }
          await fsp.rename(this.trx.file, DATA_REDUCED_FILE);
          if (tmpFile) {
            // Re-append the data collected in the meantime.
            await exec(`cat "${tmpFile}" >> "${DATA_REDUCED_FILE}"`);
            await fsp.unlink(tmpFile);
          }
        }
        this.trx.file = undefined;
      }
    },
    commit: async (hdl: number) => {
      if (this.trx.file) {
        const filename = path.resolve(DATA_DIR, `reduced.trx_${hdl.toFixed(0)}.csv`);
        if (filename !== this.trx.file) throw new HttpStatusException(`Transaction #${hdl} not found`, 404);
        if (fs.existsSync(this.trx.file)) {
          await fsp.unlink(this.trx.file);
        }
        this.trx.file = undefined;
      }
    }
  };
  public stopLoop() {
    clearInterval(this.intervalHdl);
  }
}
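
/*
 * Usage sketch (hypothetical consumer, not part of this file): a handler that
 * drains the reduced data using the trx API defined above.
 *
 *   const collector = new Collector();
 *   const hdl = await collector.trx.start();
 *   try {
 *     const rows = await collector.trx.read();
 *     // ... deliver rows to the client ...
 *     await collector.trx.commit(hdl);
 *   } catch (err) {
 *     await collector.trx.rollback(hdl);
 *     throw err;
 *   }
 */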