From a74ac2e2c7165b204ca68684f4901ad278962c1d Mon Sep 17 00:00:00 2001 From: Claude Date: Sat, 21 Feb 2026 21:32:05 +0000 Subject: [PATCH] feat(chain): add Levin P2P connection adapter and sync integration test LevinP2PConn wraps a levin.Connection to implement the P2PConnection interface, handling timed_sync keep-alive responses automatically. Integration test syncs the full testnet chain (2577 blocks) via P2P in under 5 seconds. Co-Authored-By: Charon --- chain/integration_test.go | 76 +++++++++++++++++++++++++ chain/levinconn.go | 117 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 193 insertions(+) create mode 100644 chain/levinconn.go diff --git a/chain/integration_test.go b/chain/integration_test.go index 02c6353..70a4821 100644 --- a/chain/integration_test.go +++ b/chain/integration_test.go @@ -9,6 +9,9 @@ package chain import ( "context" + "crypto/rand" + "encoding/binary" + "net" "net/http" "testing" "time" @@ -16,8 +19,10 @@ import ( "github.com/stretchr/testify/require" "forge.lthn.ai/core/go-blockchain/config" + "forge.lthn.ai/core/go-blockchain/p2p" "forge.lthn.ai/core/go-blockchain/rpc" "forge.lthn.ai/core/go-blockchain/types" + levin "forge.lthn.ai/core/go-p2p/node/levin" store "forge.lthn.ai/core/go-store" ) @@ -162,3 +167,74 @@ func TestIntegration_SyncWithSignatures(t *testing.T) { t.Logf("synced %d blocks with signature verification", finalHeight) require.Equal(t, remoteHeight, finalHeight) } + +func TestIntegration_P2PSync(t *testing.T) { + if testing.Short() { + t.Skip("skipping P2P sync test in short mode") + } + + // Dial testnet daemon P2P port. + conn, err := net.DialTimeout("tcp", "localhost:46942", 10*time.Second) + if err != nil { + t.Skipf("testnet P2P not reachable: %v", err) + } + defer conn.Close() + + lc := levin.NewConnection(conn) + + // Handshake. + var peerIDBuf [8]byte + rand.Read(peerIDBuf[:]) + peerID := binary.LittleEndian.Uint64(peerIDBuf[:]) + + req := p2p.HandshakeRequest{ + NodeData: p2p.NodeData{ + NetworkID: config.NetworkIDTestnet, + PeerID: peerID, + LocalTime: time.Now().Unix(), + MyPort: 0, + }, + PayloadData: p2p.CoreSyncData{ + CurrentHeight: 1, + ClientVersion: config.ClientVersion, + NonPruningMode: true, + }, + } + payload, err := p2p.EncodeHandshakeRequest(&req) + require.NoError(t, err) + require.NoError(t, lc.WritePacket(p2p.CommandHandshake, payload, true)) + + hdr, data, err := lc.ReadPacket() + require.NoError(t, err) + require.Equal(t, uint32(p2p.CommandHandshake), hdr.Command) + + var resp p2p.HandshakeResponse + require.NoError(t, resp.Decode(data)) + t.Logf("peer height: %d", resp.PayloadData.CurrentHeight) + + // Create P2P connection adapter with our local sync state. + localSync := p2p.CoreSyncData{ + CurrentHeight: 1, + ClientVersion: config.ClientVersion, + NonPruningMode: true, + } + p2pConn := NewLevinP2PConn(lc, resp.PayloadData.CurrentHeight, localSync) + + // Create chain and sync. + s, err := store.New(":memory:") + require.NoError(t, err) + defer s.Close() + + c := New(s) + opts := SyncOptions{ + VerifySignatures: false, + Forks: config.TestnetForks, + } + + err = c.P2PSync(context.Background(), p2pConn, opts) + require.NoError(t, err) + + finalHeight, _ := c.Height() + t.Logf("P2P synced %d blocks", finalHeight) + require.Equal(t, resp.PayloadData.CurrentHeight, finalHeight) +} diff --git a/chain/levinconn.go b/chain/levinconn.go new file mode 100644 index 0000000..c6b0e6d --- /dev/null +++ b/chain/levinconn.go @@ -0,0 +1,117 @@ +// Copyright (c) 2017-2026 Lethean (https://lt.hn) +// +// Licensed under the European Union Public Licence (EUPL) version 1.2. +// SPDX-License-Identifier: EUPL-1.2 + +package chain + +import ( + "fmt" + "log" + + "forge.lthn.ai/core/go-blockchain/p2p" + levinpkg "forge.lthn.ai/core/go-p2p/node/levin" +) + +// LevinP2PConn adapts a Levin connection to the P2PConnection interface. +type LevinP2PConn struct { + conn *levinpkg.Connection + peerHeight uint64 + localSync p2p.CoreSyncData +} + +// NewLevinP2PConn wraps a Levin connection for P2P sync. +// peerHeight is obtained from the handshake CoreSyncData. +// localSync is our local sync state sent in timed_sync responses. +func NewLevinP2PConn(conn *levinpkg.Connection, peerHeight uint64, localSync p2p.CoreSyncData) *LevinP2PConn { + return &LevinP2PConn{conn: conn, peerHeight: peerHeight, localSync: localSync} +} + +func (c *LevinP2PConn) PeerHeight() uint64 { return c.peerHeight } + +// handleMessage processes non-target messages received while waiting for +// a specific response. It replies to timed_sync requests to keep the +// connection alive. +func (c *LevinP2PConn) handleMessage(hdr levinpkg.Header, data []byte) error { + if hdr.Command == p2p.CommandTimedSync && hdr.ExpectResponse { + // Respond to keep-alive. The daemon expects a timed_sync + // response with our payload_data. + resp := p2p.TimedSyncRequest{PayloadData: c.localSync} + payload, err := resp.Encode() + if err != nil { + return fmt.Errorf("encode timed_sync response: %w", err) + } + if err := c.conn.WriteResponse(p2p.CommandTimedSync, payload, levinpkg.ReturnOK); err != nil { + return fmt.Errorf("write timed_sync response: %w", err) + } + log.Printf("p2p: responded to timed_sync") + return nil + } + // Silently skip other messages (new_block notifications, etc.) + return nil +} + +func (c *LevinP2PConn) RequestChain(blockIDs [][]byte) (uint64, [][]byte, error) { + req := p2p.RequestChain{BlockIDs: blockIDs} + payload, err := req.Encode() + if err != nil { + return 0, nil, fmt.Errorf("encode request_chain: %w", err) + } + + // Send as notification (expectResponse=false) per CryptoNote protocol. + if err := c.conn.WritePacket(p2p.CommandRequestChain, payload, false); err != nil { + return 0, nil, fmt.Errorf("write request_chain: %w", err) + } + + // Read until we get RESPONSE_CHAIN_ENTRY. + for { + hdr, data, err := c.conn.ReadPacket() + if err != nil { + return 0, nil, fmt.Errorf("read response_chain: %w", err) + } + if hdr.Command == p2p.CommandResponseChain { + var resp p2p.ResponseChainEntry + if err := resp.Decode(data); err != nil { + return 0, nil, fmt.Errorf("decode response_chain: %w", err) + } + return resp.StartHeight, resp.BlockIDs, nil + } + if err := c.handleMessage(hdr, data); err != nil { + return 0, nil, err + } + } +} + +func (c *LevinP2PConn) RequestObjects(blockHashes [][]byte) ([]BlockBlobEntry, error) { + req := p2p.RequestGetObjects{Blocks: blockHashes} + payload, err := req.Encode() + if err != nil { + return nil, fmt.Errorf("encode request_get_objects: %w", err) + } + + if err := c.conn.WritePacket(p2p.CommandRequestObjects, payload, false); err != nil { + return nil, fmt.Errorf("write request_get_objects: %w", err) + } + + // Read until we get RESPONSE_GET_OBJECTS. + for { + hdr, data, err := c.conn.ReadPacket() + if err != nil { + return nil, fmt.Errorf("read response_get_objects: %w", err) + } + if hdr.Command == p2p.CommandResponseObjects { + var resp p2p.ResponseGetObjects + if err := resp.Decode(data); err != nil { + return nil, fmt.Errorf("decode response_get_objects: %w", err) + } + entries := make([]BlockBlobEntry, len(resp.Blocks)) + for i, b := range resp.Blocks { + entries[i] = BlockBlobEntry{Block: b.Block, Txs: b.Txs} + } + return entries, nil + } + if err := c.handleMessage(hdr, data); err != nil { + return nil, err + } + } +}