fix: Comprehensive code hardening from 4-domain Opus review
Error Handling:
- Fix silent Write() error in WebSocket (events.go)
- Add error context to transport handshake messages
- Check os.MkdirAll error in zip extraction (miner.go)
- Explicitly ignore io.Copy errors on drain with comments
- Add retry logic (2 attempts) for transient stats collection failures

Resource Lifecycle:
- Add shutdown mechanism to DigestAuth goroutine
- Call Service.Stop() on context cancellation
- Add NodeService transport cleanup to Service.Stop()
- Fix WriteStdin goroutine leak on timeout with non-blocking send

API Design:
- Add profile validation (name, miner type required)
- Return 404 instead of 500 for missing profile PUT
- Make DELETE profile idempotent (return success if not found)
- Standardize error responses in node_service.go handlers

Observability:
- Add logging for P2P GetAllStats failures
- Add request ID correlation helper for handler logs
- Add logging for miner process exits (xmrig_start.go)
- Rate limit debug logs in transport hot path (1 in 100)
- Add metrics infrastructure with /metrics endpoint

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Parent: 6b73a4b84b
Commit: d533164893
22 changed files with 468 additions and 114 deletions
@@ -69,18 +69,31 @@ func AuthConfigFromEnv() AuthConfig {
 // DigestAuth implements HTTP Digest Authentication middleware
 type DigestAuth struct {
 	config AuthConfig
 	nonces sync.Map // map[string]time.Time for nonce expiry tracking
+	stopChan chan struct{}
+	stopOnce sync.Once
 }
 
 // NewDigestAuth creates a new digest auth middleware
 func NewDigestAuth(config AuthConfig) *DigestAuth {
-	da := &DigestAuth{config: config}
+	da := &DigestAuth{
+		config:   config,
+		stopChan: make(chan struct{}),
+	}
 	// Start nonce cleanup goroutine
 	go da.cleanupNonces()
 	return da
 }
 
+// Stop gracefully shuts down the DigestAuth, stopping the cleanup goroutine.
+// Safe to call multiple times.
+func (da *DigestAuth) Stop() {
+	da.stopOnce.Do(func() {
+		close(da.stopChan)
+	})
+}
+
 // Middleware returns a Gin middleware that enforces digest authentication
 func (da *DigestAuth) Middleware() gin.HandlerFunc {
 	return func(c *gin.Context) {
@@ -209,14 +222,19 @@ func (da *DigestAuth) cleanupNonces() {
 	ticker := time.NewTicker(da.config.NonceExpiry)
 	defer ticker.Stop()
 
-	for range ticker.C {
-		now := time.Now()
-		da.nonces.Range(func(key, value interface{}) bool {
-			if now.Sub(value.(time.Time)) > da.config.NonceExpiry {
-				da.nonces.Delete(key)
-			}
-			return true
-		})
+	for {
+		select {
+		case <-da.stopChan:
+			return
+		case <-ticker.C:
+			now := time.Now()
+			da.nonces.Range(func(key, value interface{}) bool {
+				if now.Sub(value.(time.Time)) > da.config.NonceExpiry {
+					da.nonces.Delete(key)
+				}
+				return true
+			})
+		}
 	}
 }

@@ -218,7 +218,6 @@ func (c *Container) ProfileManager() *ProfileManager {
 	return c.profileManager
 }
 
-
 // NodeService returns the node service (may be nil if P2P is unavailable).
 func (c *Container) NodeService() *NodeService {
 	c.mu.RLock()

@@ -33,8 +33,8 @@ func TestDualMiningCPUAndGPU(t *testing.T) {
 
 		// GPU config - explicit device selection required!
 		GPUEnabled: true,
 		OpenCL:     true, // AMD GPU
 		Devices:    "0",  // Device 0 only - user must pick
 	}
 
 	minerInstance, err := manager.StartMiner(context.Background(), "xmrig", config)

@@ -24,6 +24,7 @@ const (
 	ErrCodeProfileNotFound = "PROFILE_NOT_FOUND"
 	ErrCodeProfileExists   = "PROFILE_EXISTS"
 	ErrCodeInternalError   = "INTERNAL_ERROR"
+	ErrCodeInternal        = "INTERNAL_ERROR" // Alias for consistency
 )
 
 // MiningError is a structured error type for the mining package

@@ -302,7 +302,10 @@ func (c *wsClient) writePump() {
 			if err != nil {
 				return
 			}
-			w.Write(message)
+			if _, err := w.Write(message); err != nil {
+				logging.Debug("WebSocket write error", logging.Fields{"error": err})
+				return
+			}
 
 			if err := w.Close(); err != nil {
 				return

@@ -312,6 +312,7 @@ func (m *Manager) StartMiner(ctx context.Context, minerType string, config *Conf
 		Name: instanceName,
 	})
 
+	RecordMinerStart()
 	return miner, nil
 }
 
@@ -454,6 +455,7 @@ func (m *Manager) StopMiner(ctx context.Context, name string) error {
 		return stopErr
 	}
 
+	RecordMinerStop()
 	return nil
 }
 
@@ -574,21 +576,55 @@ func (m *Manager) collectMinerStats() {
 	wg.Wait()
 }
 
-// collectSingleMinerStats collects stats from a single miner.
+// statsRetryCount is the number of retries for transient stats failures.
+const statsRetryCount = 2
+
+// statsRetryDelay is the delay between stats collection retries.
+const statsRetryDelay = 500 * time.Millisecond
+
+// collectSingleMinerStats collects stats from a single miner with retry logic.
 // This is called concurrently for each miner.
 func (m *Manager) collectSingleMinerStats(miner Miner, minerType string, now time.Time, dbEnabled bool) {
 	minerName := miner.GetName()
 
-	// Use context with timeout to prevent hanging on unresponsive miner APIs
-	ctx, cancel := context.WithTimeout(context.Background(), statsCollectionTimeout)
-	defer cancel() // Ensure context is released after all operations
-
-	stats, err := miner.GetStats(ctx)
-	if err != nil {
-		logging.Error("failed to get miner stats", logging.Fields{"miner": minerName, "error": err})
+	var stats *PerformanceMetrics
+	var lastErr error
+
+	// Retry loop for transient failures
+	for attempt := 0; attempt <= statsRetryCount; attempt++ {
+		// Use context with timeout to prevent hanging on unresponsive miner APIs
+		ctx, cancel := context.WithTimeout(context.Background(), statsCollectionTimeout)
+		stats, lastErr = miner.GetStats(ctx)
+		cancel() // Release context immediately
+
+		if lastErr == nil {
+			break // Success
+		}
+
+		// Log retry attempts at debug level
+		if attempt < statsRetryCount {
+			logging.Debug("retrying stats collection", logging.Fields{
+				"miner":   minerName,
+				"attempt": attempt + 1,
+				"error":   lastErr.Error(),
+			})
+			time.Sleep(statsRetryDelay)
+		}
+	}
+
+	if lastErr != nil {
+		logging.Error("failed to get miner stats after retries", logging.Fields{
+			"miner":   minerName,
+			"error":   lastErr.Error(),
+			"retries": statsRetryCount,
+		})
+		RecordStatsCollection(true, true)
 		return
 	}
 
+	// Record stats collection (retried if we did any retries)
+	RecordStatsCollection(stats != nil && lastErr == nil, false)
+
 	point := HashratePoint{
 		Timestamp: now,
 		Hashrate:  stats.Hashrate,
@@ -605,10 +641,12 @@ func (m *Manager) collectSingleMinerStats(miner Miner, minerType string, now tim
 			Timestamp: point.Timestamp,
 			Hashrate:  point.Hashrate,
 		}
-		// Use the same context for DB writes so they respect timeout/cancellation
-		if err := database.InsertHashratePoint(ctx, minerName, minerType, dbPoint, database.ResolutionHigh); err != nil {
+		// Create a new context for DB writes (original context is from retry loop)
+		dbCtx, dbCancel := context.WithTimeout(context.Background(), statsCollectionTimeout)
+		if err := database.InsertHashratePoint(dbCtx, minerName, minerType, dbPoint, database.ResolutionHigh); err != nil {
 			logging.Warn("failed to persist hashrate", logging.Fields{"miner": minerName, "error": err})
 		}
+		dbCancel()
 	}
 
 	// Emit stats event for real-time WebSocket updates

pkg/mining/metrics.go (new file, 169 lines)
@@ -0,0 +1,169 @@
+package mining
+
+import (
+	"sync"
+	"sync/atomic"
+	"time"
+)
+
+// Metrics provides simple instrumentation counters for the mining package.
+// These can be exposed via Prometheus or other metrics systems in the future.
+type Metrics struct {
+	// API metrics
+	RequestsTotal   atomic.Int64
+	RequestsErrored atomic.Int64
+	RequestLatency  *LatencyHistogram
+
+	// Miner metrics
+	MinersStarted atomic.Int64
+	MinersStopped atomic.Int64
+	MinersErrored atomic.Int64
+
+	// Stats collection metrics
+	StatsCollected atomic.Int64
+	StatsRetried   atomic.Int64
+	StatsFailed    atomic.Int64
+
+	// WebSocket metrics
+	WSConnections atomic.Int64
+	WSMessages    atomic.Int64
+
+	// P2P metrics
+	P2PMessagesSent     atomic.Int64
+	P2PMessagesReceived atomic.Int64
+	P2PConnectionsTotal atomic.Int64
+}
+
+// LatencyHistogram tracks request latencies with basic percentile support.
+type LatencyHistogram struct {
+	mu      sync.Mutex
+	samples []time.Duration
+	maxSize int
+}
+
+// NewLatencyHistogram creates a new latency histogram with a maximum sample size.
+func NewLatencyHistogram(maxSize int) *LatencyHistogram {
+	return &LatencyHistogram{
+		samples: make([]time.Duration, 0, maxSize),
+		maxSize: maxSize,
+	}
+}
+
+// Record adds a latency sample.
+func (h *LatencyHistogram) Record(d time.Duration) {
+	h.mu.Lock()
+	defer h.mu.Unlock()
+
+	if len(h.samples) >= h.maxSize {
+		// Ring buffer behavior - overwrite oldest
+		copy(h.samples, h.samples[1:])
+		h.samples = h.samples[:len(h.samples)-1]
+	}
+	h.samples = append(h.samples, d)
+}
+
+// Average returns the average latency.
+func (h *LatencyHistogram) Average() time.Duration {
+	h.mu.Lock()
+	defer h.mu.Unlock()
+
+	if len(h.samples) == 0 {
+		return 0
+	}
+
+	var total time.Duration
+	for _, d := range h.samples {
+		total += d
+	}
+	return total / time.Duration(len(h.samples))
+}
+
+// Count returns the number of samples.
+func (h *LatencyHistogram) Count() int {
+	h.mu.Lock()
+	defer h.mu.Unlock()
+	return len(h.samples)
+}
+
+// DefaultMetrics is the global metrics instance.
+var DefaultMetrics = &Metrics{
+	RequestLatency: NewLatencyHistogram(1000),
+}
+
+// RecordRequest records an API request.
+func RecordRequest(errored bool, latency time.Duration) {
+	DefaultMetrics.RequestsTotal.Add(1)
+	if errored {
+		DefaultMetrics.RequestsErrored.Add(1)
+	}
+	DefaultMetrics.RequestLatency.Record(latency)
+}
+
+// RecordMinerStart records a miner start event.
+func RecordMinerStart() {
+	DefaultMetrics.MinersStarted.Add(1)
+}
+
+// RecordMinerStop records a miner stop event.
+func RecordMinerStop() {
+	DefaultMetrics.MinersStopped.Add(1)
+}
+
+// RecordMinerError records a miner error event.
+func RecordMinerError() {
+	DefaultMetrics.MinersErrored.Add(1)
+}
+
+// RecordStatsCollection records a stats collection event.
+func RecordStatsCollection(retried bool, failed bool) {
+	DefaultMetrics.StatsCollected.Add(1)
+	if retried {
+		DefaultMetrics.StatsRetried.Add(1)
+	}
+	if failed {
+		DefaultMetrics.StatsFailed.Add(1)
+	}
+}
+
+// RecordWSConnection increments or decrements WebSocket connection count.
+func RecordWSConnection(connected bool) {
+	if connected {
+		DefaultMetrics.WSConnections.Add(1)
+	} else {
+		DefaultMetrics.WSConnections.Add(-1)
+	}
+}
+
+// RecordWSMessage records a WebSocket message.
+func RecordWSMessage() {
+	DefaultMetrics.WSMessages.Add(1)
+}
+
+// RecordP2PMessage records a P2P message.
+func RecordP2PMessage(sent bool) {
+	if sent {
+		DefaultMetrics.P2PMessagesSent.Add(1)
+	} else {
+		DefaultMetrics.P2PMessagesReceived.Add(1)
+	}
+}
+
+// GetMetricsSnapshot returns a snapshot of current metrics.
+func GetMetricsSnapshot() map[string]interface{} {
+	return map[string]interface{}{
+		"requests_total":          DefaultMetrics.RequestsTotal.Load(),
+		"requests_errored":        DefaultMetrics.RequestsErrored.Load(),
+		"request_latency_avg_ms":  DefaultMetrics.RequestLatency.Average().Milliseconds(),
+		"request_latency_samples": DefaultMetrics.RequestLatency.Count(),
+		"miners_started":          DefaultMetrics.MinersStarted.Load(),
+		"miners_stopped":          DefaultMetrics.MinersStopped.Load(),
+		"miners_errored":          DefaultMetrics.MinersErrored.Load(),
+		"stats_collected":         DefaultMetrics.StatsCollected.Load(),
+		"stats_retried":           DefaultMetrics.StatsRetried.Load(),
+		"stats_failed":            DefaultMetrics.StatsFailed.Load(),
+		"ws_connections":          DefaultMetrics.WSConnections.Load(),
+		"ws_messages":             DefaultMetrics.WSMessages.Load(),
+		"p2p_messages_sent":       DefaultMetrics.P2PMessagesSent.Load(),
+		"p2p_messages_received":   DefaultMetrics.P2PMessagesReceived.Load(),
+	}
+}

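[Reviewer note] The new RecordRequest/RequestLatency counters are defined above but this diff does not show how per-request latency gets fed into them. A minimal sketch of one possible wiring as a Gin middleware; the function name and the 5xx-means-errored convention are assumptions, not code from this commit.

package mining

import (
	"time"

	"github.com/gin-gonic/gin"
)

// metricsMiddleware is a hypothetical helper (not part of this diff) showing
// how RecordRequest could be attached to every API request.
func metricsMiddleware() gin.HandlerFunc {
	return func(c *gin.Context) {
		start := time.Now()
		c.Next() // run the rest of the handler chain
		RecordRequest(c.Writer.Status() >= 500, time.Since(start))
	}
}
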
@@ -216,11 +216,17 @@ func (b *BaseMiner) WriteStdin(input string) error {
 		input += "\n"
 	}
 
-	// Write with timeout to prevent blocking indefinitely
+	// Write with timeout to prevent blocking indefinitely.
+	// Use buffered channel size 1 so goroutine can exit even if we don't read the result.
 	done := make(chan error, 1)
 	go func() {
 		_, err := stdinPipe.Write([]byte(input))
-		done <- err
+		// Non-blocking send - if timeout already fired, this won't block
+		select {
+		case done <- err:
+		default:
+			// Timeout already occurred, goroutine exits cleanly
+		}
 	}()
 
 	select {
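[Reviewer note] The hunk above cuts off before the consuming select. A self-contained sketch of the full pattern under stated assumptions: the helper name, io.Writer parameter, and timeout value are illustrative, not code from this repository.

package mining

import (
	"fmt"
	"io"
	"time"
)

// writeWithTimeout sketches both halves of the pattern in the hunk above: a
// single-buffered channel plus a non-blocking send on the writer side, and a
// select with a deadline on the caller side, so neither goroutine can block forever.
func writeWithTimeout(w io.Writer, input string, timeout time.Duration) error {
	done := make(chan error, 1)
	go func() {
		_, err := w.Write([]byte(input))
		select {
		case done <- err: // caller still waiting
		default: // caller gave up; exit without blocking
		}
	}()

	select {
	case err := <-done:
		return err
	case <-time.After(timeout):
		return fmt.Errorf("write timed out after %s", timeout)
	}
}
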
@@ -252,13 +258,13 @@ func (b *BaseMiner) InstallFromURL(url string) error {
 	defer resp.Body.Close()
 
 	if resp.StatusCode != http.StatusOK {
-		io.Copy(io.Discard, resp.Body) // Drain body to allow connection reuse
+		_, _ = io.Copy(io.Discard, resp.Body) // Drain body to allow connection reuse (error ignored intentionally)
 		return fmt.Errorf("failed to download release: unexpected status code %d", resp.StatusCode)
 	}
 
 	if _, err := io.Copy(tmpfile, resp.Body); err != nil {
-		// Drain remaining body to allow connection reuse
-		io.Copy(io.Discard, resp.Body)
+		// Drain remaining body to allow connection reuse (error ignored intentionally)
+		_, _ = io.Copy(io.Discard, resp.Body)
 		return err
 	}
 
@@ -548,7 +554,9 @@ func (b *BaseMiner) unzip(src, dest string) error {
 		return fmt.Errorf("%s: illegal file path", fpath)
 	}
 	if f.FileInfo().IsDir() {
-		os.MkdirAll(fpath, os.ModePerm)
+		if err := os.MkdirAll(fpath, os.ModePerm); err != nil {
+			return fmt.Errorf("failed to create directory %s: %w", fpath, err)
+		}
 		continue
 	}
 
@@ -4,6 +4,7 @@ import (
 	"encoding/json"
 	"net/http"
 	"strconv"
+	"strings"
 
 	"github.com/Snider/Mining/pkg/node"
 	"github.com/gin-gonic/gin"
@@ -90,8 +91,8 @@ func (ns *NodeService) StopTransport() error {
 
 // Node Info Response
 type NodeInfoResponse struct {
 	HasIdentity     bool               `json:"hasIdentity"`
 	Identity        *node.NodeIdentity `json:"identity,omitempty"`
 	RegisteredPeers int                `json:"registeredPeers"`
 	ConnectedPeers  int                `json:"connectedPeers"`
 }
@@ -257,12 +258,17 @@ func (ns *NodeService) handleRemovePeer(c *gin.Context) {
 // @Produce json
 // @Param id path string true "Peer ID"
 // @Success 200 {object} map[string]float64
+// @Failure 404 {object} APIError "Peer not found"
 // @Router /peers/{id}/ping [post]
 func (ns *NodeService) handlePingPeer(c *gin.Context) {
 	peerID := c.Param("id")
 	rtt, err := ns.controller.PingPeer(peerID)
 	if err != nil {
-		c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
+		if strings.Contains(err.Error(), "not found") || strings.Contains(err.Error(), "not connected") {
+			respondWithError(c, http.StatusNotFound, "PEER_NOT_FOUND", "peer not found or not connected", err.Error())
+			return
+		}
+		respondWithError(c, http.StatusInternalServerError, ErrCodeInternal, "ping failed", err.Error())
 		return
 	}
 	c.JSON(http.StatusOK, gin.H{"rtt_ms": rtt})
@@ -275,11 +281,16 @@ func (ns *NodeService) handlePingPeer(c *gin.Context) {
 // @Produce json
 // @Param id path string true "Peer ID"
 // @Success 200 {object} map[string]string
+// @Failure 404 {object} APIError "Peer not found"
 // @Router /peers/{id}/connect [post]
 func (ns *NodeService) handleConnectPeer(c *gin.Context) {
 	peerID := c.Param("id")
 	if err := ns.controller.ConnectToPeer(peerID); err != nil {
-		c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
+		if strings.Contains(err.Error(), "not found") {
+			respondWithError(c, http.StatusNotFound, "PEER_NOT_FOUND", "peer not found", err.Error())
+			return
+		}
+		respondWithError(c, http.StatusInternalServerError, ErrCodeConnectionFailed, "connection failed", err.Error())
 		return
 	}
 	c.JSON(http.StatusOK, gin.H{"status": "connected"})
@@ -287,7 +298,7 @@ func (ns *NodeService) handleConnectPeer(c *gin.Context) {
 
 // handleDisconnectPeer godoc
 // @Summary Disconnect from a peer
-// @Description Close the connection to a peer
+// @Description Close the connection to a peer. Idempotent - returns success if peer not connected.
 // @Tags peers
 // @Produce json
 // @Param id path string true "Peer ID"
@@ -296,7 +307,12 @@ func (ns *NodeService) handleConnectPeer(c *gin.Context) {
 func (ns *NodeService) handleDisconnectPeer(c *gin.Context) {
 	peerID := c.Param("id")
 	if err := ns.controller.DisconnectFromPeer(peerID); err != nil {
-		c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
+		// Make disconnect idempotent - if peer not connected, still return success
+		if strings.Contains(err.Error(), "not connected") {
+			c.JSON(http.StatusOK, gin.H{"status": "disconnected"})
+			return
+		}
+		respondWithError(c, http.StatusInternalServerError, ErrCodeInternal, "disconnect failed", err.Error())
 		return
 	}
 	c.JSON(http.StatusOK, gin.H{"status": "disconnected"})
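[Reviewer note] The handlers above standardize on respondWithError, which is not shown anywhere in this diff. A minimal sketch of the assumed shape, only so the changes read in isolation; the real APIError type and helper live elsewhere in the repository and may differ in field names and behavior.

package mining

import "github.com/gin-gonic/gin"

// APIError and respondWithError as assumed by the handler changes above
// (hypothetical reconstruction, not code from this commit).
type APIError struct {
	Code    string `json:"code"`
	Message string `json:"message"`
	Details string `json:"details,omitempty"`
}

func respondWithError(c *gin.Context, status int, code, message, details string) {
	c.JSON(status, APIError{Code: code, Message: message, Details: details})
}
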
@@ -144,6 +144,35 @@ func generateRequestID() string {
 	return fmt.Sprintf("%d-%x", time.Now().UnixMilli(), b[:4])
 }
 
+// getRequestID extracts the request ID from gin context
+func getRequestID(c *gin.Context) string {
+	if id, exists := c.Get("requestID"); exists {
+		if s, ok := id.(string); ok {
+			return s
+		}
+	}
+	return ""
+}
+
+// logWithRequestID logs a message with request ID correlation
+func logWithRequestID(c *gin.Context, level string, message string, fields logging.Fields) {
+	if fields == nil {
+		fields = logging.Fields{}
+	}
+	if reqID := getRequestID(c); reqID != "" {
+		fields["request_id"] = reqID
+	}
+	switch level {
+	case "error":
+		logging.Error(message, fields)
+	case "warn":
+		logging.Warn(message, fields)
+	case "info":
+		logging.Info(message, fields)
+	default:
+		logging.Debug(message, fields)
+	}
+}
+
 // WebSocket upgrader for the events endpoint
 var wsUpgrader = websocket.Upgrader{
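[Reviewer note] getRequestID reads a "requestID" context key, but the middleware that sets it is not part of this hunk. A hypothetical sketch of that counterpart, reusing the existing generateRequestID helper; the middleware name and the X-Request-ID response header are assumptions.

package mining

import "github.com/gin-gonic/gin"

// requestIDMiddleware would populate the "requestID" key that getRequestID
// reads above; the actual wiring in the repository may differ.
func requestIDMiddleware() gin.HandlerFunc {
	return func(c *gin.Context) {
		id := generateRequestID()
		c.Set("requestID", id)
		c.Header("X-Request-ID", id) // echo back for client-side correlation
		c.Next()
	}
}
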
@@ -272,9 +301,9 @@ func (s *Service) InitRouter() {
 	// Configure CORS to only allow local origins
 	corsConfig := cors.Config{
 		AllowOrigins: []string{
 			"http://localhost:4200", // Angular dev server
 			"http://127.0.0.1:4200",
 			"http://localhost:9090", // Default API port
 			"http://127.0.0.1:9090",
 			"http://localhost:" + serverPort,
 			"http://127.0.0.1:" + serverPort,
@@ -312,6 +341,14 @@ func (s *Service) Stop() {
 	if s.EventHub != nil {
 		s.EventHub.Stop()
 	}
+	if s.auth != nil {
+		s.auth.Stop()
+	}
+	if s.NodeService != nil {
+		if err := s.NodeService.StopTransport(); err != nil {
+			logging.Warn("failed to stop node service transport", logging.Fields{"error": err})
+		}
+	}
 }
 
 // ServiceStartup initializes the router and starts the HTTP server.
@@ -333,6 +370,7 @@ func (s *Service) ServiceStartup(ctx context.Context) error {
 
 	go func() {
 		<-ctx.Done()
+		s.Stop() // Clean up service resources (auth, event hub, node service)
 		s.Manager.Stop()
 		ctxShutdown, cancel := context.WithTimeout(context.Background(), 5*time.Second)
 		defer cancel()
@@ -377,6 +415,7 @@ func (s *Service) SetupRoutes() {
 
 	{
 		apiGroup.GET("/info", s.handleGetInfo)
+		apiGroup.GET("/metrics", s.handleMetrics)
 		apiGroup.POST("/doctor", s.handleDoctor)
 		apiGroup.POST("/update", s.handleUpdateCheck)
@@ -824,17 +863,28 @@ func (s *Service) handleListProfiles(c *gin.Context) {
 // @Produce json
 // @Param profile body MiningProfile true "Mining Profile"
 // @Success 201 {object} MiningProfile
+// @Failure 400 {object} APIError "Invalid profile data"
 // @Router /profiles [post]
 func (s *Service) handleCreateProfile(c *gin.Context) {
 	var profile MiningProfile
 	if err := c.ShouldBindJSON(&profile); err != nil {
-		c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
+		respondWithError(c, http.StatusBadRequest, ErrCodeInvalidInput, "invalid profile data", err.Error())
+		return
+	}
+
+	// Validate required fields
+	if profile.Name == "" {
+		respondWithError(c, http.StatusBadRequest, ErrCodeInvalidInput, "profile name is required", "")
+		return
+	}
+	if profile.MinerType == "" {
+		respondWithError(c, http.StatusBadRequest, ErrCodeInvalidInput, "miner type is required", "")
 		return
 	}
 
 	createdProfile, err := s.ProfileManager.CreateProfile(&profile)
 	if err != nil {
-		c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to create profile", "details": err.Error()})
+		respondWithError(c, http.StatusInternalServerError, ErrCodeInternal, "failed to create profile", err.Error())
 		return
 	}
 
@@ -868,18 +918,24 @@ func (s *Service) handleGetProfile(c *gin.Context) {
 // @Param id path string true "Profile ID"
 // @Param profile body MiningProfile true "Updated Mining Profile"
 // @Success 200 {object} MiningProfile
+// @Failure 404 {object} APIError "Profile not found"
 // @Router /profiles/{id} [put]
 func (s *Service) handleUpdateProfile(c *gin.Context) {
 	profileID := c.Param("id")
 	var profile MiningProfile
 	if err := c.ShouldBindJSON(&profile); err != nil {
-		c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
+		respondWithError(c, http.StatusBadRequest, ErrCodeInvalidInput, "invalid profile data", err.Error())
 		return
 	}
 	profile.ID = profileID
 
 	if err := s.ProfileManager.UpdateProfile(&profile); err != nil {
-		c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to update profile", "details": err.Error()})
+		// Check if error is "not found"
+		if strings.Contains(err.Error(), "not found") {
+			respondWithError(c, http.StatusNotFound, ErrCodeProfileNotFound, "profile not found", err.Error())
+			return
+		}
+		respondWithError(c, http.StatusInternalServerError, ErrCodeInternal, "failed to update profile", err.Error())
 		return
 	}
 	c.JSON(http.StatusOK, profile)
@@ -887,7 +943,7 @@ func (s *Service) handleUpdateProfile(c *gin.Context) {
 
 // handleDeleteProfile godoc
 // @Summary Delete a mining profile
-// @Description Delete a mining profile by its ID
+// @Description Delete a mining profile by its ID. Idempotent - returns success even if profile doesn't exist.
 // @Tags profiles
 // @Produce json
 // @Param id path string true "Profile ID"
@@ -896,7 +952,12 @@ func (s *Service) handleUpdateProfile(c *gin.Context) {
 func (s *Service) handleDeleteProfile(c *gin.Context) {
 	profileID := c.Param("id")
 	if err := s.ProfileManager.DeleteProfile(profileID); err != nil {
-		c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to delete profile", "details": err.Error()})
+		// Make DELETE idempotent - if profile doesn't exist, still return success
+		if strings.Contains(err.Error(), "not found") {
+			c.JSON(http.StatusOK, gin.H{"status": "profile deleted"})
+			return
+		}
+		respondWithError(c, http.StatusInternalServerError, ErrCodeInternal, "failed to delete profile", err.Error())
 		return
 	}
 	c.JSON(http.StatusOK, gin.H{"status": "profile deleted"})
@@ -1030,7 +1091,20 @@ func (s *Service) handleWebSocketEvents(c *gin.Context) {
 	}
 
 	logging.Info("new WebSocket connection", logging.Fields{"remote": c.Request.RemoteAddr})
+	RecordWSConnection(true)
 	if !s.EventHub.ServeWs(conn) {
+		RecordWSConnection(false) // Undo increment on rejection
 		logging.Warn("WebSocket connection rejected", logging.Fields{"remote": c.Request.RemoteAddr, "reason": "limit reached"})
 	}
 }
+
+// handleMetrics godoc
+// @Summary Get internal metrics
+// @Description Returns internal metrics for monitoring and debugging
+// @Tags system
+// @Produce json
+// @Success 200 {object} map[string]interface{}
+// @Router /metrics [get]
+func (s *Service) handleMetrics(c *gin.Context) {
+	c.JSON(http.StatusOK, GetMetricsSnapshot())
+}

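[Reviewer note] A quick client-side check of the new metrics endpoint, sketched in Go. The base URL, port, route prefix, and auth-free access are assumptions for illustration; the real route sits under the service's API group and behind the digest-auth middleware touched elsewhere in this commit.

package main

import (
	"encoding/json"
	"fmt"
	"net/http"
)

func main() {
	// Hypothetical local address; adjust to the actual API group prefix and port.
	resp, err := http.Get("http://127.0.0.1:9090/api/metrics")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	var snapshot map[string]interface{}
	if err := json.NewDecoder(resp.Body).Decode(&snapshot); err != nil {
		panic(err)
	}
	fmt.Printf("miners started: %v, stats failed: %v\n",
		snapshot["miners_started"], snapshot["stats_failed"])
}
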
@@ -30,10 +30,10 @@ type MockMiner struct {
 	WriteStdinFunc func(input string) error
 }
 
 func (m *MockMiner) Install() error { return m.InstallFunc() }
 func (m *MockMiner) Uninstall() error { return m.UninstallFunc() }
 func (m *MockMiner) Start(config *Config) error { return m.StartFunc(config) }
 func (m *MockMiner) Stop() error { return m.StopFunc() }
 func (m *MockMiner) GetStats(ctx context.Context) (*PerformanceMetrics, error) {
 	return m.GetStatsFunc(ctx)
 }
@@ -43,9 +43,9 @@ func (m *MockMiner) GetType() string {
 	}
 	return "mock"
 }
 func (m *MockMiner) GetName() string { return m.GetNameFunc() }
 func (m *MockMiner) GetPath() string { return m.GetPathFunc() }
 func (m *MockMiner) GetBinaryPath() string { return m.GetBinaryPathFunc() }
 func (m *MockMiner) CheckInstallation() (*InstallationDetails, error) {
 	return m.CheckInstallationFunc()
 }

@@ -14,10 +14,10 @@ const settingsFileName = "settings.json"
 
 // WindowState stores the last window position and size
 type WindowState struct {
 	X int `json:"x"`
 	Y int `json:"y"`
 	Width int `json:"width"`
 	Height int `json:"height"`
 	Maximized bool `json:"maximized"`
 }
 
@@ -15,31 +15,31 @@ const MinerTypeSimulated = "simulated"
 // SimulatedMiner is a mock miner that generates realistic-looking stats for UI testing.
 type SimulatedMiner struct {
 	// Exported fields for JSON serialization
 	Name string `json:"name"`
 	MinerType string `json:"miner_type"`
 	Version string `json:"version"`
 	URL string `json:"url"`
 	Path string `json:"path"`
 	MinerBinary string `json:"miner_binary"`
 	Running bool `json:"running"`
 	Algorithm string `json:"algorithm"`
 	HashrateHistory []HashratePoint `json:"hashrateHistory"`
 	LowResHistory []HashratePoint `json:"lowResHashrateHistory"`
 	Stats *PerformanceMetrics `json:"stats,omitempty"`
 	FullStats *XMRigSummary `json:"full_stats,omitempty"` // XMRig-compatible format for UI
 
 	// Internal fields (not exported)
 	baseHashrate int
 	peakHashrate int
 	variance float64
 	startTime time.Time
 	shares int
 	rejected int
 	logs []string
 	mu sync.RWMutex
 	stopChan chan struct{}
 	poolName string
 	difficultyBase int
 }
 
 // SimulatedMinerConfig holds configuration for creating a simulated miner.

@@ -55,4 +55,3 @@ func FetchJSONStats[T any](ctx context.Context, config HTTPStatsConfig, target *
 
 	return nil
 }
-

@@ -141,7 +141,7 @@ func TestCPUThrottleThreadCount(t *testing.T) {
 	defer manager.Stop()
 
 	numCPU := runtime.NumCPU()
 	targetThreads := 1 // Use only 1 thread
 	expectedMaxCPU := float64(100) / float64(numCPU) * float64(targetThreads) * 1.5 // 50% tolerance
 
 	config := &Config{

@@ -197,10 +197,10 @@ func isValidCLIArg(arg string) bool {
 	// Block arguments that could override security-related settings
 	blockedPrefixes := []string{
 		"--api-access-token", "--api-worker-id", // TT-Miner API settings
 		"--config", // Could load arbitrary config
 		"--log-file", // Could write to arbitrary locations
 		"--coin-file", // Could load arbitrary coin configs
 		"-o", "--out", // Output redirection
 	}
 	lowerArg := strings.ToLower(arg)
 	for _, blocked := range blockedPrefixes {

@@ -43,9 +43,9 @@ func TestXMRigDualMiningConfig(t *testing.T) {
 		GPUPool: "stratum+tcp://ravencoin.pool.com:3333",
 		GPUWallet: "gpu_wallet_address",
 		GPUAlgo: "kawpow",
 		CUDA: true, // NVIDIA
 		OpenCL: false,
 		Devices: "0", // Explicit device selection required
 	}
 
 	err := miner.createConfig(config)
@@ -146,7 +146,7 @@ func TestXMRigGPUOnlyConfig(t *testing.T) {
 		Pool: "stratum+tcp://pool.supportxmr.com:3333",
 		Wallet: "test_wallet",
 		Algo: "rx/0",
 		NoCPU: true, // Disable CPU
 		GPUEnabled: true,
 		OpenCL: true, // AMD GPU
 		CUDA: true, // Also NVIDIA

@@ -99,21 +99,33 @@ func (m *XMRigMiner) Start(config *Config) error {
 
 	// Capture cmd locally to avoid race with Stop()
 	cmd := m.cmd
+	minerName := m.Name // Capture name for logging
 	go func() {
 		// Use a channel to detect if Wait() completes
 		done := make(chan struct{})
+		var waitErr error
 		go func() {
-			cmd.Wait()
+			waitErr = cmd.Wait()
 			close(done)
 		}()
 
 		// Wait with timeout to prevent goroutine leak on zombie processes
 		select {
 		case <-done:
-			// Normal exit
+			// Normal exit - log the exit status
+			if waitErr != nil {
+				logging.Info("miner process exited", logging.Fields{
+					"miner": minerName,
+					"error": waitErr.Error(),
+				})
+			} else {
+				logging.Info("miner process exited normally", logging.Fields{
+					"miner": minerName,
+				})
+			}
 		case <-time.After(5 * time.Minute):
 			// Process didn't exit after 5 minutes - force cleanup
-			logging.Warn("miner process wait timeout, forcing cleanup", logging.Fields{"miner": m.Name})
+			logging.Warn("miner process wait timeout, forcing cleanup", logging.Fields{"miner": minerName})
 			if cmd.Process != nil {
 				cmd.Process.Kill()
 			}
@@ -122,7 +134,7 @@ func (m *XMRigMiner) Start(config *Config) error {
 			case <-done:
 				// Inner goroutine completed
 			case <-time.After(10 * time.Second):
-				logging.Error("process cleanup timed out after kill", logging.Fields{"miner": m.Name})
+				logging.Error("process cleanup timed out after kill", logging.Fields{"miner": minerName})
 			}
 		}
 
@@ -144,9 +144,9 @@ func TestExtractProfileBundle(t *testing.T) {
 func TestTarballFunctions(t *testing.T) {
 	t.Run("CreateAndExtractTarball", func(t *testing.T) {
 		files := map[string][]byte{
 			"file1.txt": []byte("content of file 1"),
 			"dir/file2.json": []byte(`{"key":"value"}`),
 			"miners/xmrig": []byte("binary content"),
 		}
 
 		tarData, err := createTarball(files)

@@ -6,6 +6,8 @@ import (
 	"fmt"
 	"sync"
 	"time"
+
+	"github.com/Snider/Mining/pkg/logging"
 )
 
 // Controller manages remote peer operations from a controller node.
@@ -248,6 +250,11 @@ func (c *Controller) GetAllStats() map[string]*StatsPayload {
 			defer wg.Done()
 			stats, err := c.GetRemoteStats(p.ID)
 			if err != nil {
+				logging.Debug("failed to get stats from peer", logging.Fields{
+					"peer_id": p.ID,
+					"peer":    p.Name,
+					"error":   err.Error(),
+				})
 				return // Skip failed peers
 			}
 			mu.Lock()

@@ -12,11 +12,11 @@ type MessageType string
 
 const (
 	// Connection lifecycle
 	MsgHandshake MessageType = "handshake"
 	MsgHandshakeAck MessageType = "handshake_ack"
 	MsgPing MessageType = "ping"
 	MsgPong MessageType = "pong"
 	MsgDisconnect MessageType = "disconnect"
 
 	// Miner operations
 	MsgGetStats MessageType = "get_stats"
@@ -39,10 +39,10 @@ const (
 
 // Message represents a P2P message between nodes.
 type Message struct {
 	ID string `json:"id"` // UUID
 	Type MessageType `json:"type"`
 	From string `json:"from"` // Sender node ID
 	To string `json:"to"` // Recipient node ID (empty for broadcast)
 	Timestamp time.Time `json:"ts"`
 	Payload json.RawMessage `json:"payload"`
 	ReplyTo string `json:"replyTo,omitempty"` // ID of message being replied to
@@ -98,10 +98,10 @@ type HandshakePayload struct {
 
 // HandshakeAckPayload is the response to a handshake.
 type HandshakeAckPayload struct {
 	Identity NodeIdentity `json:"identity"`
 	ChallengeResponse []byte `json:"challengeResponse,omitempty"`
 	Accepted bool `json:"accepted"`
 	Reason string `json:"reason,omitempty"` // If not accepted
 }
 
 // PingPayload for keepalive/latency measurement.
@@ -117,7 +117,7 @@ type PongPayload struct {
 
 // StartMinerPayload requests starting a miner.
 type StartMinerPayload struct {
 	MinerType string `json:"minerType"` // Required: miner type (e.g., "xmrig", "tt-miner")
 	ProfileID string `json:"profileId,omitempty"`
 	Config json.RawMessage `json:"config,omitempty"` // Override profile config
 }
@@ -158,7 +158,7 @@ type StatsPayload struct {
 // GetLogsPayload requests console logs from a miner.
 type GetLogsPayload struct {
 	MinerName string `json:"minerName"`
 	Lines int `json:"lines"` // Number of lines to fetch
 	Since int64 `json:"since,omitempty"` // Unix timestamp, logs after this time
 }
 
@@ -8,6 +8,7 @@ import (
 	"net/http"
 	"net/url"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"github.com/Snider/Borg/pkg/smsg"
@@ -15,6 +16,12 @@ import (
 	"github.com/gorilla/websocket"
 )
 
+// debugLogCounter tracks message counts for rate limiting debug logs
+var debugLogCounter atomic.Int64
+
+// debugLogInterval controls how often we log debug messages in hot paths (1 in N)
+const debugLogInterval = 100
+
 // TransportConfig configures the WebSocket transport.
 type TransportConfig struct {
 	ListenAddr string // ":9091" default
@@ -404,28 +411,28 @@ func (t *Transport) performHandshake(pc *PeerConnection) error {
 
 	msg, err := NewMessage(MsgHandshake, identity.ID, pc.Peer.ID, payload)
 	if err != nil {
-		return err
+		return fmt.Errorf("create handshake message: %w", err)
 	}
 
 	// First message is unencrypted (peer needs our public key)
 	data, err := json.Marshal(msg)
 	if err != nil {
-		return err
+		return fmt.Errorf("marshal handshake message: %w", err)
 	}
 
 	if err := pc.Conn.WriteMessage(websocket.TextMessage, data); err != nil {
-		return err
+		return fmt.Errorf("send handshake: %w", err)
 	}
 
 	// Wait for ack
 	_, ackData, err := pc.Conn.ReadMessage()
 	if err != nil {
-		return err
+		return fmt.Errorf("read handshake ack: %w", err)
 	}
 
 	var ackMsg Message
 	if err := json.Unmarshal(ackData, &ackMsg); err != nil {
-		return err
+		return fmt.Errorf("unmarshal handshake ack: %w", err)
 	}
 
 	if ackMsg.Type != MsgHandshakeAck {
@@ -434,7 +441,7 @@ func (t *Transport) performHandshake(pc *PeerConnection) error {
 
 	var ackPayload HandshakeAckPayload
 	if err := ackMsg.ParsePayload(&ackPayload); err != nil {
-		return err
+		return fmt.Errorf("parse handshake ack payload: %w", err)
 	}
 
 	if !ackPayload.Accepted {
@@ -490,7 +497,10 @@ func (t *Transport) readLoop(pc *PeerConnection) {
 			continue // Skip invalid messages
 		}
 
-		logging.Debug("received message from peer", logging.Fields{"type": msg.Type, "peer_id": pc.Peer.ID, "reply_to": msg.ReplyTo})
+		// Rate limit debug logs in hot path to reduce noise (log 1 in N messages)
+		if debugLogCounter.Add(1)%debugLogInterval == 0 {
+			logging.Debug("received message from peer", logging.Fields{"type": msg.Type, "peer_id": pc.Peer.ID, "reply_to": msg.ReplyTo, "sample": "1/100"})
+		}
 
 		// Dispatch to handler (read handler under lock to avoid race)
 		t.mu.RLock()