package node import ( "context" "encoding/json" "sync" "time" "forge.lthn.ai/Snider/Mining/pkg/logging" ) // controller := node.NewController(nodeManager, peerRegistry, transport) // rtt, _ := controller.PingPeer("abc123def456") // stats, _ := controller.GetRemoteStats("abc123def456") type Controller struct { nodeManager *NodeManager peers *PeerRegistry transport *Transport mutex sync.RWMutex pendingRequests map[string]chan *Message } // controller := node.NewController(nodeManager, peerRegistry, transport) // rtt, err := controller.PingPeer("abc123def456") func NewController(nodeManager *NodeManager, peers *PeerRegistry, transport *Transport) *Controller { controller := &Controller{ nodeManager: nodeManager, peers: peers, transport: transport, pendingRequests: make(map[string]chan *Message), } // transport.OnMessage(controller.handleResponse) // registered in NewController; routes replies to pending request channels transport.OnMessage(controller.handleResponse) return controller } // transport.OnMessage(controller.handleResponse) // registered in NewController func (controller *Controller) handleResponse(conn *PeerConnection, msg *Message) { if msg.ReplyTo == "" { return // Not a response, let worker handle it } controller.mutex.Lock() responseChannel, exists := controller.pendingRequests[msg.ReplyTo] if exists { delete(controller.pendingRequests, msg.ReplyTo) } controller.mutex.Unlock() if exists && responseChannel != nil { select { case responseChannel <- msg: default: // Channel full or closed } } } // response, err := controller.sendRequest("abc123def456", msg, 10*time.Second) // if err != nil { return nil, err } func (controller *Controller) sendRequest(peerID string, msg *Message, timeout time.Duration) (*Message, error) { actualPeerID := peerID // Auto-connect if not already connected if controller.transport.GetConnection(peerID) == nil { peer := controller.peers.GetPeer(peerID) if peer == nil { return nil, &ProtocolError{Code: ErrCodeNotFound, Message: "peer not found: " + peerID} } conn, err := controller.transport.Connect(peer) if err != nil { return nil, &ProtocolError{Code: ErrCodeOperationFailed, Message: "failed to connect to peer: " + err.Error()} } // Use the real peer ID after handshake (it may have changed) actualPeerID = conn.Peer.ID // Update the message destination msg.To = actualPeerID } // Create response channel responseChannel := make(chan *Message, 1) controller.mutex.Lock() controller.pendingRequests[msg.ID] = responseChannel controller.mutex.Unlock() // Clean up on exit - ensure channel is closed and removed from map defer func() { controller.mutex.Lock() delete(controller.pendingRequests, msg.ID) controller.mutex.Unlock() close(responseChannel) // Close channel to allow garbage collection }() // Send the message if err := controller.transport.Send(actualPeerID, msg); err != nil { return nil, &ProtocolError{Code: ErrCodeOperationFailed, Message: "failed to send message: " + err.Error()} } // Wait for response requestContext, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() select { case response := <-responseChannel: return response, nil case <-requestContext.Done(): return nil, &ProtocolError{Code: ErrCodeTimeout, Message: "request timeout"} } } // stats, err := controller.GetRemoteStats("abc123def456") // if err == nil { log("miners:", len(stats.Miners)) } func (controller *Controller) GetRemoteStats(peerID string) (*StatsPayload, error) { identity := controller.nodeManager.GetIdentity() if identity == nil { return nil, &ProtocolError{Code: ErrCodeUnauthorized, Message: "node identity not initialized"} } message, err := NewMessage(MsgGetStats, identity.ID, peerID, nil) if err != nil { return nil, err } response, err := controller.sendRequest(peerID, message, 10*time.Second) if err != nil { return nil, err } var stats StatsPayload if err := ParseResponse(response, MsgStats, &stats); err != nil { return nil, err } return &stats, nil } // err := controller.StartRemoteMiner("abc123def456", "xmrig", "profile-1", nil) // err := controller.StartRemoteMiner("abc123def456", "xmrig", "", json.RawMessage(`{"pool":"stratum+tcp://pool.lthn.io:3333"}`)) func (controller *Controller) StartRemoteMiner(peerID, minerType, profileID string, configOverride json.RawMessage) error { identity := controller.nodeManager.GetIdentity() if identity == nil { return &ProtocolError{Code: ErrCodeUnauthorized, Message: "node identity not initialized"} } if minerType == "" { return &ProtocolError{Code: ErrCodeInvalidMessage, Message: "miner type is required"} } payload := StartMinerPayload{ MinerType: minerType, ProfileID: profileID, Config: configOverride, } message, err := NewMessage(MsgStartMiner, identity.ID, peerID, payload) if err != nil { return err } response, err := controller.sendRequest(peerID, message, 30*time.Second) if err != nil { return err } var acknowledgement MinerAckPayload if err := ParseResponse(response, MsgMinerAck, &acknowledgement); err != nil { return err } if !acknowledgement.Success { return &ProtocolError{Code: ErrCodeOperationFailed, Message: "miner start failed: " + acknowledgement.Error} } return nil } // err := controller.StopRemoteMiner("abc123def456", "xmrig") // if err != nil { log("stop failed:", err) } func (controller *Controller) StopRemoteMiner(peerID, minerName string) error { identity := controller.nodeManager.GetIdentity() if identity == nil { return &ProtocolError{Code: ErrCodeUnauthorized, Message: "node identity not initialized"} } payload := StopMinerPayload{ MinerName: minerName, } message, err := NewMessage(MsgStopMiner, identity.ID, peerID, payload) if err != nil { return err } response, err := controller.sendRequest(peerID, message, 30*time.Second) if err != nil { return err } var acknowledgement MinerAckPayload if err := ParseResponse(response, MsgMinerAck, &acknowledgement); err != nil { return err } if !acknowledgement.Success { return &ProtocolError{Code: ErrCodeOperationFailed, Message: "miner stop failed: " + acknowledgement.Error} } return nil } // lines, err := controller.GetRemoteLogs("abc123def456", "xmrig", 100) // if err == nil { log("last log:", lines[len(lines)-1]) } func (controller *Controller) GetRemoteLogs(peerID, minerName string, lines int) ([]string, error) { identity := controller.nodeManager.GetIdentity() if identity == nil { return nil, &ProtocolError{Code: ErrCodeUnauthorized, Message: "node identity not initialized"} } payload := GetLogsPayload{ MinerName: minerName, Lines: lines, } message, err := NewMessage(MsgGetLogs, identity.ID, peerID, payload) if err != nil { return nil, err } response, err := controller.sendRequest(peerID, message, 10*time.Second) if err != nil { return nil, err } var logs LogsPayload if err := ParseResponse(response, MsgLogs, &logs); err != nil { return nil, err } return logs.Lines, nil } // allStats := controller.GetAllStats() // for peerID, stats := range allStats { log(peerID, stats.Miners) } func (controller *Controller) GetAllStats() map[string]*StatsPayload { peers := controller.peers.GetConnectedPeers() results := make(map[string]*StatsPayload) var resultsMutex sync.Mutex var waitGroup sync.WaitGroup for _, peer := range peers { waitGroup.Add(1) go func(connectedPeer *Peer) { defer waitGroup.Done() stats, err := controller.GetRemoteStats(connectedPeer.ID) if err != nil { logging.Debug("failed to get stats from peer", logging.Fields{ "peer_id": connectedPeer.ID, "peer": connectedPeer.Name, "error": err.Error(), }) return // Skip failed peers } resultsMutex.Lock() results[connectedPeer.ID] = stats resultsMutex.Unlock() }(peer) } waitGroup.Wait() return results } // rtt, err := controller.PingPeer("abc123def456") // if err == nil { log("latency:", rtt, "ms") } func (controller *Controller) PingPeer(peerID string) (float64, error) { identity := controller.nodeManager.GetIdentity() if identity == nil { return 0, &ProtocolError{Code: ErrCodeUnauthorized, Message: "node identity not initialized"} } sentAt := time.Now() payload := PingPayload{ SentAt: sentAt.UnixMilli(), } message, err := NewMessage(MsgPing, identity.ID, peerID, payload) if err != nil { return 0, err } response, err := controller.sendRequest(peerID, message, 5*time.Second) if err != nil { return 0, err } if err := ValidateResponse(response, MsgPong); err != nil { return 0, err } roundTripTime := time.Since(sentAt).Seconds() * 1000 // milliseconds // Update peer metrics peer := controller.peers.GetPeer(peerID) if peer != nil { controller.peers.UpdateMetrics(peerID, roundTripTime, peer.GeoKM, peer.Hops) } return roundTripTime, nil } // if err := controller.ConnectToPeer("abc123def456"); err != nil { log(err) } func (controller *Controller) ConnectToPeer(peerID string) error { peer := controller.peers.GetPeer(peerID) if peer == nil { return &ProtocolError{Code: ErrCodeNotFound, Message: "peer not found: " + peerID} } _, err := controller.transport.Connect(peer) return err } // if err := controller.DisconnectFromPeer("abc123def456"); err != nil { log(err) } func (controller *Controller) DisconnectFromPeer(peerID string) error { conn := controller.transport.GetConnection(peerID) if conn == nil { return &ProtocolError{Code: ErrCodeNotFound, Message: "peer not connected: " + peerID} } return conn.Close() }