feat(chain): add context cancellation and progress logging to Sync
Sync() now accepts context.Context for graceful shutdown. Logs progress every 100 blocks. Co-Authored-By: Charon <charon@lethean.io>
This commit is contained in:
parent
4bac1f6c04
commit
c066f4ac92
3 changed files with 25 additions and 11 deletions
|
|
@ -8,6 +8,7 @@
|
|||
package chain
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net/http"
|
||||
"testing"
|
||||
"time"
|
||||
|
|
@ -49,7 +50,7 @@ func TestIntegration_SyncFirst10Blocks(t *testing.T) {
|
|||
if h >= targetHeight {
|
||||
break
|
||||
}
|
||||
if err := c.Sync(client, DefaultSyncOptions()); err != nil {
|
||||
if err := c.Sync(context.Background(), client, DefaultSyncOptions()); err != nil {
|
||||
t.Fatalf("Sync: %v", err)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -7,8 +7,10 @@ package chain
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
"log"
|
||||
"strconv"
|
||||
|
||||
"forge.lthn.ai/core/go-blockchain/config"
|
||||
|
|
@ -43,7 +45,7 @@ func DefaultSyncOptions() SyncOptions {
|
|||
|
||||
// Sync fetches blocks from the daemon and stores them locally.
|
||||
// It is a blocking function — the caller controls retry and scheduling.
|
||||
func (c *Chain) Sync(client *rpc.Client, opts SyncOptions) error {
|
||||
func (c *Chain) Sync(ctx context.Context, client *rpc.Client, opts SyncOptions) error {
|
||||
localHeight, err := c.Height()
|
||||
if err != nil {
|
||||
return fmt.Errorf("sync: get local height: %w", err)
|
||||
|
|
@ -55,6 +57,12 @@ func (c *Chain) Sync(client *rpc.Client, opts SyncOptions) error {
|
|||
}
|
||||
|
||||
for localHeight < remoteHeight {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
remaining := remoteHeight - localHeight
|
||||
batch := uint64(syncBatchSize)
|
||||
if remaining < batch {
|
||||
|
|
@ -82,6 +90,10 @@ func (c *Chain) Sync(client *rpc.Client, opts SyncOptions) error {
|
|||
}
|
||||
|
||||
func (c *Chain) processBlock(bd rpc.BlockDetails, opts SyncOptions) error {
|
||||
if bd.Height > 0 && bd.Height%100 == 0 {
|
||||
log.Printf("sync: processing block %d", bd.Height)
|
||||
}
|
||||
|
||||
// Decode block blob.
|
||||
blockBlob, err := hex.DecodeString(bd.Blob)
|
||||
if err != nil {
|
||||
|
|
|
|||
|
|
@ -7,6 +7,7 @@ package chain
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
|
|
@ -116,7 +117,7 @@ func TestSync_Good_SingleBlock(t *testing.T) {
|
|||
|
||||
client := rpc.NewClient(srv.URL)
|
||||
|
||||
err := c.Sync(client, DefaultSyncOptions())
|
||||
err := c.Sync(context.Background(), client, DefaultSyncOptions())
|
||||
if err != nil {
|
||||
t.Fatalf("Sync: %v", err)
|
||||
}
|
||||
|
|
@ -282,7 +283,7 @@ func TestSync_Good_TwoBlocks_WithRegularTx(t *testing.T) {
|
|||
|
||||
client := rpc.NewClient(srv.URL)
|
||||
|
||||
err := c.Sync(client, DefaultSyncOptions())
|
||||
err := c.Sync(context.Background(), client, DefaultSyncOptions())
|
||||
if err != nil {
|
||||
t.Fatalf("Sync: %v", err)
|
||||
}
|
||||
|
|
@ -387,7 +388,7 @@ func TestSync_Good_AlreadySynced(t *testing.T) {
|
|||
c := New(s)
|
||||
|
||||
client := rpc.NewClient(srv.URL)
|
||||
err := c.Sync(client, DefaultSyncOptions())
|
||||
err := c.Sync(context.Background(), client, DefaultSyncOptions())
|
||||
if err != nil {
|
||||
t.Fatalf("Sync on empty: %v", err)
|
||||
}
|
||||
|
|
@ -410,7 +411,7 @@ func TestSync_Bad_GetHeightError(t *testing.T) {
|
|||
c := New(s)
|
||||
|
||||
client := rpc.NewClient(srv.URL)
|
||||
err := c.Sync(client, DefaultSyncOptions())
|
||||
err := c.Sync(context.Background(), client, DefaultSyncOptions())
|
||||
if err == nil {
|
||||
t.Fatal("Sync: expected error from bad getheight, got nil")
|
||||
}
|
||||
|
|
@ -446,7 +447,7 @@ func TestSync_Bad_FetchBlocksError(t *testing.T) {
|
|||
c := New(s)
|
||||
|
||||
client := rpc.NewClient(srv.URL)
|
||||
err := c.Sync(client, DefaultSyncOptions())
|
||||
err := c.Sync(context.Background(), client, DefaultSyncOptions())
|
||||
if err == nil {
|
||||
t.Fatal("Sync: expected error from bad get_blocks_details, got nil")
|
||||
}
|
||||
|
|
@ -505,7 +506,7 @@ func TestSync_Bad_GenesisHashMismatch(t *testing.T) {
|
|||
c := New(s)
|
||||
|
||||
client := rpc.NewClient(srv.URL)
|
||||
err := c.Sync(client, DefaultSyncOptions())
|
||||
err := c.Sync(context.Background(), client, DefaultSyncOptions())
|
||||
if err == nil {
|
||||
t.Fatal("Sync: expected genesis hash mismatch error, got nil")
|
||||
}
|
||||
|
|
@ -567,7 +568,7 @@ func TestSync_Bad_BlockHashMismatch(t *testing.T) {
|
|||
c := New(s)
|
||||
|
||||
client := rpc.NewClient(srv.URL)
|
||||
err := c.Sync(client, DefaultSyncOptions())
|
||||
err := c.Sync(context.Background(), client, DefaultSyncOptions())
|
||||
if err == nil {
|
||||
t.Fatal("Sync: expected block hash mismatch error, got nil")
|
||||
}
|
||||
|
|
@ -668,7 +669,7 @@ func TestSync_Bad_InvalidRegularTxBlob(t *testing.T) {
|
|||
c := New(s)
|
||||
|
||||
client := rpc.NewClient(srv.URL)
|
||||
err := c.Sync(client, DefaultSyncOptions())
|
||||
err := c.Sync(context.Background(), client, DefaultSyncOptions())
|
||||
if err == nil {
|
||||
t.Fatal("Sync: expected error from invalid tx blob, got nil")
|
||||
}
|
||||
|
|
@ -728,7 +729,7 @@ func TestSync_Bad_InvalidBlockBlob(t *testing.T) {
|
|||
c := New(s)
|
||||
|
||||
client := rpc.NewClient(srv.URL)
|
||||
err := c.Sync(client, DefaultSyncOptions())
|
||||
err := c.Sync(context.Background(), client, DefaultSyncOptions())
|
||||
if err == nil {
|
||||
t.Fatal("Sync: expected error from invalid block blob, got nil")
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue