| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201 |
- import moment from 'moment';
- import { Pool as MariaPool } from 'mysql';
- import { Logger } from '../../../common/util/logger.class';
- import { SQLiteDatabase as SQLiteDB } from './sqlite-database.class';
- import { MariaDBConnector } from './mariadb-connector.class';
- const CHUNK_SIZE = 5000;
- export class MariaDBImporter {
- private oldDb!: SQLiteDB;
- private newDb!: MariaDBConnector;
- constructor(pool: MariaPool) {
- this.oldDb = new SQLiteDB();
- this.newDb = new MariaDBConnector(pool);
- }
- async connect() {
- try {
- await this.oldDb.open();
- await this.newDb.connect();
- } catch (e) {
- Logger.error('[FATAL] Initializing MariaDBImporter failed:', e);
- Logger.error('[EXITING]');
- process.exit(1);
- }
- }
- async runImport() {
- // await this.newDb.beginTransaction();
- try {
- await this.cutoffOldData(moment().add(-4, 'months').toDate());
- await this.truncateTables();
- await this.importServer();
- await this.importServerConfig();
- await this.importServerDataEntry();
- await this.importServerDataValue();
- await this.importHealthCheckConfig();
- await this.importHealthCheckParams();
- await this.importHealthCheckDataEntry();
- // await this.newDb.commit();
- } catch (e) {
- Logger.error('[ERROR] Import to MariaDB failed:', e);
- // await this.newDb.rollback();
- process.exit(2);
- }
- }
- private async cutoffOldData(cutoffDate: Date) {
- Logger.info('[INFO]', 'Cutting off old DataEntries before', cutoffDate);
- await this.oldDb.run('DELETE FROM `ServerDataEntry` WHERE `Timestamp` < ?;', [cutoffDate.getTime()]);
- await this.oldDb.run('DELETE FROM `HealthCheckDataEntry` WHERE `Timestamp` < ?;', [cutoffDate.getTime()]);
- }
- private async truncateTables() {
- Logger.info('[INFO]', 'Truncating all Tables in MariaDB ...');
- await this.newDb.query(
- `
- SET autocommit = OFF;
- START TRANSACTION;
- SET FOREIGN_KEY_CHECKS=0;
- TRUNCATE TABLE \`ServerDataValue\`;
- TRUNCATE TABLE \`ServerDataEntry\`;
- TRUNCATE TABLE \`ServerConfig\`;
-
- TRUNCATE TABLE \`HealthCheckDataEntry\`;
- TRUNCATE TABLE \`HealthCheckParams\`;
- TRUNCATE TABLE \`HealthCheckConfig\`;
- TRUNCATE TABLE \`Server\`;
- COMMIT;
- SET FOREIGN_KEY_CHECKS=1;
- SET autocommit = ON;
- `,
- []
- );
- }
- private async importServer() {
- Logger.info('[INFO]', 'Importing Server Table ...');
- const res = await this.oldDb.stmt('SELECT * FROM `Server`;', []);
- for (const row of res.rows) {
- await this.newDb.query('INSERT INTO `Server`(`ID`, `Title`, `FQDN`) VALUES (?, ?, ?)', [row['ID'], row['Title'], row['FQDN']]);
- }
- }
- private async importServerConfig() {
- Logger.info('[INFO]', 'Importing ServerConfig Table ...');
- const res = await this.oldDb.stmt('SELECT * FROM `ServerConfig`;', []);
- for (const row of res.rows) {
- await this.newDb.query('INSERT INTO `ServerConfig`(`ID`, `ServerID`, `Key`, `Value`) VALUES (?, ?, ?, ?)', [
- row['ID'],
- row['ServerID'],
- row['Key'],
- row['Value']
- ]);
- }
- }
- private async importServerDataEntry() {
- Logger.info('[INFO]', 'Importing ServerDataEntry Table ...');
- let res = await this.oldDb.stmt('SELECT COUNT(*) as Count FROM `ServerDataEntry`;', []);
- const count = res.rows[0]['Count'] as number;
- let offset = 0;
- let pageSize = Math.min(CHUNK_SIZE, count);
- while (offset + pageSize <= count) {
- Logger.info('[INFO]', `Importing ServerDataEntry (${offset}/${count}) ...`);
- res = await this.oldDb.stmt('SELECT * FROM `ServerDataEntry` LIMIT ? OFFSET ?;', [pageSize, offset]);
- if (!res.rows.length) break;
- const sql = 'INSERT INTO `ServerDataEntry`(`ID`, `ServerID`, `Timestamp`) VALUES ' + res.rows.map(() => '(?,?,?)').join(',') + ';';
- const data = res.rows.reduce((res, row) => [...res, row['ID'], row['ServerID'], new Date(row['Timestamp'])], []);
- await this.newDb.query(sql, data);
- offset += pageSize;
- pageSize = Math.min(pageSize, count - offset);
- }
- }
- private async importServerDataValue() {
- Logger.info('[INFO]', 'Importing ServerDataValue Table ...');
- let res = await this.oldDb.stmt('SELECT COUNT(*) as Count FROM `ServerDataValue`;', []);
- const count = res.rows[0]['Count'] as number;
- let offset = 0;
- let pageSize = Math.min(CHUNK_SIZE, count);
- while (offset + pageSize <= count) {
- Logger.info('[INFO]', `Importing ServerDataValue (${offset}/${count}) ...`);
- const res = await this.oldDb.stmt('SELECT * FROM `ServerDataValue` LIMIT ? OFFSET ?;', [pageSize, offset]);
- if (!res.rows.length) break;
- const sql = 'INSERT INTO `ServerDataValue`(`ID`, `EntryID`, `Type`, `Key`, `Value`) VALUES' + res.rows.map(() => '(?,?,?,?,?)').join(',') + ';';
- const data = res.rows.reduce((res, row) => [...res, row['ID'], row['EntryID'], row['Type'], row['Key'], row['Value']], []);
- await this.newDb.query(sql, data);
- offset += pageSize;
- pageSize = Math.min(pageSize, count - offset);
- }
- }
- private async importHealthCheckConfig() {
- Logger.info('[INFO]', 'Importing HealthCheckConfig Table ...');
- const res = await this.oldDb.stmt('SELECT * FROM `HealthCheckConfig`;', []);
- for (const row of res.rows) {
- await this.newDb.query('INSERT INTO `HealthCheckConfig`(`ID`, `ServerID`, `Type`, `Title`) VALUES(?, ?, ?, ?)', [
- row['ID'],
- row['ServerID'],
- row['Type'],
- row['Title']
- ]);
- }
- }
- private async importHealthCheckParams() {
- Logger.info('[INFO]', 'Importing HealthCheckParams Table ...');
- const res = await this.oldDb.stmt('SELECT * FROM `HealthCheckParams`;', []);
- for (const row of res.rows) {
- await this.newDb.query('INSERT INTO `HealthCheckParams`(`ID`, `ConfigID`, `Type`, `Key`, `Value`) VALUES (?, ?, ?, ?, ?)', [
- row['ID'],
- row['ConfigID'],
- row['Type'],
- row['Key'],
- row['Value']
- ]);
- }
- }
- private async importHealthCheckDataEntry() {
- Logger.info('[INFO]', 'Importing HealthCheckDataEntry Table ...');
- let res = await this.oldDb.stmt('SELECT COUNT(*) as Count FROM `HealthCheckDataEntry`;', []);
- const count = res.rows[0]['Count'] as number;
- let offset = 0;
- let pageSize = Math.min(CHUNK_SIZE, count);
- while (offset + pageSize <= count) {
- Logger.info('[INFO]', `Importing HealthCheckDataEntry (${offset}/${count}) ...`);
- const res = await this.oldDb.stmt('SELECT * FROM `HealthCheckDataEntry` LIMIT ? OFFSET ?;', [pageSize, offset]);
- if (!res.rows.length) break;
- const sql =
- 'INSERT INTO `HealthCheckDataEntry`(`ID`, `ConfigID`, `Timestamp`, `Status`, `Message`) VALUES ' +
- res.rows.map(() => '(?, ?, ?, ?, ?)').join(',') +
- ';';
- const data = res.rows.reduce((res, row) => [...res, row['ID'], row['ConfigID'], new Date(row['Timestamp']), row['Status'], row['Message']], []);
- await this.newDb.query(sql, data);
- offset += pageSize;
- pageSize = Math.min(pageSize, count - offset);
- }
- }
- }
|