refactor(ax): make chain sync logging explicit
Some checks are pending
Security Scan / security (push) Waiting to run
Test / Test (push) Waiting to run

Co-Authored-By: Virgil <virgil@lethean.io>
This commit is contained in:
Virgil 2026-04-04 21:03:36 +00:00
parent 7e01df15fe
commit b34afa827f
7 changed files with 131 additions and 136 deletions

View file

@ -6,9 +6,7 @@
package chain
import (
"log"
coreerr "dappco.re/go/core/log"
corelog "dappco.re/go/core/log"
"dappco.re/go/core/blockchain/p2p"
levinpkg "dappco.re/go/core/p2p/node/levin"
@ -40,12 +38,12 @@ func (c *LevinP2PConn) handleMessage(hdr levinpkg.Header, data []byte) error {
resp := p2p.TimedSyncRequest{PayloadData: c.localSync}
payload, err := resp.Encode()
if err != nil {
return coreerr.E("LevinP2PConn.handleMessage", "encode timed_sync response", err)
return corelog.E("LevinP2PConn.handleMessage", "encode timed_sync response", err)
}
if err := c.conn.WriteResponse(p2p.CommandTimedSync, payload, levinpkg.ReturnOK); err != nil {
return coreerr.E("LevinP2PConn.handleMessage", "write timed_sync response", err)
return corelog.E("LevinP2PConn.handleMessage", "write timed_sync response", err)
}
log.Printf("p2p: responded to timed_sync")
corelog.Info("p2p responded to timed_sync")
return nil
}
// Silently skip other messages (new_block notifications, etc.)
@ -56,24 +54,24 @@ func (c *LevinP2PConn) RequestChain(blockIDs [][]byte) (uint64, [][]byte, error)
req := p2p.RequestChain{BlockIDs: blockIDs}
payload, err := req.Encode()
if err != nil {
return 0, nil, coreerr.E("LevinP2PConn.RequestChain", "encode request_chain", err)
return 0, nil, corelog.E("LevinP2PConn.RequestChain", "encode request_chain", err)
}
// Send as notification (expectResponse=false) per CryptoNote protocol.
if err := c.conn.WritePacket(p2p.CommandRequestChain, payload, false); err != nil {
return 0, nil, coreerr.E("LevinP2PConn.RequestChain", "write request_chain", err)
return 0, nil, corelog.E("LevinP2PConn.RequestChain", "write request_chain", err)
}
// Read until we get RESPONSE_CHAIN_ENTRY.
for {
hdr, data, err := c.conn.ReadPacket()
if err != nil {
return 0, nil, coreerr.E("LevinP2PConn.RequestChain", "read response_chain", err)
return 0, nil, corelog.E("LevinP2PConn.RequestChain", "read response_chain", err)
}
if hdr.Command == p2p.CommandResponseChain {
var resp p2p.ResponseChainEntry
if err := resp.Decode(data); err != nil {
return 0, nil, coreerr.E("LevinP2PConn.RequestChain", "decode response_chain", err)
return 0, nil, corelog.E("LevinP2PConn.RequestChain", "decode response_chain", err)
}
return resp.StartHeight, resp.BlockIDs, nil
}
@ -87,23 +85,23 @@ func (c *LevinP2PConn) RequestObjects(blockHashes [][]byte) ([]BlockBlobEntry, e
req := p2p.RequestGetObjects{Blocks: blockHashes}
payload, err := req.Encode()
if err != nil {
return nil, coreerr.E("LevinP2PConn.RequestObjects", "encode request_get_objects", err)
return nil, corelog.E("LevinP2PConn.RequestObjects", "encode request_get_objects", err)
}
if err := c.conn.WritePacket(p2p.CommandRequestObjects, payload, false); err != nil {
return nil, coreerr.E("LevinP2PConn.RequestObjects", "write request_get_objects", err)
return nil, corelog.E("LevinP2PConn.RequestObjects", "write request_get_objects", err)
}
// Read until we get RESPONSE_GET_OBJECTS.
for {
hdr, data, err := c.conn.ReadPacket()
if err != nil {
return nil, coreerr.E("LevinP2PConn.RequestObjects", "read response_get_objects", err)
return nil, corelog.E("LevinP2PConn.RequestObjects", "read response_get_objects", err)
}
if hdr.Command == p2p.CommandResponseObjects {
var resp p2p.ResponseGetObjects
if err := resp.Decode(data); err != nil {
return nil, coreerr.E("LevinP2PConn.RequestObjects", "decode response_get_objects", err)
return nil, corelog.E("LevinP2PConn.RequestObjects", "decode response_get_objects", err)
}
entries := make([]BlockBlobEntry, len(resp.Blocks))
for i, b := range resp.Blocks {

View file

@ -8,9 +8,8 @@ package chain
import (
"context"
"fmt"
"log"
coreerr "dappco.re/go/core/log"
corelog "dappco.re/go/core/log"
)
// P2PConnection abstracts the P2P communication needed for block sync.
@ -46,7 +45,7 @@ func (c *Chain) P2PSync(ctx context.Context, conn P2PConnection, opts SyncOption
localHeight, err := c.Height()
if err != nil {
return coreerr.E("Chain.P2PSync", "p2p sync: get height", err)
return corelog.E("Chain.P2PSync", "p2p sync: get height", err)
}
peerHeight := conn.PeerHeight()
@ -57,7 +56,7 @@ func (c *Chain) P2PSync(ctx context.Context, conn P2PConnection, opts SyncOption
// Build sparse chain history.
history, err := c.SparseChainHistory()
if err != nil {
return coreerr.E("Chain.P2PSync", "p2p sync: build history", err)
return corelog.E("Chain.P2PSync", "p2p sync: build history", err)
}
// Convert Hash to []byte for P2P.
@ -71,14 +70,14 @@ func (c *Chain) P2PSync(ctx context.Context, conn P2PConnection, opts SyncOption
// Request chain entry.
startHeight, blockIDs, err := conn.RequestChain(historyBytes)
if err != nil {
return coreerr.E("Chain.P2PSync", "p2p sync: request chain", err)
return corelog.E("Chain.P2PSync", "p2p sync: request chain", err)
}
if len(blockIDs) == 0 {
return nil // nothing to sync
}
log.Printf("p2p sync: chain entry from height %d, %d block IDs", startHeight, len(blockIDs))
corelog.Info("p2p sync chain entry", "start_height", startHeight, "block_ids", len(blockIDs))
// The daemon returns the fork-point block as the first entry.
// Skip blocks we already have.
@ -108,24 +107,24 @@ func (c *Chain) P2PSync(ctx context.Context, conn P2PConnection, opts SyncOption
entries, err := conn.RequestObjects(batch)
if err != nil {
return coreerr.E("Chain.P2PSync", "p2p sync: request objects", err)
return corelog.E("Chain.P2PSync", "p2p sync: request objects", err)
}
currentHeight := fetchStart + uint64(i)
for j, entry := range entries {
blockHeight := currentHeight + uint64(j)
if blockHeight > 0 && blockHeight%100 == 0 {
log.Printf("p2p sync: processing block %d", blockHeight)
corelog.Info("p2p sync processing block", "height", blockHeight)
}
blockDiff, err := c.NextDifficulty(blockHeight, opts.Forks)
if err != nil {
return coreerr.E("Chain.P2PSync", fmt.Sprintf("p2p sync: compute difficulty for block %d", blockHeight), err)
return corelog.E("Chain.P2PSync", fmt.Sprintf("p2p sync: compute difficulty for block %d", blockHeight), err)
}
if err := c.processBlockBlobs(entry.Block, entry.Txs,
blockHeight, blockDiff, opts); err != nil {
return coreerr.E("Chain.P2PSync", fmt.Sprintf("p2p sync: process block %d", blockHeight), err)
return corelog.E("Chain.P2PSync", fmt.Sprintf("p2p sync: process block %d", blockHeight), err)
}
}
}

View file

@ -11,11 +11,10 @@ import (
"encoding/hex"
"encoding/json"
"fmt"
"log"
"regexp"
"strconv"
coreerr "dappco.re/go/core/log"
corelog "dappco.re/go/core/log"
"dappco.re/go/core/blockchain/config"
"dappco.re/go/core/blockchain/consensus"
@ -52,12 +51,12 @@ func DefaultSyncOptions() SyncOptions {
func (c *Chain) Sync(ctx context.Context, client *rpc.Client, opts SyncOptions) error {
localHeight, err := c.Height()
if err != nil {
return coreerr.E("Chain.Sync", "sync: get local height", err)
return corelog.E("Chain.Sync", "sync: get local height", err)
}
remoteHeight, err := client.GetHeight()
if err != nil {
return coreerr.E("Chain.Sync", "sync: get remote height", err)
return corelog.E("Chain.Sync", "sync: get remote height", err)
}
for localHeight < remoteHeight {
@ -72,22 +71,22 @@ func (c *Chain) Sync(ctx context.Context, client *rpc.Client, opts SyncOptions)
blocks, err := client.GetBlocksDetails(localHeight, batch)
if err != nil {
return coreerr.E("Chain.Sync", fmt.Sprintf("sync: fetch blocks at %d", localHeight), err)
return corelog.E("Chain.Sync", fmt.Sprintf("sync: fetch blocks at %d", localHeight), err)
}
if err := resolveBlockBlobs(blocks, client); err != nil {
return coreerr.E("Chain.Sync", fmt.Sprintf("sync: resolve blobs at %d", localHeight), err)
return corelog.E("Chain.Sync", fmt.Sprintf("sync: resolve blobs at %d", localHeight), err)
}
for _, bd := range blocks {
if err := c.processBlock(bd, opts); err != nil {
return coreerr.E("Chain.Sync", fmt.Sprintf("sync: process block %d", bd.Height), err)
return corelog.E("Chain.Sync", fmt.Sprintf("sync: process block %d", bd.Height), err)
}
}
localHeight, err = c.Height()
if err != nil {
return coreerr.E("Chain.Sync", "sync: get height after batch", err)
return corelog.E("Chain.Sync", "sync: get height after batch", err)
}
}
@ -96,12 +95,12 @@ func (c *Chain) Sync(ctx context.Context, client *rpc.Client, opts SyncOptions)
func (c *Chain) processBlock(bd rpc.BlockDetails, opts SyncOptions) error {
if bd.Height > 0 && bd.Height%100 == 0 {
log.Printf("sync: processing block %d", bd.Height)
corelog.Info("sync processing block", "height", bd.Height)
}
blockBlob, err := hex.DecodeString(bd.Blob)
if err != nil {
return coreerr.E("Chain.processBlock", "decode block hex", err)
return corelog.E("Chain.processBlock", "decode block hex", err)
}
// Build a set of the block's regular tx hashes for lookup.
@ -111,7 +110,7 @@ func (c *Chain) processBlock(bd rpc.BlockDetails, opts SyncOptions) error {
dec := wire.NewDecoder(bytes.NewReader(blockBlob))
blk := wire.DecodeBlock(dec)
if err := dec.Err(); err != nil {
return coreerr.E("Chain.processBlock", "decode block for tx hashes", err)
return corelog.E("Chain.processBlock", "decode block for tx hashes", err)
}
regularTxs := make(map[string]struct{}, len(blk.TxHashes))
@ -126,7 +125,7 @@ func (c *Chain) processBlock(bd rpc.BlockDetails, opts SyncOptions) error {
}
txBlobBytes, err := hex.DecodeString(txInfo.Blob)
if err != nil {
return coreerr.E("Chain.processBlock", fmt.Sprintf("decode tx hex %s", txInfo.ID), err)
return corelog.E("Chain.processBlock", fmt.Sprintf("decode tx hex %s", txInfo.ID), err)
}
txBlobs = append(txBlobs, txBlobBytes)
}
@ -137,10 +136,10 @@ func (c *Chain) processBlock(bd rpc.BlockDetails, opts SyncOptions) error {
computedHash := wire.BlockHash(&blk)
daemonHash, err := types.HashFromHex(bd.ID)
if err != nil {
return coreerr.E("Chain.processBlock", "parse daemon block hash", err)
return corelog.E("Chain.processBlock", "parse daemon block hash", err)
}
if computedHash != daemonHash {
return coreerr.E("Chain.processBlock", fmt.Sprintf("block hash mismatch: computed %s, daemon says %s", computedHash, daemonHash), nil)
return corelog.E("Chain.processBlock", fmt.Sprintf("block hash mismatch: computed %s, daemon says %s", computedHash, daemonHash), nil)
}
return c.processBlockBlobs(blockBlob, txBlobs, bd.Height, diff, opts)
@ -155,7 +154,7 @@ func (c *Chain) processBlockBlobs(blockBlob []byte, txBlobs [][]byte,
dec := wire.NewDecoder(bytes.NewReader(blockBlob))
blk := wire.DecodeBlock(dec)
if err := dec.Err(); err != nil {
return coreerr.E("Chain.processBlockBlobs", "decode block wire", err)
return corelog.E("Chain.processBlockBlobs", "decode block wire", err)
}
// Compute the block hash.
@ -165,10 +164,10 @@ func (c *Chain) processBlockBlobs(blockBlob []byte, txBlobs [][]byte,
if height == 0 {
genesisHash, err := types.HashFromHex(GenesisHash)
if err != nil {
return coreerr.E("Chain.processBlockBlobs", "parse genesis hash", err)
return corelog.E("Chain.processBlockBlobs", "parse genesis hash", err)
}
if blockHash != genesisHash {
return coreerr.E("Chain.processBlockBlobs", fmt.Sprintf("genesis hash %s does not match expected %s", blockHash, GenesisHash), nil)
return corelog.E("Chain.processBlockBlobs", fmt.Sprintf("genesis hash %s does not match expected %s", blockHash, GenesisHash), nil)
}
}
@ -179,7 +178,7 @@ func (c *Chain) processBlockBlobs(blockBlob []byte, txBlobs [][]byte,
// Validate miner transaction structure.
if err := consensus.ValidateMinerTx(&blk.MinerTx, height, opts.Forks); err != nil {
return coreerr.E("Chain.processBlockBlobs", "validate miner tx", err)
return corelog.E("Chain.processBlockBlobs", "validate miner tx", err)
}
// Calculate cumulative difficulty.
@ -187,7 +186,7 @@ func (c *Chain) processBlockBlobs(blockBlob []byte, txBlobs [][]byte,
if height > 0 {
_, prevMeta, err := c.TopBlock()
if err != nil {
return coreerr.E("Chain.processBlockBlobs", "get prev block meta", err)
return corelog.E("Chain.processBlockBlobs", "get prev block meta", err)
}
cumulDiff = prevMeta.CumulativeDiff + difficulty
} else {
@ -198,13 +197,13 @@ func (c *Chain) processBlockBlobs(blockBlob []byte, txBlobs [][]byte,
minerTxHash := wire.TransactionHash(&blk.MinerTx)
minerGindexes, err := c.indexOutputs(minerTxHash, &blk.MinerTx)
if err != nil {
return coreerr.E("Chain.processBlockBlobs", "index miner tx outputs", err)
return corelog.E("Chain.processBlockBlobs", "index miner tx outputs", err)
}
if err := c.PutTransaction(minerTxHash, &blk.MinerTx, &TxMeta{
KeeperBlock: height,
GlobalOutputIndexes: minerGindexes,
}); err != nil {
return coreerr.E("Chain.processBlockBlobs", "store miner tx", err)
return corelog.E("Chain.processBlockBlobs", "store miner tx", err)
}
// Process regular transactions from txBlobs.
@ -212,27 +211,27 @@ func (c *Chain) processBlockBlobs(blockBlob []byte, txBlobs [][]byte,
txDec := wire.NewDecoder(bytes.NewReader(txBlobData))
tx := wire.DecodeTransaction(txDec)
if err := txDec.Err(); err != nil {
return coreerr.E("Chain.processBlockBlobs", fmt.Sprintf("decode tx wire [%d]", i), err)
return corelog.E("Chain.processBlockBlobs", fmt.Sprintf("decode tx wire [%d]", i), err)
}
txHash := wire.TransactionHash(&tx)
// Validate transaction semantics, including the HF5 freeze window.
if err := consensus.ValidateTransactionInBlock(&tx, txBlobData, opts.Forks, height); err != nil {
return coreerr.E("Chain.processBlockBlobs", fmt.Sprintf("validate tx %s", txHash), err)
return corelog.E("Chain.processBlockBlobs", fmt.Sprintf("validate tx %s", txHash), err)
}
// Optionally verify signatures using the chain's output index.
if opts.VerifySignatures {
if err := consensus.VerifyTransactionSignatures(&tx, opts.Forks, height, c.GetRingOutputs, c.GetZCRingOutputs); err != nil {
return coreerr.E("Chain.processBlockBlobs", fmt.Sprintf("verify tx signatures %s", txHash), err)
return corelog.E("Chain.processBlockBlobs", fmt.Sprintf("verify tx signatures %s", txHash), err)
}
}
// Index outputs.
gindexes, err := c.indexOutputs(txHash, &tx)
if err != nil {
return coreerr.E("Chain.processBlockBlobs", fmt.Sprintf("index tx outputs %s", txHash), err)
return corelog.E("Chain.processBlockBlobs", fmt.Sprintf("index tx outputs %s", txHash), err)
}
// Mark key images as spent.
@ -240,15 +239,15 @@ func (c *Chain) processBlockBlobs(blockBlob []byte, txBlobs [][]byte,
switch inp := vin.(type) {
case types.TxInputToKey:
if err := c.MarkSpent(inp.KeyImage, height); err != nil {
return coreerr.E("Chain.processBlockBlobs", fmt.Sprintf("mark spent %s", inp.KeyImage), err)
return corelog.E("Chain.processBlockBlobs", fmt.Sprintf("mark spent %s", inp.KeyImage), err)
}
case types.TxInputHTLC:
if err := c.MarkSpent(inp.KeyImage, height); err != nil {
return coreerr.E("Chain.processBlockBlobs", fmt.Sprintf("mark spent %s", inp.KeyImage), err)
return corelog.E("Chain.processBlockBlobs", fmt.Sprintf("mark spent %s", inp.KeyImage), err)
}
case types.TxInputZC:
if err := c.MarkSpent(inp.KeyImage, height); err != nil {
return coreerr.E("Chain.processBlockBlobs", fmt.Sprintf("mark spent %s", inp.KeyImage), err)
return corelog.E("Chain.processBlockBlobs", fmt.Sprintf("mark spent %s", inp.KeyImage), err)
}
}
}
@ -258,7 +257,7 @@ func (c *Chain) processBlockBlobs(blockBlob []byte, txBlobs [][]byte,
KeeperBlock: height,
GlobalOutputIndexes: gindexes,
}); err != nil {
return coreerr.E("Chain.processBlockBlobs", fmt.Sprintf("store tx %s", txHash), err)
return corelog.E("Chain.processBlockBlobs", fmt.Sprintf("store tx %s", txHash), err)
}
}
@ -333,13 +332,13 @@ func resolveBlockBlobs(blocks []rpc.BlockDetails, client *rpc.Client) error {
// Batch-fetch tx blobs.
txHexes, missed, err := client.GetTransactions(allHashes)
if err != nil {
return coreerr.E("resolveBlockBlobs", "fetch tx blobs", err)
return corelog.E("resolveBlockBlobs", "fetch tx blobs", err)
}
if len(missed) > 0 {
return coreerr.E("resolveBlockBlobs", fmt.Sprintf("daemon missed %d tx(es): %v", len(missed), missed), nil)
return corelog.E("resolveBlockBlobs", fmt.Sprintf("daemon missed %d tx(es): %v", len(missed), missed), nil)
}
if len(txHexes) != len(allHashes) {
return coreerr.E("resolveBlockBlobs", fmt.Sprintf("expected %d tx blobs, got %d", len(allHashes), len(txHexes)), nil)
return corelog.E("resolveBlockBlobs", fmt.Sprintf("expected %d tx blobs, got %d", len(allHashes), len(txHexes)), nil)
}
// Index fetched blobs by hash.
@ -367,16 +366,16 @@ func resolveBlockBlobs(blocks []rpc.BlockDetails, client *rpc.Client) error {
// Parse header from object_in_json.
hdr, err := parseBlockHeader(bd.ObjectInJSON)
if err != nil {
return coreerr.E("resolveBlockBlobs", fmt.Sprintf("block %d: parse header", bd.Height), err)
return corelog.E("resolveBlockBlobs", fmt.Sprintf("block %d: parse header", bd.Height), err)
}
// Miner tx blob is transactions_details[0].
if len(bd.Transactions) == 0 {
return coreerr.E("resolveBlockBlobs", fmt.Sprintf("block %d has no transactions_details", bd.Height), nil)
return corelog.E("resolveBlockBlobs", fmt.Sprintf("block %d has no transactions_details", bd.Height), nil)
}
minerTxBlob, err := hex.DecodeString(bd.Transactions[0].Blob)
if err != nil {
return coreerr.E("resolveBlockBlobs", fmt.Sprintf("block %d: decode miner tx hex", bd.Height), err)
return corelog.E("resolveBlockBlobs", fmt.Sprintf("block %d: decode miner tx hex", bd.Height), err)
}
// Collect regular tx hashes.
@ -384,7 +383,7 @@ func resolveBlockBlobs(blocks []rpc.BlockDetails, client *rpc.Client) error {
for _, txInfo := range bd.Transactions[1:] {
h, err := types.HashFromHex(txInfo.ID)
if err != nil {
return coreerr.E("resolveBlockBlobs", fmt.Sprintf("block %d: parse tx hash %s", bd.Height, txInfo.ID), err)
return corelog.E("resolveBlockBlobs", fmt.Sprintf("block %d: parse tx hash %s", bd.Height, txInfo.ID), err)
}
txHashes = append(txHashes, h)
}
@ -414,17 +413,17 @@ var aggregatedRE = regexp.MustCompile(`"AGGREGATED"\s*:\s*\{([^}]+)\}`)
func parseBlockHeader(objectInJSON string) (*types.BlockHeader, error) {
m := aggregatedRE.FindStringSubmatch(objectInJSON)
if m == nil {
return nil, coreerr.E("parseBlockHeader", "AGGREGATED section not found in object_in_json", nil)
return nil, corelog.E("parseBlockHeader", "AGGREGATED section not found in object_in_json", nil)
}
var hj blockHeaderJSON
if err := json.Unmarshal([]byte("{"+m[1]+"}"), &hj); err != nil {
return nil, coreerr.E("parseBlockHeader", "unmarshal AGGREGATED", err)
return nil, corelog.E("parseBlockHeader", "unmarshal AGGREGATED", err)
}
prevID, err := types.HashFromHex(hj.PrevID)
if err != nil {
return nil, coreerr.E("parseBlockHeader", "parse prev_id", err)
return nil, corelog.E("parseBlockHeader", "parse prev_id", err)
}
return &types.BlockHeader{

View file

@ -30,9 +30,9 @@ const defaultChainSeed = "seeds.lthn.io:36942"
// command path documents the node features directly.
func AddChainCommands(root *cobra.Command) {
var (
dataDir string
seed string
testnet bool
chainDataDir string
seedPeerAddress string
useTestnet bool
)
chainCmd := &cobra.Command{
@ -41,26 +41,26 @@ func AddChainCommands(root *cobra.Command) {
Long: "Manage the Lethean blockchain — sync, explore, and mine.",
}
chainCmd.PersistentFlags().StringVar(&dataDir, "data-dir", defaultChainDataDirPath(), "blockchain data directory")
chainCmd.PersistentFlags().StringVar(&seed, "seed", defaultChainSeed, "seed peer address (host:port)")
chainCmd.PersistentFlags().BoolVar(&testnet, "testnet", false, "use testnet")
chainCmd.PersistentFlags().StringVar(&chainDataDir, "data-dir", defaultChainDataDirPath(), "blockchain data directory")
chainCmd.PersistentFlags().StringVar(&seedPeerAddress, "seed", defaultChainSeed, "seed peer address (host:port)")
chainCmd.PersistentFlags().BoolVar(&useTestnet, "testnet", false, "use testnet")
chainCmd.AddCommand(
newChainExplorerCommand(&dataDir, &seed, &testnet),
newChainSyncCommand(&dataDir, &seed, &testnet),
newChainExplorerCommand(&chainDataDir, &seedPeerAddress, &useTestnet),
newChainSyncCommand(&chainDataDir, &seedPeerAddress, &useTestnet),
)
root.AddCommand(chainCmd)
}
func chainConfigForSeed(testnet bool, seed string) (config.ChainConfig, []config.HardFork, string) {
if testnet {
if seed == defaultChainSeed {
seed = "localhost:46942"
func chainConfigForSeed(useTestnet bool, seedPeerAddress string) (config.ChainConfig, []config.HardFork, string) {
if useTestnet {
if seedPeerAddress == defaultChainSeed {
seedPeerAddress = "localhost:46942"
}
return config.Testnet, config.TestnetForks, seed
return config.Testnet, config.TestnetForks, seedPeerAddress
}
return config.Mainnet, config.MainnetForks, seed
return config.Mainnet, config.MainnetForks, seedPeerAddress
}
func defaultChainDataDirPath() string {
@ -78,15 +78,15 @@ func ensureChainDataDirExists(dataDir string) error {
return nil
}
func validateChainOptions(dataDir, seed string) error {
if dataDir == "" {
func validateChainOptions(chainDataDir, seedPeerAddress string) error {
if chainDataDir == "" {
return coreerr.E("validateChainOptions", "data dir is required", nil)
}
if seed == "" {
if seedPeerAddress == "" {
return coreerr.E("validateChainOptions", "seed is required", nil)
}
if _, _, err := net.SplitHostPort(seed); err != nil {
return coreerr.E("validateChainOptions", fmt.Sprintf("seed %q must be host:port", seed), err)
if _, _, err := net.SplitHostPort(seedPeerAddress); err != nil {
return coreerr.E("validateChainOptions", fmt.Sprintf("seed %q must be host:port", seedPeerAddress), err)
}
return nil
}

View file

@ -12,7 +12,7 @@ import (
"path/filepath"
"sync"
coreerr "dappco.re/go/core/log"
corelog "dappco.re/go/core/log"
cli "dappco.re/go/core/cli/pkg/cli"
store "dappco.re/go/core/store"
@ -29,35 +29,35 @@ import (
// chain explorer --data-dir ~/.lethean/chain
//
// Use it alongside `AddChainCommands` to expose the TUI node view.
func newChainExplorerCommand(dataDir, seed *string, testnet *bool) *cobra.Command {
func newChainExplorerCommand(chainDataDir, seedPeerAddress *string, useTestnet *bool) *cobra.Command {
return &cobra.Command{
Use: "explorer",
Short: "TUI block explorer",
Long: "Interactive terminal block explorer with live sync status.",
Args: cobra.NoArgs,
PreRunE: func(cmd *cobra.Command, args []string) error {
return validateChainOptions(*dataDir, *seed)
return validateChainOptions(*chainDataDir, *seedPeerAddress)
},
RunE: func(cmd *cobra.Command, args []string) error {
return runChainExplorer(*dataDir, *seed, *testnet)
return runChainExplorer(*chainDataDir, *seedPeerAddress, *useTestnet)
},
}
}
func runChainExplorer(dataDir, seed string, testnet bool) error {
if err := ensureChainDataDirExists(dataDir); err != nil {
func runChainExplorer(chainDataDir, seedPeerAddress string, useTestnet bool) error {
if err := ensureChainDataDirExists(chainDataDir); err != nil {
return err
}
dbPath := filepath.Join(dataDir, "chain.db")
dbPath := filepath.Join(chainDataDir, "chain.db")
chainStore, err := store.New(dbPath)
if err != nil {
return coreerr.E("runChainExplorer", "open store", err)
return corelog.E("runChainExplorer", "open store", err)
}
defer chainStore.Close()
blockchain := chain.New(chainStore)
chainConfig, hardForks, resolvedSeed := chainConfigForSeed(testnet, seed)
chainConfig, hardForks, resolvedSeed := chainConfigForSeed(useTestnet, seedPeerAddress)
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
defer cancel()
@ -78,6 +78,7 @@ func runChainExplorer(dataDir, seed string, testnet bool) error {
frame.Header(status)
frame.Content(explorer)
frame.Footer(hints)
corelog.Info("running chain explorer", "data_dir", chainDataDir, "seed", resolvedSeed, "testnet", useTestnet)
frame.Run()
cancel() // Signal the sync loop to stop.

View file

@ -8,14 +8,13 @@ package blockchain
import (
"context"
"fmt"
"log"
"os"
"os/signal"
"path/filepath"
"sync"
"syscall"
coreerr "dappco.re/go/core/log"
corelog "dappco.re/go/core/log"
"dappco.re/go/core/blockchain/chain"
"dappco.re/go/core/process"
@ -32,7 +31,7 @@ import (
// chain sync --stop
//
// It keeps the foreground and daemon modes behind a predictable command path.
func newChainSyncCommand(dataDir, seed *string, testnet *bool) *cobra.Command {
func newChainSyncCommand(chainDataDir, seedPeerAddress *string, useTestnet *bool) *cobra.Command {
var (
daemon bool
stop bool
@ -45,18 +44,18 @@ func newChainSyncCommand(dataDir, seed *string, testnet *bool) *cobra.Command {
Args: cobra.NoArgs,
PreRunE: func(cmd *cobra.Command, args []string) error {
if daemon && stop {
return coreerr.E("newChainSyncCommand", "flags --daemon and --stop cannot be combined", nil)
return corelog.E("newChainSyncCommand", "flags --daemon and --stop cannot be combined", nil)
}
return validateChainOptions(*dataDir, *seed)
return validateChainOptions(*chainDataDir, *seedPeerAddress)
},
RunE: func(cmd *cobra.Command, args []string) error {
if stop {
return stopChainSyncDaemon(*dataDir)
return stopChainSyncDaemon(*chainDataDir)
}
if daemon {
return runChainSyncDaemon(*dataDir, *seed, *testnet)
return runChainSyncDaemon(*chainDataDir, *seedPeerAddress, *useTestnet)
}
return runChainSyncForeground(*dataDir, *seed, *testnet)
return runChainSyncForeground(*chainDataDir, *seedPeerAddress, *useTestnet)
},
}
@ -66,36 +65,36 @@ func newChainSyncCommand(dataDir, seed *string, testnet *bool) *cobra.Command {
return cmd
}
func runChainSyncForeground(dataDir, seed string, testnet bool) error {
if err := ensureChainDataDirExists(dataDir); err != nil {
func runChainSyncForeground(chainDataDir, seedPeerAddress string, useTestnet bool) error {
if err := ensureChainDataDirExists(chainDataDir); err != nil {
return err
}
dbPath := filepath.Join(dataDir, "chain.db")
dbPath := filepath.Join(chainDataDir, "chain.db")
chainStore, err := store.New(dbPath)
if err != nil {
return coreerr.E("runChainSyncForeground", "open store", err)
return corelog.E("runChainSyncForeground", "open store", err)
}
defer chainStore.Close()
blockchain := chain.New(chainStore)
chainConfig, hardForks, resolvedSeed := chainConfigForSeed(testnet, seed)
chainConfig, hardForks, resolvedSeed := chainConfigForSeed(useTestnet, seedPeerAddress)
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
defer cancel()
log.Println("Starting headless P2P sync...")
corelog.Info("starting headless P2P sync", "data_dir", chainDataDir, "seed", resolvedSeed, "testnet", useTestnet)
runChainSyncLoop(ctx, blockchain, &chainConfig, hardForks, resolvedSeed)
log.Println("Sync stopped.")
corelog.Info("headless P2P sync stopped", "data_dir", chainDataDir)
return nil
}
func runChainSyncDaemon(dataDir, seed string, testnet bool) error {
if err := ensureChainDataDirExists(dataDir); err != nil {
func runChainSyncDaemon(chainDataDir, seedPeerAddress string, useTestnet bool) error {
if err := ensureChainDataDirExists(chainDataDir); err != nil {
return err
}
pidFile := filepath.Join(dataDir, "sync.pid")
pidFile := filepath.Join(chainDataDir, "sync.pid")
daemon := process.NewDaemon(process.DaemonOptions{
PIDFile: pidFile,
@ -107,25 +106,25 @@ func runChainSyncDaemon(dataDir, seed string, testnet bool) error {
})
if err := daemon.Start(); err != nil {
return coreerr.E("runChainSyncDaemon", "daemon start", err)
return corelog.E("runChainSyncDaemon", "daemon start", err)
}
dbPath := filepath.Join(dataDir, "chain.db")
dbPath := filepath.Join(chainDataDir, "chain.db")
chainStore, err := store.New(dbPath)
if err != nil {
_ = daemon.Stop()
return coreerr.E("runChainSyncDaemon", "open store", err)
return corelog.E("runChainSyncDaemon", "open store", err)
}
defer chainStore.Close()
blockchain := chain.New(chainStore)
chainConfig, hardForks, resolvedSeed := chainConfigForSeed(testnet, seed)
chainConfig, hardForks, resolvedSeed := chainConfigForSeed(useTestnet, seedPeerAddress)
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
defer cancel()
daemon.SetReady(true)
log.Println("Sync daemon started.")
corelog.Info("sync daemon started", "data_dir", chainDataDir, "seed", resolvedSeed, "testnet", useTestnet)
var wg sync.WaitGroup
wg.Add(1)
@ -139,22 +138,22 @@ func runChainSyncDaemon(dataDir, seed string, testnet bool) error {
return err
}
func stopChainSyncDaemon(dataDir string) error {
pidFile := filepath.Join(dataDir, "sync.pid")
func stopChainSyncDaemon(chainDataDir string) error {
pidFile := filepath.Join(chainDataDir, "sync.pid")
pid, running := process.ReadPID(pidFile)
if pid == 0 || !running {
return coreerr.E("stopChainSyncDaemon", "no running sync daemon found", nil)
return corelog.E("stopChainSyncDaemon", "no running sync daemon found", nil)
}
processHandle, err := os.FindProcess(pid)
if err != nil {
return coreerr.E("stopChainSyncDaemon", fmt.Sprintf("find process %d", pid), err)
return corelog.E("stopChainSyncDaemon", fmt.Sprintf("find process %d", pid), err)
}
if err := processHandle.Signal(syscall.SIGTERM); err != nil {
return coreerr.E("stopChainSyncDaemon", fmt.Sprintf("signal process %d", pid), err)
return corelog.E("stopChainSyncDaemon", fmt.Sprintf("signal process %d", pid), err)
}
log.Printf("Sent SIGTERM to sync daemon (PID %d)", pid)
corelog.Info("sent SIGTERM to sync daemon", "pid", pid)
return nil
}

View file

@ -10,11 +10,10 @@ import (
"crypto/rand"
"encoding/binary"
"fmt"
"log"
"net"
"time"
coreerr "dappco.re/go/core/log"
corelog "dappco.re/go/core/log"
"dappco.re/go/core/blockchain/chain"
"dappco.re/go/core/blockchain/config"
@ -22,7 +21,7 @@ import (
levin "dappco.re/go/core/p2p/node/levin"
)
func runChainSyncLoop(ctx context.Context, blockchain *chain.Chain, chainConfig *config.ChainConfig, hardForks []config.HardFork, seed string) {
func runChainSyncLoop(ctx context.Context, blockchain *chain.Chain, chainConfig *config.ChainConfig, hardForks []config.HardFork, seedPeerAddress string) {
opts := chain.SyncOptions{
VerifySignatures: false,
Forks: hardForks,
@ -35,8 +34,8 @@ func runChainSyncLoop(ctx context.Context, blockchain *chain.Chain, chainConfig
default:
}
if err := runChainSyncOnce(ctx, blockchain, chainConfig, opts, seed); err != nil {
log.Printf("sync: %v (retrying in 10s)", err)
if err := runChainSyncOnce(ctx, blockchain, chainConfig, opts, seedPeerAddress); err != nil {
corelog.Warn("sync failed, retrying in 10s", "error", err, "seed", seedPeerAddress)
select {
case <-ctx.Done():
return
@ -53,10 +52,10 @@ func runChainSyncLoop(ctx context.Context, blockchain *chain.Chain, chainConfig
}
}
func runChainSyncOnce(ctx context.Context, blockchain *chain.Chain, chainConfig *config.ChainConfig, opts chain.SyncOptions, seed string) error {
conn, err := net.DialTimeout("tcp", seed, 10*time.Second)
func runChainSyncOnce(ctx context.Context, blockchain *chain.Chain, chainConfig *config.ChainConfig, opts chain.SyncOptions, seedPeerAddress string) error {
conn, err := net.DialTimeout("tcp", seedPeerAddress, 10*time.Second)
if err != nil {
return coreerr.E("runChainSyncOnce", fmt.Sprintf("dial %s", seed), err)
return corelog.E("runChainSyncOnce", fmt.Sprintf("dial %s", seedPeerAddress), err)
}
defer conn.Close()
@ -64,13 +63,13 @@ func runChainSyncOnce(ctx context.Context, blockchain *chain.Chain, chainConfig
var peerIDBytes [8]byte
if _, err := rand.Read(peerIDBytes[:]); err != nil {
return coreerr.E("runChainSyncOnce", "generate peer id", err)
return corelog.E("runChainSyncOnce", "generate peer id", err)
}
peerID := binary.LittleEndian.Uint64(peerIDBytes[:])
localHeight, err := blockchain.Height()
if err != nil {
return coreerr.E("runChainSyncOnce", "get local height", err)
return corelog.E("runChainSyncOnce", "get local height", err)
}
handshakeRequest := p2p.HandshakeRequest{
@ -88,27 +87,27 @@ func runChainSyncOnce(ctx context.Context, blockchain *chain.Chain, chainConfig
}
payload, err := p2p.EncodeHandshakeRequest(&handshakeRequest)
if err != nil {
return coreerr.E("runChainSyncOnce", "encode handshake", err)
return corelog.E("runChainSyncOnce", "encode handshake", err)
}
if err := p2pConn.WritePacket(p2p.CommandHandshake, payload, true); err != nil {
return coreerr.E("runChainSyncOnce", "write handshake", err)
return corelog.E("runChainSyncOnce", "write handshake", err)
}
packetHeader, packetData, err := p2pConn.ReadPacket()
if err != nil {
return coreerr.E("runChainSyncOnce", "read handshake", err)
return corelog.E("runChainSyncOnce", "read handshake", err)
}
if packetHeader.Command != uint32(p2p.CommandHandshake) {
return coreerr.E("runChainSyncOnce", fmt.Sprintf("unexpected command %d", packetHeader.Command), nil)
return corelog.E("runChainSyncOnce", fmt.Sprintf("unexpected command %d", packetHeader.Command), nil)
}
var handshakeResponse p2p.HandshakeResponse
if err := handshakeResponse.Decode(packetData); err != nil {
return coreerr.E("runChainSyncOnce", "decode handshake", err)
return corelog.E("runChainSyncOnce", "decode handshake", err)
}
if err := p2p.ValidateHandshakeResponse(&handshakeResponse, chainConfig.NetworkID, chainConfig.IsTestnet); err != nil {
return coreerr.E("runChainSyncOnce", "validate handshake", err)
return corelog.E("runChainSyncOnce", "validate handshake", err)
}
localSyncData := p2p.CoreSyncData{