fix: Comprehensive code hardening from 4-domain Opus review

Error Handling:
- Fix silent Write() error in WebSocket (events.go)
- Add error context to transport handshake messages
- Check os.MkdirAll error in zip extraction (miner.go)
- Explicitly ignore io.Copy errors on drain with comments
- Add retry logic (up to 2 retries, 3 attempts total) for transient stats collection failures

Resource Lifecycle:
- Add shutdown mechanism to DigestAuth goroutine
- Call Service.Stop() on context cancellation
- Add NodeService transport cleanup to Service.Stop()
- Fix WriteStdin goroutine leak on timeout with non-blocking send

API Design:
- Add profile validation (name, miner type required)
- Return 404 instead of 500 for missing profile PUT
- Make DELETE profile idempotent (return success if not found)
- Standardize error responses in node_service.go handlers

Observability:
- Add logging for P2P GetAllStats failures
- Add request ID correlation helper for handler logs
- Add logging for miner process exits (xmrig_start.go)
- Rate limit debug logs in transport hot path (1 in 100)
- Add metrics infrastructure with /metrics endpoint

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
snider 2025-12-31 14:33:30 +00:00
parent 6b73a4b84b
commit d533164893
22 changed files with 468 additions and 114 deletions

View file

@ -69,18 +69,31 @@ func AuthConfigFromEnv() AuthConfig {
// DigestAuth implements HTTP Digest Authentication middleware // DigestAuth implements HTTP Digest Authentication middleware
type DigestAuth struct { type DigestAuth struct {
config AuthConfig config AuthConfig
nonces sync.Map // map[string]time.Time for nonce expiry tracking nonces sync.Map // map[string]time.Time for nonce expiry tracking
stopChan chan struct{}
stopOnce sync.Once
} }
// NewDigestAuth creates a new digest auth middleware // NewDigestAuth creates a new digest auth middleware
func NewDigestAuth(config AuthConfig) *DigestAuth { func NewDigestAuth(config AuthConfig) *DigestAuth {
da := &DigestAuth{config: config} da := &DigestAuth{
config: config,
stopChan: make(chan struct{}),
}
// Start nonce cleanup goroutine // Start nonce cleanup goroutine
go da.cleanupNonces() go da.cleanupNonces()
return da return da
} }
// Stop gracefully shuts down the DigestAuth, stopping the cleanup goroutine.
// Safe to call multiple times.
func (da *DigestAuth) Stop() {
da.stopOnce.Do(func() {
close(da.stopChan)
})
}
// Middleware returns a Gin middleware that enforces digest authentication // Middleware returns a Gin middleware that enforces digest authentication
func (da *DigestAuth) Middleware() gin.HandlerFunc { func (da *DigestAuth) Middleware() gin.HandlerFunc {
return func(c *gin.Context) { return func(c *gin.Context) {
@ -209,14 +222,19 @@ func (da *DigestAuth) cleanupNonces() {
ticker := time.NewTicker(da.config.NonceExpiry) ticker := time.NewTicker(da.config.NonceExpiry)
defer ticker.Stop() defer ticker.Stop()
for range ticker.C { for {
now := time.Now() select {
da.nonces.Range(func(key, value interface{}) bool { case <-da.stopChan:
if now.Sub(value.(time.Time)) > da.config.NonceExpiry { return
da.nonces.Delete(key) case <-ticker.C:
} now := time.Now()
return true da.nonces.Range(func(key, value interface{}) bool {
}) if now.Sub(value.(time.Time)) > da.config.NonceExpiry {
da.nonces.Delete(key)
}
return true
})
}
} }
} }

View file

@ -218,7 +218,6 @@ func (c *Container) ProfileManager() *ProfileManager {
return c.profileManager return c.profileManager
} }
// NodeService returns the node service (may be nil if P2P is unavailable). // NodeService returns the node service (may be nil if P2P is unavailable).
func (c *Container) NodeService() *NodeService { func (c *Container) NodeService() *NodeService {
c.mu.RLock() c.mu.RLock()

View file

@ -33,8 +33,8 @@ func TestDualMiningCPUAndGPU(t *testing.T) {
// GPU config - explicit device selection required! // GPU config - explicit device selection required!
GPUEnabled: true, GPUEnabled: true,
OpenCL: true, // AMD GPU OpenCL: true, // AMD GPU
Devices: "0", // Device 0 only - user must pick Devices: "0", // Device 0 only - user must pick
} }
minerInstance, err := manager.StartMiner(context.Background(), "xmrig", config) minerInstance, err := manager.StartMiner(context.Background(), "xmrig", config)

View file

@ -24,6 +24,7 @@ const (
ErrCodeProfileNotFound = "PROFILE_NOT_FOUND" ErrCodeProfileNotFound = "PROFILE_NOT_FOUND"
ErrCodeProfileExists = "PROFILE_EXISTS" ErrCodeProfileExists = "PROFILE_EXISTS"
ErrCodeInternalError = "INTERNAL_ERROR" ErrCodeInternalError = "INTERNAL_ERROR"
ErrCodeInternal = "INTERNAL_ERROR" // Alias for consistency
) )
// MiningError is a structured error type for the mining package // MiningError is a structured error type for the mining package

View file

@ -302,7 +302,10 @@ func (c *wsClient) writePump() {
if err != nil { if err != nil {
return return
} }
w.Write(message) if _, err := w.Write(message); err != nil {
logging.Debug("WebSocket write error", logging.Fields{"error": err})
return
}
if err := w.Close(); err != nil { if err := w.Close(); err != nil {
return return

View file

@ -312,6 +312,7 @@ func (m *Manager) StartMiner(ctx context.Context, minerType string, config *Conf
Name: instanceName, Name: instanceName,
}) })
RecordMinerStart()
return miner, nil return miner, nil
} }
@ -454,6 +455,7 @@ func (m *Manager) StopMiner(ctx context.Context, name string) error {
return stopErr return stopErr
} }
RecordMinerStop()
return nil return nil
} }
@ -574,21 +576,55 @@ func (m *Manager) collectMinerStats() {
wg.Wait() wg.Wait()
} }
// collectSingleMinerStats collects stats from a single miner. // statsRetryCount is the number of retries for transient stats failures.
const statsRetryCount = 2
// statsRetryDelay is the delay between stats collection retries.
const statsRetryDelay = 500 * time.Millisecond
// collectSingleMinerStats collects stats from a single miner with retry logic.
// This is called concurrently for each miner. // This is called concurrently for each miner.
func (m *Manager) collectSingleMinerStats(miner Miner, minerType string, now time.Time, dbEnabled bool) { func (m *Manager) collectSingleMinerStats(miner Miner, minerType string, now time.Time, dbEnabled bool) {
minerName := miner.GetName() minerName := miner.GetName()
// Use context with timeout to prevent hanging on unresponsive miner APIs var stats *PerformanceMetrics
ctx, cancel := context.WithTimeout(context.Background(), statsCollectionTimeout) var lastErr error
defer cancel() // Ensure context is released after all operations
stats, err := miner.GetStats(ctx) // Retry loop for transient failures
if err != nil { for attempt := 0; attempt <= statsRetryCount; attempt++ {
logging.Error("failed to get miner stats", logging.Fields{"miner": minerName, "error": err}) // Use context with timeout to prevent hanging on unresponsive miner APIs
ctx, cancel := context.WithTimeout(context.Background(), statsCollectionTimeout)
stats, lastErr = miner.GetStats(ctx)
cancel() // Release context immediately
if lastErr == nil {
break // Success
}
// Log retry attempts at debug level
if attempt < statsRetryCount {
logging.Debug("retrying stats collection", logging.Fields{
"miner": minerName,
"attempt": attempt + 1,
"error": lastErr.Error(),
})
time.Sleep(statsRetryDelay)
}
}
if lastErr != nil {
logging.Error("failed to get miner stats after retries", logging.Fields{
"miner": minerName,
"error": lastErr.Error(),
"retries": statsRetryCount,
})
RecordStatsCollection(true, true)
return return
} }
// Record stats collection (retried if we did any retries)
RecordStatsCollection(stats != nil && lastErr == nil, false)
point := HashratePoint{ point := HashratePoint{
Timestamp: now, Timestamp: now,
Hashrate: stats.Hashrate, Hashrate: stats.Hashrate,
@ -605,10 +641,12 @@ func (m *Manager) collectSingleMinerStats(miner Miner, minerType string, now tim
Timestamp: point.Timestamp, Timestamp: point.Timestamp,
Hashrate: point.Hashrate, Hashrate: point.Hashrate,
} }
// Use the same context for DB writes so they respect timeout/cancellation // Create a new context for DB writes (original context is from retry loop)
if err := database.InsertHashratePoint(ctx, minerName, minerType, dbPoint, database.ResolutionHigh); err != nil { dbCtx, dbCancel := context.WithTimeout(context.Background(), statsCollectionTimeout)
if err := database.InsertHashratePoint(dbCtx, minerName, minerType, dbPoint, database.ResolutionHigh); err != nil {
logging.Warn("failed to persist hashrate", logging.Fields{"miner": minerName, "error": err}) logging.Warn("failed to persist hashrate", logging.Fields{"miner": minerName, "error": err})
} }
dbCancel()
} }
// Emit stats event for real-time WebSocket updates // Emit stats event for real-time WebSocket updates

169
pkg/mining/metrics.go Normal file
View file

@ -0,0 +1,169 @@
package mining
import (
"sync"
"sync/atomic"
"time"
)
// Metrics provides simple instrumentation counters for the mining package.
// These can be exposed via Prometheus or other metrics systems in the future.
//
// All counters are atomic and safe for concurrent use; the package-level
// Record* helpers update the shared DefaultMetrics instance.
type Metrics struct {
	// API metrics
	RequestsTotal   atomic.Int64      // total API requests recorded via RecordRequest
	RequestsErrored atomic.Int64      // subset of requests recorded with errored=true
	RequestLatency  *LatencyHistogram // sliding window of request latency samples

	// Miner metrics
	MinersStarted atomic.Int64 // miner start events (RecordMinerStart)
	MinersStopped atomic.Int64 // miner stop events (RecordMinerStop)
	MinersErrored atomic.Int64 // miner error events (RecordMinerError)

	// Stats collection metrics
	StatsCollected atomic.Int64 // stats collection attempts recorded (incremented on every RecordStatsCollection call)
	StatsRetried   atomic.Int64 // collections recorded with retried=true
	StatsFailed    atomic.Int64 // collections recorded with failed=true

	// WebSocket metrics
	WSConnections atomic.Int64 // current connection gauge (incremented/decremented by RecordWSConnection)
	WSMessages    atomic.Int64 // total WebSocket messages recorded

	// P2P metrics
	P2PMessagesSent     atomic.Int64 // P2P messages recorded with sent=true
	P2PMessagesReceived atomic.Int64 // P2P messages recorded with sent=false
	P2PConnectionsTotal atomic.Int64 // total P2P connections (no recorder in this file; incremented by callers elsewhere — TODO confirm)
}
// LatencyHistogram tracks request latencies with basic percentile support.
type LatencyHistogram struct {
mu sync.Mutex
samples []time.Duration
maxSize int
}
// NewLatencyHistogram creates a new latency histogram with a maximum sample size.
func NewLatencyHistogram(maxSize int) *LatencyHistogram {
return &LatencyHistogram{
samples: make([]time.Duration, 0, maxSize),
maxSize: maxSize,
}
}
// Record adds a latency sample.
func (h *LatencyHistogram) Record(d time.Duration) {
h.mu.Lock()
defer h.mu.Unlock()
if len(h.samples) >= h.maxSize {
// Ring buffer behavior - overwrite oldest
copy(h.samples, h.samples[1:])
h.samples = h.samples[:len(h.samples)-1]
}
h.samples = append(h.samples, d)
}
// Average returns the average latency.
func (h *LatencyHistogram) Average() time.Duration {
h.mu.Lock()
defer h.mu.Unlock()
if len(h.samples) == 0 {
return 0
}
var total time.Duration
for _, d := range h.samples {
total += d
}
return total / time.Duration(len(h.samples))
}
// Count returns the number of samples.
func (h *LatencyHistogram) Count() int {
h.mu.Lock()
defer h.mu.Unlock()
return len(h.samples)
}
// DefaultMetrics is the global metrics instance updated by the package-level
// Record* helpers below and exposed via GetMetricsSnapshot.
var DefaultMetrics = &Metrics{
	RequestLatency: NewLatencyHistogram(1000),
}

// RecordRequest records one API request: the total counter, the error counter
// when errored is true, and the observed latency sample.
func RecordRequest(errored bool, latency time.Duration) {
	m := DefaultMetrics
	m.RequestsTotal.Add(1)
	if errored {
		m.RequestsErrored.Add(1)
	}
	m.RequestLatency.Record(latency)
}

// RecordMinerStart records a miner start event.
func RecordMinerStart() {
	DefaultMetrics.MinersStarted.Add(1)
}

// RecordMinerStop records a miner stop event.
func RecordMinerStop() {
	DefaultMetrics.MinersStopped.Add(1)
}

// RecordMinerError records a miner error event.
func RecordMinerError() {
	DefaultMetrics.MinersErrored.Add(1)
}

// RecordStatsCollection records one stats-collection outcome. The collected
// counter is always incremented; retried and failed increment their
// respective counters when set.
func RecordStatsCollection(retried bool, failed bool) {
	m := DefaultMetrics
	m.StatsCollected.Add(1)
	if retried {
		m.StatsRetried.Add(1)
	}
	if failed {
		m.StatsFailed.Add(1)
	}
}

// RecordWSConnection adjusts the WebSocket connection gauge: +1 when a client
// connects, -1 when it disconnects.
func RecordWSConnection(connected bool) {
	delta := int64(-1)
	if connected {
		delta = 1
	}
	DefaultMetrics.WSConnections.Add(delta)
}

// RecordWSMessage records one WebSocket message.
func RecordWSMessage() {
	DefaultMetrics.WSMessages.Add(1)
}

// RecordP2PMessage records one P2P message, attributed to the sent or
// received counter depending on direction.
func RecordP2PMessage(sent bool) {
	counter := &DefaultMetrics.P2PMessagesReceived
	if sent {
		counter = &DefaultMetrics.P2PMessagesSent
	}
	counter.Add(1)
}

// GetMetricsSnapshot returns a point-in-time copy of the current metric
// values, keyed for JSON serialization (served by the /metrics endpoint).
func GetMetricsSnapshot() map[string]interface{} {
	m := DefaultMetrics
	return map[string]interface{}{
		"requests_total":          m.RequestsTotal.Load(),
		"requests_errored":        m.RequestsErrored.Load(),
		"request_latency_avg_ms":  m.RequestLatency.Average().Milliseconds(),
		"request_latency_samples": m.RequestLatency.Count(),
		"miners_started":          m.MinersStarted.Load(),
		"miners_stopped":          m.MinersStopped.Load(),
		"miners_errored":          m.MinersErrored.Load(),
		"stats_collected":         m.StatsCollected.Load(),
		"stats_retried":           m.StatsRetried.Load(),
		"stats_failed":            m.StatsFailed.Load(),
		"ws_connections":          m.WSConnections.Load(),
		"ws_messages":             m.WSMessages.Load(),
		"p2p_messages_sent":       m.P2PMessagesSent.Load(),
		"p2p_messages_received":   m.P2PMessagesReceived.Load(),
	}
}

View file

@ -216,11 +216,17 @@ func (b *BaseMiner) WriteStdin(input string) error {
input += "\n" input += "\n"
} }
// Write with timeout to prevent blocking indefinitely // Write with timeout to prevent blocking indefinitely.
// Use buffered channel size 1 so goroutine can exit even if we don't read the result.
done := make(chan error, 1) done := make(chan error, 1)
go func() { go func() {
_, err := stdinPipe.Write([]byte(input)) _, err := stdinPipe.Write([]byte(input))
done <- err // Non-blocking send - if timeout already fired, this won't block
select {
case done <- err:
default:
// Timeout already occurred, goroutine exits cleanly
}
}() }()
select { select {
@ -252,13 +258,13 @@ func (b *BaseMiner) InstallFromURL(url string) error {
defer resp.Body.Close() defer resp.Body.Close()
if resp.StatusCode != http.StatusOK { if resp.StatusCode != http.StatusOK {
io.Copy(io.Discard, resp.Body) // Drain body to allow connection reuse _, _ = io.Copy(io.Discard, resp.Body) // Drain body to allow connection reuse (error ignored intentionally)
return fmt.Errorf("failed to download release: unexpected status code %d", resp.StatusCode) return fmt.Errorf("failed to download release: unexpected status code %d", resp.StatusCode)
} }
if _, err := io.Copy(tmpfile, resp.Body); err != nil { if _, err := io.Copy(tmpfile, resp.Body); err != nil {
// Drain remaining body to allow connection reuse // Drain remaining body to allow connection reuse (error ignored intentionally)
io.Copy(io.Discard, resp.Body) _, _ = io.Copy(io.Discard, resp.Body)
return err return err
} }
@ -548,7 +554,9 @@ func (b *BaseMiner) unzip(src, dest string) error {
return fmt.Errorf("%s: illegal file path", fpath) return fmt.Errorf("%s: illegal file path", fpath)
} }
if f.FileInfo().IsDir() { if f.FileInfo().IsDir() {
os.MkdirAll(fpath, os.ModePerm) if err := os.MkdirAll(fpath, os.ModePerm); err != nil {
return fmt.Errorf("failed to create directory %s: %w", fpath, err)
}
continue continue
} }

View file

@ -4,6 +4,7 @@ import (
"encoding/json" "encoding/json"
"net/http" "net/http"
"strconv" "strconv"
"strings"
"github.com/Snider/Mining/pkg/node" "github.com/Snider/Mining/pkg/node"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
@ -90,8 +91,8 @@ func (ns *NodeService) StopTransport() error {
// Node Info Response // Node Info Response
type NodeInfoResponse struct { type NodeInfoResponse struct {
HasIdentity bool `json:"hasIdentity"` HasIdentity bool `json:"hasIdentity"`
Identity *node.NodeIdentity `json:"identity,omitempty"` Identity *node.NodeIdentity `json:"identity,omitempty"`
RegisteredPeers int `json:"registeredPeers"` RegisteredPeers int `json:"registeredPeers"`
ConnectedPeers int `json:"connectedPeers"` ConnectedPeers int `json:"connectedPeers"`
} }
@ -257,12 +258,17 @@ func (ns *NodeService) handleRemovePeer(c *gin.Context) {
// @Produce json // @Produce json
// @Param id path string true "Peer ID" // @Param id path string true "Peer ID"
// @Success 200 {object} map[string]float64 // @Success 200 {object} map[string]float64
// @Failure 404 {object} APIError "Peer not found"
// @Router /peers/{id}/ping [post] // @Router /peers/{id}/ping [post]
func (ns *NodeService) handlePingPeer(c *gin.Context) { func (ns *NodeService) handlePingPeer(c *gin.Context) {
peerID := c.Param("id") peerID := c.Param("id")
rtt, err := ns.controller.PingPeer(peerID) rtt, err := ns.controller.PingPeer(peerID)
if err != nil { if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) if strings.Contains(err.Error(), "not found") || strings.Contains(err.Error(), "not connected") {
respondWithError(c, http.StatusNotFound, "PEER_NOT_FOUND", "peer not found or not connected", err.Error())
return
}
respondWithError(c, http.StatusInternalServerError, ErrCodeInternal, "ping failed", err.Error())
return return
} }
c.JSON(http.StatusOK, gin.H{"rtt_ms": rtt}) c.JSON(http.StatusOK, gin.H{"rtt_ms": rtt})
@ -275,11 +281,16 @@ func (ns *NodeService) handlePingPeer(c *gin.Context) {
// @Produce json // @Produce json
// @Param id path string true "Peer ID" // @Param id path string true "Peer ID"
// @Success 200 {object} map[string]string // @Success 200 {object} map[string]string
// @Failure 404 {object} APIError "Peer not found"
// @Router /peers/{id}/connect [post] // @Router /peers/{id}/connect [post]
func (ns *NodeService) handleConnectPeer(c *gin.Context) { func (ns *NodeService) handleConnectPeer(c *gin.Context) {
peerID := c.Param("id") peerID := c.Param("id")
if err := ns.controller.ConnectToPeer(peerID); err != nil { if err := ns.controller.ConnectToPeer(peerID); err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) if strings.Contains(err.Error(), "not found") {
respondWithError(c, http.StatusNotFound, "PEER_NOT_FOUND", "peer not found", err.Error())
return
}
respondWithError(c, http.StatusInternalServerError, ErrCodeConnectionFailed, "connection failed", err.Error())
return return
} }
c.JSON(http.StatusOK, gin.H{"status": "connected"}) c.JSON(http.StatusOK, gin.H{"status": "connected"})
@ -287,7 +298,7 @@ func (ns *NodeService) handleConnectPeer(c *gin.Context) {
// handleDisconnectPeer godoc // handleDisconnectPeer godoc
// @Summary Disconnect from a peer // @Summary Disconnect from a peer
// @Description Close the connection to a peer // @Description Close the connection to a peer. Idempotent - returns success if peer not connected.
// @Tags peers // @Tags peers
// @Produce json // @Produce json
// @Param id path string true "Peer ID" // @Param id path string true "Peer ID"
@ -296,7 +307,12 @@ func (ns *NodeService) handleConnectPeer(c *gin.Context) {
func (ns *NodeService) handleDisconnectPeer(c *gin.Context) { func (ns *NodeService) handleDisconnectPeer(c *gin.Context) {
peerID := c.Param("id") peerID := c.Param("id")
if err := ns.controller.DisconnectFromPeer(peerID); err != nil { if err := ns.controller.DisconnectFromPeer(peerID); err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) // Make disconnect idempotent - if peer not connected, still return success
if strings.Contains(err.Error(), "not connected") {
c.JSON(http.StatusOK, gin.H{"status": "disconnected"})
return
}
respondWithError(c, http.StatusInternalServerError, ErrCodeInternal, "disconnect failed", err.Error())
return return
} }
c.JSON(http.StatusOK, gin.H{"status": "disconnected"}) c.JSON(http.StatusOK, gin.H{"status": "disconnected"})

View file

@ -144,6 +144,35 @@ func generateRequestID() string {
return fmt.Sprintf("%d-%x", time.Now().UnixMilli(), b[:4]) return fmt.Sprintf("%d-%x", time.Now().UnixMilli(), b[:4])
} }
// getRequestID extracts the request ID from gin context
func getRequestID(c *gin.Context) string {
if id, exists := c.Get("requestID"); exists {
if s, ok := id.(string); ok {
return s
}
}
return ""
}
// logWithRequestID logs a message with request ID correlation
func logWithRequestID(c *gin.Context, level string, message string, fields logging.Fields) {
if fields == nil {
fields = logging.Fields{}
}
if reqID := getRequestID(c); reqID != "" {
fields["request_id"] = reqID
}
switch level {
case "error":
logging.Error(message, fields)
case "warn":
logging.Warn(message, fields)
case "info":
logging.Info(message, fields)
default:
logging.Debug(message, fields)
}
}
// WebSocket upgrader for the events endpoint // WebSocket upgrader for the events endpoint
var wsUpgrader = websocket.Upgrader{ var wsUpgrader = websocket.Upgrader{
@ -272,9 +301,9 @@ func (s *Service) InitRouter() {
// Configure CORS to only allow local origins // Configure CORS to only allow local origins
corsConfig := cors.Config{ corsConfig := cors.Config{
AllowOrigins: []string{ AllowOrigins: []string{
"http://localhost:4200", // Angular dev server "http://localhost:4200", // Angular dev server
"http://127.0.0.1:4200", "http://127.0.0.1:4200",
"http://localhost:9090", // Default API port "http://localhost:9090", // Default API port
"http://127.0.0.1:9090", "http://127.0.0.1:9090",
"http://localhost:" + serverPort, "http://localhost:" + serverPort,
"http://127.0.0.1:" + serverPort, "http://127.0.0.1:" + serverPort,
@ -312,6 +341,14 @@ func (s *Service) Stop() {
if s.EventHub != nil { if s.EventHub != nil {
s.EventHub.Stop() s.EventHub.Stop()
} }
if s.auth != nil {
s.auth.Stop()
}
if s.NodeService != nil {
if err := s.NodeService.StopTransport(); err != nil {
logging.Warn("failed to stop node service transport", logging.Fields{"error": err})
}
}
} }
// ServiceStartup initializes the router and starts the HTTP server. // ServiceStartup initializes the router and starts the HTTP server.
@ -333,6 +370,7 @@ func (s *Service) ServiceStartup(ctx context.Context) error {
go func() { go func() {
<-ctx.Done() <-ctx.Done()
s.Stop() // Clean up service resources (auth, event hub, node service)
s.Manager.Stop() s.Manager.Stop()
ctxShutdown, cancel := context.WithTimeout(context.Background(), 5*time.Second) ctxShutdown, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel() defer cancel()
@ -377,6 +415,7 @@ func (s *Service) SetupRoutes() {
{ {
apiGroup.GET("/info", s.handleGetInfo) apiGroup.GET("/info", s.handleGetInfo)
apiGroup.GET("/metrics", s.handleMetrics)
apiGroup.POST("/doctor", s.handleDoctor) apiGroup.POST("/doctor", s.handleDoctor)
apiGroup.POST("/update", s.handleUpdateCheck) apiGroup.POST("/update", s.handleUpdateCheck)
@ -824,17 +863,28 @@ func (s *Service) handleListProfiles(c *gin.Context) {
// @Produce json // @Produce json
// @Param profile body MiningProfile true "Mining Profile" // @Param profile body MiningProfile true "Mining Profile"
// @Success 201 {object} MiningProfile // @Success 201 {object} MiningProfile
// @Failure 400 {object} APIError "Invalid profile data"
// @Router /profiles [post] // @Router /profiles [post]
func (s *Service) handleCreateProfile(c *gin.Context) { func (s *Service) handleCreateProfile(c *gin.Context) {
var profile MiningProfile var profile MiningProfile
if err := c.ShouldBindJSON(&profile); err != nil { if err := c.ShouldBindJSON(&profile); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) respondWithError(c, http.StatusBadRequest, ErrCodeInvalidInput, "invalid profile data", err.Error())
return
}
// Validate required fields
if profile.Name == "" {
respondWithError(c, http.StatusBadRequest, ErrCodeInvalidInput, "profile name is required", "")
return
}
if profile.MinerType == "" {
respondWithError(c, http.StatusBadRequest, ErrCodeInvalidInput, "miner type is required", "")
return return
} }
createdProfile, err := s.ProfileManager.CreateProfile(&profile) createdProfile, err := s.ProfileManager.CreateProfile(&profile)
if err != nil { if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to create profile", "details": err.Error()}) respondWithError(c, http.StatusInternalServerError, ErrCodeInternal, "failed to create profile", err.Error())
return return
} }
@ -868,18 +918,24 @@ func (s *Service) handleGetProfile(c *gin.Context) {
// @Param id path string true "Profile ID" // @Param id path string true "Profile ID"
// @Param profile body MiningProfile true "Updated Mining Profile" // @Param profile body MiningProfile true "Updated Mining Profile"
// @Success 200 {object} MiningProfile // @Success 200 {object} MiningProfile
// @Failure 404 {object} APIError "Profile not found"
// @Router /profiles/{id} [put] // @Router /profiles/{id} [put]
func (s *Service) handleUpdateProfile(c *gin.Context) { func (s *Service) handleUpdateProfile(c *gin.Context) {
profileID := c.Param("id") profileID := c.Param("id")
var profile MiningProfile var profile MiningProfile
if err := c.ShouldBindJSON(&profile); err != nil { if err := c.ShouldBindJSON(&profile); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) respondWithError(c, http.StatusBadRequest, ErrCodeInvalidInput, "invalid profile data", err.Error())
return return
} }
profile.ID = profileID profile.ID = profileID
if err := s.ProfileManager.UpdateProfile(&profile); err != nil { if err := s.ProfileManager.UpdateProfile(&profile); err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to update profile", "details": err.Error()}) // Check if error is "not found"
if strings.Contains(err.Error(), "not found") {
respondWithError(c, http.StatusNotFound, ErrCodeProfileNotFound, "profile not found", err.Error())
return
}
respondWithError(c, http.StatusInternalServerError, ErrCodeInternal, "failed to update profile", err.Error())
return return
} }
c.JSON(http.StatusOK, profile) c.JSON(http.StatusOK, profile)
@ -887,7 +943,7 @@ func (s *Service) handleUpdateProfile(c *gin.Context) {
// handleDeleteProfile godoc // handleDeleteProfile godoc
// @Summary Delete a mining profile // @Summary Delete a mining profile
// @Description Delete a mining profile by its ID // @Description Delete a mining profile by its ID. Idempotent - returns success even if profile doesn't exist.
// @Tags profiles // @Tags profiles
// @Produce json // @Produce json
// @Param id path string true "Profile ID" // @Param id path string true "Profile ID"
@ -896,7 +952,12 @@ func (s *Service) handleUpdateProfile(c *gin.Context) {
func (s *Service) handleDeleteProfile(c *gin.Context) { func (s *Service) handleDeleteProfile(c *gin.Context) {
profileID := c.Param("id") profileID := c.Param("id")
if err := s.ProfileManager.DeleteProfile(profileID); err != nil { if err := s.ProfileManager.DeleteProfile(profileID); err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to delete profile", "details": err.Error()}) // Make DELETE idempotent - if profile doesn't exist, still return success
if strings.Contains(err.Error(), "not found") {
c.JSON(http.StatusOK, gin.H{"status": "profile deleted"})
return
}
respondWithError(c, http.StatusInternalServerError, ErrCodeInternal, "failed to delete profile", err.Error())
return return
} }
c.JSON(http.StatusOK, gin.H{"status": "profile deleted"}) c.JSON(http.StatusOK, gin.H{"status": "profile deleted"})
@ -1030,7 +1091,20 @@ func (s *Service) handleWebSocketEvents(c *gin.Context) {
} }
logging.Info("new WebSocket connection", logging.Fields{"remote": c.Request.RemoteAddr}) logging.Info("new WebSocket connection", logging.Fields{"remote": c.Request.RemoteAddr})
RecordWSConnection(true)
if !s.EventHub.ServeWs(conn) { if !s.EventHub.ServeWs(conn) {
RecordWSConnection(false) // Undo increment on rejection
logging.Warn("WebSocket connection rejected", logging.Fields{"remote": c.Request.RemoteAddr, "reason": "limit reached"}) logging.Warn("WebSocket connection rejected", logging.Fields{"remote": c.Request.RemoteAddr, "reason": "limit reached"})
} }
} }
// handleMetrics godoc
// @Summary Get internal metrics
// @Description Returns internal metrics for monitoring and debugging
// @Tags system
// @Produce json
// @Success 200 {object} map[string]interface{}
// @Router /metrics [get]
func (s *Service) handleMetrics(c *gin.Context) {
c.JSON(http.StatusOK, GetMetricsSnapshot())
}

View file

@ -30,10 +30,10 @@ type MockMiner struct {
WriteStdinFunc func(input string) error WriteStdinFunc func(input string) error
} }
func (m *MockMiner) Install() error { return m.InstallFunc() } func (m *MockMiner) Install() error { return m.InstallFunc() }
func (m *MockMiner) Uninstall() error { return m.UninstallFunc() } func (m *MockMiner) Uninstall() error { return m.UninstallFunc() }
func (m *MockMiner) Start(config *Config) error { return m.StartFunc(config) } func (m *MockMiner) Start(config *Config) error { return m.StartFunc(config) }
func (m *MockMiner) Stop() error { return m.StopFunc() } func (m *MockMiner) Stop() error { return m.StopFunc() }
func (m *MockMiner) GetStats(ctx context.Context) (*PerformanceMetrics, error) { func (m *MockMiner) GetStats(ctx context.Context) (*PerformanceMetrics, error) {
return m.GetStatsFunc(ctx) return m.GetStatsFunc(ctx)
} }
@ -43,9 +43,9 @@ func (m *MockMiner) GetType() string {
} }
return "mock" return "mock"
} }
func (m *MockMiner) GetName() string { return m.GetNameFunc() } func (m *MockMiner) GetName() string { return m.GetNameFunc() }
func (m *MockMiner) GetPath() string { return m.GetPathFunc() } func (m *MockMiner) GetPath() string { return m.GetPathFunc() }
func (m *MockMiner) GetBinaryPath() string { return m.GetBinaryPathFunc() } func (m *MockMiner) GetBinaryPath() string { return m.GetBinaryPathFunc() }
func (m *MockMiner) CheckInstallation() (*InstallationDetails, error) { func (m *MockMiner) CheckInstallation() (*InstallationDetails, error) {
return m.CheckInstallationFunc() return m.CheckInstallationFunc()
} }

View file

@ -14,10 +14,10 @@ const settingsFileName = "settings.json"
// WindowState stores the last window position and size // WindowState stores the last window position and size
type WindowState struct { type WindowState struct {
X int `json:"x"` X int `json:"x"`
Y int `json:"y"` Y int `json:"y"`
Width int `json:"width"` Width int `json:"width"`
Height int `json:"height"` Height int `json:"height"`
Maximized bool `json:"maximized"` Maximized bool `json:"maximized"`
} }

View file

@ -15,31 +15,31 @@ const MinerTypeSimulated = "simulated"
// SimulatedMiner is a mock miner that generates realistic-looking stats for UI testing. // SimulatedMiner is a mock miner that generates realistic-looking stats for UI testing.
type SimulatedMiner struct { type SimulatedMiner struct {
// Exported fields for JSON serialization // Exported fields for JSON serialization
Name string `json:"name"` Name string `json:"name"`
MinerType string `json:"miner_type"` MinerType string `json:"miner_type"`
Version string `json:"version"` Version string `json:"version"`
URL string `json:"url"` URL string `json:"url"`
Path string `json:"path"` Path string `json:"path"`
MinerBinary string `json:"miner_binary"` MinerBinary string `json:"miner_binary"`
Running bool `json:"running"` Running bool `json:"running"`
Algorithm string `json:"algorithm"` Algorithm string `json:"algorithm"`
HashrateHistory []HashratePoint `json:"hashrateHistory"` HashrateHistory []HashratePoint `json:"hashrateHistory"`
LowResHistory []HashratePoint `json:"lowResHashrateHistory"` LowResHistory []HashratePoint `json:"lowResHashrateHistory"`
Stats *PerformanceMetrics `json:"stats,omitempty"` Stats *PerformanceMetrics `json:"stats,omitempty"`
FullStats *XMRigSummary `json:"full_stats,omitempty"` // XMRig-compatible format for UI FullStats *XMRigSummary `json:"full_stats,omitempty"` // XMRig-compatible format for UI
// Internal fields (not exported) // Internal fields (not exported)
baseHashrate int baseHashrate int
peakHashrate int peakHashrate int
variance float64 variance float64
startTime time.Time startTime time.Time
shares int shares int
rejected int rejected int
logs []string logs []string
mu sync.RWMutex mu sync.RWMutex
stopChan chan struct{} stopChan chan struct{}
poolName string poolName string
difficultyBase int difficultyBase int
} }
// SimulatedMinerConfig holds configuration for creating a simulated miner. // SimulatedMinerConfig holds configuration for creating a simulated miner.

View file

@ -55,4 +55,3 @@ func FetchJSONStats[T any](ctx context.Context, config HTTPStatsConfig, target *
return nil return nil
} }

View file

@ -141,7 +141,7 @@ func TestCPUThrottleThreadCount(t *testing.T) {
defer manager.Stop() defer manager.Stop()
numCPU := runtime.NumCPU() numCPU := runtime.NumCPU()
targetThreads := 1 // Use only 1 thread targetThreads := 1 // Use only 1 thread
expectedMaxCPU := float64(100) / float64(numCPU) * float64(targetThreads) * 1.5 // 50% tolerance expectedMaxCPU := float64(100) / float64(numCPU) * float64(targetThreads) * 1.5 // 50% tolerance
config := &Config{ config := &Config{

View file

@ -197,10 +197,10 @@ func isValidCLIArg(arg string) bool {
// Block arguments that could override security-related settings // Block arguments that could override security-related settings
blockedPrefixes := []string{ blockedPrefixes := []string{
"--api-access-token", "--api-worker-id", // TT-Miner API settings "--api-access-token", "--api-worker-id", // TT-Miner API settings
"--config", // Could load arbitrary config "--config", // Could load arbitrary config
"--log-file", // Could write to arbitrary locations "--log-file", // Could write to arbitrary locations
"--coin-file", // Could load arbitrary coin configs "--coin-file", // Could load arbitrary coin configs
"-o", "--out", // Output redirection "-o", "--out", // Output redirection
} }
lowerArg := strings.ToLower(arg) lowerArg := strings.ToLower(arg)
for _, blocked := range blockedPrefixes { for _, blocked := range blockedPrefixes {

View file

@ -43,9 +43,9 @@ func TestXMRigDualMiningConfig(t *testing.T) {
GPUPool: "stratum+tcp://ravencoin.pool.com:3333", GPUPool: "stratum+tcp://ravencoin.pool.com:3333",
GPUWallet: "gpu_wallet_address", GPUWallet: "gpu_wallet_address",
GPUAlgo: "kawpow", GPUAlgo: "kawpow",
CUDA: true, // NVIDIA CUDA: true, // NVIDIA
OpenCL: false, OpenCL: false,
Devices: "0", // Explicit device selection required Devices: "0", // Explicit device selection required
} }
err := miner.createConfig(config) err := miner.createConfig(config)
@ -146,7 +146,7 @@ func TestXMRigGPUOnlyConfig(t *testing.T) {
Pool: "stratum+tcp://pool.supportxmr.com:3333", Pool: "stratum+tcp://pool.supportxmr.com:3333",
Wallet: "test_wallet", Wallet: "test_wallet",
Algo: "rx/0", Algo: "rx/0",
NoCPU: true, // Disable CPU NoCPU: true, // Disable CPU
GPUEnabled: true, GPUEnabled: true,
OpenCL: true, // AMD GPU OpenCL: true, // AMD GPU
CUDA: true, // Also NVIDIA CUDA: true, // Also NVIDIA

View file

@ -99,21 +99,33 @@ func (m *XMRigMiner) Start(config *Config) error {
// Capture cmd locally to avoid race with Stop() // Capture cmd locally to avoid race with Stop()
cmd := m.cmd cmd := m.cmd
minerName := m.Name // Capture name for logging
go func() { go func() {
// Use a channel to detect if Wait() completes // Use a channel to detect if Wait() completes
done := make(chan struct{}) done := make(chan struct{})
var waitErr error
go func() { go func() {
cmd.Wait() waitErr = cmd.Wait()
close(done) close(done)
}() }()
// Wait with timeout to prevent goroutine leak on zombie processes // Wait with timeout to prevent goroutine leak on zombie processes
select { select {
case <-done: case <-done:
// Normal exit // Normal exit - log the exit status
if waitErr != nil {
logging.Info("miner process exited", logging.Fields{
"miner": minerName,
"error": waitErr.Error(),
})
} else {
logging.Info("miner process exited normally", logging.Fields{
"miner": minerName,
})
}
case <-time.After(5 * time.Minute): case <-time.After(5 * time.Minute):
// Process didn't exit after 5 minutes - force cleanup // Process didn't exit after 5 minutes - force cleanup
logging.Warn("miner process wait timeout, forcing cleanup", logging.Fields{"miner": m.Name}) logging.Warn("miner process wait timeout, forcing cleanup", logging.Fields{"miner": minerName})
if cmd.Process != nil { if cmd.Process != nil {
cmd.Process.Kill() cmd.Process.Kill()
} }
@ -122,7 +134,7 @@ func (m *XMRigMiner) Start(config *Config) error {
case <-done: case <-done:
// Inner goroutine completed // Inner goroutine completed
case <-time.After(10 * time.Second): case <-time.After(10 * time.Second):
logging.Error("process cleanup timed out after kill", logging.Fields{"miner": m.Name}) logging.Error("process cleanup timed out after kill", logging.Fields{"miner": minerName})
} }
} }

View file

@ -144,9 +144,9 @@ func TestExtractProfileBundle(t *testing.T) {
func TestTarballFunctions(t *testing.T) { func TestTarballFunctions(t *testing.T) {
t.Run("CreateAndExtractTarball", func(t *testing.T) { t.Run("CreateAndExtractTarball", func(t *testing.T) {
files := map[string][]byte{ files := map[string][]byte{
"file1.txt": []byte("content of file 1"), "file1.txt": []byte("content of file 1"),
"dir/file2.json": []byte(`{"key":"value"}`), "dir/file2.json": []byte(`{"key":"value"}`),
"miners/xmrig": []byte("binary content"), "miners/xmrig": []byte("binary content"),
} }
tarData, err := createTarball(files) tarData, err := createTarball(files)

View file

@ -6,6 +6,8 @@ import (
"fmt" "fmt"
"sync" "sync"
"time" "time"
"github.com/Snider/Mining/pkg/logging"
) )
// Controller manages remote peer operations from a controller node. // Controller manages remote peer operations from a controller node.
@ -248,6 +250,11 @@ func (c *Controller) GetAllStats() map[string]*StatsPayload {
defer wg.Done() defer wg.Done()
stats, err := c.GetRemoteStats(p.ID) stats, err := c.GetRemoteStats(p.ID)
if err != nil { if err != nil {
logging.Debug("failed to get stats from peer", logging.Fields{
"peer_id": p.ID,
"peer": p.Name,
"error": err.Error(),
})
return // Skip failed peers return // Skip failed peers
} }
mu.Lock() mu.Lock()

View file

@ -12,11 +12,11 @@ type MessageType string
const ( const (
// Connection lifecycle // Connection lifecycle
MsgHandshake MessageType = "handshake" MsgHandshake MessageType = "handshake"
MsgHandshakeAck MessageType = "handshake_ack" MsgHandshakeAck MessageType = "handshake_ack"
MsgPing MessageType = "ping" MsgPing MessageType = "ping"
MsgPong MessageType = "pong" MsgPong MessageType = "pong"
MsgDisconnect MessageType = "disconnect" MsgDisconnect MessageType = "disconnect"
// Miner operations // Miner operations
MsgGetStats MessageType = "get_stats" MsgGetStats MessageType = "get_stats"
@ -39,10 +39,10 @@ const (
// Message represents a P2P message between nodes. // Message represents a P2P message between nodes.
type Message struct { type Message struct {
ID string `json:"id"` // UUID ID string `json:"id"` // UUID
Type MessageType `json:"type"` Type MessageType `json:"type"`
From string `json:"from"` // Sender node ID From string `json:"from"` // Sender node ID
To string `json:"to"` // Recipient node ID (empty for broadcast) To string `json:"to"` // Recipient node ID (empty for broadcast)
Timestamp time.Time `json:"ts"` Timestamp time.Time `json:"ts"`
Payload json.RawMessage `json:"payload"` Payload json.RawMessage `json:"payload"`
ReplyTo string `json:"replyTo,omitempty"` // ID of message being replied to ReplyTo string `json:"replyTo,omitempty"` // ID of message being replied to
@ -98,10 +98,10 @@ type HandshakePayload struct {
// HandshakeAckPayload is the response to a handshake. // HandshakeAckPayload is the response to a handshake.
type HandshakeAckPayload struct { type HandshakeAckPayload struct {
Identity NodeIdentity `json:"identity"` Identity NodeIdentity `json:"identity"`
ChallengeResponse []byte `json:"challengeResponse,omitempty"` ChallengeResponse []byte `json:"challengeResponse,omitempty"`
Accepted bool `json:"accepted"` Accepted bool `json:"accepted"`
Reason string `json:"reason,omitempty"` // If not accepted Reason string `json:"reason,omitempty"` // If not accepted
} }
// PingPayload for keepalive/latency measurement. // PingPayload for keepalive/latency measurement.
@ -117,7 +117,7 @@ type PongPayload struct {
// StartMinerPayload requests starting a miner. // StartMinerPayload requests starting a miner.
type StartMinerPayload struct { type StartMinerPayload struct {
MinerType string `json:"minerType"` // Required: miner type (e.g., "xmrig", "tt-miner") MinerType string `json:"minerType"` // Required: miner type (e.g., "xmrig", "tt-miner")
ProfileID string `json:"profileId,omitempty"` ProfileID string `json:"profileId,omitempty"`
Config json.RawMessage `json:"config,omitempty"` // Override profile config Config json.RawMessage `json:"config,omitempty"` // Override profile config
} }
@ -158,7 +158,7 @@ type StatsPayload struct {
// GetLogsPayload requests console logs from a miner. // GetLogsPayload requests console logs from a miner.
type GetLogsPayload struct { type GetLogsPayload struct {
MinerName string `json:"minerName"` MinerName string `json:"minerName"`
Lines int `json:"lines"` // Number of lines to fetch Lines int `json:"lines"` // Number of lines to fetch
Since int64 `json:"since,omitempty"` // Unix timestamp, logs after this time Since int64 `json:"since,omitempty"` // Unix timestamp, logs after this time
} }

View file

@ -8,6 +8,7 @@ import (
"net/http" "net/http"
"net/url" "net/url"
"sync" "sync"
"sync/atomic"
"time" "time"
"github.com/Snider/Borg/pkg/smsg" "github.com/Snider/Borg/pkg/smsg"
@ -15,6 +16,12 @@ import (
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
) )
// debugLogCounter tracks message counts for rate limiting debug logs
var debugLogCounter atomic.Int64
// debugLogInterval controls how often we log debug messages in hot paths (1 in N)
const debugLogInterval = 100
// TransportConfig configures the WebSocket transport. // TransportConfig configures the WebSocket transport.
type TransportConfig struct { type TransportConfig struct {
ListenAddr string // ":9091" default ListenAddr string // ":9091" default
@ -404,28 +411,28 @@ func (t *Transport) performHandshake(pc *PeerConnection) error {
msg, err := NewMessage(MsgHandshake, identity.ID, pc.Peer.ID, payload) msg, err := NewMessage(MsgHandshake, identity.ID, pc.Peer.ID, payload)
if err != nil { if err != nil {
return err return fmt.Errorf("create handshake message: %w", err)
} }
// First message is unencrypted (peer needs our public key) // First message is unencrypted (peer needs our public key)
data, err := json.Marshal(msg) data, err := json.Marshal(msg)
if err != nil { if err != nil {
return err return fmt.Errorf("marshal handshake message: %w", err)
} }
if err := pc.Conn.WriteMessage(websocket.TextMessage, data); err != nil { if err := pc.Conn.WriteMessage(websocket.TextMessage, data); err != nil {
return err return fmt.Errorf("send handshake: %w", err)
} }
// Wait for ack // Wait for ack
_, ackData, err := pc.Conn.ReadMessage() _, ackData, err := pc.Conn.ReadMessage()
if err != nil { if err != nil {
return err return fmt.Errorf("read handshake ack: %w", err)
} }
var ackMsg Message var ackMsg Message
if err := json.Unmarshal(ackData, &ackMsg); err != nil { if err := json.Unmarshal(ackData, &ackMsg); err != nil {
return err return fmt.Errorf("unmarshal handshake ack: %w", err)
} }
if ackMsg.Type != MsgHandshakeAck { if ackMsg.Type != MsgHandshakeAck {
@ -434,7 +441,7 @@ func (t *Transport) performHandshake(pc *PeerConnection) error {
var ackPayload HandshakeAckPayload var ackPayload HandshakeAckPayload
if err := ackMsg.ParsePayload(&ackPayload); err != nil { if err := ackMsg.ParsePayload(&ackPayload); err != nil {
return err return fmt.Errorf("parse handshake ack payload: %w", err)
} }
if !ackPayload.Accepted { if !ackPayload.Accepted {
@ -490,7 +497,10 @@ func (t *Transport) readLoop(pc *PeerConnection) {
continue // Skip invalid messages continue // Skip invalid messages
} }
logging.Debug("received message from peer", logging.Fields{"type": msg.Type, "peer_id": pc.Peer.ID, "reply_to": msg.ReplyTo}) // Rate limit debug logs in hot path to reduce noise (log 1 in N messages)
if debugLogCounter.Add(1)%debugLogInterval == 0 {
logging.Debug("received message from peer", logging.Fields{"type": msg.Type, "peer_id": pc.Peer.ID, "reply_to": msg.ReplyTo, "sample": "1/100"})
}
// Dispatch to handler (read handler under lock to avoid race) // Dispatch to handler (read handler under lock to avoid race)
t.mu.RLock() t.mu.RLock()