Mining/pkg/node/worker.go
Claude fb998ce2df
ax(node): replace banned path/filepath with path in worker.go
worker.go imported path/filepath (banned import) solely for two
filepath.Join calls building the miner install directory path.
Replaced with path.Join from the stdlib path package which is
not banned and behaves identically on Linux for absolute paths.

Co-Authored-By: Charon <charon@lethean.io>
2026-04-02 14:03:13 +01:00

413 lines
11 KiB
Go

package node
import (
"encoding/base64"
"encoding/json"
"fmt"
"path"
"time"
"forge.lthn.ai/Snider/Mining/pkg/logging"
"github.com/adrg/xdg"
)
// MinerManager is the slice of the mining manager that the node package
// depends on. Declaring it here lets node drive mining.Manager without
// creating an import cycle.
//
//	miner, err := worker.minerManager.StartMiner("xmrig", config)
//	miners := worker.minerManager.ListMiners()
//	err = worker.minerManager.StopMiner("xmrig")
type MinerManager interface {
	// Lookup.
	ListMiners() []MinerInstance
	GetMiner(name string) (MinerInstance, error)

	// Lifecycle.
	StartMiner(minerType string, config interface{}) (MinerInstance, error)
	StopMiner(name string) error
}
// MinerInstance is the view of a single miner that the node package relies on.
//
//	miner, err := worker.minerManager.GetMiner("xmrig")
//	if err != nil { return nil, err }
//	name := miner.GetName()             // "xmrig"
//	stats, _ := miner.GetStats()        // current hashrate, shares
//	logs := miner.GetConsoleHistory(50)
type MinerInstance interface {
	// Identification.
	GetType() string
	GetName() string

	// Runtime data.
	GetConsoleHistory(lines int) []string
	GetStats() (interface{}, error)
}
// ProfileManager is the profile store the worker reads from and writes to.
// Profiles are passed as opaque interface{} values.
//
//	profile, err := worker.profileManager.GetProfile("pool-main")
//	if err != nil { return nil, err }
//	err = worker.profileManager.SaveProfile(profile)
type ProfileManager interface {
	SaveProfile(profile interface{}) error
	GetProfile(id string) (interface{}, error)
}
// Worker answers messages received from peers, delegating to the miner and
// profile managers when they have been configured.
//
// worker := node.NewWorker(nodeManager, transport)
// worker.SetMinerManager(miningManager)
// worker.SetProfileManager(profileManager)
// worker.RegisterWithTransport() // begin processing incoming messages
type Worker struct {
// node supplies this node's identity (ID and name) for replies.
node *NodeManager
// transport is where HandleMessage is registered by RegisterWithTransport.
transport *Transport
// minerManager is nil until SetMinerManager is called; handlers check for nil.
minerManager MinerManager
// profileManager is nil until SetProfileManager is called; handlers check for nil.
profileManager ProfileManager
// startTime is set by NewWorker and is the basis for the reported uptime.
startTime time.Time
}
// NewWorker builds a Worker bound to the given node manager and transport and
// records the current time as its start time. The miner and profile managers
// are left unset; attach them with SetMinerManager and SetProfileManager.
//
//	worker := node.NewWorker(nodeManager, transport)
//	worker.SetMinerManager(miningManager)
//	worker.RegisterWithTransport()
func NewWorker(node *NodeManager, transport *Transport) *Worker {
	created := &Worker{startTime: time.Now()}
	created.node = node
	created.transport = transport
	return created
}
// SetMinerManager installs the miner manager used by the stats, start, stop
// and log handlers. Until it is called those handlers either skip miner data
// or report that no miner manager is configured.
//
//	worker.SetMinerManager(miningManager)
func (worker *Worker) SetMinerManager(manager MinerManager) {
	worker.minerManager = manager
}
// SetProfileManager installs the profile manager used when starting a miner
// from a profile ID and when deploying profile bundles.
//
//	worker.SetProfileManager(profileManager)
func (worker *Worker) SetProfileManager(manager ProfileManager) {
	worker.profileManager = manager
}
// HandleMessage dispatches one incoming peer message to the handler matching
// its type and sends the outcome back on conn: the handler's reply on success,
// or an ErrCodeOperationFailed error message when the handler fails.
// Messages of an unrecognised type are dropped without a reply.
//
// Failures while building or sending the error message are logged rather than
// ignored, so a handler error is never lost silently.
//
//	worker.HandleMessage(conn, msg)
//	worker.RegisterWithTransport() // registers HandleMessage as the transport handler
func (worker *Worker) HandleMessage(conn *PeerConnection, msg *Message) {
	var response *Message
	var err error

	switch msg.Type {
	case MsgPing:
		response, err = worker.handlePing(msg)
	case MsgGetStats:
		response, err = worker.handleGetStats(msg)
	case MsgStartMiner:
		response, err = worker.handleStartMiner(msg)
	case MsgStopMiner:
		response, err = worker.handleStopMiner(msg)
	case MsgGetLogs:
		response, err = worker.handleGetLogs(msg)
	case MsgDeploy:
		response, err = worker.handleDeploy(conn, msg)
	default:
		// Unknown message type: nothing to answer.
		return
	}

	if err != nil {
		identity := worker.node.GetIdentity()
		if identity == nil {
			// Without an identity there is no sender ID for the error message.
			logging.Error("cannot report handler error: node identity not initialized", logging.Fields{"error": err})
			return
		}

		errorMessage, buildErr := NewErrorMessage(
			identity.ID,
			msg.From,
			ErrCodeOperationFailed,
			err.Error(),
			msg.ID,
		)
		if buildErr != nil {
			// Never hand a possibly-nil message to Send.
			logging.Error("failed to build error response", logging.Fields{"error": buildErr})
			return
		}
		if sendErr := conn.Send(errorMessage); sendErr != nil {
			logging.Error("failed to send error response", logging.Fields{"error": sendErr})
		}
		return
	}

	if response != nil {
		logging.Debug("sending response", logging.Fields{"type": response.Type, "to": msg.From})
		if err := conn.Send(response); err != nil {
			logging.Error("failed to send response", logging.Fields{"error": err})
		} else {
			logging.Debug("response sent successfully")
		}
	}
}
// handlePing answers a ping with a pong that echoes the sender's SentAt
// value and records the local receive time in Unix milliseconds.
// It returns an error when the ping payload cannot be parsed.
//
//	resp, err := worker.handlePing(msg)
func (worker *Worker) handlePing(msg *Message) (*Message, error) {
	var request PingPayload
	parseErr := msg.ParsePayload(&request)
	if parseErr != nil {
		return nil, fmt.Errorf("invalid ping payload: %w", parseErr)
	}

	return msg.Reply(MsgPong, PongPayload{
		SentAt:     request.SentAt,
		ReceivedAt: time.Now().UnixMilli(),
	})
}
// handleGetStats replies with this node's ID, name, uptime in whole seconds
// since the worker was created, and one stats item per miner whose GetStats
// call succeeded. Miners whose stats cannot be read are left out. With no
// miner manager configured the miner list is empty.
// It returns an error when the node identity is not initialized.
//
//	resp, err := worker.handleGetStats(msg)
func (worker *Worker) handleGetStats(msg *Message) (*Message, error) {
	ident := worker.node.GetIdentity()
	if ident == nil {
		return nil, fmt.Errorf("node identity not initialized")
	}

	uptimeSeconds := int64(time.Since(worker.startTime).Seconds())

	// Start from an empty (non-nil) slice, as the payload always carries a list.
	items := []MinerStatsItem{}
	if worker.minerManager != nil {
		for _, instance := range worker.minerManager.ListMiners() {
			raw, statsErr := instance.GetStats()
			if statsErr == nil {
				// Simplified conversion; see convertMinerStats for the fields it picks up.
				items = append(items, convertMinerStats(instance, raw))
			}
		}
	}

	return msg.Reply(MsgStats, StatsPayload{
		NodeID:   ident.ID,
		NodeName: ident.Name,
		Miners:   items,
		Uptime:   uptimeSeconds,
	})
}
// convertMinerStats builds a MinerStatsItem from a miner and its raw stats.
// Name and Type always come from the miner itself. The remaining fields are
// filled only when rawStats is a map[string]interface{} holding the keys
// "hashrate", "shares", "rejected", "uptime", "pool" and "algorithm" with
// usable values; anything else leaves the field at its zero value.
//
// Numeric values are accepted in any common Go numeric type, not just int.
// Maps produced by encoding/json carry every number as float64, and counters
// are often int64/uint64. Values that are NaN or do not fit in an int are
// ignored; fractional floats are truncated toward zero.
//
//	item := convertMinerStats(miner, rawStats)
func convertMinerStats(miner MinerInstance, rawStats interface{}) MinerStatsItem {
	item := MinerStatsItem{
		Name: miner.GetName(),
		Type: miner.GetType(),
	}

	statsMap, ok := rawStats.(map[string]interface{})
	if !ok {
		return item
	}

	// Largest value an int can hold on this platform.
	const maxInt = int(^uint(0) >> 1)

	// toInt converts a numeric value to int, reporting false when the value
	// is not numeric or cannot be represented.
	toInt := func(value interface{}) (int, bool) {
		switch n := value.(type) {
		case int:
			return n, true
		case int32:
			return int(n), true
		case int64:
			if int64(int(n)) != n {
				return 0, false
			}
			return int(n), true
		case uint:
			if n > uint(maxInt) {
				return 0, false
			}
			return int(n), true
		case uint32:
			if uint64(n) > uint64(maxInt) {
				return 0, false
			}
			return int(n), true
		case uint64:
			if n > uint64(maxInt) {
				return 0, false
			}
			return int(n), true
		case float32:
			f := float64(n)
			if f != f || f >= float64(maxInt) || f <= -float64(maxInt) {
				return 0, false
			}
			return int(f), true
		case float64:
			// n != n is true only for NaN.
			if n != n || n >= float64(maxInt) || n <= -float64(maxInt) {
				return 0, false
			}
			return int(n), true
		}
		return 0, false
	}

	// toFloat converts a numeric value to float64.
	toFloat := func(value interface{}) (float64, bool) {
		switch n := value.(type) {
		case float64:
			return n, true
		case float32:
			return float64(n), true
		}
		if n, ok := toInt(value); ok {
			return float64(n), true
		}
		return 0, false
	}

	if hashrate, ok := toFloat(statsMap["hashrate"]); ok {
		item.Hashrate = hashrate
	}
	if shares, ok := toInt(statsMap["shares"]); ok {
		item.Shares = shares
	}
	if rejected, ok := toInt(statsMap["rejected"]); ok {
		item.Rejected = rejected
	}
	if uptime, ok := toInt(statsMap["uptime"]); ok {
		item.Uptime = uptime
	}
	if pool, ok := statsMap["pool"].(string); ok {
		item.Pool = pool
	}
	if algorithm, ok := statsMap["algorithm"].(string); ok {
		item.Algorithm = algorithm
	}

	return item
}
// handleStartMiner starts a miner of the requested type and replies with a
// MinerAckPayload. The configuration is payload.Config when present;
// otherwise the profile named by payload.ProfileID is loaded from the profile
// manager.
//
// It returns an error (reported to the peer as an error message) when the
// miner manager is not configured, the payload is invalid, the miner type is
// empty, or no configuration can be obtained. A failure of StartMiner itself
// is not an error return: it is reported in the ack with Success=false.
//
//	resp, err := worker.handleStartMiner(msg)
func (worker *Worker) handleStartMiner(msg *Message) (*Message, error) {
	if worker.minerManager == nil {
		return nil, fmt.Errorf("miner manager not configured")
	}

	var payload StartMinerPayload
	if err := msg.ParsePayload(&payload); err != nil {
		return nil, fmt.Errorf("invalid start miner payload: %w", err)
	}

	// Validate miner type is provided
	if payload.MinerType == "" {
		return nil, fmt.Errorf("miner type is required")
	}

	// Get the config from the override or fall back to the profile
	var config interface{}
	switch {
	case payload.Config != nil:
		config = payload.Config
	case worker.profileManager != nil:
		profile, err := worker.profileManager.GetProfile(payload.ProfileID)
		if err != nil {
			// Keep the cause: the lookup may fail for reasons other than a missing profile.
			return nil, fmt.Errorf("profile not found: %s: %w", payload.ProfileID, err)
		}
		config = profile
	default:
		return nil, fmt.Errorf("no config provided and no profile manager configured")
	}

	// Start the miner
	miner, err := worker.minerManager.StartMiner(payload.MinerType, config)
	if err != nil {
		ack := MinerAckPayload{
			Success: false,
			Error:   err.Error(),
		}
		return msg.Reply(MsgMinerAck, ack)
	}
	if miner == nil {
		// Guard against a manager that reports success without a miner,
		// which would otherwise panic on GetName below.
		return nil, fmt.Errorf("miner manager returned no miner for type %s", payload.MinerType)
	}

	ack := MinerAckPayload{
		Success:   true,
		MinerName: miner.GetName(),
	}
	return msg.Reply(MsgMinerAck, ack)
}
// handleStopMiner stops the miner named in the payload and replies with a
// MinerAckPayload carrying that name. Success reflects whether StopMiner
// returned nil; on failure the ack's Error holds the error text.
// It returns an error when the miner manager is not configured or the
// payload cannot be parsed.
//
//	resp, err := worker.handleStopMiner(msg)
func (worker *Worker) handleStopMiner(msg *Message) (*Message, error) {
	if worker.minerManager == nil {
		return nil, fmt.Errorf("miner manager not configured")
	}

	var request StopMinerPayload
	parseErr := msg.ParsePayload(&request)
	if parseErr != nil {
		return nil, fmt.Errorf("invalid stop miner payload: %w", parseErr)
	}

	stopErr := worker.minerManager.StopMiner(request.MinerName)

	ack := MinerAckPayload{MinerName: request.MinerName}
	if stopErr == nil {
		ack.Success = true
	} else {
		ack.Error = stopErr.Error()
	}
	return msg.Reply(MsgMinerAck, ack)
}
// handleGetLogs replies with console history for the miner named in the
// payload. The requested line count is clamped: values that are zero,
// negative, or above 10000 are replaced by 10000. HasMore is set when the
// miner returned at least as many lines as were requested.
//
// It returns an error when the miner manager is not configured, the payload
// cannot be parsed, or the miner cannot be looked up.
//
//	resp, err := worker.handleGetLogs(msg)
func (worker *Worker) handleGetLogs(msg *Message) (*Message, error) {
	if worker.minerManager == nil {
		return nil, fmt.Errorf("miner manager not configured")
	}

	var payload GetLogsPayload
	if err := msg.ParsePayload(&payload); err != nil {
		return nil, fmt.Errorf("invalid get logs payload: %w", err)
	}

	// Validate and limit the Lines parameter to prevent resource exhaustion
	const maxLogLines = 10000
	if payload.Lines <= 0 || payload.Lines > maxLogLines {
		payload.Lines = maxLogLines
	}

	miner, err := worker.minerManager.GetMiner(payload.MinerName)
	if err != nil {
		// Keep the cause so the peer sees why the lookup failed.
		return nil, fmt.Errorf("miner not found: %s: %w", payload.MinerName, err)
	}
	if miner == nil {
		// Guard against a manager that returns neither a miner nor an error.
		return nil, fmt.Errorf("miner not found: %s", payload.MinerName)
	}

	lines := miner.GetConsoleHistory(payload.Lines)
	logs := LogsPayload{
		MinerName: payload.MinerName,
		Lines:     lines,
		HasMore:   len(lines) >= payload.Lines,
	}
	return msg.Reply(MsgLogs, logs)
}
// handleDeploy installs a bundle sent by a peer and replies with a
// DeployAckPayload.
//
// The bundle is rebuilt from the payload and decrypted with a password
// derived from the connection's shared secret (base64 encoded). The password
// is empty when conn is nil or has no shared secret.
//
//   - BundleProfile: the profile JSON is extracted and saved through the
//     profile manager. A save failure is reported in the ack (Success=false).
//   - BundleMiner / BundleFull: the bundle is extracted under
//     xdg.DataHome/lethean-desktop/miners/<name>. A profile embedded in the
//     bundle is saved on a best-effort basis; failures are only logged.
//
// The bundle name comes from the remote peer and becomes a directory name, so
// it is validated first: names that are empty, ".", "..", or contain a path
// separator are rejected to keep the install directory inside the miners
// directory.
//
// It returns an error for an invalid payload, an unsafe bundle name, a missing
// profile manager (profile bundles), extraction failures, invalid profile
// JSON, or an unknown bundle type.
//
//	resp, err := worker.handleDeploy(conn, msg)
func (worker *Worker) handleDeploy(conn *PeerConnection, msg *Message) (*Message, error) {
	var payload DeployPayload
	if err := msg.ParsePayload(&payload); err != nil {
		return nil, fmt.Errorf("invalid deploy payload: %w", err)
	}

	// Reconstruct Bundle object from payload
	bundle := &Bundle{
		Type:     BundleType(payload.BundleType),
		Name:     payload.Name,
		Data:     payload.Data,
		Checksum: payload.Checksum,
	}

	// Use shared secret as password (base64 encoded)
	password := ""
	if conn != nil && len(conn.SharedSecret) > 0 {
		password = base64.StdEncoding.EncodeToString(conn.SharedSecret)
	}

	switch bundle.Type {
	case BundleProfile:
		if worker.profileManager == nil {
			return nil, fmt.Errorf("profile manager not configured")
		}

		// Decrypt and extract profile data
		profileData, err := ExtractProfileBundle(bundle, password)
		if err != nil {
			return nil, fmt.Errorf("failed to extract profile bundle: %w", err)
		}

		// Unmarshal into interface{} to pass to ProfileManager
		var profile interface{}
		if err := json.Unmarshal(profileData, &profile); err != nil {
			return nil, fmt.Errorf("invalid profile data JSON: %w", err)
		}

		if err := worker.profileManager.SaveProfile(profile); err != nil {
			ack := DeployAckPayload{
				Success: false,
				Name:    payload.Name,
				Error:   err.Error(),
			}
			return msg.Reply(MsgDeployAck, ack)
		}

		ack := DeployAckPayload{
			Success: true,
			Name:    payload.Name,
		}
		return msg.Reply(MsgDeployAck, ack)

	case BundleMiner, BundleFull:
		// The name is peer-controlled and becomes a directory name below.
		if err := validateBundleInstallName(payload.Name); err != nil {
			return nil, err
		}

		// Determine installation directory
		// We use xdg.DataHome/lethean-desktop/miners/<bundle_name>
		minersDir := path.Join(xdg.DataHome, "lethean-desktop", "miners")
		installDir := path.Join(minersDir, payload.Name)

		logging.Info("deploying miner bundle", logging.Fields{
			"name": payload.Name,
			"path": installDir,
			"type": payload.BundleType,
		})

		// Extract miner bundle
		minerPath, profileData, err := ExtractMinerBundle(bundle, password, installDir)
		if err != nil {
			return nil, fmt.Errorf("failed to extract miner bundle: %w", err)
		}

		// If the bundle contained a profile config, save it (best effort)
		if len(profileData) > 0 && worker.profileManager != nil {
			var profile interface{}
			if err := json.Unmarshal(profileData, &profile); err != nil {
				logging.Warn("failed to parse profile from miner bundle", logging.Fields{"error": err})
			} else {
				if err := worker.profileManager.SaveProfile(profile); err != nil {
					logging.Warn("failed to save profile from miner bundle", logging.Fields{"error": err})
				}
			}
		}

		// Success response
		ack := DeployAckPayload{
			Success: true,
			Name:    payload.Name,
		}

		// Log the installation
		logging.Info("miner bundle installed successfully", logging.Fields{
			"name":       payload.Name,
			"miner_path": minerPath,
		})

		return msg.Reply(MsgDeployAck, ack)

	default:
		return nil, fmt.Errorf("unknown bundle type: %s", payload.BundleType)
	}
}

// validateBundleInstallName reports whether name is safe to use as a single
// directory component. It rejects the empty string, "." and "..", and any name
// containing '/', '\\' or a NUL byte.
func validateBundleInstallName(name string) error {
	if name == "" || name == "." || name == ".." {
		return fmt.Errorf("invalid bundle name %q", name)
	}
	for _, r := range name {
		if r == '/' || r == '\\' || r == 0 {
			return fmt.Errorf("invalid bundle name %q: must not contain path separators", name)
		}
	}
	return nil
}
// RegisterWithTransport hands HandleMessage to the transport as its message
// callback. Call it once after the managers have been set.
//
//	worker.RegisterWithTransport()
func (worker *Worker) RegisterWithTransport() {
	handler := worker.HandleMessage
	worker.transport.OnMessage(handler)
}