mariadb-importer.class.ts 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201
  1. import moment from 'moment';
  2. import { Pool as MariaPool } from 'mysql';
  3. import { Logger } from '../../../common/util/logger.class';
  4. import { SQLiteDatabase as SQLiteDB } from './sqlite-database.class';
  5. import { MariaDBConnector } from './mariadb-connector.class';
  6. const CHUNK_SIZE = 5000;
  7. export class MariaDBImporter {
  8. private oldDb!: SQLiteDB;
  9. private newDb!: MariaDBConnector;
  10. constructor(pool: MariaPool) {
  11. this.oldDb = new SQLiteDB();
  12. this.newDb = new MariaDBConnector(pool);
  13. }
  14. async connect() {
  15. try {
  16. await this.oldDb.open();
  17. await this.newDb.connect();
  18. } catch (e) {
  19. Logger.error('[FATAL] Initializing MariaDBImporter failed:', e);
  20. Logger.error('[EXITING]');
  21. process.exit(1);
  22. }
  23. }
  24. async runImport() {
  25. // await this.newDb.beginTransaction();
  26. try {
  27. await this.cutoffOldData(moment().add(-4, 'months').toDate());
  28. await this.truncateTables();
  29. await this.importServer();
  30. await this.importServerConfig();
  31. await this.importServerDataEntry();
  32. await this.importServerDataValue();
  33. await this.importHealthCheckConfig();
  34. await this.importHealthCheckParams();
  35. await this.importHealthCheckDataEntry();
  36. // await this.newDb.commit();
  37. } catch (e) {
  38. Logger.error('[ERROR] Import to MariaDB failed:', e);
  39. // await this.newDb.rollback();
  40. process.exit(2);
  41. }
  42. }
  43. private async cutoffOldData(cutoffDate: Date) {
  44. Logger.info('[INFO]', 'Cutting off old DataEntries before', cutoffDate);
  45. await this.oldDb.run('DELETE FROM `ServerDataEntry` WHERE `Timestamp` < ?;', [cutoffDate.getTime()]);
  46. await this.oldDb.run('DELETE FROM `HealthCheckDataEntry` WHERE `Timestamp` < ?;', [cutoffDate.getTime()]);
  47. }
  48. private async truncateTables() {
  49. Logger.info('[INFO]', 'Truncating all Tables in MariaDB ...');
  50. await this.newDb.query(
  51. `
  52. SET autocommit = OFF;
  53. START TRANSACTION;
  54. SET FOREIGN_KEY_CHECKS=0;
  55. TRUNCATE TABLE \`ServerDataValue\`;
  56. TRUNCATE TABLE \`ServerDataEntry\`;
  57. TRUNCATE TABLE \`ServerConfig\`;
  58. TRUNCATE TABLE \`HealthCheckDataEntry\`;
  59. TRUNCATE TABLE \`HealthCheckParams\`;
  60. TRUNCATE TABLE \`HealthCheckConfig\`;
  61. TRUNCATE TABLE \`Server\`;
  62. COMMIT;
  63. SET FOREIGN_KEY_CHECKS=1;
  64. SET autocommit = ON;
  65. `,
  66. []
  67. );
  68. }
  69. private async importServer() {
  70. Logger.info('[INFO]', 'Importing Server Table ...');
  71. const res = await this.oldDb.stmt('SELECT * FROM `Server`;', []);
  72. for (const row of res.rows) {
  73. await this.newDb.query('INSERT INTO `Server`(`ID`, `Title`, `FQDN`) VALUES (?, ?, ?)', [row['ID'], row['Title'], row['FQDN']]);
  74. }
  75. }
  76. private async importServerConfig() {
  77. Logger.info('[INFO]', 'Importing ServerConfig Table ...');
  78. const res = await this.oldDb.stmt('SELECT * FROM `ServerConfig`;', []);
  79. for (const row of res.rows) {
  80. await this.newDb.query('INSERT INTO `ServerConfig`(`ID`, `ServerID`, `Key`, `Value`) VALUES (?, ?, ?, ?)', [
  81. row['ID'],
  82. row['ServerID'],
  83. row['Key'],
  84. row['Value']
  85. ]);
  86. }
  87. }
  88. private async importServerDataEntry() {
  89. Logger.info('[INFO]', 'Importing ServerDataEntry Table ...');
  90. let res = await this.oldDb.stmt('SELECT COUNT(*) as Count FROM `ServerDataEntry`;', []);
  91. const count = res.rows[0]['Count'] as number;
  92. let offset = 0;
  93. let pageSize = Math.min(CHUNK_SIZE, count);
  94. while (offset + pageSize <= count) {
  95. Logger.info('[INFO]', `Importing ServerDataEntry (${offset}/${count}) ...`);
  96. res = await this.oldDb.stmt('SELECT * FROM `ServerDataEntry` LIMIT ? OFFSET ?;', [pageSize, offset]);
  97. if (!res.rows.length) break;
  98. const sql = 'INSERT INTO `ServerDataEntry`(`ID`, `ServerID`, `Timestamp`) VALUES ' + res.rows.map(() => '(?,?,?)').join(',') + ';';
  99. const data = res.rows.reduce((res, row) => [...res, row['ID'], row['ServerID'], new Date(row['Timestamp'])], []);
  100. await this.newDb.query(sql, data);
  101. offset += pageSize;
  102. pageSize = Math.min(pageSize, count - offset);
  103. }
  104. }
  105. private async importServerDataValue() {
  106. Logger.info('[INFO]', 'Importing ServerDataValue Table ...');
  107. let res = await this.oldDb.stmt('SELECT COUNT(*) as Count FROM `ServerDataValue`;', []);
  108. const count = res.rows[0]['Count'] as number;
  109. let offset = 0;
  110. let pageSize = Math.min(CHUNK_SIZE, count);
  111. while (offset + pageSize <= count) {
  112. Logger.info('[INFO]', `Importing ServerDataValue (${offset}/${count}) ...`);
  113. const res = await this.oldDb.stmt('SELECT * FROM `ServerDataValue` LIMIT ? OFFSET ?;', [pageSize, offset]);
  114. if (!res.rows.length) break;
  115. const sql = 'INSERT INTO `ServerDataValue`(`ID`, `EntryID`, `Type`, `Key`, `Value`) VALUES' + res.rows.map(() => '(?,?,?,?,?)').join(',') + ';';
  116. const data = res.rows.reduce((res, row) => [...res, row['ID'], row['EntryID'], row['Type'], row['Key'], row['Value']], []);
  117. await this.newDb.query(sql, data);
  118. offset += pageSize;
  119. pageSize = Math.min(pageSize, count - offset);
  120. }
  121. }
  122. private async importHealthCheckConfig() {
  123. Logger.info('[INFO]', 'Importing HealthCheckConfig Table ...');
  124. const res = await this.oldDb.stmt('SELECT * FROM `HealthCheckConfig`;', []);
  125. for (const row of res.rows) {
  126. await this.newDb.query('INSERT INTO `HealthCheckConfig`(`ID`, `ServerID`, `Type`, `Title`) VALUES(?, ?, ?, ?)', [
  127. row['ID'],
  128. row['ServerID'],
  129. row['Type'],
  130. row['Title']
  131. ]);
  132. }
  133. }
  134. private async importHealthCheckParams() {
  135. Logger.info('[INFO]', 'Importing HealthCheckParams Table ...');
  136. const res = await this.oldDb.stmt('SELECT * FROM `HealthCheckParams`;', []);
  137. for (const row of res.rows) {
  138. await this.newDb.query('INSERT INTO `HealthCheckParams`(`ID`, `ConfigID`, `Type`, `Key`, `Value`) VALUES (?, ?, ?, ?, ?)', [
  139. row['ID'],
  140. row['ConfigID'],
  141. row['Type'],
  142. row['Key'],
  143. row['Value']
  144. ]);
  145. }
  146. }
  147. private async importHealthCheckDataEntry() {
  148. Logger.info('[INFO]', 'Importing HealthCheckDataEntry Table ...');
  149. let res = await this.oldDb.stmt('SELECT COUNT(*) as Count FROM `HealthCheckDataEntry`;', []);
  150. const count = res.rows[0]['Count'] as number;
  151. let offset = 0;
  152. let pageSize = Math.min(CHUNK_SIZE, count);
  153. while (offset + pageSize <= count) {
  154. Logger.info('[INFO]', `Importing HealthCheckDataEntry (${offset}/${count}) ...`);
  155. const res = await this.oldDb.stmt('SELECT * FROM `HealthCheckDataEntry` LIMIT ? OFFSET ?;', [pageSize, offset]);
  156. if (!res.rows.length) break;
  157. const sql =
  158. 'INSERT INTO `HealthCheckDataEntry`(`ID`, `ConfigID`, `Timestamp`, `Status`, `Message`) VALUES ' +
  159. res.rows.map(() => '(?, ?, ?, ?, ?)').join(',') +
  160. ';';
  161. const data = res.rows.reduce((res, row) => [...res, row['ID'], row['ConfigID'], new Date(row['Timestamp']), row['Status'], row['Message']], []);
  162. await this.newDb.query(sql, data);
  163. offset += pageSize;
  164. pageSize = Math.min(pageSize, count - offset);
  165. }
  166. }
  167. }