refactor(node): tighten AX comments across public APIs

Co-Authored-By: Virgil <virgil@lethean.io>
This commit is contained in:
Virgil 2026-03-31 14:04:41 +00:00 committed by Snider
parent 96d83a4b21
commit a1d9b08baf
11 changed files with 156 additions and 388 deletions

View file

@ -14,9 +14,7 @@ import (
"forge.lthn.ai/Snider/Borg/pkg/tim"
)
// BundleType defines the type of deployment bundle.
//
// bundleType := BundleProfile
// bundleType := BundleProfile
type BundleType string
const (
@ -28,9 +26,7 @@ const (
BundleFull BundleType = "full"
)
// Bundle represents a deployment bundle for P2P transfer.
//
// bundle := &Bundle{Type: BundleProfile, Name: "xmrig", Data: []byte("{}")}
// bundle := &Bundle{Type: BundleProfile, Name: "xmrig", Data: []byte("{}")}
type Bundle struct {
Type BundleType `json:"type"`
Name string `json:"name"`
@ -38,9 +34,7 @@ type Bundle struct {
Checksum string `json:"checksum"` // SHA-256 of Data
}
// BundleManifest describes the contents of a bundle.
//
// manifest := BundleManifest{Name: "xmrig", Type: BundleMiner}
// manifest := BundleManifest{Name: "xmrig", Type: BundleMiner}
type BundleManifest struct {
Type BundleType `json:"type"`
Name string `json:"name"`
@ -50,9 +44,7 @@ type BundleManifest struct {
CreatedAt string `json:"createdAt"`
}
// CreateProfileBundle creates an encrypted bundle containing a mining profile.
//
// bundle, err := CreateProfileBundle(profileJSON, "xmrig-default", "password")
// bundle, err := CreateProfileBundle(profileJSON, "xmrig-default", "password")
func CreateProfileBundle(profileJSON []byte, name string, password string) (*Bundle, error) {
// Create a TIM with just the profile config
timBundle, err := tim.New()
@ -78,9 +70,7 @@ func CreateProfileBundle(profileJSON []byte, name string, password string) (*Bun
}, nil
}
// CreateProfileBundleUnencrypted creates a plain JSON bundle (for testing or trusted networks).
//
// bundle, err := CreateProfileBundleUnencrypted(profileJSON, "xmrig-default")
// bundle, err := CreateProfileBundleUnencrypted(profileJSON, "xmrig-default")
func CreateProfileBundleUnencrypted(profileJSON []byte, name string) (*Bundle, error) {
checksum := calculateChecksum(profileJSON)
@ -92,9 +82,7 @@ func CreateProfileBundleUnencrypted(profileJSON []byte, name string) (*Bundle, e
}, nil
}
// CreateMinerBundle creates an encrypted bundle containing a miner binary and optional profile.
//
// bundle, err := CreateMinerBundle("/srv/miners/xmrig", profileJSON, "xmrig", "password")
// bundle, err := CreateMinerBundle("/srv/miners/xmrig", profileJSON, "xmrig", "password")
func CreateMinerBundle(minerPath string, profileJSON []byte, name string, password string) (*Bundle, error) {
// Read miner binary
minerContent, err := filesystemRead(minerPath)
@ -144,9 +132,7 @@ func CreateMinerBundle(minerPath string, profileJSON []byte, name string, passwo
}, nil
}
// ExtractProfileBundle decrypts and extracts a profile bundle.
//
// profileJSON, err := ExtractProfileBundle(bundle, "password")
// profileJSON, err := ExtractProfileBundle(bundle, "password")
func ExtractProfileBundle(bundle *Bundle, password string) ([]byte, error) {
// Verify checksum first
if calculateChecksum(bundle.Data) != bundle.Checksum {
@ -167,9 +153,7 @@ func ExtractProfileBundle(bundle *Bundle, password string) ([]byte, error) {
return timBundle.Config, nil
}
// ExtractMinerBundle decrypts and extracts a miner bundle, returning the miner path and profile.
//
// minerPath, profileJSON, err := ExtractMinerBundle(bundle, "password", "/srv/miners")
// minerPath, profileJSON, err := ExtractMinerBundle(bundle, "password", "/srv/miners")
func ExtractMinerBundle(bundle *Bundle, password string, destDir string) (string, []byte, error) {
// Verify checksum
if calculateChecksum(bundle.Data) != bundle.Checksum {
@ -197,9 +181,7 @@ func ExtractMinerBundle(bundle *Bundle, password string, destDir string) (string
return minerPath, timBundle.Config, nil
}
// VerifyBundle checks if a bundle's checksum is valid.
//
// ok := VerifyBundle(bundle)
// ok := VerifyBundle(bundle)
func VerifyBundle(bundle *Bundle) bool {
return calculateChecksum(bundle.Data) == bundle.Checksum
}

View file

@ -21,9 +21,7 @@ type Controller struct {
pendingRequests map[string]chan *Message // message ID -> response channel
}
// NewController wires a controller to a node manager, peer registry, and transport.
//
// controller := NewController(nodeManager, peerRegistry, transport)
// controller := NewController(nodeManager, peerRegistry, transport)
func NewController(nodeManager *NodeManager, peerRegistry *PeerRegistry, transport *Transport) *Controller {
c := &Controller{
nodeManager: nodeManager,
@ -117,9 +115,7 @@ func (c *Controller) sendRequest(peerID string, message *Message, timeout time.D
}
}
// GetRemoteStats requests miner statistics from a remote peer.
//
// stats, err := controller.GetRemoteStats("worker-1")
// stats, err := controller.GetRemoteStats("worker-1")
func (c *Controller) GetRemoteStats(peerID string) (*StatsPayload, error) {
identity := c.nodeManager.GetIdentity()
if identity == nil {
@ -144,9 +140,7 @@ func (c *Controller) GetRemoteStats(peerID string) (*StatsPayload, error) {
return &stats, nil
}
// StartRemoteMiner requests a remote peer to start a miner with a given profile.
//
// err := controller.StartRemoteMiner("worker-1", "xmrig", "profile-1", nil)
// err := controller.StartRemoteMiner("worker-1", "xmrig", "profile-1", nil)
func (c *Controller) StartRemoteMiner(peerID, minerType, profileID string, configOverride RawMessage) error {
identity := c.nodeManager.GetIdentity()
if identity == nil {
@ -185,9 +179,7 @@ func (c *Controller) StartRemoteMiner(peerID, minerType, profileID string, confi
return nil
}
// StopRemoteMiner requests a remote peer to stop a miner.
//
// err := controller.StopRemoteMiner("worker-1", "xmrig-0")
// err := controller.StopRemoteMiner("worker-1", "xmrig-0")
func (c *Controller) StopRemoteMiner(peerID, minerName string) error {
identity := c.nodeManager.GetIdentity()
if identity == nil {
@ -220,9 +212,7 @@ func (c *Controller) StopRemoteMiner(peerID, minerName string) error {
return nil
}
// GetRemoteLogs requests console logs from a remote miner.
//
// logs, err := controller.GetRemoteLogs("worker-1", "xmrig-0", 100)
// logs, err := controller.GetRemoteLogs("worker-1", "xmrig-0", 100)
func (c *Controller) GetRemoteLogs(peerID, minerName string, lines int) ([]string, error) {
identity := c.nodeManager.GetIdentity()
if identity == nil {
@ -252,9 +242,7 @@ func (c *Controller) GetRemoteLogs(peerID, minerName string, lines int) ([]strin
return logs.Lines, nil
}
// GetAllStats fetches stats from all connected peers.
//
// statsByPeerID := controller.GetAllStats()
// statsByPeerID := controller.GetAllStats()
func (c *Controller) GetAllStats() map[string]*StatsPayload {
results := make(map[string]*StatsPayload)
var mu sync.Mutex
@ -283,9 +271,7 @@ func (c *Controller) GetAllStats() map[string]*StatsPayload {
return results
}
// PingPeer sends a ping to a peer and refreshes that peer's metrics.
//
// rttMilliseconds, err := controller.PingPeer("worker-1")
// rttMilliseconds, err := controller.PingPeer("worker-1")
func (c *Controller) PingPeer(peerID string) (float64, error) {
identity := c.nodeManager.GetIdentity()
if identity == nil {
@ -323,9 +309,7 @@ func (c *Controller) PingPeer(peerID string) (float64, error) {
return rtt, nil
}
// ConnectToPeer opens a transport connection to a peer.
//
// err := controller.ConnectToPeer("worker-1")
// err := controller.ConnectToPeer("worker-1")
func (c *Controller) ConnectToPeer(peerID string) error {
peer := c.peerRegistry.GetPeer(peerID)
if peer == nil {
@ -336,9 +320,7 @@ func (c *Controller) ConnectToPeer(peerID string) error {
return err
}
// DisconnectFromPeer closes the active connection to a peer.
//
// err := controller.DisconnectFromPeer("worker-1")
// err := controller.DisconnectFromPeer("worker-1")
func (c *Controller) DisconnectFromPeer(peerID string) error {
conn := c.transport.GetConnection(peerID)
if conn == nil {

View file

@ -22,10 +22,7 @@ const (
IntentCustom byte = 0xFF // Extended / application-level sub-protocols
)
// IntentHandler processes a UEPS packet that has been routed by intent.
// Implementations receive the fully parsed and HMAC-verified packet.
//
// var handler IntentHandler = func(packet *ueps.ParsedPacket) error { return nil }
// var handler IntentHandler = func(packet *ueps.ParsedPacket) error { return nil }
type IntentHandler func(packet *ueps.ParsedPacket) error
// dispatcher := NewDispatcher()
@ -58,10 +55,10 @@ func (d *Dispatcher) RegisterHandler(intentID byte, handler IntentHandler) {
})
}
// for intentID, handler := range dispatcher.Handlers() {
// _ = intentID
// _ = handler
// }
// for intentID, handler := range dispatcher.Handlers() {
// _ = intentID
// _ = handler
// }
func (d *Dispatcher) Handlers() iter.Seq2[byte, IntentHandler] {
return func(yield func(byte, IntentHandler) bool) {
d.mu.RLock()

View file

@ -16,12 +16,10 @@ import (
"github.com/adrg/xdg"
)
// ChallengeSize is the size of the challenge in bytes
// challenge := make([]byte, ChallengeSize)
const ChallengeSize = 32
// GenerateChallenge creates a random challenge for authentication.
//
// challenge, err := GenerateChallenge()
// challenge, err := GenerateChallenge()
func GenerateChallenge() ([]byte, error) {
challenge := make([]byte, ChallengeSize)
if _, err := rand.Read(challenge); err != nil {
@ -30,27 +28,20 @@ func GenerateChallenge() ([]byte, error) {
return challenge, nil
}
// SignChallenge creates an HMAC signature of a challenge using a shared secret.
// The signature proves possession of the shared secret without revealing it.
//
// signature := SignChallenge(challenge, sharedSecret)
// signature := SignChallenge(challenge, sharedSecret)
func SignChallenge(challenge []byte, sharedSecret []byte) []byte {
mac := hmac.New(sha256.New, sharedSecret)
mac.Write(challenge)
return mac.Sum(nil)
}
// VerifyChallenge verifies that a challenge response was signed with the correct shared secret.
//
// ok := VerifyChallenge(challenge, signature, sharedSecret)
// ok := VerifyChallenge(challenge, signature, sharedSecret)
func VerifyChallenge(challenge, response, sharedSecret []byte) bool {
expected := SignChallenge(challenge, sharedSecret)
return hmac.Equal(response, expected)
}
// NodeRole defines the operational mode of a node.
//
// role := RoleWorker
// role := RoleWorker
type NodeRole string
const (
@ -62,9 +53,7 @@ const (
RoleDual NodeRole = "dual"
)
// NodeIdentity represents the public identity of a node.
//
// identity := NodeIdentity{Name: "worker-1", Role: RoleWorker}
// identity := NodeIdentity{Name: "worker-1", Role: RoleWorker}
type NodeIdentity struct {
ID string `json:"id"` // Derived from public key (first 16 bytes hex)
Name string `json:"name"` // Human-friendly name
@ -83,9 +72,7 @@ type NodeManager struct {
mu sync.RWMutex
}
// NewNodeManager loads the default node identity store.
//
// nodeManager, err := NewNodeManager()
// nodeManager, err := NewNodeManager()
func NewNodeManager() (*NodeManager, error) {
keyPath, err := xdg.DataFile("lethean-desktop/node/private.key")
if err != nil {
@ -100,12 +87,10 @@ func NewNodeManager() (*NodeManager, error) {
return NewNodeManagerFromPaths(keyPath, configPath)
}
// NewNodeManagerFromPaths loads or creates a node identity store at explicit paths.
//
// Missing files are treated as a fresh install; malformed or partial identity
// state is returned as an error so callers can handle it explicitly.
//
// nodeManager, err := NewNodeManagerFromPaths("/srv/p2p/private.key", "/srv/p2p/node.json")
// nodeManager, err := NewNodeManagerFromPaths("/srv/p2p/private.key", "/srv/p2p/node.json")
func NewNodeManagerFromPaths(keyPath, configPath string) (*NodeManager, error) {
nm := &NodeManager{
keyPath: keyPath,
@ -131,9 +116,7 @@ func (n *NodeManager) HasIdentity() bool {
return n.identity != nil
}
// GetIdentity returns a copy of the loaded node identity.
//
// identity := nodeManager.GetIdentity()
// identity := nodeManager.GetIdentity()
func (n *NodeManager) GetIdentity() *NodeIdentity {
n.mu.RLock()
defer n.mu.RUnlock()
@ -145,9 +128,7 @@ func (n *NodeManager) GetIdentity() *NodeIdentity {
return &identity
}
// GenerateIdentity writes a new node identity for the given name and role.
//
// err := nodeManager.GenerateIdentity("worker-1", RoleWorker)
// err := nodeManager.GenerateIdentity("worker-1", RoleWorker)
func (n *NodeManager) GenerateIdentity(name string, role NodeRole) error {
n.mu.Lock()
defer n.mu.Unlock()
@ -187,9 +168,7 @@ func (n *NodeManager) GenerateIdentity(name string, role NodeRole) error {
return nil
}
// DeriveSharedSecret hashes an X25519 shared secret for use as a symmetric key.
//
// sharedSecret, err := nodeManager.DeriveSharedSecret(peer.PublicKey)
// sharedSecret, err := nodeManager.DeriveSharedSecret(peer.PublicKey)
func (n *NodeManager) DeriveSharedSecret(peerPubKeyBase64 string) ([]byte, error) {
n.mu.RLock()
defer n.mu.RUnlock()
@ -292,9 +271,7 @@ func (n *NodeManager) loadIdentity() error {
return nil
}
// Delete removes the node identity and key files from disk.
//
// err := nodeManager.Delete()
// err := nodeManager.Delete()
func (n *NodeManager) Delete() error {
n.mu.Lock()
defer n.mu.Unlock()

View file

@ -47,9 +47,7 @@ var (
ErrorPayloadTooBig = core.E("levin", "payload exceeds maximum size", nil)
)
// Header is the 33-byte packed header that prefixes every Levin message.
//
// header := Header{Command: CommandHandshake, ExpectResponse: true}
// header := Header{Command: CommandHandshake, ExpectResponse: true}
type Header struct {
Signature uint64
PayloadSize uint64
@ -60,9 +58,7 @@ type Header struct {
ProtocolVersion uint32
}
// EncodeHeader serialises header into a fixed-size 33-byte array (little-endian).
//
// encoded := EncodeHeader(header)
// encoded := EncodeHeader(header)
func EncodeHeader(header *Header) [HeaderSize]byte {
var headerBytes [HeaderSize]byte
binary.LittleEndian.PutUint64(headerBytes[0:8], header.Signature)
@ -79,10 +75,7 @@ func EncodeHeader(header *Header) [HeaderSize]byte {
return headerBytes
}
// DecodeHeader deserialises a 33-byte array into a Header, validating
// the magic signature.
//
// header, err := DecodeHeader(headerBytes)
// header, err := DecodeHeader(headerBytes)
func DecodeHeader(headerBytes [HeaderSize]byte) (Header, error) {
var header Header
header.Signature = binary.LittleEndian.Uint64(headerBytes[0:8])

View file

@ -48,16 +48,10 @@ var (
ErrorStorageUnknownType = core.E("levin.storage", "unknown type tag", nil)
)
// Section is an ordered map of named values forming a portable storage section.
// Field iteration order is always alphabetical by key for deterministic encoding.
//
// section := Section{"id": StringValue([]byte("peer-1"))}
// section := Section{"id": StringValue([]byte("peer-1"))}
type Section map[string]Value
// Value holds a typed portable storage value. Use the constructor functions
// (Uint64Value, StringValue, ObjectValue, etc.) to create instances.
//
// value := StringValue([]byte("peer-1"))
// value := StringValue([]byte("peer-1"))
type Value struct {
Type uint8
@ -80,94 +74,62 @@ type Value struct {
// Scalar constructors
// ---------------------------------------------------------------------------
// Uint64Value creates a Value of TypeUint64.
//
// value := Uint64Value(42)
// value := Uint64Value(42)
func Uint64Value(value uint64) Value { return Value{Type: TypeUint64, uintVal: value} }
// Uint32Value creates a Value of TypeUint32.
//
// value := Uint32Value(42)
// value := Uint32Value(42)
func Uint32Value(value uint32) Value { return Value{Type: TypeUint32, uintVal: uint64(value)} }
// Uint16Value creates a Value of TypeUint16.
//
// value := Uint16Value(42)
// value := Uint16Value(42)
func Uint16Value(value uint16) Value { return Value{Type: TypeUint16, uintVal: uint64(value)} }
// Uint8Value creates a Value of TypeUint8.
//
// value := Uint8Value(42)
// value := Uint8Value(42)
func Uint8Value(value uint8) Value { return Value{Type: TypeUint8, uintVal: uint64(value)} }
// Int64Value creates a Value of TypeInt64.
//
// value := Int64Value(42)
// value := Int64Value(42)
func Int64Value(value int64) Value { return Value{Type: TypeInt64, intVal: value} }
// Int32Value creates a Value of TypeInt32.
//
// value := Int32Value(42)
// value := Int32Value(42)
func Int32Value(value int32) Value { return Value{Type: TypeInt32, intVal: int64(value)} }
// Int16Value creates a Value of TypeInt16.
//
// value := Int16Value(42)
// value := Int16Value(42)
func Int16Value(value int16) Value { return Value{Type: TypeInt16, intVal: int64(value)} }
// Int8Value creates a Value of TypeInt8.
//
// value := Int8Value(42)
// value := Int8Value(42)
func Int8Value(value int8) Value { return Value{Type: TypeInt8, intVal: int64(value)} }
// BoolValue creates a Value of TypeBool.
//
// value := BoolValue(true)
// value := BoolValue(true)
func BoolValue(value bool) Value { return Value{Type: TypeBool, boolVal: value} }
// DoubleValue creates a Value of TypeDouble.
//
// value := DoubleValue(3.14)
// value := DoubleValue(3.14)
func DoubleValue(value float64) Value { return Value{Type: TypeDouble, floatVal: value} }
// StringValue creates a Value of TypeString. The slice is not copied.
//
// value := StringValue([]byte("hello"))
// value := StringValue([]byte("hello"))
func StringValue(value []byte) Value { return Value{Type: TypeString, bytesVal: value} }
// ObjectValue creates a Value of TypeObject wrapping a nested Section.
//
// value := ObjectValue(Section{"id": StringValue([]byte("peer-1"))})
// value := ObjectValue(Section{"id": StringValue([]byte("peer-1"))})
func ObjectValue(section Section) Value { return Value{Type: TypeObject, objectVal: section} }
// ---------------------------------------------------------------------------
// Array constructors
// ---------------------------------------------------------------------------
// Uint64ArrayValue creates a typed array of uint64 values.
//
// value := Uint64ArrayValue([]uint64{1, 2, 3})
// value := Uint64ArrayValue([]uint64{1, 2, 3})
func Uint64ArrayValue(values []uint64) Value {
return Value{Type: ArrayFlag | TypeUint64, uint64Array: values}
}
// Uint32ArrayValue creates a typed array of uint32 values.
//
// value := Uint32ArrayValue([]uint32{1, 2, 3})
// value := Uint32ArrayValue([]uint32{1, 2, 3})
func Uint32ArrayValue(values []uint32) Value {
return Value{Type: ArrayFlag | TypeUint32, uint32Array: values}
}
// StringArrayValue creates a typed array of byte-string values.
//
// value := StringArrayValue([][]byte{[]byte("a"), []byte("b")})
// value := StringArrayValue([][]byte{[]byte("a"), []byte("b")})
func StringArrayValue(values [][]byte) Value {
return Value{Type: ArrayFlag | TypeString, stringArray: values}
}
// ObjectArrayValue creates a typed array of Section values.
//
// value := ObjectArrayValue([]Section{{"id": StringValue([]byte("peer-1"))}})
// value := ObjectArrayValue([]Section{{"id": StringValue([]byte("peer-1"))}})
func ObjectArrayValue(values []Section) Value {
return Value{Type: ArrayFlag | TypeObject, objectArray: values}
}
@ -176,7 +138,7 @@ func ObjectArrayValue(values []Section) Value {
// Scalar accessors
// ---------------------------------------------------------------------------
// AsUint64 returns the uint64 value or an error on type mismatch.
// value, err := Uint64Value(42).AsUint64()
func (v Value) AsUint64() (uint64, error) {
if v.Type != TypeUint64 {
return 0, ErrorStorageTypeMismatch
@ -184,7 +146,7 @@ func (v Value) AsUint64() (uint64, error) {
return v.uintVal, nil
}
// AsUint32 returns the uint32 value or an error on type mismatch.
// value, err := Uint32Value(42).AsUint32()
func (v Value) AsUint32() (uint32, error) {
if v.Type != TypeUint32 {
return 0, ErrorStorageTypeMismatch
@ -192,7 +154,7 @@ func (v Value) AsUint32() (uint32, error) {
return uint32(v.uintVal), nil
}
// AsUint16 returns the uint16 value or an error on type mismatch.
// value, err := Uint16Value(42).AsUint16()
func (v Value) AsUint16() (uint16, error) {
if v.Type != TypeUint16 {
return 0, ErrorStorageTypeMismatch
@ -200,7 +162,7 @@ func (v Value) AsUint16() (uint16, error) {
return uint16(v.uintVal), nil
}
// AsUint8 returns the uint8 value or an error on type mismatch.
// value, err := Uint8Value(42).AsUint8()
func (v Value) AsUint8() (uint8, error) {
if v.Type != TypeUint8 {
return 0, ErrorStorageTypeMismatch
@ -208,7 +170,7 @@ func (v Value) AsUint8() (uint8, error) {
return uint8(v.uintVal), nil
}
// AsInt64 returns the int64 value or an error on type mismatch.
// value, err := Int64Value(42).AsInt64()
func (v Value) AsInt64() (int64, error) {
if v.Type != TypeInt64 {
return 0, ErrorStorageTypeMismatch
@ -216,7 +178,7 @@ func (v Value) AsInt64() (int64, error) {
return v.intVal, nil
}
// AsInt32 returns the int32 value or an error on type mismatch.
// value, err := Int32Value(42).AsInt32()
func (v Value) AsInt32() (int32, error) {
if v.Type != TypeInt32 {
return 0, ErrorStorageTypeMismatch
@ -224,7 +186,7 @@ func (v Value) AsInt32() (int32, error) {
return int32(v.intVal), nil
}
// AsInt16 returns the int16 value or an error on type mismatch.
// value, err := Int16Value(42).AsInt16()
func (v Value) AsInt16() (int16, error) {
if v.Type != TypeInt16 {
return 0, ErrorStorageTypeMismatch
@ -232,7 +194,7 @@ func (v Value) AsInt16() (int16, error) {
return int16(v.intVal), nil
}
// AsInt8 returns the int8 value or an error on type mismatch.
// value, err := Int8Value(42).AsInt8()
func (v Value) AsInt8() (int8, error) {
if v.Type != TypeInt8 {
return 0, ErrorStorageTypeMismatch
@ -240,7 +202,7 @@ func (v Value) AsInt8() (int8, error) {
return int8(v.intVal), nil
}
// AsBool returns the bool value or an error on type mismatch.
// value, err := BoolValue(true).AsBool()
func (v Value) AsBool() (bool, error) {
if v.Type != TypeBool {
return false, ErrorStorageTypeMismatch
@ -248,7 +210,7 @@ func (v Value) AsBool() (bool, error) {
return v.boolVal, nil
}
// AsDouble returns the float64 value or an error on type mismatch.
// value, err := DoubleValue(3.14).AsDouble()
func (v Value) AsDouble() (float64, error) {
if v.Type != TypeDouble {
return 0, ErrorStorageTypeMismatch
@ -256,7 +218,7 @@ func (v Value) AsDouble() (float64, error) {
return v.floatVal, nil
}
// AsString returns the byte-string value or an error on type mismatch.
// value, err := StringValue([]byte("hello")).AsString()
func (v Value) AsString() ([]byte, error) {
if v.Type != TypeString {
return nil, ErrorStorageTypeMismatch
@ -264,7 +226,7 @@ func (v Value) AsString() ([]byte, error) {
return v.bytesVal, nil
}
// AsSection returns the nested Section or an error on type mismatch.
// section, err := ObjectValue(Section{"id": StringValue([]byte("peer-1"))}).AsSection()
func (v Value) AsSection() (Section, error) {
if v.Type != TypeObject {
return nil, ErrorStorageTypeMismatch
@ -276,7 +238,7 @@ func (v Value) AsSection() (Section, error) {
// Array accessors
// ---------------------------------------------------------------------------
// AsUint64Array returns the []uint64 array or an error on type mismatch.
// values, err := Uint64ArrayValue([]uint64{1, 2, 3}).AsUint64Array()
func (v Value) AsUint64Array() ([]uint64, error) {
if v.Type != (ArrayFlag | TypeUint64) {
return nil, ErrorStorageTypeMismatch
@ -284,7 +246,7 @@ func (v Value) AsUint64Array() ([]uint64, error) {
return v.uint64Array, nil
}
// AsUint32Array returns the []uint32 array or an error on type mismatch.
// values, err := Uint32ArrayValue([]uint32{1, 2, 3}).AsUint32Array()
func (v Value) AsUint32Array() ([]uint32, error) {
if v.Type != (ArrayFlag | TypeUint32) {
return nil, ErrorStorageTypeMismatch
@ -292,7 +254,7 @@ func (v Value) AsUint32Array() ([]uint32, error) {
return v.uint32Array, nil
}
// AsStringArray returns the [][]byte array or an error on type mismatch.
// values, err := StringArrayValue([][]byte{[]byte("a"), []byte("b")}).AsStringArray()
func (v Value) AsStringArray() ([][]byte, error) {
if v.Type != (ArrayFlag | TypeString) {
return nil, ErrorStorageTypeMismatch
@ -300,7 +262,7 @@ func (v Value) AsStringArray() ([][]byte, error) {
return v.stringArray, nil
}
// AsSectionArray returns the []Section array or an error on type mismatch.
// values, err := ObjectArrayValue([]Section{{"id": StringValue([]byte("peer-1"))}}).AsSectionArray()
func (v Value) AsSectionArray() ([]Section, error) {
if v.Type != (ArrayFlag | TypeObject) {
return nil, ErrorStorageTypeMismatch
@ -312,11 +274,7 @@ func (v Value) AsSectionArray() ([]Section, error) {
// Encoder
// ---------------------------------------------------------------------------
// EncodeStorage serialises a Section to the portable storage binary format,
// including the 9-byte header. Keys are sorted alphabetically to ensure
// deterministic output.
//
// data, err := EncodeStorage(section)
// data, err := EncodeStorage(section)
func EncodeStorage(section Section) ([]byte, error) {
buffer := make([]byte, 0, 256)
@ -486,10 +444,7 @@ func encodeArray(buf []byte, v Value) ([]byte, error) {
// Decoder
// ---------------------------------------------------------------------------
// DecodeStorage deserialises portable storage binary data (including the
// 9-byte header) into a Section.
//
// section, err := DecodeStorage(data)
// section, err := DecodeStorage(data)
func DecodeStorage(data []byte) (Section, error) {
if len(data) < StorageHeaderSize {
return nil, ErrorStorageTruncated

View file

@ -28,11 +28,7 @@ var ErrorVarintTruncated = core.E("levin", "truncated varint", nil)
// ErrorVarintOverflow is returned when the value is too large to encode.
var ErrorVarintOverflow = core.E("levin", "varint overflow", nil)
// PackVarint encodes value using the epee portable-storage varint scheme.
// The low two bits of the first byte indicate the total encoded width;
// the remaining bits carry the value in little-endian order.
//
// encoded := PackVarint(42)
// encoded := PackVarint(42)
func PackVarint(value uint64) []byte {
switch {
case value <= varintMax1:
@ -55,10 +51,7 @@ func PackVarint(value uint64) []byte {
}
}
// UnpackVarint decodes one epee portable-storage varint from buffer.
// It returns the decoded value, the number of bytes consumed, and any error.
//
// value, err := UnpackVarint(buffer)
// value, err := UnpackVarint(buffer)
func UnpackVarint(buffer []byte) (value uint64, bytesConsumed int, err error) {
if len(buffer) == 0 {
return 0, 0, ErrorVarintTruncated

View file

@ -22,14 +22,10 @@ const (
// versions := SupportedProtocolVersions
var SupportedProtocolVersions = []string{"1.0"}
// RawMessage stores an already-encoded JSON payload for deferred decoding.
//
// payload := RawMessage(`{"pool":"pool.example.com:3333"}`)
// payload := RawMessage(`{"pool":"pool.example.com:3333"}`)
type RawMessage []byte
// MarshalJSON preserves the raw JSON payload when the message is encoded.
//
// data, err := RawMessage(`{"ok":true}`).MarshalJSON()
// data, err := RawMessage(`{"ok":true}`).MarshalJSON()
func (m RawMessage) MarshalJSON() ([]byte, error) {
if m == nil {
return []byte("null"), nil
@ -38,10 +34,8 @@ func (m RawMessage) MarshalJSON() ([]byte, error) {
return m, nil
}
// UnmarshalJSON stores the raw JSON payload bytes without decoding them.
//
// var payload RawMessage
// _ = payload.UnmarshalJSON([]byte(`{"ok":true}`))
// var payload RawMessage
// _ = payload.UnmarshalJSON([]byte(`{"ok":true}`))
func (m *RawMessage) UnmarshalJSON(data []byte) error {
if m == nil {
return core.E("node.RawMessage.UnmarshalJSON", "raw message target is nil", nil)
@ -51,9 +45,7 @@ func (m *RawMessage) UnmarshalJSON(data []byte) error {
return nil
}
// IsProtocolVersionSupported checks if a given version is supported.
//
// ok := IsProtocolVersionSupported("1.0")
// ok := IsProtocolVersionSupported("1.0")
func IsProtocolVersionSupported(version string) bool {
return slices.Contains(SupportedProtocolVersions, version)
}
@ -90,9 +82,7 @@ const (
MessageError MessageType = "error"
)
// Message represents a P2P message between nodes.
//
// message, err := NewMessage(MessagePing, "controller", "worker", PingPayload{SentAt: time.Now().UnixMilli()})
// message, err := NewMessage(MessagePing, "controller", "worker", PingPayload{SentAt: time.Now().UnixMilli()})
type Message struct {
ID string `json:"id"` // UUID
Type MessageType `json:"type"`
@ -103,9 +93,7 @@ type Message struct {
ReplyTo string `json:"replyTo,omitempty"` // ID of message being replied to
}
// NewMessage builds a message with a generated ID and timestamp.
//
// message, err := NewMessage(MessagePing, "controller", "worker-1", PingPayload{SentAt: 42})
// message, err := NewMessage(MessagePing, "controller", "worker-1", PingPayload{SentAt: 42})
func NewMessage(messageType MessageType, from, to string, payload any) (*Message, error) {
var payloadBytes RawMessage
if payload != nil {
@ -151,18 +139,14 @@ func (m *Message) ParsePayload(target any) error {
// --- Payload Types ---
// HandshakePayload is sent during connection establishment.
//
// payload := HandshakePayload{Identity: NodeIdentity{Name: "worker-1"}, Version: ProtocolVersion}
// payload := HandshakePayload{Identity: NodeIdentity{Name: "worker-1"}, Version: ProtocolVersion}
type HandshakePayload struct {
Identity NodeIdentity `json:"identity"`
Challenge []byte `json:"challenge,omitempty"` // Random bytes for auth
Version string `json:"version"` // Protocol version
}
// HandshakeAckPayload is the response to a handshake.
//
// ack := HandshakeAckPayload{Accepted: true}
// ack := HandshakeAckPayload{Accepted: true}
type HandshakeAckPayload struct {
Identity NodeIdentity `json:"identity"`
ChallengeResponse []byte `json:"challengeResponse,omitempty"`
@ -170,49 +154,37 @@ type HandshakeAckPayload struct {
Reason string `json:"reason,omitempty"` // If not accepted
}
// PingPayload for keepalive/latency measurement.
//
// payload := PingPayload{SentAt: 42}
// payload := PingPayload{SentAt: 42}
type PingPayload struct {
SentAt int64 `json:"sentAt"` // Unix timestamp in milliseconds
}
// PongPayload response to ping.
//
// payload := PongPayload{SentAt: 42, ReceivedAt: 43}
// payload := PongPayload{SentAt: 42, ReceivedAt: 43}
type PongPayload struct {
SentAt int64 `json:"sentAt"` // Echo of ping's sentAt
ReceivedAt int64 `json:"receivedAt"` // When ping was received
}
// StartMinerPayload requests starting a miner.
//
// payload := StartMinerPayload{MinerType: "xmrig"}
// payload := StartMinerPayload{MinerType: "xmrig"}
type StartMinerPayload struct {
MinerType string `json:"minerType"` // Required: miner type (e.g., "xmrig", "tt-miner")
ProfileID string `json:"profileId,omitempty"`
Config RawMessage `json:"config,omitempty"` // Override profile config
}
// StopMinerPayload requests stopping a miner.
//
// payload := StopMinerPayload{MinerName: "xmrig-0"}
// payload := StopMinerPayload{MinerName: "xmrig-0"}
type StopMinerPayload struct {
MinerName string `json:"minerName"`
}
// MinerAckPayload acknowledges a miner start/stop operation.
//
// ack := MinerAckPayload{Success: true, MinerName: "xmrig-0"}
// ack := MinerAckPayload{Success: true, MinerName: "xmrig-0"}
type MinerAckPayload struct {
Success bool `json:"success"`
MinerName string `json:"minerName,omitempty"`
Error string `json:"error,omitempty"`
}
// MinerStatsItem represents stats for a single miner.
//
// miner := MinerStatsItem{Name: "xmrig-0", Hashrate: 1200}
// miner := MinerStatsItem{Name: "xmrig-0", Hashrate: 1200}
type MinerStatsItem struct {
Name string `json:"name"`
Type string `json:"type"`
@ -225,9 +197,7 @@ type MinerStatsItem struct {
CPUThreads int `json:"cpuThreads,omitempty"`
}
// StatsPayload contains miner statistics.
//
// stats := StatsPayload{NodeID: "worker-1"}
// stats := StatsPayload{NodeID: "worker-1"}
type StatsPayload struct {
NodeID string `json:"nodeId"`
NodeName string `json:"nodeName"`
@ -235,27 +205,21 @@ type StatsPayload struct {
Uptime int64 `json:"uptime"` // Node uptime in seconds
}
// LogsRequestPayload requests console logs from a miner.
//
// payload := LogsRequestPayload{MinerName: "xmrig-0", Lines: 100}
// payload := LogsRequestPayload{MinerName: "xmrig-0", Lines: 100}
type LogsRequestPayload struct {
MinerName string `json:"minerName"`
Lines int `json:"lines"` // Number of lines to fetch
Since int64 `json:"since,omitempty"` // Unix timestamp, logs after this time
}
// LogsPayload contains console log lines.
//
// payload := LogsPayload{MinerName: "xmrig-0", Lines: []string{"started"}}
// payload := LogsPayload{MinerName: "xmrig-0", Lines: []string{"started"}}
type LogsPayload struct {
MinerName string `json:"minerName"`
Lines []string `json:"lines"`
HasMore bool `json:"hasMore"` // More logs available
}
// DeployPayload contains a deployment bundle.
//
// payload := DeployPayload{Name: "xmrig", BundleType: string(BundleMiner)}
// payload := DeployPayload{Name: "xmrig", BundleType: string(BundleMiner)}
type DeployPayload struct {
BundleType string `json:"type"` // "profile" | "miner" | "full"
Data []byte `json:"data"` // STIM-encrypted bundle
@ -263,18 +227,14 @@ type DeployPayload struct {
Name string `json:"name"` // Profile or miner name
}
// DeployAckPayload acknowledges a deployment.
//
// ack := DeployAckPayload{Success: true, Name: "xmrig"}
// ack := DeployAckPayload{Success: true, Name: "xmrig"}
type DeployAckPayload struct {
Success bool `json:"success"`
Name string `json:"name,omitempty"`
Error string `json:"error,omitempty"`
}
// ErrorPayload contains error information.
//
// payload := ErrorPayload{Code: ErrorCodeOperationFailed, Message: "start failed"}
// payload := ErrorPayload{Code: ErrorCodeOperationFailed, Message: "start failed"}
type ErrorPayload struct {
Code int `json:"code"`
Message string `json:"message"`

View file

@ -15,16 +15,14 @@ import (
"github.com/adrg/xdg"
)
// Peer represents a known remote node.
//
// peer := &Peer{
// ID: "worker-1",
// Name: "Worker 1",
// Address: "127.0.0.1:9101",
// PingMilliseconds: 42.5,
// GeographicKilometres: 100,
// Score: 80,
// }
// ID: "worker-1",
// Name: "Worker 1",
// Address: "127.0.0.1:9101",
// PingMilliseconds: 42.5,
// GeographicKilometres: 100,
// Score: 80,
// }
type Peer struct {
ID string `json:"id"`
Name string `json:"name"`
@ -47,9 +45,7 @@ type Peer struct {
// peerRegistrySaveDebounceInterval is the minimum time between disk writes.
const peerRegistrySaveDebounceInterval = 5 * time.Second
// PeerAuthMode controls how unknown peers are handled
//
// mode := PeerAuthAllowlist
// mode := PeerAuthAllowlist
type PeerAuthMode int
const (
@ -125,9 +121,7 @@ var (
scoreWeight = 1.2
)
// NewPeerRegistry loads the default peer registry.
//
// peerRegistry, err := NewPeerRegistry()
// peerRegistry, err := NewPeerRegistry()
func NewPeerRegistry() (*PeerRegistry, error) {
peersPath, err := xdg.ConfigFile("lethean-desktop/peers.json")
if err != nil {
@ -137,12 +131,10 @@ func NewPeerRegistry() (*PeerRegistry, error) {
return NewPeerRegistryFromPath(peersPath)
}
// NewPeerRegistryFromPath loads or creates a peer registry at an explicit path.
//
// Missing files are treated as an empty registry; malformed registry files are
// returned as errors so callers can repair the persisted state.
//
// peerRegistry, err := NewPeerRegistryFromPath("/srv/p2p/peers.json")
// peerRegistry, err := NewPeerRegistryFromPath("/srv/p2p/peers.json")
func NewPeerRegistryFromPath(peersPath string) (*PeerRegistry, error) {
pr := &PeerRegistry{
peers: make(map[string]*Peer),
@ -165,9 +157,7 @@ func NewPeerRegistryFromPath(peersPath string) (*PeerRegistry, error) {
return pr, nil
}
// SetAuthMode changes how unknown peers are handled.
//
// registry.SetAuthMode(PeerAuthAllowlist)
// registry.SetAuthMode(PeerAuthAllowlist)
func (r *PeerRegistry) SetAuthMode(mode PeerAuthMode) {
r.allowedPublicKeyMu.Lock()
defer r.allowedPublicKeyMu.Unlock()
@ -175,18 +165,14 @@ func (r *PeerRegistry) SetAuthMode(mode PeerAuthMode) {
logging.Info("peer auth mode changed", logging.Fields{"mode": mode})
}
// GetAuthMode returns the current authentication mode.
//
// mode := registry.GetAuthMode()
// mode := registry.GetAuthMode()
func (r *PeerRegistry) GetAuthMode() PeerAuthMode {
r.allowedPublicKeyMu.RLock()
defer r.allowedPublicKeyMu.RUnlock()
return r.authMode
}
// AllowPublicKey adds a public key to the allowlist.
//
// registry.AllowPublicKey(peer.PublicKey)
// registry.AllowPublicKey(peer.PublicKey)
func (r *PeerRegistry) AllowPublicKey(publicKey string) {
r.allowedPublicKeyMu.Lock()
defer r.allowedPublicKeyMu.Unlock()
@ -194,9 +180,7 @@ func (r *PeerRegistry) AllowPublicKey(publicKey string) {
logging.Debug("public key added to allowlist", logging.Fields{"key": safeKeyPrefix(publicKey)})
}
// RevokePublicKey removes a public key from the allowlist.
//
// registry.RevokePublicKey(peer.PublicKey)
// registry.RevokePublicKey(peer.PublicKey)
func (r *PeerRegistry) RevokePublicKey(publicKey string) {
r.allowedPublicKeyMu.Lock()
defer r.allowedPublicKeyMu.Unlock()
@ -204,20 +188,17 @@ func (r *PeerRegistry) RevokePublicKey(publicKey string) {
logging.Debug("public key removed from allowlist", logging.Fields{"key": safeKeyPrefix(publicKey)})
}
// IsPublicKeyAllowed checks if a public key is in the allowlist.
//
// allowed := registry.IsPublicKeyAllowed(peer.PublicKey)
// allowed := registry.IsPublicKeyAllowed(peer.PublicKey)
func (r *PeerRegistry) IsPublicKeyAllowed(publicKey string) bool {
r.allowedPublicKeyMu.RLock()
defer r.allowedPublicKeyMu.RUnlock()
return r.allowedPublicKeys[publicKey]
}
// IsPeerAllowed checks if a peer is allowed to connect based on auth mode.
// Returns true when AuthMode is Open (all allowed), or when Allowlist mode is active
// and the peer is pre-registered or its public key is in the allowlist.
//
// allowed := registry.IsPeerAllowed(peer.ID, peer.PublicKey)
// allowed := registry.IsPeerAllowed(peer.ID, peer.PublicKey)
func (r *PeerRegistry) IsPeerAllowed(peerID string, publicKey string) bool {
r.allowedPublicKeyMu.RLock()
authMode := r.authMode
@ -242,18 +223,14 @@ func (r *PeerRegistry) IsPeerAllowed(peerID string, publicKey string) bool {
return keyAllowed
}
// ListAllowedPublicKeys returns all allowlisted public keys.
//
// keys := registry.ListAllowedPublicKeys()
// keys := registry.ListAllowedPublicKeys()
func (r *PeerRegistry) ListAllowedPublicKeys() []string {
return slices.Collect(r.AllowedPublicKeys())
}
// AllowedPublicKeys returns an iterator over all allowlisted public keys.
//
// for key := range registry.AllowedPublicKeys() {
// log.Printf("allowed: %s", key[:16])
// }
// log.Printf("allowed: %s", key[:16])
// }
func (r *PeerRegistry) AllowedPublicKeys() iter.Seq[string] {
return func(yield func(string) bool) {
r.allowedPublicKeyMu.RLock()
@ -267,10 +244,9 @@ func (r *PeerRegistry) AllowedPublicKeys() iter.Seq[string] {
}
}
// AddPeer adds a new peer to the registry.
// Persistence is debounced — writes are batched every 5s. Call Close() before shutdown.
//
// err := registry.AddPeer(&Peer{ID: "worker-1", Address: "10.0.0.1:9091", Role: RoleWorker})
// err := registry.AddPeer(&Peer{ID: "worker-1", Address: "10.0.0.1:9091", Role: RoleWorker})
func (r *PeerRegistry) AddPeer(peer *Peer) error {
r.mu.Lock()
@ -306,10 +282,9 @@ func (r *PeerRegistry) AddPeer(peer *Peer) error {
return nil
}
// UpdatePeer updates an existing peer's information.
// Persistence is debounced. Call Close() to flush before shutdown.
//
// err := registry.UpdatePeer(&Peer{ID: "worker-1", Score: 90})
// err := registry.UpdatePeer(&Peer{ID: "worker-1", Score: 90})
func (r *PeerRegistry) UpdatePeer(peer *Peer) error {
r.mu.Lock()
@ -326,10 +301,9 @@ func (r *PeerRegistry) UpdatePeer(peer *Peer) error {
return nil
}
// RemovePeer removes a peer from the registry.
// Persistence is debounced. Call Close() to flush before shutdown.
//
// err := registry.RemovePeer("worker-1")
// err := registry.RemovePeer("worker-1")
func (r *PeerRegistry) RemovePeer(id string) error {
r.mu.Lock()
@ -346,9 +320,7 @@ func (r *PeerRegistry) RemovePeer(id string) error {
return nil
}
// GetPeer returns a copy of the peer with the supplied ID.
//
// peer := registry.GetPeer("worker-1")
// peer := registry.GetPeer("worker-1")
func (r *PeerRegistry) GetPeer(id string) *Peer {
r.mu.RLock()
defer r.mu.RUnlock()
@ -363,13 +335,16 @@ func (r *PeerRegistry) GetPeer(id string) *Peer {
return &peerCopy
}
// ListPeers returns all registered peers.
// peers := registry.ListPeers()
func (r *PeerRegistry) ListPeers() []*Peer {
return slices.Collect(r.Peers())
}
// Peers returns an iterator over all registered peers.
// Each peer is a copy to prevent mutation.
//
// for peer := range registry.Peers() {
// _ = peer
// }
func (r *PeerRegistry) Peers() iter.Seq[*Peer] {
return func(yield func(*Peer) bool) {
r.mu.RLock()
@ -384,10 +359,7 @@ func (r *PeerRegistry) Peers() iter.Seq[*Peer] {
}
}
// UpdateMetrics updates a peer's performance metrics.
//
// registry.UpdateMetrics("worker-1", 42.5, 100, 3)
//
// registry.UpdateMetrics("worker-1", 42.5, 100, 3)
// Note: Persistence is debounced. Call Close() to flush before shutdown.
func (r *PeerRegistry) UpdateMetrics(id string, pingMilliseconds, geographicKilometres float64, hopCount int) error {
r.mu.Lock()
@ -410,7 +382,7 @@ func (r *PeerRegistry) UpdateMetrics(id string, pingMilliseconds, geographicKilo
return nil
}
// UpdateScore updates a peer's reliability score.
// registry.UpdateScore("worker-1", 90)
// Note: Persistence is debounced. Call Close() to flush before shutdown.
func (r *PeerRegistry) UpdateScore(id string, score float64) error {
r.mu.Lock()
@ -432,9 +404,7 @@ func (r *PeerRegistry) UpdateScore(id string, score float64) error {
return nil
}
// SetConnected updates a peer's connection state.
//
// registry.SetConnected("worker-1", true)
// registry.SetConnected("worker-1", true)
func (r *PeerRegistry) SetConnected(id string, connected bool) {
r.mu.Lock()
defer r.mu.Unlock()
@ -457,9 +427,7 @@ const (
ScoreDefault = 50.0 // Default score for new peers
)
// RecordSuccess records a successful interaction with a peer, improving their score.
//
// registry.RecordSuccess("worker-1")
// registry.RecordSuccess("worker-1")
func (r *PeerRegistry) RecordSuccess(id string) {
r.mu.Lock()
peer, exists := r.peers[id]
@ -474,9 +442,7 @@ func (r *PeerRegistry) RecordSuccess(id string) {
r.scheduleSave()
}
// RecordFailure records a failed interaction with a peer, reducing their score.
//
// registry.RecordFailure("worker-1")
// registry.RecordFailure("worker-1")
func (r *PeerRegistry) RecordFailure(id string) {
r.mu.Lock()
peer, exists := r.peers[id]
@ -497,9 +463,7 @@ func (r *PeerRegistry) RecordFailure(id string) {
})
}
// RecordTimeout records a timeout when communicating with a peer.
//
// registry.RecordTimeout("worker-1")
// registry.RecordTimeout("worker-1")
func (r *PeerRegistry) RecordTimeout(id string) {
r.mu.Lock()
peer, exists := r.peers[id]
@ -520,9 +484,7 @@ func (r *PeerRegistry) RecordTimeout(id string) {
})
}
// GetPeersByScore returns peers sorted by score, highest first.
//
// peers := registry.GetPeersByScore()
// peers := registry.GetPeersByScore()
func (r *PeerRegistry) GetPeersByScore() []*Peer {
r.mu.RLock()
defer r.mu.RUnlock()
@ -543,11 +505,9 @@ func (r *PeerRegistry) GetPeersByScore() []*Peer {
return peers
}
// PeersByScore returns an iterator over peers sorted by score (highest first).
//
// for peer := range registry.PeersByScore() {
// log.Printf("peer %s score=%.0f", peer.ID, peer.Score)
// }
// log.Printf("peer %s score=%.0f", peer.ID, peer.Score)
// }
func (r *PeerRegistry) PeersByScore() iter.Seq[*Peer] {
return func(yield func(*Peer) bool) {
peers := r.GetPeersByScore()
@ -559,10 +519,9 @@ func (r *PeerRegistry) PeersByScore() iter.Seq[*Peer] {
}
}
// SelectOptimalPeer returns the best peer based on multi-factor optimisation.
// Uses Poindexter KD-tree to find the peer closest to ideal metrics (low ping, low hops, high score).
//
// peer := registry.SelectOptimalPeer()
// peer := registry.SelectOptimalPeer()
func (r *PeerRegistry) SelectOptimalPeer() *Peer {
r.mu.RLock()
defer r.mu.RUnlock()
@ -589,9 +548,7 @@ func (r *PeerRegistry) SelectOptimalPeer() *Peer {
return &peerCopy
}
// SelectNearestPeers returns the n best peers based on multi-factor optimisation.
//
// peers := registry.SelectNearestPeers(3)
// peers := registry.SelectNearestPeers(3)
func (r *PeerRegistry) SelectNearestPeers(n int) []*Peer {
r.mu.RLock()
defer r.mu.RUnlock()
@ -616,15 +573,16 @@ func (r *PeerRegistry) SelectNearestPeers(n int) []*Peer {
return peers
}
// GetConnectedPeers returns all currently connected peers as a slice.
//
// connectedPeers := registry.GetConnectedPeers()
// connectedPeers := registry.GetConnectedPeers()
func (r *PeerRegistry) GetConnectedPeers() []*Peer {
return slices.Collect(r.ConnectedPeers())
}
// ConnectedPeers returns an iterator over all currently connected peers.
// Each peer is a copy to prevent mutation.
//
// for peer := range registry.ConnectedPeers() {
// _ = peer
// }
func (r *PeerRegistry) ConnectedPeers() iter.Seq[*Peer] {
return func(yield func(*Peer) bool) {
r.mu.RLock()
@ -641,9 +599,7 @@ func (r *PeerRegistry) ConnectedPeers() iter.Seq[*Peer] {
}
}
// Count returns the number of registered peers.
//
// n := registry.Count()
// n := registry.Count()
func (r *PeerRegistry) Count() int {
r.mu.RLock()
defer r.mu.RUnlock()
@ -751,7 +707,7 @@ func (r *PeerRegistry) saveNow() error {
return nil
}
// Close flushes any pending changes and releases resources.
// registry.Close()
func (r *PeerRegistry) Close() error {
// Cancel any pending timer and save immediately if changes are queued.
r.saveMutex.Lock()

View file

@ -4,9 +4,7 @@ import (
core "dappco.re/go/core"
)
// ProtocolError represents an error from the remote peer.
//
// err := &ProtocolError{Code: ErrorCodeOperationFailed, Message: "start failed"}
// err := &ProtocolError{Code: ErrorCodeOperationFailed, Message: "start failed"}
type ProtocolError struct {
Code int
Message string
@ -16,14 +14,10 @@ func (e *ProtocolError) Error() string {
return core.Sprintf("remote error (%d): %s", e.Code, e.Message)
}
// ResponseHandler provides helpers for handling protocol responses.
//
// handler := &ResponseHandler{}
// handler := &ResponseHandler{}
type ResponseHandler struct{}
// ValidateResponse checks a response against the expected type.
//
// err := handler.ValidateResponse(resp, MessageStats)
// err := handler.ValidateResponse(resp, MessageStats)
func (h *ResponseHandler) ValidateResponse(resp *Message, expectedType MessageType) error {
if resp == nil {
return core.E("ResponseHandler.ValidateResponse", "nil response", nil)
@ -46,9 +40,7 @@ func (h *ResponseHandler) ValidateResponse(resp *Message, expectedType MessageTy
return nil
}
// ParseResponse validates the response and parses the payload into the target.
//
// err := handler.ParseResponse(resp, MessageStats, &stats)
// err := handler.ParseResponse(resp, MessageStats, &stats)
func (h *ResponseHandler) ParseResponse(resp *Message, expectedType MessageType, target any) error {
if err := h.ValidateResponse(resp, expectedType); err != nil {
return err
@ -80,17 +72,13 @@ func ParseResponse(resp *Message, expectedType MessageType, target any) error {
return DefaultResponseHandler.ParseResponse(resp, expectedType, target)
}
// IsProtocolError returns true if the error is a ProtocolError.
//
// ok := IsProtocolError(err)
// ok := IsProtocolError(err)
func IsProtocolError(err error) bool {
_, ok := err.(*ProtocolError)
return ok
}
// GetProtocolErrorCode returns the error code if err is a ProtocolError, otherwise returns 0.
//
// code := GetProtocolErrorCode(err)
// code := GetProtocolErrorCode(err)
func GetProtocolErrorCode(err error) int {
if pe, ok := err.(*ProtocolError); ok {
return pe.Code

View file

@ -39,9 +39,7 @@ const (
defaultTransportMaximumConnections = 100
)
// TransportConfig configures the WebSocket transport.
//
// transportConfig := DefaultTransportConfig()
// transportConfig := DefaultTransportConfig()
type TransportConfig struct {
ListenAddr string // ":9091" default
WebSocketPath string // "/ws" - WebSocket endpoint path
@ -53,9 +51,7 @@ type TransportConfig struct {
PongTimeout time.Duration // Timeout waiting for pong
}
// DefaultTransportConfig returns sensible defaults.
//
// transportConfig := DefaultTransportConfig()
// transportConfig := DefaultTransportConfig()
func DefaultTransportConfig() TransportConfig {
return TransportConfig{
ListenAddr: defaultTransportListenAddress,
@ -92,23 +88,17 @@ func (c TransportConfig) maximumConnections() int {
return defaultTransportMaximumConnections
}
// MessageHandler processes incoming messages.
//
// var handler MessageHandler = func(peerConnection *PeerConnection, message *Message) {}
// var handler MessageHandler = func(peerConnection *PeerConnection, message *Message) {}
type MessageHandler func(peerConnection *PeerConnection, message *Message)
// MessageDeduplicator tracks recent message IDs to prevent duplicate processing.
//
// deduplicator := NewMessageDeduplicator(5 * time.Minute)
// deduplicator := NewMessageDeduplicator(5 * time.Minute)
type MessageDeduplicator struct {
recentMessageTimes map[string]time.Time
mutex sync.RWMutex
timeToLive time.Duration
}
// NewMessageDeduplicator creates a deduplicator with the supplied retention window.
//
// deduplicator := NewMessageDeduplicator(5 * time.Minute)
// deduplicator := NewMessageDeduplicator(5 * time.Minute)
func NewMessageDeduplicator(retentionWindow time.Duration) *MessageDeduplicator {
d := &MessageDeduplicator{
recentMessageTimes: make(map[string]time.Time),
@ -170,10 +160,7 @@ type PeerRateLimiter struct {
mutex sync.Mutex
}
// NewPeerRateLimiter creates a token bucket seeded with maxTokens and refilled
// at refillRate tokens per second.
//
// rateLimiter := NewPeerRateLimiter(100, 50)
// rateLimiter := NewPeerRateLimiter(100, 50)
func NewPeerRateLimiter(maxTokens, refillPerSecond int) *PeerRateLimiter {
return &PeerRateLimiter{
availableTokens: maxTokens,
@ -979,9 +966,7 @@ func (pc *PeerConnection) Close() error {
return err
}
// DisconnectPayload contains reason for disconnect.
//
// payload := DisconnectPayload{Reason: "shutdown", Code: DisconnectNormal}
// payload := DisconnectPayload{Reason: "shutdown", Code: DisconnectNormal}
type DisconnectPayload struct {
Reason string `json:"reason"`
Code int `json:"code"` // Optional disconnect code