migrations: create and pass migration context with state and pending states.

This commit is contained in:
Nodari Chkuaselidze 2025-03-28 16:45:32 +04:00
parent abd0c38a6c
commit 5469d8baf5
No known key found for this signature in database
GPG key ID: B018A7BB437D1F05
7 changed files with 344 additions and 135 deletions

View file

@ -18,15 +18,17 @@ const CoinView = require('../coins/coinview');
const UndoCoins = require('../coins/undocoins');
const layout = require('./layout');
const AbstractMigration = require('../migrations/migration');
const migrator = require('../migrations/migrator');
const {
Migrator,
oldLayout,
types
} = require('../migrations/migrator');
} = migrator;
/** @typedef {import('../types').Hash} Hash */
/** @typedef {ReturnType<bdb.DB['batch']>} Batch */
/** @typedef {import('../migrations/migrator').types} MigrationType */
/** @typedef {migrator.types} MigrationType */
/** @typedef {migrator.MigrationContext} MigrationContext */
/**
* Switch to new migrations layout.
@ -59,14 +61,14 @@ class MigrateMigrations extends AbstractMigration {
/**
* Actual migration
* @param {Batch} b
* @param {MigrationContext} ctx
* @returns {Promise}
*/
async migrate(b) {
async migrate(b, ctx) {
this.logger.info('Migrating migrations..');
const oldLayout = this.layout.oldLayout;
const newLayout = this.layout.newLayout;
let nextMigration = 1;
const skipped = [];
@ -90,33 +92,9 @@ class MigrateMigrations extends AbstractMigration {
this.db.writeVersion(b, 2);
const rawState = this.encodeMigrationState(nextMigration, skipped);
b.put(newLayout.M.encode(), rawState);
}
/**
* @param {Number} nextMigration
* @param {Number[]} skipped
* @returns {Buffer}
*/
encodeMigrationState(nextMigration, skipped) {
let size = 4;
size += encoding.sizeVarint(nextMigration);
size += encoding.sizeVarint(skipped.length);
for (const id of skipped)
size += encoding.sizeVarint(id);
const bw = bio.write(size);
bw.writeU32(0);
bw.writeVarint(nextMigration);
bw.writeVarint(skipped.length);
for (const id of skipped)
bw.writeVarint(id);
return bw.render();
ctx.state.version = 0;
ctx.state.skipped = skipped;
ctx.state.nextMigration = nextMigration;
}
static info() {

View file

@ -8,6 +8,7 @@
/** @typedef {import('bdb').DB} DB */
/** @typedef {ReturnType<DB['batch']>} Batch */
/** @typedef {import('./migrator').types} MigrationType */
/** @typedef {import('./migrator').MigrationContext} MigrationContext */
/**
* Abstract class for single migration.
@ -37,10 +38,11 @@ class AbstractMigration {
/**
* Run the actual migration
* @param {Batch} b
* @param {MigrationContext} [ctx]
* @returns {Promise}
*/
async migrate(b) {
async migrate(b, ctx) {
throw new Error('Abstract method.');
}

View file

@ -1,6 +1,7 @@
/**
* migrations/migrator.js - abstract migrator for hsd.
* Copyright (c) 2021, Nodari Chkuaselidze (MIT License)
* https://github.com/handshake-org/hsd
*/
'use strict';
@ -13,6 +14,8 @@ const MigrationState = require('../migrations/state');
/** @typedef {ReturnType<bdb.DB['batch']>} Batch */
/** @typedef {import('../blockchain/chaindb')} ChainDB */
const EMPTY = Buffer.alloc(0);
/**
* This entry needs to be part of all dbs that support migrations.
* V -> DB Version
@ -44,36 +47,6 @@ const types = {
FAKE_MIGRATE: 2
};
/**
* Store migration results.
* @alias module:migrations.MigrationResult
*/
class MigrationResult {
constructor() {
/** @type {Set<Number>} */
this.migrated = new Set();
/** @type {Set<Number>} */
this.skipped = new Set();
}
/**
* @param {Number} id
*/
skip(id) {
this.skipped.add(id);
}
/**
* @param {Number} id
*/
migrate(id) {
this.migrated.add(id);
}
}
/**
* class for migrations.
* @alias module:migrations.Migrator
@ -220,12 +193,15 @@ class Migrator {
this.logger.info('Migration %d in progress...', id);
const batch = this.ldb.batch();
// queue state updates first, so migration can modify the state.
const context = this.createContext(state);
await currentMigration.migrate(batch, context);
// allow migrations to increment next migration
state.nextMigration = Math.max(state.nextMigration, id + 1);
state.inProgress = false;
state.nextMigration = id + 1;
state.inProgressData = EMPTY;
this.writeState(batch, state);
await currentMigration.migrate(batch, this.pending);
await batch.write();
this.pending.migrate(id);
this.logger.info('Migration %d is done.', id);
@ -395,9 +371,72 @@ class Migrator {
assert(data, 'State was corrupted.');
return MigrationState.decode(data);
}
/**
* Create context
* @param {MigrationState} state
* @returns {MigrationContext}
*/
createContext(state) {
return new MigrationContext(this, state, this.pending);
}
}
/**
* Store migration results.
* @alias module:migrations.MigrationResult
*/
class MigrationResult {
constructor() {
/** @type {Set<Number>} */
this.migrated = new Set();
/** @type {Set<Number>} */
this.skipped = new Set();
}
/**
* @param {Number} id
*/
skip(id) {
this.skipped.add(id);
}
/**
* @param {Number} id
*/
migrate(id) {
this.migrated.add(id);
}
}
/**
* Migration Context.
*/
class MigrationContext {
/**
* @param {Migrator} migrator
* @param {MigrationState} state
* @param {MigrationResult} pending
*/
constructor(migrator, state, pending) {
this.migrator = migrator;
this.state = state;
this.pending = pending;
}
async saveState() {
await this.migrator.saveState(this.state);
}
}
exports.Migrator = Migrator;
exports.MigrationResult = MigrationResult;
exports.MigrationContext = MigrationContext;
exports.types = types;
exports.oldLayout = oldLayout;

View file

@ -58,6 +58,7 @@ class MigrationState extends bio.Struct {
this.inProgress = obj.inProgress;
this.nextMigration = obj.nextMigration;
this.skipped = obj.skipped.slice();
this.inProgressData = obj.inProgressData.slice();
return this;
}

View file

@ -26,16 +26,19 @@ const WalletKey = require('./walletkey');
const Path = require('./path');
const MapRecord = require('./records').MapRecord;
const AbstractMigration = require('../migrations/migration');
const migrator = require('../migrations/migrator');
const {
MigrationResult,
MigrationContext,
Migrator,
types,
oldLayout
} = require('../migrations/migrator');
} = migrator;
const layouts = require('./layout');
const wlayout = layouts.wdb;
/** @typedef {import('../migrations/migrator').types} MigrationType */
/** @typedef {migrator.types} MigrationType */
/** @typedef {import('../migrations/state')} MigrationState */
/** @typedef {ReturnType<bdb.DB['batch']>} Batch */
/** @typedef {ReturnType<bdb.DB['bucket']>} Bucket */
/** @typedef {import('./walletdb')} WalletDB */
@ -74,10 +77,11 @@ class MigrateMigrations extends AbstractMigration {
/**
* Actual migration
* @param {Batch} b
* @param {WalletMigrationContext} ctx
* @returns {Promise}
*/
async migrate(b) {
async migrate(b, ctx) {
this.logger.info('Migrating migrations..');
let nextMigration = 1;
@ -87,24 +91,9 @@ class MigrateMigrations extends AbstractMigration {
}
this.db.writeVersion(b, 1);
b.put(
this.layout.newLayout.wdb.M.encode(),
this.encodeMigrationState(nextMigration)
);
}
/**
* @param {Number} nextMigration
* @returns {Buffer}
*/
encodeMigrationState(nextMigration) {
const size = 4 + 1 + 1;
const encoded = Buffer.alloc(size);
encoding.writeVarint(encoded, nextMigration, 4);
return encoded;
ctx.state.version = 0;
ctx.state.nextMigration = nextMigration;
}
static info() {
@ -166,11 +155,11 @@ class MigrateChangeAddress extends AbstractMigration {
/**
* Actual migration
* @param {Batch} b
* @param {WalletMigrationResult} [pending]
* @param {WalletMigrationContext} ctx
* @returns {Promise}
*/
async migrate(b, pending) {
async migrate(b, ctx) {
const wlayout = this.layout.wdb;
const wids = await this.ldb.keys({
gte: wlayout.W.min(),
@ -185,7 +174,7 @@ class MigrateChangeAddress extends AbstractMigration {
}
if (total > 0)
pending.rescan = true;
ctx.pending.rescan = true;
}
/**
@ -520,12 +509,12 @@ class MigrateTXDBBalances extends AbstractMigration {
/**
* Actual migration
* @param {Batch} b
* @param {WalletMigrationResult} [pending]
* @param {WalletMigrationContext} [ctx]
* @returns {Promise}
*/
async migrate(b, pending) {
pending.recalculateTXDB = true;
async migrate(b, ctx) {
ctx.pending.recalculateTXDB = true;
}
static info() {
@ -1331,6 +1320,22 @@ class WalletMigrationResult extends MigrationResult {
}
}
/**
* @alias module:blockchain.WalletMigrationContext
*/
class WalletMigrationContext extends MigrationContext {
/**
* @param {WalletMigrator} migrator
* @param {MigrationState} state
* @param {WalletMigrationResult} pending
*/
constructor(migrator, state, pending) {
super(migrator, state, pending);
}
}
/**
* Wallet Migrator
* @alias module:blockchain.WalletMigrator
@ -1375,6 +1380,15 @@ class WalletMigrator extends Migrator {
return ids;
}
/**
* @param {MigrationState} state
* @returns {WalletMigrationContext}
*/
createContext(state) {
return new WalletMigrationContext(this, state, this.pending);
}
}
/**

View file

@ -12,7 +12,8 @@ const {
const {
DB_FLAG_ERROR,
MockChainDB,
migrationError
migrationError,
mockLayout
} = require('./util/migrations');
const {rimraf, testdir} = require('./util/common');
@ -202,19 +203,17 @@ describe('Migrations', function() {
it('should initialize fresh migration state', async () => {
await rimraf(location);
const db = new MockChainDB(defaultOptions);
const {migrations} = db;
migrations.logger = {
info: () => {},
debug: () => {},
warning: (msg) => {
throw new Error(`Unexpected warning: ${msg}`);
}
};
const db = new MockChainDB({
...defaultOptions,
logger: getLogger({
warning: (msg) => {
throw new Error(`Unexpected warning: ${msg}`);
}
})
});
await db.open();
const state = await migrations.getState();
const state = await getMigrationState(db);
assert.strictEqual(state.inProgress, false);
assert.strictEqual(state.nextMigration, 0);
await db.close();
@ -223,26 +222,21 @@ describe('Migrations', function() {
it('should ignore migration flag on non-existent db', async () => {
await rimraf(location);
let warning = null;
const db = new MockChainDB({
...defaultOptions,
migrations: { 0: MockMigration1 },
migrateFlag: 0
migrateFlag: 0,
logger: getLogger({
warning: (msg) => {
warning = msg;
}
})
});
const {migrations} = db;
let warning = null;
migrations.logger = {
info: () => {},
debug: () => {},
warning: (msg) => {
warning = msg;
}
};
await db.open();
assert.strictEqual(warning, 'Fresh start, ignoring migration flag.');
const state = await migrations.getState();
const state = await getMigrationState(db);
assert.strictEqual(state.inProgress, false);
assert.strictEqual(state.nextMigration, 1);
await db.close();
@ -343,11 +337,8 @@ describe('Migrations', function() {
await db.open();
const lastID = db.migrations.getLastMigrationID();
const state = await db.migrations.getState();
const state = await getMigrationState(db);
assert.strictEqual(state.nextMigration, 2);
assert.strictEqual(lastID, 1);
assert.strictEqual(migrated1, true);
assert.strictEqual(migrated2, true);
await db.close();
@ -399,7 +390,7 @@ describe('Migrations', function() {
// check the state is correct.
await db.db.open();
const state = await db.migrations.getState();
const state = await getMigrationState(db);
assert.strictEqual(state.inProgress, true);
assert.strictEqual(state.nextMigration, 0);
@ -418,7 +409,7 @@ describe('Migrations', function() {
// check the state is correct.
await db.db.open();
const state = await db.migrations.getState();
const state = await getMigrationState(db);
assert.strictEqual(state.inProgress, true);
assert.strictEqual(state.nextMigration, 1);
@ -426,7 +417,7 @@ describe('Migrations', function() {
}
await db.open();
const state = await db.migrations.getState();
const state = await getMigrationState(db);
assert.strictEqual(state.inProgress, false);
assert.strictEqual(state.nextMigration, 2);
assert.strictEqual(migrated1, true);
@ -454,7 +445,7 @@ describe('Migrations', function() {
await db.open();
const state = await db.migrations.getState();
const state = await getMigrationState(db);
assert.strictEqual(state.inProgress, false);
assert.strictEqual(state.nextMigration, 1);
assert.strictEqual(migrate, false);
@ -490,7 +481,7 @@ describe('Migrations', function() {
await db.open();
const state = await db.migrations.getState();
const state = await getMigrationState(db);
assert.strictEqual(state.inProgress, false);
assert.strictEqual(state.nextMigration, 1);
assert.deepStrictEqual(state.skipped, [0]);
@ -501,9 +492,9 @@ describe('Migrations', function() {
await db.close();
db.migrations.migrateFlag = -1;
db.options.migrateFlag = -1;
await db.open();
const state2 = await db.migrations.getState();
const state2 = await getMigrationState(db);
assert.strictEqual(state2.inProgress, false);
assert.strictEqual(state2.nextMigration, 1);
assert.deepStrictEqual(state2.skipped, [0]);
@ -533,6 +524,141 @@ describe('Migrations', function() {
message: 'Unknown migration type.'
});
});
it('should allow nextMigration modification, skip next', async () => {
let migrated1 = false;
let migrated2 = false;
const db = new MockChainDB({
...defaultOptions,
migrateFlag: 1,
migrations: {
0: class extends AbstractMigration {
async check() {
return types.MIGRATE;
}
async migrate(b, ctx) {
migrated1 = true;
ctx.state.nextMigration = 2;
}
},
1: class extends AbstractMigration {
async check() {
return types.MIGRATE;
}
async migrate() {
migrated2 = true;
}
}
}
});
await db.open();
assert.strictEqual(migrated1, true);
assert.strictEqual(migrated2, false);
await db.close();
});
it('should store inProgressData in version 1 and clean up', async () => {
let thrown1 = false;
let migrated1 = false;
let data1 = null;
let thrown3 = false;
let migrated3 = false;
let data3 = null;
let migrated4 = false;
let data4 = null;
const db = new MockChainDB({
...defaultOptions,
migrateFlag: 4,
migrations: {
0: class extends AbstractMigration {
async check() {
return types.MIGRATE;
}
async migrate(b, ctx) {
ctx.state.version = 0;
}
},
1: class extends AbstractMigration {
async check() {
return types.MIGRATE;
}
async migrate(b, ctx) {
if (!thrown1) {
// This wont be saved, state.version = 0.
ctx.state.inProgressData = Buffer.from('data1');
await ctx.saveState();
thrown1 = true;
throw new Error('error1');
}
migrated1 = true;
data1 = ctx.state.inProgressData;
}
},
2: class extends AbstractMigration {
async check() {
return types.MIGRATE;
}
async migrate(b, ctx) {
ctx.state.version = 1;
}
},
3: class extends AbstractMigration {
async check() {
return types.MIGRATE;
}
async migrate(b, ctx) {
if (!thrown3) {
ctx.state.inProgressData = Buffer.from('data3');
await ctx.saveState();
thrown3 = true;
throw new Error('error3');
}
migrated3 = true;
data3 = ctx.state.inProgressData;
}
},
4: class extends AbstractMigration {
async check() {
return types.MIGRATE;
}
async migrate(b, ctx) {
migrated4 = true;
data4 = ctx.state.inProgressData;
}
}
},
logger: getLogger()
});
for (let i = 0; i < 2; i++) {
try {
await db.open();
} catch (e) {
await db.close();
}
}
await db.open();
assert.strictEqual(migrated1, true);
assert.strictEqual(data1.length, 0);
assert.strictEqual(migrated3, true);
assert.bufferEqual(data3, Buffer.from('data3'));
assert.strictEqual(migrated4, true);
assert.strictEqual(data4.length, 0);
await db.close();
});
});
describe('Options', function() {
@ -563,12 +689,35 @@ describe('Migrations', function() {
state1.inProgress = 1;
state1.nextMigration = 3;
state1.skipped = [1, 2];
state1.inProgressData = Buffer.from('data');
const state2 = state1.clone();
assert.notEqual(state2.skipped, state1.skipped);
assert.deepStrictEqual(state2, state1);
});
it('should not encode progress data if version is 0', () => {
const state = new MigrationState();
state.version = 0;
state.inProgressData = Buffer.from('data');
const encoded = state.encode();
const decoded = MigrationState.decode(encoded);
assert.strictEqual(decoded.inProgressData.length, 0);
});
it('should encode progress data if version is not 0', () => {
const state = new MigrationState();
state.version = 1;
state.inProgressData = Buffer.from('data');
const encoded = state.encode();
const decoded = MigrationState.decode(encoded);
assert.bufferEqual(decoded.inProgressData, state.inProgressData);
});
});
describe('AbstractMigration', function() {
@ -630,3 +779,29 @@ describe('Migrations', function() {
}
});
});
const nop = () => {};
function getLogger(opts = {}) {
return {
debug: nop,
info: nop,
warning: nop,
error: nop,
context: () => {
return {
debug: opts.debug || nop,
info: opts.info || nop,
warning: opts.warning || nop,
error: opts.error || nop
};
}
};
}
async function getMigrationState(db) {
const raw = await db.db.get(mockLayout.M.encode());
return MigrationState.decode(raw);
}

View file

@ -58,19 +58,19 @@ class MockChainDB {
this.spv = this.options.spv;
this.prune = this.options.prune;
// This is here for testing purposes.
this.migrations = new MockChainDBMigrator({
...this.options,
db: this,
dbVersion: this.dbVersion
});
}
async open() {
this.logger.debug('Opening mock chaindb.');
await this.db.open();
await this.migrations.migrate();
// This is here for testing purposes.
const migrations = new MockChainDBMigrator({
...this.options,
db: this,
dbVersion: this.dbVersion
});
await migrations.migrate();
await this.db.verify(mockLayout.V.encode(), 'chain', this.dbVersion);
}