diff --git a/pkg/mining/mining.go b/pkg/mining/mining.go index 7355ec2..16235d6 100644 --- a/pkg/mining/mining.go +++ b/pkg/mining/mining.go @@ -2,6 +2,8 @@ package mining import ( "context" + "fmt" + "strings" "time" ) @@ -157,6 +159,80 @@ type Config struct { CLIArgs string `json:"cliArgs,omitempty"` // Additional CLI arguments } +// Validate checks the Config for common errors and security issues. +// Returns nil if valid, otherwise returns a descriptive error. +func (c *Config) Validate() error { + // Pool URL validation + if c.Pool != "" { + // Block shell metacharacters in pool URL + if containsShellChars(c.Pool) { + return fmt.Errorf("pool URL contains invalid characters") + } + } + + // Wallet validation (basic alphanumeric + special chars allowed in addresses) + if c.Wallet != "" { + if containsShellChars(c.Wallet) { + return fmt.Errorf("wallet address contains invalid characters") + } + // Most wallet addresses are 40-128 chars + if len(c.Wallet) > 256 { + return fmt.Errorf("wallet address too long (max 256 chars)") + } + } + + // Thread count validation + if c.Threads < 0 { + return fmt.Errorf("threads cannot be negative") + } + if c.Threads > 1024 { + return fmt.Errorf("threads value too high (max 1024)") + } + + // Algorithm validation (alphanumeric, dash, slash) + if c.Algo != "" { + if !isValidAlgo(c.Algo) { + return fmt.Errorf("algorithm name contains invalid characters") + } + } + + // Intensity validation + if c.Intensity < 0 || c.Intensity > 100 { + return fmt.Errorf("intensity must be between 0 and 100") + } + if c.GPUIntensity < 0 || c.GPUIntensity > 100 { + return fmt.Errorf("GPU intensity must be between 0 and 100") + } + + // Donate level validation + if c.DonateLevel < 0 || c.DonateLevel > 100 { + return fmt.Errorf("donate level must be between 0 and 100") + } + + return nil +} + +// containsShellChars checks for shell metacharacters that could enable injection +func containsShellChars(s string) bool { + dangerous := []string{";", "|", "&", "`", "$", "(", ")", "{", "}", "<", ">", "\n", "\r", "\\", "'", "\"", "!"} + for _, d := range dangerous { + if strings.Contains(s, d) { + return true + } + } + return false +} + +// isValidAlgo checks if an algorithm name contains only valid characters +func isValidAlgo(algo string) bool { + for _, r := range algo { + if !((r >= 'a' && r <= 'z') || (r >= 'A' && r <= 'Z') || (r >= '0' && r <= '9') || r == '-' || r == '/' || r == '_') { + return false + } + } + return true +} + // PerformanceMetrics represents the performance metrics for a miner. type PerformanceMetrics struct { Hashrate int `json:"hashrate"` diff --git a/pkg/mining/service.go b/pkg/mining/service.go index c7c70b6..d87b424 100644 --- a/pkg/mining/service.go +++ b/pkg/mining/service.go @@ -210,7 +210,12 @@ func NewService(manager ManagerInterface, listenAddr string, displayAddr string, profileManager, err := NewProfileManager() if err != nil { - return nil, fmt.Errorf("failed to initialize profile manager: %w", err) + logging.Warn("failed to initialize profile manager", logging.Fields{"error": err}) + // Continue without profile manager - profile features will be degraded + // Create a minimal in-memory profile manager as fallback + profileManager = &ProfileManager{ + profiles: make(map[string]*MiningProfile), + } } // Initialize node service (optional - only fails if XDG paths are broken) @@ -408,6 +413,10 @@ func (s *Service) ServiceStartup(ctx context.Context) error { func (s *Service) SetupRoutes() { apiGroup := s.Router.Group(s.APIBasePath) + // Health endpoints (no auth required for orchestration/monitoring) + apiGroup.GET("/health", s.handleHealth) + apiGroup.GET("/ready", s.handleReady) + // Apply authentication middleware if enabled if s.auth != nil { apiGroup.Use(s.auth.Middleware()) @@ -483,6 +492,81 @@ func (s *Service) SetupRoutes() { logging.Info("MCP server enabled", logging.Fields{"endpoint": s.APIBasePath + "/mcp"}) } +// HealthResponse represents the health check response +type HealthResponse struct { + Status string `json:"status"` + Components map[string]string `json:"components,omitempty"` +} + +// handleHealth godoc +// @Summary Health check endpoint +// @Description Returns service health status. Used for liveness probes. +// @Tags system +// @Produce json +// @Success 200 {object} HealthResponse +// @Router /health [get] +func (s *Service) handleHealth(c *gin.Context) { + c.JSON(http.StatusOK, HealthResponse{ + Status: "healthy", + }) +} + +// handleReady godoc +// @Summary Readiness check endpoint +// @Description Returns service readiness with component status. Used for readiness probes. +// @Tags system +// @Produce json +// @Success 200 {object} HealthResponse +// @Success 503 {object} HealthResponse +// @Router /ready [get] +func (s *Service) handleReady(c *gin.Context) { + components := make(map[string]string) + allReady := true + + // Check manager + if s.Manager != nil { + components["manager"] = "ready" + } else { + components["manager"] = "not initialized" + allReady = false + } + + // Check profile manager + if s.ProfileManager != nil { + components["profiles"] = "ready" + } else { + components["profiles"] = "degraded" + // Don't fail readiness for degraded profile manager + } + + // Check event hub + if s.EventHub != nil { + components["events"] = "ready" + } else { + components["events"] = "not initialized" + allReady = false + } + + // Check node service (optional) + if s.NodeService != nil { + components["p2p"] = "ready" + } else { + components["p2p"] = "disabled" + } + + status := "ready" + httpStatus := http.StatusOK + if !allReady { + status = "not ready" + httpStatus = http.StatusServiceUnavailable + } + + c.JSON(httpStatus, HealthResponse{ + Status: status, + Components: components, + }) +} + // handleGetInfo godoc // @Summary Get live miner installation information // @Description Retrieves live installation details for all miners, along with system information. diff --git a/pkg/mining/xmrig.go b/pkg/mining/xmrig.go index 3aacd9a..dd230b4 100644 --- a/pkg/mining/xmrig.go +++ b/pkg/mining/xmrig.go @@ -23,7 +23,14 @@ type XMRigMiner struct { } var ( - httpClient = &http.Client{Timeout: 30 * time.Second} + httpClient = &http.Client{ + Timeout: 30 * time.Second, + Transport: &http.Transport{ + MaxIdleConns: 100, + MaxIdleConnsPerHost: 10, + IdleConnTimeout: 90 * time.Second, + }, + } httpClientMu sync.RWMutex ) diff --git a/pkg/node/transport.go b/pkg/node/transport.go index c14f86d..1f0d1d2 100644 --- a/pkg/node/transport.go +++ b/pkg/node/transport.go @@ -52,6 +52,49 @@ func DefaultTransportConfig() TransportConfig { // MessageHandler processes incoming messages. type MessageHandler func(conn *PeerConnection, msg *Message) +// MessageDeduplicator tracks seen message IDs to prevent duplicate processing +type MessageDeduplicator struct { + seen map[string]time.Time + mu sync.RWMutex + ttl time.Duration +} + +// NewMessageDeduplicator creates a deduplicator with specified TTL +func NewMessageDeduplicator(ttl time.Duration) *MessageDeduplicator { + d := &MessageDeduplicator{ + seen: make(map[string]time.Time), + ttl: ttl, + } + return d +} + +// IsDuplicate checks if a message ID has been seen recently +func (d *MessageDeduplicator) IsDuplicate(msgID string) bool { + d.mu.RLock() + _, exists := d.seen[msgID] + d.mu.RUnlock() + return exists +} + +// Mark records a message ID as seen +func (d *MessageDeduplicator) Mark(msgID string) { + d.mu.Lock() + d.seen[msgID] = time.Now() + d.mu.Unlock() +} + +// Cleanup removes expired entries +func (d *MessageDeduplicator) Cleanup() { + d.mu.Lock() + defer d.mu.Unlock() + now := time.Now() + for id, seen := range d.seen { + if now.Sub(seen) > d.ttl { + delete(d.seen, id) + } + } +} + // Transport manages WebSocket connections with SMSG encryption. type Transport struct { config TransportConfig @@ -62,12 +105,54 @@ type Transport struct { node *NodeManager registry *PeerRegistry handler MessageHandler + dedup *MessageDeduplicator // Message deduplication mu sync.RWMutex ctx context.Context cancel context.CancelFunc wg sync.WaitGroup } +// PeerRateLimiter implements a simple token bucket rate limiter per peer +type PeerRateLimiter struct { + tokens int + maxTokens int + refillRate int // tokens per second + lastRefill time.Time + mu sync.Mutex +} + +// NewPeerRateLimiter creates a rate limiter with specified messages/second +func NewPeerRateLimiter(maxTokens, refillRate int) *PeerRateLimiter { + return &PeerRateLimiter{ + tokens: maxTokens, + maxTokens: maxTokens, + refillRate: refillRate, + lastRefill: time.Now(), + } +} + +// Allow checks if a message is allowed and consumes a token if so +func (r *PeerRateLimiter) Allow() bool { + r.mu.Lock() + defer r.mu.Unlock() + + // Refill tokens based on elapsed time + now := time.Now() + elapsed := now.Sub(r.lastRefill) + tokensToAdd := int(elapsed.Seconds()) * r.refillRate + if tokensToAdd > 0 { + r.tokens = min(r.tokens+tokensToAdd, r.maxTokens) + r.lastRefill = now + } + + // Check if we have tokens available + if r.tokens > 0 { + r.tokens-- + return true + } + return false +} + // PeerConnection represents an active connection to a peer. type PeerConnection struct { Peer *Peer @@ -76,7 +161,8 @@ type PeerConnection struct { LastActivity time.Time writeMu sync.Mutex // Serialize WebSocket writes transport *Transport - closeOnce sync.Once // Ensure Close() is only called once + closeOnce sync.Once // Ensure Close() is only called once + rateLimiter *PeerRateLimiter // Per-peer message rate limiting } // NewTransport creates a new WebSocket transport. @@ -88,6 +174,7 @@ func NewTransport(node *NodeManager, registry *PeerRegistry, config TransportCon node: node, registry: registry, conns: make(map[string]*PeerConnection), + dedup: NewMessageDeduplicator(5 * time.Minute), // 5 minute TTL for dedup upgrader: websocket.Upgrader{ ReadBufferSize: 1024, WriteBufferSize: 1024, @@ -135,6 +222,22 @@ func (t *Transport) Start() error { } }() + // Start message deduplication cleanup goroutine + t.wg.Add(1) + go func() { + defer t.wg.Done() + ticker := time.NewTicker(time.Minute) + defer ticker.Stop() + for { + select { + case <-t.ctx.Done(): + return + case <-ticker.C: + t.dedup.Cleanup() + } + } + }() + return nil } @@ -194,6 +297,7 @@ func (t *Transport) Connect(peer *Peer) (*PeerConnection, error) { Conn: conn, LastActivity: time.Now(), transport: t, + rateLimiter: NewPeerRateLimiter(100, 50), // 100 burst, 50/sec refill } // Perform handshake with challenge-response authentication @@ -379,6 +483,7 @@ func (t *Transport) handleWSUpgrade(w http.ResponseWriter, r *http.Request) { SharedSecret: sharedSecret, LastActivity: time.Now(), transport: t, + rateLimiter: NewPeerRateLimiter(100, 50), // 100 burst, 50/sec refill } // Send handshake acknowledgment @@ -574,6 +679,12 @@ func (t *Transport) readLoop(pc *PeerConnection) { pc.LastActivity = time.Now() + // Check rate limit before processing + if pc.rateLimiter != nil && !pc.rateLimiter.Allow() { + logging.Warn("peer rate limited, dropping message", logging.Fields{"peer_id": pc.Peer.ID}) + continue // Drop message from rate-limited peer + } + // Decrypt message using SMSG with shared secret msg, err := t.decryptMessage(data, pc.SharedSecret) if err != nil { @@ -581,6 +692,13 @@ func (t *Transport) readLoop(pc *PeerConnection) { continue // Skip invalid messages } + // Check for duplicate messages (prevents amplification attacks) + if t.dedup.IsDuplicate(msg.ID) { + logging.Debug("dropping duplicate message", logging.Fields{"msg_id": msg.ID, "peer_id": pc.Peer.ID}) + continue + } + t.dedup.Mark(msg.ID) + // Rate limit debug logs in hot path to reduce noise (log 1 in N messages) if debugLogCounter.Add(1)%debugLogInterval == 0 { logging.Debug("received message from peer", logging.Fields{"type": msg.Type, "peer_id": pc.Peer.ID, "reply_to": msg.ReplyTo, "sample": "1/100"})