package node import ( "encoding/base64" "time" core "dappco.re/go/core" "dappco.re/go/core/p2p/logging" "github.com/adrg/xdg" ) // var minerManager MinerManager type MinerManager interface { StartMiner(minerType string, config any) (MinerInstance, error) StopMiner(name string) error ListMiners() []MinerInstance GetMiner(name string) (MinerInstance, error) } // var miner MinerInstance type MinerInstance interface { GetName() string GetType() string GetStats() (any, error) GetConsoleHistory(lines int) []string } // var profileManager ProfileManager type ProfileManager interface { GetProfile(id string) (any, error) SaveProfile(profile any) error } // worker := NewWorker(nodeManager, transport) type Worker struct { nodeManager *NodeManager transport *Transport minerManager MinerManager profileManager ProfileManager startedAt time.Time DeploymentDirectory string // worker.DeploymentDirectory = "/srv/p2p/deployments" } // worker := NewWorker(nodeManager, transport) func NewWorker(nodeManager *NodeManager, transport *Transport) *Worker { return &Worker{ nodeManager: nodeManager, transport: transport, startedAt: time.Now(), DeploymentDirectory: xdg.DataHome, } } // worker.SetMinerManager(minerManager) func (w *Worker) SetMinerManager(manager MinerManager) { w.minerManager = manager } // worker.SetProfileManager(profileManager) func (w *Worker) SetProfileManager(manager ProfileManager) { w.profileManager = manager } // worker.HandleMessage(peerConnection, message) func (w *Worker) HandleMessage(peerConnection *PeerConnection, message *Message) { var response *Message var err error switch message.Type { case MessagePing: response, err = w.handlePing(message) case MessageGetStats: response, err = w.handleStats(message) case MessageStartMiner: response, err = w.handleStartMiner(message) case MessageStopMiner: response, err = w.handleStopMiner(message) case MessageGetLogs: response, err = w.handleLogs(message) case MessageDeploy: response, err = w.handleDeploy(peerConnection, message) default: // Unknown message type - ignore or send error return } if err != nil { // Send error response identity := w.nodeManager.GetIdentity() if identity != nil { errMsg, _ := NewErrorMessage( identity.ID, message.From, ErrorCodeOperationFailed, err.Error(), message.ID, ) peerConnection.Send(errMsg) } return } if response != nil { logging.Debug("sending response", logging.Fields{"type": response.Type, "to": message.From}) if err := peerConnection.Send(response); err != nil { logging.Error("failed to send response", logging.Fields{"error": err}) } else { logging.Debug("response sent successfully") } } } // handlePing responds to ping requests. func (w *Worker) handlePing(message *Message) (*Message, error) { var ping PingPayload if err := message.ParsePayload(&ping); err != nil { return nil, core.E("Worker.handlePing", "invalid ping payload", err) } pong := PongPayload{ SentAt: ping.SentAt, ReceivedAt: time.Now().UnixMilli(), } return message.Reply(MessagePong, pong) } // handleStats responds with current miner statistics. func (w *Worker) handleStats(message *Message) (*Message, error) { identity := w.nodeManager.GetIdentity() if identity == nil { return nil, ErrorIdentityNotInitialized } stats := StatsPayload{ NodeID: identity.ID, NodeName: identity.Name, Miners: []MinerStatsItem{}, Uptime: int64(time.Since(w.startedAt).Seconds()), } if w.minerManager != nil { miners := w.minerManager.ListMiners() for _, miner := range miners { minerStats, err := miner.GetStats() if err != nil { continue } // Convert to MinerStatsItem - this is a simplified conversion // The actual implementation would need to match the mining package's stats structure item := convertMinerStats(miner, minerStats) stats.Miners = append(stats.Miners, item) } } return message.Reply(MessageStats, stats) } // convertMinerStats converts a running miner's raw stats map to the wire protocol format. func convertMinerStats(miner MinerInstance, rawStats any) MinerStatsItem { item := MinerStatsItem{ Name: miner.GetName(), Type: miner.GetType(), } if statsMap, ok := rawStats.(map[string]any); ok { if hashrate, ok := statsMap["hashrate"].(float64); ok { item.Hashrate = hashrate } if shares, ok := statsMap["shares"].(int); ok { item.Shares = shares } if rejected, ok := statsMap["rejected"].(int); ok { item.Rejected = rejected } if uptime, ok := statsMap["uptime"].(int); ok { item.Uptime = uptime } if pool, ok := statsMap["pool"].(string); ok { item.Pool = pool } if algorithm, ok := statsMap["algorithm"].(string); ok { item.Algorithm = algorithm } } return item } // handleStartMiner starts a miner with the given profile. func (w *Worker) handleStartMiner(message *Message) (*Message, error) { if w.minerManager == nil { return nil, ErrorMinerManagerNotConfigured } var payload StartMinerPayload if err := message.ParsePayload(&payload); err != nil { return nil, core.E("Worker.handleStartMiner", "invalid start miner payload", err) } // Validate miner type is provided if payload.MinerType == "" { return nil, core.E("Worker.handleStartMiner", "miner type is required", nil) } // Get the config from the profile or use the override var config any if payload.Config != nil { config = payload.Config } else if w.profileManager != nil { profile, err := w.profileManager.GetProfile(payload.ProfileID) if err != nil { return nil, core.E("Worker.handleStartMiner", "profile not found: "+payload.ProfileID, nil) } config = profile } else { return nil, core.E("Worker.handleStartMiner", "no config provided and no profile manager configured", nil) } // Start the miner miner, err := w.minerManager.StartMiner(payload.MinerType, config) if err != nil { ack := MinerAckPayload{ Success: false, Error: err.Error(), } return message.Reply(MessageMinerAck, ack) } ack := MinerAckPayload{ Success: true, MinerName: miner.GetName(), } return message.Reply(MessageMinerAck, ack) } // handleStopMiner stops a running miner. func (w *Worker) handleStopMiner(message *Message) (*Message, error) { if w.minerManager == nil { return nil, ErrorMinerManagerNotConfigured } var payload StopMinerPayload if err := message.ParsePayload(&payload); err != nil { return nil, core.E("Worker.handleStopMiner", "invalid stop miner payload", err) } err := w.minerManager.StopMiner(payload.MinerName) ack := MinerAckPayload{ Success: err == nil, MinerName: payload.MinerName, } if err != nil { ack.Error = err.Error() } return message.Reply(MessageMinerAck, ack) } // handleLogs returns console logs from a miner. func (w *Worker) handleLogs(message *Message) (*Message, error) { if w.minerManager == nil { return nil, ErrorMinerManagerNotConfigured } var payload LogsRequestPayload if err := message.ParsePayload(&payload); err != nil { return nil, core.E("Worker.handleLogs", "invalid logs payload", err) } // Validate and limit the Lines parameter to prevent resource exhaustion const maxLogLines = 10000 if payload.Lines <= 0 || payload.Lines > maxLogLines { payload.Lines = maxLogLines } miner, err := w.minerManager.GetMiner(payload.MinerName) if err != nil { return nil, core.E("Worker.handleLogs", "miner not found: "+payload.MinerName, nil) } lines := miner.GetConsoleHistory(payload.Lines) logs := LogsPayload{ MinerName: payload.MinerName, Lines: lines, HasMore: len(lines) >= payload.Lines, } return message.Reply(MessageLogs, logs) } // handleDeploy handles deployment of profiles or miner bundles. func (w *Worker) handleDeploy(peerConnection *PeerConnection, message *Message) (*Message, error) { var payload DeployPayload if err := message.ParsePayload(&payload); err != nil { return nil, core.E("Worker.handleDeploy", "invalid deploy payload", err) } // Reconstruct Bundle object from payload bundle := &Bundle{ Type: BundleType(payload.BundleType), Name: payload.Name, Data: payload.Data, Checksum: payload.Checksum, } // Use shared secret as password (base64 encoded) password := "" if peerConnection != nil && len(peerConnection.SharedSecret) > 0 { password = base64.StdEncoding.EncodeToString(peerConnection.SharedSecret) } switch bundle.Type { case BundleProfile: if w.profileManager == nil { return nil, core.E("Worker.handleDeploy", "profile manager not configured", nil) } // Decrypt and extract profile data profileData, err := ExtractProfileBundle(bundle, password) if err != nil { return nil, core.E("Worker.handleDeploy", "failed to extract profile bundle", err) } // Unmarshal into interface{} to pass to ProfileManager var profile any if result := core.JSONUnmarshal(profileData, &profile); !result.OK { return nil, core.E("Worker.handleDeploy", "invalid profile data JSON", result.Value.(error)) } if err := w.profileManager.SaveProfile(profile); err != nil { ack := DeployAckPayload{ Success: false, Name: payload.Name, Error: err.Error(), } return message.Reply(MessageDeployAck, ack) } ack := DeployAckPayload{ Success: true, Name: payload.Name, } return message.Reply(MessageDeployAck, ack) case BundleMiner, BundleFull: // Determine the installation directory under the configured deployment // root for lethean-desktop/miners/. minersDir := core.JoinPath(w.deploymentDirectory(), "lethean-desktop", "miners") installDir := core.JoinPath(minersDir, payload.Name) logging.Info("deploying miner bundle", logging.Fields{ "name": payload.Name, "path": installDir, "type": payload.BundleType, }) // Extract miner bundle minerPath, profileData, err := ExtractMinerBundle(bundle, password, installDir) if err != nil { return nil, core.E("Worker.handleDeploy", "failed to extract miner bundle", err) } // If the bundle contained a profile config, save it if len(profileData) > 0 && w.profileManager != nil { var profile any if result := core.JSONUnmarshal(profileData, &profile); !result.OK { logging.Warn("failed to parse profile from miner bundle", logging.Fields{"error": result.Value.(error)}) } else { if err := w.profileManager.SaveProfile(profile); err != nil { logging.Warn("failed to save profile from miner bundle", logging.Fields{"error": err}) } } } // Success response ack := DeployAckPayload{ Success: true, Name: payload.Name, } // Log the installation logging.Info("miner bundle installed successfully", logging.Fields{ "name": payload.Name, "miner_path": minerPath, }) return message.Reply(MessageDeployAck, ack) default: return nil, core.E("Worker.handleDeploy", "unknown bundle type: "+payload.BundleType, nil) } } // worker.RegisterOnTransport() func (w *Worker) RegisterOnTransport() { w.transport.OnMessage(w.HandleMessage) } // deploymentDirectory resolves the active deployment directory. func (w *Worker) deploymentDirectory() string { if w.DeploymentDirectory != "" { return w.DeploymentDirectory } return xdg.DataHome }