fix: Add timeouts, atomic writes, and thread safety improvements

- Add 30s context timeout for database transactions in hashrate.go
- Add helper function for parsing SQLite timestamps with error logging
- Implement atomic file writes (temp + rename) for profile_manager.go,
  config_manager.go, and peer.go to prevent corruption on crash
- Add 5s timeout for stats collection per miner in manager.go
- Add 5s timeout for stdin writes in miner.go
- Clean up config file on failed miner start in xmrig_start.go
- Implement debounced saves (5s) for peer registry to reduce disk I/O
- Fix CheckInstallation data race in xmrig.go and ttminer.go by adding
  proper mutex protection around shared field updates
- Add 10s handshake timeout for WebSocket connections in transport.go
- Update peer_test.go to call Close() before reload to flush changes

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Author: snider
Date: 2025-12-31 02:55:30 +00:00
Parent: 74bbf14de4
Commit: b24a3f00d6
11 changed files with 324 additions and 64 deletions

hashrate.go

@@ -1,10 +1,41 @@
package database
import (
"context"
"fmt"
"log"
"time"
)
// dbOperationTimeout is the maximum time for database operations
const dbOperationTimeout = 30 * time.Second
// parseSQLiteTimestamp parses timestamp strings from SQLite which may use various formats.
// Logs a warning if parsing fails and returns zero time.
func parseSQLiteTimestamp(s string) time.Time {
if s == "" {
return time.Time{}
}
// Try common SQLite timestamp formats
formats := []string{
"2006-01-02 15:04:05.999999999-07:00",
time.RFC3339Nano,
time.RFC3339,
"2006-01-02 15:04:05",
"2006-01-02T15:04:05Z",
}
for _, format := range formats {
if t, err := time.Parse(format, s); err == nil {
return t
}
}
log.Printf("Warning: failed to parse timestamp '%s' from database", s)
return time.Time{}
}
// Resolution indicates the data resolution type
type Resolution string
@@ -49,13 +80,17 @@ func InsertHashratePoints(minerName, minerType string, points []HashratePoint, r
return nil
}
- tx, err := db.Begin()
// Use context with timeout to prevent hanging on locked database
ctx, cancel := context.WithTimeout(context.Background(), dbOperationTimeout)
defer cancel()
tx, err := db.BeginTx(ctx, nil)
if err != nil {
return fmt.Errorf("failed to begin transaction: %w", err)
}
defer tx.Rollback()
- stmt, err := tx.Prepare(`
stmt, err := tx.PrepareContext(ctx, `
INSERT INTO hashrate_history (miner_name, miner_type, timestamp, hashrate, resolution)
VALUES (?, ?, ?, ?, ?)
`)
@@ -65,7 +100,7 @@ func InsertHashratePoints(minerName, minerType string, points []HashratePoint, r
defer stmt.Close()
for _, point := range points {
- _, err := stmt.Exec(minerName, minerType, point.Timestamp, point.Hashrate, string(resolution))
_, err := stmt.ExecContext(ctx, minerName, minerType, point.Timestamp, point.Hashrate, string(resolution))
if err != nil {
return fmt.Errorf("failed to insert point: %w", err)
}
@@ -235,15 +270,9 @@ func GetHashrateStats(minerName string) (*HashrateStats, error) {
return nil, err
}
- // Parse timestamps
- stats.FirstSeen, _ = time.Parse("2006-01-02 15:04:05.999999999-07:00", firstSeenStr)
- if stats.FirstSeen.IsZero() {
- stats.FirstSeen, _ = time.Parse(time.RFC3339Nano, firstSeenStr)
- }
- stats.LastSeen, _ = time.Parse("2006-01-02 15:04:05.999999999-07:00", lastSeenStr)
- if stats.LastSeen.IsZero() {
- stats.LastSeen, _ = time.Parse(time.RFC3339Nano, lastSeenStr)
- }
// Parse timestamps using helper that logs errors
stats.FirstSeen = parseSQLiteTimestamp(firstSeenStr)
stats.LastSeen = parseSQLiteTimestamp(lastSeenStr)
return &stats, nil
}
@@ -290,15 +319,9 @@ func GetAllMinerStats() ([]HashrateStats, error) {
); err != nil {
return nil, err
}
- // Parse timestamps
- stats.FirstSeen, _ = time.Parse("2006-01-02 15:04:05.999999999-07:00", firstSeenStr)
- if stats.FirstSeen.IsZero() {
- stats.FirstSeen, _ = time.Parse(time.RFC3339Nano, firstSeenStr)
- }
- stats.LastSeen, _ = time.Parse("2006-01-02 15:04:05.999999999-07:00", lastSeenStr)
- if stats.LastSeen.IsZero() {
- stats.LastSeen, _ = time.Parse(time.RFC3339Nano, lastSeenStr)
- }
// Parse timestamps using helper that logs errors
stats.FirstSeen = parseSQLiteTimestamp(firstSeenStr)
stats.LastSeen = parseSQLiteTimestamp(lastSeenStr)
allStats = append(allStats, stats)
}
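
The hunks above all hang off the same database/sql idiom: derive a context with a deadline, then thread it through BeginTx, PrepareContext, and ExecContext. A minimal self-contained sketch of that shape (the table and function names here are illustrative, not from this commit; assumes imports context, database/sql, fmt, and time):

func insertWithDeadline(db *sql.DB, rows [][2]any) error {
	// One deadline covers the whole transaction: begin, inserts, commit.
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	tx, err := db.BeginTx(ctx, nil)
	if err != nil {
		return fmt.Errorf("begin: %w", err)
	}
	defer tx.Rollback() // no-op once Commit succeeds

	stmt, err := tx.PrepareContext(ctx, `INSERT INTO points (ts, value) VALUES (?, ?)`)
	if err != nil {
		return fmt.Errorf("prepare: %w", err)
	}
	defer stmt.Close()

	for _, r := range rows {
		if _, err := stmt.ExecContext(ctx, r[0], r[1]); err != nil {
			return fmt.Errorf("insert: %w", err)
		}
	}
	return tx.Commit()
}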

config_manager.go

@@ -83,6 +83,7 @@ func LoadMinersConfig() (*MinersConfig, error) {
}
// SaveMinersConfig saves the miners configuration to the file system.
// Uses atomic write pattern: write to temp file, then rename.
func SaveMinersConfig(cfg *MinersConfig) error {
configMu.Lock()
defer configMu.Unlock()
@@ -92,7 +93,8 @@ func SaveMinersConfig(cfg *MinersConfig) error {
return fmt.Errorf("could not determine miners config path: %w", err)
}
- if err := os.MkdirAll(filepath.Dir(configPath), 0755); err != nil {
dir := filepath.Dir(configPath)
if err := os.MkdirAll(dir, 0755); err != nil {
return fmt.Errorf("failed to create config directory: %w", err)
}
@@ -101,8 +103,43 @@ func SaveMinersConfig(cfg *MinersConfig) error {
return fmt.Errorf("failed to marshal miners config: %w", err)
}
- if err := os.WriteFile(configPath, data, 0600); err != nil {
- return fmt.Errorf("failed to write miners config file: %w", err)
// Atomic write: write to temp file, then rename
tmpFile, err := os.CreateTemp(dir, "miners-config-*.tmp")
if err != nil {
return fmt.Errorf("failed to create temp file: %w", err)
}
tmpPath := tmpFile.Name()
// Clean up temp file on error
success := false
defer func() {
if !success {
os.Remove(tmpPath)
}
}()
if _, err := tmpFile.Write(data); err != nil {
tmpFile.Close()
return fmt.Errorf("failed to write temp file: %w", err)
}
if err := tmpFile.Sync(); err != nil {
tmpFile.Close()
return fmt.Errorf("failed to sync temp file: %w", err)
}
if err := tmpFile.Close(); err != nil {
return fmt.Errorf("failed to close temp file: %w", err)
}
if err := os.Chmod(tmpPath, 0600); err != nil {
return fmt.Errorf("failed to set temp file permissions: %w", err)
}
if err := os.Rename(tmpPath, configPath); err != nil {
return fmt.Errorf("failed to rename temp file: %w", err)
}
success = true
return nil
}
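
The sequence above (CreateTemp in the target directory, Write, Sync, Close, Chmod, Rename) generalizes into one helper. A sketch under that assumption (writeFileAtomic is a hypothetical name, not part of this commit; assumes imports os and path/filepath):

// writeFileAtomic writes data to path via a temp file in the same
// directory, so the final Rename stays on one filesystem and is atomic
// on POSIX: readers see the old contents or the new, never a partial file.
func writeFileAtomic(path string, data []byte, perm os.FileMode) error {
	tmp, err := os.CreateTemp(filepath.Dir(path), filepath.Base(path)+"-*.tmp")
	if err != nil {
		return err
	}
	tmpPath := tmp.Name()
	defer os.Remove(tmpPath) // harmless after a successful rename

	if _, err := tmp.Write(data); err != nil {
		tmp.Close()
		return err
	}
	if err := tmp.Sync(); err != nil { // flush to disk before renaming
		tmp.Close()
		return err
	}
	if err := tmp.Close(); err != nil {
		return err
	}
	if err := os.Chmod(tmpPath, perm); err != nil {
		return err
	}
	return os.Rename(tmpPath, path)
}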

manager.go

@@ -435,6 +435,9 @@ func (m *Manager) startStatsCollection() {
}()
}
// statsCollectionTimeout is the maximum time to wait for stats from a single miner.
const statsCollectionTimeout = 5 * time.Second
// collectMinerStats iterates through active miners and collects their stats.
func (m *Manager) collectMinerStats() {
m.mu.RLock()
@@ -465,7 +468,11 @@ func (m *Manager) collectMinerStats() {
continue // Miner was removed, skip it
}
- stats, err := miner.GetStats(context.Background())
// Use context with timeout to prevent hanging on unresponsive miner APIs
ctx, cancel := context.WithTimeout(context.Background(), statsCollectionTimeout)
stats, err := miner.GetStats(ctx)
cancel() // Release context resources immediately
if err != nil {
log.Printf("Error getting stats for miner %s: %v\n", minerName, err)
continue

miner.go

@@ -191,12 +191,17 @@ func (b *BaseMiner) Stop() error {
return nil
}
// stdinWriteTimeout is the maximum time to wait for stdin write to complete.
const stdinWriteTimeout = 5 * time.Second
// WriteStdin sends input to the miner's stdin (for console commands).
func (b *BaseMiner) WriteStdin(input string) error {
b.mu.RLock()
- defer b.mu.RUnlock()
stdinPipe := b.stdinPipe
running := b.Running
b.mu.RUnlock()
- if !b.Running || b.stdinPipe == nil {
if !running || stdinPipe == nil {
return errors.New("miner is not running or stdin not available")
}
@@ -205,8 +210,19 @@ func (b *BaseMiner) WriteStdin(input string) error {
input += "\n"
}
- _, err := b.stdinPipe.Write([]byte(input))
- return err
// Write with timeout to prevent blocking indefinitely
done := make(chan error, 1)
go func() {
_, err := stdinPipe.Write([]byte(input))
done <- err
}()
select {
case err := <-done:
return err
case <-time.After(stdinWriteTimeout):
return errors.New("stdin write timeout: miner may be unresponsive")
}
}
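
The goroutine-plus-select construction works for any blocking io.Writer, not just stdin pipes. A standalone sketch (hypothetical helper; assumes imports errors, io, and time):

// writeWithTimeout bounds a blocking Write. The buffered channel lets
// the goroutine deliver its result even after the timeout has fired,
// though a truly stuck Write leaves that goroutine alive until the
// underlying writer unblocks.
func writeWithTimeout(w io.Writer, p []byte, d time.Duration) error {
	done := make(chan error, 1)
	go func() {
		_, err := w.Write(p)
		done <- err
	}()
	select {
	case err := <-done:
		return err
	case <-time.After(d):
		return errors.New("write timed out")
	}
}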
// Uninstall removes all files related to the miner.

profile_manager.go

@@ -67,6 +67,7 @@ func (pm *ProfileManager) loadProfiles() error {
// saveProfiles writes the current profiles from memory to the JSON file.
// This is an internal method and assumes the caller holds the appropriate lock.
// Uses atomic write pattern: write to temp file, sync, then rename.
func (pm *ProfileManager) saveProfiles() error {
profileList := make([]*MiningProfile, 0, len(pm.profiles))
for _, p := range pm.profiles {
@@ -78,7 +79,49 @@ func (pm *ProfileManager) saveProfiles() error {
return err
}
- return os.WriteFile(pm.configPath, data, 0600)
// Atomic write: write to temp file in same directory, then rename
dir := filepath.Dir(pm.configPath)
tmpFile, err := os.CreateTemp(dir, "profiles-*.tmp")
if err != nil {
return fmt.Errorf("failed to create temp file: %w", err)
}
tmpPath := tmpFile.Name()
// Clean up temp file on any error
success := false
defer func() {
if !success {
os.Remove(tmpPath)
}
}()
if _, err := tmpFile.Write(data); err != nil {
tmpFile.Close()
return fmt.Errorf("failed to write temp file: %w", err)
}
// Sync to ensure data is flushed to disk before rename
if err := tmpFile.Sync(); err != nil {
tmpFile.Close()
return fmt.Errorf("failed to sync temp file: %w", err)
}
if err := tmpFile.Close(); err != nil {
return fmt.Errorf("failed to close temp file: %w", err)
}
// Set permissions before rename
if err := os.Chmod(tmpPath, 0600); err != nil {
return fmt.Errorf("failed to set temp file permissions: %w", err)
}
// Atomic rename (on POSIX systems)
if err := os.Rename(tmpPath, pm.configPath); err != nil {
return fmt.Errorf("failed to rename temp file: %w", err)
}
success = true
return nil
}
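
This function and SaveMinersConfig above now share the same six-step tail. With a common helper along the lines of the writeFileAtomic sketch earlier, the ending of saveProfiles would collapse to a single call (hypothetical refactor, not in this commit):

	// Equivalent tail using a shared atomic-write helper.
	if err := writeFileAtomic(pm.configPath, data, 0600); err != nil {
		return fmt.Errorf("failed to save profiles: %w", err)
	}
	return nil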
// CreateProfile adds a new profile and saves it.

ttminer.go

@@ -151,31 +151,30 @@ func (m *TTMiner) Uninstall() error {
}
// CheckInstallation verifies if the TT-Miner is installed correctly.
// Thread-safe: properly locks before modifying shared fields.
func (m *TTMiner) CheckInstallation() (*InstallationDetails, error) {
binaryPath, err := m.findMinerBinary()
if err != nil {
return &InstallationDetails{IsInstalled: false}, err
}
- m.MinerBinary = binaryPath
- m.Path = filepath.Dir(binaryPath)
// TT-Miner uses --version to check version
// Run version command before acquiring lock (I/O operation)
cmd := exec.Command(binaryPath, "--version")
var out bytes.Buffer
cmd.Stdout = &out
var version string
if err := cmd.Run(); err != nil {
m.Version = "Unknown (could not run executable)"
version = "Unknown (could not run executable)"
} else {
// Parse version from output
output := strings.TrimSpace(out.String())
fields := strings.Fields(output)
if len(fields) >= 2 {
- m.Version = fields[1]
version = fields[1]
} else if len(fields) >= 1 {
- m.Version = fields[0]
version = fields[0]
} else {
m.Version = "Unknown (could not parse version)"
version = "Unknown (could not parse version)"
}
}
@@ -185,11 +184,18 @@ func (m *TTMiner) CheckInstallation() (*InstallationDetails, error) {
configPath = "Error: Could not determine config path"
}
// Update shared fields under lock
m.mu.Lock()
m.MinerBinary = binaryPath
m.Path = filepath.Dir(binaryPath)
m.Version = version
m.mu.Unlock()
return &InstallationDetails{
IsInstalled: true,
- MinerBinary: m.MinerBinary,
- Path: m.Path,
- Version: m.Version,
MinerBinary: binaryPath,
Path: filepath.Dir(binaryPath),
Version: version,
ConfigPath: configPath,
}, nil
}

xmrig.go

@@ -152,41 +152,52 @@ func (m *XMRigMiner) Uninstall() error {
}
// CheckInstallation verifies if the XMRig miner is installed correctly.
// Thread-safe: properly locks before modifying shared fields.
func (m *XMRigMiner) CheckInstallation() (*InstallationDetails, error) {
binaryPath, err := m.findMinerBinary()
if err != nil {
return &InstallationDetails{IsInstalled: false}, err
}
- m.MinerBinary = binaryPath
- m.Path = filepath.Dir(binaryPath)
// Run version command before acquiring lock (I/O operation)
cmd := exec.Command(binaryPath, "--version")
var out bytes.Buffer
cmd.Stdout = &out
var version string
if err := cmd.Run(); err != nil {
m.Version = "Unknown (could not run executable)"
version = "Unknown (could not run executable)"
} else {
fields := strings.Fields(out.String())
if len(fields) >= 2 {
- m.Version = fields[1]
version = fields[1]
} else {
m.Version = "Unknown (could not parse version)"
version = "Unknown (could not parse version)"
}
}
// Get the config path using the helper (use instance name if set)
- configPath, err := getXMRigConfigPath(m.Name)
m.mu.RLock()
instanceName := m.Name
m.mu.RUnlock()
configPath, err := getXMRigConfigPath(instanceName)
if err != nil {
// Log the error but don't fail CheckInstallation if config path can't be determined
configPath = "Error: Could not determine config path"
}
// Update shared fields under lock
m.mu.Lock()
m.MinerBinary = binaryPath
m.Path = filepath.Dir(binaryPath)
m.Version = version
m.mu.Unlock()
return &InstallationDetails{
IsInstalled: true,
- MinerBinary: m.MinerBinary,
- Path: m.Path,
- Version: m.Version,
- ConfigPath: configPath, // Include the config path
MinerBinary: binaryPath,
Path: filepath.Dir(binaryPath),
Version: version,
ConfigPath: configPath,
}, nil
}
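
The race fix in both miners has the same shape: run the exec and parsing into locals with no lock held, then take the mutex only for the assignments that publish the results. Reduced to a schematic (refreshInstallInfo is a hypothetical name; fields as in xmrig.go):

func (m *XMRigMiner) refreshInstallInfo(binaryPath string) {
	// I/O first, with no lock held.
	out, err := exec.Command(binaryPath, "--version").Output()
	version := "Unknown"
	if err == nil {
		if fields := strings.Fields(string(out)); len(fields) >= 2 {
			version = fields[1]
		}
	}
	// Lock only for the short critical section that updates shared state.
	m.mu.Lock()
	m.MinerBinary = binaryPath
	m.Path = filepath.Dir(binaryPath)
	m.Version = version
	m.mu.Unlock()
}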

xmrig_start.go

@@ -83,7 +83,11 @@ func (m *XMRigMiner) Start(config *Config) error {
if err := m.cmd.Start(); err != nil {
stdinPipe.Close()
- return err
// Clean up config file on failed start
if m.ConfigPath != "" {
os.Remove(m.ConfigPath)
}
return fmt.Errorf("failed to start miner: %w", err)
}
m.Running = true

peer.go

@@ -3,6 +3,7 @@ package node
import (
"encoding/json"
"fmt"
"log"
"os"
"path/filepath"
"sync"
@@ -32,12 +33,22 @@ type Peer struct {
Connected bool `json:"-"`
}
// saveDebounceInterval is the minimum time between disk writes.
const saveDebounceInterval = 5 * time.Second
// PeerRegistry manages known peers with KD-tree based selection.
type PeerRegistry struct {
peers map[string]*Peer
kdTree *poindexter.KDTree[string] // KD-tree with peer ID as payload
path string
mu sync.RWMutex
// Debounce disk writes
dirty bool // Whether there are unsaved changes
saveTimer *time.Timer // Timer for debounced save
saveMu sync.Mutex // Protects dirty and saveTimer
stopChan chan struct{} // Signal to stop background save
saveStopOnce sync.Once // Ensure stopChan is closed only once
}
// Dimension weights for peer selection
@@ -63,8 +74,9 @@ func NewPeerRegistry() (*PeerRegistry, error) {
// This is primarily useful for testing to avoid xdg path caching issues.
func NewPeerRegistryWithPath(peersPath string) (*PeerRegistry, error) {
pr := &PeerRegistry{
- peers: make(map[string]*Peer),
- path: peersPath,
peers: make(map[string]*Peer),
path: peersPath,
stopChan: make(chan struct{}),
}
// Try to load existing peers
@@ -81,13 +93,14 @@ func NewPeerRegistryWithPath(peersPath string) (*PeerRegistry, error) {
// AddPeer adds a new peer to the registry.
func (r *PeerRegistry) AddPeer(peer *Peer) error {
r.mu.Lock()
- defer r.mu.Unlock()
if peer.ID == "" {
r.mu.Unlock()
return fmt.Errorf("peer ID is required")
}
if _, exists := r.peers[peer.ID]; exists {
r.mu.Unlock()
return fmt.Errorf("peer %s already exists", peer.ID)
}
@@ -101,6 +114,7 @@ func (r *PeerRegistry) AddPeer(peer *Peer) error {
r.peers[peer.ID] = peer
r.rebuildKDTree()
r.mu.Unlock()
return r.save()
}
@@ -108,14 +122,15 @@ func (r *PeerRegistry) AddPeer(peer *Peer) error {
// UpdatePeer updates an existing peer's information.
func (r *PeerRegistry) UpdatePeer(peer *Peer) error {
r.mu.Lock()
- defer r.mu.Unlock()
if _, exists := r.peers[peer.ID]; !exists {
r.mu.Unlock()
return fmt.Errorf("peer %s not found", peer.ID)
}
r.peers[peer.ID] = peer
r.rebuildKDTree()
r.mu.Unlock()
return r.save()
}
@@ -123,14 +138,15 @@ func (r *PeerRegistry) UpdatePeer(peer *Peer) error {
// RemovePeer removes a peer from the registry.
func (r *PeerRegistry) RemovePeer(id string) error {
r.mu.Lock()
- defer r.mu.Unlock()
if _, exists := r.peers[id]; !exists {
r.mu.Unlock()
return fmt.Errorf("peer %s not found", id)
}
delete(r.peers, id)
r.rebuildKDTree()
r.mu.Unlock()
return r.save()
}
@@ -166,10 +182,10 @@ func (r *PeerRegistry) ListPeers() []*Peer {
// UpdateMetrics updates a peer's performance metrics.
func (r *PeerRegistry) UpdateMetrics(id string, pingMS, geoKM float64, hops int) error {
r.mu.Lock()
- defer r.mu.Unlock()
peer, exists := r.peers[id]
if !exists {
r.mu.Unlock()
return fmt.Errorf("peer %s not found", id)
}
@@ -179,6 +195,7 @@ func (r *PeerRegistry) UpdateMetrics(id string, pingMS, geoKM float64, hops int)
peer.LastSeen = time.Now()
r.rebuildKDTree()
r.mu.Unlock()
return r.save()
}
@@ -186,10 +203,10 @@ func (r *PeerRegistry) UpdateMetrics(id string, pingMS, geoKM float64, hops int)
// UpdateScore updates a peer's reliability score.
func (r *PeerRegistry) UpdateScore(id string, score float64) error {
r.mu.Lock()
- defer r.mu.Unlock()
peer, exists := r.peers[id]
if !exists {
r.mu.Unlock()
return fmt.Errorf("peer %s not found", id)
}
@@ -202,6 +219,7 @@ func (r *PeerRegistry) UpdateScore(id string, score float64) error {
peer.Score = score
r.rebuildKDTree()
r.mu.Unlock()
return r.save()
}
@@ -329,8 +347,43 @@ func (r *PeerRegistry) rebuildKDTree() {
r.kdTree = tree
}
- // save persists peers to disk.
- func (r *PeerRegistry) save() error {
// scheduleSave schedules a debounced save operation.
// Multiple calls within saveDebounceInterval will be coalesced into a single save.
// Must NOT be called with r.mu held.
func (r *PeerRegistry) scheduleSave() {
r.saveMu.Lock()
defer r.saveMu.Unlock()
r.dirty = true
// If timer already running, let it handle the save
if r.saveTimer != nil {
return
}
// Start a new timer
r.saveTimer = time.AfterFunc(saveDebounceInterval, func() {
r.saveMu.Lock()
r.saveTimer = nil
shouldSave := r.dirty
r.dirty = false
r.saveMu.Unlock()
if shouldSave {
r.mu.RLock()
err := r.saveNow()
r.mu.RUnlock()
if err != nil {
// Log error but continue - best effort persistence
log.Printf("Warning: failed to save peer registry: %v", err)
}
}
})
}
// saveNow persists peers to disk immediately.
// Must be called with r.mu held (at least RLock).
func (r *PeerRegistry) saveNow() error {
// Ensure directory exists
dir := filepath.Dir(r.path)
if err := os.MkdirAll(dir, 0755); err != nil {
@@ -348,13 +401,54 @@ func (r *PeerRegistry) save() error {
return fmt.Errorf("failed to marshal peers: %w", err)
}
- if err := os.WriteFile(r.path, data, 0644); err != nil {
- return fmt.Errorf("failed to write peers: %w", err)
// Use atomic write pattern: write to temp file, then rename
tmpPath := r.path + ".tmp"
if err := os.WriteFile(tmpPath, data, 0644); err != nil {
return fmt.Errorf("failed to write peers temp file: %w", err)
}
if err := os.Rename(tmpPath, r.path); err != nil {
os.Remove(tmpPath) // Clean up temp file
return fmt.Errorf("failed to rename peers file: %w", err)
}
return nil
}
// Close flushes any pending changes and releases resources.
func (r *PeerRegistry) Close() error {
r.saveStopOnce.Do(func() {
close(r.stopChan)
})
// Cancel pending timer and save immediately if dirty
r.saveMu.Lock()
if r.saveTimer != nil {
r.saveTimer.Stop()
r.saveTimer = nil
}
shouldSave := r.dirty
r.dirty = false
r.saveMu.Unlock()
if shouldSave {
r.mu.RLock()
err := r.saveNow()
r.mu.RUnlock()
return err
}
return nil
}
// save is a helper that schedules a debounced save.
// Kept for backward compatibility but now debounces writes.
// Must NOT be called with r.mu held.
func (r *PeerRegistry) save() error {
r.scheduleSave()
return nil // Errors will be logged asynchronously
}
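
The timer dance in scheduleSave is a reusable debounce idiom. Extracted into a self-contained type (hypothetical Debouncer, not part of this commit; assumes imports sync and time):

// Debouncer coalesces bursts of Trigger calls into one fn() per
// interval, the same shape as scheduleSave above.
type Debouncer struct {
	mu       sync.Mutex
	timer    *time.Timer
	dirty    bool
	interval time.Duration
	fn       func()
}

func (d *Debouncer) Trigger() {
	d.mu.Lock()
	defer d.mu.Unlock()
	d.dirty = true
	if d.timer != nil {
		return // a pending timer will pick this change up
	}
	d.timer = time.AfterFunc(d.interval, func() {
		d.mu.Lock()
		d.timer = nil
		run := d.dirty
		d.dirty = false
		d.mu.Unlock()
		if run {
			d.fn()
		}
	})
}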
// load reads peers from disk.
func (r *PeerRegistry) load() error {
data, err := os.ReadFile(r.path)

peer_test.go

@@ -345,6 +345,11 @@ func TestPeerRegistry_Persistence(t *testing.T) {
pr1.AddPeer(peer)
// Flush pending changes before reloading
if err := pr1.Close(); err != nil {
t.Fatalf("failed to close first registry: %v", err)
}
// Load in new registry from same path
pr2, err := NewPeerRegistryWithPath(peersPath)
if err != nil {

transport.go

@@ -266,6 +266,10 @@ func (t *Transport) handleWSUpgrade(w http.ResponseWriter, r *http.Request) {
return
}
// Set handshake timeout to prevent slow/malicious clients from blocking
handshakeTimeout := 10 * time.Second
conn.SetReadDeadline(time.Now().Add(handshakeTimeout))
// Wait for handshake from client
_, data, err := conn.ReadMessage()
if err != nil {
@@ -365,6 +369,16 @@ func (t *Transport) handleWSUpgrade(w http.ResponseWriter, r *http.Request) {
// performHandshake initiates handshake with a peer.
func (t *Transport) performHandshake(pc *PeerConnection) error {
// Set handshake timeout
handshakeTimeout := 10 * time.Second
pc.Conn.SetWriteDeadline(time.Now().Add(handshakeTimeout))
pc.Conn.SetReadDeadline(time.Now().Add(handshakeTimeout))
defer func() {
// Reset deadlines after handshake
pc.Conn.SetWriteDeadline(time.Time{})
pc.Conn.SetReadDeadline(time.Time{})
}()
identity := t.node.GetIdentity()
payload := HandshakePayload{
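
The deferred reset above is the important half: with gorilla/websocket (assumed here from the ReadMessage and deadline calls), a deadline persists for the life of the connection, so leaving the 10s limit in place would fail every later read on a healthy long-lived peer. The wrap as a schematic (hypothetical helper; deadline errors ignored for brevity, as in the code above):

func withHandshakeDeadline(conn *websocket.Conn, d time.Duration, fn func() error) error {
	conn.SetWriteDeadline(time.Now().Add(d))
	conn.SetReadDeadline(time.Now().Add(d))
	defer func() {
		// Zero time means no deadline; restore normal operation.
		conn.SetWriteDeadline(time.Time{})
		conn.SetReadDeadline(time.Time{})
	}()
	return fn()
}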