package node import ( "context" "sync" "time" core "dappco.re/go/core" "dappco.re/go/core/p2p/logging" ) // Controller drives remote peer operations from a controller node. // // controller := NewController(nodeManager, peerRegistry, transport) type Controller struct { nodeManager *NodeManager peerRegistry *PeerRegistry transport *Transport mutex sync.RWMutex // Pending requests awaiting responses. pendingRequests map[string]chan *Message // message ID -> response channel } // NewController wires a controller to a node manager, peer registry, and transport. // // controller := NewController(nodeManager, peerRegistry, transport) func NewController(nodeManager *NodeManager, peerRegistry *PeerRegistry, transport *Transport) *Controller { c := &Controller{ nodeManager: nodeManager, peerRegistry: peerRegistry, transport: transport, pendingRequests: make(map[string]chan *Message), } // Register message handler for responses transport.OnMessage(c.handleResponse) return c } // handleResponse processes incoming replies and routes them to the waiting request. func (c *Controller) handleResponse(_ *PeerConnection, message *Message) { if message.ReplyTo == "" { return // Not a response, let worker handle it } c.mutex.Lock() responseChannel, hasPendingRequest := c.pendingRequests[message.ReplyTo] if hasPendingRequest { delete(c.pendingRequests, message.ReplyTo) } c.mutex.Unlock() if hasPendingRequest && responseChannel != nil { select { case responseChannel <- message: default: // Late duplicate response; drop it. } } } // sendRequest registers a temporary response channel, sends message, and waits // for the matching reply or timeout. // // The response channel is intentionally never closed. Removing it from the // pending map is enough to stop future routing, and it avoids a late-response // close/send race after the caller has already timed out. func (c *Controller) sendRequest(peerID string, message *Message, timeout time.Duration) (*Message, error) { resolvedPeerID := peerID // Auto-connect if not already connected if c.transport.GetConnection(peerID) == nil { peer := c.peerRegistry.GetPeer(peerID) if peer == nil { return nil, core.E("Controller.sendRequest", "peer not found: "+peerID, nil) } conn, err := c.transport.Connect(peer) if err != nil { return nil, core.E("Controller.sendRequest", "failed to connect to peer", err) } // Use the real peer ID after handshake (it may have changed) resolvedPeerID = conn.Peer.ID // Update the message destination message.To = resolvedPeerID } // Create response channel responseChannel := make(chan *Message, 1) c.mutex.Lock() c.pendingRequests[message.ID] = responseChannel c.mutex.Unlock() // Clean up on exit. Deleting the pending entry is enough because // handleResponse only routes through the map. defer func() { c.mutex.Lock() delete(c.pendingRequests, message.ID) c.mutex.Unlock() }() // Send the message if err := c.transport.Send(resolvedPeerID, message); err != nil { return nil, core.E("Controller.sendRequest", "failed to send message", err) } // Wait for response ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() select { case response := <-responseChannel: return response, nil case <-ctx.Done(): return nil, core.E("Controller.sendRequest", "request timeout", nil) } } // GetRemoteStats requests miner statistics from a remote peer. // // stats, err := controller.GetRemoteStats("worker-1") func (c *Controller) GetRemoteStats(peerID string) (*StatsPayload, error) { identity := c.nodeManager.GetIdentity() if identity == nil { return nil, ErrorIdentityNotInitialized } requestMessage, err := NewMessage(MessageGetStats, identity.ID, peerID, nil) if err != nil { return nil, core.E("Controller.GetRemoteStats", "failed to create message", err) } response, err := c.sendRequest(peerID, requestMessage, 10*time.Second) if err != nil { return nil, err } var stats StatsPayload if err := ParseResponse(response, MessageStats, &stats); err != nil { return nil, err } return &stats, nil } // StartRemoteMiner requests a remote peer to start a miner with a given profile. // // err := controller.StartRemoteMiner("worker-1", "xmrig", "profile-1", nil) func (c *Controller) StartRemoteMiner(peerID, minerType, profileID string, configOverride RawMessage) error { identity := c.nodeManager.GetIdentity() if identity == nil { return ErrorIdentityNotInitialized } if minerType == "" { return core.E("Controller.StartRemoteMiner", "miner type is required", nil) } payload := StartMinerPayload{ MinerType: minerType, ProfileID: profileID, Config: configOverride, } requestMessage, err := NewMessage(MessageStartMiner, identity.ID, peerID, payload) if err != nil { return core.E("Controller.StartRemoteMiner", "failed to create message", err) } response, err := c.sendRequest(peerID, requestMessage, 30*time.Second) if err != nil { return err } var ack MinerAckPayload if err := ParseResponse(response, MessageMinerAck, &ack); err != nil { return err } if !ack.Success { return core.E("Controller.StartRemoteMiner", "miner start failed: "+ack.Error, nil) } return nil } // StopRemoteMiner requests a remote peer to stop a miner. // // err := controller.StopRemoteMiner("worker-1", "xmrig-0") func (c *Controller) StopRemoteMiner(peerID, minerName string) error { identity := c.nodeManager.GetIdentity() if identity == nil { return ErrorIdentityNotInitialized } payload := StopMinerPayload{ MinerName: minerName, } requestMessage, err := NewMessage(MessageStopMiner, identity.ID, peerID, payload) if err != nil { return core.E("Controller.StopRemoteMiner", "failed to create message", err) } response, err := c.sendRequest(peerID, requestMessage, 30*time.Second) if err != nil { return err } var ack MinerAckPayload if err := ParseResponse(response, MessageMinerAck, &ack); err != nil { return err } if !ack.Success { return core.E("Controller.StopRemoteMiner", "miner stop failed: "+ack.Error, nil) } return nil } // GetRemoteLogs requests console logs from a remote miner. // // logs, err := controller.GetRemoteLogs("worker-1", "xmrig-0", 100) func (c *Controller) GetRemoteLogs(peerID, minerName string, lines int) ([]string, error) { identity := c.nodeManager.GetIdentity() if identity == nil { return nil, ErrorIdentityNotInitialized } payload := LogsRequestPayload{ MinerName: minerName, Lines: lines, } requestMessage, err := NewMessage(MessageGetLogs, identity.ID, peerID, payload) if err != nil { return nil, core.E("Controller.GetRemoteLogs", "failed to create message", err) } response, err := c.sendRequest(peerID, requestMessage, 10*time.Second) if err != nil { return nil, err } var logs LogsPayload if err := ParseResponse(response, MessageLogs, &logs); err != nil { return nil, err } return logs.Lines, nil } // GetAllStats fetches stats from all connected peers. // // statsByPeerID := controller.GetAllStats() func (c *Controller) GetAllStats() map[string]*StatsPayload { results := make(map[string]*StatsPayload) var mu sync.Mutex var wg sync.WaitGroup for peer := range c.peerRegistry.ConnectedPeers() { wg.Add(1) go func(p *Peer) { defer wg.Done() stats, err := c.GetRemoteStats(p.ID) if err != nil { logging.Debug("failed to get stats from peer", logging.Fields{ "peer_id": p.ID, "peer": p.Name, "error": err.Error(), }) return // Skip failed peers } mu.Lock() results[p.ID] = stats mu.Unlock() }(peer) } wg.Wait() return results } // PingPeer sends a ping to a peer and refreshes that peer's metrics. // // rttMilliseconds, err := controller.PingPeer("worker-1") func (c *Controller) PingPeer(peerID string) (float64, error) { identity := c.nodeManager.GetIdentity() if identity == nil { return 0, ErrorIdentityNotInitialized } sentAt := time.Now() payload := PingPayload{ SentAt: sentAt.UnixMilli(), } requestMessage, err := NewMessage(MessagePing, identity.ID, peerID, payload) if err != nil { return 0, core.E("Controller.PingPeer", "failed to create message", err) } response, err := c.sendRequest(peerID, requestMessage, 5*time.Second) if err != nil { return 0, err } if err := ValidateResponse(response, MessagePong); err != nil { return 0, err } // Calculate round-trip time in milliseconds. rtt := time.Since(sentAt).Seconds() * 1000 // Update peer metrics peer := c.peerRegistry.GetPeer(peerID) if peer != nil { c.peerRegistry.UpdateMetrics(peerID, rtt, peer.GeographicKilometres, peer.Hops) } return rtt, nil } // ConnectToPeer opens a transport connection to a peer. // // err := controller.ConnectToPeer("worker-1") func (c *Controller) ConnectToPeer(peerID string) error { peer := c.peerRegistry.GetPeer(peerID) if peer == nil { return core.E("Controller.ConnectToPeer", "peer not found: "+peerID, nil) } _, err := c.transport.Connect(peer) return err } // DisconnectFromPeer closes the active connection to a peer. // // err := controller.DisconnectFromPeer("worker-1") func (c *Controller) DisconnectFromPeer(peerID string) error { conn := c.transport.GetConnection(peerID) if conn == nil { return core.E("Controller.DisconnectFromPeer", "peer not connected: "+peerID, nil) } return conn.Close() }