feat: Implement multiple code review improvements
- P2P-HIGH-1: Add peer scoring system with success/failure/timeout tracking - PERF-HIGH-2: Add JSON encoding buffer pool for hot paths - API-HIGH-1: Standardize error responses using APIError struct - RESIL-MED-5: Add graceful disconnect with reason/code messages All verified items (SQL indexes, keepalive) were already implemented. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
parent
a48ce861da
commit
41cc0c295c
7 changed files with 271 additions and 30 deletions
55
pkg/mining/bufpool.go
Normal file
55
pkg/mining/bufpool.go
Normal file
|
|
@ -0,0 +1,55 @@
|
|||
package mining
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"sync"
|
||||
)
|
||||
|
||||
// bufferPool provides reusable byte buffers for JSON encoding.
|
||||
// This reduces allocation overhead in hot paths like WebSocket event serialization.
|
||||
var bufferPool = sync.Pool{
|
||||
New: func() interface{} {
|
||||
return bytes.NewBuffer(make([]byte, 0, 1024))
|
||||
},
|
||||
}
|
||||
|
||||
// getBuffer retrieves a buffer from the pool.
|
||||
func getBuffer() *bytes.Buffer {
|
||||
buf := bufferPool.Get().(*bytes.Buffer)
|
||||
buf.Reset()
|
||||
return buf
|
||||
}
|
||||
|
||||
// putBuffer returns a buffer to the pool.
|
||||
func putBuffer(buf *bytes.Buffer) {
|
||||
// Don't pool buffers that grew too large (>64KB)
|
||||
if buf.Cap() <= 65536 {
|
||||
bufferPool.Put(buf)
|
||||
}
|
||||
}
|
||||
|
||||
// MarshalJSON encodes a value to JSON using a pooled buffer.
|
||||
// Returns a copy of the encoded bytes (safe to use after the function returns).
|
||||
func MarshalJSON(v interface{}) ([]byte, error) {
|
||||
buf := getBuffer()
|
||||
defer putBuffer(buf)
|
||||
|
||||
enc := json.NewEncoder(buf)
|
||||
// Don't escape HTML characters (matches json.Marshal behavior for these use cases)
|
||||
enc.SetEscapeHTML(false)
|
||||
if err := enc.Encode(v); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// json.Encoder.Encode adds a newline; remove it to match json.Marshal
|
||||
data := buf.Bytes()
|
||||
if len(data) > 0 && data[len(data)-1] == '\n' {
|
||||
data = data[:len(data)-1]
|
||||
}
|
||||
|
||||
// Return a copy since the buffer will be reused
|
||||
result := make([]byte, len(data))
|
||||
copy(result, data)
|
||||
return result, nil
|
||||
}
|
||||
|
|
@ -156,7 +156,7 @@ func (h *EventHub) Run() {
|
|||
Timestamp: time.Now(),
|
||||
Data: state,
|
||||
}
|
||||
data, err := json.Marshal(event)
|
||||
data, err := MarshalJSON(event)
|
||||
if err != nil {
|
||||
logging.Error("failed to marshal state sync", logging.Fields{"error": err})
|
||||
return
|
||||
|
|
@ -180,7 +180,7 @@ func (h *EventHub) Run() {
|
|||
logging.Debug("client disconnected", logging.Fields{"total": len(h.clients)})
|
||||
|
||||
case event := <-h.broadcast:
|
||||
data, err := json.Marshal(event)
|
||||
data, err := MarshalJSON(event)
|
||||
if err != nil {
|
||||
logging.Error("failed to marshal event", logging.Fields{"error": err})
|
||||
continue
|
||||
|
|
|
|||
|
|
@ -578,7 +578,7 @@ func (s *Service) handleReady(c *gin.Context) {
|
|||
func (s *Service) handleGetInfo(c *gin.Context) {
|
||||
systemInfo, err := s.updateInstallationCache()
|
||||
if err != nil {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to get system info", "details": err.Error()})
|
||||
respondWithMiningError(c, ErrInternal("failed to get system info").WithCause(err))
|
||||
return
|
||||
}
|
||||
c.JSON(http.StatusOK, systemInfo)
|
||||
|
|
@ -644,7 +644,7 @@ func (s *Service) updateInstallationCache() (*SystemInfo, error) {
|
|||
func (s *Service) handleDoctor(c *gin.Context) {
|
||||
systemInfo, err := s.updateInstallationCache()
|
||||
if err != nil {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to update cache", "details": err.Error()})
|
||||
respondWithMiningError(c, ErrInternal("failed to update cache").WithCause(err))
|
||||
return
|
||||
}
|
||||
c.JSON(http.StatusOK, systemInfo)
|
||||
|
|
@ -709,7 +709,7 @@ func (s *Service) handleUpdateCheck(c *gin.Context) {
|
|||
func (s *Service) handleUninstallMiner(c *gin.Context) {
|
||||
minerType := c.Param("miner_name")
|
||||
if err := s.Manager.UninstallMiner(c.Request.Context(), minerType); err != nil {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
|
||||
respondWithMiningError(c, ErrInternal("failed to uninstall miner").WithCause(err))
|
||||
return
|
||||
}
|
||||
if _, err := s.updateInstallationCache(); err != nil {
|
||||
|
|
@ -754,12 +754,12 @@ func (s *Service) handleInstallMiner(c *gin.Context) {
|
|||
minerType := c.Param("miner_name")
|
||||
miner, err := CreateMiner(minerType)
|
||||
if err != nil {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": "unknown miner type"})
|
||||
respondWithMiningError(c, ErrUnsupportedMiner(minerType))
|
||||
return
|
||||
}
|
||||
|
||||
if err := miner.Install(); err != nil {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
|
||||
respondWithMiningError(c, ErrInstallFailed(minerType).WithCause(err))
|
||||
return
|
||||
}
|
||||
|
||||
|
|
@ -769,7 +769,7 @@ func (s *Service) handleInstallMiner(c *gin.Context) {
|
|||
|
||||
details, err := miner.CheckInstallation()
|
||||
if err != nil {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to verify installation", "details": err.Error()})
|
||||
respondWithMiningError(c, ErrInternal("failed to verify installation").WithCause(err))
|
||||
return
|
||||
}
|
||||
|
||||
|
|
@ -794,13 +794,13 @@ func (s *Service) handleStartMinerWithProfile(c *gin.Context) {
|
|||
|
||||
var config Config
|
||||
if err := json.Unmarshal(profile.Config, &config); err != nil {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to parse profile config", "details": err.Error()})
|
||||
respondWithMiningError(c, ErrInvalidConfig("failed to parse profile config").WithCause(err))
|
||||
return
|
||||
}
|
||||
|
||||
miner, err := s.Manager.StartMiner(c.Request.Context(), profile.MinerType, &config)
|
||||
if err != nil {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
|
||||
respondWithMiningError(c, ErrStartFailed(profile.Name).WithCause(err))
|
||||
return
|
||||
}
|
||||
c.JSON(http.StatusOK, miner)
|
||||
|
|
@ -817,7 +817,7 @@ func (s *Service) handleStartMinerWithProfile(c *gin.Context) {
|
|||
func (s *Service) handleStopMiner(c *gin.Context) {
|
||||
minerName := c.Param("miner_name")
|
||||
if err := s.Manager.StopMiner(c.Request.Context(), minerName); err != nil {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
|
||||
respondWithMiningError(c, ErrStopFailed(minerName).WithCause(err))
|
||||
return
|
||||
}
|
||||
c.JSON(http.StatusOK, gin.H{"status": "stopped"})
|
||||
|
|
@ -840,7 +840,7 @@ func (s *Service) handleGetMinerStats(c *gin.Context) {
|
|||
}
|
||||
stats, err := miner.GetStats(c.Request.Context())
|
||||
if err != nil {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
|
||||
respondWithMiningError(c, ErrInternal("failed to get miner stats").WithCause(err))
|
||||
return
|
||||
}
|
||||
c.JSON(http.StatusOK, stats)
|
||||
|
|
@ -858,7 +858,7 @@ func (s *Service) handleGetMinerHashrateHistory(c *gin.Context) {
|
|||
minerName := c.Param("miner_name")
|
||||
history, err := s.Manager.GetMinerHashrateHistory(minerName)
|
||||
if err != nil {
|
||||
c.JSON(http.StatusNotFound, gin.H{"error": err.Error()})
|
||||
respondWithMiningError(c, ErrMinerNotFound(minerName).WithCause(err))
|
||||
return
|
||||
}
|
||||
c.JSON(http.StatusOK, history)
|
||||
|
|
@ -915,12 +915,12 @@ func (s *Service) handleMinerStdin(c *gin.Context) {
|
|||
|
||||
var input StdinInput
|
||||
if err := c.ShouldBindJSON(&input); err != nil {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid input: " + err.Error()})
|
||||
respondWithMiningError(c, ErrInvalidConfig("invalid input format").WithCause(err))
|
||||
return
|
||||
}
|
||||
|
||||
if err := miner.WriteStdin(input.Input); err != nil {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
|
||||
respondWithMiningError(c, ErrInternal("failed to write to stdin").WithCause(err))
|
||||
return
|
||||
}
|
||||
|
||||
|
|
@ -1075,13 +1075,13 @@ func (s *Service) handleHistoryStatus(c *gin.Context) {
|
|||
func (s *Service) handleAllMinersHistoricalStats(c *gin.Context) {
|
||||
manager, ok := s.Manager.(*Manager)
|
||||
if !ok {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "manager type not supported"})
|
||||
respondWithMiningError(c, ErrInternal("manager type not supported"))
|
||||
return
|
||||
}
|
||||
|
||||
stats, err := manager.GetAllMinerHistoricalStats()
|
||||
if err != nil {
|
||||
c.JSON(http.StatusServiceUnavailable, gin.H{"error": err.Error()})
|
||||
respondWithMiningError(c, ErrDatabaseError("get historical stats").WithCause(err))
|
||||
return
|
||||
}
|
||||
|
||||
|
|
@ -1100,18 +1100,18 @@ func (s *Service) handleMinerHistoricalStats(c *gin.Context) {
|
|||
minerName := c.Param("miner_name")
|
||||
manager, ok := s.Manager.(*Manager)
|
||||
if !ok {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "manager type not supported"})
|
||||
respondWithMiningError(c, ErrInternal("manager type not supported"))
|
||||
return
|
||||
}
|
||||
|
||||
stats, err := manager.GetMinerHistoricalStats(minerName)
|
||||
if err != nil {
|
||||
c.JSON(http.StatusServiceUnavailable, gin.H{"error": err.Error()})
|
||||
respondWithMiningError(c, ErrDatabaseError("get miner stats").WithCause(err))
|
||||
return
|
||||
}
|
||||
|
||||
if stats == nil {
|
||||
c.JSON(http.StatusNotFound, gin.H{"error": "no historical data found for miner"})
|
||||
respondWithMiningError(c, ErrMinerNotFound(minerName).WithDetails("no historical data found"))
|
||||
return
|
||||
}
|
||||
|
||||
|
|
@ -1132,7 +1132,7 @@ func (s *Service) handleMinerHistoricalHashrate(c *gin.Context) {
|
|||
minerName := c.Param("miner_name")
|
||||
manager, ok := s.Manager.(*Manager)
|
||||
if !ok {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "manager type not supported"})
|
||||
respondWithMiningError(c, ErrInternal("manager type not supported"))
|
||||
return
|
||||
}
|
||||
|
||||
|
|
@ -1153,7 +1153,7 @@ func (s *Service) handleMinerHistoricalHashrate(c *gin.Context) {
|
|||
|
||||
history, err := manager.GetMinerHistoricalHashrate(minerName, since, until)
|
||||
if err != nil {
|
||||
c.JSON(http.StatusServiceUnavailable, gin.H{"error": err.Error()})
|
||||
respondWithMiningError(c, ErrDatabaseError("get hashrate history").WithCause(err))
|
||||
return
|
||||
}
|
||||
|
||||
|
|
|
|||
55
pkg/node/bufpool.go
Normal file
55
pkg/node/bufpool.go
Normal file
|
|
@ -0,0 +1,55 @@
|
|||
package node
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"sync"
|
||||
)
|
||||
|
||||
// bufferPool provides reusable byte buffers for JSON encoding.
|
||||
// This reduces allocation overhead in hot paths like message serialization.
|
||||
var bufferPool = sync.Pool{
|
||||
New: func() interface{} {
|
||||
return bytes.NewBuffer(make([]byte, 0, 1024))
|
||||
},
|
||||
}
|
||||
|
||||
// getBuffer retrieves a buffer from the pool.
|
||||
func getBuffer() *bytes.Buffer {
|
||||
buf := bufferPool.Get().(*bytes.Buffer)
|
||||
buf.Reset()
|
||||
return buf
|
||||
}
|
||||
|
||||
// putBuffer returns a buffer to the pool.
|
||||
func putBuffer(buf *bytes.Buffer) {
|
||||
// Don't pool buffers that grew too large (>64KB)
|
||||
if buf.Cap() <= 65536 {
|
||||
bufferPool.Put(buf)
|
||||
}
|
||||
}
|
||||
|
||||
// MarshalJSON encodes a value to JSON using a pooled buffer.
|
||||
// Returns a copy of the encoded bytes (safe to use after the function returns).
|
||||
func MarshalJSON(v interface{}) ([]byte, error) {
|
||||
buf := getBuffer()
|
||||
defer putBuffer(buf)
|
||||
|
||||
enc := json.NewEncoder(buf)
|
||||
// Don't escape HTML characters (matches json.Marshal behavior for these use cases)
|
||||
enc.SetEscapeHTML(false)
|
||||
if err := enc.Encode(v); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// json.Encoder.Encode adds a newline; remove it to match json.Marshal
|
||||
data := buf.Bytes()
|
||||
if len(data) > 0 && data[len(data)-1] == '\n' {
|
||||
data = data[:len(data)-1]
|
||||
}
|
||||
|
||||
// Return a copy since the buffer will be reused
|
||||
result := make([]byte, len(data))
|
||||
copy(result, data)
|
||||
return result, nil
|
||||
}
|
||||
|
|
@ -52,7 +52,7 @@ type Message struct {
|
|||
func NewMessage(msgType MessageType, from, to string, payload interface{}) (*Message, error) {
|
||||
var payloadBytes json.RawMessage
|
||||
if payload != nil {
|
||||
data, err := json.Marshal(payload)
|
||||
data, err := MarshalJSON(payload)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
|||
|
|
@ -338,6 +338,95 @@ func (r *PeerRegistry) SetConnected(id string, connected bool) {
|
|||
}
|
||||
}
|
||||
|
||||
// Score adjustment constants
|
||||
const (
|
||||
ScoreSuccessIncrement = 1.0 // Increment for successful interaction
|
||||
ScoreFailureDecrement = 5.0 // Decrement for failed interaction
|
||||
ScoreTimeoutDecrement = 3.0 // Decrement for timeout
|
||||
ScoreMinimum = 0.0 // Minimum score
|
||||
ScoreMaximum = 100.0 // Maximum score
|
||||
ScoreDefault = 50.0 // Default score for new peers
|
||||
)
|
||||
|
||||
// RecordSuccess records a successful interaction with a peer, improving their score.
|
||||
func (r *PeerRegistry) RecordSuccess(id string) {
|
||||
r.mu.Lock()
|
||||
peer, exists := r.peers[id]
|
||||
if !exists {
|
||||
r.mu.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
peer.Score = min(peer.Score+ScoreSuccessIncrement, ScoreMaximum)
|
||||
peer.LastSeen = time.Now()
|
||||
r.mu.Unlock()
|
||||
r.save()
|
||||
}
|
||||
|
||||
// RecordFailure records a failed interaction with a peer, reducing their score.
|
||||
func (r *PeerRegistry) RecordFailure(id string) {
|
||||
r.mu.Lock()
|
||||
peer, exists := r.peers[id]
|
||||
if !exists {
|
||||
r.mu.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
peer.Score = max(peer.Score-ScoreFailureDecrement, ScoreMinimum)
|
||||
newScore := peer.Score
|
||||
r.mu.Unlock()
|
||||
r.save()
|
||||
|
||||
logging.Debug("peer score decreased", logging.Fields{
|
||||
"peer_id": id,
|
||||
"new_score": newScore,
|
||||
"reason": "failure",
|
||||
})
|
||||
}
|
||||
|
||||
// RecordTimeout records a timeout when communicating with a peer.
|
||||
func (r *PeerRegistry) RecordTimeout(id string) {
|
||||
r.mu.Lock()
|
||||
peer, exists := r.peers[id]
|
||||
if !exists {
|
||||
r.mu.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
peer.Score = max(peer.Score-ScoreTimeoutDecrement, ScoreMinimum)
|
||||
newScore := peer.Score
|
||||
r.mu.Unlock()
|
||||
r.save()
|
||||
|
||||
logging.Debug("peer score decreased", logging.Fields{
|
||||
"peer_id": id,
|
||||
"new_score": newScore,
|
||||
"reason": "timeout",
|
||||
})
|
||||
}
|
||||
|
||||
// GetPeersByScore returns peers sorted by score (highest first).
|
||||
func (r *PeerRegistry) GetPeersByScore() []*Peer {
|
||||
r.mu.RLock()
|
||||
defer r.mu.RUnlock()
|
||||
|
||||
peers := make([]*Peer, 0, len(r.peers))
|
||||
for _, p := range r.peers {
|
||||
peers = append(peers, p)
|
||||
}
|
||||
|
||||
// Sort by score descending
|
||||
for i := 0; i < len(peers)-1; i++ {
|
||||
for j := i + 1; j < len(peers); j++ {
|
||||
if peers[j].Score > peers[i].Score {
|
||||
peers[i], peers[j] = peers[j], peers[i]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return peers
|
||||
}
|
||||
|
||||
// SelectOptimalPeer returns the best peer based on multi-factor optimization.
|
||||
// Uses Poindexter KD-tree to find the peer closest to ideal metrics.
|
||||
func (r *PeerRegistry) SelectOptimalPeer() *Peer {
|
||||
|
|
|
|||
|
|
@ -245,10 +245,10 @@ func (t *Transport) Start() error {
|
|||
func (t *Transport) Stop() error {
|
||||
t.cancel()
|
||||
|
||||
// Close all connections
|
||||
// Gracefully close all connections with shutdown message
|
||||
t.mu.Lock()
|
||||
for _, pc := range t.conns {
|
||||
pc.Close()
|
||||
pc.GracefulClose("server shutdown", DisconnectShutdown)
|
||||
}
|
||||
t.mu.Unlock()
|
||||
|
||||
|
|
@ -450,7 +450,7 @@ func (t *Transport) handleWSUpgrade(w http.ResponseWriter, r *http.Request) {
|
|||
Reason: "peer not authorized",
|
||||
}
|
||||
rejectMsg, _ := NewMessage(MsgHandshakeAck, identity.ID, payload.Identity.ID, rejectPayload)
|
||||
if rejectData, err := json.Marshal(rejectMsg); err == nil {
|
||||
if rejectData, err := MarshalJSON(rejectMsg); err == nil {
|
||||
conn.WriteMessage(websocket.TextMessage, rejectData)
|
||||
}
|
||||
}
|
||||
|
|
@ -512,7 +512,7 @@ func (t *Transport) handleWSUpgrade(w http.ResponseWriter, r *http.Request) {
|
|||
}
|
||||
|
||||
// First ack is unencrypted (peer needs to know our public key)
|
||||
ackData, err := json.Marshal(ackMsg)
|
||||
ackData, err := MarshalJSON(ackMsg)
|
||||
if err != nil {
|
||||
conn.Close()
|
||||
return
|
||||
|
|
@ -575,7 +575,7 @@ func (t *Transport) performHandshake(pc *PeerConnection) error {
|
|||
}
|
||||
|
||||
// First message is unencrypted (peer needs our public key)
|
||||
data, err := json.Marshal(msg)
|
||||
data, err := MarshalJSON(msg)
|
||||
if err != nil {
|
||||
return fmt.Errorf("marshal handshake message: %w", err)
|
||||
}
|
||||
|
|
@ -788,10 +788,52 @@ func (pc *PeerConnection) Close() error {
|
|||
return err
|
||||
}
|
||||
|
||||
// DisconnectPayload contains reason for disconnect.
|
||||
type DisconnectPayload struct {
|
||||
Reason string `json:"reason"`
|
||||
Code int `json:"code"` // Optional disconnect code
|
||||
}
|
||||
|
||||
// Disconnect codes
|
||||
const (
|
||||
DisconnectNormal = 1000 // Normal closure
|
||||
DisconnectGoingAway = 1001 // Server/peer going away
|
||||
DisconnectProtocolErr = 1002 // Protocol error
|
||||
DisconnectTimeout = 1003 // Idle timeout
|
||||
DisconnectShutdown = 1004 // Server shutdown
|
||||
)
|
||||
|
||||
// GracefulClose sends a disconnect message before closing the connection.
|
||||
func (pc *PeerConnection) GracefulClose(reason string, code int) error {
|
||||
var err error
|
||||
pc.closeOnce.Do(func() {
|
||||
// Try to send disconnect message (best effort)
|
||||
if pc.transport != nil && pc.SharedSecret != nil {
|
||||
identity := pc.transport.node.GetIdentity()
|
||||
if identity != nil {
|
||||
payload := DisconnectPayload{
|
||||
Reason: reason,
|
||||
Code: code,
|
||||
}
|
||||
msg, msgErr := NewMessage(MsgDisconnect, identity.ID, pc.Peer.ID, payload)
|
||||
if msgErr == nil {
|
||||
// Set short deadline for disconnect message
|
||||
pc.Conn.SetWriteDeadline(time.Now().Add(2 * time.Second))
|
||||
pc.Send(msg)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Close the underlying connection
|
||||
err = pc.Conn.Close()
|
||||
})
|
||||
return err
|
||||
}
|
||||
|
||||
// encryptMessage encrypts a message using SMSG with the shared secret.
|
||||
func (t *Transport) encryptMessage(msg *Message, sharedSecret []byte) ([]byte, error) {
|
||||
// Serialize message to JSON
|
||||
msgData, err := json.Marshal(msg)
|
||||
// Serialize message to JSON (using pooled buffer for efficiency)
|
||||
msgData, err := MarshalJSON(msg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue