wdb-migration: save and recover migration progress for coin selection.
This commit is contained in:
parent
b44affe4b7
commit
cd3dfeb943
3 changed files with 101 additions and 16 deletions
|
|
@ -434,6 +434,14 @@ class MigrationContext {
|
|||
async saveState() {
|
||||
await this.migrator.saveState(this.state);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {Batch} b
|
||||
*/
|
||||
|
||||
writeState(b) {
|
||||
this.migrator.writeState(b, this.state);
|
||||
}
|
||||
}
|
||||
|
||||
exports.Migrator = Migrator;
|
||||
|
|
|
|||
|
|
@ -1369,6 +1369,13 @@ class MigrateCoinSelection extends AbstractMigration {
|
|||
this.layout = MigrateCoinSelection.layout();
|
||||
|
||||
this.UNCONFIRMED_HEIGHT = 0xffffffff;
|
||||
this.batchSize = 5000;
|
||||
this.progress = {
|
||||
wid: 0,
|
||||
account: 0,
|
||||
hash: consensus.ZERO_HASH,
|
||||
index: 0
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -1395,30 +1402,44 @@ class MigrateCoinSelection extends AbstractMigration {
|
|||
parse: key => wlayout.W.decode(key)[0]
|
||||
});
|
||||
|
||||
await this.decodeProgress(ctx.state.inProgressData);
|
||||
|
||||
for (const wid of wids) {
|
||||
await this.migrateWallet(wid);
|
||||
if (wid < this.progress.wid)
|
||||
continue;
|
||||
|
||||
await this.migrateWallet(wid, ctx);
|
||||
}
|
||||
|
||||
await this.db.writeVersion(b, 5);
|
||||
this.db.writeVersion(b, 5);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {Number} wid
|
||||
* @param {WalletMigrationContext} ctx
|
||||
* @returns {Promise}
|
||||
*/
|
||||
|
||||
async migrateWallet(wid) {
|
||||
async migrateWallet(wid, ctx) {
|
||||
const txlayout = this.layout.txdb;
|
||||
const prefix = txlayout.prefix.encode(wid);
|
||||
const bucket = this.ldb.bucket(prefix);
|
||||
|
||||
const min = txlayout.C.encode(
|
||||
this.progress.account,
|
||||
this.progress.hash,
|
||||
this.progress.index
|
||||
);
|
||||
|
||||
const coinsIter = bucket.iterator({
|
||||
gte: txlayout.C.min(),
|
||||
gte: min,
|
||||
lte: txlayout.C.max()
|
||||
});
|
||||
|
||||
let parent = bucket.batch();
|
||||
let total = 0;
|
||||
|
||||
for await (const {key} of coinsIter) {
|
||||
const b = bucket.batch();
|
||||
const [account, hash, index] = txlayout.C.decode(key);
|
||||
const rawCoin = await bucket.get(txlayout.c.encode(hash, index));
|
||||
const coin = Coin.decode(rawCoin);
|
||||
|
|
@ -1428,23 +1449,79 @@ class MigrateCoinSelection extends AbstractMigration {
|
|||
|
||||
if (coin.height === -1) {
|
||||
// index coins by value
|
||||
b.put(txlayout.Su.encode(coin.value, hash, index), null);
|
||||
b.put(txlayout.SU.encode(account, coin.value, hash, index), null);
|
||||
parent.put(txlayout.Su.encode(coin.value, hash, index), null);
|
||||
parent.put(txlayout.SU.encode(account, coin.value, hash, index), null);
|
||||
|
||||
// index coins by height
|
||||
b.put(txlayout.Sh.encode(this.UNCONFIRMED_HEIGHT, hash, index), null);
|
||||
b.put(txlayout.SH.encode(account, this.UNCONFIRMED_HEIGHT, hash, index),
|
||||
parent.put(txlayout.Sh.encode(this.UNCONFIRMED_HEIGHT, hash, index),
|
||||
null);
|
||||
} else {
|
||||
b.put(txlayout.Sv.encode(coin.value, hash, index), null);
|
||||
b.put(txlayout.SV.encode(account, coin.value, hash, index), null);
|
||||
|
||||
b.put(txlayout.Sh.encode(coin.height, hash, index), null);
|
||||
b.put(txlayout.SH.encode(account, coin.height, hash, index), null);
|
||||
parent.put(
|
||||
txlayout.SH.encode(account, this.UNCONFIRMED_HEIGHT, hash, index),
|
||||
null
|
||||
);
|
||||
} else {
|
||||
parent.put(txlayout.Sv.encode(coin.value, hash, index), null);
|
||||
parent.put(txlayout.SV.encode(account, coin.value, hash, index), null);
|
||||
|
||||
parent.put(txlayout.Sh.encode(coin.height, hash, index), null);
|
||||
parent.put(txlayout.SH.encode(account, coin.height, hash, index), null);
|
||||
}
|
||||
|
||||
await b.write();
|
||||
if (++total % this.batchSize === 0) {
|
||||
// save progress
|
||||
this.progress.wid = wid;
|
||||
this.progress.account = account;
|
||||
this.progress.hash = hash;
|
||||
this.progress.index = index;
|
||||
|
||||
ctx.state.inProgressData = this.encodeProgress();
|
||||
ctx.writeState(parent.root());
|
||||
|
||||
await parent.write();
|
||||
parent = bucket.batch();
|
||||
}
|
||||
};
|
||||
|
||||
this.progress.wid = wid + 1;
|
||||
this.progress.account = 0;
|
||||
this.progress.hash = consensus.ZERO_HASH;
|
||||
this.progress.index = 0;
|
||||
ctx.state.inProgressData = this.encodeProgress();
|
||||
ctx.writeState(parent);
|
||||
await parent.write();
|
||||
}
|
||||
|
||||
/**
|
||||
* @returns {Buffer}
|
||||
*/
|
||||
|
||||
encodeProgress() {
|
||||
const bw = bio.write(44);
|
||||
bw.writeU32(this.progress.wid);
|
||||
bw.writeU32(this.progress.account);
|
||||
bw.writeBytes(this.progress.hash);
|
||||
bw.writeU32(this.progress.index);
|
||||
return bw.render();
|
||||
}
|
||||
|
||||
/**
|
||||
* Get migration info.
|
||||
* @param {Buffer} data
|
||||
* @return {Object}
|
||||
*/
|
||||
|
||||
decodeProgress(data) {
|
||||
if (data.length === 0)
|
||||
return;
|
||||
|
||||
assert(data.length === 44);
|
||||
|
||||
const br = bio.read(data);
|
||||
this.progress.wid = br.readU32();
|
||||
this.progress.account = br.readU32();
|
||||
this.progress.hash = br.readBytes(32);
|
||||
this.progress.index = br.readU32();
|
||||
}
|
||||
|
||||
static info() {
|
||||
|
|
|
|||
|
|
@ -1158,7 +1158,7 @@ describe('Wallet Migrations', function() {
|
|||
// check that we have not created extra entries in the db
|
||||
// that is not present in the data dump.
|
||||
await checkExactEntries(ldb, data.prefixes, {
|
||||
after: { ...data.after },
|
||||
after: data.after,
|
||||
throw: true
|
||||
});
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue