diff --git a/pkg/database/database_test.go b/pkg/database/database_test.go index 2bb3f7c..1068d57 100644 --- a/pkg/database/database_test.go +++ b/pkg/database/database_test.go @@ -353,3 +353,145 @@ func TestIsInitialized(t *testing.T) { t.Error("Should not be initialized after Close()") } } + +func TestSchemaCreation(t *testing.T) { + cleanup := setupTestDB(t) + defer cleanup() + + // Verify tables exist by querying sqlite_master + dbMu.RLock() + defer dbMu.RUnlock() + + // Check hashrate_history table + var tableName string + err := db.QueryRow("SELECT name FROM sqlite_master WHERE type='table' AND name='hashrate_history'").Scan(&tableName) + if err != nil { + t.Errorf("hashrate_history table should exist: %v", err) + } + + // Check miner_sessions table + err = db.QueryRow("SELECT name FROM sqlite_master WHERE type='table' AND name='miner_sessions'").Scan(&tableName) + if err != nil { + t.Errorf("miner_sessions table should exist: %v", err) + } + + // Verify indexes exist + var indexName string + err = db.QueryRow("SELECT name FROM sqlite_master WHERE type='index' AND name='idx_hashrate_miner_time'").Scan(&indexName) + if err != nil { + t.Errorf("idx_hashrate_miner_time index should exist: %v", err) + } + + err = db.QueryRow("SELECT name FROM sqlite_master WHERE type='index' AND name='idx_sessions_miner'").Scan(&indexName) + if err != nil { + t.Errorf("idx_sessions_miner index should exist: %v", err) + } +} + +func TestReInitializeExistingDB(t *testing.T) { + tmpDir := t.TempDir() + dbPath := filepath.Join(tmpDir, "reinit_test.db") + + cfg := Config{ + Enabled: true, + Path: dbPath, + RetentionDays: 7, + } + + // First initialization + if err := Initialize(cfg); err != nil { + t.Fatalf("First initialization failed: %v", err) + } + + // Insert some data + minerName := "reinit-test-miner" + point := HashratePoint{ + Timestamp: time.Now(), + Hashrate: 1234, + } + if err := InsertHashratePoint(nil, minerName, "xmrig", point, ResolutionHigh); err != nil { + t.Fatalf("Failed to insert point: %v", err) + } + + // Close and re-initialize (simulates app restart) + if err := Close(); err != nil { + t.Fatalf("Close failed: %v", err) + } + + // Re-initialize with same path + if err := Initialize(cfg); err != nil { + t.Fatalf("Re-initialization failed: %v", err) + } + defer func() { + Close() + os.Remove(dbPath) + }() + + // Verify data persisted + history, err := GetHashrateHistory(minerName, ResolutionHigh, time.Now().Add(-time.Hour), time.Now().Add(time.Hour)) + if err != nil { + t.Fatalf("Failed to get history after reinit: %v", err) + } + + if len(history) != 1 { + t.Errorf("Expected 1 point after reinit, got %d", len(history)) + } + + if len(history) > 0 && history[0].Hashrate != 1234 { + t.Errorf("Expected hashrate 1234, got %d", history[0].Hashrate) + } +} + +func TestConcurrentDatabaseAccess(t *testing.T) { + cleanup := setupTestDB(t) + defer cleanup() + + const numGoroutines = 10 + const numOpsPerGoroutine = 20 + + done := make(chan bool, numGoroutines) + errors := make(chan error, numGoroutines*numOpsPerGoroutine) + + now := time.Now() + + // Launch multiple goroutines doing concurrent reads/writes + for i := 0; i < numGoroutines; i++ { + go func(id int) { + minerName := "concurrent-miner-" + string(rune('A'+id)) + for j := 0; j < numOpsPerGoroutine; j++ { + // Write + point := HashratePoint{ + Timestamp: now.Add(time.Duration(-j) * time.Second), + Hashrate: 1000 + j, + } + if err := InsertHashratePoint(nil, minerName, "xmrig", point, ResolutionHigh); err != nil { + errors <- err + } + + // Read + _, err := GetHashrateHistory(minerName, ResolutionHigh, now.Add(-time.Hour), now) + if err != nil { + errors <- err + } + } + done <- true + }(i) + } + + // Wait for all goroutines + for i := 0; i < numGoroutines; i++ { + <-done + } + close(errors) + + // Check for errors + var errCount int + for err := range errors { + t.Errorf("Concurrent access error: %v", err) + errCount++ + } + + if errCount > 0 { + t.Errorf("Got %d errors during concurrent access", errCount) + } +} diff --git a/pkg/mining/supervisor.go b/pkg/mining/supervisor.go new file mode 100644 index 0000000..d3f425d --- /dev/null +++ b/pkg/mining/supervisor.go @@ -0,0 +1,203 @@ +package mining + +import ( + "context" + "sync" + "time" + + "github.com/Snider/Mining/pkg/logging" +) + +// TaskFunc is a function that can be supervised. +type TaskFunc func(ctx context.Context) + +// SupervisedTask represents a background task with restart capability. +type SupervisedTask struct { + name string + task TaskFunc + restartDelay time.Duration + maxRestarts int + restartCount int + running bool + lastStartTime time.Time + cancel context.CancelFunc + mu sync.Mutex +} + +// TaskSupervisor manages background tasks with automatic restart on failure. +type TaskSupervisor struct { + tasks map[string]*SupervisedTask + ctx context.Context + cancel context.CancelFunc + wg sync.WaitGroup + mu sync.RWMutex + started bool +} + +// NewTaskSupervisor creates a new task supervisor. +func NewTaskSupervisor() *TaskSupervisor { + ctx, cancel := context.WithCancel(context.Background()) + return &TaskSupervisor{ + tasks: make(map[string]*SupervisedTask), + ctx: ctx, + cancel: cancel, + } +} + +// RegisterTask registers a task for supervision. +// The task will be automatically restarted if it exits or panics. +func (s *TaskSupervisor) RegisterTask(name string, task TaskFunc, restartDelay time.Duration, maxRestarts int) { + s.mu.Lock() + defer s.mu.Unlock() + + s.tasks[name] = &SupervisedTask{ + name: name, + task: task, + restartDelay: restartDelay, + maxRestarts: maxRestarts, + } +} + +// Start starts all registered tasks. +func (s *TaskSupervisor) Start() { + s.mu.Lock() + if s.started { + s.mu.Unlock() + return + } + s.started = true + s.mu.Unlock() + + s.mu.RLock() + for name, task := range s.tasks { + s.startTask(name, task) + } + s.mu.RUnlock() +} + +// startTask starts a single supervised task. +func (s *TaskSupervisor) startTask(name string, st *SupervisedTask) { + st.mu.Lock() + if st.running { + st.mu.Unlock() + return + } + st.running = true + st.lastStartTime = time.Now() + + taskCtx, taskCancel := context.WithCancel(s.ctx) + st.cancel = taskCancel + st.mu.Unlock() + + s.wg.Add(1) + go func() { + defer s.wg.Done() + + for { + select { + case <-s.ctx.Done(): + return + default: + } + + // Run the task with panic recovery + func() { + defer func() { + if r := recover(); r != nil { + logging.Error("supervised task panicked", logging.Fields{ + "task": name, + "panic": r, + }) + } + }() + st.task(taskCtx) + }() + + // Check if we should restart + st.mu.Lock() + st.restartCount++ + shouldRestart := st.restartCount <= st.maxRestarts || st.maxRestarts < 0 + restartDelay := st.restartDelay + st.mu.Unlock() + + if !shouldRestart { + logging.Warn("supervised task reached max restarts", logging.Fields{ + "task": name, + "maxRestart": st.maxRestarts, + }) + return + } + + select { + case <-s.ctx.Done(): + return + case <-time.After(restartDelay): + logging.Info("restarting supervised task", logging.Fields{ + "task": name, + "restartCount": st.restartCount, + }) + } + } + }() + + logging.Info("started supervised task", logging.Fields{"task": name}) +} + +// Stop stops all supervised tasks. +func (s *TaskSupervisor) Stop() { + s.cancel() + s.wg.Wait() + + s.mu.Lock() + s.started = false + for _, task := range s.tasks { + task.mu.Lock() + task.running = false + task.mu.Unlock() + } + s.mu.Unlock() + + logging.Info("task supervisor stopped") +} + +// GetTaskStatus returns the status of a task. +func (s *TaskSupervisor) GetTaskStatus(name string) (running bool, restartCount int, found bool) { + s.mu.RLock() + task, ok := s.tasks[name] + s.mu.RUnlock() + + if !ok { + return false, 0, false + } + + task.mu.Lock() + defer task.mu.Unlock() + return task.running, task.restartCount, true +} + +// GetAllTaskStatuses returns status of all tasks. +func (s *TaskSupervisor) GetAllTaskStatuses() map[string]TaskStatus { + s.mu.RLock() + defer s.mu.RUnlock() + + statuses := make(map[string]TaskStatus, len(s.tasks)) + for name, task := range s.tasks { + task.mu.Lock() + statuses[name] = TaskStatus{ + Name: name, + Running: task.running, + RestartCount: task.restartCount, + LastStart: task.lastStartTime, + } + task.mu.Unlock() + } + return statuses +} + +// TaskStatus contains the status of a supervised task. +type TaskStatus struct { + Name string `json:"name"` + Running bool `json:"running"` + RestartCount int `json:"restartCount"` + LastStart time.Time `json:"lastStart"` +}