366 lines
9.1 KiB
Go
366 lines
9.1 KiB
Go
package node
|
|
|
|
import (
|
|
"context"
|
|
"sync"
|
|
"time"
|
|
|
|
core "dappco.re/go/core"
|
|
|
|
"dappco.re/go/core/p2p/logging"
|
|
)
|
|
|
|
// Controller drives remote peer operations from a controller node.
|
|
//
|
|
// controller := NewController(nodeManager, peerRegistry, transport)
|
|
type Controller struct {
|
|
node *NodeManager
|
|
peers *PeerRegistry
|
|
transport *Transport
|
|
mu sync.RWMutex
|
|
|
|
// Pending requests awaiting responses
|
|
pending map[string]chan *Message // message ID -> response channel
|
|
}
|
|
|
|
// NewController wires a controller to a node manager, peer registry, and transport.
|
|
//
|
|
// controller := NewController(nodeManager, peerRegistry, transport)
|
|
func NewController(node *NodeManager, peers *PeerRegistry, transport *Transport) *Controller {
|
|
c := &Controller{
|
|
node: node,
|
|
peers: peers,
|
|
transport: transport,
|
|
pending: make(map[string]chan *Message),
|
|
}
|
|
|
|
// Register message handler for responses
|
|
transport.OnMessage(c.handleResponse)
|
|
|
|
return c
|
|
}
|
|
|
|
// handleResponse processes incoming replies and routes them to the waiting request.
|
|
func (c *Controller) handleResponse(_ *PeerConnection, msg *Message) {
|
|
if msg.ReplyTo == "" {
|
|
return // Not a response, let worker handle it
|
|
}
|
|
|
|
c.mu.Lock()
|
|
responseCh, isPending := c.pending[msg.ReplyTo]
|
|
if isPending {
|
|
delete(c.pending, msg.ReplyTo)
|
|
}
|
|
c.mu.Unlock()
|
|
|
|
if isPending && responseCh != nil {
|
|
select {
|
|
case responseCh <- msg:
|
|
default:
|
|
// Late duplicate response; drop it.
|
|
}
|
|
}
|
|
}
|
|
|
|
// sendRequest registers a temporary response channel, sends msg, and waits
|
|
// for the matching reply or timeout.
|
|
//
|
|
// The response channel is intentionally never closed. Removing it from the
|
|
// pending map is enough to stop future routing, and it avoids a late-response
|
|
// close/send race after the caller has already timed out.
|
|
func (c *Controller) sendRequest(peerID string, msg *Message, timeout time.Duration) (*Message, error) {
|
|
resolvedPeerID := peerID
|
|
|
|
// Auto-connect if not already connected
|
|
if c.transport.Connection(peerID) == nil {
|
|
peer := c.peers.Peer(peerID)
|
|
if peer == nil {
|
|
return nil, core.E("Controller.sendRequest", "peer not found: "+peerID, nil)
|
|
}
|
|
conn, err := c.transport.Connect(peer)
|
|
if err != nil {
|
|
return nil, core.E("Controller.sendRequest", "failed to connect to peer", err)
|
|
}
|
|
// Use the real peer ID after handshake (it may have changed)
|
|
resolvedPeerID = conn.Peer.ID
|
|
// Update the message destination
|
|
msg.To = resolvedPeerID
|
|
}
|
|
|
|
// Create response channel
|
|
responseCh := make(chan *Message, 1)
|
|
|
|
c.mu.Lock()
|
|
c.pending[msg.ID] = responseCh
|
|
c.mu.Unlock()
|
|
|
|
// Clean up on exit. Deleting the pending entry is enough because
|
|
// handleResponse only routes through the map.
|
|
defer func() {
|
|
c.mu.Lock()
|
|
delete(c.pending, msg.ID)
|
|
c.mu.Unlock()
|
|
}()
|
|
|
|
// Send the message
|
|
if err := c.transport.Send(resolvedPeerID, msg); err != nil {
|
|
return nil, core.E("Controller.sendRequest", "failed to send message", err)
|
|
}
|
|
|
|
// Wait for response
|
|
ctx, cancel := context.WithTimeout(context.Background(), timeout)
|
|
defer cancel()
|
|
|
|
select {
|
|
case resp := <-responseCh:
|
|
return resp, nil
|
|
case <-ctx.Done():
|
|
return nil, core.E("Controller.sendRequest", "request timeout", nil)
|
|
}
|
|
}
|
|
|
|
// RemoteStats requests miner statistics from a remote peer.
|
|
//
|
|
// stats, err := controller.RemoteStats("worker-1")
|
|
func (c *Controller) RemoteStats(peerID string) (*StatsPayload, error) {
|
|
identity := c.node.Identity()
|
|
if identity == nil {
|
|
return nil, ErrorIdentityNotInitialized
|
|
}
|
|
|
|
msg, err := NewMessage(MessageGetStats, identity.ID, peerID, nil)
|
|
if err != nil {
|
|
return nil, core.E("Controller.RemoteStats", "failed to create message", err)
|
|
}
|
|
|
|
resp, err := c.sendRequest(peerID, msg, 10*time.Second)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var stats StatsPayload
|
|
if err := ParseResponse(resp, MessageStats, &stats); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &stats, nil
|
|
}
|
|
|
|
// Deprecated: use RemoteStats.
|
|
func (c *Controller) GetRemoteStats(peerID string) (*StatsPayload, error) {
|
|
return c.RemoteStats(peerID)
|
|
}
|
|
|
|
// StartRemoteMiner requests a remote peer to start a miner with a given profile.
|
|
//
|
|
// err := controller.StartRemoteMiner("worker-1", "xmrig", "profile-1", nil)
|
|
func (c *Controller) StartRemoteMiner(peerID, minerType, profileID string, configOverride RawMessage) error {
|
|
identity := c.node.Identity()
|
|
if identity == nil {
|
|
return ErrorIdentityNotInitialized
|
|
}
|
|
|
|
if minerType == "" {
|
|
return core.E("Controller.StartRemoteMiner", "miner type is required", nil)
|
|
}
|
|
|
|
payload := StartMinerPayload{
|
|
MinerType: minerType,
|
|
ProfileID: profileID,
|
|
Config: configOverride,
|
|
}
|
|
|
|
msg, err := NewMessage(MessageStartMiner, identity.ID, peerID, payload)
|
|
if err != nil {
|
|
return core.E("Controller.StartRemoteMiner", "failed to create message", err)
|
|
}
|
|
|
|
resp, err := c.sendRequest(peerID, msg, 30*time.Second)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
var ack MinerAckPayload
|
|
if err := ParseResponse(resp, MessageMinerAck, &ack); err != nil {
|
|
return err
|
|
}
|
|
|
|
if !ack.Success {
|
|
return core.E("Controller.StartRemoteMiner", "miner start failed: "+ack.Error, nil)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// StopRemoteMiner requests a remote peer to stop a miner.
|
|
//
|
|
// err := controller.StopRemoteMiner("worker-1", "xmrig-0")
|
|
func (c *Controller) StopRemoteMiner(peerID, minerName string) error {
|
|
identity := c.node.Identity()
|
|
if identity == nil {
|
|
return ErrorIdentityNotInitialized
|
|
}
|
|
|
|
payload := StopMinerPayload{
|
|
MinerName: minerName,
|
|
}
|
|
|
|
msg, err := NewMessage(MessageStopMiner, identity.ID, peerID, payload)
|
|
if err != nil {
|
|
return core.E("Controller.StopRemoteMiner", "failed to create message", err)
|
|
}
|
|
|
|
resp, err := c.sendRequest(peerID, msg, 30*time.Second)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
var ack MinerAckPayload
|
|
if err := ParseResponse(resp, MessageMinerAck, &ack); err != nil {
|
|
return err
|
|
}
|
|
|
|
if !ack.Success {
|
|
return core.E("Controller.StopRemoteMiner", "miner stop failed: "+ack.Error, nil)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// RemoteLogs requests console logs from a remote miner.
|
|
//
|
|
// logs, err := controller.RemoteLogs("worker-1", "xmrig-0", 100)
|
|
func (c *Controller) RemoteLogs(peerID, minerName string, lines int) ([]string, error) {
|
|
identity := c.node.Identity()
|
|
if identity == nil {
|
|
return nil, ErrorIdentityNotInitialized
|
|
}
|
|
|
|
payload := LogsRequestPayload{
|
|
MinerName: minerName,
|
|
Lines: lines,
|
|
}
|
|
|
|
msg, err := NewMessage(MessageGetLogs, identity.ID, peerID, payload)
|
|
if err != nil {
|
|
return nil, core.E("Controller.RemoteLogs", "failed to create message", err)
|
|
}
|
|
|
|
resp, err := c.sendRequest(peerID, msg, 10*time.Second)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var logs LogsPayload
|
|
if err := ParseResponse(resp, MessageLogs, &logs); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return logs.Lines, nil
|
|
}
|
|
|
|
// Deprecated: use RemoteLogs.
|
|
func (c *Controller) GetRemoteLogs(peerID, minerName string, lines int) ([]string, error) {
|
|
return c.RemoteLogs(peerID, minerName, lines)
|
|
}
|
|
|
|
// AllStats fetches stats from all connected peers.
|
|
//
|
|
// statsByPeerID := controller.AllStats()
|
|
func (c *Controller) AllStats() map[string]*StatsPayload {
|
|
results := make(map[string]*StatsPayload)
|
|
var mu sync.Mutex
|
|
var wg sync.WaitGroup
|
|
|
|
for peer := range c.peers.ConnectedPeers() {
|
|
wg.Add(1)
|
|
go func(p *Peer) {
|
|
defer wg.Done()
|
|
stats, err := c.RemoteStats(p.ID)
|
|
if err != nil {
|
|
logging.Debug("failed to get stats from peer", logging.Fields{
|
|
"peer_id": p.ID,
|
|
"peer": p.Name,
|
|
"error": err.Error(),
|
|
})
|
|
return // Skip failed peers
|
|
}
|
|
mu.Lock()
|
|
results[p.ID] = stats
|
|
mu.Unlock()
|
|
}(peer)
|
|
}
|
|
|
|
wg.Wait()
|
|
return results
|
|
}
|
|
|
|
// Deprecated: use AllStats.
|
|
func (c *Controller) GetAllStats() map[string]*StatsPayload {
|
|
return c.AllStats()
|
|
}
|
|
|
|
// PingPeer sends a ping to a peer and refreshes that peer's metrics.
|
|
//
|
|
// rttMS, err := controller.PingPeer("worker-1")
|
|
func (c *Controller) PingPeer(peerID string) (float64, error) {
|
|
identity := c.node.Identity()
|
|
if identity == nil {
|
|
return 0, ErrorIdentityNotInitialized
|
|
}
|
|
sentAt := time.Now()
|
|
|
|
payload := PingPayload{
|
|
SentAt: sentAt.UnixMilli(),
|
|
}
|
|
|
|
msg, err := NewMessage(MessagePing, identity.ID, peerID, payload)
|
|
if err != nil {
|
|
return 0, core.E("Controller.PingPeer", "failed to create message", err)
|
|
}
|
|
|
|
resp, err := c.sendRequest(peerID, msg, 5*time.Second)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
if err := ValidateResponse(resp, MessagePong); err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
// Calculate round-trip time
|
|
rtt := time.Since(sentAt).Seconds() * 1000 // Convert to ms
|
|
|
|
// Update peer metrics
|
|
peer := c.peers.Peer(peerID)
|
|
if peer != nil {
|
|
c.peers.UpdateMetrics(peerID, rtt, peer.GeoKM, peer.Hops)
|
|
}
|
|
|
|
return rtt, nil
|
|
}
|
|
|
|
// ConnectToPeer opens a transport connection to a peer.
|
|
//
|
|
// err := controller.ConnectToPeer("worker-1")
|
|
func (c *Controller) ConnectToPeer(peerID string) error {
|
|
peer := c.peers.Peer(peerID)
|
|
if peer == nil {
|
|
return core.E("Controller.ConnectToPeer", "peer not found: "+peerID, nil)
|
|
}
|
|
|
|
_, err := c.transport.Connect(peer)
|
|
return err
|
|
}
|
|
|
|
// DisconnectFromPeer closes the active connection to a peer.
|
|
//
|
|
// err := controller.DisconnectFromPeer("worker-1")
|
|
func (c *Controller) DisconnectFromPeer(peerID string) error {
|
|
conn := c.transport.Connection(peerID)
|
|
if conn == nil {
|
|
return core.E("Controller.DisconnectFromPeer", "peer not connected: "+peerID, nil)
|
|
}
|
|
|
|
return conn.Close()
|
|
}
|