diff --git a/pkg/mining/manager.go b/pkg/mining/manager.go index a95b558..28bdc55 100644 --- a/pkg/mining/manager.go +++ b/pkg/mining/manager.go @@ -34,6 +34,7 @@ type Manager struct { miners map[string]Miner mu sync.RWMutex stopChan chan struct{} + stopOnce sync.Once waitGroup sync.WaitGroup dbEnabled bool dbRetention int @@ -505,25 +506,28 @@ func (m *Manager) GetMinerHashrateHistory(name string) ([]HashratePoint, error) } // Stop stops all running miners, background goroutines, and closes resources. +// Safe to call multiple times - subsequent calls are no-ops. func (m *Manager) Stop() { - // Stop all running miners first - m.mu.Lock() - for name, miner := range m.miners { - if err := miner.Stop(); err != nil { - log.Printf("Warning: failed to stop miner %s: %v", name, err) + m.stopOnce.Do(func() { + // Stop all running miners first + m.mu.Lock() + for name, miner := range m.miners { + if err := miner.Stop(); err != nil { + log.Printf("Warning: failed to stop miner %s: %v", name, err) + } } - } - m.mu.Unlock() + m.mu.Unlock() - close(m.stopChan) - m.waitGroup.Wait() + close(m.stopChan) + m.waitGroup.Wait() - // Close the database - if m.dbEnabled { - if err := database.Close(); err != nil { - log.Printf("Warning: failed to close database: %v", err) + // Close the database + if m.dbEnabled { + if err := database.Close(); err != nil { + log.Printf("Warning: failed to close database: %v", err) + } } - } + }) } // GetMinerHistoricalStats returns historical stats from the database for a miner. diff --git a/pkg/mining/miner.go b/pkg/mining/miner.go index 62c8933..51f5e74 100644 --- a/pkg/mining/miner.go +++ b/pkg/mining/miner.go @@ -39,6 +39,9 @@ func NewLogBuffer(maxLines int) *LogBuffer { } } +// maxLineLength is the maximum length of a single log line to prevent memory bloat. +const maxLineLength = 2000 + // Write implements io.Writer for capturing output. func (lb *LogBuffer) Write(p []byte) (n int, err error) { lb.mu.Lock() @@ -52,13 +55,19 @@ func (lb *LogBuffer) Write(p []byte) (n int, err error) { if line == "" { continue } + // Truncate excessively long lines to prevent memory bloat + if len(line) > maxLineLength { + line = line[:maxLineLength] + "... [truncated]" + } // Add timestamp prefix timestampedLine := fmt.Sprintf("[%s] %s", time.Now().Format("15:04:05"), line) lb.lines = append(lb.lines, timestampedLine) - // Trim if over max + // Trim if over max - force reallocation to release memory if len(lb.lines) > lb.maxLines { - lb.lines = lb.lines[len(lb.lines)-lb.maxLines:] + newSlice := make([]string, lb.maxLines) + copy(newSlice, lb.lines[len(lb.lines)-lb.maxLines:]) + lb.lines = newSlice } } return len(p), nil @@ -225,6 +234,8 @@ func (b *BaseMiner) InstallFromURL(url string) error { } if _, err := io.Copy(tmpfile, resp.Body); err != nil { + // Drain remaining body to allow connection reuse + io.Copy(io.Discard, resp.Body) return err } @@ -438,7 +449,14 @@ func (b *BaseMiner) ReduceHashrateHistory(now time.Time) { newHighResHistory = append(newHighResHistory, p) } } - b.HashrateHistory = newHighResHistory + // Force reallocation if significantly oversized to free memory + if cap(b.HashrateHistory) > 1000 && len(newHighResHistory) < cap(b.HashrateHistory)/2 { + trimmed := make([]HashratePoint, len(newHighResHistory)) + copy(trimmed, newHighResHistory) + b.HashrateHistory = trimmed + } else { + b.HashrateHistory = newHighResHistory + } if len(pointsToAggregate) == 0 { b.LastLowResAggregation = now @@ -480,7 +498,16 @@ func (b *BaseMiner) ReduceHashrateHistory(now time.Time) { firstValidLowResIndex = len(b.LowResHashrateHistory) } } - b.LowResHashrateHistory = b.LowResHashrateHistory[firstValidLowResIndex:] + + // Force reallocation if significantly oversized to free memory + newLowResLen := len(b.LowResHashrateHistory) - firstValidLowResIndex + if cap(b.LowResHashrateHistory) > 1000 && newLowResLen < cap(b.LowResHashrateHistory)/2 { + trimmed := make([]HashratePoint, newLowResLen) + copy(trimmed, b.LowResHashrateHistory[firstValidLowResIndex:]) + b.LowResHashrateHistory = trimmed + } else { + b.LowResHashrateHistory = b.LowResHashrateHistory[firstValidLowResIndex:] + } b.LastLowResAggregation = now } diff --git a/pkg/mining/service.go b/pkg/mining/service.go index 6151a31..743b7f0 100644 --- a/pkg/mining/service.go +++ b/pkg/mining/service.go @@ -68,7 +68,11 @@ func NewService(manager ManagerInterface, listenAddr string, displayAddr string, ProfileManager: profileManager, NodeService: nodeService, Server: &http.Server{ - Addr: listenAddr, + Addr: listenAddr, + ReadTimeout: 30 * time.Second, + WriteTimeout: 30 * time.Second, + IdleTimeout: 60 * time.Second, + ReadHeaderTimeout: 10 * time.Second, }, DisplayAddr: displayAddr, SwaggerInstanceName: instanceName, @@ -83,6 +87,14 @@ func NewService(manager ManagerInterface, listenAddr string, displayAddr string, func (s *Service) InitRouter() { s.Router = gin.Default() + // Extract port safely from server address for CORS + serverPort := "9090" // default fallback + if s.Server.Addr != "" { + if _, port, err := net.SplitHostPort(s.Server.Addr); err == nil && port != "" { + serverPort = port + } + } + // Configure CORS to only allow local origins corsConfig := cors.Config{ AllowOrigins: []string{ @@ -90,8 +102,8 @@ func (s *Service) InitRouter() { "http://127.0.0.1:4200", "http://localhost:9090", // Default API port "http://127.0.0.1:9090", - "http://localhost:" + strings.Split(s.Server.Addr, ":")[len(strings.Split(s.Server.Addr, ":"))-1], - "http://127.0.0.1:" + strings.Split(s.Server.Addr, ":")[len(strings.Split(s.Server.Addr, ":"))-1], + "http://localhost:" + serverPort, + "http://127.0.0.1:" + serverPort, "wails://wails", // Wails desktop app }, AllowMethods: []string{"GET", "POST", "PUT", "DELETE", "OPTIONS"}, @@ -101,6 +113,13 @@ func (s *Service) InitRouter() { MaxAge: 12 * time.Hour, } s.Router.Use(cors.New(corsConfig)) + + // Add request body size limit middleware (1MB max) + s.Router.Use(func(c *gin.Context) { + c.Request.Body = http.MaxBytesReader(c.Writer, c.Request.Body, 1<<20) // 1MB + c.Next() + }) + s.SetupRoutes() } diff --git a/pkg/mining/ttminer_start.go b/pkg/mining/ttminer_start.go index f39887c..20ba909 100644 --- a/pkg/mining/ttminer_start.go +++ b/pkg/mining/ttminer_start.go @@ -8,6 +8,7 @@ import ( "os" "os/exec" "strings" + "time" ) // Start launches the TT-Miner with the given configuration. @@ -67,7 +68,26 @@ func (m *TTMiner) Start(config *Config) error { // Capture cmd locally to avoid race with Stop() cmd := m.cmd go func() { - err := cmd.Wait() + // Use a channel to detect if Wait() completes + done := make(chan error, 1) + go func() { + done <- cmd.Wait() + }() + + // Wait with timeout to prevent goroutine leak on zombie processes + var err error + select { + case err = <-done: + // Normal exit + case <-time.After(5 * time.Minute): + // Process didn't exit after 5 minutes - force cleanup + log.Printf("TT-Miner process wait timeout, forcing cleanup") + if cmd.Process != nil { + cmd.Process.Kill() + } + err = <-done // Wait for the inner goroutine to finish + } + m.mu.Lock() // Only clear if this is still the same command (not restarted) if m.cmd == cmd { diff --git a/pkg/mining/xmrig_start.go b/pkg/mining/xmrig_start.go index 1e07067..e757e9c 100644 --- a/pkg/mining/xmrig_start.go +++ b/pkg/mining/xmrig_start.go @@ -10,6 +10,7 @@ import ( "os/exec" "path/filepath" "strings" + "time" ) // Start launches the XMRig miner with the specified configuration. @@ -90,7 +91,26 @@ func (m *XMRigMiner) Start(config *Config) error { // Capture cmd locally to avoid race with Stop() cmd := m.cmd go func() { - cmd.Wait() + // Use a channel to detect if Wait() completes + done := make(chan struct{}) + go func() { + cmd.Wait() + close(done) + }() + + // Wait with timeout to prevent goroutine leak on zombie processes + select { + case <-done: + // Normal exit + case <-time.After(5 * time.Minute): + // Process didn't exit after 5 minutes - force cleanup + log.Printf("Miner process wait timeout, forcing cleanup") + if cmd.Process != nil { + cmd.Process.Kill() + } + <-done // Wait for the inner goroutine to finish + } + m.mu.Lock() // Only clear if this is still the same command (not restarted) if m.cmd == cmd { diff --git a/pkg/node/controller.go b/pkg/node/controller.go index 8b5dbd0..15e571a 100644 --- a/pkg/node/controller.go +++ b/pkg/node/controller.go @@ -83,11 +83,12 @@ func (c *Controller) sendRequest(peerID string, msg *Message, timeout time.Durat c.pending[msg.ID] = respCh c.mu.Unlock() - // Clean up on exit + // Clean up on exit - ensure channel is closed and removed from map defer func() { c.mu.Lock() delete(c.pending, msg.ID) c.mu.Unlock() + close(respCh) // Close channel to allow garbage collection }() // Send the message @@ -341,6 +342,9 @@ func (c *Controller) GetTotalHashrate() float64 { var total float64 for _, stats := range allStats { + if stats == nil { + continue + } for _, miner := range stats.Miners { total += miner.Hashrate } diff --git a/pkg/node/transport.go b/pkg/node/transport.go index d838e4f..5337c0f 100644 --- a/pkg/node/transport.go +++ b/pkg/node/transport.go @@ -77,7 +77,20 @@ func NewTransport(node *NodeManager, registry *PeerRegistry, config TransportCon upgrader: websocket.Upgrader{ ReadBufferSize: 1024, WriteBufferSize: 1024, - CheckOrigin: func(r *http.Request) bool { return true }, // Allow all origins + CheckOrigin: func(r *http.Request) bool { + // Allow local connections only for security + origin := r.Header.Get("Origin") + if origin == "" { + return true // No origin header (non-browser client) + } + // Allow localhost and 127.0.0.1 origins + u, err := url.Parse(origin) + if err != nil { + return false + } + host := u.Hostname() + return host == "localhost" || host == "127.0.0.1" || host == "::1" + }, }, ctx: ctx, cancel: cancel, @@ -238,6 +251,16 @@ func (t *Transport) GetConnection(peerID string) *PeerConnection { // handleWSUpgrade handles incoming WebSocket connections. func (t *Transport) handleWSUpgrade(w http.ResponseWriter, r *http.Request) { + // Enforce MaxConns limit + t.mu.RLock() + currentConns := len(t.conns) + t.mu.RUnlock() + + if currentConns >= t.config.MaxConns { + http.Error(w, "Too many connections", http.StatusServiceUnavailable) + return + } + conn, err := t.upgrader.Upgrade(w, r, nil) if err != nil { return @@ -415,6 +438,13 @@ func (t *Transport) readLoop(pc *PeerConnection) { default: } + // Set read deadline to prevent blocking forever on unresponsive connections + readDeadline := t.config.PingInterval + t.config.PongTimeout + if err := pc.Conn.SetReadDeadline(time.Now().Add(readDeadline)); err != nil { + log.Printf("[readLoop] SetReadDeadline error for %s: %v", pc.Peer.ID, err) + return + } + _, data, err := pc.Conn.ReadMessage() if err != nil { log.Printf("[readLoop] Read error from %s: %v", pc.Peer.ID, err) @@ -495,6 +525,11 @@ func (pc *PeerConnection) Send(msg *Message) error { return err } + // Set write deadline to prevent blocking forever + if err := pc.Conn.SetWriteDeadline(time.Now().Add(10 * time.Second)); err != nil { + return fmt.Errorf("failed to set write deadline: %w", err) + } + return pc.Conn.WriteMessage(websocket.BinaryMessage, data) }