Replace all fmt.Errorf and errors.New in production code with
coreerr.E("Caller.Method", "message", err) from go-log. Replace
os.MkdirAll with coreio.Local.EnsureDir from go-io. Sentinel errors
(consensus/errors.go, wire/varint.go) intentionally kept as errors.New
for errors.Is compatibility.
270 error call sites converted across 38 files. Test files untouched.
crypto/ directory (CGO) untouched.
Co-Authored-By: Virgil <virgil@lethean.io>
133 lines
3.5 KiB
Go
133 lines
3.5 KiB
Go
// Copyright (c) 2017-2026 Lethean (https://lt.hn)
|
|
//
|
|
// Licensed under the European Union Public Licence (EUPL) version 1.2.
|
|
// SPDX-License-Identifier: EUPL-1.2
|
|
|
|
package chain
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"log"
|
|
|
|
coreerr "forge.lthn.ai/core/go-log"
|
|
)
|
|
|
|
// P2PConnection abstracts the P2P communication needed for block sync.
|
|
type P2PConnection interface {
|
|
// PeerHeight returns the peer's advertised chain height.
|
|
PeerHeight() uint64
|
|
|
|
// RequestChain sends NOTIFY_REQUEST_CHAIN and returns the response.
|
|
RequestChain(blockIDs [][]byte) (startHeight uint64, hashes [][]byte, err error)
|
|
|
|
// RequestObjects sends NOTIFY_REQUEST_GET_OBJECTS and returns block blobs.
|
|
RequestObjects(blockHashes [][]byte) ([]BlockBlobEntry, error)
|
|
}
|
|
|
|
// BlockBlobEntry holds raw block and transaction blobs from a peer.
// P2PSync hands both fields, unmodified, to the chain's block processing.
type BlockBlobEntry struct {
	// Block is the raw block blob.
	Block []byte
	// Txs holds the raw transaction blobs delivered with the block.
	Txs [][]byte
}
|
|
|
|
// p2pBatchSize is the maximum number of block IDs that P2PSync passes to a
// single RequestObjects call.
const p2pBatchSize = 200
|
|
|
|
// P2PSync synchronises the chain from a P2P peer. It runs the
|
|
// REQUEST_CHAIN / REQUEST_GET_OBJECTS protocol loop until the local
|
|
// chain reaches the peer's height.
|
|
func (c *Chain) P2PSync(ctx context.Context, conn P2PConnection, opts SyncOptions) error {
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
default:
|
|
}
|
|
|
|
localHeight, err := c.Height()
|
|
if err != nil {
|
|
return coreerr.E("Chain.P2PSync", "p2p sync: get height", err)
|
|
}
|
|
|
|
peerHeight := conn.PeerHeight()
|
|
if localHeight >= peerHeight {
|
|
return nil // synced
|
|
}
|
|
|
|
// Build sparse chain history.
|
|
history, err := c.SparseChainHistory()
|
|
if err != nil {
|
|
return coreerr.E("Chain.P2PSync", "p2p sync: build history", err)
|
|
}
|
|
|
|
// Convert Hash to []byte for P2P.
|
|
historyBytes := make([][]byte, len(history))
|
|
for i, h := range history {
|
|
b := make([]byte, 32)
|
|
copy(b, h[:])
|
|
historyBytes[i] = b
|
|
}
|
|
|
|
// Request chain entry.
|
|
startHeight, blockIDs, err := conn.RequestChain(historyBytes)
|
|
if err != nil {
|
|
return coreerr.E("Chain.P2PSync", "p2p sync: request chain", err)
|
|
}
|
|
|
|
if len(blockIDs) == 0 {
|
|
return nil // nothing to sync
|
|
}
|
|
|
|
log.Printf("p2p sync: chain entry from height %d, %d block IDs", startHeight, len(blockIDs))
|
|
|
|
// The daemon returns the fork-point block as the first entry.
|
|
// Skip blocks we already have.
|
|
skip := 0
|
|
if startHeight < localHeight {
|
|
skip = int(localHeight - startHeight)
|
|
if skip >= len(blockIDs) {
|
|
continue // all IDs are blocks we already have
|
|
}
|
|
}
|
|
fetchIDs := blockIDs[skip:]
|
|
fetchStart := startHeight + uint64(skip)
|
|
|
|
// Fetch blocks in batches.
|
|
for i := 0; i < len(fetchIDs); i += p2pBatchSize {
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
default:
|
|
}
|
|
|
|
end := i + p2pBatchSize
|
|
if end > len(fetchIDs) {
|
|
end = len(fetchIDs)
|
|
}
|
|
batch := fetchIDs[i:end]
|
|
|
|
entries, err := conn.RequestObjects(batch)
|
|
if err != nil {
|
|
return coreerr.E("Chain.P2PSync", "p2p sync: request objects", err)
|
|
}
|
|
|
|
currentHeight := fetchStart + uint64(i)
|
|
for j, entry := range entries {
|
|
blockHeight := currentHeight + uint64(j)
|
|
if blockHeight > 0 && blockHeight%100 == 0 {
|
|
log.Printf("p2p sync: processing block %d", blockHeight)
|
|
}
|
|
|
|
blockDiff, err := c.NextDifficulty(blockHeight, opts.Forks)
|
|
if err != nil {
|
|
return coreerr.E("Chain.P2PSync", fmt.Sprintf("p2p sync: compute difficulty for block %d", blockHeight), err)
|
|
}
|
|
|
|
if err := c.processBlockBlobs(entry.Block, entry.Txs,
|
|
blockHeight, blockDiff, opts); err != nil {
|
|
return coreerr.E("Chain.P2PSync", fmt.Sprintf("p2p sync: process block %d", blockHeight), err)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|