// mariadb-importer.class.ts
  1. import moment from 'moment';
  2. import { Pool as MariaPool } from 'mysql';
  3. import { Logger } from '../../../common/util/logger.class';
  4. import { SQLiteDatabase as SQLiteDB } from './sqlite-database.class';
  5. import { MariaDBConnector } from './mariadb-connector.class';
  6. const CHUNK_SIZE = 5000;
  7. export class MariaDBImporter {
  8. private oldDb!: SQLiteDB;
  9. private newDb!: MariaDBConnector;
  10. constructor(pool: MariaPool) {
  11. this.oldDb = new SQLiteDB();
  12. this.newDb = new MariaDBConnector(pool);
  13. }
  14. async connect() {
  15. try {
  16. await this.oldDb.open();
  17. await this.newDb.connect();
  18. } catch (e) {
  19. Logger.error('[FATAL] Initializing MariaDBImporter failed:', e);
  20. Logger.error('[EXITING]');
  21. process.exit(1);
  22. }
  23. }
  24. async runImport() {
  25. // await this.newDb.beginTransaction();
  26. try {
  27. // await this.cutoffOldData(moment().add(-4, 'months').toDate());
  28. await this.truncateTables();
  29. await this.importServer();
  30. await this.importServerConfig();
  31. await this.importServerDataEntry();
  32. await this.importServerDataValue();
  33. await this.importHealthCheckConfig();
  34. await this.importHealthCheckParams();
  35. await this.importHealthCheckDataEntry();
  36. // await this.newDb.commit();
  37. } catch (e) {
  38. Logger.error('[ERROR] Import to MariaDB failed:', e);
  39. // await this.newDb.rollback();
  40. process.exit(2);
  41. }
  42. }
  43. // private async cutoffOldData(cutoffDate: Date) {
  44. // Logger.info('[INFO]', 'Cutting off old DataEntries before', cutoffDate);
  45. // await this.oldDb.run('DELETE FROM `ServerDataEntry` WHERE `Timestamp` < ?;', [cutoffDate.getTime()]);
  46. // await this.oldDb.run('DELETE FROM `HealthCheckDataEntry` WHERE `Timestamp` < ?;', [cutoffDate.getTime()]);
  47. // }
  48. private async truncateTables() {
  49. Logger.info('[INFO]', 'Truncating all Tables in MariaDB ...');
  50. await this.newDb.query(
  51. `
  52. SET autocommit = OFF;
  53. START TRANSACTION;
  54. SET FOREIGN_KEY_CHECKS=0;
  55. TRUNCATE TABLE \`ServerDataValue\`;
  56. TRUNCATE TABLE \`ServerDataEntry\`;
  57. TRUNCATE TABLE \`ServerConfig\`;
  58. TRUNCATE TABLE \`HealthCheckDataEntry\`;
  59. TRUNCATE TABLE \`HealthCheckParams\`;
  60. TRUNCATE TABLE \`HealthCheckConfig\`;
  61. TRUNCATE TABLE \`Server\`;
  62. COMMIT;
  63. SET FOREIGN_KEY_CHECKS=1;
  64. SET autocommit = ON;
  65. `,
  66. []
  67. );
  68. }
  69. private async importServer() {
  70. Logger.info('[INFO]', 'Importing Server Table ...');
  71. const res = await this.oldDb.stmt('SELECT * FROM `Server`;', []);
  72. for (const row of res.rows) {
  73. await this.newDb.query('INSERT INTO `Server`(`ID`, `Title`, `FQDN`) VALUES (?, ?, ?)', [row['ID'], row['Title'], row['FQDN']]);
  74. }
  75. }
  76. private async importServerConfig() {
  77. Logger.info('[INFO]', 'Importing ServerConfig Table ...');
  78. const res = await this.oldDb.stmt('SELECT * FROM `ServerConfig`;', []);
  79. for (const row of res.rows) {
  80. await this.newDb.query('INSERT INTO `ServerConfig`(`ID`, `ServerID`, `Key`, `Value`) VALUES (?, ?, ?, ?)', [
  81. row['ID'],
  82. row['ServerID'],
  83. row['Key'],
  84. row['Value']
  85. ]);
  86. }
  87. }
  88. private async importServerDataEntry() {
  89. Logger.info('[INFO]', 'Importing ServerDataEntry Table ...');
  90. let res = await this.oldDb.stmt('SELECT COUNT(*) as Count FROM `ServerDataEntry`;', []);
  91. const count = res.rows[0]['Count'] as number;
  92. let offset = 0;
  93. let pageSize = Math.min(CHUNK_SIZE, count);
  94. let measure: number[] = [];
  95. let estimate = '-- Rows/s | EST --:--:--';
  96. let prevTime = new Date().getTime();
  97. while (offset + pageSize <= count) {
  98. Logger.info('[INFO]', `Importing ServerDataEntry (${offset}/${count}) - ${estimate} ...`);
  99. res = await this.oldDb.stmt('SELECT * FROM `ServerDataEntry` LIMIT ? OFFSET ?;', [pageSize, offset]);
  100. if (!res.rows.length) break;
  101. const sql = 'INSERT INTO `ServerDataEntry`(`ID`, `ServerID`, `Timestamp`) VALUES ' + res.rows.map(() => '(?,?,?)').join(',') + ';';
  102. const data = res.rows.reduce((res, row) => [...res, row['ID'], row['ServerID'], new Date(row['Timestamp'])], []);
  103. await this.newDb.query(sql, data);
  104. measure.push((pageSize / (new Date().getTime() - prevTime)) * 1000);
  105. if (measure.length > 10) measure.shift();
  106. prevTime = new Date().getTime();
  107. if (measure.length > 0) {
  108. const rowsPerSec = measure.reduce((res, meas) => (res += meas), 0) / measure.length;
  109. const estSecs = (count - offset) / rowsPerSec;
  110. estimate = `${Math.round(rowsPerSec * 10) / 10} rows/s | EST ${moment(0).add(estSecs, 'seconds').format('HH:mm:ss')}`;
  111. }
  112. offset += pageSize;
  113. pageSize = Math.min(pageSize, count - offset);
  114. }
  115. }
  116. private async importServerDataValue() {
  117. Logger.info('[INFO]', 'Importing ServerDataValue Table ...');
  118. let res = await this.oldDb.stmt('SELECT COUNT(*) as Count FROM `ServerDataValue`;', []);
  119. const count = res.rows[0]['Count'] as number;
  120. let offset = 0;
  121. let pageSize = Math.min(CHUNK_SIZE, count);
  122. let measure: number[] = [];
  123. let estimate = '-- Rows/s | EST --:--:--';
  124. let prevTime = new Date().getTime();
  125. while (offset + pageSize <= count) {
  126. Logger.info('[INFO]', `Importing ServerDataValue (${offset}/${count}) - ${estimate} ...`);
  127. const res = await this.oldDb.stmt('SELECT * FROM `ServerDataValue` LIMIT ? OFFSET ?;', [pageSize, offset]);
  128. if (!res.rows.length) break;
  129. const sql = 'INSERT INTO `ServerDataValue`(`ID`, `EntryID`, `Type`, `Key`, `Value`) VALUES' + res.rows.map(() => '(?,?,?,?,?)').join(',') + ';';
  130. const data = res.rows.reduce((res, row) => [...res, row['ID'], row['EntryID'], row['Type'], row['Key'], row['Value']], []);
  131. await this.newDb.query(sql, data);
  132. measure.push((pageSize / (new Date().getTime() - prevTime)) * 1000);
  133. if (measure.length > 10) measure.shift();
  134. prevTime = new Date().getTime();
  135. if (measure.length > 0) {
  136. const rowsPerSec = measure.reduce((res, meas) => (res += meas), 0) / measure.length;
  137. const estSecs = (count - offset) / rowsPerSec;
  138. estimate = `${Math.round(rowsPerSec * 10) / 10} rows/s | EST ${moment(0).add(estSecs, 'seconds').format('HH:mm:ss')}`;
  139. }
  140. offset += pageSize;
  141. pageSize = Math.min(pageSize, count - offset);
  142. }
  143. }
  144. private async importHealthCheckConfig() {
  145. Logger.info('[INFO]', 'Importing HealthCheckConfig Table ...');
  146. const res = await this.oldDb.stmt('SELECT * FROM `HealthCheckConfig`;', []);
  147. for (const row of res.rows) {
  148. await this.newDb.query('INSERT INTO `HealthCheckConfig`(`ID`, `ServerID`, `Type`, `Title`) VALUES(?, ?, ?, ?)', [
  149. row['ID'],
  150. row['ServerID'],
  151. row['Type'],
  152. row['Title']
  153. ]);
  154. }
  155. }
  156. private async importHealthCheckParams() {
  157. Logger.info('[INFO]', 'Importing HealthCheckParams Table ...');
  158. const res = await this.oldDb.stmt('SELECT * FROM `HealthCheckParams`;', []);
  159. for (const row of res.rows) {
  160. await this.newDb.query('INSERT INTO `HealthCheckParams`(`ID`, `ConfigID`, `Type`, `Key`, `Value`) VALUES (?, ?, ?, ?, ?)', [
  161. row['ID'],
  162. row['ConfigID'],
  163. row['Type'],
  164. row['Key'],
  165. row['Value']
  166. ]);
  167. }
  168. }
  169. private async importHealthCheckDataEntry() {
  170. Logger.info('[INFO]', 'Importing HealthCheckDataEntry Table ...');
  171. let res = await this.oldDb.stmt('SELECT COUNT(*) as Count FROM `HealthCheckDataEntry`;', []);
  172. const count = res.rows[0]['Count'] as number;
  173. let offset = 0;
  174. let pageSize = Math.min(CHUNK_SIZE, count);
  175. let measure: number[] = [];
  176. let estimate = '-- Rows/s | EST --:--:--';
  177. let prevTime = new Date().getTime();
  178. while (offset + pageSize <= count) {
  179. Logger.info('[INFO]', `Importing HealthCheckDataEntry (${offset}/${count}) - ${estimate} ...`);
  180. const res = await this.oldDb.stmt('SELECT * FROM `HealthCheckDataEntry` LIMIT ? OFFSET ?;', [pageSize, offset]);
  181. if (!res.rows.length) break;
  182. const sql =
  183. 'INSERT INTO `HealthCheckDataEntry`(`ID`, `ConfigID`, `Timestamp`, `Status`, `Message`) VALUES ' +
  184. res.rows.map(() => '(?, ?, ?, ?, ?)').join(',') +
  185. ';';
  186. const data = res.rows.reduce((res, row) => [...res, row['ID'], row['ConfigID'], new Date(row['Timestamp']), row['Status'], row['Message']], []);
  187. await this.newDb.query(sql, data);
  188. measure.push((pageSize / (new Date().getTime() - prevTime)) * 1000);
  189. if (measure.length > 10) measure.shift();
  190. prevTime = new Date().getTime();
  191. if (measure.length > 0) {
  192. const rowsPerSec = measure.reduce((res, meas) => (res += meas), 0) / measure.length;
  193. const estSecs = (count - offset) / rowsPerSec;
  194. estimate = `${Math.round(rowsPerSec * 10) / 10} rows/s | EST ${moment(0).add(estSecs, 'seconds').format('HH:mm:ss')}`;
  195. }
  196. offset += pageSize;
  197. pageSize = Math.min(pageSize, count - offset);
  198. }
  199. }
  200. public async close(): Promise<void> {
  201. await this.newDb.close();
  202. await this.oldDb.close();
  203. }
  204. }