Prechádzať zdrojové kódy

Server: Implementation API Sync

Christian Kahlau 3 rokov pred
rodič
commit
3bb8b0eabe

+ 0 - 5
common/types/server.d.ts

@@ -4,9 +4,4 @@ type Server = {
   fqdn: string;
 
   config: { [key: string]: string };
-  data?: {
-    [timestamp: number]: {
-      [key: string]: number;
-    };
-  };
 };

+ 1 - 0
server/package.json

@@ -10,6 +10,7 @@
   "author": "Christian Kahlau, HostBBQ ©2022",
   "license": "ISC",
   "dependencies": {
+    "axios": "^0.27.2",
     "dotenv": "^16.0.2",
     "express": "^4.18.1",
     "sqlite3": "^5.1.1"

+ 76 - 1
server/src/database.class.ts

@@ -60,10 +60,11 @@ export class Database {
           `CREATE TABLE ServerDataValue (
             ID INTEGER PRIMARY KEY AUTOINCREMENT,
             EntryID INTEGER NOT NULL,
+            Type Text NOT NULL,
             Key TEXT NOT NULL,
             Value REAL NOT NULL,
             FOREIGN KEY(EntryID) REFERENCES ServerDataEntry(ID),
-            UNIQUE(EntryID, Key)
+            UNIQUE(EntryID, Type, Key)
           );`,
           []
         );
@@ -110,6 +111,68 @@ export class Database {
     }, [] as Server[]);
   }
 
+  public async insertServerData(serverID: number, data: ReducedData[]) {
+    if (!data.length) return;
+
+    await this.beginTransaction();
+    try {
+      let c = 1;
+      for (const entry of data) {
+        const result = await this.run('INSERT INTO ServerDataEntry(ServerID, Timestamp) VALUES(?, ?);', [serverID, entry.time.getTime()]);
+        let entryID = result.lastID;
+
+        for (const type of Object.keys(entry).filter(t => t !== 'time')) {
+          for (const key of Object.keys(entry[type])) {
+            await this.run('INSERT INTO ServerDataValue(EntryID, Type, Key, Value) VALUES(?, ?, ?, ?);', [entryID, type, key, entry[type][key]]);
+          }
+        }
+        c++;
+      }
+      await this.commit();
+    } catch (err) {
+      await this.rollback();
+      throw err;
+    }
+  }
+
+  public async getServerData(serverID: number, start: Date, end: Date): Promise<ReducedData[]> {
+    /* FIRST DRAFT - SIMPLY GET ALL DATA POINTS OF ALL TYPES */
+    /* TODO: ONLY GET DATA OF ONE TYPE, REDUCE DOWN TO FEWER DATA POINTS, COMPUTING AVGs & PEAKs */
+
+    const result = await this.stmt(
+      `SELECT
+          ServerDataEntry.*,
+          ServerDataValue.Type,
+          ServerDataValue.Key,
+          ServerDataValue.Value
+        FROM ServerDataEntry
+        JOIN ServerDataValue ON ServerDataEntry.ID = ServerDataValue.EntryID
+        WHERE ServerID = ?
+        AND Timestamp BETWEEN ? AND ?
+        ORDER BY Timestamp, Type, Key;`,
+      [serverID, start.getTime(), end.getTime()]
+    );
+
+    return result.rows.reduce((res, line, i) => {
+      const timestamp = line['Timestamp'];
+      let entry: ReducedData;
+      if (i === 0 || res[res.length - 1].time.getTime() !== timestamp) {
+        entry = { time: new Date(timestamp) } as ReducedData;
+        res.push(entry);
+      } else {
+        entry = res[res.length - 1];
+      }
+
+      const type = line['Type'];
+      if (typeof entry[type] === 'undefined') {
+        entry[type] = {};
+      }
+      entry[type][line['Key']] = line['Value'];
+
+      return res;
+    }, [] as ReducedData[]);
+  }
+
   private async run(sql: string, params: any): Promise<RunResult> {
     return new Promise<RunResult>((res, rej) => {
       this.db.run(sql, params, function (err) {
@@ -128,4 +191,16 @@ export class Database {
       });
     });
   }
+
+  public async beginTransaction() {
+    await this.run('BEGIN TRANSACTION;', []);
+  }
+
+  public async commit() {
+    await this.run('COMMIT;', []);
+  }
+
+  public async rollback() {
+    await this.run('ROLLBACK;', []);
+  }
 }

+ 33 - 1
server/src/server-connector.class.ts

@@ -1,3 +1,5 @@
+import axios from 'axios';
+
 import { Logger } from '../../common/util/logger.class';
 import { Database } from './database.class';
 import { Timer } from './timer.class';
@@ -15,6 +17,12 @@ export class ServerConnector {
           Logger.info('[INFO] Starting Server Sync Connector for', server.title, 'with interval', interval, 'seconds ...');
           const id = Timer.instance.subscribe(interval, () => this.timerTick(server));
           this.subscriptions.push({ id, interval, server });
+
+          process.nextTick(async () => {
+            Logger.info('[INFO] Initial Sync for', server.title, '...');
+            await this.timerTick(server);
+            Logger.info('[SUCCESS] Initial Sync for', server.title, 'succeeded.');
+          });
         });
       } catch (err) {
         Logger.error('[FATAL] Initializing ServerConnector failed:', err);
@@ -24,7 +32,31 @@ export class ServerConnector {
     })();
   }
 
-  private timerTick(server: Server) {
+  private async timerTick(server: Server) {
     Logger.debug('[DEBUG] TICK', new Date(), JSON.stringify(server));
+
+    let trxHdl: number;
+    try {
+      // Start Transaction, receiving Data and a Transaction Handle
+      let response = await axios.get(`http://${server.fqdn}:8890/`, { responseType: 'json' });
+      trxHdl = response.data.hdl;
+      const data: ReducedData[] = response.data.data.map(entry => ({ ...entry, time: new Date(entry.time) }));
+
+      // Process data in DB
+      await this.db.insertServerData(server.id, data);
+
+      // Commit Transaction
+      await axios.patch(`http://${server.fqdn}:8890/${trxHdl}`, null, { responseType: 'json' });
+    } catch (err) {
+      Logger.error('[ERROR] Server data sync failed:', err);
+
+      if (!!trxHdl) {
+        Logger.error(`[WARN] Rolling back transaction #${trxHdl} ... `);
+        const response = await axios.delete(`http://${server.fqdn}:8890/${trxHdl}`, { responseType: 'json' });
+        if (response.data.ok) {
+          Logger.error(`[WARN] Rollback succeeded.`);
+        }
+      }
+    }
   }
 }