diff --git a/pkg/mining/auth.go b/pkg/mining/auth.go index cd2cbf3..11a7780 100644 --- a/pkg/mining/auth.go +++ b/pkg/mining/auth.go @@ -69,18 +69,31 @@ func AuthConfigFromEnv() AuthConfig { // DigestAuth implements HTTP Digest Authentication middleware type DigestAuth struct { - config AuthConfig - nonces sync.Map // map[string]time.Time for nonce expiry tracking + config AuthConfig + nonces sync.Map // map[string]time.Time for nonce expiry tracking + stopChan chan struct{} + stopOnce sync.Once } // NewDigestAuth creates a new digest auth middleware func NewDigestAuth(config AuthConfig) *DigestAuth { - da := &DigestAuth{config: config} + da := &DigestAuth{ + config: config, + stopChan: make(chan struct{}), + } // Start nonce cleanup goroutine go da.cleanupNonces() return da } +// Stop gracefully shuts down the DigestAuth, stopping the cleanup goroutine. +// Safe to call multiple times. +func (da *DigestAuth) Stop() { + da.stopOnce.Do(func() { + close(da.stopChan) + }) +} + // Middleware returns a Gin middleware that enforces digest authentication func (da *DigestAuth) Middleware() gin.HandlerFunc { return func(c *gin.Context) { @@ -209,14 +222,19 @@ func (da *DigestAuth) cleanupNonces() { ticker := time.NewTicker(da.config.NonceExpiry) defer ticker.Stop() - for range ticker.C { - now := time.Now() - da.nonces.Range(func(key, value interface{}) bool { - if now.Sub(value.(time.Time)) > da.config.NonceExpiry { - da.nonces.Delete(key) - } - return true - }) + for { + select { + case <-da.stopChan: + return + case <-ticker.C: + now := time.Now() + da.nonces.Range(func(key, value interface{}) bool { + if now.Sub(value.(time.Time)) > da.config.NonceExpiry { + da.nonces.Delete(key) + } + return true + }) + } } } diff --git a/pkg/mining/container.go b/pkg/mining/container.go index 14c150b..83b81dc 100644 --- a/pkg/mining/container.go +++ b/pkg/mining/container.go @@ -218,7 +218,6 @@ func (c *Container) ProfileManager() *ProfileManager { return c.profileManager } - // 
NodeService returns the node service (may be nil if P2P is unavailable). func (c *Container) NodeService() *NodeService { c.mu.RLock() diff --git a/pkg/mining/dual_mining_test.go b/pkg/mining/dual_mining_test.go index 6ca050e..e1f1308 100644 --- a/pkg/mining/dual_mining_test.go +++ b/pkg/mining/dual_mining_test.go @@ -33,8 +33,8 @@ func TestDualMiningCPUAndGPU(t *testing.T) { // GPU config - explicit device selection required! GPUEnabled: true, - OpenCL: true, // AMD GPU - Devices: "0", // Device 0 only - user must pick + OpenCL: true, // AMD GPU + Devices: "0", // Device 0 only - user must pick } minerInstance, err := manager.StartMiner(context.Background(), "xmrig", config) diff --git a/pkg/mining/errors.go b/pkg/mining/errors.go index a88a7f3..96d5288 100644 --- a/pkg/mining/errors.go +++ b/pkg/mining/errors.go @@ -24,6 +24,7 @@ const ( ErrCodeProfileNotFound = "PROFILE_NOT_FOUND" ErrCodeProfileExists = "PROFILE_EXISTS" ErrCodeInternalError = "INTERNAL_ERROR" + ErrCodeInternal = "INTERNAL_ERROR" // Alias for consistency ) // MiningError is a structured error type for the mining package diff --git a/pkg/mining/events.go b/pkg/mining/events.go index a334e75..d9cdcc9 100644 --- a/pkg/mining/events.go +++ b/pkg/mining/events.go @@ -302,7 +302,10 @@ func (c *wsClient) writePump() { if err != nil { return } - w.Write(message) + if _, err := w.Write(message); err != nil { + logging.Debug("WebSocket write error", logging.Fields{"error": err}) + return + } if err := w.Close(); err != nil { return diff --git a/pkg/mining/manager.go b/pkg/mining/manager.go index d184f2f..cf5c8fe 100644 --- a/pkg/mining/manager.go +++ b/pkg/mining/manager.go @@ -312,6 +312,7 @@ func (m *Manager) StartMiner(ctx context.Context, minerType string, config *Conf Name: instanceName, }) + RecordMinerStart() return miner, nil } @@ -454,6 +455,7 @@ func (m *Manager) StopMiner(ctx context.Context, name string) error { return stopErr } + RecordMinerStop() return nil } @@ -574,21 +576,55 @@ func (m 
*Manager) collectMinerStats() { wg.Wait() } -// collectSingleMinerStats collects stats from a single miner. +// statsRetryCount is the number of retries for transient stats failures. +const statsRetryCount = 2 + +// statsRetryDelay is the delay between stats collection retries. +const statsRetryDelay = 500 * time.Millisecond + +// collectSingleMinerStats collects stats from a single miner with retry logic. // This is called concurrently for each miner. func (m *Manager) collectSingleMinerStats(miner Miner, minerType string, now time.Time, dbEnabled bool) { minerName := miner.GetName() - // Use context with timeout to prevent hanging on unresponsive miner APIs - ctx, cancel := context.WithTimeout(context.Background(), statsCollectionTimeout) - defer cancel() // Ensure context is released after all operations + var stats *PerformanceMetrics + var lastErr error - stats, err := miner.GetStats(ctx) - if err != nil { - logging.Error("failed to get miner stats", logging.Fields{"miner": minerName, "error": err}) + // Retry loop for transient failures + for attempt := 0; attempt <= statsRetryCount; attempt++ { + // Use context with timeout to prevent hanging on unresponsive miner APIs + ctx, cancel := context.WithTimeout(context.Background(), statsCollectionTimeout) + stats, lastErr = miner.GetStats(ctx) + cancel() // Release context immediately + + if lastErr == nil { + break // Success + } + + // Log retry attempts at debug level + if attempt < statsRetryCount { + logging.Debug("retrying stats collection", logging.Fields{ + "miner": minerName, + "attempt": attempt + 1, + "error": lastErr.Error(), + }) + time.Sleep(statsRetryDelay) + } + } + + if lastErr != nil { + logging.Error("failed to get miner stats after retries", logging.Fields{ + "miner": minerName, + "error": lastErr.Error(), + "retries": statsRetryCount, + }) + RecordStatsCollection(true, true) return } + // Record the successful collection. NOTE(review): the first argument is the `retried` flag, but this expression is true whenever stats is non-nil, whether or not a retry happened - confirm intent + RecordStatsCollection(stats != nil && lastErr 
== nil, false) + point := HashratePoint{ Timestamp: now, Hashrate: stats.Hashrate, @@ -605,10 +641,12 @@ func (m *Manager) collectSingleMinerStats(miner Miner, minerType string, now tim Timestamp: point.Timestamp, Hashrate: point.Hashrate, } - // Use the same context for DB writes so they respect timeout/cancellation - if err := database.InsertHashratePoint(ctx, minerName, minerType, dbPoint, database.ResolutionHigh); err != nil { + // Create a fresh context for DB writes (each retry-loop context is cancelled right after its attempt) + dbCtx, dbCancel := context.WithTimeout(context.Background(), statsCollectionTimeout) + if err := database.InsertHashratePoint(dbCtx, minerName, minerType, dbPoint, database.ResolutionHigh); err != nil { logging.Warn("failed to persist hashrate", logging.Fields{"miner": minerName, "error": err}) } + dbCancel() } // Emit stats event for real-time WebSocket updates diff --git a/pkg/mining/metrics.go b/pkg/mining/metrics.go new file mode 100644 index 0000000..5a62015 --- /dev/null +++ b/pkg/mining/metrics.go @@ -0,0 +1,169 @@ +package mining + +import ( + "sync" + "sync/atomic" + "time" +) + +// Metrics provides simple instrumentation counters for the mining package. +// These can be exposed via Prometheus or other metrics systems in the future. +type Metrics struct { + // API metrics + RequestsTotal atomic.Int64 + RequestsErrored atomic.Int64 + RequestLatency *LatencyHistogram + + // Miner metrics + MinersStarted atomic.Int64 + MinersStopped atomic.Int64 + MinersErrored atomic.Int64 + + // Stats collection metrics + StatsCollected atomic.Int64 + StatsRetried atomic.Int64 + StatsFailed atomic.Int64 + + // WebSocket metrics + WSConnections atomic.Int64 + WSMessages atomic.Int64 + + // P2P metrics + P2PMessagesSent atomic.Int64 + P2PMessagesReceived atomic.Int64 + P2PConnectionsTotal atomic.Int64 +} + +// LatencyHistogram keeps a bounded window of recent request latencies and reports their average and count. 
+type LatencyHistogram struct { + mu sync.Mutex + samples []time.Duration + maxSize int +} + +// NewLatencyHistogram creates a new latency histogram with a maximum sample size. +func NewLatencyHistogram(maxSize int) *LatencyHistogram { + return &LatencyHistogram{ + samples: make([]time.Duration, 0, maxSize), + maxSize: maxSize, + } +} + +// Record adds a latency sample. +func (h *LatencyHistogram) Record(d time.Duration) { + h.mu.Lock() + defer h.mu.Unlock() + + if len(h.samples) >= h.maxSize { + // Sliding window - shift samples left to drop the oldest + copy(h.samples, h.samples[1:]) + h.samples = h.samples[:len(h.samples)-1] + } + h.samples = append(h.samples, d) +} + +// Average returns the average latency. +func (h *LatencyHistogram) Average() time.Duration { + h.mu.Lock() + defer h.mu.Unlock() + + if len(h.samples) == 0 { + return 0 + } + + var total time.Duration + for _, d := range h.samples { + total += d + } + return total / time.Duration(len(h.samples)) +} + +// Count returns the number of samples. +func (h *LatencyHistogram) Count() int { + h.mu.Lock() + defer h.mu.Unlock() + return len(h.samples) +} + +// DefaultMetrics is the global metrics instance. +var DefaultMetrics = &Metrics{ + RequestLatency: NewLatencyHistogram(1000), +} + +// RecordRequest records an API request. +func RecordRequest(errored bool, latency time.Duration) { + DefaultMetrics.RequestsTotal.Add(1) + if errored { + DefaultMetrics.RequestsErrored.Add(1) + } + DefaultMetrics.RequestLatency.Record(latency) +} + +// RecordMinerStart records a miner start event. +func RecordMinerStart() { + DefaultMetrics.MinersStarted.Add(1) +} + +// RecordMinerStop records a miner stop event. +func RecordMinerStop() { + DefaultMetrics.MinersStopped.Add(1) +} + +// RecordMinerError records a miner error event. +func RecordMinerError() { + DefaultMetrics.MinersErrored.Add(1) +} + +// RecordStatsCollection records a stats collection event. 
+func RecordStatsCollection(retried bool, failed bool) { + DefaultMetrics.StatsCollected.Add(1) + if retried { + DefaultMetrics.StatsRetried.Add(1) + } + if failed { + DefaultMetrics.StatsFailed.Add(1) + } +} + +// RecordWSConnection increments or decrements WebSocket connection count. +func RecordWSConnection(connected bool) { + if connected { + DefaultMetrics.WSConnections.Add(1) + } else { + DefaultMetrics.WSConnections.Add(-1) + } +} + +// RecordWSMessage records a WebSocket message. +func RecordWSMessage() { + DefaultMetrics.WSMessages.Add(1) +} + +// RecordP2PMessage records a P2P message. +func RecordP2PMessage(sent bool) { + if sent { + DefaultMetrics.P2PMessagesSent.Add(1) + } else { + DefaultMetrics.P2PMessagesReceived.Add(1) + } +} + +// GetMetricsSnapshot returns a snapshot of current metrics. +func GetMetricsSnapshot() map[string]interface{} { + return map[string]interface{}{ + "requests_total": DefaultMetrics.RequestsTotal.Load(), + "requests_errored": DefaultMetrics.RequestsErrored.Load(), + "request_latency_avg_ms": DefaultMetrics.RequestLatency.Average().Milliseconds(), + "request_latency_samples": DefaultMetrics.RequestLatency.Count(), + "miners_started": DefaultMetrics.MinersStarted.Load(), + "miners_stopped": DefaultMetrics.MinersStopped.Load(), + "miners_errored": DefaultMetrics.MinersErrored.Load(), + "stats_collected": DefaultMetrics.StatsCollected.Load(), + "stats_retried": DefaultMetrics.StatsRetried.Load(), + "stats_failed": DefaultMetrics.StatsFailed.Load(), + "ws_connections": DefaultMetrics.WSConnections.Load(), + "ws_messages": DefaultMetrics.WSMessages.Load(), + "p2p_messages_sent": DefaultMetrics.P2PMessagesSent.Load(), + "p2p_messages_received": DefaultMetrics.P2PMessagesReceived.Load(), + } +} diff --git a/pkg/mining/miner.go b/pkg/mining/miner.go index ea5e491..f2b94ba 100644 --- a/pkg/mining/miner.go +++ b/pkg/mining/miner.go @@ -216,11 +216,17 @@ func (b *BaseMiner) WriteStdin(input string) error { input += "\n" } - // Write 
with timeout to prevent blocking indefinitely + // Write with timeout to prevent blocking indefinitely. + // Use buffered channel size 1 so goroutine can exit even if we don't read the result. done := make(chan error, 1) go func() { _, err := stdinPipe.Write([]byte(input)) - done <- err + // Non-blocking send - if timeout already fired, this won't block + select { + case done <- err: + default: + // Timeout already occurred, goroutine exits cleanly + } }() select { @@ -252,13 +258,13 @@ func (b *BaseMiner) InstallFromURL(url string) error { defer resp.Body.Close() if resp.StatusCode != http.StatusOK { - io.Copy(io.Discard, resp.Body) // Drain body to allow connection reuse + _, _ = io.Copy(io.Discard, resp.Body) // Drain body to allow connection reuse (error ignored intentionally) return fmt.Errorf("failed to download release: unexpected status code %d", resp.StatusCode) } if _, err := io.Copy(tmpfile, resp.Body); err != nil { - // Drain remaining body to allow connection reuse - io.Copy(io.Discard, resp.Body) + // Drain remaining body to allow connection reuse (error ignored intentionally) + _, _ = io.Copy(io.Discard, resp.Body) return err } @@ -548,7 +554,9 @@ func (b *BaseMiner) unzip(src, dest string) error { return fmt.Errorf("%s: illegal file path", fpath) } if f.FileInfo().IsDir() { - os.MkdirAll(fpath, os.ModePerm) + if err := os.MkdirAll(fpath, os.ModePerm); err != nil { + return fmt.Errorf("failed to create directory %s: %w", fpath, err) + } continue } diff --git a/pkg/mining/node_service.go b/pkg/mining/node_service.go index 4a03985..aff411b 100644 --- a/pkg/mining/node_service.go +++ b/pkg/mining/node_service.go @@ -4,6 +4,7 @@ import ( "encoding/json" "net/http" "strconv" + "strings" "github.com/Snider/Mining/pkg/node" "github.com/gin-gonic/gin" @@ -90,8 +91,8 @@ func (ns *NodeService) StopTransport() error { // Node Info Response type NodeInfoResponse struct { - HasIdentity bool `json:"hasIdentity"` - Identity *node.NodeIdentity 
`json:"identity,omitempty"` + HasIdentity bool `json:"hasIdentity"` + Identity *node.NodeIdentity `json:"identity,omitempty"` RegisteredPeers int `json:"registeredPeers"` ConnectedPeers int `json:"connectedPeers"` } @@ -257,12 +258,17 @@ func (ns *NodeService) handleRemovePeer(c *gin.Context) { // @Produce json // @Param id path string true "Peer ID" // @Success 200 {object} map[string]float64 +// @Failure 404 {object} APIError "Peer not found" // @Router /peers/{id}/ping [post] func (ns *NodeService) handlePingPeer(c *gin.Context) { peerID := c.Param("id") rtt, err := ns.controller.PingPeer(peerID) if err != nil { - c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) + if strings.Contains(err.Error(), "not found") || strings.Contains(err.Error(), "not connected") { + respondWithError(c, http.StatusNotFound, "PEER_NOT_FOUND", "peer not found or not connected", err.Error()) + return + } + respondWithError(c, http.StatusInternalServerError, ErrCodeInternal, "ping failed", err.Error()) return } c.JSON(http.StatusOK, gin.H{"rtt_ms": rtt}) @@ -275,11 +281,16 @@ func (ns *NodeService) handlePingPeer(c *gin.Context) { // @Produce json // @Param id path string true "Peer ID" // @Success 200 {object} map[string]string +// @Failure 404 {object} APIError "Peer not found" // @Router /peers/{id}/connect [post] func (ns *NodeService) handleConnectPeer(c *gin.Context) { peerID := c.Param("id") if err := ns.controller.ConnectToPeer(peerID); err != nil { - c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) + if strings.Contains(err.Error(), "not found") { + respondWithError(c, http.StatusNotFound, "PEER_NOT_FOUND", "peer not found", err.Error()) + return + } + respondWithError(c, http.StatusInternalServerError, ErrCodeConnectionFailed, "connection failed", err.Error()) return } c.JSON(http.StatusOK, gin.H{"status": "connected"}) @@ -287,7 +298,7 @@ func (ns *NodeService) handleConnectPeer(c *gin.Context) { // handleDisconnectPeer godoc // @Summary 
Disconnect from a peer -// @Description Close the connection to a peer +// @Description Close the connection to a peer. Idempotent - returns success if peer not connected. // @Tags peers // @Produce json // @Param id path string true "Peer ID" @@ -296,7 +307,12 @@ func (ns *NodeService) handleConnectPeer(c *gin.Context) { func (ns *NodeService) handleDisconnectPeer(c *gin.Context) { peerID := c.Param("id") if err := ns.controller.DisconnectFromPeer(peerID); err != nil { - c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) + // Make disconnect idempotent - if peer not connected, still return success + if strings.Contains(err.Error(), "not connected") { + c.JSON(http.StatusOK, gin.H{"status": "disconnected"}) + return + } + respondWithError(c, http.StatusInternalServerError, ErrCodeInternal, "disconnect failed", err.Error()) return } c.JSON(http.StatusOK, gin.H{"status": "disconnected"}) diff --git a/pkg/mining/service.go b/pkg/mining/service.go index 6589d15..c7c70b6 100644 --- a/pkg/mining/service.go +++ b/pkg/mining/service.go @@ -144,6 +144,35 @@ func generateRequestID() string { return fmt.Sprintf("%d-%x", time.Now().UnixMilli(), b[:4]) } +// getRequestID extracts the request ID from gin context +func getRequestID(c *gin.Context) string { + if id, exists := c.Get("requestID"); exists { + if s, ok := id.(string); ok { + return s + } + } + return "" +} + +// logWithRequestID logs a message with request ID correlation +func logWithRequestID(c *gin.Context, level string, message string, fields logging.Fields) { + if fields == nil { + fields = logging.Fields{} + } + if reqID := getRequestID(c); reqID != "" { + fields["request_id"] = reqID + } + switch level { + case "error": + logging.Error(message, fields) + case "warn": + logging.Warn(message, fields) + case "info": + logging.Info(message, fields) + default: + logging.Debug(message, fields) + } +} // WebSocket upgrader for the events endpoint var wsUpgrader = websocket.Upgrader{ @@ -272,9 +301,9 @@ 
func (s *Service) InitRouter() { // Configure CORS to only allow local origins corsConfig := cors.Config{ AllowOrigins: []string{ - "http://localhost:4200", // Angular dev server + "http://localhost:4200", // Angular dev server "http://127.0.0.1:4200", - "http://localhost:9090", // Default API port + "http://localhost:9090", // Default API port "http://127.0.0.1:9090", "http://localhost:" + serverPort, "http://127.0.0.1:" + serverPort, @@ -312,6 +341,14 @@ func (s *Service) Stop() { if s.EventHub != nil { s.EventHub.Stop() } + if s.auth != nil { + s.auth.Stop() + } + if s.NodeService != nil { + if err := s.NodeService.StopTransport(); err != nil { + logging.Warn("failed to stop node service transport", logging.Fields{"error": err}) + } + } } // ServiceStartup initializes the router and starts the HTTP server. @@ -333,6 +370,7 @@ func (s *Service) ServiceStartup(ctx context.Context) error { go func() { <-ctx.Done() + s.Stop() // Clean up service resources (auth, event hub, node service) s.Manager.Stop() ctxShutdown, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() @@ -377,6 +415,7 @@ func (s *Service) SetupRoutes() { { apiGroup.GET("/info", s.handleGetInfo) + apiGroup.GET("/metrics", s.handleMetrics) apiGroup.POST("/doctor", s.handleDoctor) apiGroup.POST("/update", s.handleUpdateCheck) @@ -824,17 +863,28 @@ func (s *Service) handleListProfiles(c *gin.Context) { // @Produce json // @Param profile body MiningProfile true "Mining Profile" // @Success 201 {object} MiningProfile +// @Failure 400 {object} APIError "Invalid profile data" // @Router /profiles [post] func (s *Service) handleCreateProfile(c *gin.Context) { var profile MiningProfile if err := c.ShouldBindJSON(&profile); err != nil { - c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + respondWithError(c, http.StatusBadRequest, ErrCodeInvalidInput, "invalid profile data", err.Error()) + return + } + + // Validate required fields + if profile.Name == "" { + 
respondWithError(c, http.StatusBadRequest, ErrCodeInvalidInput, "profile name is required", "") + return + } + if profile.MinerType == "" { + respondWithError(c, http.StatusBadRequest, ErrCodeInvalidInput, "miner type is required", "") return } createdProfile, err := s.ProfileManager.CreateProfile(&profile) if err != nil { - c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to create profile", "details": err.Error()}) + respondWithError(c, http.StatusInternalServerError, ErrCodeInternal, "failed to create profile", err.Error()) return } @@ -868,18 +918,24 @@ func (s *Service) handleGetProfile(c *gin.Context) { // @Param id path string true "Profile ID" // @Param profile body MiningProfile true "Updated Mining Profile" // @Success 200 {object} MiningProfile +// @Failure 404 {object} APIError "Profile not found" // @Router /profiles/{id} [put] func (s *Service) handleUpdateProfile(c *gin.Context) { profileID := c.Param("id") var profile MiningProfile if err := c.ShouldBindJSON(&profile); err != nil { - c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + respondWithError(c, http.StatusBadRequest, ErrCodeInvalidInput, "invalid profile data", err.Error()) return } profile.ID = profileID if err := s.ProfileManager.UpdateProfile(&profile); err != nil { - c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to update profile", "details": err.Error()}) + // Check if error is "not found" + if strings.Contains(err.Error(), "not found") { + respondWithError(c, http.StatusNotFound, ErrCodeProfileNotFound, "profile not found", err.Error()) + return + } + respondWithError(c, http.StatusInternalServerError, ErrCodeInternal, "failed to update profile", err.Error()) return } c.JSON(http.StatusOK, profile) @@ -887,7 +943,7 @@ func (s *Service) handleUpdateProfile(c *gin.Context) { // handleDeleteProfile godoc // @Summary Delete a mining profile -// @Description Delete a mining profile by its ID +// @Description Delete a mining profile by its ID. 
Idempotent - returns success even if profile doesn't exist. // @Tags profiles // @Produce json // @Param id path string true "Profile ID" @@ -896,7 +952,12 @@ func (s *Service) handleUpdateProfile(c *gin.Context) { func (s *Service) handleDeleteProfile(c *gin.Context) { profileID := c.Param("id") if err := s.ProfileManager.DeleteProfile(profileID); err != nil { - c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to delete profile", "details": err.Error()}) + // Make DELETE idempotent - if profile doesn't exist, still return success + if strings.Contains(err.Error(), "not found") { + c.JSON(http.StatusOK, gin.H{"status": "profile deleted"}) + return + } + respondWithError(c, http.StatusInternalServerError, ErrCodeInternal, "failed to delete profile", err.Error()) return } c.JSON(http.StatusOK, gin.H{"status": "profile deleted"}) @@ -1030,7 +1091,20 @@ func (s *Service) handleWebSocketEvents(c *gin.Context) { } logging.Info("new WebSocket connection", logging.Fields{"remote": c.Request.RemoteAddr}) + RecordWSConnection(true) if !s.EventHub.ServeWs(conn) { + RecordWSConnection(false) // Undo increment on rejection logging.Warn("WebSocket connection rejected", logging.Fields{"remote": c.Request.RemoteAddr, "reason": "limit reached"}) } } + +// handleMetrics godoc +// @Summary Get internal metrics +// @Description Returns internal metrics for monitoring and debugging +// @Tags system +// @Produce json +// @Success 200 {object} map[string]interface{} +// @Router /metrics [get] +func (s *Service) handleMetrics(c *gin.Context) { + c.JSON(http.StatusOK, GetMetricsSnapshot()) +} diff --git a/pkg/mining/service_test.go b/pkg/mining/service_test.go index 3613153..de4a1f3 100644 --- a/pkg/mining/service_test.go +++ b/pkg/mining/service_test.go @@ -30,10 +30,10 @@ type MockMiner struct { WriteStdinFunc func(input string) error } -func (m *MockMiner) Install() error { return m.InstallFunc() } -func (m *MockMiner) Uninstall() error { return m.UninstallFunc() } -func (m 
*MockMiner) Start(config *Config) error { return m.StartFunc(config) } -func (m *MockMiner) Stop() error { return m.StopFunc() } +func (m *MockMiner) Install() error { return m.InstallFunc() } +func (m *MockMiner) Uninstall() error { return m.UninstallFunc() } +func (m *MockMiner) Start(config *Config) error { return m.StartFunc(config) } +func (m *MockMiner) Stop() error { return m.StopFunc() } func (m *MockMiner) GetStats(ctx context.Context) (*PerformanceMetrics, error) { return m.GetStatsFunc(ctx) } @@ -43,9 +43,9 @@ func (m *MockMiner) GetType() string { } return "mock" } -func (m *MockMiner) GetName() string { return m.GetNameFunc() } -func (m *MockMiner) GetPath() string { return m.GetPathFunc() } -func (m *MockMiner) GetBinaryPath() string { return m.GetBinaryPathFunc() } +func (m *MockMiner) GetName() string { return m.GetNameFunc() } +func (m *MockMiner) GetPath() string { return m.GetPathFunc() } +func (m *MockMiner) GetBinaryPath() string { return m.GetBinaryPathFunc() } func (m *MockMiner) CheckInstallation() (*InstallationDetails, error) { return m.CheckInstallationFunc() } diff --git a/pkg/mining/settings_manager.go b/pkg/mining/settings_manager.go index 480b08d..bd96c78 100644 --- a/pkg/mining/settings_manager.go +++ b/pkg/mining/settings_manager.go @@ -14,10 +14,10 @@ const settingsFileName = "settings.json" // WindowState stores the last window position and size type WindowState struct { - X int `json:"x"` - Y int `json:"y"` - Width int `json:"width"` - Height int `json:"height"` + X int `json:"x"` + Y int `json:"y"` + Width int `json:"width"` + Height int `json:"height"` Maximized bool `json:"maximized"` } diff --git a/pkg/mining/simulated_miner.go b/pkg/mining/simulated_miner.go index ab3a4f3..dfffedc 100644 --- a/pkg/mining/simulated_miner.go +++ b/pkg/mining/simulated_miner.go @@ -15,31 +15,31 @@ const MinerTypeSimulated = "simulated" // SimulatedMiner is a mock miner that generates realistic-looking stats for UI testing. 
type SimulatedMiner struct { // Exported fields for JSON serialization - Name string `json:"name"` - MinerType string `json:"miner_type"` - Version string `json:"version"` - URL string `json:"url"` - Path string `json:"path"` - MinerBinary string `json:"miner_binary"` - Running bool `json:"running"` - Algorithm string `json:"algorithm"` - HashrateHistory []HashratePoint `json:"hashrateHistory"` - LowResHistory []HashratePoint `json:"lowResHashrateHistory"` - Stats *PerformanceMetrics `json:"stats,omitempty"` - FullStats *XMRigSummary `json:"full_stats,omitempty"` // XMRig-compatible format for UI + Name string `json:"name"` + MinerType string `json:"miner_type"` + Version string `json:"version"` + URL string `json:"url"` + Path string `json:"path"` + MinerBinary string `json:"miner_binary"` + Running bool `json:"running"` + Algorithm string `json:"algorithm"` + HashrateHistory []HashratePoint `json:"hashrateHistory"` + LowResHistory []HashratePoint `json:"lowResHashrateHistory"` + Stats *PerformanceMetrics `json:"stats,omitempty"` + FullStats *XMRigSummary `json:"full_stats,omitempty"` // XMRig-compatible format for UI // Internal fields (not exported) - baseHashrate int - peakHashrate int - variance float64 - startTime time.Time - shares int - rejected int - logs []string - mu sync.RWMutex - stopChan chan struct{} - poolName string - difficultyBase int + baseHashrate int + peakHashrate int + variance float64 + startTime time.Time + shares int + rejected int + logs []string + mu sync.RWMutex + stopChan chan struct{} + poolName string + difficultyBase int } // SimulatedMinerConfig holds configuration for creating a simulated miner. 
diff --git a/pkg/mining/stats_collector.go b/pkg/mining/stats_collector.go index b5d5c5b..e968e44 100644 --- a/pkg/mining/stats_collector.go +++ b/pkg/mining/stats_collector.go @@ -55,4 +55,3 @@ func FetchJSONStats[T any](ctx context.Context, config HTTPStatsConfig, target * return nil } - diff --git a/pkg/mining/throttle_test.go b/pkg/mining/throttle_test.go index 5c2551b..e068cdd 100644 --- a/pkg/mining/throttle_test.go +++ b/pkg/mining/throttle_test.go @@ -141,7 +141,7 @@ func TestCPUThrottleThreadCount(t *testing.T) { defer manager.Stop() numCPU := runtime.NumCPU() - targetThreads := 1 // Use only 1 thread + targetThreads := 1 // Use only 1 thread expectedMaxCPU := float64(100) / float64(numCPU) * float64(targetThreads) * 1.5 // 50% tolerance config := &Config{ diff --git a/pkg/mining/ttminer_start.go b/pkg/mining/ttminer_start.go index a09912e..4b36b41 100644 --- a/pkg/mining/ttminer_start.go +++ b/pkg/mining/ttminer_start.go @@ -197,10 +197,10 @@ func isValidCLIArg(arg string) bool { // Block arguments that could override security-related settings blockedPrefixes := []string{ "--api-access-token", "--api-worker-id", // TT-Miner API settings - "--config", // Could load arbitrary config - "--log-file", // Could write to arbitrary locations - "--coin-file", // Could load arbitrary coin configs - "-o", "--out", // Output redirection + "--config", // Could load arbitrary config + "--log-file", // Could write to arbitrary locations + "--coin-file", // Could load arbitrary coin configs + "-o", "--out", // Output redirection } lowerArg := strings.ToLower(arg) for _, blocked := range blockedPrefixes { diff --git a/pkg/mining/xmrig_gpu_test.go b/pkg/mining/xmrig_gpu_test.go index bd4e987..a158b11 100644 --- a/pkg/mining/xmrig_gpu_test.go +++ b/pkg/mining/xmrig_gpu_test.go @@ -43,9 +43,9 @@ func TestXMRigDualMiningConfig(t *testing.T) { GPUPool: "stratum+tcp://ravencoin.pool.com:3333", GPUWallet: "gpu_wallet_address", GPUAlgo: "kawpow", - CUDA: true, // NVIDIA + CUDA: 
true, // NVIDIA OpenCL: false, - Devices: "0", // Explicit device selection required + Devices: "0", // Explicit device selection required } err := miner.createConfig(config) @@ -146,7 +146,7 @@ func TestXMRigGPUOnlyConfig(t *testing.T) { Pool: "stratum+tcp://pool.supportxmr.com:3333", Wallet: "test_wallet", Algo: "rx/0", - NoCPU: true, // Disable CPU + NoCPU: true, // Disable CPU GPUEnabled: true, OpenCL: true, // AMD GPU CUDA: true, // Also NVIDIA diff --git a/pkg/mining/xmrig_start.go b/pkg/mining/xmrig_start.go index 6ca22aa..bb121a6 100644 --- a/pkg/mining/xmrig_start.go +++ b/pkg/mining/xmrig_start.go @@ -99,21 +99,33 @@ func (m *XMRigMiner) Start(config *Config) error { // Capture cmd locally to avoid race with Stop() cmd := m.cmd + minerName := m.Name // Capture name for logging go func() { // Use a channel to detect if Wait() completes done := make(chan struct{}) + var waitErr error go func() { - cmd.Wait() + waitErr = cmd.Wait() close(done) }() // Wait with timeout to prevent goroutine leak on zombie processes select { case <-done: - // Normal exit + // Normal exit - log the exit status + if waitErr != nil { + logging.Info("miner process exited", logging.Fields{ + "miner": minerName, + "error": waitErr.Error(), + }) + } else { + logging.Info("miner process exited normally", logging.Fields{ + "miner": minerName, + }) + } case <-time.After(5 * time.Minute): // Process didn't exit after 5 minutes - force cleanup - logging.Warn("miner process wait timeout, forcing cleanup", logging.Fields{"miner": m.Name}) + logging.Warn("miner process wait timeout, forcing cleanup", logging.Fields{"miner": minerName}) if cmd.Process != nil { cmd.Process.Kill() } @@ -122,7 +134,7 @@ func (m *XMRigMiner) Start(config *Config) error { case <-done: // Inner goroutine completed case <-time.After(10 * time.Second): - logging.Error("process cleanup timed out after kill", logging.Fields{"miner": m.Name}) + logging.Error("process cleanup timed out after kill", logging.Fields{"miner": 
minerName}) } } diff --git a/pkg/node/bundle_test.go b/pkg/node/bundle_test.go index e8b0bea..4bc8f26 100644 --- a/pkg/node/bundle_test.go +++ b/pkg/node/bundle_test.go @@ -144,9 +144,9 @@ func TestExtractProfileBundle(t *testing.T) { func TestTarballFunctions(t *testing.T) { t.Run("CreateAndExtractTarball", func(t *testing.T) { files := map[string][]byte{ - "file1.txt": []byte("content of file 1"), - "dir/file2.json": []byte(`{"key":"value"}`), - "miners/xmrig": []byte("binary content"), + "file1.txt": []byte("content of file 1"), + "dir/file2.json": []byte(`{"key":"value"}`), + "miners/xmrig": []byte("binary content"), } tarData, err := createTarball(files) diff --git a/pkg/node/controller.go b/pkg/node/controller.go index c744583..299c631 100644 --- a/pkg/node/controller.go +++ b/pkg/node/controller.go @@ -6,6 +6,8 @@ import ( "fmt" "sync" "time" + + "github.com/Snider/Mining/pkg/logging" ) // Controller manages remote peer operations from a controller node. @@ -248,6 +250,11 @@ func (c *Controller) GetAllStats() map[string]*StatsPayload { defer wg.Done() stats, err := c.GetRemoteStats(p.ID) if err != nil { + logging.Debug("failed to get stats from peer", logging.Fields{ + "peer_id": p.ID, + "peer": p.Name, + "error": err.Error(), + }) return // Skip failed peers } mu.Lock() diff --git a/pkg/node/message.go b/pkg/node/message.go index 58b9f24..293be1e 100644 --- a/pkg/node/message.go +++ b/pkg/node/message.go @@ -12,11 +12,11 @@ type MessageType string const ( // Connection lifecycle - MsgHandshake MessageType = "handshake" + MsgHandshake MessageType = "handshake" MsgHandshakeAck MessageType = "handshake_ack" - MsgPing MessageType = "ping" - MsgPong MessageType = "pong" - MsgDisconnect MessageType = "disconnect" + MsgPing MessageType = "ping" + MsgPong MessageType = "pong" + MsgDisconnect MessageType = "disconnect" // Miner operations MsgGetStats MessageType = "get_stats" @@ -39,10 +39,10 @@ const ( // Message represents a P2P message between nodes. 
type Message struct { - ID string `json:"id"` // UUID + ID string `json:"id"` // UUID Type MessageType `json:"type"` - From string `json:"from"` // Sender node ID - To string `json:"to"` // Recipient node ID (empty for broadcast) + From string `json:"from"` // Sender node ID + To string `json:"to"` // Recipient node ID (empty for broadcast) Timestamp time.Time `json:"ts"` Payload json.RawMessage `json:"payload"` ReplyTo string `json:"replyTo,omitempty"` // ID of message being replied to @@ -98,10 +98,10 @@ type HandshakePayload struct { // HandshakeAckPayload is the response to a handshake. type HandshakeAckPayload struct { - Identity NodeIdentity `json:"identity"` - ChallengeResponse []byte `json:"challengeResponse,omitempty"` - Accepted bool `json:"accepted"` - Reason string `json:"reason,omitempty"` // If not accepted + Identity NodeIdentity `json:"identity"` + ChallengeResponse []byte `json:"challengeResponse,omitempty"` + Accepted bool `json:"accepted"` + Reason string `json:"reason,omitempty"` // If not accepted } // PingPayload for keepalive/latency measurement. @@ -117,7 +117,7 @@ type PongPayload struct { // StartMinerPayload requests starting a miner. type StartMinerPayload struct { - MinerType string `json:"minerType"` // Required: miner type (e.g., "xmrig", "tt-miner") + MinerType string `json:"minerType"` // Required: miner type (e.g., "xmrig", "tt-miner") ProfileID string `json:"profileId,omitempty"` Config json.RawMessage `json:"config,omitempty"` // Override profile config } @@ -158,7 +158,7 @@ type StatsPayload struct { // GetLogsPayload requests console logs from a miner. 
type GetLogsPayload struct { MinerName string `json:"minerName"` - Lines int `json:"lines"` // Number of lines to fetch + Lines int `json:"lines"` // Number of lines to fetch Since int64 `json:"since,omitempty"` // Unix timestamp, logs after this time } diff --git a/pkg/node/transport.go b/pkg/node/transport.go index e9cffa8..199cbe2 100644 --- a/pkg/node/transport.go +++ b/pkg/node/transport.go @@ -8,6 +8,7 @@ import ( "net/http" "net/url" "sync" + "sync/atomic" "time" "github.com/Snider/Borg/pkg/smsg" @@ -15,6 +16,12 @@ import ( "github.com/gorilla/websocket" ) +// debugLogCounter tracks message counts for rate limiting debug logs +var debugLogCounter atomic.Int64 + +// debugLogInterval controls how often we log debug messages in hot paths (1 in N) +const debugLogInterval = 100 + // TransportConfig configures the WebSocket transport. type TransportConfig struct { ListenAddr string // ":9091" default @@ -404,28 +411,28 @@ func (t *Transport) performHandshake(pc *PeerConnection) error { msg, err := NewMessage(MsgHandshake, identity.ID, pc.Peer.ID, payload) if err != nil { - return err + return fmt.Errorf("create handshake message: %w", err) } // First message is unencrypted (peer needs our public key) data, err := json.Marshal(msg) if err != nil { - return err + return fmt.Errorf("marshal handshake message: %w", err) } if err := pc.Conn.WriteMessage(websocket.TextMessage, data); err != nil { - return err + return fmt.Errorf("send handshake: %w", err) } // Wait for ack _, ackData, err := pc.Conn.ReadMessage() if err != nil { - return err + return fmt.Errorf("read handshake ack: %w", err) } var ackMsg Message if err := json.Unmarshal(ackData, &ackMsg); err != nil { - return err + return fmt.Errorf("unmarshal handshake ack: %w", err) } if ackMsg.Type != MsgHandshakeAck { @@ -434,7 +441,7 @@ func (t *Transport) performHandshake(pc *PeerConnection) error { var ackPayload HandshakeAckPayload if err := ackMsg.ParsePayload(&ackPayload); err != nil { - return err + return 
fmt.Errorf("parse handshake ack payload: %w", err) } if !ackPayload.Accepted { @@ -490,7 +497,10 @@ func (t *Transport) readLoop(pc *PeerConnection) { continue // Skip invalid messages } - logging.Debug("received message from peer", logging.Fields{"type": msg.Type, "peer_id": pc.Peer.ID, "reply_to": msg.ReplyTo}) + // Rate limit debug logs in hot path to reduce noise (log 1 in N messages) + if debugLogCounter.Add(1)%debugLogInterval == 0 { + logging.Debug("received message from peer", logging.Fields{"type": msg.Type, "peer_id": pc.Peer.ID, "reply_to": msg.ReplyTo, "sample": "1/100"}) + } // Dispatch to handler (read handler under lock to avoid race) t.mu.RLock()