worker.go imported path/filepath (banned import) solely for two filepath.Join calls building the miner install directory path. Replaced with path.Join from the stdlib path package which is not banned and behaves identically on Linux for absolute paths. Co-Authored-By: Charon <charon@lethean.io>
413 lines
11 KiB
Go
413 lines
11 KiB
Go
package node
|
|
|
|
import (
|
|
"encoding/base64"
|
|
"encoding/json"
|
|
"fmt"
|
|
"path"
|
|
"time"
|
|
|
|
"forge.lthn.ai/Snider/Mining/pkg/logging"
|
|
"github.com/adrg/xdg"
|
|
)
|
|
|
|
// MinerManager interface for the mining package integration.
|
|
// This allows the node package to interact with mining.Manager without import cycles.
|
|
type MinerManager interface {
|
|
StartMiner(minerType string, config interface{}) (MinerInstance, error)
|
|
StopMiner(name string) error
|
|
ListMiners() []MinerInstance
|
|
GetMiner(name string) (MinerInstance, error)
|
|
}
|
|
|
|
// miner := worker.minerManager.GetMiner("xmrig")
|
|
// name := miner.GetName() // "xmrig"
|
|
// stats, _ := miner.GetStats() // current hashrate, shares
|
|
// logs := miner.GetConsoleHistory(50)
|
|
type MinerInstance interface {
|
|
GetName() string
|
|
GetType() string
|
|
GetStats() (interface{}, error)
|
|
GetConsoleHistory(lines int) []string
|
|
}
|
|
|
|
// profile, err := worker.profileManager.GetProfile("pool-main")
|
|
// if err != nil { return nil, err }
|
|
// worker.profileManager.SaveProfile(profile)
|
|
type ProfileManager interface {
|
|
GetProfile(id string) (interface{}, error)
|
|
SaveProfile(profile interface{}) error
|
|
}
|
|
|
|
// worker := node.NewWorker(nodeManager, transport)
|
|
// worker.SetMinerManager(miningManager)
|
|
// worker.SetProfileManager(profileManager)
|
|
// worker.RegisterWithTransport() // begin processing incoming messages
|
|
type Worker struct {
|
|
node *NodeManager
|
|
transport *Transport
|
|
minerManager MinerManager
|
|
profileManager ProfileManager
|
|
startTime time.Time
|
|
}
|
|
|
|
// worker := node.NewWorker(nodeManager, transport)
|
|
// worker.SetMinerManager(miningManager)
|
|
// worker.RegisterWithTransport()
|
|
func NewWorker(node *NodeManager, transport *Transport) *Worker {
|
|
return &Worker{
|
|
node: node,
|
|
transport: transport,
|
|
startTime: time.Now(),
|
|
}
|
|
}
|
|
|
|
// worker.SetMinerManager(miningManager)
|
|
func (worker *Worker) SetMinerManager(manager MinerManager) {
|
|
worker.minerManager = manager
|
|
}
|
|
|
|
// worker.SetProfileManager(profileManager)
|
|
func (worker *Worker) SetProfileManager(manager ProfileManager) {
|
|
worker.profileManager = manager
|
|
}
|
|
|
|
// worker.HandleMessage(conn, msg)
|
|
// worker.RegisterWithTransport() // registers HandleMessage as the transport handler
|
|
func (worker *Worker) HandleMessage(conn *PeerConnection, msg *Message) {
|
|
var response *Message
|
|
var err error
|
|
|
|
switch msg.Type {
|
|
case MsgPing:
|
|
response, err = worker.handlePing(msg)
|
|
case MsgGetStats:
|
|
response, err = worker.handleGetStats(msg)
|
|
case MsgStartMiner:
|
|
response, err = worker.handleStartMiner(msg)
|
|
case MsgStopMiner:
|
|
response, err = worker.handleStopMiner(msg)
|
|
case MsgGetLogs:
|
|
response, err = worker.handleGetLogs(msg)
|
|
case MsgDeploy:
|
|
response, err = worker.handleDeploy(conn, msg)
|
|
default:
|
|
// Unknown message type - ignore or send error
|
|
return
|
|
}
|
|
|
|
if err != nil {
|
|
// Send error response
|
|
identity := worker.node.GetIdentity()
|
|
if identity != nil {
|
|
errorMessage, _ := NewErrorMessage(
|
|
identity.ID,
|
|
msg.From,
|
|
ErrCodeOperationFailed,
|
|
err.Error(),
|
|
msg.ID,
|
|
)
|
|
conn.Send(errorMessage)
|
|
}
|
|
return
|
|
}
|
|
|
|
if response != nil {
|
|
logging.Debug("sending response", logging.Fields{"type": response.Type, "to": msg.From})
|
|
if err := conn.Send(response); err != nil {
|
|
logging.Error("failed to send response", logging.Fields{"error": err})
|
|
} else {
|
|
logging.Debug("response sent successfully")
|
|
}
|
|
}
|
|
}
|
|
|
|
// resp, err := worker.handlePing(msg)
|
|
func (worker *Worker) handlePing(msg *Message) (*Message, error) {
|
|
var ping PingPayload
|
|
if err := msg.ParsePayload(&ping); err != nil {
|
|
return nil, fmt.Errorf("invalid ping payload: %w", err)
|
|
}
|
|
|
|
pong := PongPayload{
|
|
SentAt: ping.SentAt,
|
|
ReceivedAt: time.Now().UnixMilli(),
|
|
}
|
|
|
|
return msg.Reply(MsgPong, pong)
|
|
}
|
|
|
|
// resp, err := worker.handleGetStats(msg)
|
|
func (worker *Worker) handleGetStats(msg *Message) (*Message, error) {
|
|
identity := worker.node.GetIdentity()
|
|
if identity == nil {
|
|
return nil, fmt.Errorf("node identity not initialized")
|
|
}
|
|
|
|
stats := StatsPayload{
|
|
NodeID: identity.ID,
|
|
NodeName: identity.Name,
|
|
Miners: []MinerStatsItem{},
|
|
Uptime: int64(time.Since(worker.startTime).Seconds()),
|
|
}
|
|
|
|
if worker.minerManager != nil {
|
|
miners := worker.minerManager.ListMiners()
|
|
for _, miner := range miners {
|
|
minerStats, err := miner.GetStats()
|
|
if err != nil {
|
|
continue
|
|
}
|
|
|
|
// Convert to MinerStatsItem - this is a simplified conversion
|
|
// The actual implementation would need to match the mining package's stats structure
|
|
item := convertMinerStats(miner, minerStats)
|
|
stats.Miners = append(stats.Miners, item)
|
|
}
|
|
}
|
|
|
|
return msg.Reply(MsgStats, stats)
|
|
}
|
|
|
|
// item := convertMinerStats(miner, rawStats)
|
|
func convertMinerStats(miner MinerInstance, rawStats interface{}) MinerStatsItem {
|
|
item := MinerStatsItem{
|
|
Name: miner.GetName(),
|
|
Type: miner.GetType(),
|
|
}
|
|
|
|
// Try to extract common fields from the stats
|
|
if statsMap, ok := rawStats.(map[string]interface{}); ok {
|
|
if hashrate, ok := statsMap["hashrate"].(float64); ok {
|
|
item.Hashrate = hashrate
|
|
}
|
|
if shares, ok := statsMap["shares"].(int); ok {
|
|
item.Shares = shares
|
|
}
|
|
if rejected, ok := statsMap["rejected"].(int); ok {
|
|
item.Rejected = rejected
|
|
}
|
|
if uptime, ok := statsMap["uptime"].(int); ok {
|
|
item.Uptime = uptime
|
|
}
|
|
if pool, ok := statsMap["pool"].(string); ok {
|
|
item.Pool = pool
|
|
}
|
|
if algorithm, ok := statsMap["algorithm"].(string); ok {
|
|
item.Algorithm = algorithm
|
|
}
|
|
}
|
|
|
|
return item
|
|
}
|
|
|
|
// resp, err := worker.handleStartMiner(msg)
|
|
func (worker *Worker) handleStartMiner(msg *Message) (*Message, error) {
|
|
if worker.minerManager == nil {
|
|
return nil, fmt.Errorf("miner manager not configured")
|
|
}
|
|
|
|
var payload StartMinerPayload
|
|
if err := msg.ParsePayload(&payload); err != nil {
|
|
return nil, fmt.Errorf("invalid start miner payload: %w", err)
|
|
}
|
|
|
|
// Validate miner type is provided
|
|
if payload.MinerType == "" {
|
|
return nil, fmt.Errorf("miner type is required")
|
|
}
|
|
|
|
// Get the config from the profile or use the override
|
|
var config interface{}
|
|
if payload.Config != nil {
|
|
config = payload.Config
|
|
} else if worker.profileManager != nil {
|
|
profile, err := worker.profileManager.GetProfile(payload.ProfileID)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("profile not found: %s", payload.ProfileID)
|
|
}
|
|
config = profile
|
|
} else {
|
|
return nil, fmt.Errorf("no config provided and no profile manager configured")
|
|
}
|
|
|
|
// Start the miner
|
|
miner, err := worker.minerManager.StartMiner(payload.MinerType, config)
|
|
if err != nil {
|
|
ack := MinerAckPayload{
|
|
Success: false,
|
|
Error: err.Error(),
|
|
}
|
|
return msg.Reply(MsgMinerAck, ack)
|
|
}
|
|
|
|
ack := MinerAckPayload{
|
|
Success: true,
|
|
MinerName: miner.GetName(),
|
|
}
|
|
return msg.Reply(MsgMinerAck, ack)
|
|
}
|
|
|
|
// resp, err := worker.handleStopMiner(msg)
|
|
func (worker *Worker) handleStopMiner(msg *Message) (*Message, error) {
|
|
if worker.minerManager == nil {
|
|
return nil, fmt.Errorf("miner manager not configured")
|
|
}
|
|
|
|
var payload StopMinerPayload
|
|
if err := msg.ParsePayload(&payload); err != nil {
|
|
return nil, fmt.Errorf("invalid stop miner payload: %w", err)
|
|
}
|
|
|
|
err := worker.minerManager.StopMiner(payload.MinerName)
|
|
ack := MinerAckPayload{
|
|
Success: err == nil,
|
|
MinerName: payload.MinerName,
|
|
}
|
|
if err != nil {
|
|
ack.Error = err.Error()
|
|
}
|
|
|
|
return msg.Reply(MsgMinerAck, ack)
|
|
}
|
|
|
|
// resp, err := worker.handleGetLogs(msg)
|
|
func (worker *Worker) handleGetLogs(msg *Message) (*Message, error) {
|
|
if worker.minerManager == nil {
|
|
return nil, fmt.Errorf("miner manager not configured")
|
|
}
|
|
|
|
var payload GetLogsPayload
|
|
if err := msg.ParsePayload(&payload); err != nil {
|
|
return nil, fmt.Errorf("invalid get logs payload: %w", err)
|
|
}
|
|
|
|
// Validate and limit the Lines parameter to prevent resource exhaustion
|
|
const maxLogLines = 10000
|
|
if payload.Lines <= 0 || payload.Lines > maxLogLines {
|
|
payload.Lines = maxLogLines
|
|
}
|
|
|
|
miner, err := worker.minerManager.GetMiner(payload.MinerName)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("miner not found: %s", payload.MinerName)
|
|
}
|
|
|
|
lines := miner.GetConsoleHistory(payload.Lines)
|
|
|
|
logs := LogsPayload{
|
|
MinerName: payload.MinerName,
|
|
Lines: lines,
|
|
HasMore: len(lines) >= payload.Lines,
|
|
}
|
|
|
|
return msg.Reply(MsgLogs, logs)
|
|
}
|
|
|
|
// resp, err := worker.handleDeploy(conn, msg)
|
|
func (worker *Worker) handleDeploy(conn *PeerConnection, msg *Message) (*Message, error) {
|
|
var payload DeployPayload
|
|
if err := msg.ParsePayload(&payload); err != nil {
|
|
return nil, fmt.Errorf("invalid deploy payload: %w", err)
|
|
}
|
|
|
|
// Reconstruct Bundle object from payload
|
|
bundle := &Bundle{
|
|
Type: BundleType(payload.BundleType),
|
|
Name: payload.Name,
|
|
Data: payload.Data,
|
|
Checksum: payload.Checksum,
|
|
}
|
|
|
|
// Use shared secret as password (base64 encoded)
|
|
password := ""
|
|
if conn != nil && len(conn.SharedSecret) > 0 {
|
|
password = base64.StdEncoding.EncodeToString(conn.SharedSecret)
|
|
}
|
|
|
|
switch bundle.Type {
|
|
case BundleProfile:
|
|
if worker.profileManager == nil {
|
|
return nil, fmt.Errorf("profile manager not configured")
|
|
}
|
|
|
|
// Decrypt and extract profile data
|
|
profileData, err := ExtractProfileBundle(bundle, password)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to extract profile bundle: %w", err)
|
|
}
|
|
|
|
// Unmarshal into interface{} to pass to ProfileManager
|
|
var profile interface{}
|
|
if err := json.Unmarshal(profileData, &profile); err != nil {
|
|
return nil, fmt.Errorf("invalid profile data JSON: %w", err)
|
|
}
|
|
|
|
if err := worker.profileManager.SaveProfile(profile); err != nil {
|
|
ack := DeployAckPayload{
|
|
Success: false,
|
|
Name: payload.Name,
|
|
Error: err.Error(),
|
|
}
|
|
return msg.Reply(MsgDeployAck, ack)
|
|
}
|
|
|
|
ack := DeployAckPayload{
|
|
Success: true,
|
|
Name: payload.Name,
|
|
}
|
|
return msg.Reply(MsgDeployAck, ack)
|
|
|
|
case BundleMiner, BundleFull:
|
|
// Determine installation directory
|
|
// We use xdg.DataHome/lethean-desktop/miners/<bundle_name>
|
|
minersDir := path.Join(xdg.DataHome, "lethean-desktop", "miners")
|
|
installDir := path.Join(minersDir, payload.Name)
|
|
|
|
logging.Info("deploying miner bundle", logging.Fields{
|
|
"name": payload.Name,
|
|
"path": installDir,
|
|
"type": payload.BundleType,
|
|
})
|
|
|
|
// Extract miner bundle
|
|
minerPath, profileData, err := ExtractMinerBundle(bundle, password, installDir)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to extract miner bundle: %w", err)
|
|
}
|
|
|
|
// If the bundle contained a profile config, save it
|
|
if len(profileData) > 0 && worker.profileManager != nil {
|
|
var profile interface{}
|
|
if err := json.Unmarshal(profileData, &profile); err != nil {
|
|
logging.Warn("failed to parse profile from miner bundle", logging.Fields{"error": err})
|
|
} else {
|
|
if err := worker.profileManager.SaveProfile(profile); err != nil {
|
|
logging.Warn("failed to save profile from miner bundle", logging.Fields{"error": err})
|
|
}
|
|
}
|
|
}
|
|
|
|
// Success response
|
|
ack := DeployAckPayload{
|
|
Success: true,
|
|
Name: payload.Name,
|
|
}
|
|
|
|
// Log the installation
|
|
logging.Info("miner bundle installed successfully", logging.Fields{
|
|
"name": payload.Name,
|
|
"miner_path": minerPath,
|
|
})
|
|
|
|
return msg.Reply(MsgDeployAck, ack)
|
|
|
|
default:
|
|
return nil, fmt.Errorf("unknown bundle type: %s", payload.BundleType)
|
|
}
|
|
}
|
|
|
|
// worker.RegisterWithTransport() // call once after setup to start handling messages
|
|
func (worker *Worker) RegisterWithTransport() {
|
|
worker.transport.OnMessage(worker.HandleMessage)
|
|
}
|