feat(chain): add P2P sync state machine

P2PSync() runs the REQUEST_CHAIN / REQUEST_GET_OBJECTS loop against
a P2PConnection interface. Reuses processBlockBlobs() for shared
validation logic.

Co-Authored-By: Charon <charon@lethean.io>
This commit is contained in:
Claude 2026-02-21 21:00:57 +00:00
parent 3fe3b558d7
commit d588f8f8d0
No known key found for this signature in database
GPG key ID: AF404715446AEB41
2 changed files with 255 additions and 0 deletions

115
chain/p2psync.go Normal file
View file

@ -0,0 +1,115 @@
// Copyright (c) 2017-2026 Lethean (https://lt.hn)
//
// Licensed under the European Union Public Licence (EUPL) version 1.2.
// SPDX-License-Identifier: EUPL-1.2
package chain
import (
"context"
"fmt"
"log"
)
// P2PConnection abstracts the P2P communication needed for block sync.
type P2PConnection interface {
// PeerHeight returns the peer's advertised chain height.
PeerHeight() uint64
// RequestChain sends NOTIFY_REQUEST_CHAIN and returns the response.
RequestChain(blockIDs [][]byte) (startHeight uint64, hashes [][]byte, err error)
// RequestObjects sends NOTIFY_REQUEST_GET_OBJECTS and returns block blobs.
RequestObjects(blockHashes [][]byte) ([]BlockBlobEntry, error)
}
// BlockBlobEntry holds raw block and transaction blobs from a peer.
type BlockBlobEntry struct {
Block []byte
Txs [][]byte
}
const p2pBatchSize = 200
// P2PSync synchronises the chain from a P2P peer. It runs the
// REQUEST_CHAIN / REQUEST_GET_OBJECTS protocol loop until the local
// chain reaches the peer's height.
func (c *Chain) P2PSync(ctx context.Context, conn P2PConnection, opts SyncOptions) error {
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
localHeight, err := c.Height()
if err != nil {
return fmt.Errorf("p2p sync: get height: %w", err)
}
peerHeight := conn.PeerHeight()
if localHeight >= peerHeight {
return nil // synced
}
// Build sparse chain history.
history, err := c.SparseChainHistory()
if err != nil {
return fmt.Errorf("p2p sync: build history: %w", err)
}
// Convert Hash to []byte for P2P.
historyBytes := make([][]byte, len(history))
for i, h := range history {
b := make([]byte, 32)
copy(b, h[:])
historyBytes[i] = b
}
// Request chain entry.
startHeight, blockIDs, err := conn.RequestChain(historyBytes)
if err != nil {
return fmt.Errorf("p2p sync: request chain: %w", err)
}
if len(blockIDs) == 0 {
return nil // nothing to sync
}
log.Printf("p2p sync: chain entry from height %d, %d block IDs", startHeight, len(blockIDs))
// Fetch blocks in batches.
for i := 0; i < len(blockIDs); i += p2pBatchSize {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
end := i + p2pBatchSize
if end > len(blockIDs) {
end = len(blockIDs)
}
batch := blockIDs[i:end]
entries, err := conn.RequestObjects(batch)
if err != nil {
return fmt.Errorf("p2p sync: request objects: %w", err)
}
currentHeight := startHeight + uint64(i)
for j, entry := range entries {
blockHeight := currentHeight + uint64(j)
if blockHeight > 0 && blockHeight%100 == 0 {
log.Printf("p2p sync: processing block %d", blockHeight)
}
// P2P path: difficulty=0 (TODO: compute from LWMA)
if err := c.processBlockBlobs(entry.Block, entry.Txs,
blockHeight, 0, opts); err != nil {
return fmt.Errorf("p2p sync: process block %d: %w", blockHeight, err)
}
}
}
}
}

140
chain/p2psync_test.go Normal file
View file

@ -0,0 +1,140 @@
// Copyright (c) 2017-2026 Lethean (https://lt.hn)
//
// Licensed under the European Union Public Licence (EUPL) version 1.2.
// SPDX-License-Identifier: EUPL-1.2
package chain
import (
"context"
"fmt"
"testing"
store "forge.lthn.ai/core/go-store"
"forge.lthn.ai/core/go-blockchain/config"
"github.com/stretchr/testify/require"
)
// mockP2PConn implements P2PConnection for testing.
type mockP2PConn struct {
peerHeight uint64
// blocks maps hash -> (blockBlob, txBlobs)
blocks map[string]struct {
blockBlob []byte
txBlobs [][]byte
}
chainHashes [][]byte
// requestChainErr, if set, is returned by RequestChain.
requestChainErr error
// requestObjectsErr, if set, is returned by RequestObjects.
requestObjectsErr error
}
func (m *mockP2PConn) PeerHeight() uint64 { return m.peerHeight }
func (m *mockP2PConn) RequestChain(blockIDs [][]byte) (startHeight uint64, hashes [][]byte, err error) {
if m.requestChainErr != nil {
return 0, nil, m.requestChainErr
}
return 0, m.chainHashes, nil
}
func (m *mockP2PConn) RequestObjects(blockHashes [][]byte) ([]BlockBlobEntry, error) {
if m.requestObjectsErr != nil {
return nil, m.requestObjectsErr
}
var entries []BlockBlobEntry
for _, h := range blockHashes {
key := string(h)
if blk, ok := m.blocks[key]; ok {
entries = append(entries, BlockBlobEntry{
Block: blk.blockBlob,
Txs: blk.txBlobs,
})
}
}
return entries, nil
}
func TestP2PSync_EmptyChain(t *testing.T) {
// Test that P2PSync with a mock that has no blocks is a no-op.
s, err := store.New(":memory:")
require.NoError(t, err)
defer s.Close()
c := New(s)
mock := &mockP2PConn{peerHeight: 0}
opts := SyncOptions{Forks: config.TestnetForks}
err = c.P2PSync(context.Background(), mock, opts)
require.NoError(t, err)
}
func TestP2PSync_ContextCancellation(t *testing.T) {
s, err := store.New(":memory:")
require.NoError(t, err)
defer s.Close()
c := New(s)
mock := &mockP2PConn{peerHeight: 100} // claims 100 blocks but returns none
ctx, cancel := context.WithCancel(context.Background())
cancel() // cancel immediately
opts := SyncOptions{Forks: config.TestnetForks}
err = c.P2PSync(ctx, mock, opts)
require.ErrorIs(t, err, context.Canceled)
}
func TestP2PSync_NoBlockIDs(t *testing.T) {
// Peer claims a height but returns no block IDs from RequestChain.
s, err := store.New(":memory:")
require.NoError(t, err)
defer s.Close()
c := New(s)
mock := &mockP2PConn{
peerHeight: 10,
chainHashes: nil, // empty response
}
opts := SyncOptions{Forks: config.TestnetForks}
err = c.P2PSync(context.Background(), mock, opts)
require.NoError(t, err)
}
func TestP2PSync_RequestChainError(t *testing.T) {
s, err := store.New(":memory:")
require.NoError(t, err)
defer s.Close()
c := New(s)
mock := &mockP2PConn{
peerHeight: 10,
requestChainErr: fmt.Errorf("connection reset"),
}
opts := SyncOptions{Forks: config.TestnetForks}
err = c.P2PSync(context.Background(), mock, opts)
require.Error(t, err)
require.Contains(t, err.Error(), "request chain")
}
func TestP2PSync_RequestObjectsError(t *testing.T) {
s, err := store.New(":memory:")
require.NoError(t, err)
defer s.Close()
c := New(s)
mock := &mockP2PConn{
peerHeight: 10,
chainHashes: [][]byte{{0x01}},
requestObjectsErr: fmt.Errorf("timeout"),
}
opts := SyncOptions{Forks: config.TestnetForks}
err = c.P2PSync(context.Background(), mock, opts)
require.Error(t, err)
require.Contains(t, err.Error(), "request objects")
}