feat: Add database migration tests and task supervisor
- TEST-HIGH-5: Add comprehensive database tests for schema, migrations, re-initialization, and concurrent access - RESIL-MED-6: Add TaskSupervisor for background task monitoring with automatic restart on failure 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
parent
41cc0c295c
commit
3d8247c757
2 changed files with 345 additions and 0 deletions
|
|
@ -353,3 +353,145 @@ func TestIsInitialized(t *testing.T) {
|
||||||
t.Error("Should not be initialized after Close()")
|
t.Error("Should not be initialized after Close()")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestSchemaCreation(t *testing.T) {
|
||||||
|
cleanup := setupTestDB(t)
|
||||||
|
defer cleanup()
|
||||||
|
|
||||||
|
// Verify tables exist by querying sqlite_master
|
||||||
|
dbMu.RLock()
|
||||||
|
defer dbMu.RUnlock()
|
||||||
|
|
||||||
|
// Check hashrate_history table
|
||||||
|
var tableName string
|
||||||
|
err := db.QueryRow("SELECT name FROM sqlite_master WHERE type='table' AND name='hashrate_history'").Scan(&tableName)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("hashrate_history table should exist: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check miner_sessions table
|
||||||
|
err = db.QueryRow("SELECT name FROM sqlite_master WHERE type='table' AND name='miner_sessions'").Scan(&tableName)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("miner_sessions table should exist: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify indexes exist
|
||||||
|
var indexName string
|
||||||
|
err = db.QueryRow("SELECT name FROM sqlite_master WHERE type='index' AND name='idx_hashrate_miner_time'").Scan(&indexName)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("idx_hashrate_miner_time index should exist: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = db.QueryRow("SELECT name FROM sqlite_master WHERE type='index' AND name='idx_sessions_miner'").Scan(&indexName)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("idx_sessions_miner index should exist: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestReInitializeExistingDB simulates an application restart: it
// initializes the database at a fixed path, inserts one hashrate point,
// closes the database, re-initializes against the same file, and then
// verifies the previously written data is still readable.
func TestReInitializeExistingDB(t *testing.T) {
	// t.TempDir is removed automatically when the test ends, so the db
	// file does not outlive the test even if a Fatalf fires early.
	tmpDir := t.TempDir()
	dbPath := filepath.Join(tmpDir, "reinit_test.db")

	cfg := Config{
		Enabled:       true,
		Path:          dbPath,
		RetentionDays: 7,
	}

	// First initialization
	if err := Initialize(cfg); err != nil {
		t.Fatalf("First initialization failed: %v", err)
	}

	// Insert some data that must survive the close/re-open cycle.
	minerName := "reinit-test-miner"
	point := HashratePoint{
		Timestamp: time.Now(),
		Hashrate:  1234,
	}
	// NOTE(review): nil first argument presumably selects a default
	// executor/transaction inside InsertHashratePoint — confirm.
	if err := InsertHashratePoint(nil, minerName, "xmrig", point, ResolutionHigh); err != nil {
		t.Fatalf("Failed to insert point: %v", err)
	}

	// Close and re-initialize (simulates app restart)
	if err := Close(); err != nil {
		t.Fatalf("Close failed: %v", err)
	}

	// Re-initialize with same path
	if err := Initialize(cfg); err != nil {
		t.Fatalf("Re-initialization failed: %v", err)
	}
	// os.Remove is redundant given t.TempDir cleanup, but harmless; the
	// Close is what matters so the global handle is released.
	defer func() {
		Close()
		os.Remove(dbPath)
	}()

	// Verify data persisted across the restart.
	history, err := GetHashrateHistory(minerName, ResolutionHigh, time.Now().Add(-time.Hour), time.Now().Add(time.Hour))
	if err != nil {
		t.Fatalf("Failed to get history after reinit: %v", err)
	}

	if len(history) != 1 {
		t.Errorf("Expected 1 point after reinit, got %d", len(history))
	}

	// Guarded index access: only check the value if a point came back.
	if len(history) > 0 && history[0].Hashrate != 1234 {
		t.Errorf("Expected hashrate 1234, got %d", history[0].Hashrate)
	}
}
|
||||||
|
|
||||||
|
func TestConcurrentDatabaseAccess(t *testing.T) {
|
||||||
|
cleanup := setupTestDB(t)
|
||||||
|
defer cleanup()
|
||||||
|
|
||||||
|
const numGoroutines = 10
|
||||||
|
const numOpsPerGoroutine = 20
|
||||||
|
|
||||||
|
done := make(chan bool, numGoroutines)
|
||||||
|
errors := make(chan error, numGoroutines*numOpsPerGoroutine)
|
||||||
|
|
||||||
|
now := time.Now()
|
||||||
|
|
||||||
|
// Launch multiple goroutines doing concurrent reads/writes
|
||||||
|
for i := 0; i < numGoroutines; i++ {
|
||||||
|
go func(id int) {
|
||||||
|
minerName := "concurrent-miner-" + string(rune('A'+id))
|
||||||
|
for j := 0; j < numOpsPerGoroutine; j++ {
|
||||||
|
// Write
|
||||||
|
point := HashratePoint{
|
||||||
|
Timestamp: now.Add(time.Duration(-j) * time.Second),
|
||||||
|
Hashrate: 1000 + j,
|
||||||
|
}
|
||||||
|
if err := InsertHashratePoint(nil, minerName, "xmrig", point, ResolutionHigh); err != nil {
|
||||||
|
errors <- err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Read
|
||||||
|
_, err := GetHashrateHistory(minerName, ResolutionHigh, now.Add(-time.Hour), now)
|
||||||
|
if err != nil {
|
||||||
|
errors <- err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
done <- true
|
||||||
|
}(i)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait for all goroutines
|
||||||
|
for i := 0; i < numGoroutines; i++ {
|
||||||
|
<-done
|
||||||
|
}
|
||||||
|
close(errors)
|
||||||
|
|
||||||
|
// Check for errors
|
||||||
|
var errCount int
|
||||||
|
for err := range errors {
|
||||||
|
t.Errorf("Concurrent access error: %v", err)
|
||||||
|
errCount++
|
||||||
|
}
|
||||||
|
|
||||||
|
if errCount > 0 {
|
||||||
|
t.Errorf("Got %d errors during concurrent access", errCount)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
||||||
203
pkg/mining/supervisor.go
Normal file
203
pkg/mining/supervisor.go
Normal file
|
|
@ -0,0 +1,203 @@
|
||||||
|
package mining
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/Snider/Mining/pkg/logging"
|
||||||
|
)
|
||||||
|
|
||||||
|
// TaskFunc is a function that can be supervised. Implementations should
// watch ctx and return promptly once it is cancelled; a return (or panic)
// while the supervisor is active may trigger a restart (see startTask).
type TaskFunc func(ctx context.Context)
|
||||||
|
|
||||||
|
// SupervisedTask represents a background task with restart capability.
// All fields are guarded by mu.
type SupervisedTask struct {
	name          string             // identifier used for map lookups and log fields
	task          TaskFunc           // the function to run and restart
	restartDelay  time.Duration      // pause between an exit and the next restart
	restartCount  int                // number of times the task has exited so far
	maxRestarts   int                // restart budget; negative means unlimited (see startTask)
	running       bool               // whether the supervising goroutine is active
	lastStartTime time.Time          // when the task was most recently started
	cancel        context.CancelFunc // cancels this task's child context
	mu            sync.Mutex         // guards all fields above
}
|
||||||
|
|
||||||
|
// TaskSupervisor manages background tasks with automatic restart on failure.
type TaskSupervisor struct {
	tasks   map[string]*SupervisedTask // registered tasks, keyed by name
	ctx     context.Context            // root context; cancelled by Stop
	cancel  context.CancelFunc         // cancels ctx, signalling all task loops to exit
	wg      sync.WaitGroup             // tracks task goroutines so Stop can wait for them
	mu      sync.RWMutex               // guards tasks and started
	started bool                       // whether Start has been called
}
|
||||||
|
|
||||||
|
// NewTaskSupervisor creates a new task supervisor.
|
||||||
|
func NewTaskSupervisor() *TaskSupervisor {
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
return &TaskSupervisor{
|
||||||
|
tasks: make(map[string]*SupervisedTask),
|
||||||
|
ctx: ctx,
|
||||||
|
cancel: cancel,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// RegisterTask registers a task for supervision.
// The task will be automatically restarted if it exits or panics.
//
// restartDelay is the pause between an exit and the next restart;
// maxRestarts bounds how many restarts are attempted (a negative value
// means unlimited — see startTask).
//
// NOTE(review): registering the same name twice silently replaces the
// previous entry, and a task registered after Start() is never launched
// because Start() is a no-op once started — confirm callers register
// everything before starting the supervisor.
func (s *TaskSupervisor) RegisterTask(name string, task TaskFunc, restartDelay time.Duration, maxRestarts int) {
	s.mu.Lock()
	defer s.mu.Unlock()

	s.tasks[name] = &SupervisedTask{
		name:         name,
		task:         task,
		restartDelay: restartDelay,
		maxRestarts:  maxRestarts,
	}
}
|
||||||
|
|
||||||
|
// Start starts all registered tasks.
|
||||||
|
func (s *TaskSupervisor) Start() {
|
||||||
|
s.mu.Lock()
|
||||||
|
if s.started {
|
||||||
|
s.mu.Unlock()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
s.started = true
|
||||||
|
s.mu.Unlock()
|
||||||
|
|
||||||
|
s.mu.RLock()
|
||||||
|
for name, task := range s.tasks {
|
||||||
|
s.startTask(name, task)
|
||||||
|
}
|
||||||
|
s.mu.RUnlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
// startTask starts a single supervised task.
|
||||||
|
func (s *TaskSupervisor) startTask(name string, st *SupervisedTask) {
|
||||||
|
st.mu.Lock()
|
||||||
|
if st.running {
|
||||||
|
st.mu.Unlock()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
st.running = true
|
||||||
|
st.lastStartTime = time.Now()
|
||||||
|
|
||||||
|
taskCtx, taskCancel := context.WithCancel(s.ctx)
|
||||||
|
st.cancel = taskCancel
|
||||||
|
st.mu.Unlock()
|
||||||
|
|
||||||
|
s.wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer s.wg.Done()
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-s.ctx.Done():
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
|
// Run the task with panic recovery
|
||||||
|
func() {
|
||||||
|
defer func() {
|
||||||
|
if r := recover(); r != nil {
|
||||||
|
logging.Error("supervised task panicked", logging.Fields{
|
||||||
|
"task": name,
|
||||||
|
"panic": r,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
st.task(taskCtx)
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Check if we should restart
|
||||||
|
st.mu.Lock()
|
||||||
|
st.restartCount++
|
||||||
|
shouldRestart := st.restartCount <= st.maxRestarts || st.maxRestarts < 0
|
||||||
|
restartDelay := st.restartDelay
|
||||||
|
st.mu.Unlock()
|
||||||
|
|
||||||
|
if !shouldRestart {
|
||||||
|
logging.Warn("supervised task reached max restarts", logging.Fields{
|
||||||
|
"task": name,
|
||||||
|
"maxRestart": st.maxRestarts,
|
||||||
|
})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-s.ctx.Done():
|
||||||
|
return
|
||||||
|
case <-time.After(restartDelay):
|
||||||
|
logging.Info("restarting supervised task", logging.Fields{
|
||||||
|
"task": name,
|
||||||
|
"restartCount": st.restartCount,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
logging.Info("started supervised task", logging.Fields{"task": name})
|
||||||
|
}
|
||||||
|
|
||||||
|
// Stop stops all supervised tasks.
//
// Cancelling the root context signals every task goroutine to exit; Wait
// blocks until they have all returned before the bookkeeping is reset.
//
// NOTE(review): the root context is cancelled permanently and never
// recreated, so calling Start() again after Stop() would launch
// goroutines whose loops exit immediately — confirm whether
// restart-after-Stop is meant to be supported.
func (s *TaskSupervisor) Stop() {
	s.cancel()
	s.wg.Wait()

	s.mu.Lock()
	s.started = false
	// Mark every task as stopped; restartCount is not reset, so status
	// queries keep showing historical restart totals.
	for _, task := range s.tasks {
		task.mu.Lock()
		task.running = false
		task.mu.Unlock()
	}
	s.mu.Unlock()

	logging.Info("task supervisor stopped")
}
|
||||||
|
|
||||||
|
// GetTaskStatus returns the status of a task.
|
||||||
|
func (s *TaskSupervisor) GetTaskStatus(name string) (running bool, restartCount int, found bool) {
|
||||||
|
s.mu.RLock()
|
||||||
|
task, ok := s.tasks[name]
|
||||||
|
s.mu.RUnlock()
|
||||||
|
|
||||||
|
if !ok {
|
||||||
|
return false, 0, false
|
||||||
|
}
|
||||||
|
|
||||||
|
task.mu.Lock()
|
||||||
|
defer task.mu.Unlock()
|
||||||
|
return task.running, task.restartCount, true
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetAllTaskStatuses returns status of all tasks.
|
||||||
|
func (s *TaskSupervisor) GetAllTaskStatuses() map[string]TaskStatus {
|
||||||
|
s.mu.RLock()
|
||||||
|
defer s.mu.RUnlock()
|
||||||
|
|
||||||
|
statuses := make(map[string]TaskStatus, len(s.tasks))
|
||||||
|
for name, task := range s.tasks {
|
||||||
|
task.mu.Lock()
|
||||||
|
statuses[name] = TaskStatus{
|
||||||
|
Name: name,
|
||||||
|
Running: task.running,
|
||||||
|
RestartCount: task.restartCount,
|
||||||
|
LastStart: task.lastStartTime,
|
||||||
|
}
|
||||||
|
task.mu.Unlock()
|
||||||
|
}
|
||||||
|
return statuses
|
||||||
|
}
|
||||||
|
|
||||||
|
// TaskStatus contains the status of a supervised task, as returned by
// GetAllTaskStatuses. Fields are a point-in-time snapshot.
type TaskStatus struct {
	Name         string    `json:"name"`         // task identifier
	Running      bool      `json:"running"`      // whether the supervising goroutine is active
	RestartCount int       `json:"restartCount"` // number of times the task has exited
	LastStart    time.Time `json:"lastStart"`    // when the task was most recently started
}
|
||||||
Loading…
Add table
Reference in a new issue