diff --git a/logging/logger_test.go b/logging/logger_test.go index 11ba5cb..08c48a9 100644 --- a/logging/logger_test.go +++ b/logging/logger_test.go @@ -2,6 +2,7 @@ package logging import ( "bytes" + "sync" "testing" core "dappco.re/go/core" @@ -266,3 +267,35 @@ func TestLogger_MergeFields_Good(t *testing.T) { t.Error("Later fields should override earlier ones") } } + +func TestLogger_ParseLevel_Bad(t *testing.T) { + _, err := ParseLevel("bogus") + if err == nil { + t.Error("ParseLevel should return an error for an unrecognised level string") + } +} + +func TestLogger_ConcurrentWrite_Ugly(t *testing.T) { + var buf bytes.Buffer + logger := New(Config{ + Output: &buf, + Level: LevelDebug, + }) + + const goroutines = 50 + var wg sync.WaitGroup + wg.Add(goroutines) + + for i := range goroutines { + go func(n int) { + defer wg.Done() + logger.Infof("concurrent message %d", n) + }(i) + } + + wg.Wait() + // Only assert no panics / races occurred; output ordering is non-deterministic. + if buf.Len() == 0 { + t.Error("expected concurrent log writes to produce output") + } +} diff --git a/node/bundle.go b/node/bundle.go index 03a78ed..de52eec 100644 --- a/node/bundle.go +++ b/node/bundle.go @@ -224,11 +224,9 @@ func createTarball(files map[string][]byte) ([]byte, error) { var buf bytes.Buffer tarWriter := tar.NewWriter(&buf) - // Track directories we've created createdDirectories := make(map[string]bool) for name, content := range files { - // Create parent directories if needed dir := core.PathDir(name) if dir != "." && !createdDirectories[dir] { header := &tar.Header{ @@ -242,7 +240,7 @@ func createTarball(files map[string][]byte) ([]byte, error) { createdDirectories[dir] = true } - // Determine file mode (executable for binaries in miners/) + // Binaries in miners/ and non-JSON content get executable permissions. mode := int64(0644) if core.PathDir(name) == "miners" || !isJSON(content) { mode = 0755 diff --git a/node/dispatcher.go b/node/dispatcher.go index ac78c49..126e3b2 100644 --- a/node/dispatcher.go +++ b/node/dispatcher.go @@ -71,8 +71,11 @@ func NewDispatcher() *Dispatcher { } // RegisterHandler associates an IntentHandler with a specific IntentID. -// Calling RegisterHandler with an IntentID that already has a handler will -// replace the previous handler. +// Replacing an existing handler is allowed — the new handler takes effect immediately. +// +// dispatcher.RegisterHandler(IntentCompute, func(packet *ueps.ParsedPacket) error { +// return processComputeJob(packet.Payload) +// }) func (d *Dispatcher) RegisterHandler(intentID byte, handler IntentHandler) { d.mu.Lock() defer d.mu.Unlock() @@ -83,6 +86,10 @@ func (d *Dispatcher) RegisterHandler(intentID byte, handler IntentHandler) { } // Handlers returns an iterator over all registered intent handlers. +// +// for intentID, handler := range dispatcher.Handlers() { +// log.Printf("registered intent 0x%02X", intentID) +// } func (d *Dispatcher) Handlers() iter.Seq2[byte, IntentHandler] { return func(yield func(byte, IntentHandler) bool) { d.mu.RLock() @@ -152,3 +159,4 @@ var ( // ErrorNilPacket is returned when a nil packet is passed to Dispatch. ErrorNilPacket = core.E("Dispatcher.Dispatch", "nil packet", nil) ) + diff --git a/node/errors.go b/node/errors.go index 7d29af4..f660be1 100644 --- a/node/errors.go +++ b/node/errors.go @@ -12,3 +12,4 @@ var ( // attempted but no MinerManager has been set on the Worker. ErrorMinerManagerNotConfigured = core.E("node", "miner manager not configured", nil) ) + diff --git a/node/peer.go b/node/peer.go index 7776f7d..aeafe82 100644 --- a/node/peer.go +++ b/node/peer.go @@ -187,6 +187,8 @@ func (r *PeerRegistry) GetAuthMode() PeerAuthMode { } // AllowPublicKey adds a public key to the allowlist. +// +// registry.AllowPublicKey(peer.PublicKey) func (r *PeerRegistry) AllowPublicKey(publicKey string) { r.allowedPublicKeyMu.Lock() defer r.allowedPublicKeyMu.Unlock() @@ -195,6 +197,8 @@ func (r *PeerRegistry) AllowPublicKey(publicKey string) { } // RevokePublicKey removes a public key from the allowlist. +// +// registry.RevokePublicKey(peer.PublicKey) func (r *PeerRegistry) RevokePublicKey(publicKey string) { r.allowedPublicKeyMu.Lock() defer r.allowedPublicKeyMu.Unlock() @@ -203,6 +207,8 @@ func (r *PeerRegistry) RevokePublicKey(publicKey string) { } // IsPublicKeyAllowed checks if a public key is in the allowlist. +// +// allowed := registry.IsPublicKeyAllowed(peer.PublicKey) func (r *PeerRegistry) IsPublicKeyAllowed(publicKey string) bool { r.allowedPublicKeyMu.RLock() defer r.allowedPublicKeyMu.RUnlock() @@ -210,9 +216,10 @@ func (r *PeerRegistry) IsPublicKeyAllowed(publicKey string) bool { } // IsPeerAllowed checks if a peer is allowed to connect based on auth mode. -// Returns true if: -// - AuthMode is Open (allow all) -// - AuthMode is Allowlist AND (peer is pre-registered OR public key is allowlisted) +// Returns true when AuthMode is Open (all allowed), or when Allowlist mode is active +// and the peer is pre-registered or its public key is in the allowlist. +// +// allowed := registry.IsPeerAllowed(peer.ID, peer.PublicKey) func (r *PeerRegistry) IsPeerAllowed(peerID string, publicKey string) bool { r.allowedPublicKeyMu.RLock() authMode := r.authMode @@ -238,11 +245,17 @@ func (r *PeerRegistry) IsPeerAllowed(peerID string, publicKey string) bool { } // ListAllowedPublicKeys returns all allowlisted public keys. +// +// keys := registry.ListAllowedPublicKeys() func (r *PeerRegistry) ListAllowedPublicKeys() []string { return slices.Collect(r.AllowedPublicKeys()) } // AllowedPublicKeys returns an iterator over all allowlisted public keys. +// +// for key := range registry.AllowedPublicKeys() { +// log.Printf("allowed: %s", key[:16]) +// } func (r *PeerRegistry) AllowedPublicKeys() iter.Seq[string] { return func(yield func(string) bool) { r.allowedPublicKeyMu.RLock() @@ -257,8 +270,9 @@ func (r *PeerRegistry) AllowedPublicKeys() iter.Seq[string] { } // AddPeer adds a new peer to the registry. -// Note: Persistence is debounced (writes batched every 5s). Call Close() to ensure -// all changes are flushed to disk before shutdown. +// Persistence is debounced — writes are batched every 5s. Call Close() before shutdown. +// +// err := registry.AddPeer(&Peer{ID: "worker-1", Address: "10.0.0.1:9091", Role: RoleWorker}) func (r *PeerRegistry) AddPeer(peer *Peer) error { r.mu.Lock() @@ -295,7 +309,9 @@ func (r *PeerRegistry) AddPeer(peer *Peer) error { } // UpdatePeer updates an existing peer's information. -// Note: Persistence is debounced. Call Close() to flush before shutdown. +// Persistence is debounced. Call Close() to flush before shutdown. +// +// err := registry.UpdatePeer(&Peer{ID: "worker-1", Score: 90}) func (r *PeerRegistry) UpdatePeer(peer *Peer) error { r.mu.Lock() @@ -313,7 +329,9 @@ func (r *PeerRegistry) UpdatePeer(peer *Peer) error { } // RemovePeer removes a peer from the registry. -// Note: Persistence is debounced. Call Close() to flush before shutdown. +// Persistence is debounced. Call Close() to flush before shutdown. +// +// err := registry.RemovePeer("worker-1") func (r *PeerRegistry) RemovePeer(id string) error { r.mu.Lock() @@ -442,6 +460,8 @@ const ( ) // RecordSuccess records a successful interaction with a peer, improving their score. +// +// registry.RecordSuccess("worker-1") func (r *PeerRegistry) RecordSuccess(id string) { r.mu.Lock() peer, exists := r.peers[id] @@ -457,6 +477,8 @@ func (r *PeerRegistry) RecordSuccess(id string) { } // RecordFailure records a failed interaction with a peer, reducing their score. +// +// registry.RecordFailure("worker-1") func (r *PeerRegistry) RecordFailure(id string) { r.mu.Lock() peer, exists := r.peers[id] @@ -478,6 +500,8 @@ func (r *PeerRegistry) RecordFailure(id string) { } // RecordTimeout records a timeout when communicating with a peer. +// +// registry.RecordTimeout("worker-1") func (r *PeerRegistry) RecordTimeout(id string) { r.mu.Lock() peer, exists := r.peers[id] @@ -522,6 +546,10 @@ func (r *PeerRegistry) GetPeersByScore() []*Peer { } // PeersByScore returns an iterator over peers sorted by score (highest first). +// +// for peer := range registry.PeersByScore() { +// log.Printf("peer %s score=%.0f", peer.ID, peer.Score) +// } func (r *PeerRegistry) PeersByScore() iter.Seq[*Peer] { return func(yield func(*Peer) bool) { peers := r.GetPeersByScore() @@ -533,8 +561,10 @@ func (r *PeerRegistry) PeersByScore() iter.Seq[*Peer] { } } -// SelectOptimalPeer returns the best peer based on multi-factor optimization. -// Uses Poindexter KD-tree to find the peer closest to ideal metrics. +// SelectOptimalPeer returns the best peer based on multi-factor optimisation. +// Uses Poindexter KD-tree to find the peer closest to ideal metrics (low ping, low hops, high score). +// +// peer := registry.SelectOptimalPeer() func (r *PeerRegistry) SelectOptimalPeer() *Peer { r.mu.RLock() defer r.mu.RUnlock() @@ -561,7 +591,9 @@ func (r *PeerRegistry) SelectOptimalPeer() *Peer { return &peerCopy } -// SelectNearestPeers returns the n best peers based on multi-factor optimization. +// SelectNearestPeers returns the n best peers based on multi-factor optimisation. +// +// peers := registry.SelectNearestPeers(3) func (r *PeerRegistry) SelectNearestPeers(n int) []*Peer { r.mu.RLock() defer r.mu.RUnlock() @@ -612,6 +644,8 @@ func (r *PeerRegistry) ConnectedPeers() iter.Seq[*Peer] { } // Count returns the number of registered peers. +// +// n := registry.Count() func (r *PeerRegistry) Count() int { r.mu.RLock() defer r.mu.RUnlock() diff --git a/node/transport.go b/node/transport.go index 595f0c4..4acdfff 100644 --- a/node/transport.go +++ b/node/transport.go @@ -311,6 +311,8 @@ func (t *Transport) agentUserAgent() string { } // Start opens the WebSocket listener and background maintenance loops. +// +// err := transport.Start() func (t *Transport) Start() error { mux := http.NewServeMux() mux.HandleFunc(t.config.webSocketPath(), t.handleWebSocketUpgrade) @@ -380,6 +382,8 @@ func (t *Transport) Start() error { } // Stop closes active connections and shuts the transport down cleanly. +// +// err := transport.Stop() func (t *Transport) Stop() error { t.cancelLifecycle() @@ -482,6 +486,8 @@ func (t *Transport) Connect(peer *Peer) (*PeerConnection, error) { } // Send transmits an encrypted message to a connected peer. +// +// err := transport.Send("worker-1", message) func (t *Transport) Send(peerID string, msg *Message) error { t.mutex.RLock() pc, exists := t.connections[peerID] @@ -495,6 +501,10 @@ func (t *Transport) Send(peerID string, msg *Message) error { } // Connections returns an iterator over all active peer connections. +// +// for pc := range transport.Connections() { +// log.Printf("connected: %s", pc.Peer.ID) +// } func (t *Transport) Connections() iter.Seq[*PeerConnection] { return func(yield func(*PeerConnection) bool) { t.mutex.RLock() @@ -509,13 +519,14 @@ func (t *Transport) Connections() iter.Seq[*PeerConnection] { } // Broadcast sends a message to every connected peer except the sender. -// The sender is identified by msg.From and excluded to prevent echo. +// The sender (msg.From) is excluded to prevent echo. +// +// err := transport.Broadcast(announcement) func (t *Transport) Broadcast(msg *Message) error { conns := slices.Collect(t.Connections()) var lastErr error for _, pc := range conns { - // Exclude sender from broadcast to prevent echo (P2P-MED-6) if pc.Peer != nil && pc.Peer.ID == msg.From { continue } @@ -963,26 +974,28 @@ func (t *Transport) removeConnection(pc *PeerConnection) { } // Send sends an encrypted message over the connection. +// +// err := peerConnection.Send(message) func (pc *PeerConnection) Send(msg *Message) error { pc.writeMutex.Lock() defer pc.writeMutex.Unlock() - // Encrypt message using SMSG data, err := pc.transport.encryptMessage(msg, pc.SharedSecret) if err != nil { return err } - // Set write deadline to prevent blocking forever if err := pc.Conn.SetWriteDeadline(time.Now().Add(10 * time.Second)); err != nil { return core.E("PeerConnection.Send", "failed to set write deadline", err) } - defer pc.Conn.SetWriteDeadline(time.Time{}) // Reset deadline after send + defer pc.Conn.SetWriteDeadline(time.Time{}) return pc.Conn.WriteMessage(websocket.BinaryMessage, data) } // Close closes the connection. +// +// err := peerConnection.Close() func (pc *PeerConnection) Close() error { var err error pc.closeOnce.Do(func() { @@ -1009,6 +1022,8 @@ const ( ) // GracefulClose sends a disconnect message before closing the connection. +// +// err := peerConnection.GracefulClose("server shutdown", DisconnectShutdown) func (pc *PeerConnection) GracefulClose(reason string, code int) error { var err error pc.closeOnce.Do(func() { @@ -1083,3 +1098,4 @@ func (t *Transport) ConnectedPeerCount() int { defer t.mutex.RUnlock() return len(t.connections) } + diff --git a/node/worker.go b/node/worker.go index 9c0669d..711c16b 100644 --- a/node/worker.go +++ b/node/worker.go @@ -175,14 +175,13 @@ func (w *Worker) handleStats(message *Message) (*Message, error) { return message.Reply(MessageStats, stats) } -// convertMinerStats converts miner stats to the protocol format. +// convertMinerStats converts a running miner's raw stats map to the wire protocol format. func convertMinerStats(miner MinerInstance, rawStats any) MinerStatsItem { item := MinerStatsItem{ Name: miner.GetName(), Type: miner.GetType(), } - // Try to extract common fields from the stats if statsMap, ok := rawStats.(map[string]any); ok { if hashrate, ok := statsMap["hashrate"].(float64); ok { item.Hashrate = hashrate