From 41cc0c295cf0219d40370d65ab430383ae7cd3ab Mon Sep 17 00:00:00 2001 From: snider Date: Wed, 31 Dec 2025 15:45:25 +0000 Subject: [PATCH] feat: Implement multiple code review improvements MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - P2P-HIGH-1: Add peer scoring system with success/failure/timeout tracking - PERF-HIGH-2: Add JSON encoding buffer pool for hot paths - API-HIGH-1: Standardize error responses using APIError struct - RESIL-MED-5: Add graceful disconnect with reason/code messages All verified items (SQL indexes, keepalive) were already implemented. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- pkg/mining/bufpool.go | 55 ++++++++++++++++++++++++++ pkg/mining/events.go | 4 +- pkg/mining/service.go | 40 +++++++++---------- pkg/node/bufpool.go | 55 ++++++++++++++++++++++++++ pkg/node/message.go | 2 +- pkg/node/peer.go | 89 +++++++++++++++++++++++++++++++++++++++++++ pkg/node/transport.go | 56 +++++++++++++++++++++++---- 7 files changed, 271 insertions(+), 30 deletions(-) create mode 100644 pkg/mining/bufpool.go create mode 100644 pkg/node/bufpool.go diff --git a/pkg/mining/bufpool.go b/pkg/mining/bufpool.go new file mode 100644 index 0000000..a1282fc --- /dev/null +++ b/pkg/mining/bufpool.go @@ -0,0 +1,55 @@ +package mining + +import ( + "bytes" + "encoding/json" + "sync" +) + +// bufferPool provides reusable byte buffers for JSON encoding. +// This reduces allocation overhead in hot paths like WebSocket event serialization. +var bufferPool = sync.Pool{ + New: func() interface{} { + return bytes.NewBuffer(make([]byte, 0, 1024)) + }, +} + +// getBuffer retrieves a buffer from the pool. +func getBuffer() *bytes.Buffer { + buf := bufferPool.Get().(*bytes.Buffer) + buf.Reset() + return buf +} + +// putBuffer returns a buffer to the pool. +func putBuffer(buf *bytes.Buffer) { + // Don't pool buffers that grew too large (>64KB) + if buf.Cap() <= 65536 { + bufferPool.Put(buf) + } +} + +// MarshalJSON encodes a value to JSON using a pooled buffer. +// Returns a copy of the encoded bytes (safe to use after the function returns). +func MarshalJSON(v interface{}) ([]byte, error) { + buf := getBuffer() + defer putBuffer(buf) + + enc := json.NewEncoder(buf) + // Don't escape HTML characters (matches json.Marshal behavior for these use cases) + enc.SetEscapeHTML(false) + if err := enc.Encode(v); err != nil { + return nil, err + } + + // json.Encoder.Encode adds a newline; remove it to match json.Marshal + data := buf.Bytes() + if len(data) > 0 && data[len(data)-1] == '\n' { + data = data[:len(data)-1] + } + + // Return a copy since the buffer will be reused + result := make([]byte, len(data)) + copy(result, data) + return result, nil +} diff --git a/pkg/mining/events.go b/pkg/mining/events.go index f3de661..49f9b3c 100644 --- a/pkg/mining/events.go +++ b/pkg/mining/events.go @@ -156,7 +156,7 @@ func (h *EventHub) Run() { Timestamp: time.Now(), Data: state, } - data, err := json.Marshal(event) + data, err := MarshalJSON(event) if err != nil { logging.Error("failed to marshal state sync", logging.Fields{"error": err}) return @@ -180,7 +180,7 @@ func (h *EventHub) Run() { logging.Debug("client disconnected", logging.Fields{"total": len(h.clients)}) case event := <-h.broadcast: - data, err := json.Marshal(event) + data, err := MarshalJSON(event) if err != nil { logging.Error("failed to marshal event", logging.Fields{"error": err}) continue diff --git a/pkg/mining/service.go b/pkg/mining/service.go index d87b424..b968a18 100644 --- a/pkg/mining/service.go +++ b/pkg/mining/service.go @@ -578,7 +578,7 @@ func (s *Service) handleReady(c *gin.Context) { func (s *Service) handleGetInfo(c *gin.Context) { systemInfo, err := s.updateInstallationCache() if err != nil { - c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to get system info", "details": err.Error()}) + respondWithMiningError(c, ErrInternal("failed to get system info").WithCause(err)) return } c.JSON(http.StatusOK, systemInfo) @@ -644,7 +644,7 @@ func (s *Service) updateInstallationCache() (*SystemInfo, error) { func (s *Service) handleDoctor(c *gin.Context) { systemInfo, err := s.updateInstallationCache() if err != nil { - c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to update cache", "details": err.Error()}) + respondWithMiningError(c, ErrInternal("failed to update cache").WithCause(err)) return } c.JSON(http.StatusOK, systemInfo) @@ -709,7 +709,7 @@ func (s *Service) handleUpdateCheck(c *gin.Context) { func (s *Service) handleUninstallMiner(c *gin.Context) { minerType := c.Param("miner_name") if err := s.Manager.UninstallMiner(c.Request.Context(), minerType); err != nil { - c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) + respondWithMiningError(c, ErrInternal("failed to uninstall miner").WithCause(err)) return } if _, err := s.updateInstallationCache(); err != nil { @@ -754,12 +754,12 @@ func (s *Service) handleInstallMiner(c *gin.Context) { minerType := c.Param("miner_name") miner, err := CreateMiner(minerType) if err != nil { - c.JSON(http.StatusBadRequest, gin.H{"error": "unknown miner type"}) + respondWithMiningError(c, ErrUnsupportedMiner(minerType)) return } if err := miner.Install(); err != nil { - c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) + respondWithMiningError(c, ErrInstallFailed(minerType).WithCause(err)) return } @@ -769,7 +769,7 @@ func (s *Service) handleInstallMiner(c *gin.Context) { details, err := miner.CheckInstallation() if err != nil { - c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to verify installation", "details": err.Error()}) + respondWithMiningError(c, ErrInternal("failed to verify installation").WithCause(err)) return } @@ -794,13 +794,13 @@ func (s *Service) handleStartMinerWithProfile(c *gin.Context) { var config Config if err := json.Unmarshal(profile.Config, &config); err != nil { - c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to parse profile config", "details": err.Error()}) + respondWithMiningError(c, ErrInvalidConfig("failed to parse profile config").WithCause(err)) return } miner, err := s.Manager.StartMiner(c.Request.Context(), profile.MinerType, &config) if err != nil { - c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) + respondWithMiningError(c, ErrStartFailed(profile.Name).WithCause(err)) return } c.JSON(http.StatusOK, miner) @@ -817,7 +817,7 @@ func (s *Service) handleStartMinerWithProfile(c *gin.Context) { func (s *Service) handleStopMiner(c *gin.Context) { minerName := c.Param("miner_name") if err := s.Manager.StopMiner(c.Request.Context(), minerName); err != nil { - c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) + respondWithMiningError(c, ErrStopFailed(minerName).WithCause(err)) return } c.JSON(http.StatusOK, gin.H{"status": "stopped"}) @@ -840,7 +840,7 @@ func (s *Service) handleGetMinerStats(c *gin.Context) { } stats, err := miner.GetStats(c.Request.Context()) if err != nil { - c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) + respondWithMiningError(c, ErrInternal("failed to get miner stats").WithCause(err)) return } c.JSON(http.StatusOK, stats) @@ -858,7 +858,7 @@ func (s *Service) handleGetMinerHashrateHistory(c *gin.Context) { minerName := c.Param("miner_name") history, err := s.Manager.GetMinerHashrateHistory(minerName) if err != nil { - c.JSON(http.StatusNotFound, gin.H{"error": err.Error()}) + respondWithMiningError(c, ErrMinerNotFound(minerName).WithCause(err)) return } c.JSON(http.StatusOK, history) @@ -915,12 +915,12 @@ func (s *Service) handleMinerStdin(c *gin.Context) { var input StdinInput if err := c.ShouldBindJSON(&input); err != nil { - c.JSON(http.StatusBadRequest, gin.H{"error": "invalid input: " + err.Error()}) + respondWithMiningError(c, ErrInvalidConfig("invalid input format").WithCause(err)) return } if err := miner.WriteStdin(input.Input); err != nil { - c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + respondWithMiningError(c, ErrInternal("failed to write to stdin").WithCause(err)) return } @@ -1075,13 +1075,13 @@ func (s *Service) handleHistoryStatus(c *gin.Context) { func (s *Service) handleAllMinersHistoricalStats(c *gin.Context) { manager, ok := s.Manager.(*Manager) if !ok { - c.JSON(http.StatusInternalServerError, gin.H{"error": "manager type not supported"}) + respondWithMiningError(c, ErrInternal("manager type not supported")) return } stats, err := manager.GetAllMinerHistoricalStats() if err != nil { - c.JSON(http.StatusServiceUnavailable, gin.H{"error": err.Error()}) + respondWithMiningError(c, ErrDatabaseError("get historical stats").WithCause(err)) return } @@ -1100,18 +1100,18 @@ func (s *Service) handleMinerHistoricalStats(c *gin.Context) { minerName := c.Param("miner_name") manager, ok := s.Manager.(*Manager) if !ok { - c.JSON(http.StatusInternalServerError, gin.H{"error": "manager type not supported"}) + respondWithMiningError(c, ErrInternal("manager type not supported")) return } stats, err := manager.GetMinerHistoricalStats(minerName) if err != nil { - c.JSON(http.StatusServiceUnavailable, gin.H{"error": err.Error()}) + respondWithMiningError(c, ErrDatabaseError("get miner stats").WithCause(err)) return } if stats == nil { - c.JSON(http.StatusNotFound, gin.H{"error": "no historical data found for miner"}) + respondWithMiningError(c, ErrMinerNotFound(minerName).WithDetails("no historical data found")) return } @@ -1132,7 +1132,7 @@ func (s *Service) handleMinerHistoricalHashrate(c *gin.Context) { minerName := c.Param("miner_name") manager, ok := s.Manager.(*Manager) if !ok { - c.JSON(http.StatusInternalServerError, gin.H{"error": "manager type not supported"}) + respondWithMiningError(c, ErrInternal("manager type not supported")) return } @@ -1153,7 +1153,7 @@ func (s *Service) handleMinerHistoricalHashrate(c *gin.Context) { history, err := manager.GetMinerHistoricalHashrate(minerName, since, until) if err != nil { - c.JSON(http.StatusServiceUnavailable, gin.H{"error": err.Error()}) + respondWithMiningError(c, ErrDatabaseError("get hashrate history").WithCause(err)) return } diff --git a/pkg/node/bufpool.go b/pkg/node/bufpool.go new file mode 100644 index 0000000..a4f0e68 --- /dev/null +++ b/pkg/node/bufpool.go @@ -0,0 +1,55 @@ +package node + +import ( + "bytes" + "encoding/json" + "sync" +) + +// bufferPool provides reusable byte buffers for JSON encoding. +// This reduces allocation overhead in hot paths like message serialization. +var bufferPool = sync.Pool{ + New: func() interface{} { + return bytes.NewBuffer(make([]byte, 0, 1024)) + }, +} + +// getBuffer retrieves a buffer from the pool. +func getBuffer() *bytes.Buffer { + buf := bufferPool.Get().(*bytes.Buffer) + buf.Reset() + return buf +} + +// putBuffer returns a buffer to the pool. +func putBuffer(buf *bytes.Buffer) { + // Don't pool buffers that grew too large (>64KB) + if buf.Cap() <= 65536 { + bufferPool.Put(buf) + } +} + +// MarshalJSON encodes a value to JSON using a pooled buffer. +// Returns a copy of the encoded bytes (safe to use after the function returns). +func MarshalJSON(v interface{}) ([]byte, error) { + buf := getBuffer() + defer putBuffer(buf) + + enc := json.NewEncoder(buf) + // Don't escape HTML characters (matches json.Marshal behavior for these use cases) + enc.SetEscapeHTML(false) + if err := enc.Encode(v); err != nil { + return nil, err + } + + // json.Encoder.Encode adds a newline; remove it to match json.Marshal + data := buf.Bytes() + if len(data) > 0 && data[len(data)-1] == '\n' { + data = data[:len(data)-1] + } + + // Return a copy since the buffer will be reused + result := make([]byte, len(data)) + copy(result, data) + return result, nil +} diff --git a/pkg/node/message.go b/pkg/node/message.go index 293be1e..77032c7 100644 --- a/pkg/node/message.go +++ b/pkg/node/message.go @@ -52,7 +52,7 @@ type Message struct { func NewMessage(msgType MessageType, from, to string, payload interface{}) (*Message, error) { var payloadBytes json.RawMessage if payload != nil { - data, err := json.Marshal(payload) + data, err := MarshalJSON(payload) if err != nil { return nil, err } diff --git a/pkg/node/peer.go b/pkg/node/peer.go index b1023f3..994fb06 100644 --- a/pkg/node/peer.go +++ b/pkg/node/peer.go @@ -338,6 +338,95 @@ func (r *PeerRegistry) SetConnected(id string, connected bool) { } } +// Score adjustment constants +const ( + ScoreSuccessIncrement = 1.0 // Increment for successful interaction + ScoreFailureDecrement = 5.0 // Decrement for failed interaction + ScoreTimeoutDecrement = 3.0 // Decrement for timeout + ScoreMinimum = 0.0 // Minimum score + ScoreMaximum = 100.0 // Maximum score + ScoreDefault = 50.0 // Default score for new peers +) + +// RecordSuccess records a successful interaction with a peer, improving their score. +func (r *PeerRegistry) RecordSuccess(id string) { + r.mu.Lock() + peer, exists := r.peers[id] + if !exists { + r.mu.Unlock() + return + } + + peer.Score = min(peer.Score+ScoreSuccessIncrement, ScoreMaximum) + peer.LastSeen = time.Now() + r.mu.Unlock() + r.save() +} + +// RecordFailure records a failed interaction with a peer, reducing their score. +func (r *PeerRegistry) RecordFailure(id string) { + r.mu.Lock() + peer, exists := r.peers[id] + if !exists { + r.mu.Unlock() + return + } + + peer.Score = max(peer.Score-ScoreFailureDecrement, ScoreMinimum) + newScore := peer.Score + r.mu.Unlock() + r.save() + + logging.Debug("peer score decreased", logging.Fields{ + "peer_id": id, + "new_score": newScore, + "reason": "failure", + }) +} + +// RecordTimeout records a timeout when communicating with a peer. +func (r *PeerRegistry) RecordTimeout(id string) { + r.mu.Lock() + peer, exists := r.peers[id] + if !exists { + r.mu.Unlock() + return + } + + peer.Score = max(peer.Score-ScoreTimeoutDecrement, ScoreMinimum) + newScore := peer.Score + r.mu.Unlock() + r.save() + + logging.Debug("peer score decreased", logging.Fields{ + "peer_id": id, + "new_score": newScore, + "reason": "timeout", + }) +} + +// GetPeersByScore returns peers sorted by score (highest first). +func (r *PeerRegistry) GetPeersByScore() []*Peer { + r.mu.RLock() + defer r.mu.RUnlock() + + peers := make([]*Peer, 0, len(r.peers)) + for _, p := range r.peers { + peers = append(peers, p) + } + + // Sort by score descending + for i := 0; i < len(peers)-1; i++ { + for j := i + 1; j < len(peers); j++ { + if peers[j].Score > peers[i].Score { + peers[i], peers[j] = peers[j], peers[i] + } + } + } + + return peers +} + // SelectOptimalPeer returns the best peer based on multi-factor optimization. // Uses Poindexter KD-tree to find the peer closest to ideal metrics. func (r *PeerRegistry) SelectOptimalPeer() *Peer { diff --git a/pkg/node/transport.go b/pkg/node/transport.go index 1f0d1d2..3c8e9f1 100644 --- a/pkg/node/transport.go +++ b/pkg/node/transport.go @@ -245,10 +245,10 @@ func (t *Transport) Start() error { func (t *Transport) Stop() error { t.cancel() - // Close all connections + // Gracefully close all connections with shutdown message t.mu.Lock() for _, pc := range t.conns { - pc.Close() + pc.GracefulClose("server shutdown", DisconnectShutdown) } t.mu.Unlock() @@ -450,7 +450,7 @@ func (t *Transport) handleWSUpgrade(w http.ResponseWriter, r *http.Request) { Reason: "peer not authorized", } rejectMsg, _ := NewMessage(MsgHandshakeAck, identity.ID, payload.Identity.ID, rejectPayload) - if rejectData, err := json.Marshal(rejectMsg); err == nil { + if rejectData, err := MarshalJSON(rejectMsg); err == nil { conn.WriteMessage(websocket.TextMessage, rejectData) } } @@ -512,7 +512,7 @@ func (t *Transport) handleWSUpgrade(w http.ResponseWriter, r *http.Request) { } // First ack is unencrypted (peer needs to know our public key) - ackData, err := json.Marshal(ackMsg) + ackData, err := MarshalJSON(ackMsg) if err != nil { conn.Close() return @@ -575,7 +575,7 @@ func (t *Transport) performHandshake(pc *PeerConnection) error { } // First message is unencrypted (peer needs our public key) - data, err := json.Marshal(msg) + data, err := MarshalJSON(msg) if err != nil { return fmt.Errorf("marshal handshake message: %w", err) } @@ -788,10 +788,52 @@ func (pc *PeerConnection) Close() error { return err } +// DisconnectPayload contains reason for disconnect. +type DisconnectPayload struct { + Reason string `json:"reason"` + Code int `json:"code"` // Optional disconnect code +} + +// Disconnect codes +const ( + DisconnectNormal = 1000 // Normal closure + DisconnectGoingAway = 1001 // Server/peer going away + DisconnectProtocolErr = 1002 // Protocol error + DisconnectTimeout = 1003 // Idle timeout + DisconnectShutdown = 1004 // Server shutdown +) + +// GracefulClose sends a disconnect message before closing the connection. +func (pc *PeerConnection) GracefulClose(reason string, code int) error { + var err error + pc.closeOnce.Do(func() { + // Try to send disconnect message (best effort) + if pc.transport != nil && pc.SharedSecret != nil { + identity := pc.transport.node.GetIdentity() + if identity != nil { + payload := DisconnectPayload{ + Reason: reason, + Code: code, + } + msg, msgErr := NewMessage(MsgDisconnect, identity.ID, pc.Peer.ID, payload) + if msgErr == nil { + // Set short deadline for disconnect message + pc.Conn.SetWriteDeadline(time.Now().Add(2 * time.Second)) + pc.Send(msg) + } + } + } + + // Close the underlying connection + err = pc.Conn.Close() + }) + return err +} + // encryptMessage encrypts a message using SMSG with the shared secret. func (t *Transport) encryptMessage(msg *Message, sharedSecret []byte) ([]byte, error) { - // Serialize message to JSON - msgData, err := json.Marshal(msg) + // Serialize message to JSON (using pooled buffer for efficiency) + msgData, err := MarshalJSON(msg) if err != nil { return nil, err }