Przeglądaj źródła

Daemon: Implementierung Transaction Endpoints

Christian Kahlau 3 lat temu
rodzic
commit
eb3d8c1ec6

+ 112 - 27
daemon/src/collector.class.ts

@@ -5,6 +5,7 @@ import { exec } from 'node-utils/shell';
 import path from 'path';
 
 import { Logger } from '../../common/util/logger.class';
+import { HttpStatusException } from './lib/http-status.exception';
 
 const DATA_DIR = process.env.DATA_DIR;
 const DATA_BUFFER_FILE = path.resolve(DATA_DIR, 'buffer.csv');
@@ -45,30 +46,7 @@ export class Collector {
           await fsp.mkdir(DATA_DIR);
         }
 
-        this.intervalHdl = setInterval(async () => {
-          try {
-            const now = moment();
-            const time = now.format(TIMESTAMP_FORMAT);
-            const cpu = (await exec(`./cpu.sh`)).trim();
-            const ram = (await exec(`./ram.sh`)).trim();
-            const data = `${time};${cpu};${ram}\n`;
-
-            // Time to reduce buffer?
-            const firstBufferTime = await this.getFirstBufferTime();
-            if (moment.duration(now.diff(firstBufferTime)).abs().asMinutes() >= REDUCE_INTERVAL_MINUTES) {
-              try {
-                const tmpFile = await this.createTmpFile();
-                process.nextTick(() => this.reduceData(tmpFile));
-              } catch (err) {
-                Logger.error('[ERROR] Creating Temp File for Reducing Data failed:', err);
-              }
-            }
-
-            await fsp.appendFile(DATA_BUFFER_FILE, data);
-          } catch (err) {
-            Logger.error(err);
-          }
-        }, 500);
+        this.startLoop();
       } catch (err) {
         Logger.error('[FATAL]', err);
         Logger.error('[EXITING]');
@@ -77,6 +55,35 @@ export class Collector {
     })();
   }
 
+  public startLoop() {
+    this.intervalHdl = setInterval(this.loop.bind(this), 500);
+  }
+
+  private async loop() {
+    try {
+      const now = moment();
+      const time = now.format(TIMESTAMP_FORMAT);
+      const cpu = (await exec(`./cpu.sh`)).trim();
+      const ram = (await exec(`./ram.sh`)).trim();
+      const data = `${time};${cpu};${ram}\n`;
+
+      // Time to reduce buffer?
+      const firstBufferTime = await this.getFirstBufferTime();
+      if (moment.duration(now.diff(firstBufferTime)).abs().asMinutes() >= REDUCE_INTERVAL_MINUTES) {
+        try {
+          const tmpFile = await this.createTmpFile();
+          process.nextTick(() => this.reduceData(tmpFile));
+        } catch (err) {
+          Logger.error('[ERROR] Creating Temp File for Reducing Data failed:', err);
+        }
+      }
+
+      await fsp.appendFile(DATA_BUFFER_FILE, data);
+    } catch (err) {
+      Logger.error(err);
+    }
+  }
+
   private async getFirstBufferTime() {
     let dataFile = DATA_BUFFER_FILE;
     if (fs.existsSync(DATA_BUFFER_REMAINS)) dataFile = DATA_BUFFER_REMAINS;
@@ -136,7 +143,7 @@ export class Collector {
       let valueBuffer: Array<BufferedData> = [];
       do {
         const line = lines.shift();
-        const data = this.parseData(line);
+        const data = this.parseBufferedData(line);
         Logger.debug('[DEBUG] BufferedData:', JSON.stringify(data));
         valueBuffer.push(data);
 
@@ -201,7 +208,7 @@ export class Collector {
     }
   }
 
-  private parseData(line: string[]): BufferedData {
+  private parseBufferedData(line: string[]): BufferedData {
     const cpu = Number(line[CSV_COLS.buffer.cpu]);
     const time = moment(line[CSV_COLS.buffer.time], TIMESTAMP_FORMAT).toDate();
     let ramSplit = line[CSV_COLS.buffer.ram].split(' ');
@@ -220,6 +227,21 @@ export class Collector {
     };
   }
 
+  private parseReducedData(line: string[]): ReducedData {
+    return {
+      time: moment(line[CSV_COLS.reduced.time], TIMESTAMP_FORMAT).toDate(),
+      cpu: {
+        avg: Number(line[CSV_COLS.reduced.cpu.avg]),
+        peak: Number(line[CSV_COLS.reduced.cpu.peak])
+      },
+      ram: {
+        avg: Number(line[CSV_COLS.reduced.ram.avg]),
+        peak: Number(line[CSV_COLS.reduced.ram.peak]),
+        max: Number(line[CSV_COLS.reduced.ram.max])
+      }
+    };
+  }
+
   private byteFactors: { [unit: string]: number } = {
     '': 1,
     K: 1024,
@@ -255,7 +277,70 @@ export class Collector {
     ].join(';');
   }
 
-  exit() {
+  public get trx() {
+    return this._trx;
+  }
+
+  private _trx: {
+    file?: PathLike;
+    start: () => Promise<number>;
+    read: () => Promise<Array<ReducedData>>;
+    rollback: (hdl: number) => Promise<void>;
+    commit: (hdl: number) => Promise<void>;
+  } = {
+    start: async () => {
+      if (this.trx.file) {
+        Logger.warn(`[WARN] Old transaction file found - rolling back now before starting new transaction ...`);
+        const hdl = Number(/trx_(\d+)\.csv/.exec(this.trx.file as string)[1]);
+        await this.trx.rollback(hdl);
+        Logger.warn(`[WARN] Transaction rollback succeeded.`);
+      }
+
+      const hdl = moment().unix();
+      this.trx.file = path.resolve(DATA_DIR, `reduced.trx_${hdl.toFixed(0)}.csv`);
+      await fsp.rename(DATA_REDUCED_FILE, this.trx.file);
+      return hdl;
+    },
+    read: async () => {
+      if (!this.trx.file) throw new Error('No transaction opened');
+
+      const data = await this.readDataFileCSV(this.trx.file);
+      return data.map(this.parseReducedData.bind(this));
+    },
+    rollback: async (hdl: number) => {
+      if (this.trx.file) {
+        const filename = path.resolve(DATA_DIR, `reduced.trx_${hdl.toFixed(0)}.csv`);
+        if (filename !== this.trx.file) throw new HttpStatusException(`Transaction #${hdl} not found`, 404);
+
+        if (fs.existsSync(this.trx.file)) {
+          let tmpFile: string;
+          if (fs.existsSync(DATA_REDUCED_FILE)) {
+            tmpFile = `reduced.tmp_${moment().unix().toFixed(0)}.csv`;
+            await fsp.rename(DATA_REDUCED_FILE, tmpFile);
+          }
+          await fsp.rename(this.trx.file, DATA_REDUCED_FILE);
+          if (tmpFile) {
+            await exec(`cat "${tmpFile}" >> "${DATA_REDUCED_FILE}"`);
+            await fsp.unlink(tmpFile);
+          }
+        }
+        this.trx.file = undefined;
+      }
+    },
+    commit: async (hdl: number) => {
+      if (this.trx.file) {
+        const filename = path.resolve(DATA_DIR, `reduced.trx_${hdl.toFixed(0)}.csv`);
+        if (filename !== this.trx.file) throw new HttpStatusException(`Transaction #${hdl} not found`, 404);
+
+        if (fs.existsSync(this.trx.file)) {
+          await fsp.unlink(this.trx.file);
+        }
+        this.trx.file = undefined;
+      }
+    }
+  };
+
+  public stopLoop() {
     clearInterval(this.intervalHdl);
   }
 }

+ 1 - 1
daemon/src/index.ts

@@ -21,6 +21,6 @@ new Webserver(Number(process.env.WEB_PORT ?? '80'), collector);
 
 function exitGracefully(...args: any[]) {
   Logger.info(`[EXITING] Graceful exit, received ${JSON.stringify(args)}`);
-  collector.exit();
+  collector.stopLoop();
   process.exit(0);
 }

+ 5 - 0
daemon/src/lib/http-status.exception.ts

@@ -0,0 +1,5 @@
+export class HttpStatusException extends Error {
+  constructor(msg: string, public statusCode: number) {
+    super(msg);
+  }
+}

+ 48 - 0
daemon/src/webserver.class.ts

@@ -1,5 +1,8 @@
 import express, { Express } from 'express';
+
 import { Logger } from '../../common/util/logger.class';
+
+import { HttpStatusException } from './lib/http-status.exception';
 import { Collector } from './collector.class';
 
 export class Webserver {
@@ -8,6 +11,51 @@ export class Webserver {
   constructor(private port: number, private collector: Collector) {
     this.app = express();
 
+    this.app.get('/', async (req, res, next) => {
+      try {
+        const hdl = await this.collector.trx.start();
+        const data = await this.collector.trx.read();
+
+        res.send({
+          hdl,
+          data
+        });
+      } catch (err) {
+        next(err);
+      }
+    });
+
+    this.app.patch('/:hdl', async (req, res, next) => {
+      try {
+        const hdl = Number(req.params.hdl);
+        await this.collector.trx.commit(hdl);
+
+        res.send({ ok: true });
+      } catch (err) {
+        next(err);
+      }
+    });
+
+    this.app.delete('/:hdl', async (req, res, next) => {
+      try {
+        const hdl = Number(req.params.hdl);
+        await this.collector.trx.rollback(hdl);
+
+        res.send({ ok: true });
+      } catch (err) {
+        next(err);
+      }
+    });
+
+    this.app.use((err, req, res, next) => {
+      if (err instanceof HttpStatusException) {
+        res.status(err.statusCode).send(err.message);
+      } else {
+        Logger.error('[ERROR] Webservice ErrorHandler caught:', err);
+        res.status(500).send(JSON.stringify(err));
+      }
+    });
+
     this.app.listen(this.port, () => {
       Logger.info(`[INFO] Monitoring Daemon Webservice started at http://localhost:${this.port}`);
     });