import moment from 'moment';
import { Pool as MariaPool } from 'mysql';

import { Logger } from '../../../common/util/logger.class';
import { SQLiteDatabase as SQLiteDB } from './sqlite-database.class';
import { MariaDBConnector } from './mariadb-connector.class';

/** Maximum number of rows copied per multi-row INSERT when importing the large tables. */
const CHUNK_SIZE = 5000;

/** A row as read from the SQLite source database, keyed by column name. */
type SourceRow = Record<string, unknown>;

/** Describes how one large source table is copied with multi-row INSERT statements. */
interface ChunkedImportSpec {
  /** Table name; used for the SELECT statements and in the log messages. */
  table: string;
  /** INSERT statement up to and including the `VALUES` keyword. */
  insertPrefix: string;
  /** Placeholder tuple for a single row, e.g. `(?,?,?)`. */
  rowPlaceholder: string;
  /** Maps one source row to the parameters of its placeholder tuple, in column order. */
  toParams: (row: SourceRow) => unknown[];
}

/**
 * Copies the tables `Server`, `ServerConfig`, `ServerDataEntry`, `ServerDataValue`,
 * `HealthCheckConfig`, `HealthCheckParams` and `HealthCheckDataEntry` from the SQLite
 * database into MariaDB.
 *
 * Usage: construct with a MariaDB pool, `await connect()`, then `await runImport()`.
 * Both public methods terminate the process on failure instead of throwing.
 */
export class MariaDBImporter {
  private readonly oldDb: SQLiteDB;
  private readonly newDb: MariaDBConnector;

  /**
   * @param pool Connection pool handed to the MariaDB connector.
   */
  constructor(pool: MariaPool) {
    this.oldDb = new SQLiteDB();
    this.newDb = new MariaDBConnector(pool);
  }

  /**
   * Opens the SQLite source database and connects to MariaDB.
   *
   * On any failure the error is logged and the process exits with code 1.
   */
  async connect(): Promise<void> {
    try {
      await this.oldDb.open();
      await this.newDb.connect();
    } catch (e) {
      Logger.error('[FATAL] Initializing MariaDBImporter failed:', e);
      Logger.error('[EXITING]');
      process.exit(1);
    }
  }

  /**
   * Runs the complete import: prunes data entries older than four months from the
   * source, truncates the target tables and then copies every table in order.
   *
   * On any failure the error is logged and the process exits with code 2. The import
   * is not wrapped in a transaction (see the disabled calls below), so a failed run
   * leaves the target tables partially filled.
   */
  async runImport(): Promise<void> {
    // await this.newDb.beginTransaction();
    try {
      await this.cutoffOldData(moment().add(-4, 'months').toDate());
      await this.truncateTables();

      await this.importServer();
      await this.importServerConfig();
      await this.importServerDataEntry();
      await this.importServerDataValue();

      await this.importHealthCheckConfig();
      await this.importHealthCheckParams();
      await this.importHealthCheckDataEntry();
      // await this.newDb.commit();
    } catch (e) {
      Logger.error('[ERROR] Import to MariaDB failed:', e);
      // await this.newDb.rollback();
      process.exit(2);
    }
  }

  /**
   * Deletes `ServerDataEntry` and `HealthCheckDataEntry` rows older than `cutoffDate`.
   *
   * Note that this modifies the SQLite *source* database, not the MariaDB target.
   *
   * @param cutoffDate Rows whose `Timestamp` is smaller than this date's epoch
   *   milliseconds are removed.
   */
  private async cutoffOldData(cutoffDate: Date): Promise<void> {
    Logger.info('[INFO]', 'Cutting off old DataEntries before', cutoffDate);
    const cutoffMillis = cutoffDate.getTime();
    await this.oldDb.run('DELETE FROM `ServerDataEntry` WHERE `Timestamp` < ?;', [cutoffMillis]);
    await this.oldDb.run('DELETE FROM `HealthCheckDataEntry` WHERE `Timestamp` < ?;', [cutoffMillis]);
  }

  /**
   * Empties all target tables in MariaDB with foreign key checks disabled.
   *
   * NOTE(review): this sends several statements in one query, which presumably
   * requires the pool to allow multi-statement queries - confirm.
   * NOTE(review): TRUNCATE TABLE is documented to cause an implicit commit in MariaDB,
   * so the START TRANSACTION / COMMIT pair likely does not make this atomic - confirm.
   */
  private async truncateTables(): Promise<void> {
    Logger.info('[INFO]', 'Truncating all Tables in MariaDB ...');
    await this.newDb.query(
      `
      SET autocommit = OFF;
      START TRANSACTION;
      SET FOREIGN_KEY_CHECKS=0;
      TRUNCATE TABLE \`ServerDataValue\`;
      TRUNCATE TABLE \`ServerDataEntry\`;
      TRUNCATE TABLE \`ServerConfig\`;
      TRUNCATE TABLE \`HealthCheckDataEntry\`;
      TRUNCATE TABLE \`HealthCheckParams\`;
      TRUNCATE TABLE \`HealthCheckConfig\`;
      TRUNCATE TABLE \`Server\`;
      COMMIT;
      SET FOREIGN_KEY_CHECKS=1;
      SET autocommit = ON;
      `,
      []
    );
  }

  /** Copies the `Server` table row by row. */
  private async importServer(): Promise<void> {
    await this.copyRowByRow('Server', 'INSERT INTO `Server`(`ID`, `Title`, `FQDN`) VALUES (?, ?, ?)', row => [
      row['ID'],
      row['Title'],
      row['FQDN']
    ]);
  }

  /** Copies the `ServerConfig` table row by row. */
  private async importServerConfig(): Promise<void> {
    await this.copyRowByRow(
      'ServerConfig',
      'INSERT INTO `ServerConfig`(`ID`, `ServerID`, `Key`, `Value`) VALUES (?, ?, ?, ?)',
      row => [row['ID'], row['ServerID'], row['Key'], row['Value']]
    );
  }

  /** Copies the `ServerDataEntry` table in chunks, converting `Timestamp` to a `Date`. */
  private async importServerDataEntry(): Promise<void> {
    await this.copyInChunks({
      table: 'ServerDataEntry',
      insertPrefix: 'INSERT INTO `ServerDataEntry`(`ID`, `ServerID`, `Timestamp`) VALUES ',
      rowPlaceholder: '(?,?,?)',
      toParams: row => [row['ID'], row['ServerID'], new Date(row['Timestamp'] as number | string)]
    });
  }

  /** Copies the `ServerDataValue` table in chunks. */
  private async importServerDataValue(): Promise<void> {
    await this.copyInChunks({
      table: 'ServerDataValue',
      insertPrefix: 'INSERT INTO `ServerDataValue`(`ID`, `EntryID`, `Type`, `Key`, `Value`) VALUES',
      rowPlaceholder: '(?,?,?,?,?)',
      toParams: row => [row['ID'], row['EntryID'], row['Type'], row['Key'], row['Value']]
    });
  }

  /** Copies the `HealthCheckConfig` table row by row. */
  private async importHealthCheckConfig(): Promise<void> {
    await this.copyRowByRow(
      'HealthCheckConfig',
      'INSERT INTO `HealthCheckConfig`(`ID`, `ServerID`, `Type`, `Title`) VALUES(?, ?, ?, ?)',
      row => [row['ID'], row['ServerID'], row['Type'], row['Title']]
    );
  }

  /** Copies the `HealthCheckParams` table row by row. */
  private async importHealthCheckParams(): Promise<void> {
    await this.copyRowByRow(
      'HealthCheckParams',
      'INSERT INTO `HealthCheckParams`(`ID`, `ConfigID`, `Type`, `Key`, `Value`) VALUES (?, ?, ?, ?, ?)',
      row => [row['ID'], row['ConfigID'], row['Type'], row['Key'], row['Value']]
    );
  }

  /** Copies the `HealthCheckDataEntry` table in chunks, converting `Timestamp` to a `Date`. */
  private async importHealthCheckDataEntry(): Promise<void> {
    await this.copyInChunks({
      table: 'HealthCheckDataEntry',
      insertPrefix: 'INSERT INTO `HealthCheckDataEntry`(`ID`, `ConfigID`, `Timestamp`, `Status`, `Message`) VALUES ',
      rowPlaceholder: '(?, ?, ?, ?, ?)',
      toParams: row => [
        row['ID'],
        row['ConfigID'],
        new Date(row['Timestamp'] as number | string),
        row['Status'],
        row['Message']
      ]
    });
  }

  /**
   * Reads a whole source table at once and inserts its rows one statement at a time,
   * awaiting each INSERT before sending the next.
   *
   * @param table Source table name; also used in the log message.
   * @param insertSql Complete single-row INSERT statement with `?` placeholders.
   * @param toParams Maps a source row to the parameters of `insertSql`.
   */
  private async copyRowByRow(
    table: string,
    insertSql: string,
    toParams: (row: SourceRow) => unknown[]
  ): Promise<void> {
    Logger.info('[INFO]', `Importing ${table} Table ...`);
    const source = await this.oldDb.stmt(`SELECT * FROM \`${table}\`;`, []);
    for (const row of source.rows) {
      await this.newDb.query(insertSql, toParams(row));
    }
  }

  /**
   * Copies a source table in pages of at most `CHUNK_SIZE` rows, writing each page
   * with a single multi-row INSERT.
   *
   * A progress line is logged before every page and once more after the last page.
   * If a page unexpectedly comes back empty, copying stops silently at that point.
   *
   * NOTE(review): the page query has no ORDER BY, so it relies on the source returning
   * rows in the same order for every page - confirm this holds for the source tables.
   *
   * @param spec Table name, INSERT fragments and row mapping for the table to copy.
   */
  private async copyInChunks(spec: ChunkedImportSpec): Promise<void> {
    const { table, insertPrefix, rowPlaceholder, toParams } = spec;

    Logger.info('[INFO]', `Importing ${table} Table ...`);
    const countResult = await this.oldDb.stmt(`SELECT COUNT(*) as Count FROM \`${table}\`;`, []);
    const count = countResult.rows[0]['Count'] as number;
    const pageSql = `SELECT * FROM \`${table}\` LIMIT ? OFFSET ?;`;

    let offset = 0;
    while (offset < count) {
      const pageSize = Math.min(CHUNK_SIZE, count - offset);
      Logger.info('[INFO]', `Importing ${table} (${offset}/${count}) ...`);

      const page = await this.oldDb.stmt(pageSql, [pageSize, offset]);
      const rows: SourceRow[] = page.rows;
      if (!rows.length) return; // fewer rows than counted; nothing left to copy

      const insertSql = insertPrefix + rows.map(() => rowPlaceholder).join(',') + ';';
      // Flatten with push instead of spreading an accumulator, which would recopy
      // the whole parameter list for every row.
      const params: unknown[] = [];
      for (const row of rows) {
        params.push(...toParams(row));
      }
      await this.newDb.query(insertSql, params);

      offset += pageSize;
    }

    // Closing progress line (count/count), emitted once everything has been copied.
    Logger.info('[INFO]', `Importing ${table} (${offset}/${count}) ...`);
  }
}