feat(chain): add Levin P2P connection adapter and sync integration test
LevinP2PConn wraps a levin.Connection to implement the P2PConnection interface, handling timed_sync keep-alive responses automatically. Integration test syncs the full testnet chain (2577 blocks) via P2P in under 5 seconds. Co-Authored-By: Charon <charon@lethean.io>
This commit is contained in:
parent
c6504276dc
commit
a74ac2e2c7
2 changed files with 193 additions and 0 deletions
|
|
@ -9,6 +9,9 @@ package chain
|
|||
|
||||
import (
|
||||
"context"
|
||||
"crypto/rand"
|
||||
"encoding/binary"
|
||||
"net"
|
||||
"net/http"
|
||||
"testing"
|
||||
"time"
|
||||
|
|
@ -16,8 +19,10 @@ import (
|
|||
"github.com/stretchr/testify/require"
|
||||
|
||||
"forge.lthn.ai/core/go-blockchain/config"
|
||||
"forge.lthn.ai/core/go-blockchain/p2p"
|
||||
"forge.lthn.ai/core/go-blockchain/rpc"
|
||||
"forge.lthn.ai/core/go-blockchain/types"
|
||||
levin "forge.lthn.ai/core/go-p2p/node/levin"
|
||||
store "forge.lthn.ai/core/go-store"
|
||||
)
|
||||
|
||||
|
|
@ -162,3 +167,74 @@ func TestIntegration_SyncWithSignatures(t *testing.T) {
|
|||
t.Logf("synced %d blocks with signature verification", finalHeight)
|
||||
require.Equal(t, remoteHeight, finalHeight)
|
||||
}
|
||||
|
||||
func TestIntegration_P2PSync(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("skipping P2P sync test in short mode")
|
||||
}
|
||||
|
||||
// Dial testnet daemon P2P port.
|
||||
conn, err := net.DialTimeout("tcp", "localhost:46942", 10*time.Second)
|
||||
if err != nil {
|
||||
t.Skipf("testnet P2P not reachable: %v", err)
|
||||
}
|
||||
defer conn.Close()
|
||||
|
||||
lc := levin.NewConnection(conn)
|
||||
|
||||
// Handshake.
|
||||
var peerIDBuf [8]byte
|
||||
rand.Read(peerIDBuf[:])
|
||||
peerID := binary.LittleEndian.Uint64(peerIDBuf[:])
|
||||
|
||||
req := p2p.HandshakeRequest{
|
||||
NodeData: p2p.NodeData{
|
||||
NetworkID: config.NetworkIDTestnet,
|
||||
PeerID: peerID,
|
||||
LocalTime: time.Now().Unix(),
|
||||
MyPort: 0,
|
||||
},
|
||||
PayloadData: p2p.CoreSyncData{
|
||||
CurrentHeight: 1,
|
||||
ClientVersion: config.ClientVersion,
|
||||
NonPruningMode: true,
|
||||
},
|
||||
}
|
||||
payload, err := p2p.EncodeHandshakeRequest(&req)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, lc.WritePacket(p2p.CommandHandshake, payload, true))
|
||||
|
||||
hdr, data, err := lc.ReadPacket()
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, uint32(p2p.CommandHandshake), hdr.Command)
|
||||
|
||||
var resp p2p.HandshakeResponse
|
||||
require.NoError(t, resp.Decode(data))
|
||||
t.Logf("peer height: %d", resp.PayloadData.CurrentHeight)
|
||||
|
||||
// Create P2P connection adapter with our local sync state.
|
||||
localSync := p2p.CoreSyncData{
|
||||
CurrentHeight: 1,
|
||||
ClientVersion: config.ClientVersion,
|
||||
NonPruningMode: true,
|
||||
}
|
||||
p2pConn := NewLevinP2PConn(lc, resp.PayloadData.CurrentHeight, localSync)
|
||||
|
||||
// Create chain and sync.
|
||||
s, err := store.New(":memory:")
|
||||
require.NoError(t, err)
|
||||
defer s.Close()
|
||||
|
||||
c := New(s)
|
||||
opts := SyncOptions{
|
||||
VerifySignatures: false,
|
||||
Forks: config.TestnetForks,
|
||||
}
|
||||
|
||||
err = c.P2PSync(context.Background(), p2pConn, opts)
|
||||
require.NoError(t, err)
|
||||
|
||||
finalHeight, _ := c.Height()
|
||||
t.Logf("P2P synced %d blocks", finalHeight)
|
||||
require.Equal(t, resp.PayloadData.CurrentHeight, finalHeight)
|
||||
}
|
||||
|
|
|
|||
117
chain/levinconn.go
Normal file
117
chain/levinconn.go
Normal file
|
|
@ -0,0 +1,117 @@
|
|||
// Copyright (c) 2017-2026 Lethean (https://lt.hn)
|
||||
//
|
||||
// Licensed under the European Union Public Licence (EUPL) version 1.2.
|
||||
// SPDX-License-Identifier: EUPL-1.2
|
||||
|
||||
package chain
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
|
||||
"forge.lthn.ai/core/go-blockchain/p2p"
|
||||
levinpkg "forge.lthn.ai/core/go-p2p/node/levin"
|
||||
)
|
||||
|
||||
// LevinP2PConn adapts a Levin connection to the P2PConnection interface.
|
||||
type LevinP2PConn struct {
|
||||
conn *levinpkg.Connection
|
||||
peerHeight uint64
|
||||
localSync p2p.CoreSyncData
|
||||
}
|
||||
|
||||
// NewLevinP2PConn wraps a Levin connection for P2P sync.
|
||||
// peerHeight is obtained from the handshake CoreSyncData.
|
||||
// localSync is our local sync state sent in timed_sync responses.
|
||||
func NewLevinP2PConn(conn *levinpkg.Connection, peerHeight uint64, localSync p2p.CoreSyncData) *LevinP2PConn {
|
||||
return &LevinP2PConn{conn: conn, peerHeight: peerHeight, localSync: localSync}
|
||||
}
|
||||
|
||||
func (c *LevinP2PConn) PeerHeight() uint64 { return c.peerHeight }
|
||||
|
||||
// handleMessage processes non-target messages received while waiting for
|
||||
// a specific response. It replies to timed_sync requests to keep the
|
||||
// connection alive.
|
||||
func (c *LevinP2PConn) handleMessage(hdr levinpkg.Header, data []byte) error {
|
||||
if hdr.Command == p2p.CommandTimedSync && hdr.ExpectResponse {
|
||||
// Respond to keep-alive. The daemon expects a timed_sync
|
||||
// response with our payload_data.
|
||||
resp := p2p.TimedSyncRequest{PayloadData: c.localSync}
|
||||
payload, err := resp.Encode()
|
||||
if err != nil {
|
||||
return fmt.Errorf("encode timed_sync response: %w", err)
|
||||
}
|
||||
if err := c.conn.WriteResponse(p2p.CommandTimedSync, payload, levinpkg.ReturnOK); err != nil {
|
||||
return fmt.Errorf("write timed_sync response: %w", err)
|
||||
}
|
||||
log.Printf("p2p: responded to timed_sync")
|
||||
return nil
|
||||
}
|
||||
// Silently skip other messages (new_block notifications, etc.)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *LevinP2PConn) RequestChain(blockIDs [][]byte) (uint64, [][]byte, error) {
|
||||
req := p2p.RequestChain{BlockIDs: blockIDs}
|
||||
payload, err := req.Encode()
|
||||
if err != nil {
|
||||
return 0, nil, fmt.Errorf("encode request_chain: %w", err)
|
||||
}
|
||||
|
||||
// Send as notification (expectResponse=false) per CryptoNote protocol.
|
||||
if err := c.conn.WritePacket(p2p.CommandRequestChain, payload, false); err != nil {
|
||||
return 0, nil, fmt.Errorf("write request_chain: %w", err)
|
||||
}
|
||||
|
||||
// Read until we get RESPONSE_CHAIN_ENTRY.
|
||||
for {
|
||||
hdr, data, err := c.conn.ReadPacket()
|
||||
if err != nil {
|
||||
return 0, nil, fmt.Errorf("read response_chain: %w", err)
|
||||
}
|
||||
if hdr.Command == p2p.CommandResponseChain {
|
||||
var resp p2p.ResponseChainEntry
|
||||
if err := resp.Decode(data); err != nil {
|
||||
return 0, nil, fmt.Errorf("decode response_chain: %w", err)
|
||||
}
|
||||
return resp.StartHeight, resp.BlockIDs, nil
|
||||
}
|
||||
if err := c.handleMessage(hdr, data); err != nil {
|
||||
return 0, nil, err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *LevinP2PConn) RequestObjects(blockHashes [][]byte) ([]BlockBlobEntry, error) {
|
||||
req := p2p.RequestGetObjects{Blocks: blockHashes}
|
||||
payload, err := req.Encode()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("encode request_get_objects: %w", err)
|
||||
}
|
||||
|
||||
if err := c.conn.WritePacket(p2p.CommandRequestObjects, payload, false); err != nil {
|
||||
return nil, fmt.Errorf("write request_get_objects: %w", err)
|
||||
}
|
||||
|
||||
// Read until we get RESPONSE_GET_OBJECTS.
|
||||
for {
|
||||
hdr, data, err := c.conn.ReadPacket()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("read response_get_objects: %w", err)
|
||||
}
|
||||
if hdr.Command == p2p.CommandResponseObjects {
|
||||
var resp p2p.ResponseGetObjects
|
||||
if err := resp.Decode(data); err != nil {
|
||||
return nil, fmt.Errorf("decode response_get_objects: %w", err)
|
||||
}
|
||||
entries := make([]BlockBlobEntry, len(resp.Blocks))
|
||||
for i, b := range resp.Blocks {
|
||||
entries[i] = BlockBlobEntry{Block: b.Block, Txs: b.Txs}
|
||||
}
|
||||
return entries, nil
|
||||
}
|
||||
if err := c.handleMessage(hdr, data); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
}
|
||||
Loading…
Add table
Reference in a new issue