Replace all fmt.Sprintf and fmt.Errorf calls with string concatenation and the existing MiningError constructors (ErrMinerNotFound, ErrMinerExists, ErrInvalidConfig, ErrInternal, ErrDatabaseError). Adds strconv for integer formatting. No fmt import remains in the file. Co-Authored-By: Charon <charon@lethean.io>
777 lines
21 KiB
Go
777 lines
21 KiB
Go
package mining
|
|
|
|
import (
|
|
"context"
|
|
"net"
|
|
"regexp"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"forge.lthn.ai/Snider/Mining/pkg/database"
|
|
"forge.lthn.ai/Snider/Mining/pkg/logging"
|
|
)
|
|
|
|
// instanceNameRegex matches any character NOT allowed in a miner instance
// name (letters, digits, underscore, slash, hyphen); disallowed characters
// are replaced with "_" when building instance names, e.g.:
//
//	safe := instanceNameRegex.ReplaceAllString("my algo!", "_") // => "my_algo_"
var instanceNameRegex = regexp.MustCompile(`[^a-zA-Z0-9_/-]`)
|
|
|
|
// ManagerInterface defines the contract for a miner manager.
type ManagerInterface interface {
	// StartMiner creates, starts, and registers a miner of the given type.
	StartMiner(ctx context.Context, minerType string, config *Config) (Miner, error)
	// StopMiner stops the named miner (exact or prefix match) and removes it.
	StopMiner(ctx context.Context, name string) error
	// GetMiner looks up a running miner by its exact instance name.
	GetMiner(name string) (Miner, error)
	// ListMiners returns a snapshot of all currently registered miners.
	ListMiners() []Miner
	// ListAvailableMiners returns the catalogue of startable miner types.
	ListAvailableMiners() []AvailableMiner
	// GetMinerHashrateHistory returns the in-memory hashrate history for a miner.
	GetMinerHashrateHistory(name string) ([]HashratePoint, error)
	// UninstallMiner stops matching miners and removes the miner's files and config entry.
	UninstallMiner(ctx context.Context, minerType string) error
	// Stop shuts down all miners and background goroutines; subsequent calls are no-ops.
	Stop()
}
|
|
|
|
// Manager handles the lifecycle and operations of multiple miners.
type Manager struct {
	miners            map[string]Miner // running miners keyed by instance name
	mu                sync.RWMutex     // guards miners and the database fields
	stopChan          chan struct{}    // closed by Stop to halt background goroutines
	stopOnce          sync.Once        // makes Stop idempotent
	waitGroup         sync.WaitGroup   // tracks the stats-collection and DB-cleanup goroutines
	databaseEnabled   bool             // whether hashrate persistence is on
	databaseRetention int              // days of history kept in the database
	eventHub          *EventHub        // optional hub for broadcasting lifecycle/stats events
	eventHubMu        sync.RWMutex     // Separate mutex for eventHub to avoid deadlock with main mu
}
|
|
|
|
// m.SetEventHub(eventHub)
|
|
func (m *Manager) SetEventHub(hub *EventHub) {
|
|
m.eventHubMu.Lock()
|
|
defer m.eventHubMu.Unlock()
|
|
m.eventHub = hub
|
|
}
|
|
|
|
// emitEvent broadcasts an event if an event hub is configured
|
|
// Uses separate eventHubMu to avoid deadlock when called while holding m.mu
|
|
func (m *Manager) emitEvent(eventType EventType, data interface{}) {
|
|
m.eventHubMu.RLock()
|
|
hub := m.eventHub
|
|
m.eventHubMu.RUnlock()
|
|
|
|
if hub != nil {
|
|
hub.Broadcast(NewEvent(eventType, data))
|
|
}
|
|
}
|
|
|
|
// Compile-time check that *Manager satisfies ManagerInterface.
var _ ManagerInterface = (*Manager)(nil)
|
|
|
|
// NewManager constructs a fully-initialized miner manager: it seeds the
// miners config file, initializes the stats database (if enabled in config),
// autostarts any configured miners, and launches background stats collection.
//
//	manager := mining.NewManager()
//	defer manager.Stop()
func NewManager() *Manager {
	m := &Manager{
		miners:    make(map[string]Miner),
		stopChan:  make(chan struct{}),
		waitGroup: sync.WaitGroup{},
	}
	m.syncMinersConfig() // Ensure config file is populated
	m.initDatabase()
	m.autostartMiners()
	m.startStatsCollection()
	return m
}
|
|
|
|
// NewManagerForSimulation constructs a lightweight manager for tests and
// simulations: it starts stats collection but skips config sync, database
// initialization, and autostart.
//
//	manager := mining.NewManagerForSimulation()
//	manager.StartMiner(ctx, "xmrig", &mining.Config{Algo: "rx/0"})
func NewManagerForSimulation() *Manager {
	m := &Manager{
		miners:    make(map[string]Miner),
		stopChan:  make(chan struct{}),
		waitGroup: sync.WaitGroup{},
	}
	// Skip syncMinersConfig and autostartMiners for simulation
	m.startStatsCollection()
	return m
}
|
|
|
|
// initDatabase initializes the SQLite database based on config.
// On any failure it logs a warning and leaves persistence disabled rather
// than failing manager construction.
func (m *Manager) initDatabase() {
	minersConfiguration, err := LoadMinersConfig()
	if err != nil {
		logging.Warn("could not load config for database init", logging.Fields{"error": err})
		return
	}

	m.databaseEnabled = minersConfiguration.Database.Enabled
	m.databaseRetention = minersConfiguration.Database.RetentionDays
	// Default retention when the config leaves it unset.
	if m.databaseRetention == 0 {
		m.databaseRetention = 30
	}

	if !m.databaseEnabled {
		logging.Debug("database persistence is disabled")
		return
	}

	databaseConfiguration := database.Config{
		Enabled:       true,
		RetentionDays: m.databaseRetention,
	}

	if err := database.Initialize(databaseConfiguration); err != nil {
		// Initialization failure downgrades to in-memory only.
		logging.Warn("failed to initialize database", logging.Fields{"error": err})
		m.databaseEnabled = false
		return
	}

	logging.Info("database persistence enabled", logging.Fields{"retention_days": m.databaseRetention})

	// Start periodic cleanup
	m.startDBCleanup()
}
|
|
|
|
// startDBCleanup starts a goroutine that periodically cleans old data.
// It runs one cleanup immediately, then once per hour until stopChan closes.
func (m *Manager) startDBCleanup() {
	m.waitGroup.Add(1)
	go func() {
		defer m.waitGroup.Done()
		// Contain panics so a cleanup failure cannot crash the process.
		defer func() {
			if r := recover(); r != nil {
				logging.Error("panic in database cleanup goroutine", logging.Fields{"panic": r})
			}
		}()
		// Run cleanup once per hour
		ticker := time.NewTicker(time.Hour)
		defer ticker.Stop()

		// Run initial cleanup
		if err := database.Cleanup(m.databaseRetention); err != nil {
			logging.Warn("database cleanup failed", logging.Fields{"error": err})
		}

		for {
			select {
			case <-ticker.C:
				if err := database.Cleanup(m.databaseRetention); err != nil {
					logging.Warn("database cleanup failed", logging.Fields{"error": err})
				}
			case <-m.stopChan:
				return
			}
		}
	}()
}
|
|
|
|
// syncMinersConfig ensures the miners.json config file has entries for all available miners.
|
|
func (m *Manager) syncMinersConfig() {
|
|
minersConfiguration, err := LoadMinersConfig()
|
|
if err != nil {
|
|
logging.Warn("could not load miners config for sync", logging.Fields{"error": err})
|
|
return
|
|
}
|
|
|
|
availableMiners := m.ListAvailableMiners()
|
|
configUpdated := false
|
|
|
|
for _, availableMiner := range availableMiners {
|
|
found := false
|
|
for _, configuredMiner := range minersConfiguration.Miners {
|
|
if strings.EqualFold(configuredMiner.MinerType, availableMiner.Name) {
|
|
found = true
|
|
break
|
|
}
|
|
}
|
|
if !found {
|
|
minersConfiguration.Miners = append(minersConfiguration.Miners, MinerAutostartConfig{
|
|
MinerType: availableMiner.Name,
|
|
Autostart: false,
|
|
Config: nil, // No default config
|
|
})
|
|
configUpdated = true
|
|
logging.Info("added default config for missing miner", logging.Fields{"miner": availableMiner.Name})
|
|
}
|
|
}
|
|
|
|
if configUpdated {
|
|
if err := SaveMinersConfig(minersConfiguration); err != nil {
|
|
logging.Warn("failed to save updated miners config", logging.Fields{"error": err})
|
|
}
|
|
}
|
|
}
|
|
|
|
// autostartMiners loads the miners config and starts any miners marked for autostart.
|
|
func (m *Manager) autostartMiners() {
|
|
minersConfiguration, err := LoadMinersConfig()
|
|
if err != nil {
|
|
logging.Warn("could not load miners config for autostart", logging.Fields{"error": err})
|
|
return
|
|
}
|
|
|
|
for _, autostartEntry := range minersConfiguration.Miners {
|
|
if autostartEntry.Autostart && autostartEntry.Config != nil {
|
|
logging.Info("autostarting miner", logging.Fields{"type": autostartEntry.MinerType})
|
|
if _, err := m.StartMiner(context.Background(), autostartEntry.MinerType, autostartEntry.Config); err != nil {
|
|
logging.Error("failed to autostart miner", logging.Fields{"type": autostartEntry.MinerType, "error": err})
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// findAvailablePort asks the OS for an ephemeral TCP port on localhost by
// binding to port 0, then immediately releases the listener and reports the
// port that was assigned.
//
//	port, err := findAvailablePort()
//	if err != nil { return 0, err }
//	config.HTTPPort = port
func findAvailablePort() (int, error) {
	listener, err := net.Listen("tcp", "localhost:0")
	if err != nil {
		return 0, err
	}
	defer listener.Close()
	return listener.Addr().(*net.TCPAddr).Port, nil
}
|
|
|
|
// StartMiner creates and starts a miner of the given type under a unique
// instance name, registers it with the manager, and persists its config so
// the miner is autostarted on the next launch.
//
// The instance name is "<minerName>-<sanitizedAlgo>" when config.Algo is set,
// otherwise "<minerName>-<time-derived suffix>". Returns ErrMinerExists if
// that name is already taken and ErrInvalidConfig for an out-of-range
// HTTPPort. The context is only checked before the lock is taken; it does
// not cancel the actual start.
//
//	miner, err := manager.StartMiner(ctx, "xmrig", &mining.Config{Algo: "rx/0"})
func (m *Manager) StartMiner(ctx context.Context, minerType string, config *Config) (Miner, error) {
	// Check for cancellation before acquiring lock
	select {
	case <-ctx.Done():
		return nil, ctx.Err()
	default:
	}

	m.mu.Lock()
	defer m.mu.Unlock()

	// A nil config is allowed; defaults are filled in below.
	if config == nil {
		config = &Config{}
	}

	miner, err := CreateMiner(minerType)
	if err != nil {
		return nil, err
	}

	instanceName := miner.GetName()
	if config.Algo != "" {
		// Sanitize algo to prevent directory traversal or invalid filenames
		sanitizedAlgo := instanceNameRegex.ReplaceAllString(config.Algo, "_")
		instanceName = instanceName + "-" + sanitizedAlgo
	} else {
		// No algo given: use a pseudo-unique numeric suffix (0-999) derived
		// from the current time. NOTE(review): collisions are possible under
		// rapid successive starts; they surface as ErrMinerExists below.
		instanceName = instanceName + "-" + strconv.FormatInt(time.Now().UnixNano()%1000, 10)
	}

	if _, exists := m.miners[instanceName]; exists {
		return nil, ErrMinerExists(instanceName)
	}

	// Validate user-provided HTTPPort if specified
	if config.HTTPPort != 0 {
		if config.HTTPPort < 1024 || config.HTTPPort > 65535 {
			return nil, ErrInvalidConfig("HTTPPort must be between 1024 and 65535, got " + strconv.Itoa(config.HTTPPort))
		}
	}

	// The miner's own API always listens on a freshly-probed free port.
	apiPort, err := findAvailablePort()
	if err != nil {
		return nil, ErrInternal("failed to find an available port for the miner API").WithCause(err)
	}
	if config.HTTPPort == 0 {
		config.HTTPPort = apiPort
	}

	// Propagate the instance name and API port into each concrete miner type.
	if xmrigMiner, ok := miner.(*XMRigMiner); ok {
		xmrigMiner.Name = instanceName
		if xmrigMiner.API != nil {
			xmrigMiner.API.ListenPort = apiPort
		}
	}
	if ttMiner, ok := miner.(*TTMiner); ok {
		ttMiner.Name = instanceName
		if ttMiner.API != nil {
			ttMiner.API.ListenPort = apiPort
		}
	}

	// Emit starting event before actually starting
	m.emitEvent(EventMinerStarting, MinerEventData{
		Name: instanceName,
	})

	if err := miner.Start(config); err != nil {
		// Emit error event
		m.emitEvent(EventMinerError, MinerEventData{
			Name:  instanceName,
			Error: err.Error(),
		})
		return nil, err
	}

	m.miners[instanceName] = miner

	// Remember this config so the miner is restarted on the next launch.
	if err := m.updateMinerConfig(minerType, true, config); err != nil {
		logging.Warn("failed to save miner config for autostart", logging.Fields{"error": err})
	}

	logMessage := "CryptoCurrency Miner started: " + miner.GetName() + " (Binary: " + miner.GetBinaryPath() + ")"
	logToSyslog(logMessage)

	// Emit started event
	m.emitEvent(EventMinerStarted, MinerEventData{
		Name: instanceName,
	})

	RecordMinerStart()
	return miner, nil
}
|
|
|
|
// UninstallMiner stops, uninstalls, and removes a miner's configuration.
// The context can be used to cancel the operation (checked only before the
// lock is taken). Any running instances of the miner type are stopped and
// dropped from the manager first, then the miner's files are removed and its
// entry is purged from the miners config.
func (m *Manager) UninstallMiner(ctx context.Context, minerType string) error {
	// Check for cancellation before acquiring lock
	select {
	case <-ctx.Done():
		return ctx.Err()
	default:
	}

	m.mu.Lock()
	// Collect miners to stop and delete (can't modify map during iteration)
	minersToDelete := make([]string, 0)
	minersToStop := make([]Miner, 0)
	for name, runningMiner := range m.miners {
		// Match by executable name on each supported concrete miner type.
		if rm, ok := runningMiner.(*XMRigMiner); ok && strings.EqualFold(rm.ExecutableName, minerType) {
			minersToStop = append(minersToStop, runningMiner)
			minersToDelete = append(minersToDelete, name)
		}
		if rm, ok := runningMiner.(*TTMiner); ok && strings.EqualFold(rm.ExecutableName, minerType) {
			minersToStop = append(minersToStop, runningMiner)
			minersToDelete = append(minersToDelete, name)
		}
	}
	// Delete from map first, then release lock before stopping (Stop may block)
	for _, name := range minersToDelete {
		delete(m.miners, name)
	}
	m.mu.Unlock()

	// Stop miners outside the lock to avoid blocking
	for i, miner := range minersToStop {
		if err := miner.Stop(); err != nil {
			logging.Warn("failed to stop running miner during uninstall", logging.Fields{"miner": minersToDelete[i], "error": err})
		}
	}

	// A fresh instance is created purely to drive the uninstall routine.
	miner, err := CreateMiner(minerType)
	if err != nil {
		return err
	}

	if err := miner.Uninstall(); err != nil {
		return ErrInternal("failed to uninstall miner files").WithCause(err)
	}

	// Rewrite the config without any entry for this miner type.
	return UpdateMinersConfig(func(configuration *MinersConfig) error {
		var updatedMiners []MinerAutostartConfig
		for _, autostartEntry := range configuration.Miners {
			if !strings.EqualFold(autostartEntry.MinerType, minerType) {
				updatedMiners = append(updatedMiners, autostartEntry)
			}
		}
		configuration.Miners = updatedMiners
		return nil
	})
}
|
|
|
|
// updateMinerConfig saves the autostart and last-used config for a miner.
|
|
func (m *Manager) updateMinerConfig(minerType string, autostart bool, config *Config) error {
|
|
return UpdateMinersConfig(func(configuration *MinersConfig) error {
|
|
found := false
|
|
for i, autostartEntry := range configuration.Miners {
|
|
if strings.EqualFold(autostartEntry.MinerType, minerType) {
|
|
configuration.Miners[i].Autostart = autostart
|
|
configuration.Miners[i].Config = config
|
|
found = true
|
|
break
|
|
}
|
|
}
|
|
|
|
if !found {
|
|
configuration.Miners = append(configuration.Miners, MinerAutostartConfig{
|
|
MinerType: minerType,
|
|
Autostart: autostart,
|
|
Config: config,
|
|
})
|
|
}
|
|
return nil
|
|
})
|
|
}
|
|
|
|
// manager.StopMiner(ctx, "xmrig/monero")
|
|
// manager.StopMiner(ctx, "ttminer/rtx4090") // still removes if already stopped
|
|
func (m *Manager) StopMiner(ctx context.Context, name string) error {
|
|
// Check for cancellation before acquiring lock
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
default:
|
|
}
|
|
|
|
m.mu.Lock()
|
|
defer m.mu.Unlock()
|
|
|
|
miner, exists := m.miners[name]
|
|
if !exists {
|
|
for k := range m.miners {
|
|
if strings.HasPrefix(k, name) {
|
|
miner = m.miners[k]
|
|
name = k
|
|
exists = true
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
if !exists {
|
|
return ErrMinerNotFound(name)
|
|
}
|
|
|
|
// Emit stopping event
|
|
m.emitEvent(EventMinerStopping, MinerEventData{
|
|
Name: name,
|
|
})
|
|
|
|
// Try to stop the miner, but always remove it from the map
|
|
// This handles the case where a miner crashed or was killed externally
|
|
stopErr := miner.Stop()
|
|
|
|
// Always remove from map - if it's not running, we still want to clean it up
|
|
delete(m.miners, name)
|
|
|
|
// Emit stopped event
|
|
reason := "stopped"
|
|
if stopErr != nil && stopErr.Error() != "miner is not running" {
|
|
reason = stopErr.Error()
|
|
}
|
|
m.emitEvent(EventMinerStopped, MinerEventData{
|
|
Name: name,
|
|
Reason: reason,
|
|
})
|
|
|
|
// Only return error if it wasn't just "miner is not running"
|
|
if stopErr != nil && stopErr.Error() != "miner is not running" {
|
|
return stopErr
|
|
}
|
|
|
|
RecordMinerStop()
|
|
return nil
|
|
}
|
|
|
|
// miner, err := m.GetMiner("xmrig-randomx")
|
|
// if err != nil { /* miner not found */ }
|
|
func (m *Manager) GetMiner(name string) (Miner, error) {
|
|
m.mu.RLock()
|
|
defer m.mu.RUnlock()
|
|
miner, exists := m.miners[name]
|
|
if !exists {
|
|
return nil, ErrMinerNotFound(name)
|
|
}
|
|
return miner, nil
|
|
}
|
|
|
|
// miners := m.ListMiners()
|
|
// for _, miner := range miners { fmt.Println(miner.GetName()) }
|
|
func (m *Manager) ListMiners() []Miner {
|
|
m.mu.RLock()
|
|
defer m.mu.RUnlock()
|
|
miners := make([]Miner, 0, len(m.miners))
|
|
for _, miner := range m.miners {
|
|
miners = append(miners, miner)
|
|
}
|
|
return miners
|
|
}
|
|
|
|
// sim := NewSimulatedMiner(SimulatedMinerConfig{Name: "sim-rx0"})
|
|
// if err := manager.RegisterMiner(sim); err != nil { return err }
|
|
func (m *Manager) RegisterMiner(miner Miner) error {
|
|
name := miner.GetName()
|
|
|
|
m.mu.Lock()
|
|
if _, exists := m.miners[name]; exists {
|
|
m.mu.Unlock()
|
|
return ErrMinerExists(name)
|
|
}
|
|
m.miners[name] = miner
|
|
m.mu.Unlock()
|
|
|
|
logging.Info("registered miner", logging.Fields{"name": name})
|
|
|
|
// Emit miner started event (outside lock)
|
|
m.emitEvent(EventMinerStarted, map[string]interface{}{
|
|
"name": name,
|
|
})
|
|
|
|
return nil
|
|
}
|
|
|
|
// ListAvailableMiners returns a list of available miners that can be started.
|
|
func (m *Manager) ListAvailableMiners() []AvailableMiner {
|
|
return []AvailableMiner{
|
|
{
|
|
Name: "xmrig",
|
|
Description: "XMRig is a high performance, open source, cross platform RandomX, KawPow, CryptoNight and AstroBWT CPU/GPU miner and RandomX benchmark.",
|
|
},
|
|
{
|
|
Name: "tt-miner",
|
|
Description: "TT-Miner is a high performance NVIDIA GPU miner for various algorithms including Ethash, KawPow, ProgPow, and more. Requires CUDA.",
|
|
},
|
|
}
|
|
}
|
|
|
|
// startStatsCollection starts a goroutine to periodically collect stats from
// active miners at HighResolutionInterval until stopChan is closed.
func (m *Manager) startStatsCollection() {
	m.waitGroup.Add(1)
	go func() {
		defer m.waitGroup.Done()
		// Contain panics so a bad stats pass cannot crash the process.
		defer func() {
			if r := recover(); r != nil {
				logging.Error("panic in stats collection goroutine", logging.Fields{"panic": r})
			}
		}()
		ticker := time.NewTicker(HighResolutionInterval)
		defer ticker.Stop()

		for {
			select {
			case <-ticker.C:
				m.collectMinerStats()
			case <-m.stopChan:
				return
			}
		}
	}()
}
|
|
|
|
// statsCollectionTimeout is the maximum time to wait for stats from a single
// miner; it is also reused as the timeout for the corresponding database write.
const statsCollectionTimeout = 5 * time.Second
|
|
|
|
// collectMinerStats iterates through active miners and collects their stats.
// Stats are collected in parallel to reduce overall collection time.
func (m *Manager) collectMinerStats() {
	// Take a snapshot of miners under read lock - minimize lock duration
	m.mu.RLock()
	if len(m.miners) == 0 {
		m.mu.RUnlock()
		return
	}

	// minerInfo pairs a miner with its type string so the goroutines below
	// never need to touch the shared map.
	type minerInfo struct {
		miner     Miner
		minerType string
	}
	miners := make([]minerInfo, 0, len(m.miners))
	for _, miner := range m.miners {
		// Use the miner's GetType() method for proper type identification
		miners = append(miners, minerInfo{miner: miner, minerType: miner.GetType()})
	}
	databaseEnabled := m.databaseEnabled // Copy to avoid holding lock
	m.mu.RUnlock()

	// A single timestamp keeps all samples from this pass aligned.
	now := time.Now()

	// Collect stats from all miners in parallel
	var waitGroup sync.WaitGroup
	for _, mi := range miners {
		waitGroup.Add(1)
		go func(miner Miner, minerType string) {
			defer waitGroup.Done()
			// Isolate panics so one bad miner cannot kill the whole pass.
			defer func() {
				if r := recover(); r != nil {
					logging.Error("panic in single miner stats collection", logging.Fields{
						"panic": r,
						"miner": miner.GetName(),
					})
				}
			}()
			m.collectSingleMinerStats(miner, minerType, now, databaseEnabled)
		}(mi.miner, mi.minerType)
	}
	waitGroup.Wait()
}
|
|
|
|
// statsRetryCount is the number of retries for transient stats failures
// (so each miner is polled at most statsRetryCount+1 times per pass).
const statsRetryCount = 2

// statsRetryDelay is the delay between stats collection retries.
const statsRetryDelay = 500 * time.Millisecond
|
|
|
|
// collectSingleMinerStats collects stats from a single miner with retry logic.
// This is called concurrently for each miner. On success the sample is added
// to the miner's in-memory rolling history, optionally persisted to the
// database, and broadcast as an EventMinerStats event.
func (m *Manager) collectSingleMinerStats(miner Miner, minerType string, now time.Time, databaseEnabled bool) {
	minerName := miner.GetName()

	var stats *PerformanceMetrics
	var lastErr error

	// Retry loop for transient failures
	for attempt := 0; attempt <= statsRetryCount; attempt++ {
		// Use context with timeout to prevent hanging on unresponsive miner APIs
		ctx, cancel := context.WithTimeout(context.Background(), statsCollectionTimeout)
		stats, lastErr = miner.GetStats(ctx)
		cancel() // Release context immediately

		if lastErr == nil {
			break // Success
		}

		// Log retry attempts at debug level
		if attempt < statsRetryCount {
			logging.Debug("retrying stats collection", logging.Fields{
				"miner":   minerName,
				"attempt": attempt + 1,
				"error":   lastErr.Error(),
			})
			time.Sleep(statsRetryDelay)
		}
	}

	if lastErr != nil {
		logging.Error("failed to get miner stats after retries", logging.Fields{
			"miner":   minerName,
			"error":   lastErr.Error(),
			"retries": statsRetryCount,
		})
		RecordStatsCollection(true, true)
		return
	}

	// Record stats collection (retried if we did any retries)
	// NOTE(review): lastErr is always nil at this point, so the first argument
	// reduces to stats != nil — confirm that is the intended success flag.
	RecordStatsCollection(stats != nil && lastErr == nil, false)

	point := HashratePoint{
		Timestamp: now,
		Hashrate:  stats.Hashrate,
	}

	// Add to in-memory history (rolling window)
	// Note: AddHashratePoint and ReduceHashrateHistory must be thread-safe
	miner.AddHashratePoint(point)
	miner.ReduceHashrateHistory(now)

	// Persist to database if enabled
	if databaseEnabled {
		dbPoint := database.HashratePoint{
			Timestamp: point.Timestamp,
			Hashrate:  point.Hashrate,
		}
		// Create a new context for DB writes (original context is from retry loop)
		dbCtx, dbCancel := context.WithTimeout(context.Background(), statsCollectionTimeout)
		if err := database.InsertHashratePoint(dbCtx, minerName, minerType, dbPoint, database.ResolutionHigh); err != nil {
			logging.Warn("failed to persist hashrate", logging.Fields{"miner": minerName, "error": err})
		}
		dbCancel()
	}

	// Emit stats event for real-time WebSocket updates
	m.emitEvent(EventMinerStats, MinerStatsData{
		Name:        minerName,
		Hashrate:    stats.Hashrate,
		Shares:      stats.Shares,
		Rejected:    stats.Rejected,
		Uptime:      stats.Uptime,
		Algorithm:   stats.Algorithm,
		DiffCurrent: stats.DiffCurrent,
	})
}
|
|
|
|
// GetMinerHashrateHistory returns the hashrate history for a specific miner.
|
|
func (m *Manager) GetMinerHashrateHistory(name string) ([]HashratePoint, error) {
|
|
m.mu.RLock()
|
|
defer m.mu.RUnlock()
|
|
miner, exists := m.miners[name]
|
|
if !exists {
|
|
return nil, ErrMinerNotFound(name)
|
|
}
|
|
return miner.GetHashrateHistory(), nil
|
|
}
|
|
|
|
// ShutdownTimeout is the maximum time to wait for goroutines during shutdown
// before Stop gives up waiting and proceeds to close the database.
const ShutdownTimeout = 10 * time.Second
|
|
|
|
// Stop shuts down the manager: it stops every running miner, signals the
// background goroutines via stopChan, waits up to ShutdownTimeout for them,
// then closes the database if persistence was enabled.
//
//	defer manager.Stop() // safe in main() or test cleanup; subsequent calls are no-ops
func (m *Manager) Stop() {
	m.stopOnce.Do(func() {
		// Stop all running miners first
		m.mu.Lock()
		for name, miner := range m.miners {
			if err := miner.Stop(); err != nil {
				logging.Warn("failed to stop miner", logging.Fields{"miner": name, "error": err})
			}
		}
		m.mu.Unlock()

		// Signal the stats-collection and DB-cleanup goroutines to exit.
		close(m.stopChan)

		// Wait for goroutines with timeout
		done := make(chan struct{})
		go func() {
			m.waitGroup.Wait()
			close(done)
		}()

		select {
		case <-done:
			logging.Info("all goroutines stopped gracefully")
		case <-time.After(ShutdownTimeout):
			logging.Warn("shutdown timeout - some goroutines may not have stopped")
		}

		// Close the database
		if m.databaseEnabled {
			if err := database.Close(); err != nil {
				logging.Warn("failed to close database", logging.Fields{"error": err})
			}
		}
	})
}
|
|
|
|
// GetMinerHistoricalStats returns historical stats from the database for a miner.
|
|
func (m *Manager) GetMinerHistoricalStats(minerName string) (*database.HashrateStats, error) {
|
|
if !m.databaseEnabled {
|
|
return nil, ErrDatabaseError("database persistence is disabled")
|
|
}
|
|
return database.GetHashrateStats(minerName)
|
|
}
|
|
|
|
// GetMinerHistoricalHashrate returns historical hashrate data from the database.
|
|
func (m *Manager) GetMinerHistoricalHashrate(minerName string, since, until time.Time) ([]HashratePoint, error) {
|
|
if !m.databaseEnabled {
|
|
return nil, ErrDatabaseError("database persistence is disabled")
|
|
}
|
|
|
|
dbPoints, err := database.GetHashrateHistory(minerName, database.ResolutionHigh, since, until)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Convert database points to mining points
|
|
points := make([]HashratePoint, len(dbPoints))
|
|
for i, p := range dbPoints {
|
|
points[i] = HashratePoint{
|
|
Timestamp: p.Timestamp,
|
|
Hashrate: p.Hashrate,
|
|
}
|
|
}
|
|
return points, nil
|
|
}
|
|
|
|
// GetAllMinerHistoricalStats returns historical stats for all miners from the database.
|
|
func (m *Manager) GetAllMinerHistoricalStats() ([]database.HashrateStats, error) {
|
|
if !m.databaseEnabled {
|
|
return nil, ErrDatabaseError("database persistence is disabled")
|
|
}
|
|
return database.GetAllMinerStats()
|
|
}
|
|
|
|
// IsDatabaseEnabled returns whether database persistence is enabled.
// NOTE(review): reads databaseEnabled without holding m.mu; the field is
// written by initDatabase during construction — confirm no concurrent writes
// can race with callers of this accessor.
func (m *Manager) IsDatabaseEnabled() bool {
	return m.databaseEnabled
}
|