node: add fullLock option to the interactive rescan.

Interactive rescan by default does per block scan lock. This enables
parallel rescans, as well as chain sync while rescan is in progress. But
in specific cases, it may be more beneficial to stop the node from
syncing while the rescan is in progress.
This commit is contained in:
Nodari Chkuaselidze 2023-12-27 15:39:00 +04:00
parent dcafb2046b
commit 8affe9570d
No known key found for this signature in database
GPG key ID: B018A7BB437D1F05
6 changed files with 138 additions and 19 deletions

View file

@ -31,7 +31,7 @@ process and allows parallel rescans.
- `compactInterval` - what is the current compaction interval config.
- `nextCompaction` - when will the next compaction trigger after restart.
- `lastCompaction` - when was the last compaction run.
- Introduce `scan interactive` hook (start, filter)
- Introduce `scan interactive` hook (start, filter, fullLock)
### Node HTTP Client:
- Introduce `scanInteractive` method that starts interactive rescan.

View file

@ -2273,9 +2273,39 @@ class Chain extends AsyncEmitter {
* @param {BloomFilter} filter - Starting bloom filter containing tx,
* address and name hashes.
* @param {Function} iter - Iterator.
* @param {Boolean} [fullLock=false]
* @returns {Promise}
*/
async scanInteractive(start, filter, iter) {
async scanInteractive(start, filter, iter, fullLock = false) {
if (fullLock) {
const unlock = await this.locker.lock();
try {
// We lock the whole chain, no longer lock per block scan.
return await this._scanInteractive(start, filter, iter, false);
} catch (e) {
this.logger.debug('Scan(interactive) errored. Error: %s', e.message);
throw e;
} finally {
unlock();
}
}
return this._scanInteractive(start, filter, iter, true);
}
/**
* Interactive scan the blockchain for transactions containing specified
* address hashes. Allows repeat and abort.
* @param {Hash|Number} start - Block hash or height to start at.
* @param {BloomFilter} filter - Starting bloom filter containing tx,
* address and name hashes.
* @param {Function} iter - Iterator.
* @param {Boolean} [lockPerScan=true] - if we should lock per block scan.
* @returns {Promise}
*/
async _scanInteractive(start, filter, iter, lockPerScan = true) {
if (start == null)
start = this.network.genesis.hash;
@ -2287,7 +2317,10 @@ class Chain extends AsyncEmitter {
let hash = start;
while (hash != null) {
const unlock = await this.locker.lock();
let unlock;
if (lockPerScan)
unlock = await this.locker.lock();
try {
const {entry, txs} = await this.db.scanBlock(hash, filter);
@ -2333,7 +2366,8 @@ class Chain extends AsyncEmitter {
this.logger.debug('Scan(interactive) errored. Error: %s', e.message);
throw e;
} finally {
unlock();
if (lockPerScan)
unlock();
}
}
}

View file

@ -370,16 +370,17 @@ class NodeClient extends Client {
* Rescan for any missed transactions. (Interactive)
* @param {Number|Hash} start - Start block.
* @param {BloomFilter} [filter]
* @param {Boolean} [fullLock=false]
* @returns {Promise}
*/
rescanInteractive(start, filter = null) {
rescanInteractive(start, filter = null, fullLock = false) {
if (start == null)
start = 0;
assert(typeof start === 'number' || Buffer.isBuffer(start));
return this.call('rescan interactive', start, filter);
return this.call('rescan interactive', start, filter, fullLock);
}
}

View file

@ -369,11 +369,13 @@ class FullNode extends Node {
* @param {Number|Hash} start - Start block.
* @param {BloomFilter} filter
* @param {Function} iter - Iterator.
* @param {Boolean} [fullLock=false] - lock the whole chain instead of per
* scan.
* @returns {Promise}
*/
scanInteractive(start, filter, iter) {
return this.chain.scanInteractive(start, filter, iter);
scanInteractive(start, filter, iter, fullLock = false) {
return this.chain.scanInteractive(start, filter, iter, fullLock);
}
/**

View file

@ -712,6 +712,7 @@ class HTTP extends Server {
const valid = new Validator(args);
const start = valid.uintbhash(0);
const rawFilter = valid.buf(1);
const fullLock = valid.bool(2, false);
let filter = socket.filter;
if (start == null)
@ -720,7 +721,7 @@ class HTTP extends Server {
if (rawFilter)
filter = BloomFilter.fromRaw(rawFilter);
return this.scanInteractive(socket, start, filter);
return this.scanInteractive(socket, start, filter, fullLock);
});
}
@ -859,10 +860,11 @@ class HTTP extends Server {
* @param {WebSocket} socket
* @param {Hash} start
* @param {BloomFilter} filter
* @param {Boolean} [fullLock=false]
* @returns {Promise}
*/
async scanInteractive(socket, start, filter) {
async scanInteractive(socket, start, filter, fullLock = false) {
const iter = async (entry, txs) => {
const block = entry.encode();
const raw = [];
@ -921,7 +923,7 @@ class HTTP extends Server {
};
try {
await this.node.scanInteractive(start, filter, iter);
await this.node.scanInteractive(start, filter, iter, fullLock);
} catch (err) {
return socket.call('block rescan interactive abort', err.message);
}

View file

@ -399,20 +399,57 @@ describe('Node Rescan Interactive API', function() {
node.scanInteractive(startHeight, null, getIter(counter2))
]);
assert.strictEqual(counter1.count, 10);
assert.strictEqual(counter2.count, 10);
assert.strictEqual(counter1.count, RESCAN_DEPTH);
assert.strictEqual(counter2.count, RESCAN_DEPTH);
// Chain gets locked per block, so we should see alternating events.
// Chain gets locked per block by default, so we should see alternating events.
// Because they start in parallel, but id1 starts first they will be
// getting events in alternating older (first one gets lock, second waits,
// second gets lock, first waits, etc.)
for (let i = 0; i < 10; i++) {
for (let i = 0; i < RESCAN_DEPTH; i++) {
assert.strictEqual(events[i].id, 1);
assert.strictEqual(events[i + 1].id, 2);
i++;
}
});
it('should rescan in series', async () => {
const {node} = nodeCtx;
const startHeight = nodeCtx.height - RESCAN_DEPTH + 1;
const events = [];
const getIter = (counterObj) => {
return async (entry, txs) => {
assert.strictEqual(entry.height, startHeight + counterObj.count);
assert.strictEqual(txs.length, 4);
events.push({ ...counterObj });
counterObj.count++;
return {
type: scanActions.NEXT
};
};
};
const counter1 = { id: 1, count: 0 };
const counter2 = { id: 2, count: 0 };
await Promise.all([
node.scanInteractive(startHeight, null, getIter(counter1), true),
node.scanInteractive(startHeight, null, getIter(counter2), true)
]);
assert.strictEqual(counter1.count, RESCAN_DEPTH);
assert.strictEqual(counter2.count, RESCAN_DEPTH);
// We lock the whole chain for this test, so we should see events
// from one to other.
for (let i = 0; i < RESCAN_DEPTH; i++) {
assert.strictEqual(events[i].id, 1);
assert.strictEqual(events[i + RESCAN_DEPTH].id, 2);
}
});
describe('HTTP', function() {
let client = null;
@ -456,7 +493,7 @@ describe('Node Rescan Interactive API', function() {
filter = test.filter.encode();
await client.rescanInteractive(startHeight, filter);
assert.strictEqual(count, 10);
assert.strictEqual(count, RESCAN_DEPTH);
count = 0;
if (test.filter)
@ -757,20 +794,63 @@ describe('Node Rescan Interactive API', function() {
client2.rescanInteractive(startHeight)
]);
assert.strictEqual(counter1.count, 10);
assert.strictEqual(counter2.count, 10);
assert.strictEqual(counter1.count, RESCAN_DEPTH);
assert.strictEqual(counter2.count, RESCAN_DEPTH);
// Chain gets locked per block, so we should see alternating events.
// Because they start in parallel, but id1 starts first they will be
// getting events in alternating older (first one gets lock, second waits,
// second gets lock, first waits, etc.)
for (let i = 0; i < 10; i++) {
for (let i = 0; i < RESCAN_DEPTH; i++) {
assert.strictEqual(events[i].id, 1);
assert.strictEqual(events[i + 1].id, 2);
i++;
}
});
it('should rescan in series', async () => {
const client2 = nodeCtx.nodeClient();
await client2.open();
const startHeight = nodeCtx.height - RESCAN_DEPTH + 1;
const events = [];
const counter1 = { id: 1, count: 0 };
const counter2 = { id: 2, count: 0 };
const getIter = (counterObj) => {
return async (rawEntry, rawTXs) => {
const [entry, txs] = parseBlock(rawEntry, rawTXs);
assert.strictEqual(entry.height, startHeight + counterObj.count);
assert.strictEqual(txs.length, 4);
events.push({ ...counterObj });
counterObj.count++;
return {
type: scanActions.NEXT
};
};
};
client.hook('block rescan interactive', getIter(counter1));
client2.hook('block rescan interactive', getIter(counter2));
await Promise.all([
client.rescanInteractive(startHeight, null, true),
client2.rescanInteractive(startHeight, null, true)
]);
assert.strictEqual(counter1.count, RESCAN_DEPTH);
assert.strictEqual(counter2.count, RESCAN_DEPTH);
// We lock the whole chain for this test, so we should see events
// from one to other.
for (let i = 0; i < RESCAN_DEPTH; i++) {
assert.strictEqual(events[i].id, 1);
assert.strictEqual(events[i + RESCAN_DEPTH].id, 2);
}
});
// Make sure the client closing does not cause the chain locker to get
// indefinitely locked. (https://github.com/bcoin-org/bsock/pull/11)
it('should stop rescan when client closes', async () => {