fix: Comprehensive code hardening from 4-domain Opus review

Error Handling:
- Fix silent Write() error in WebSocket (events.go)
- Add error context to transport handshake messages
- Check os.MkdirAll error in zip extraction (miner.go)
- Explicitly ignore io.Copy errors on drain with comments
- Add retry logic (up to 2 retries) for transient stats collection failures

Resource Lifecycle:
- Add shutdown mechanism to DigestAuth goroutine
- Call Service.Stop() on context cancellation
- Add NodeService transport cleanup to Service.Stop()
- Fix WriteStdin goroutine leak on timeout with non-blocking send

API Design:
- Add profile validation (name, miner type required)
- Return 404 instead of 500 for missing profile PUT
- Make DELETE profile idempotent (return success if not found)
- Standardize error responses in node_service.go handlers

Observability:
- Add logging for P2P GetAllStats failures
- Add request ID correlation helper for handler logs
- Add logging for miner process exits (xmrig_start.go)
- Rate limit debug logs in transport hot path (1 in 100)
- Add metrics infrastructure with /metrics endpoint

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
snider 2025-12-31 14:33:30 +00:00
parent 6b73a4b84b
commit d533164893
22 changed files with 468 additions and 114 deletions

View file

@ -71,16 +71,29 @@ func AuthConfigFromEnv() AuthConfig {
type DigestAuth struct {
config AuthConfig
nonces sync.Map // map[string]time.Time for nonce expiry tracking
stopChan chan struct{}
stopOnce sync.Once
}
// NewDigestAuth creates a new digest auth middleware
func NewDigestAuth(config AuthConfig) *DigestAuth {
da := &DigestAuth{config: config}
da := &DigestAuth{
config: config,
stopChan: make(chan struct{}),
}
// Start nonce cleanup goroutine
go da.cleanupNonces()
return da
}
// Stop gracefully shuts down the DigestAuth, stopping the cleanup goroutine.
// Safe to call multiple times.
func (da *DigestAuth) Stop() {
da.stopOnce.Do(func() {
close(da.stopChan)
})
}
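
With Stop exposed, callers own the goroutine's lifetime. A minimal sketch of pairing construction with Stop in a test, assuming AuthConfig exposes NonceExpiry as used in cleanupNonces below (not part of this commit):

```go
// Sketch only: deferring Stop keeps the cleanup goroutine from
// leaking across test runs; Stop is safe to call more than once.
func TestDigestAuthStops(t *testing.T) {
	da := NewDigestAuth(AuthConfig{NonceExpiry: time.Minute})
	defer da.Stop()
	// ... exercise da.Middleware() here ...
}
```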
// Middleware returns a Gin middleware that enforces digest authentication
func (da *DigestAuth) Middleware() gin.HandlerFunc {
return func(c *gin.Context) {
@ -209,7 +222,11 @@ func (da *DigestAuth) cleanupNonces() {
ticker := time.NewTicker(da.config.NonceExpiry)
defer ticker.Stop()
for range ticker.C {
for {
select {
case <-da.stopChan:
return
case <-ticker.C:
now := time.Now()
da.nonces.Range(func(key, value interface{}) bool {
if now.Sub(value.(time.Time)) > da.config.NonceExpiry {
@ -219,6 +236,7 @@ func (da *DigestAuth) cleanupNonces() {
})
}
}
}
// parseDigestParams parses the parameters from a digest auth header
func parseDigestParams(header string) map[string]string {

View file

@ -218,7 +218,6 @@ func (c *Container) ProfileManager() *ProfileManager {
return c.profileManager
}
// NodeService returns the node service (may be nil if P2P is unavailable).
func (c *Container) NodeService() *NodeService {
c.mu.RLock()

View file

@ -24,6 +24,7 @@ const (
ErrCodeProfileNotFound = "PROFILE_NOT_FOUND"
ErrCodeProfileExists = "PROFILE_EXISTS"
ErrCodeInternalError = "INTERNAL_ERROR"
ErrCodeInternal = "INTERNAL_ERROR" // Alias for consistency
)
// MiningError is a structured error type for the mining package

View file

@ -302,7 +302,10 @@ func (c *wsClient) writePump() {
if err != nil {
return
}
w.Write(message)
if _, err := w.Write(message); err != nil {
logging.Debug("WebSocket write error", logging.Fields{"error": err})
return
}
if err := w.Close(); err != nil {
return

View file

@ -312,6 +312,7 @@ func (m *Manager) StartMiner(ctx context.Context, minerType string, config *Conf
Name: instanceName,
})
RecordMinerStart()
return miner, nil
}
@ -454,6 +455,7 @@ func (m *Manager) StopMiner(ctx context.Context, name string) error {
return stopErr
}
RecordMinerStop()
return nil
}
@ -574,21 +576,55 @@ func (m *Manager) collectMinerStats() {
wg.Wait()
}
// collectSingleMinerStats collects stats from a single miner.
// statsRetryCount is the number of retries for transient stats failures.
const statsRetryCount = 2
// statsRetryDelay is the delay between stats collection retries.
const statsRetryDelay = 500 * time.Millisecond
// collectSingleMinerStats collects stats from a single miner with retry logic.
// This is called concurrently for each miner.
func (m *Manager) collectSingleMinerStats(miner Miner, minerType string, now time.Time, dbEnabled bool) {
minerName := miner.GetName()
var stats *PerformanceMetrics
var lastErr error
// Retry loop for transient failures
retried := false
for attempt := 0; attempt <= statsRetryCount; attempt++ {
// Use context with timeout to prevent hanging on unresponsive miner APIs
ctx, cancel := context.WithTimeout(context.Background(), statsCollectionTimeout)
defer cancel() // Ensure context is released after all operations
stats, lastErr = miner.GetStats(ctx)
cancel() // Release context immediately
stats, err := miner.GetStats(ctx)
if err != nil {
logging.Error("failed to get miner stats", logging.Fields{"miner": minerName, "error": err})
if lastErr == nil {
break // Success
}
// Log retry attempts at debug level
if attempt < statsRetryCount {
retried = true
logging.Debug("retrying stats collection", logging.Fields{
"miner": minerName,
"attempt": attempt + 1,
"error": lastErr.Error(),
})
time.Sleep(statsRetryDelay)
}
}
if lastErr != nil {
logging.Error("failed to get miner stats after retries", logging.Fields{
"miner": minerName,
"error": lastErr.Error(),
"retries": statsRetryCount,
})
RecordStatsCollection(true, true)
return
}
// Record a successful stats collection (retried if any retry was needed)
RecordStatsCollection(retried, false)
point := HashratePoint{
Timestamp: now,
Hashrate: stats.Hashrate,
@ -605,10 +641,12 @@ func (m *Manager) collectSingleMinerStats(miner Miner, minerType string, now tim
Timestamp: point.Timestamp,
Hashrate: point.Hashrate,
}
// Use the same context for DB writes so they respect timeout/cancellation
if err := database.InsertHashratePoint(ctx, minerName, minerType, dbPoint, database.ResolutionHigh); err != nil {
// Create a new context for DB writes (original context is from retry loop)
dbCtx, dbCancel := context.WithTimeout(context.Background(), statsCollectionTimeout)
if err := database.InsertHashratePoint(dbCtx, minerName, minerType, dbPoint, database.ResolutionHigh); err != nil {
logging.Warn("failed to persist hashrate", logging.Fields{"miner": minerName, "error": err})
}
dbCancel()
}
// Emit stats event for real-time WebSocket updates

pkg/mining/metrics.go (new file, 169 lines)
View file

@ -0,0 +1,169 @@
package mining
import (
"sync"
"sync/atomic"
"time"
)
// Metrics provides simple instrumentation counters for the mining package.
// These can be exposed via Prometheus or other metrics systems in the future.
type Metrics struct {
// API metrics
RequestsTotal atomic.Int64
RequestsErrored atomic.Int64
RequestLatency *LatencyHistogram
// Miner metrics
MinersStarted atomic.Int64
MinersStopped atomic.Int64
MinersErrored atomic.Int64
// Stats collection metrics
StatsCollected atomic.Int64
StatsRetried atomic.Int64
StatsFailed atomic.Int64
// WebSocket metrics
WSConnections atomic.Int64
WSMessages atomic.Int64
// P2P metrics
P2PMessagesSent atomic.Int64
P2PMessagesReceived atomic.Int64
P2PConnectionsTotal atomic.Int64
}
// LatencyHistogram tracks a bounded window of request latencies (Average and Count here; the retained samples could support percentiles).
type LatencyHistogram struct {
mu sync.Mutex
samples []time.Duration
maxSize int
}
// NewLatencyHistogram creates a new latency histogram with a maximum sample size.
func NewLatencyHistogram(maxSize int) *LatencyHistogram {
return &LatencyHistogram{
samples: make([]time.Duration, 0, maxSize),
maxSize: maxSize,
}
}
// Record adds a latency sample.
func (h *LatencyHistogram) Record(d time.Duration) {
h.mu.Lock()
defer h.mu.Unlock()
if len(h.samples) >= h.maxSize {
// Drop the oldest sample to stay within maxSize (a shift, not a true ring buffer)
copy(h.samples, h.samples[1:])
h.samples = h.samples[:len(h.samples)-1]
}
h.samples = append(h.samples, d)
}
// Average returns the average latency.
func (h *LatencyHistogram) Average() time.Duration {
h.mu.Lock()
defer h.mu.Unlock()
if len(h.samples) == 0 {
return 0
}
var total time.Duration
for _, d := range h.samples {
total += d
}
return total / time.Duration(len(h.samples))
}
// Count returns the number of samples.
func (h *LatencyHistogram) Count() int {
h.mu.Lock()
defer h.mu.Unlock()
return len(h.samples)
}
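
Since the struct's doc comment gestures at percentile support but only Average and Count land in this commit, a minimal sketch of a Percentile method over the retained samples could look like this (assumes a `sort` import; not part of the diff):

```go
// Percentile returns the p-th percentile (0 < p <= 100) of the recorded
// samples, or 0 if none exist. Illustrative sketch only.
func (h *LatencyHistogram) Percentile(p float64) time.Duration {
	h.mu.Lock()
	defer h.mu.Unlock()
	if len(h.samples) == 0 {
		return 0
	}
	sorted := make([]time.Duration, len(h.samples))
	copy(sorted, h.samples)
	sort.Slice(sorted, func(i, j int) bool { return sorted[i] < sorted[j] })
	idx := int(float64(len(sorted))*p/100.0) - 1
	if idx < 0 {
		idx = 0
	}
	return sorted[idx]
}
```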
// DefaultMetrics is the global metrics instance.
var DefaultMetrics = &Metrics{
RequestLatency: NewLatencyHistogram(1000),
}
// RecordRequest records an API request.
func RecordRequest(errored bool, latency time.Duration) {
DefaultMetrics.RequestsTotal.Add(1)
if errored {
DefaultMetrics.RequestsErrored.Add(1)
}
DefaultMetrics.RequestLatency.Record(latency)
}
// RecordMinerStart records a miner start event.
func RecordMinerStart() {
DefaultMetrics.MinersStarted.Add(1)
}
// RecordMinerStop records a miner stop event.
func RecordMinerStop() {
DefaultMetrics.MinersStopped.Add(1)
}
// RecordMinerError records a miner error event.
func RecordMinerError() {
DefaultMetrics.MinersErrored.Add(1)
}
// RecordStatsCollection records a stats collection event.
func RecordStatsCollection(retried bool, failed bool) {
DefaultMetrics.StatsCollected.Add(1)
if retried {
DefaultMetrics.StatsRetried.Add(1)
}
if failed {
DefaultMetrics.StatsFailed.Add(1)
}
}
// RecordWSConnection increments or decrements WebSocket connection count.
func RecordWSConnection(connected bool) {
if connected {
DefaultMetrics.WSConnections.Add(1)
} else {
DefaultMetrics.WSConnections.Add(-1)
}
}
// RecordWSMessage records a WebSocket message.
func RecordWSMessage() {
DefaultMetrics.WSMessages.Add(1)
}
// RecordP2PMessage records a P2P message.
func RecordP2PMessage(sent bool) {
if sent {
DefaultMetrics.P2PMessagesSent.Add(1)
} else {
DefaultMetrics.P2PMessagesReceived.Add(1)
}
}
// GetMetricsSnapshot returns a snapshot of current metrics.
func GetMetricsSnapshot() map[string]interface{} {
return map[string]interface{}{
"requests_total": DefaultMetrics.RequestsTotal.Load(),
"requests_errored": DefaultMetrics.RequestsErrored.Load(),
"request_latency_avg_ms": DefaultMetrics.RequestLatency.Average().Milliseconds(),
"request_latency_samples": DefaultMetrics.RequestLatency.Count(),
"miners_started": DefaultMetrics.MinersStarted.Load(),
"miners_stopped": DefaultMetrics.MinersStopped.Load(),
"miners_errored": DefaultMetrics.MinersErrored.Load(),
"stats_collected": DefaultMetrics.StatsCollected.Load(),
"stats_retried": DefaultMetrics.StatsRetried.Load(),
"stats_failed": DefaultMetrics.StatsFailed.Load(),
"ws_connections": DefaultMetrics.WSConnections.Load(),
"ws_messages": DefaultMetrics.WSMessages.Load(),
"p2p_messages_sent": DefaultMetrics.P2PMessagesSent.Load(),
"p2p_messages_received": DefaultMetrics.P2PMessagesReceived.Load(),
}
}
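
RecordRequest is defined here, but none of these hunks call it. A plausible wiring, sketched as Gin middleware (an assumption, not shown in the commit), times each request and flags server-side failures:

```go
// MetricsMiddleware is an illustrative sketch (not in this commit) of how
// RecordRequest could be attached to the Gin router to feed the counters above.
func MetricsMiddleware() gin.HandlerFunc {
	return func(c *gin.Context) {
		start := time.Now()
		c.Next() // run the rest of the handler chain
		errored := len(c.Errors) > 0 || c.Writer.Status() >= 500
		RecordRequest(errored, time.Since(start))
	}
}
```

Counting only 5xx responses as errors keeps client mistakes (4xx) out of the error rate; either convention works if applied consistently.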

View file

@ -216,11 +216,17 @@ func (b *BaseMiner) WriteStdin(input string) error {
input += "\n"
}
// Write with timeout to prevent blocking indefinitely
// Write with timeout to prevent blocking indefinitely.
// Use buffered channel size 1 so goroutine can exit even if we don't read the result.
done := make(chan error, 1)
go func() {
_, err := stdinPipe.Write([]byte(input))
done <- err
// Non-blocking send; with the size-1 buffer this cannot block even if the timeout path has already returned
select {
case done <- err:
default:
// Timeout already occurred, goroutine exits cleanly
}
}()
select {
@ -252,13 +258,13 @@ func (b *BaseMiner) InstallFromURL(url string) error {
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
io.Copy(io.Discard, resp.Body) // Drain body to allow connection reuse
_, _ = io.Copy(io.Discard, resp.Body) // Drain body to allow connection reuse (error ignored intentionally)
return fmt.Errorf("failed to download release: unexpected status code %d", resp.StatusCode)
}
if _, err := io.Copy(tmpfile, resp.Body); err != nil {
// Drain remaining body to allow connection reuse
io.Copy(io.Discard, resp.Body)
// Drain remaining body to allow connection reuse (error ignored intentionally)
_, _ = io.Copy(io.Discard, resp.Body)
return err
}
@ -548,7 +554,9 @@ func (b *BaseMiner) unzip(src, dest string) error {
return fmt.Errorf("%s: illegal file path", fpath)
}
if f.FileInfo().IsDir() {
os.MkdirAll(fpath, os.ModePerm)
if err := os.MkdirAll(fpath, os.ModePerm); err != nil {
return fmt.Errorf("failed to create directory %s: %w", fpath, err)
}
continue
}

View file

@ -4,6 +4,7 @@ import (
"encoding/json"
"net/http"
"strconv"
"strings"
"github.com/Snider/Mining/pkg/node"
"github.com/gin-gonic/gin"
@ -257,12 +258,17 @@ func (ns *NodeService) handleRemovePeer(c *gin.Context) {
// @Produce json
// @Param id path string true "Peer ID"
// @Success 200 {object} map[string]float64
// @Failure 404 {object} APIError "Peer not found"
// @Router /peers/{id}/ping [post]
func (ns *NodeService) handlePingPeer(c *gin.Context) {
peerID := c.Param("id")
rtt, err := ns.controller.PingPeer(peerID)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
if strings.Contains(err.Error(), "not found") || strings.Contains(err.Error(), "not connected") {
respondWithError(c, http.StatusNotFound, "PEER_NOT_FOUND", "peer not found or not connected", err.Error())
return
}
respondWithError(c, http.StatusInternalServerError, ErrCodeInternal, "ping failed", err.Error())
return
}
c.JSON(http.StatusOK, gin.H{"rtt_ms": rtt})
@ -275,11 +281,16 @@ func (ns *NodeService) handlePingPeer(c *gin.Context) {
// @Produce json
// @Param id path string true "Peer ID"
// @Success 200 {object} map[string]string
// @Failure 404 {object} APIError "Peer not found"
// @Router /peers/{id}/connect [post]
func (ns *NodeService) handleConnectPeer(c *gin.Context) {
peerID := c.Param("id")
if err := ns.controller.ConnectToPeer(peerID); err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
if strings.Contains(err.Error(), "not found") {
respondWithError(c, http.StatusNotFound, "PEER_NOT_FOUND", "peer not found", err.Error())
return
}
respondWithError(c, http.StatusInternalServerError, ErrCodeConnectionFailed, "connection failed", err.Error())
return
}
c.JSON(http.StatusOK, gin.H{"status": "connected"})
@ -287,7 +298,7 @@ func (ns *NodeService) handleConnectPeer(c *gin.Context) {
// handleDisconnectPeer godoc
// @Summary Disconnect from a peer
// @Description Close the connection to a peer
// @Description Close the connection to a peer. Idempotent - returns success if peer not connected.
// @Tags peers
// @Produce json
// @Param id path string true "Peer ID"
@ -296,7 +307,12 @@ func (ns *NodeService) handleConnectPeer(c *gin.Context) {
func (ns *NodeService) handleDisconnectPeer(c *gin.Context) {
peerID := c.Param("id")
if err := ns.controller.DisconnectFromPeer(peerID); err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
// Make disconnect idempotent - if peer not connected, still return success
if strings.Contains(err.Error(), "not connected") {
c.JSON(http.StatusOK, gin.H{"status": "disconnected"})
return
}
respondWithError(c, http.StatusInternalServerError, ErrCodeInternal, "disconnect failed", err.Error())
return
}
c.JSON(http.StatusOK, gin.H{"status": "disconnected"})
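
This handler and the profile DELETE later in the diff share one rule: treat "already gone" as success. A generic sketch of the pattern (idempotentDelete and deleteFn are illustrative names, not part of the commit):

```go
// idempotentDelete treats "not found"/"not connected" errors as success
// so repeated calls and client retries are safe. Sketch only.
func idempotentDelete(deleteFn func(id string) error, id string) error {
	err := deleteFn(id)
	if err != nil && (strings.Contains(err.Error(), "not found") ||
		strings.Contains(err.Error(), "not connected")) {
		return nil // already gone - success under idempotent semantics
	}
	return err
}
```

Matching on error strings is brittle; sentinel errors checked with errors.Is would make a sturdier contract between the controller and these handlers.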

View file

@ -144,6 +144,35 @@ func generateRequestID() string {
return fmt.Sprintf("%d-%x", time.Now().UnixMilli(), b[:4])
}
// getRequestID extracts the request ID from gin context
func getRequestID(c *gin.Context) string {
if id, exists := c.Get("requestID"); exists {
if s, ok := id.(string); ok {
return s
}
}
return ""
}
// logWithRequestID logs a message with request ID correlation
func logWithRequestID(c *gin.Context, level string, message string, fields logging.Fields) {
if fields == nil {
fields = logging.Fields{}
}
if reqID := getRequestID(c); reqID != "" {
fields["request_id"] = reqID
}
switch level {
case "error":
logging.Error(message, fields)
case "warn":
logging.Warn(message, fields)
case "info":
logging.Info(message, fields)
default:
logging.Debug(message, fields)
}
}
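
No call site for logWithRequestID appears in these hunks; a hypothetical handler using it (the handler name and route are assumptions) would read:

```go
// Hypothetical usage of logWithRequestID; handleExample is not part of this commit.
func (s *Service) handleExample(c *gin.Context) {
	logWithRequestID(c, "info", "handling example request", logging.Fields{
		"path": c.Request.URL.Path,
	})
	c.JSON(http.StatusOK, gin.H{"status": "ok"})
}
```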
// WebSocket upgrader for the events endpoint
var wsUpgrader = websocket.Upgrader{
@ -312,6 +341,14 @@ func (s *Service) Stop() {
if s.EventHub != nil {
s.EventHub.Stop()
}
if s.auth != nil {
s.auth.Stop()
}
if s.NodeService != nil {
if err := s.NodeService.StopTransport(); err != nil {
logging.Warn("failed to stop node service transport", logging.Fields{"error": err})
}
}
}
// ServiceStartup initializes the router and starts the HTTP server.
@ -333,6 +370,7 @@ func (s *Service) ServiceStartup(ctx context.Context) error {
go func() {
<-ctx.Done()
s.Stop() // Clean up service resources (auth, event hub, node service)
s.Manager.Stop()
ctxShutdown, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
@ -377,6 +415,7 @@ func (s *Service) SetupRoutes() {
{
apiGroup.GET("/info", s.handleGetInfo)
apiGroup.GET("/metrics", s.handleMetrics)
apiGroup.POST("/doctor", s.handleDoctor)
apiGroup.POST("/update", s.handleUpdateCheck)
@ -824,17 +863,28 @@ func (s *Service) handleListProfiles(c *gin.Context) {
// @Produce json
// @Param profile body MiningProfile true "Mining Profile"
// @Success 201 {object} MiningProfile
// @Failure 400 {object} APIError "Invalid profile data"
// @Router /profiles [post]
func (s *Service) handleCreateProfile(c *gin.Context) {
var profile MiningProfile
if err := c.ShouldBindJSON(&profile); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
respondWithError(c, http.StatusBadRequest, ErrCodeInvalidInput, "invalid profile data", err.Error())
return
}
// Validate required fields
if profile.Name == "" {
respondWithError(c, http.StatusBadRequest, ErrCodeInvalidInput, "profile name is required", "")
return
}
if profile.MinerType == "" {
respondWithError(c, http.StatusBadRequest, ErrCodeInvalidInput, "miner type is required", "")
return
}
createdProfile, err := s.ProfileManager.CreateProfile(&profile)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to create profile", "details": err.Error()})
respondWithError(c, http.StatusInternalServerError, ErrCodeInternal, "failed to create profile", err.Error())
return
}
@ -868,18 +918,24 @@ func (s *Service) handleGetProfile(c *gin.Context) {
// @Param id path string true "Profile ID"
// @Param profile body MiningProfile true "Updated Mining Profile"
// @Success 200 {object} MiningProfile
// @Failure 404 {object} APIError "Profile not found"
// @Router /profiles/{id} [put]
func (s *Service) handleUpdateProfile(c *gin.Context) {
profileID := c.Param("id")
var profile MiningProfile
if err := c.ShouldBindJSON(&profile); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
respondWithError(c, http.StatusBadRequest, ErrCodeInvalidInput, "invalid profile data", err.Error())
return
}
profile.ID = profileID
if err := s.ProfileManager.UpdateProfile(&profile); err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to update profile", "details": err.Error()})
// Check if error is "not found"
if strings.Contains(err.Error(), "not found") {
respondWithError(c, http.StatusNotFound, ErrCodeProfileNotFound, "profile not found", err.Error())
return
}
respondWithError(c, http.StatusInternalServerError, ErrCodeInternal, "failed to update profile", err.Error())
return
}
c.JSON(http.StatusOK, profile)
@ -887,7 +943,7 @@ func (s *Service) handleUpdateProfile(c *gin.Context) {
// handleDeleteProfile godoc
// @Summary Delete a mining profile
// @Description Delete a mining profile by its ID
// @Description Delete a mining profile by its ID. Idempotent - returns success even if profile doesn't exist.
// @Tags profiles
// @Produce json
// @Param id path string true "Profile ID"
@ -896,7 +952,12 @@ func (s *Service) handleUpdateProfile(c *gin.Context) {
func (s *Service) handleDeleteProfile(c *gin.Context) {
profileID := c.Param("id")
if err := s.ProfileManager.DeleteProfile(profileID); err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to delete profile", "details": err.Error()})
// Make DELETE idempotent - if profile doesn't exist, still return success
if strings.Contains(err.Error(), "not found") {
c.JSON(http.StatusOK, gin.H{"status": "profile deleted"})
return
}
respondWithError(c, http.StatusInternalServerError, ErrCodeInternal, "failed to delete profile", err.Error())
return
}
c.JSON(http.StatusOK, gin.H{"status": "profile deleted"})
@ -1030,7 +1091,20 @@ func (s *Service) handleWebSocketEvents(c *gin.Context) {
}
logging.Info("new WebSocket connection", logging.Fields{"remote": c.Request.RemoteAddr})
RecordWSConnection(true)
if !s.EventHub.ServeWs(conn) {
RecordWSConnection(false) // Undo increment on rejection
logging.Warn("WebSocket connection rejected", logging.Fields{"remote": c.Request.RemoteAddr, "reason": "limit reached"})
}
}
// handleMetrics godoc
// @Summary Get internal metrics
// @Description Returns internal metrics for monitoring and debugging
// @Tags system
// @Produce json
// @Success 200 {object} map[string]interface{}
// @Router /metrics [get]
func (s *Service) handleMetrics(c *gin.Context) {
c.JSON(http.StatusOK, GetMetricsSnapshot())
}

View file

@ -55,4 +55,3 @@ func FetchJSONStats[T any](ctx context.Context, config HTTPStatsConfig, target *
return nil
}

View file

@ -99,21 +99,33 @@ func (m *XMRigMiner) Start(config *Config) error {
// Capture cmd locally to avoid race with Stop()
cmd := m.cmd
minerName := m.Name // Capture name for logging
go func() {
// Use a channel to detect if Wait() completes
done := make(chan struct{})
var waitErr error
go func() {
cmd.Wait()
waitErr = cmd.Wait()
close(done)
}()
// Wait with timeout to prevent goroutine leak on zombie processes
select {
case <-done:
// Normal exit
// Normal exit - log the exit status
if waitErr != nil {
logging.Info("miner process exited", logging.Fields{
"miner": minerName,
"error": waitErr.Error(),
})
} else {
logging.Info("miner process exited normally", logging.Fields{
"miner": minerName,
})
}
case <-time.After(5 * time.Minute):
// Process didn't exit after 5 minutes - force cleanup
logging.Warn("miner process wait timeout, forcing cleanup", logging.Fields{"miner": m.Name})
logging.Warn("miner process wait timeout, forcing cleanup", logging.Fields{"miner": minerName})
if cmd.Process != nil {
cmd.Process.Kill()
}
@ -122,7 +134,7 @@ func (m *XMRigMiner) Start(config *Config) error {
case <-done:
// Inner goroutine completed
case <-time.After(10 * time.Second):
logging.Error("process cleanup timed out after kill", logging.Fields{"miner": m.Name})
logging.Error("process cleanup timed out after kill", logging.Fields{"miner": minerName})
}
}

View file

@ -6,6 +6,8 @@ import (
"fmt"
"sync"
"time"
"github.com/Snider/Mining/pkg/logging"
)
// Controller manages remote peer operations from a controller node.
@ -248,6 +250,11 @@ func (c *Controller) GetAllStats() map[string]*StatsPayload {
defer wg.Done()
stats, err := c.GetRemoteStats(p.ID)
if err != nil {
logging.Debug("failed to get stats from peer", logging.Fields{
"peer_id": p.ID,
"peer": p.Name,
"error": err.Error(),
})
return // Skip failed peers
}
mu.Lock()

View file

@ -8,6 +8,7 @@ import (
"net/http"
"net/url"
"sync"
"sync/atomic"
"time"
"github.com/Snider/Borg/pkg/smsg"
@ -15,6 +16,12 @@ import (
"github.com/gorilla/websocket"
)
// debugLogCounter tracks message counts for rate limiting debug logs
var debugLogCounter atomic.Int64
// debugLogInterval controls how often we log debug messages in hot paths (1 in N)
const debugLogInterval = 100
// TransportConfig configures the WebSocket transport.
type TransportConfig struct {
ListenAddr string // ":9091" default
@ -404,28 +411,28 @@ func (t *Transport) performHandshake(pc *PeerConnection) error {
msg, err := NewMessage(MsgHandshake, identity.ID, pc.Peer.ID, payload)
if err != nil {
return err
return fmt.Errorf("create handshake message: %w", err)
}
// First message is unencrypted (peer needs our public key)
data, err := json.Marshal(msg)
if err != nil {
return err
return fmt.Errorf("marshal handshake message: %w", err)
}
if err := pc.Conn.WriteMessage(websocket.TextMessage, data); err != nil {
return err
return fmt.Errorf("send handshake: %w", err)
}
// Wait for ack
_, ackData, err := pc.Conn.ReadMessage()
if err != nil {
return err
return fmt.Errorf("read handshake ack: %w", err)
}
var ackMsg Message
if err := json.Unmarshal(ackData, &ackMsg); err != nil {
return err
return fmt.Errorf("unmarshal handshake ack: %w", err)
}
if ackMsg.Type != MsgHandshakeAck {
@ -434,7 +441,7 @@ func (t *Transport) performHandshake(pc *PeerConnection) error {
var ackPayload HandshakeAckPayload
if err := ackMsg.ParsePayload(&ackPayload); err != nil {
return err
return fmt.Errorf("parse handshake ack payload: %w", err)
}
if !ackPayload.Accepted {
@ -490,7 +497,10 @@ func (t *Transport) readLoop(pc *PeerConnection) {
continue // Skip invalid messages
}
logging.Debug("received message from peer", logging.Fields{"type": msg.Type, "peer_id": pc.Peer.ID, "reply_to": msg.ReplyTo})
// Rate limit debug logs in hot path to reduce noise (log 1 in N messages)
if debugLogCounter.Add(1)%debugLogInterval == 0 {
logging.Debug("received message from peer", logging.Fields{"type": msg.Type, "peer_id": pc.Peer.ID, "reply_to": msg.ReplyTo, "sample": "1/100"})
}
// Dispatch to handler (read handler under lock to avoid race)
t.mu.RLock()