
Set up daemon webservice; refactor webserver.class.ts

Christian Kahlau, 3 years ago
parent
commit 2f283ef575

+ 2 - 1
daemon/.env

@@ -1,2 +1,3 @@
 DATA_DIR=data
-LOG_LEVEL=INFO
+LOG_LEVEL=INFO
+WEB_PORT=8890

+ 2 - 0
daemon/package.json

@@ -10,11 +10,13 @@
   "author": "Christian Kahlau, HostBBQ ©2022",
   "license": "ISC",
   "devDependencies": {
+    "@types/express": "^4.17.14",
     "@types/node": "^18.7.18",
     "typescript": "^4.8.3"
   },
   "dependencies": {
     "dotenv": "^16.0.2",
+    "express": "^4.18.1",
     "moment": "^2.29.4",
     "node-utils": "git+ssh://git@gogs.hostbbq.com:8301/hostbbq/node-utils.git#1.0.5"
   }

+ 261 - 0
daemon/src/collector.class.ts

@@ -0,0 +1,261 @@
+import fs, { PathLike } from 'fs';
+import fsp from 'fs/promises';
+import moment from 'moment';
+import { exec } from 'node-utils/shell';
+import path from 'path';
+
+import { Logger } from '../../common/util/logger.class';
+
+const DATA_DIR = process.env.DATA_DIR;
+const DATA_BUFFER_FILE = path.resolve(DATA_DIR, 'buffer.csv');
+const DATA_BUFFER_REMAINS = path.resolve(DATA_DIR, 'remains.csv');
+const DATA_REDUCED_FILE = path.resolve(DATA_DIR, 'reduced.csv');
+const TIMESTAMP_FORMAT = `YYYY-MM-DD[T]HH:mm:ss.SSSZZ`;
+const REDUCE_INTERVAL_MINUTES = 5;
+const REDUCE_GROUP_MINUTES = 1;
+
+const CSV_COLS = {
+  buffer: {
+    time: 0,
+    cpu: 1,
+    ram: 2
+  },
+  reduced: {
+    time: 0,
+    cpu: {
+      avg: 1,
+      peak: 2
+    },
+    ram: {
+      avg: 3,
+      peak: 4,
+      max: 5
+    }
+  }
+};
+
+export class Collector {
+  private intervalHdl: NodeJS.Timer;
+
+  constructor() {
+    (async () => {
+      try {
+        if (!fs.existsSync(DATA_DIR)) {
+          Logger.info('[INFO] DATA_DIR', DATA_DIR, 'does not exist - creating now ...');
+          await fsp.mkdir(DATA_DIR);
+        }
+
+        this.intervalHdl = setInterval(async () => {
+          try {
+            const now = moment();
+            const time = now.format(TIMESTAMP_FORMAT);
+            const cpu = (await exec(`./cpu.sh`)).trim();
+            const ram = (await exec(`./ram.sh`)).trim();
+            const data = `${time};${cpu};${ram}\n`;
+
+            // Time to reduce buffer?
+            const firstBufferTime = await this.getFirstBufferTime();
+            if (moment.duration(now.diff(firstBufferTime)).abs().asMinutes() >= REDUCE_INTERVAL_MINUTES) {
+              try {
+                const tmpFile = await this.createTmpFile();
+                process.nextTick(() => this.reduceData(tmpFile));
+              } catch (err) {
+                Logger.error('[ERROR] Creating Temp File for Reducing Data failed:', err);
+              }
+            }
+
+            await fsp.appendFile(DATA_BUFFER_FILE, data);
+          } catch (err) {
+            Logger.error(err);
+          }
+        }, 500);
+      } catch (err) {
+        Logger.error('[FATAL]', err);
+        Logger.error('[EXITING]');
+        process.exit(1);
+      }
+    })();
+  }
+
+  private async getFirstBufferTime() {
+    let dataFile = DATA_BUFFER_FILE;
+    if (fs.existsSync(DATA_BUFFER_REMAINS)) dataFile = DATA_BUFFER_REMAINS;
+    if (!fs.existsSync(dataFile)) return moment();
+
+    const firstLine = await this.readFirstBufferLine(dataFile);
+    const timestamp = firstLine.split(';')[CSV_COLS.buffer.time];
+    return moment(timestamp, TIMESTAMP_FORMAT);
+  }
+
+  private readFirstBufferLine = (dataFile: PathLike) =>
+    new Promise<string>((resolve, reject) => {
+      const stream = fs.createReadStream(dataFile, { encoding: 'utf-8' });
+      const chunks: string[] = [];
+      stream
+        .on('data', buf => {
+          let chunk: string;
+          if (buf instanceof Buffer) chunk = buf.toString('utf-8');
+          else chunk = buf;
+
+          const lfIdx = chunk.indexOf('\n');
+          if (lfIdx >= 0) {
+            chunks.push(chunk.substring(0, lfIdx));
+            stream.close();
+          } else {
+            chunks.push(chunk);
+          }
+        })
+        .on('close', () => resolve(chunks.join('')))
+        .on('error', reject);
+    });
+
+  private async readDataFileCSV(dataFile: PathLike): Promise<string[][]> {
+    return (await fsp.readFile(dataFile, { encoding: 'utf-8' }))
+      .split(/\r?\n/g)
+      .filter(l => !!l)
+      .map(line => line.split(';'));
+  }
+
+  private async createTmpFile() {
+    const tmpFilename = `buffer.tmp_${moment().format('YYYYMMDDHHmmssSSS')}.csv`;
+    await fsp.rename(DATA_BUFFER_FILE, path.resolve(DATA_DIR, tmpFilename));
+    return tmpFilename;
+  }
+
+  private async reduceData(tmpFilename: string) {
+    const tmpFilepath = path.resolve(DATA_DIR, tmpFilename);
+    Logger.info('[INFO] Reducing data in', tmpFilepath);
+    try {
+      const lines: string[][] = [];
+      if (fs.existsSync(DATA_BUFFER_REMAINS)) {
+        lines.push(...(await this.readDataFileCSV(DATA_BUFFER_REMAINS)));
+      }
+      lines.push(...(await this.readDataFileCSV(tmpFilepath)));
+
+      const reduced: Array<ReducedData> = [];
+      let valueBuffer: Array<BufferedData> = [];
+      do {
+        const line = lines.shift();
+        const data = this.parseData(line);
+        Logger.debug('[DEBUG] BufferedData:', JSON.stringify(data));
+        valueBuffer.push(data);
+
+        if (valueBuffer.length <= 1) {
+          // Need at least 2 datasets to check the time diff and possibly reduce to avg/max.
+          // skip to next data line
+          continue;
+        }
+
+        const firstTime = moment(valueBuffer[0].time);
+        const currentTime = moment(data.time);
+        if (moment.duration(currentTime.diff(firstTime)).abs().asMinutes() >= REDUCE_GROUP_MINUTES) {
+          const { cpu, ram, count } = valueBuffer.reduce(
+            (res, cur) => {
+              res.count++;
+              res.cpu.sum += cur.cpu;
+              res.cpu.peak = Math.max(res.cpu.peak, cur.cpu);
+              res.ram.sum += cur.ram.used;
+              res.ram.peak = Math.max(res.ram.peak, cur.ram.used);
+              res.ram.max = cur.ram.max;
+              return res;
+            },
+            { ram: { sum: 0, peak: 0, max: 0 }, cpu: { sum: 0, peak: 0 }, count: 0 }
+          );
+
+          reduced.push({
+            time: data.time,
+            cpu: {
+              avg: cpu.sum / count,
+              peak: cpu.peak
+            },
+            ram: {
+              avg: ram.sum / count,
+              peak: ram.peak,
+              max: ram.max
+            }
+          });
+
+          Logger.debug('[DEBUG] ReducedData:', JSON.stringify(reduced[reduced.length - 1]));
+
+          valueBuffer = [];
+        }
+      } while (lines.length > 0);
+
+      if (valueBuffer.length > 0) {
+        // overwrite remains.csv with valueBuffer
+        await fsp.writeFile(DATA_BUFFER_REMAINS, valueBuffer.map(this.serializeBufferedDataCSV.bind(this)).join('\n') + '\n', { encoding: 'utf-8' });
+      } else {
+        // delete remains.csv if exists
+        if (fs.existsSync(DATA_BUFFER_REMAINS)) await fsp.unlink(DATA_BUFFER_REMAINS);
+      }
+
+      if (reduced.length > 0) {
+        // append reduced data to reduced.csv
+        await fsp.appendFile(DATA_REDUCED_FILE, reduced.map(this.serializeReducedDataCSV.bind(this)).join('\n') + '\n', { encoding: 'utf-8' });
+      }
+
+      // Delete tmpFile
+      await fsp.unlink(tmpFilepath);
+    } catch (err) {
+      Logger.error(`[ERROR] Reducing Data of tmpFile ${tmpFilepath} failed:`, err);
+    }
+  }
+
+  private parseData(line: string[]): BufferedData {
+    const cpu = Number(line[CSV_COLS.buffer.cpu]);
+    const time = moment(line[CSV_COLS.buffer.time], TIMESTAMP_FORMAT).toDate();
+    let ramSplit = line[CSV_COLS.buffer.ram].split(' ');
+    const unit = ramSplit[1];
+    ramSplit = ramSplit[0].split('/');
+    const [used, max] = ramSplit;
+    const factor = this.parseByteUnit(unit);
+
+    return {
+      time,
+      cpu,
+      ram: {
+        used: Number(used) * factor,
+        max: Number(max) * factor
+      }
+    };
+  }
+
+  private byteFactors: { [unit: string]: number } = {
+    '': 1,
+    K: 1024,
+    M: 1024 * 1024,
+    G: 1024 * 1024 * 1024,
+    T: 1024 * 1024 * 1024 * 1024,
+    P: 1024 * 1024 * 1024 * 1024 * 1024
+  };
+
+  private parseByteUnit(unit: string): number {
+    const m = /^([KMGTP])?i?B$/.exec(unit);
+    if (!m) throw new Error(`Failed to parse byte size unit '${unit}'`);
+
+    return this.byteFactors[m[1] ?? ''];  // a plain 'B' unit has no prefix group, fall back to factor 1
+  }
+
+  private serializeBufferedDataCSV(data: BufferedData) {
+    return [
+      moment(data.time).format(TIMESTAMP_FORMAT),
+      data.cpu,
+      `${(data.ram.used / this.byteFactors['M']).toFixed(2)}/${(data.ram.max / this.byteFactors['M']).toFixed(2)} MiB`
+    ].join(';');
+  }
+
+  private serializeReducedDataCSV(data: ReducedData) {
+    return [
+      moment(data.time).format(TIMESTAMP_FORMAT),
+      data.cpu.avg.toFixed(2),
+      data.cpu.peak.toFixed(2),
+      data.ram.avg.toFixed(2),
+      data.ram.peak.toFixed(2),
+      data.ram.max.toFixed(2)
+    ].join(';');
+  }
+
+  exit() {
+    clearInterval(this.intervalHdl);
+  }
+}
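
The collector appends its aggregated output as semicolon-separated rows to reduced.csv, in the column order declared in CSV_COLS.reduced (time, cpu avg/peak, ram avg/peak/max). As a hedged illustration only (the helper name and the 'data' default for DATA_DIR are assumptions, not part of this commit), those rows could be read back like this:

import fsp from 'fs/promises';
import moment from 'moment';
import path from 'path';

const DATA_DIR = process.env.DATA_DIR ?? 'data';
const TIMESTAMP_FORMAT = 'YYYY-MM-DD[T]HH:mm:ss.SSSZZ';

// Hypothetical reader for reduced.csv; mirrors serializeReducedDataCSV above.
async function readReducedCSV() {
  const raw = await fsp.readFile(path.resolve(DATA_DIR, 'reduced.csv'), { encoding: 'utf-8' });
  return raw
    .split(/\r?\n/g)
    .filter(l => !!l)
    .map(line => line.split(';'))
    .map(cols => ({
      time: moment(cols[0], TIMESTAMP_FORMAT).toDate(),
      cpu: { avg: Number(cols[1]), peak: Number(cols[2]) },
      ram: { avg: Number(cols[3]), peak: Number(cols[4]), max: Number(cols[5]) }
    }));
}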

+ 8 - 247
daemon/src/index.ts

@@ -1,265 +1,26 @@
 import dotenv from 'dotenv';
-import fs, { PathLike } from 'fs';
-import fsp from 'fs/promises';
-import path from 'path';
-import moment from 'moment';
-import { exec } from 'node-utils/shell';
 
 import { Logger, LogLevel } from '../../common/util/logger.class';
 
 dotenv.config();
 
-const DATA_DIR = process.env.DATA_DIR;
-const DATA_BUFFER_FILE = path.resolve(DATA_DIR, 'buffer.csv');
-const DATA_BUFFER_REMAINS = path.resolve(DATA_DIR, 'remains.csv');
-const DATA_REDUCED_FILE = path.resolve(DATA_DIR, 'reduced.csv');
-const TIMESTAMP_FORMAT = `YYYY-MM-DD[T]HH:mm:ss.SSSZZ`;
-const REDUCE_INTERVAL_MINUTES = 5;
-const REDUCE_GROUP_MINUTES = 1;
-
-const CSV_COLS = {
-  buffer: {
-    time: 0,
-    cpu: 1,
-    ram: 2
-  },
-  reduced: {
-    time: 0,
-    cpu: {
-      avg: 1,
-      peak: 2
-    },
-    ram: {
-      avg: 3,
-      peak: 4,
-      max: 5
-    }
-  }
-};
+import { Collector } from './collector.class';
+import { Webserver } from './webserver.class';
 
 const LOG_LEVEL: LogLevel = (process.env.LOG_LEVEL as LogLevel) || 'INFO';
 Logger.logLevel = LOG_LEVEL;
 
-let intervalHdl: NodeJS.Timer;
+process.on('SIGABRT', exitGracefully);
+process.on('SIGQUIT', exitGracefully);
+process.on('SIGTERM', exitGracefully);
 
 Logger.info('[INFO] Starting Monitoring Daemon, pid:', process.pid);
-(async () => {
-  process.on('SIGABRT', exitGracefully);
-  process.on('SIGQUIT', exitGracefully);
-  process.on('SIGTERM', exitGracefully);
-
-  try {
-    if (!fs.existsSync(DATA_DIR)) {
-      console.info('[INFO] DATA_DIR', DATA_DIR, 'does not exist - creating now ...');
-      await fsp.mkdir(DATA_DIR);
-    }
-
-    intervalHdl = setInterval(async () => {
-      const now = moment();
-      const time = now.format(TIMESTAMP_FORMAT);
-      const cpu = (await exec(`./cpu.sh`)).trim();
-      const ram = (await exec(`./ram.sh`)).trim();
-      const data = `${time};${cpu};${ram}\n`;
-
-      // Time to reduce buffer?
-      const firstBufferTime = await getFirstBufferTime();
-      if (moment.duration(now.diff(firstBufferTime)).abs().asMinutes() >= REDUCE_INTERVAL_MINUTES) {
-        try {
-          const tmpFile = await createTmpFile();
-          process.nextTick(() => reduceData(tmpFile));
-        } catch (err) {
-          Logger.error('[ERROR] Creating Temp File for Reducing Data failed:', err);
-        }
-      }
-
-      await fsp.appendFile(DATA_BUFFER_FILE, data);
-    }, 500);
-  } catch (err) {
-    Logger.error('[FATAL]', err);
-    Logger.error('[EXITING]');
-    process.exit(1);
-  }
-})();
-
-async function getFirstBufferTime() {
-  let dataFile = DATA_BUFFER_FILE;
-  if (fs.existsSync(DATA_BUFFER_REMAINS)) dataFile = DATA_BUFFER_REMAINS;
-
-  const firstLine = await readFirstBufferLine(dataFile);
-  const timestamp = firstLine.split(';')[CSV_COLS.buffer.time];
-  return moment(timestamp, TIMESTAMP_FORMAT);
-}
-
-const readFirstBufferLine = (dataFile: PathLike) =>
-  new Promise<string>((resolve, reject) => {
-    const stream = fs.createReadStream(dataFile, { encoding: 'utf-8' });
-    const chunks: string[] = [];
-    stream
-      .on('data', buf => {
-        let chunk: string;
-        if (buf instanceof Buffer) chunk = buf.toString('utf-8');
-        else chunk = buf;
-
-        const lfIdx = chunk.indexOf('\n');
-        if (lfIdx >= 0) {
-          chunks.push(chunk.substring(0, lfIdx));
-          stream.close();
-        } else {
-          chunks.push(chunk);
-        }
-      })
-      .on('close', () => resolve(chunks.join('')))
-      .on('error', reject);
-  });
-
-async function readDataFileCSV(dataFile: PathLike): Promise<string[][]> {
-  return (await fsp.readFile(dataFile, { encoding: 'utf-8' }))
-    .split(/\r?\n/g)
-    .filter(l => !!l)
-    .map(line => line.split(';'));
-}
-
-async function createTmpFile() {
-  const tmpFilename = `buffer.tmp_${moment().format('YYYYMMDDHHmmssSSS')}.csv`;
-  await fsp.rename(DATA_BUFFER_FILE, path.resolve(DATA_DIR, tmpFilename));
-  return tmpFilename;
-}
-
-async function reduceData(tmpFilename: string) {
-  const tmpFilepath = path.resolve(DATA_DIR, tmpFilename);
-  Logger.info('[INFO] Reducing data in', tmpFilepath);
-  try {
-    const lines: string[][] = [];
-    if (fs.existsSync(DATA_BUFFER_REMAINS)) {
-      lines.push(...(await readDataFileCSV(DATA_BUFFER_REMAINS)));
-    }
-    lines.push(...(await readDataFileCSV(tmpFilepath)));
-
-    const reduced: Array<ReducedData> = [];
-    let valueBuffer: Array<BufferedData> = [];
-    do {
-      const line = lines.shift();
-      const data = parseData(line);
-      Logger.debug('[DEBUG] BufferedData:', JSON.stringify(data));
-      valueBuffer.push(data);
-
-      if (valueBuffer.length <= 1) {
-        // Need at least 2 datasets to check for time diff and eventually reduce to avg/max.
-        // skip to next data line
-        continue;
-      }
-
-      const firstTime = moment(valueBuffer[0].time);
-      const currentTime = moment(data.time);
-      if (moment.duration(currentTime.diff(firstTime)).abs().asMinutes() >= REDUCE_GROUP_MINUTES) {
-        const { cpu, ram, count } = valueBuffer.reduce(
-          (res, cur) => {
-            res.count++;
-            res.cpu.sum += cur.cpu;
-            res.cpu.peak = Math.max(res.cpu.peak, cur.cpu);
-            res.ram.sum += cur.ram.used;
-            res.ram.peak = Math.max(res.ram.peak, cur.ram.used);
-            res.ram.max = cur.ram.max;
-            return res;
-          },
-          { ram: { sum: 0, peak: 0, max: 0 }, cpu: { sum: 0, peak: 0 }, count: 0 }
-        );
 
-        reduced.push({
-          time: data.time,
-          cpu: {
-            avg: cpu.sum / count,
-            peak: cpu.peak
-          },
-          ram: {
-            avg: ram.sum / count,
-            peak: ram.peak,
-            max: ram.max
-          }
-        });
-
-        Logger.debug('[DEBUG] ReducedData:', JSON.stringify(reduced[reduced.length - 1]));
-
-        valueBuffer = [];
-      }
-    } while (lines.length > 0);
-
-    if (valueBuffer.length > 0) {
-      // overwrite remains.csv with valueBuffer
-      await fsp.writeFile(DATA_BUFFER_REMAINS, valueBuffer.map(serializeBufferedDataCSV).join('\n') + '\n', { encoding: 'utf-8' });
-    } else {
-      // delete remains.csv if exists
-      if (fs.existsSync(DATA_BUFFER_REMAINS)) await fsp.unlink(DATA_BUFFER_REMAINS);
-    }
-
-    if (reduced.length > 0) {
-      // append reduced data to reduced.csv
-      await fsp.appendFile(DATA_REDUCED_FILE, reduced.map(serializeReducedDataCSV).join('\n') + '\n', { encoding: 'utf-8' });
-    }
-
-    // Delete tmpFile
-    await fsp.unlink(tmpFilepath);
-  } catch (err) {
-    Logger.error(`[ERROR] Reducing Data of tmpFile ${tmpFilepath} failed:`, err);
-  }
-}
-
-function parseData(line: string[]): BufferedData {
-  const cpu = Number(line[CSV_COLS.buffer.cpu]);
-  const time = moment(line[CSV_COLS.buffer.time], TIMESTAMP_FORMAT).toDate();
-  let ramSplit = line[CSV_COLS.buffer.ram].split(' ');
-  const unit = ramSplit[1];
-  ramSplit = ramSplit[0].split('/');
-  const [used, max] = ramSplit;
-  const factor = parseByteUnit(unit);
-
-  return {
-    time,
-    cpu,
-    ram: {
-      used: Number(used) * factor,
-      max: Number(max) * factor
-    }
-  };
-}
-
-const byteFactors: { [unit: string]: number } = {
-  '': 1,
-  K: 1024,
-  M: 1024 * 1024,
-  G: 1024 * 1024 * 1024,
-  T: 1024 * 1024 * 1024 * 1024,
-  P: 1024 * 1024 * 1024 * 1024 * 1024
-};
-
-function parseByteUnit(unit: string): number {
-  const m = /^([KMGTP])?i?B$/.exec(unit);
-  if (!m) throw new Error(`Failed to parse byte size unit '${unit}'`);
-
-  return byteFactors[m[1]];
-}
-
-function serializeBufferedDataCSV(data: BufferedData) {
-  return [
-    moment(data.time).format(TIMESTAMP_FORMAT),
-    data.cpu,
-    `${(data.ram.used / byteFactors['M']).toFixed(2)}/${(data.ram.max / byteFactors['M']).toFixed(2)} MiB`
-  ].join(';');
-}
-
-function serializeReducedDataCSV(data: ReducedData) {
-  return [
-    moment(data.time).format(TIMESTAMP_FORMAT),
-    data.cpu.avg.toFixed(2),
-    data.cpu.peak.toFixed(2),
-    data.ram.avg.toFixed(2),
-    data.ram.peak.toFixed(2),
-    data.ram.max.toFixed(2)
-  ].join(';');
-}
+const collector = new Collector();
+new Webserver(Number(process.env.WEB_PORT ?? '80'), collector);
 
 function exitGracefully(...args: any[]) {
   Logger.info(`[EXITING] Graceful exit, received ${JSON.stringify(args)}`);
-  clearInterval(intervalHdl);
+  collector.exit();
   process.exit(0);
 }

+ 15 - 0
daemon/src/webserver.class.ts

@@ -0,0 +1,15 @@
+import express, { Express } from 'express';
+import { Logger } from '../../common/util/logger.class';
+import { Collector } from './collector.class';
+
+export class Webserver {
+  private app: Express;
+
+  constructor(private port: number, private collector: Collector) {
+    this.app = express();
+
+    this.app.listen(this.port, () => {
+      Logger.info(`[INFO] Monitoring Daemon Webservice started at http://localhost:${this.port}`);
+    });
+  }
+}
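
The new daemon Webserver only starts Express and holds a reference to the Collector; no routes are registered yet. A minimal sketch of what a data endpoint might look like (the route path, the default port, and reading reduced.csv directly instead of going through the Collector are assumptions, not part of this commit):

import express, { Request, Response } from 'express';
import fsp from 'fs/promises';
import path from 'path';

const app = express();

// Hypothetical endpoint: serve the reduced metrics as JSON arrays of columns.
app.get('/reduced', async (_req: Request, res: Response) => {
  try {
    const file = path.resolve(process.env.DATA_DIR ?? 'data', 'reduced.csv');
    const csv = await fsp.readFile(file, { encoding: 'utf-8' });
    // column order: time;cpuAvg;cpuPeak;ramAvg;ramPeak;ramMax
    res.json(csv.split(/\r?\n/g).filter(l => !!l).map(l => l.split(';')));
  } catch (err) {
    res.status(500).json({ error: String(err) });
  }
});

app.listen(Number(process.env.WEB_PORT ?? '8890'));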

+ 1 - 1
server/src/index.ts

@@ -1,11 +1,11 @@
 import dotenv from 'dotenv';
 
 import { Logger, LogLevel } from '../../common/util/logger.class';
+import { Webserver } from './webserver.class';
 
 dotenv.config();
 
 const LOG_LEVEL: LogLevel = (process.env.LOG_LEVEL as LogLevel) || 'INFO';
 Logger.logLevel = LOG_LEVEL;
 
-import { Webserver } from './webserver';
 new Webserver(Number(process.env.WEB_PORT ?? '80'));
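
A plausible motivation for moving the Webserver import above dotenv.config() and renaming the module (an assumption, the commit message does not state it): under native ES modules, import declarations are hoisted and evaluated before the importing module's own statements, so a module-level process.env.STATIC_DIR read could run before dotenv.config() has populated the environment. Reading the variable lazily inside the constructor, as the next diff does, removes that ordering dependency. Illustrative sketch only, not project code:

// A module-level read runs when the module is first evaluated, which under
// native ES modules happens before the importing file's dotenv.config() call:
// const STATIC_DIR = process.env.STATIC_DIR || 'public';   // potentially too early
class Webserver {
  constructor() {
    // Read lazily: the constructor runs after dotenv.config() in index.ts.
    const staticDir = process.env.STATIC_DIR || 'public';
    console.log(`serving static files from ${staticDir}`);
  }
}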

+ 1 - 3
server/src/webserver.ts → server/src/webserver.class.ts

@@ -1,15 +1,13 @@
 import express, { Express } from 'express';
 import { Logger } from '../../common/util/logger.class';
 
-const STATIC_DIR = process.env.STATIC_DIR || 'public';
-
 export class Webserver {
   private app: Express;
 
   constructor(private port: number) {
     this.app = express();
 
-    this.app.use('/', express.static(STATIC_DIR));
+    this.app.use('/', express.static(process.env.STATIC_DIR || 'public'));
 
     this.app.listen(this.port, () => {
       Logger.info(`[INFO] Monitoring Webserver started at http://localhost:${this.port}`);