fix(chain): resolve missing block blobs from daemon RPC
The Zano daemon's get_blocks_details RPC does not populate the blob field. This adds resolveBlockBlobs() which batch-fetches miner tx blobs via /gettransactions and reconstructs block wire blobs from the parsed header (object_in_json AGGREGATED section) and raw tx bytes. Also fixes regular tx processing to skip the miner tx entry that the daemon includes in transactions_details. Co-Authored-By: Charon <charon@lethean.io>
This commit is contained in:
parent
ad4139c012
commit
53311b14d1
1 changed files with 171 additions and 1 deletions
172
chain/sync.go
172
chain/sync.go
|
|
@ -9,8 +9,10 @@ import (
|
|||
"bytes"
|
||||
"context"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log"
|
||||
"regexp"
|
||||
"strconv"
|
||||
|
||||
"forge.lthn.ai/core/go-blockchain/config"
|
||||
|
|
@ -74,6 +76,10 @@ func (c *Chain) Sync(ctx context.Context, client *rpc.Client, opts SyncOptions)
|
|||
return fmt.Errorf("sync: fetch blocks at %d: %w", localHeight, err)
|
||||
}
|
||||
|
||||
if err := resolveBlockBlobs(blocks, client); err != nil {
|
||||
return fmt.Errorf("sync: resolve blobs at %d: %w", localHeight, err)
|
||||
}
|
||||
|
||||
for _, bd := range blocks {
|
||||
if err := c.processBlock(bd, opts); err != nil {
|
||||
return fmt.Errorf("sync: process block %d: %w", bd.Height, err)
|
||||
|
|
@ -162,8 +168,17 @@ func (c *Chain) processBlock(bd rpc.BlockDetails, opts SyncOptions) error {
|
|||
return fmt.Errorf("store miner tx: %w", err)
|
||||
}
|
||||
|
||||
// Process regular transactions.
|
||||
// Build a set of the block's regular tx hashes for lookup.
|
||||
regularTxs := make(map[string]struct{}, len(blk.TxHashes))
|
||||
for _, h := range blk.TxHashes {
|
||||
regularTxs[h.String()] = struct{}{}
|
||||
}
|
||||
|
||||
// Process regular transactions (skip the miner tx).
|
||||
for _, txInfo := range bd.Transactions {
|
||||
if _, isRegular := regularTxs[txInfo.ID]; !isRegular {
|
||||
continue // skip miner tx entry
|
||||
}
|
||||
txBlob, err := hex.DecodeString(txInfo.Blob)
|
||||
if err != nil {
|
||||
return fmt.Errorf("decode tx hex %s: %w", txInfo.ID, err)
|
||||
|
|
@ -253,3 +268,158 @@ func (c *Chain) indexOutputs(txHash types.Hash, tx *types.Transaction) ([]uint64
|
|||
}
|
||||
return gindexes, nil
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Block blob reconstruction
|
||||
// ---------------------------------------------------------------------------
|
||||
// The Zano daemon's get_blocks_details RPC does not populate the "blob" field.
|
||||
// To process blocks through the normal wire decoder we reconstruct the blob
|
||||
// from the header fields (parsed from object_in_json) and the miner tx blob
|
||||
// fetched via /gettransactions.
|
||||
|
||||
// resolveBlockBlobs fills in missing Blob fields for BlockDetails and TxInfo
|
||||
// entries. The Zano daemon's get_blocks_details RPC does not populate blob
|
||||
// fields, so we batch-fetch all tx blobs via /gettransactions and reconstruct
|
||||
// each block's wire blob from the parsed header and raw miner tx bytes.
|
||||
func resolveBlockBlobs(blocks []rpc.BlockDetails, client *rpc.Client) error {
|
||||
// Collect all tx hashes that need blobs (miner txs + regular txs).
|
||||
var allHashes []string
|
||||
hashSet := map[string]struct{}{}
|
||||
for i := range blocks {
|
||||
if blocks[i].Blob != "" {
|
||||
continue // block already has blob
|
||||
}
|
||||
for _, tx := range blocks[i].Transactions {
|
||||
if tx.Blob != "" {
|
||||
continue
|
||||
}
|
||||
if _, seen := hashSet[tx.ID]; !seen {
|
||||
allHashes = append(allHashes, tx.ID)
|
||||
hashSet[tx.ID] = struct{}{}
|
||||
}
|
||||
}
|
||||
}
|
||||
if len(allHashes) == 0 {
|
||||
return nil // all blobs present
|
||||
}
|
||||
|
||||
// Batch-fetch tx blobs.
|
||||
txHexes, missed, err := client.GetTransactions(allHashes)
|
||||
if err != nil {
|
||||
return fmt.Errorf("fetch tx blobs: %w", err)
|
||||
}
|
||||
if len(missed) > 0 {
|
||||
return fmt.Errorf("daemon missed %d tx(es): %v", len(missed), missed)
|
||||
}
|
||||
if len(txHexes) != len(allHashes) {
|
||||
return fmt.Errorf("expected %d tx blobs, got %d", len(allHashes), len(txHexes))
|
||||
}
|
||||
|
||||
// Index fetched blobs by hash.
|
||||
blobByHash := make(map[string]string, len(allHashes))
|
||||
for j, h := range allHashes {
|
||||
blobByHash[h] = txHexes[j]
|
||||
}
|
||||
|
||||
// Fill in tx blobs and reconstruct block blobs.
|
||||
for i := range blocks {
|
||||
if blocks[i].Blob != "" {
|
||||
continue
|
||||
}
|
||||
bd := &blocks[i]
|
||||
|
||||
// Fill in regular tx blobs.
|
||||
for j := range bd.Transactions {
|
||||
if bd.Transactions[j].Blob == "" {
|
||||
if hex, ok := blobByHash[bd.Transactions[j].ID]; ok {
|
||||
bd.Transactions[j].Blob = hex
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Parse header from object_in_json.
|
||||
hdr, err := parseBlockHeader(bd.ObjectInJSON)
|
||||
if err != nil {
|
||||
return fmt.Errorf("block %d: parse header: %w", bd.Height, err)
|
||||
}
|
||||
|
||||
// Miner tx blob is transactions_details[0].
|
||||
if len(bd.Transactions) == 0 {
|
||||
return fmt.Errorf("block %d has no transactions_details", bd.Height)
|
||||
}
|
||||
minerTxBlob, err := hex.DecodeString(bd.Transactions[0].Blob)
|
||||
if err != nil {
|
||||
return fmt.Errorf("block %d: decode miner tx hex: %w", bd.Height, err)
|
||||
}
|
||||
|
||||
// Collect regular tx hashes.
|
||||
var txHashes []types.Hash
|
||||
for _, txInfo := range bd.Transactions[1:] {
|
||||
h, err := types.HashFromHex(txInfo.ID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("block %d: parse tx hash %s: %w", bd.Height, txInfo.ID, err)
|
||||
}
|
||||
txHashes = append(txHashes, h)
|
||||
}
|
||||
|
||||
blob := buildBlockBlob(hdr, minerTxBlob, txHashes)
|
||||
bd.Blob = hex.EncodeToString(blob)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// blockHeaderJSON matches the AGGREGATED section of object_in_json.
|
||||
type blockHeaderJSON struct {
|
||||
MajorVersion uint8 `json:"major_version"`
|
||||
Nonce uint64 `json:"nonce"`
|
||||
PrevID string `json:"prev_id"`
|
||||
MinorVersion uint64 `json:"minor_version"`
|
||||
Timestamp uint64 `json:"timestamp"`
|
||||
Flags uint8 `json:"flags"`
|
||||
}
|
||||
|
||||
// aggregatedRE extracts the first AGGREGATED JSON object from object_in_json.
|
||||
var aggregatedRE = regexp.MustCompile(`"AGGREGATED"\s*:\s*\{([^}]+)\}`)
|
||||
|
||||
// parseBlockHeader extracts block header fields from the daemon's
|
||||
// object_in_json string. The Zano daemon serialises blocks using an
|
||||
// AGGREGATED wrapper that contains the header fields as JSON.
|
||||
func parseBlockHeader(objectInJSON string) (*types.BlockHeader, error) {
|
||||
m := aggregatedRE.FindStringSubmatch(objectInJSON)
|
||||
if m == nil {
|
||||
return nil, fmt.Errorf("AGGREGATED section not found in object_in_json")
|
||||
}
|
||||
|
||||
var hj blockHeaderJSON
|
||||
if err := json.Unmarshal([]byte("{"+m[1]+"}"), &hj); err != nil {
|
||||
return nil, fmt.Errorf("unmarshal AGGREGATED: %w", err)
|
||||
}
|
||||
|
||||
prevID, err := types.HashFromHex(hj.PrevID)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("parse prev_id: %w", err)
|
||||
}
|
||||
|
||||
return &types.BlockHeader{
|
||||
MajorVersion: hj.MajorVersion,
|
||||
Nonce: hj.Nonce,
|
||||
PrevID: prevID,
|
||||
MinorVersion: hj.MinorVersion,
|
||||
Timestamp: hj.Timestamp,
|
||||
Flags: hj.Flags,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// buildBlockBlob constructs the consensus wire blob for a block from its
|
||||
// header, raw miner tx bytes, and regular transaction hashes.
|
||||
func buildBlockBlob(hdr *types.BlockHeader, minerTxBlob []byte, txHashes []types.Hash) []byte {
|
||||
var buf bytes.Buffer
|
||||
enc := wire.NewEncoder(&buf)
|
||||
wire.EncodeBlockHeader(enc, hdr)
|
||||
buf.Write(minerTxBlob)
|
||||
enc.WriteVarint(uint64(len(txHashes)))
|
||||
for i := range txHashes {
|
||||
enc.WriteBlob32((*[32]byte)(&txHashes[i]))
|
||||
}
|
||||
return buf.Bytes()
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue