diff --git a/pkg/database/hashrate.go b/pkg/database/hashrate.go index fb61186..498a27f 100644 --- a/pkg/database/hashrate.go +++ b/pkg/database/hashrate.go @@ -1,10 +1,41 @@ package database import ( + "context" "fmt" + "log" "time" ) +// dbOperationTimeout is the maximum time for database operations +const dbOperationTimeout = 30 * time.Second + +// parseSQLiteTimestamp parses timestamp strings from SQLite which may use various formats. +// Logs a warning if parsing fails and returns zero time. +func parseSQLiteTimestamp(s string) time.Time { + if s == "" { + return time.Time{} + } + + // Try common SQLite timestamp formats + formats := []string{ + "2006-01-02 15:04:05.999999999-07:00", + time.RFC3339Nano, + time.RFC3339, + "2006-01-02 15:04:05", + "2006-01-02T15:04:05Z", + } + + for _, format := range formats { + if t, err := time.Parse(format, s); err == nil { + return t + } + } + + log.Printf("Warning: failed to parse timestamp '%s' from database", s) + return time.Time{} +} + // Resolution indicates the data resolution type type Resolution string @@ -49,13 +80,17 @@ func InsertHashratePoints(minerName, minerType string, points []HashratePoint, r return nil } - tx, err := db.Begin() + // Use context with timeout to prevent hanging on locked database + ctx, cancel := context.WithTimeout(context.Background(), dbOperationTimeout) + defer cancel() + + tx, err := db.BeginTx(ctx, nil) if err != nil { return fmt.Errorf("failed to begin transaction: %w", err) } defer tx.Rollback() - stmt, err := tx.Prepare(` + stmt, err := tx.PrepareContext(ctx, ` INSERT INTO hashrate_history (miner_name, miner_type, timestamp, hashrate, resolution) VALUES (?, ?, ?, ?, ?) `) @@ -65,7 +100,7 @@ func InsertHashratePoints(minerName, minerType string, points []HashratePoint, r defer stmt.Close() for _, point := range points { - _, err := stmt.Exec(minerName, minerType, point.Timestamp, point.Hashrate, string(resolution)) + _, err := stmt.ExecContext(ctx, minerName, minerType, point.Timestamp, point.Hashrate, string(resolution)) if err != nil { return fmt.Errorf("failed to insert point: %w", err) } @@ -235,15 +270,9 @@ func GetHashrateStats(minerName string) (*HashrateStats, error) { return nil, err } - // Parse timestamps - stats.FirstSeen, _ = time.Parse("2006-01-02 15:04:05.999999999-07:00", firstSeenStr) - if stats.FirstSeen.IsZero() { - stats.FirstSeen, _ = time.Parse(time.RFC3339Nano, firstSeenStr) - } - stats.LastSeen, _ = time.Parse("2006-01-02 15:04:05.999999999-07:00", lastSeenStr) - if stats.LastSeen.IsZero() { - stats.LastSeen, _ = time.Parse(time.RFC3339Nano, lastSeenStr) - } + // Parse timestamps using helper that logs errors + stats.FirstSeen = parseSQLiteTimestamp(firstSeenStr) + stats.LastSeen = parseSQLiteTimestamp(lastSeenStr) return &stats, nil } @@ -290,15 +319,9 @@ func GetAllMinerStats() ([]HashrateStats, error) { ); err != nil { return nil, err } - // Parse timestamps - stats.FirstSeen, _ = time.Parse("2006-01-02 15:04:05.999999999-07:00", firstSeenStr) - if stats.FirstSeen.IsZero() { - stats.FirstSeen, _ = time.Parse(time.RFC3339Nano, firstSeenStr) - } - stats.LastSeen, _ = time.Parse("2006-01-02 15:04:05.999999999-07:00", lastSeenStr) - if stats.LastSeen.IsZero() { - stats.LastSeen, _ = time.Parse(time.RFC3339Nano, lastSeenStr) - } + // Parse timestamps using helper that logs errors + stats.FirstSeen = parseSQLiteTimestamp(firstSeenStr) + stats.LastSeen = parseSQLiteTimestamp(lastSeenStr) allStats = append(allStats, stats) } diff --git a/pkg/mining/config_manager.go b/pkg/mining/config_manager.go index 9aaf87b..3553b00 100644 --- a/pkg/mining/config_manager.go +++ b/pkg/mining/config_manager.go @@ -83,6 +83,7 @@ func LoadMinersConfig() (*MinersConfig, error) { } // SaveMinersConfig saves the miners configuration to the file system. +// Uses atomic write pattern: write to temp file, then rename. func SaveMinersConfig(cfg *MinersConfig) error { configMu.Lock() defer configMu.Unlock() @@ -92,7 +93,8 @@ func SaveMinersConfig(cfg *MinersConfig) error { return fmt.Errorf("could not determine miners config path: %w", err) } - if err := os.MkdirAll(filepath.Dir(configPath), 0755); err != nil { + dir := filepath.Dir(configPath) + if err := os.MkdirAll(dir, 0755); err != nil { return fmt.Errorf("failed to create config directory: %w", err) } @@ -101,8 +103,43 @@ func SaveMinersConfig(cfg *MinersConfig) error { return fmt.Errorf("failed to marshal miners config: %w", err) } - if err := os.WriteFile(configPath, data, 0600); err != nil { - return fmt.Errorf("failed to write miners config file: %w", err) + // Atomic write: write to temp file, then rename + tmpFile, err := os.CreateTemp(dir, "miners-config-*.tmp") + if err != nil { + return fmt.Errorf("failed to create temp file: %w", err) } + tmpPath := tmpFile.Name() + + // Clean up temp file on error + success := false + defer func() { + if !success { + os.Remove(tmpPath) + } + }() + + if _, err := tmpFile.Write(data); err != nil { + tmpFile.Close() + return fmt.Errorf("failed to write temp file: %w", err) + } + + if err := tmpFile.Sync(); err != nil { + tmpFile.Close() + return fmt.Errorf("failed to sync temp file: %w", err) + } + + if err := tmpFile.Close(); err != nil { + return fmt.Errorf("failed to close temp file: %w", err) + } + + if err := os.Chmod(tmpPath, 0600); err != nil { + return fmt.Errorf("failed to set temp file permissions: %w", err) + } + + if err := os.Rename(tmpPath, configPath); err != nil { + return fmt.Errorf("failed to rename temp file: %w", err) + } + + success = true return nil } diff --git a/pkg/mining/manager.go b/pkg/mining/manager.go index 28bdc55..7cd8f76 100644 --- a/pkg/mining/manager.go +++ b/pkg/mining/manager.go @@ -435,6 +435,9 @@ func (m *Manager) startStatsCollection() { }() } +// statsCollectionTimeout is the maximum time to wait for stats from a single miner. +const statsCollectionTimeout = 5 * time.Second + // collectMinerStats iterates through active miners and collects their stats. func (m *Manager) collectMinerStats() { m.mu.RLock() @@ -465,7 +468,11 @@ func (m *Manager) collectMinerStats() { continue // Miner was removed, skip it } - stats, err := miner.GetStats(context.Background()) + // Use context with timeout to prevent hanging on unresponsive miner APIs + ctx, cancel := context.WithTimeout(context.Background(), statsCollectionTimeout) + stats, err := miner.GetStats(ctx) + cancel() // Release context resources immediately + if err != nil { log.Printf("Error getting stats for miner %s: %v\n", minerName, err) continue diff --git a/pkg/mining/miner.go b/pkg/mining/miner.go index 51f5e74..2f9be2d 100644 --- a/pkg/mining/miner.go +++ b/pkg/mining/miner.go @@ -191,12 +191,17 @@ func (b *BaseMiner) Stop() error { return nil } +// stdinWriteTimeout is the maximum time to wait for stdin write to complete. +const stdinWriteTimeout = 5 * time.Second + // WriteStdin sends input to the miner's stdin (for console commands). func (b *BaseMiner) WriteStdin(input string) error { b.mu.RLock() - defer b.mu.RUnlock() + stdinPipe := b.stdinPipe + running := b.Running + b.mu.RUnlock() - if !b.Running || b.stdinPipe == nil { + if !running || stdinPipe == nil { return errors.New("miner is not running or stdin not available") } @@ -205,8 +210,19 @@ func (b *BaseMiner) WriteStdin(input string) error { input += "\n" } - _, err := b.stdinPipe.Write([]byte(input)) - return err + // Write with timeout to prevent blocking indefinitely + done := make(chan error, 1) + go func() { + _, err := stdinPipe.Write([]byte(input)) + done <- err + }() + + select { + case err := <-done: + return err + case <-time.After(stdinWriteTimeout): + return errors.New("stdin write timeout: miner may be unresponsive") + } } // Uninstall removes all files related to the miner. diff --git a/pkg/mining/profile_manager.go b/pkg/mining/profile_manager.go index d233659..eb994c5 100644 --- a/pkg/mining/profile_manager.go +++ b/pkg/mining/profile_manager.go @@ -67,6 +67,7 @@ func (pm *ProfileManager) loadProfiles() error { // saveProfiles writes the current profiles from memory to the JSON file. // This is an internal method and assumes the caller holds the appropriate lock. +// Uses atomic write pattern: write to temp file, sync, then rename. func (pm *ProfileManager) saveProfiles() error { profileList := make([]*MiningProfile, 0, len(pm.profiles)) for _, p := range pm.profiles { @@ -78,7 +79,49 @@ func (pm *ProfileManager) saveProfiles() error { return err } - return os.WriteFile(pm.configPath, data, 0600) + // Atomic write: write to temp file in same directory, then rename + dir := filepath.Dir(pm.configPath) + tmpFile, err := os.CreateTemp(dir, "profiles-*.tmp") + if err != nil { + return fmt.Errorf("failed to create temp file: %w", err) + } + tmpPath := tmpFile.Name() + + // Clean up temp file on any error + success := false + defer func() { + if !success { + os.Remove(tmpPath) + } + }() + + if _, err := tmpFile.Write(data); err != nil { + tmpFile.Close() + return fmt.Errorf("failed to write temp file: %w", err) + } + + // Sync to ensure data is flushed to disk before rename + if err := tmpFile.Sync(); err != nil { + tmpFile.Close() + return fmt.Errorf("failed to sync temp file: %w", err) + } + + if err := tmpFile.Close(); err != nil { + return fmt.Errorf("failed to close temp file: %w", err) + } + + // Set permissions before rename + if err := os.Chmod(tmpPath, 0600); err != nil { + return fmt.Errorf("failed to set temp file permissions: %w", err) + } + + // Atomic rename (on POSIX systems) + if err := os.Rename(tmpPath, pm.configPath); err != nil { + return fmt.Errorf("failed to rename temp file: %w", err) + } + + success = true + return nil } // CreateProfile adds a new profile and saves it. diff --git a/pkg/mining/ttminer.go b/pkg/mining/ttminer.go index 19edfc2..f42aba3 100644 --- a/pkg/mining/ttminer.go +++ b/pkg/mining/ttminer.go @@ -151,31 +151,30 @@ func (m *TTMiner) Uninstall() error { } // CheckInstallation verifies if the TT-Miner is installed correctly. +// Thread-safe: properly locks before modifying shared fields. func (m *TTMiner) CheckInstallation() (*InstallationDetails, error) { binaryPath, err := m.findMinerBinary() if err != nil { return &InstallationDetails{IsInstalled: false}, err } - m.MinerBinary = binaryPath - m.Path = filepath.Dir(binaryPath) - - // TT-Miner uses --version to check version + // Run version command before acquiring lock (I/O operation) cmd := exec.Command(binaryPath, "--version") var out bytes.Buffer cmd.Stdout = &out + var version string if err := cmd.Run(); err != nil { - m.Version = "Unknown (could not run executable)" + version = "Unknown (could not run executable)" } else { // Parse version from output output := strings.TrimSpace(out.String()) fields := strings.Fields(output) if len(fields) >= 2 { - m.Version = fields[1] + version = fields[1] } else if len(fields) >= 1 { - m.Version = fields[0] + version = fields[0] } else { - m.Version = "Unknown (could not parse version)" + version = "Unknown (could not parse version)" } } @@ -185,11 +184,18 @@ func (m *TTMiner) CheckInstallation() (*InstallationDetails, error) { configPath = "Error: Could not determine config path" } + // Update shared fields under lock + m.mu.Lock() + m.MinerBinary = binaryPath + m.Path = filepath.Dir(binaryPath) + m.Version = version + m.mu.Unlock() + return &InstallationDetails{ IsInstalled: true, - MinerBinary: m.MinerBinary, - Path: m.Path, - Version: m.Version, + MinerBinary: binaryPath, + Path: filepath.Dir(binaryPath), + Version: version, ConfigPath: configPath, }, nil } diff --git a/pkg/mining/xmrig.go b/pkg/mining/xmrig.go index e9c20f3..7be552b 100644 --- a/pkg/mining/xmrig.go +++ b/pkg/mining/xmrig.go @@ -152,41 +152,52 @@ func (m *XMRigMiner) Uninstall() error { } // CheckInstallation verifies if the XMRig miner is installed correctly. +// Thread-safe: properly locks before modifying shared fields. func (m *XMRigMiner) CheckInstallation() (*InstallationDetails, error) { binaryPath, err := m.findMinerBinary() if err != nil { return &InstallationDetails{IsInstalled: false}, err } - m.MinerBinary = binaryPath - m.Path = filepath.Dir(binaryPath) - + // Run version command before acquiring lock (I/O operation) cmd := exec.Command(binaryPath, "--version") var out bytes.Buffer cmd.Stdout = &out + var version string if err := cmd.Run(); err != nil { - m.Version = "Unknown (could not run executable)" + version = "Unknown (could not run executable)" } else { fields := strings.Fields(out.String()) if len(fields) >= 2 { - m.Version = fields[1] + version = fields[1] } else { - m.Version = "Unknown (could not parse version)" + version = "Unknown (could not parse version)" } } // Get the config path using the helper (use instance name if set) - configPath, err := getXMRigConfigPath(m.Name) + m.mu.RLock() + instanceName := m.Name + m.mu.RUnlock() + + configPath, err := getXMRigConfigPath(instanceName) if err != nil { // Log the error but don't fail CheckInstallation if config path can't be determined configPath = "Error: Could not determine config path" } + // Update shared fields under lock + m.mu.Lock() + m.MinerBinary = binaryPath + m.Path = filepath.Dir(binaryPath) + m.Version = version + m.mu.Unlock() + return &InstallationDetails{ IsInstalled: true, - MinerBinary: m.MinerBinary, - Path: m.Path, - Version: m.Version, - ConfigPath: configPath, // Include the config path + MinerBinary: binaryPath, + Path: filepath.Dir(binaryPath), + Version: version, + ConfigPath: configPath, }, nil } diff --git a/pkg/mining/xmrig_start.go b/pkg/mining/xmrig_start.go index e757e9c..165cc10 100644 --- a/pkg/mining/xmrig_start.go +++ b/pkg/mining/xmrig_start.go @@ -83,7 +83,11 @@ func (m *XMRigMiner) Start(config *Config) error { if err := m.cmd.Start(); err != nil { stdinPipe.Close() - return err + // Clean up config file on failed start + if m.ConfigPath != "" { + os.Remove(m.ConfigPath) + } + return fmt.Errorf("failed to start miner: %w", err) } m.Running = true diff --git a/pkg/node/peer.go b/pkg/node/peer.go index dbcf5d1..08ed927 100644 --- a/pkg/node/peer.go +++ b/pkg/node/peer.go @@ -3,6 +3,7 @@ package node import ( "encoding/json" "fmt" + "log" "os" "path/filepath" "sync" @@ -32,12 +33,22 @@ type Peer struct { Connected bool `json:"-"` } +// saveDebounceInterval is the minimum time between disk writes. +const saveDebounceInterval = 5 * time.Second + // PeerRegistry manages known peers with KD-tree based selection. type PeerRegistry struct { peers map[string]*Peer kdTree *poindexter.KDTree[string] // KD-tree with peer ID as payload path string mu sync.RWMutex + + // Debounce disk writes + dirty bool // Whether there are unsaved changes + saveTimer *time.Timer // Timer for debounced save + saveMu sync.Mutex // Protects dirty and saveTimer + stopChan chan struct{} // Signal to stop background save + saveStopOnce sync.Once // Ensure stopChan is closed only once } // Dimension weights for peer selection @@ -63,8 +74,9 @@ func NewPeerRegistry() (*PeerRegistry, error) { // This is primarily useful for testing to avoid xdg path caching issues. func NewPeerRegistryWithPath(peersPath string) (*PeerRegistry, error) { pr := &PeerRegistry{ - peers: make(map[string]*Peer), - path: peersPath, + peers: make(map[string]*Peer), + path: peersPath, + stopChan: make(chan struct{}), } // Try to load existing peers @@ -81,13 +93,14 @@ func NewPeerRegistryWithPath(peersPath string) (*PeerRegistry, error) { // AddPeer adds a new peer to the registry. func (r *PeerRegistry) AddPeer(peer *Peer) error { r.mu.Lock() - defer r.mu.Unlock() if peer.ID == "" { + r.mu.Unlock() return fmt.Errorf("peer ID is required") } if _, exists := r.peers[peer.ID]; exists { + r.mu.Unlock() return fmt.Errorf("peer %s already exists", peer.ID) } @@ -101,6 +114,7 @@ func (r *PeerRegistry) AddPeer(peer *Peer) error { r.peers[peer.ID] = peer r.rebuildKDTree() + r.mu.Unlock() return r.save() } @@ -108,14 +122,15 @@ func (r *PeerRegistry) AddPeer(peer *Peer) error { // UpdatePeer updates an existing peer's information. func (r *PeerRegistry) UpdatePeer(peer *Peer) error { r.mu.Lock() - defer r.mu.Unlock() if _, exists := r.peers[peer.ID]; !exists { + r.mu.Unlock() return fmt.Errorf("peer %s not found", peer.ID) } r.peers[peer.ID] = peer r.rebuildKDTree() + r.mu.Unlock() return r.save() } @@ -123,14 +138,15 @@ func (r *PeerRegistry) UpdatePeer(peer *Peer) error { // RemovePeer removes a peer from the registry. func (r *PeerRegistry) RemovePeer(id string) error { r.mu.Lock() - defer r.mu.Unlock() if _, exists := r.peers[id]; !exists { + r.mu.Unlock() return fmt.Errorf("peer %s not found", id) } delete(r.peers, id) r.rebuildKDTree() + r.mu.Unlock() return r.save() } @@ -166,10 +182,10 @@ func (r *PeerRegistry) ListPeers() []*Peer { // UpdateMetrics updates a peer's performance metrics. func (r *PeerRegistry) UpdateMetrics(id string, pingMS, geoKM float64, hops int) error { r.mu.Lock() - defer r.mu.Unlock() peer, exists := r.peers[id] if !exists { + r.mu.Unlock() return fmt.Errorf("peer %s not found", id) } @@ -179,6 +195,7 @@ func (r *PeerRegistry) UpdateMetrics(id string, pingMS, geoKM float64, hops int) peer.LastSeen = time.Now() r.rebuildKDTree() + r.mu.Unlock() return r.save() } @@ -186,10 +203,10 @@ func (r *PeerRegistry) UpdateMetrics(id string, pingMS, geoKM float64, hops int) // UpdateScore updates a peer's reliability score. func (r *PeerRegistry) UpdateScore(id string, score float64) error { r.mu.Lock() - defer r.mu.Unlock() peer, exists := r.peers[id] if !exists { + r.mu.Unlock() return fmt.Errorf("peer %s not found", id) } @@ -202,6 +219,7 @@ func (r *PeerRegistry) UpdateScore(id string, score float64) error { peer.Score = score r.rebuildKDTree() + r.mu.Unlock() return r.save() } @@ -329,8 +347,43 @@ func (r *PeerRegistry) rebuildKDTree() { r.kdTree = tree } -// save persists peers to disk. -func (r *PeerRegistry) save() error { +// scheduleSave schedules a debounced save operation. +// Multiple calls within saveDebounceInterval will be coalesced into a single save. +// Must NOT be called with r.mu held. +func (r *PeerRegistry) scheduleSave() { + r.saveMu.Lock() + defer r.saveMu.Unlock() + + r.dirty = true + + // If timer already running, let it handle the save + if r.saveTimer != nil { + return + } + + // Start a new timer + r.saveTimer = time.AfterFunc(saveDebounceInterval, func() { + r.saveMu.Lock() + r.saveTimer = nil + shouldSave := r.dirty + r.dirty = false + r.saveMu.Unlock() + + if shouldSave { + r.mu.RLock() + err := r.saveNow() + r.mu.RUnlock() + if err != nil { + // Log error but continue - best effort persistence + log.Printf("Warning: failed to save peer registry: %v", err) + } + } + }) +} + +// saveNow persists peers to disk immediately. +// Must be called with r.mu held (at least RLock). +func (r *PeerRegistry) saveNow() error { // Ensure directory exists dir := filepath.Dir(r.path) if err := os.MkdirAll(dir, 0755); err != nil { @@ -348,13 +401,54 @@ func (r *PeerRegistry) save() error { return fmt.Errorf("failed to marshal peers: %w", err) } - if err := os.WriteFile(r.path, data, 0644); err != nil { - return fmt.Errorf("failed to write peers: %w", err) + // Use atomic write pattern: write to temp file, then rename + tmpPath := r.path + ".tmp" + if err := os.WriteFile(tmpPath, data, 0644); err != nil { + return fmt.Errorf("failed to write peers temp file: %w", err) + } + + if err := os.Rename(tmpPath, r.path); err != nil { + os.Remove(tmpPath) // Clean up temp file + return fmt.Errorf("failed to rename peers file: %w", err) } return nil } +// Close flushes any pending changes and releases resources. +func (r *PeerRegistry) Close() error { + r.saveStopOnce.Do(func() { + close(r.stopChan) + }) + + // Cancel pending timer and save immediately if dirty + r.saveMu.Lock() + if r.saveTimer != nil { + r.saveTimer.Stop() + r.saveTimer = nil + } + shouldSave := r.dirty + r.dirty = false + r.saveMu.Unlock() + + if shouldSave { + r.mu.RLock() + err := r.saveNow() + r.mu.RUnlock() + return err + } + + return nil +} + +// save is a helper that schedules a debounced save. +// Kept for backward compatibility but now debounces writes. +// Must NOT be called with r.mu held. +func (r *PeerRegistry) save() error { + r.scheduleSave() + return nil // Errors will be logged asynchronously +} + // load reads peers from disk. func (r *PeerRegistry) load() error { data, err := os.ReadFile(r.path) diff --git a/pkg/node/peer_test.go b/pkg/node/peer_test.go index d3a154f..a4dd871 100644 --- a/pkg/node/peer_test.go +++ b/pkg/node/peer_test.go @@ -345,6 +345,11 @@ func TestPeerRegistry_Persistence(t *testing.T) { pr1.AddPeer(peer) + // Flush pending changes before reloading + if err := pr1.Close(); err != nil { + t.Fatalf("failed to close first registry: %v", err) + } + // Load in new registry from same path pr2, err := NewPeerRegistryWithPath(peersPath) if err != nil { diff --git a/pkg/node/transport.go b/pkg/node/transport.go index 5337c0f..6e47b5f 100644 --- a/pkg/node/transport.go +++ b/pkg/node/transport.go @@ -266,6 +266,10 @@ func (t *Transport) handleWSUpgrade(w http.ResponseWriter, r *http.Request) { return } + // Set handshake timeout to prevent slow/malicious clients from blocking + handshakeTimeout := 10 * time.Second + conn.SetReadDeadline(time.Now().Add(handshakeTimeout)) + // Wait for handshake from client _, data, err := conn.ReadMessage() if err != nil { @@ -365,6 +369,16 @@ func (t *Transport) handleWSUpgrade(w http.ResponseWriter, r *http.Request) { // performHandshake initiates handshake with a peer. func (t *Transport) performHandshake(pc *PeerConnection) error { + // Set handshake timeout + handshakeTimeout := 10 * time.Second + pc.Conn.SetWriteDeadline(time.Now().Add(handshakeTimeout)) + pc.Conn.SetReadDeadline(time.Now().Add(handshakeTimeout)) + defer func() { + // Reset deadlines after handshake + pc.Conn.SetWriteDeadline(time.Time{}) + pc.Conn.SetReadDeadline(time.Time{}) + }() + identity := t.node.GetIdentity() payload := HandshakePayload{