refactor(node): align AX comments across public APIs
Co-Authored-By: Virgil <virgil@lethean.io>
This commit is contained in:
parent
b5c7516224
commit
8fc3be03a6
11 changed files with 84 additions and 251 deletions
|
|
@ -11,9 +11,7 @@ import (
|
|||
core "dappco.re/go/core"
|
||||
)
|
||||
|
||||
// Level represents the severity of a log message.
|
||||
//
|
||||
// level := LevelInfo
|
||||
// level := LevelInfo
|
||||
type Level int
|
||||
|
||||
const (
|
||||
|
|
@ -43,9 +41,7 @@ func (l Level) String() string {
|
|||
}
|
||||
}
|
||||
|
||||
// Logger provides structured logging with configurable output and level.
|
||||
//
|
||||
// logger := New(DefaultConfig())
|
||||
// logger := New(DefaultConfig())
|
||||
type Logger struct {
|
||||
mu sync.RWMutex
|
||||
output io.Writer
|
||||
|
|
@ -53,18 +49,14 @@ type Logger struct {
|
|||
component string
|
||||
}
|
||||
|
||||
// Config holds configuration for creating a new Logger.
|
||||
//
|
||||
// config := Config{Output: io.Discard, Level: LevelDebug, Component: "sync"}
|
||||
// config := Config{Output: io.Discard, Level: LevelDebug, Component: "sync"}
|
||||
type Config struct {
|
||||
Output io.Writer
|
||||
Level Level
|
||||
Component string
|
||||
}
|
||||
|
||||
// DefaultConfig returns the default logger configuration.
|
||||
//
|
||||
// config := DefaultConfig()
|
||||
// config := DefaultConfig()
|
||||
func DefaultConfig() Config {
|
||||
return Config{
|
||||
Output: defaultOutput,
|
||||
|
|
@ -73,9 +65,7 @@ func DefaultConfig() Config {
|
|||
}
|
||||
}
|
||||
|
||||
// New creates a logger from an explicit configuration.
|
||||
//
|
||||
// logger := New(DefaultConfig())
|
||||
// logger := New(DefaultConfig())
|
||||
func New(config Config) *Logger {
|
||||
if config.Output == nil {
|
||||
config.Output = defaultOutput
|
||||
|
|
@ -87,9 +77,7 @@ func New(config Config) *Logger {
|
|||
}
|
||||
}
|
||||
|
||||
// ComponentLogger returns a new Logger scoped to one component.
|
||||
//
|
||||
// transportLogger := logger.ComponentLogger("transport")
|
||||
// transportLogger := logger.ComponentLogger("transport")
|
||||
func (l *Logger) ComponentLogger(component string) *Logger {
|
||||
return &Logger{
|
||||
output: l.output,
|
||||
|
|
@ -98,27 +86,21 @@ func (l *Logger) ComponentLogger(component string) *Logger {
|
|||
}
|
||||
}
|
||||
|
||||
// SetLevel changes the minimum log level.
|
||||
//
|
||||
// logger.SetLevel(LevelDebug)
|
||||
// logger.SetLevel(LevelDebug)
|
||||
func (l *Logger) SetLevel(level Level) {
|
||||
l.mu.Lock()
|
||||
defer l.mu.Unlock()
|
||||
l.level = level
|
||||
}
|
||||
|
||||
// GetLevel returns the current log level.
|
||||
//
|
||||
// level := logger.GetLevel()
|
||||
// level := logger.GetLevel()
|
||||
func (l *Logger) GetLevel() Level {
|
||||
l.mu.RLock()
|
||||
defer l.mu.RUnlock()
|
||||
return l.level
|
||||
}
|
||||
|
||||
// Fields represents key-value pairs for structured logging.
|
||||
//
|
||||
// fields := Fields{"peer_id": "node-1", "attempt": 2}
|
||||
// fields := Fields{"peer_id": "node-1", "attempt": 2}
|
||||
type Fields map[string]any
|
||||
|
||||
type stderrWriter struct{}
|
||||
|
|
@ -228,92 +210,68 @@ var (
|
|||
globalMu sync.RWMutex
|
||||
)
|
||||
|
||||
// SetGlobal installs the global logger instance.
|
||||
//
|
||||
// SetGlobal(New(DefaultConfig()))
|
||||
// SetGlobal(New(DefaultConfig()))
|
||||
func SetGlobal(l *Logger) {
|
||||
globalMu.Lock()
|
||||
defer globalMu.Unlock()
|
||||
globalLogger = l
|
||||
}
|
||||
|
||||
// GetGlobal returns the global logger instance.
|
||||
//
|
||||
// logger := GetGlobal()
|
||||
// logger := GetGlobal()
|
||||
func GetGlobal() *Logger {
|
||||
globalMu.RLock()
|
||||
defer globalMu.RUnlock()
|
||||
return globalLogger
|
||||
}
|
||||
|
||||
// SetGlobalLevel changes the global logger level.
|
||||
//
|
||||
// SetGlobalLevel(LevelDebug)
|
||||
// SetGlobalLevel(LevelDebug)
|
||||
func SetGlobalLevel(level Level) {
|
||||
globalMu.RLock()
|
||||
defer globalMu.RUnlock()
|
||||
globalLogger.SetLevel(level)
|
||||
}
|
||||
|
||||
// Debug logs a debug message using the global logger.
|
||||
//
|
||||
// Debug("connected", Fields{"peer_id": "node-1"})
|
||||
// Debug("connected", Fields{"peer_id": "node-1"})
|
||||
func Debug(message string, fields ...Fields) {
|
||||
GetGlobal().Debug(message, fields...)
|
||||
}
|
||||
|
||||
// Info logs an informational message using the global logger.
|
||||
//
|
||||
// Info("worker started", Fields{"component": "transport"})
|
||||
// Info("worker started", Fields{"component": "transport"})
|
||||
func Info(message string, fields ...Fields) {
|
||||
GetGlobal().Info(message, fields...)
|
||||
}
|
||||
|
||||
// Warn logs a warning message using the global logger.
|
||||
//
|
||||
// Warn("peer rate limited", Fields{"peer_id": "node-1"})
|
||||
// Warn("peer rate limited", Fields{"peer_id": "node-1"})
|
||||
func Warn(message string, fields ...Fields) {
|
||||
GetGlobal().Warn(message, fields...)
|
||||
}
|
||||
|
||||
// Error logs an error message using the global logger.
|
||||
//
|
||||
// Error("send failed", Fields{"peer_id": "node-1"})
|
||||
// Error("send failed", Fields{"peer_id": "node-1"})
|
||||
func Error(message string, fields ...Fields) {
|
||||
GetGlobal().Error(message, fields...)
|
||||
}
|
||||
|
||||
// Debugf logs a formatted debug message using the global logger.
|
||||
//
|
||||
// Debugf("connected peer %s", "node-1")
|
||||
// Debugf("connected peer %s", "node-1")
|
||||
func Debugf(format string, args ...any) {
|
||||
GetGlobal().Debugf(format, args...)
|
||||
}
|
||||
|
||||
// Infof logs a formatted informational message using the global logger.
|
||||
//
|
||||
// Infof("worker %s ready", "node-1")
|
||||
// Infof("worker %s ready", "node-1")
|
||||
func Infof(format string, args ...any) {
|
||||
GetGlobal().Infof(format, args...)
|
||||
}
|
||||
|
||||
// Warnf logs a formatted warning message using the global logger.
|
||||
//
|
||||
// Warnf("peer %s is slow", "node-1")
|
||||
// Warnf("peer %s is slow", "node-1")
|
||||
func Warnf(format string, args ...any) {
|
||||
GetGlobal().Warnf(format, args...)
|
||||
}
|
||||
|
||||
// Errorf logs a formatted error message using the global logger.
|
||||
//
|
||||
// Errorf("peer %s failed", "node-1")
|
||||
// Errorf("peer %s failed", "node-1")
|
||||
func Errorf(format string, args ...any) {
|
||||
GetGlobal().Errorf(format, args...)
|
||||
}
|
||||
|
||||
// ParseLevel parses a string into a log level.
|
||||
//
|
||||
// level, err := ParseLevel("warn")
|
||||
// level, err := ParseLevel("warn")
|
||||
func ParseLevel(s string) (Level, error) {
|
||||
switch core.Upper(s) {
|
||||
case "DEBUG":
|
||||
|
|
|
|||
|
|
@ -10,9 +10,7 @@ import (
|
|||
"dappco.re/go/core/p2p/logging"
|
||||
)
|
||||
|
||||
// Controller drives remote peer operations from a controller node.
|
||||
//
|
||||
// controller := NewController(nodeManager, peerRegistry, transport)
|
||||
// controller := NewController(nodeManager, peerRegistry, transport)
|
||||
type Controller struct {
|
||||
nodeManager *NodeManager
|
||||
peerRegistry *PeerRegistry
|
||||
|
|
|
|||
|
|
@ -10,11 +10,7 @@ import (
|
|||
"dappco.re/go/core/p2p/ueps"
|
||||
)
|
||||
|
||||
// ThreatScoreThreshold is the maximum allowable threat score. Packets exceeding
|
||||
// this value are silently dropped by the circuit breaker and logged as threat
|
||||
// events. The threshold sits at ~76% of the uint16 range (50,000 / 65,535),
|
||||
// providing headroom for legitimate elevated-risk traffic whilst rejecting
|
||||
// clearly hostile payloads.
|
||||
// threshold := ThreatScoreThreshold
|
||||
const ThreatScoreThreshold uint16 = 50000
|
||||
|
||||
// Well-known intent identifiers. These correspond to the semantic tokens
|
||||
|
|
@ -32,34 +28,16 @@ const (
|
|||
// var handler IntentHandler = func(packet *ueps.ParsedPacket) error { return nil }
|
||||
type IntentHandler func(packet *ueps.ParsedPacket) error
|
||||
|
||||
// Dispatcher routes verified UEPS packets to registered intent handlers.
|
||||
// It enforces a threat circuit breaker before routing: any packet whose
|
||||
// ThreatScore exceeds ThreatScoreThreshold is dropped and logged.
|
||||
//
|
||||
// dispatcher := NewDispatcher()
|
||||
//
|
||||
// Design decisions:
|
||||
//
|
||||
// - Handlers are registered per IntentID (1:1 mapping).
|
||||
//
|
||||
// - Unknown intents are logged at WARN level and silently dropped (no error
|
||||
// returned to the caller) to avoid back-pressure on the transport layer.
|
||||
//
|
||||
// - High-threat packets are dropped silently (logged at WARN) rather than
|
||||
// returning an error, consistent with the "don't even parse the payload"
|
||||
// philosophy from the original stub.
|
||||
//
|
||||
// - The dispatcher is safe for concurrent use; a RWMutex protects the
|
||||
// handler map.
|
||||
// dispatcher := NewDispatcher()
|
||||
// dispatcher.RegisterHandler(IntentCompute, func(packet *ueps.ParsedPacket) error { return nil })
|
||||
// err := dispatcher.Dispatch(packet)
|
||||
type Dispatcher struct {
|
||||
handlers map[byte]IntentHandler
|
||||
mu sync.RWMutex
|
||||
log *logging.Logger
|
||||
}
|
||||
|
||||
// NewDispatcher creates a Dispatcher with no registered handlers.
|
||||
//
|
||||
// dispatcher := NewDispatcher()
|
||||
// dispatcher := NewDispatcher()
|
||||
func NewDispatcher() *Dispatcher {
|
||||
return &Dispatcher{
|
||||
handlers: make(map[byte]IntentHandler),
|
||||
|
|
@ -70,12 +48,7 @@ func NewDispatcher() *Dispatcher {
|
|||
}
|
||||
}
|
||||
|
||||
// RegisterHandler associates an IntentHandler with a specific IntentID.
|
||||
// Replacing an existing handler is allowed — the new handler takes effect immediately.
|
||||
//
|
||||
// dispatcher.RegisterHandler(IntentCompute, func(packet *ueps.ParsedPacket) error {
|
||||
// return processComputeJob(packet.Payload)
|
||||
// })
|
||||
// dispatcher.RegisterHandler(IntentCompute, func(packet *ueps.ParsedPacket) error { return nil })
|
||||
func (d *Dispatcher) RegisterHandler(intentID byte, handler IntentHandler) {
|
||||
d.mu.Lock()
|
||||
defer d.mu.Unlock()
|
||||
|
|
@ -85,11 +58,10 @@ func (d *Dispatcher) RegisterHandler(intentID byte, handler IntentHandler) {
|
|||
})
|
||||
}
|
||||
|
||||
// Handlers returns an iterator over all registered intent handlers.
|
||||
//
|
||||
// for intentID, handler := range dispatcher.Handlers() {
|
||||
// log.Printf("registered intent 0x%02X", intentID)
|
||||
// }
|
||||
// for intentID, handler := range dispatcher.Handlers() {
|
||||
// _ = intentID
|
||||
// _ = handler
|
||||
// }
|
||||
func (d *Dispatcher) Handlers() iter.Seq2[byte, IntentHandler] {
|
||||
return func(yield func(byte, IntentHandler) bool) {
|
||||
d.mu.RLock()
|
||||
|
|
@ -103,17 +75,7 @@ func (d *Dispatcher) Handlers() iter.Seq2[byte, IntentHandler] {
|
|||
}
|
||||
}
|
||||
|
||||
// Dispatch routes a parsed UEPS packet through the threat circuit breaker
|
||||
// and then to the appropriate intent handler.
|
||||
//
|
||||
// Behaviour:
|
||||
// - Returns ErrorThreatScoreExceeded if the packet's ThreatScore exceeds the
|
||||
// threshold (packet is dropped and logged).
|
||||
// - Returns ErrorUnknownIntent if no handler is registered for the IntentID
|
||||
// (packet is dropped and logged).
|
||||
// - Returns nil on successful delivery to a handler, or any error the
|
||||
// handler itself returns.
|
||||
// - A nil packet returns ErrorNilPacket immediately.
|
||||
// err := dispatcher.Dispatch(packet)
|
||||
func (d *Dispatcher) Dispatch(packet *ueps.ParsedPacket) error {
|
||||
if packet == nil {
|
||||
return ErrorNilPacket
|
||||
|
|
@ -146,17 +108,13 @@ func (d *Dispatcher) Dispatch(packet *ueps.ParsedPacket) error {
|
|||
return handler(packet)
|
||||
}
|
||||
|
||||
// Sentinel errors returned by Dispatch.
|
||||
var (
|
||||
// ErrorThreatScoreExceeded is returned when a packet's ThreatScore exceeds
|
||||
// the safety threshold.
|
||||
// err := ErrorThreatScoreExceeded
|
||||
ErrorThreatScoreExceeded = core.E("Dispatcher.Dispatch", core.Sprintf("packet rejected: threat score exceeds safety threshold (%d)", ThreatScoreThreshold), nil)
|
||||
|
||||
// ErrorUnknownIntent is returned when no handler is registered for the
|
||||
// packet's IntentID.
|
||||
// err := ErrorUnknownIntent
|
||||
ErrorUnknownIntent = core.E("Dispatcher.Dispatch", "packet dropped: unknown intent", nil)
|
||||
|
||||
// ErrorNilPacket is returned when a nil packet is passed to Dispatch.
|
||||
// err := ErrorNilPacket
|
||||
ErrorNilPacket = core.E("Dispatcher.Dispatch", "nil packet", nil)
|
||||
)
|
||||
|
||||
|
|
|
|||
|
|
@ -2,14 +2,10 @@ package node
|
|||
|
||||
import core "dappco.re/go/core"
|
||||
|
||||
// Shared error sentinels for the node package.
|
||||
var (
|
||||
// ErrorIdentityNotInitialized is returned when a node operation requires
|
||||
// a node identity but none has been generated or loaded.
|
||||
// err := ErrorIdentityNotInitialized
|
||||
ErrorIdentityNotInitialized = core.E("node", "node identity not initialized", nil)
|
||||
|
||||
// ErrorMinerManagerNotConfigured is returned when a miner operation is
|
||||
// attempted but no MinerManager has been set on the Worker.
|
||||
// err := ErrorMinerManagerNotConfigured
|
||||
ErrorMinerManagerNotConfigured = core.E("node", "miner manager not configured", nil)
|
||||
)
|
||||
|
||||
|
|
|
|||
|
|
@ -73,9 +73,7 @@ type NodeIdentity struct {
|
|||
Role NodeRole `json:"role"`
|
||||
}
|
||||
|
||||
// NodeManager handles node identity operations including key generation and storage.
|
||||
//
|
||||
// nodeManager, err := NewNodeManager()
|
||||
// nodeManager, err := NewNodeManager()
|
||||
type NodeManager struct {
|
||||
identity *NodeIdentity
|
||||
privateKey []byte // Never serialized to JSON
|
||||
|
|
@ -126,7 +124,7 @@ func NewNodeManagerFromPaths(keyPath, configPath string) (*NodeManager, error) {
|
|||
return nm, nil
|
||||
}
|
||||
|
||||
// HasIdentity returns true if a node identity has been initialized.
|
||||
// hasIdentity := nodeManager.HasIdentity()
|
||||
func (n *NodeManager) HasIdentity() bool {
|
||||
n.mu.RLock()
|
||||
defer n.mu.RUnlock()
|
||||
|
|
|
|||
|
|
@ -10,26 +10,22 @@ import (
|
|||
"time"
|
||||
)
|
||||
|
||||
// Levin protocol flags.
|
||||
// flags := FlagRequest | FlagResponse
|
||||
const (
|
||||
FlagRequest uint32 = 0x00000001
|
||||
FlagResponse uint32 = 0x00000002
|
||||
)
|
||||
|
||||
// LevinProtocolVersion is the protocol version field written into every header.
|
||||
// header.ProtocolVersion = LevinProtocolVersion
|
||||
const LevinProtocolVersion uint32 = 1
|
||||
|
||||
// Default timeout values for Connection read and write operations.
|
||||
// conn.ReadTimeout = DefaultReadTimeout
|
||||
const (
|
||||
DefaultReadTimeout = 120 * time.Second
|
||||
DefaultWriteTimeout = 30 * time.Second
|
||||
)
|
||||
|
||||
// Connection wraps a net.Conn and provides framed Levin packet I/O.
|
||||
// All writes are serialised by an internal mutex, making it safe to call
|
||||
// WritePacket and WriteResponse concurrently from multiple goroutines.
|
||||
//
|
||||
// connection := NewConnection(conn)
|
||||
// conn := NewConnection(netConn)
|
||||
type Connection struct {
|
||||
// MaxPayloadSize is the upper bound accepted for incoming payloads.
|
||||
// Defaults to the package-level MaxPayloadSize (100 MB).
|
||||
|
|
@ -45,9 +41,7 @@ type Connection struct {
|
|||
writeMutex sync.Mutex
|
||||
}
|
||||
|
||||
// NewConnection creates a Connection that wraps conn with sensible defaults.
|
||||
//
|
||||
// connection := NewConnection(conn)
|
||||
// conn := NewConnection(netConn)
|
||||
func NewConnection(conn net.Conn) *Connection {
|
||||
return &Connection{
|
||||
MaxPayloadSize: MaxPayloadSize,
|
||||
|
|
@ -57,9 +51,7 @@ func NewConnection(conn net.Conn) *Connection {
|
|||
}
|
||||
}
|
||||
|
||||
// WritePacket sends a Levin request or notification.
|
||||
//
|
||||
// err := conn.WritePacket(CommandPing, payload, true)
|
||||
// err := conn.WritePacket(CommandPing, payload, true)
|
||||
func (c *Connection) WritePacket(cmd uint32, payload []byte, expectResponse bool) error {
|
||||
header := Header{
|
||||
Signature: Signature,
|
||||
|
|
@ -73,9 +65,7 @@ func (c *Connection) WritePacket(cmd uint32, payload []byte, expectResponse bool
|
|||
return c.writeFrame(&header, payload)
|
||||
}
|
||||
|
||||
// WriteResponse sends a Levin response packet with the given return code.
|
||||
//
|
||||
// err := conn.WriteResponse(CommandPing, payload, ReturnOK)
|
||||
// err := conn.WriteResponse(CommandPing, payload, ReturnOK)
|
||||
func (c *Connection) WriteResponse(cmd uint32, payload []byte, returnCode int32) error {
|
||||
header := Header{
|
||||
Signature: Signature,
|
||||
|
|
@ -113,9 +103,7 @@ func (c *Connection) writeFrame(header *Header, payload []byte) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// ReadPacket reads and validates the next Levin packet.
|
||||
//
|
||||
// header, payload, err := conn.ReadPacket()
|
||||
// header, payload, err := conn.ReadPacket()
|
||||
func (c *Connection) ReadPacket() (Header, []byte, error) {
|
||||
if err := c.conn.SetReadDeadline(time.Now().Add(c.ReadTimeout)); err != nil {
|
||||
return Header{}, nil, err
|
||||
|
|
@ -150,16 +138,12 @@ func (c *Connection) ReadPacket() (Header, []byte, error) {
|
|||
return header, payload, nil
|
||||
}
|
||||
|
||||
// Close closes the underlying network connection.
|
||||
//
|
||||
// err := conn.Close()
|
||||
// err := conn.Close()
|
||||
func (c *Connection) Close() error {
|
||||
return c.conn.Close()
|
||||
}
|
||||
|
||||
// RemoteAddr returns the remote address of the underlying connection as a string.
|
||||
//
|
||||
// addr := conn.RemoteAddr()
|
||||
// addr := conn.RemoteAddr()
|
||||
func (c *Connection) RemoteAddr() string {
|
||||
return c.conn.RemoteAddr().String()
|
||||
}
|
||||
|
|
|
|||
|
|
@ -126,9 +126,7 @@ func NewMessage(messageType MessageType, from, to string, payload any) (*Message
|
|||
}, nil
|
||||
}
|
||||
|
||||
// Reply creates a response message that points back to the original.
|
||||
//
|
||||
// reply, err := message.Reply(MessagePong, PongPayload{SentAt: 42, ReceivedAt: 43})
|
||||
// reply, err := message.Reply(MessagePong, PongPayload{SentAt: 42, ReceivedAt: 43})
|
||||
func (m *Message) Reply(messageType MessageType, payload any) (*Message, error) {
|
||||
reply, err := NewMessage(messageType, m.To, m.From, payload)
|
||||
if err != nil {
|
||||
|
|
@ -138,10 +136,8 @@ func (m *Message) Reply(messageType MessageType, payload any) (*Message, error)
|
|||
return reply, nil
|
||||
}
|
||||
|
||||
// ParsePayload decodes the payload into the supplied target.
|
||||
//
|
||||
// var ping PingPayload
|
||||
// err := message.ParsePayload(&ping)
|
||||
// var ping PingPayload
|
||||
// err := message.ParsePayload(&ping)
|
||||
func (m *Message) ParsePayload(target any) error {
|
||||
if m.Payload == nil {
|
||||
return nil
|
||||
|
|
@ -295,9 +291,7 @@ const (
|
|||
ErrorCodeTimeout = 1005
|
||||
)
|
||||
|
||||
// NewErrorMessage builds an error response message for an existing request.
|
||||
//
|
||||
// errorMessage, err := NewErrorMessage("worker-1", "controller-1", ErrorCodeOperationFailed, "miner start failed", "req-1")
|
||||
// errorMessage, err := NewErrorMessage("worker-1", "controller-1", ErrorCodeOperationFailed, "miner start failed", "req-1")
|
||||
func NewErrorMessage(from, to string, code int, message string, replyTo string) (*Message, error) {
|
||||
errorMessage, err := NewMessage(MessageError, from, to, ErrorPayload{
|
||||
Code: code,
|
||||
|
|
|
|||
|
|
@ -53,7 +53,7 @@ const peerRegistrySaveDebounceInterval = 5 * time.Second
|
|||
type PeerAuthMode int
|
||||
|
||||
const (
|
||||
// PeerAuthOpen allows any peer to connect (original behavior)
|
||||
// PeerAuthOpen allows any peer to connect.
|
||||
PeerAuthOpen PeerAuthMode = iota
|
||||
// PeerAuthAllowlist only allows pre-registered peers or those with allowed public keys
|
||||
PeerAuthAllowlist
|
||||
|
|
@ -98,9 +98,7 @@ func validatePeerName(name string) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// PeerRegistry manages known peers with KD-tree based selection.
|
||||
//
|
||||
// peerRegistry, err := NewPeerRegistry()
|
||||
// peerRegistry, err := NewPeerRegistry()
|
||||
type PeerRegistry struct {
|
||||
peers map[string]*Peer
|
||||
kdTree *poindexter.KDTree[string] // KD-tree with peer ID as payload
|
||||
|
|
|
|||
|
|
@ -63,7 +63,7 @@ func (h *ResponseHandler) ParseResponse(resp *Message, expectedType MessageType,
|
|||
return nil
|
||||
}
|
||||
|
||||
// DefaultResponseHandler is the default response handler instance.
|
||||
// handler := DefaultResponseHandler
|
||||
var DefaultResponseHandler = &ResponseHandler{}
|
||||
|
||||
// ValidateResponse is a convenience function using the default handler.
|
||||
|
|
|
|||
|
|
@ -144,9 +144,7 @@ func (d *MessageDeduplicator) Cleanup() {
|
|||
}
|
||||
}
|
||||
|
||||
// Transport manages WebSocket connections with SMSG encryption.
|
||||
//
|
||||
// transport := NewTransport(nodeManager, peerRegistry, DefaultTransportConfig())
|
||||
// transport := NewTransport(nodeManager, peerRegistry, DefaultTransportConfig())
|
||||
type Transport struct {
|
||||
config TransportConfig
|
||||
httpServer *http.Server
|
||||
|
|
@ -163,9 +161,7 @@ type Transport struct {
|
|||
waitGroup sync.WaitGroup
|
||||
}
|
||||
|
||||
// PeerRateLimiter implements a simple token bucket rate limiter per peer.
|
||||
//
|
||||
// rateLimiter := NewPeerRateLimiter(100, 50)
|
||||
// rateLimiter := NewPeerRateLimiter(100, 50)
|
||||
type PeerRateLimiter struct {
|
||||
availableTokens int
|
||||
capacity int
|
||||
|
|
@ -187,7 +183,7 @@ func NewPeerRateLimiter(maxTokens, refillPerSecond int) *PeerRateLimiter {
|
|||
}
|
||||
}
|
||||
|
||||
// Allow checks if a message is allowed and consumes a token if so
|
||||
// allowed := rateLimiter.Allow()
|
||||
func (r *PeerRateLimiter) Allow() bool {
|
||||
r.mutex.Lock()
|
||||
defer r.mutex.Unlock()
|
||||
|
|
@ -209,9 +205,7 @@ func (r *PeerRateLimiter) Allow() bool {
|
|||
return false
|
||||
}
|
||||
|
||||
// PeerConnection represents an active connection to a peer.
|
||||
//
|
||||
// peerConnection := &PeerConnection{Peer: &Peer{ID: "worker-1"}}
|
||||
// peerConnection := &PeerConnection{Peer: &Peer{ID: "worker-1"}}
|
||||
type PeerConnection struct {
|
||||
Peer *Peer
|
||||
Conn *websocket.Conn
|
||||
|
|
@ -224,9 +218,7 @@ type PeerConnection struct {
|
|||
rateLimiter *PeerRateLimiter // Per-peer message rate limiting
|
||||
}
|
||||
|
||||
// NewTransport creates a WebSocket transport for a node and peer registry.
|
||||
//
|
||||
// transport := NewTransport(nodeManager, peerRegistry, DefaultTransportConfig())
|
||||
// transport := NewTransport(nodeManager, peerRegistry, DefaultTransportConfig())
|
||||
func NewTransport(node *NodeManager, registry *PeerRegistry, config TransportConfig) *Transport {
|
||||
lifecycleContext, cancelLifecycle := context.WithCancel(context.Background())
|
||||
|
||||
|
|
@ -310,9 +302,7 @@ func (t *Transport) agentUserAgent() string {
|
|||
)
|
||||
}
|
||||
|
||||
// Start opens the WebSocket listener and background maintenance loops.
|
||||
//
|
||||
// err := transport.Start()
|
||||
// err := transport.Start()
|
||||
func (t *Transport) Start() error {
|
||||
mux := http.NewServeMux()
|
||||
mux.HandleFunc(t.config.webSocketPath(), t.handleWebSocketUpgrade)
|
||||
|
|
@ -381,9 +371,7 @@ func (t *Transport) Start() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// Stop closes active connections and shuts the transport down cleanly.
|
||||
//
|
||||
// err := transport.Stop()
|
||||
// err := transport.Stop()
|
||||
func (t *Transport) Stop() error {
|
||||
t.cancelLifecycle()
|
||||
|
||||
|
|
@ -410,18 +398,14 @@ func (t *Transport) Stop() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// OnMessage installs the handler for incoming messages before Start.
|
||||
//
|
||||
// transport.OnMessage(worker.HandleMessage)
|
||||
// transport.OnMessage(worker.HandleMessage)
|
||||
func (t *Transport) OnMessage(handler MessageHandler) {
|
||||
t.mutex.Lock()
|
||||
defer t.mutex.Unlock()
|
||||
t.messageHandler = handler
|
||||
}
|
||||
|
||||
// Connect dials a peer, completes the handshake, and starts the session loops.
|
||||
//
|
||||
// pc, err := transport.Connect(&Peer{ID: "worker-1", Address: "127.0.0.1:9091"})
|
||||
// pc, err := transport.Connect(&Peer{ID: "worker-1", Address: "127.0.0.1:9091"})
|
||||
func (t *Transport) Connect(peer *Peer) (*PeerConnection, error) {
|
||||
// Build WebSocket URL
|
||||
scheme := "ws"
|
||||
|
|
@ -485,9 +469,7 @@ func (t *Transport) Connect(peer *Peer) (*PeerConnection, error) {
|
|||
return pc, nil
|
||||
}
|
||||
|
||||
// Send transmits an encrypted message to a connected peer.
|
||||
//
|
||||
// err := transport.Send("worker-1", message)
|
||||
// err := transport.Send("worker-1", message)
|
||||
func (t *Transport) Send(peerID string, msg *Message) error {
|
||||
t.mutex.RLock()
|
||||
pc, exists := t.connections[peerID]
|
||||
|
|
@ -500,11 +482,9 @@ func (t *Transport) Send(peerID string, msg *Message) error {
|
|||
return pc.Send(msg)
|
||||
}
|
||||
|
||||
// Connections returns an iterator over all active peer connections.
|
||||
//
|
||||
// for pc := range transport.Connections() {
|
||||
// log.Printf("connected: %s", pc.Peer.ID)
|
||||
// }
|
||||
// for pc := range transport.Connections() {
|
||||
// _ = pc
|
||||
// }
|
||||
func (t *Transport) Connections() iter.Seq[*PeerConnection] {
|
||||
return func(yield func(*PeerConnection) bool) {
|
||||
t.mutex.RLock()
|
||||
|
|
@ -518,10 +498,7 @@ func (t *Transport) Connections() iter.Seq[*PeerConnection] {
|
|||
}
|
||||
}
|
||||
|
||||
// Broadcast sends a message to every connected peer except the sender.
|
||||
// The sender (msg.From) is excluded to prevent echo.
|
||||
//
|
||||
// err := transport.Broadcast(announcement)
|
||||
// err := transport.Broadcast(announcement)
|
||||
func (t *Transport) Broadcast(msg *Message) error {
|
||||
conns := slices.Collect(t.Connections())
|
||||
|
||||
|
|
@ -537,9 +514,7 @@ func (t *Transport) Broadcast(msg *Message) error {
|
|||
return lastErr
|
||||
}
|
||||
|
||||
// GetConnection returns an active connection to a peer.
|
||||
//
|
||||
// connection := transport.GetConnection("worker-1")
|
||||
// connection := transport.GetConnection("worker-1")
|
||||
func (t *Transport) GetConnection(peerID string) *PeerConnection {
|
||||
t.mutex.RLock()
|
||||
defer t.mutex.RUnlock()
|
||||
|
|
@ -973,9 +948,7 @@ func (t *Transport) removeConnection(pc *PeerConnection) {
|
|||
pc.Close()
|
||||
}
|
||||
|
||||
// Send sends an encrypted message over the connection.
|
||||
//
|
||||
// err := peerConnection.Send(message)
|
||||
// err := peerConnection.Send(message)
|
||||
func (pc *PeerConnection) Send(msg *Message) error {
|
||||
pc.writeMutex.Lock()
|
||||
defer pc.writeMutex.Unlock()
|
||||
|
|
@ -993,9 +966,7 @@ func (pc *PeerConnection) Send(msg *Message) error {
|
|||
return pc.Conn.WriteMessage(websocket.BinaryMessage, data)
|
||||
}
|
||||
|
||||
// Close closes the connection.
|
||||
//
|
||||
// err := peerConnection.Close()
|
||||
// err := peerConnection.Close()
|
||||
func (pc *PeerConnection) Close() error {
|
||||
var err error
|
||||
pc.closeOnce.Do(func() {
|
||||
|
|
@ -1021,9 +992,7 @@ const (
|
|||
DisconnectShutdown = 1004 // Server shutdown
|
||||
)
|
||||
|
||||
// GracefulClose sends a disconnect message before closing the connection.
|
||||
//
|
||||
// err := peerConnection.GracefulClose("server shutdown", DisconnectShutdown)
|
||||
// err := peerConnection.GracefulClose("server shutdown", DisconnectShutdown)
|
||||
func (pc *PeerConnection) GracefulClose(reason string, code int) error {
|
||||
var err error
|
||||
pc.closeOnce.Do(func() {
|
||||
|
|
@ -1090,12 +1059,9 @@ func (t *Transport) decryptMessage(data []byte, sharedSecret []byte) (*Message,
|
|||
return &msg, nil
|
||||
}
|
||||
|
||||
// ConnectedPeerCount returns the number of connected peers.
|
||||
//
|
||||
// count := transport.ConnectedPeerCount()
|
||||
// count := transport.ConnectedPeerCount()
|
||||
func (t *Transport) ConnectedPeerCount() int {
|
||||
t.mutex.RLock()
|
||||
defer t.mutex.RUnlock()
|
||||
return len(t.connections)
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -10,10 +10,7 @@ import (
|
|||
"github.com/adrg/xdg"
|
||||
)
|
||||
|
||||
// MinerManager interface for the mining package integration.
|
||||
// This allows the node package to interact with mining.Manager without import cycles.
|
||||
//
|
||||
// var minerManager MinerManager
|
||||
// var minerManager MinerManager
|
||||
type MinerManager interface {
|
||||
StartMiner(minerType string, config any) (MinerInstance, error)
|
||||
StopMiner(name string) error
|
||||
|
|
@ -21,9 +18,7 @@ type MinerManager interface {
|
|||
GetMiner(name string) (MinerInstance, error)
|
||||
}
|
||||
|
||||
// MinerInstance represents a running miner for stats collection.
|
||||
//
|
||||
// var miner MinerInstance
|
||||
// var miner MinerInstance
|
||||
type MinerInstance interface {
|
||||
GetName() string
|
||||
GetType() string
|
||||
|
|
@ -31,17 +26,13 @@ type MinerInstance interface {
|
|||
GetConsoleHistory(lines int) []string
|
||||
}
|
||||
|
||||
// ProfileManager interface for profile operations.
|
||||
//
|
||||
// var profileManager ProfileManager
|
||||
// var profileManager ProfileManager
|
||||
type ProfileManager interface {
|
||||
GetProfile(id string) (any, error)
|
||||
SaveProfile(profile any) error
|
||||
}
|
||||
|
||||
// Worker handles incoming messages on a worker node.
|
||||
//
|
||||
// worker := NewWorker(nodeManager, transport)
|
||||
// worker := NewWorker(nodeManager, transport)
|
||||
type Worker struct {
|
||||
nodeManager *NodeManager
|
||||
transport *Transport
|
||||
|
|
@ -51,9 +42,7 @@ type Worker struct {
|
|||
DeploymentDirectory string // Base directory for deployments (defaults to xdg.DataHome)
|
||||
}
|
||||
|
||||
// NewWorker creates a new Worker instance.
|
||||
//
|
||||
// worker := NewWorker(nodeManager, transport)
|
||||
// worker := NewWorker(nodeManager, transport)
|
||||
func NewWorker(nodeManager *NodeManager, transport *Transport) *Worker {
|
||||
return &Worker{
|
||||
nodeManager: nodeManager,
|
||||
|
|
@ -63,23 +52,17 @@ func NewWorker(nodeManager *NodeManager, transport *Transport) *Worker {
|
|||
}
|
||||
}
|
||||
|
||||
// SetMinerManager attaches the miner manager used for miner operations.
|
||||
//
|
||||
// worker.SetMinerManager(minerManager)
|
||||
// worker.SetMinerManager(minerManager)
|
||||
func (w *Worker) SetMinerManager(manager MinerManager) {
|
||||
w.minerManager = manager
|
||||
}
|
||||
|
||||
// SetProfileManager attaches the profile manager used for profile operations.
|
||||
//
|
||||
// worker.SetProfileManager(profileManager)
|
||||
// worker.SetProfileManager(profileManager)
|
||||
func (w *Worker) SetProfileManager(manager ProfileManager) {
|
||||
w.profileManager = manager
|
||||
}
|
||||
|
||||
// HandleMessage routes an incoming message to the correct worker handler.
|
||||
//
|
||||
// worker.HandleMessage(peerConnection, message)
|
||||
// worker.HandleMessage(peerConnection, message)
|
||||
func (w *Worker) HandleMessage(peerConnection *PeerConnection, message *Message) {
|
||||
var response *Message
|
||||
var err error
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue