From d588f8f8d09e09a7000f825e673a81b29731350a Mon Sep 17 00:00:00 2001 From: Claude Date: Sat, 21 Feb 2026 21:00:57 +0000 Subject: [PATCH] feat(chain): add P2P sync state machine P2PSync() runs the REQUEST_CHAIN / REQUEST_GET_OBJECTS loop against a P2PConnection interface. Reuses processBlockBlobs() for shared validation logic. Co-Authored-By: Charon --- chain/p2psync.go | 115 ++++++++++++++++++++++++++++++++++ chain/p2psync_test.go | 140 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 255 insertions(+) create mode 100644 chain/p2psync.go create mode 100644 chain/p2psync_test.go diff --git a/chain/p2psync.go b/chain/p2psync.go new file mode 100644 index 0000000..a6cbde4 --- /dev/null +++ b/chain/p2psync.go @@ -0,0 +1,115 @@ +// Copyright (c) 2017-2026 Lethean (https://lt.hn) +// +// Licensed under the European Union Public Licence (EUPL) version 1.2. +// SPDX-License-Identifier: EUPL-1.2 + +package chain + +import ( + "context" + "fmt" + "log" +) + +// P2PConnection abstracts the P2P communication needed for block sync. +type P2PConnection interface { + // PeerHeight returns the peer's advertised chain height. + PeerHeight() uint64 + + // RequestChain sends NOTIFY_REQUEST_CHAIN and returns the response. + RequestChain(blockIDs [][]byte) (startHeight uint64, hashes [][]byte, err error) + + // RequestObjects sends NOTIFY_REQUEST_GET_OBJECTS and returns block blobs. + RequestObjects(blockHashes [][]byte) ([]BlockBlobEntry, error) +} + +// BlockBlobEntry holds raw block and transaction blobs from a peer. +type BlockBlobEntry struct { + Block []byte + Txs [][]byte +} + +const p2pBatchSize = 200 + +// P2PSync synchronises the chain from a P2P peer. It runs the +// REQUEST_CHAIN / REQUEST_GET_OBJECTS protocol loop until the local +// chain reaches the peer's height. +func (c *Chain) P2PSync(ctx context.Context, conn P2PConnection, opts SyncOptions) error { + for { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + localHeight, err := c.Height() + if err != nil { + return fmt.Errorf("p2p sync: get height: %w", err) + } + + peerHeight := conn.PeerHeight() + if localHeight >= peerHeight { + return nil // synced + } + + // Build sparse chain history. + history, err := c.SparseChainHistory() + if err != nil { + return fmt.Errorf("p2p sync: build history: %w", err) + } + + // Convert Hash to []byte for P2P. + historyBytes := make([][]byte, len(history)) + for i, h := range history { + b := make([]byte, 32) + copy(b, h[:]) + historyBytes[i] = b + } + + // Request chain entry. + startHeight, blockIDs, err := conn.RequestChain(historyBytes) + if err != nil { + return fmt.Errorf("p2p sync: request chain: %w", err) + } + + if len(blockIDs) == 0 { + return nil // nothing to sync + } + + log.Printf("p2p sync: chain entry from height %d, %d block IDs", startHeight, len(blockIDs)) + + // Fetch blocks in batches. + for i := 0; i < len(blockIDs); i += p2pBatchSize { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + end := i + p2pBatchSize + if end > len(blockIDs) { + end = len(blockIDs) + } + batch := blockIDs[i:end] + + entries, err := conn.RequestObjects(batch) + if err != nil { + return fmt.Errorf("p2p sync: request objects: %w", err) + } + + currentHeight := startHeight + uint64(i) + for j, entry := range entries { + blockHeight := currentHeight + uint64(j) + if blockHeight > 0 && blockHeight%100 == 0 { + log.Printf("p2p sync: processing block %d", blockHeight) + } + + // P2P path: difficulty=0 (TODO: compute from LWMA) + if err := c.processBlockBlobs(entry.Block, entry.Txs, + blockHeight, 0, opts); err != nil { + return fmt.Errorf("p2p sync: process block %d: %w", blockHeight, err) + } + } + } + } +} diff --git a/chain/p2psync_test.go b/chain/p2psync_test.go new file mode 100644 index 0000000..e16a053 --- /dev/null +++ b/chain/p2psync_test.go @@ -0,0 +1,140 @@ +// Copyright (c) 2017-2026 Lethean (https://lt.hn) +// +// Licensed under the European Union Public Licence (EUPL) version 1.2. +// SPDX-License-Identifier: EUPL-1.2 + +package chain + +import ( + "context" + "fmt" + "testing" + + store "forge.lthn.ai/core/go-store" + "forge.lthn.ai/core/go-blockchain/config" + "github.com/stretchr/testify/require" +) + +// mockP2PConn implements P2PConnection for testing. +type mockP2PConn struct { + peerHeight uint64 + // blocks maps hash -> (blockBlob, txBlobs) + blocks map[string]struct { + blockBlob []byte + txBlobs [][]byte + } + chainHashes [][]byte + + // requestChainErr, if set, is returned by RequestChain. + requestChainErr error + // requestObjectsErr, if set, is returned by RequestObjects. + requestObjectsErr error +} + +func (m *mockP2PConn) PeerHeight() uint64 { return m.peerHeight } + +func (m *mockP2PConn) RequestChain(blockIDs [][]byte) (startHeight uint64, hashes [][]byte, err error) { + if m.requestChainErr != nil { + return 0, nil, m.requestChainErr + } + return 0, m.chainHashes, nil +} + +func (m *mockP2PConn) RequestObjects(blockHashes [][]byte) ([]BlockBlobEntry, error) { + if m.requestObjectsErr != nil { + return nil, m.requestObjectsErr + } + var entries []BlockBlobEntry + for _, h := range blockHashes { + key := string(h) + if blk, ok := m.blocks[key]; ok { + entries = append(entries, BlockBlobEntry{ + Block: blk.blockBlob, + Txs: blk.txBlobs, + }) + } + } + return entries, nil +} + +func TestP2PSync_EmptyChain(t *testing.T) { + // Test that P2PSync with a mock that has no blocks is a no-op. + s, err := store.New(":memory:") + require.NoError(t, err) + defer s.Close() + + c := New(s) + mock := &mockP2PConn{peerHeight: 0} + + opts := SyncOptions{Forks: config.TestnetForks} + err = c.P2PSync(context.Background(), mock, opts) + require.NoError(t, err) +} + +func TestP2PSync_ContextCancellation(t *testing.T) { + s, err := store.New(":memory:") + require.NoError(t, err) + defer s.Close() + + c := New(s) + mock := &mockP2PConn{peerHeight: 100} // claims 100 blocks but returns none + + ctx, cancel := context.WithCancel(context.Background()) + cancel() // cancel immediately + + opts := SyncOptions{Forks: config.TestnetForks} + err = c.P2PSync(ctx, mock, opts) + require.ErrorIs(t, err, context.Canceled) +} + +func TestP2PSync_NoBlockIDs(t *testing.T) { + // Peer claims a height but returns no block IDs from RequestChain. + s, err := store.New(":memory:") + require.NoError(t, err) + defer s.Close() + + c := New(s) + mock := &mockP2PConn{ + peerHeight: 10, + chainHashes: nil, // empty response + } + + opts := SyncOptions{Forks: config.TestnetForks} + err = c.P2PSync(context.Background(), mock, opts) + require.NoError(t, err) +} + +func TestP2PSync_RequestChainError(t *testing.T) { + s, err := store.New(":memory:") + require.NoError(t, err) + defer s.Close() + + c := New(s) + mock := &mockP2PConn{ + peerHeight: 10, + requestChainErr: fmt.Errorf("connection reset"), + } + + opts := SyncOptions{Forks: config.TestnetForks} + err = c.P2PSync(context.Background(), mock, opts) + require.Error(t, err) + require.Contains(t, err.Error(), "request chain") +} + +func TestP2PSync_RequestObjectsError(t *testing.T) { + s, err := store.New(":memory:") + require.NoError(t, err) + defer s.Close() + + c := New(s) + mock := &mockP2PConn{ + peerHeight: 10, + chainHashes: [][]byte{{0x01}}, + requestObjectsErr: fmt.Errorf("timeout"), + } + + opts := SyncOptions{Forks: config.TestnetForks} + err = c.P2PSync(context.Background(), mock, opts) + require.Error(t, err) + require.Contains(t, err.Error(), "request objects") +}