go-p2p/node/controller.go
Virgil 9c5f3d0bd3 refactor(node): trim redundant AX comments
Co-Authored-By: Virgil <virgil@lethean.io>
2026-03-31 15:39:28 +01:00

319 lines
8 KiB
Go

package node
import (
"context"
"sync"
"time"
core "dappco.re/go/core"
"dappco.re/go/core/p2p/logging"
)
// Controller issues requests to remote peers over a Transport and matches
// each reply to the request that is waiting for it.
//
//	controller := NewController(nodeManager, peerRegistry, transport)
type Controller struct {
// nodeManager supplies the local identity whose ID is stamped on outgoing messages.
nodeManager *NodeManager
// peerRegistry is used to look up peers by ID, list connected peers and
// record ping metrics.
peerRegistry *PeerRegistry
// transport connects to peers, sends messages and feeds incoming messages
// to handleResponse.
transport *Transport
// mutex guards pendingRequests.
mutex sync.RWMutex
// pendingRequests holds one response channel per in-flight request.
// Entries are added by sendRequest and removed by sendRequest or
// handleResponse, whichever gets there first.
pendingRequests map[string]chan *Message // message ID -> response channel
}
// NewController builds a Controller around the given node manager, peer
// registry and transport, and registers the controller's response handler on
// the transport through OnMessage.
//
//	controller := NewController(nodeManager, peerRegistry, transport)
func NewController(nodeManager *NodeManager, peerRegistry *PeerRegistry, transport *Transport) *Controller {
	controller := new(Controller)
	controller.nodeManager = nodeManager
	controller.peerRegistry = peerRegistry
	controller.transport = transport
	controller.pendingRequests = map[string]chan *Message{}

	// Incoming messages are inspected by handleResponse so replies can be
	// routed back to the request that is waiting for them.
	transport.OnMessage(controller.handleResponse)

	return controller
}
// handleResponse routes an incoming reply to the request waiting on it.
//
// Messages with an empty ReplyTo are ignored. Otherwise the pending entry
// keyed by ReplyTo is removed and, if one existed, the message is offered to
// its channel without blocking.
//
// NOTE(review): the sending connection is ignored, so a reply is matched on
// ReplyTo alone — confirm that the transport authenticates the sender.
func (c *Controller) handleResponse(_ *PeerConnection, message *Message) {
	requestID := message.ReplyTo
	if requestID == "" {
		// Not a reply; nothing for the controller to route.
		return
	}

	// Claim the waiter under the lock; deleting a missing key is a no-op.
	c.mutex.Lock()
	waiter, found := c.pendingRequests[requestID]
	delete(c.pendingRequests, requestID)
	c.mutex.Unlock()

	if !found || waiter == nil {
		return
	}

	// Non-blocking hand-off: if the channel cannot accept the message (for
	// example a late duplicate), the message is dropped.
	select {
	case waiter <- message:
	default:
	}
}
// sendRequest sends message to peerID and blocks until handleResponse routes
// back a reply whose ReplyTo equals message.ID, or until timeout elapses.
//
// If the transport has no connection for peerID, the peer is looked up in the
// registry and connected first; the request is then addressed to the ID
// reported by the new connection, which also overwrites message.To.
//
// It returns the reply, or an error when the peer is unknown, the connection
// or send fails, or no reply arrives within timeout.
func (c *Controller) sendRequest(peerID string, message *Message, timeout time.Duration) (*Message, error) {
	resolvedPeerID := peerID

	// Auto-connect if not already connected.
	if c.transport.GetConnection(peerID) == nil {
		peer := c.peerRegistry.GetPeer(peerID)
		if peer == nil {
			return nil, core.E("Controller.sendRequest", "peer not found: "+peerID, nil)
		}
		conn, err := c.transport.Connect(peer)
		if err != nil {
			return nil, core.E("Controller.sendRequest", "failed to connect to peer", err)
		}
		resolvedPeerID = conn.Peer.ID
		message.To = resolvedPeerID
	}

	// Capacity 1 lets handleResponse's non-blocking send succeed even when
	// this goroutine has not reached the select below yet.
	responseChannel := make(chan *Message, 1)

	// Register before sending so a fast reply cannot arrive unmatched.
	c.mutex.Lock()
	c.pendingRequests[message.ID] = responseChannel
	c.mutex.Unlock()

	// Clean up on exit. Deleting the pending entry is enough because
	// handleResponse only routes through the map.
	defer func() {
		c.mutex.Lock()
		delete(c.pendingRequests, message.ID)
		c.mutex.Unlock()
	}()

	if err := c.transport.Send(resolvedPeerID, message); err != nil {
		return nil, core.E("Controller.sendRequest", "failed to send message", err)
	}

	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	select {
	case response := <-responseChannel:
		return response, nil
	case <-ctx.Done():
	}

	// select chooses at random when several cases are ready, so a reply that
	// landed right at the deadline may still be sitting in the buffer. Prefer
	// it over reporting a spurious timeout.
	select {
	case response := <-responseChannel:
		return response, nil
	default:
		return nil, core.E("Controller.sendRequest", "request timeout", nil)
	}
}
// GetRemoteStats sends a MessageGetStats request to peerID, waits up to ten
// seconds for the reply, and passes it through ParseResponse expecting
// MessageStats.
//
// It returns ErrorIdentityNotInitialized when the node manager has no
// identity, and otherwise propagates message-creation, request and parse
// errors.
//
//	stats, err := controller.GetRemoteStats("worker-1")
func (c *Controller) GetRemoteStats(peerID string) (*StatsPayload, error) {
	const replyTimeout = 10 * time.Second

	self := c.nodeManager.GetIdentity()
	if self == nil {
		return nil, ErrorIdentityNotInitialized
	}

	// A stats request carries no payload.
	request, err := NewMessage(MessageGetStats, self.ID, peerID, nil)
	if err != nil {
		return nil, core.E("Controller.GetRemoteStats", "failed to create message", err)
	}

	reply, err := c.sendRequest(peerID, request, replyTimeout)
	if err != nil {
		return nil, err
	}

	result := new(StatsPayload)
	if err := ParseResponse(reply, MessageStats, result); err != nil {
		return nil, err
	}
	return result, nil
}
// StartRemoteMiner sends a MessageStartMiner request to peerID carrying the
// miner type, profile ID and optional config override, then waits up to
// thirty seconds for a MessageMinerAck reply.
//
// It returns ErrorIdentityNotInitialized when the node manager has no
// identity, an error when minerType is empty, any request or parse error, and
// an error embedding ack.Error when the acknowledgement reports failure.
//
//	err := controller.StartRemoteMiner("worker-1", "xmrig", "profile-1", nil)
func (c *Controller) StartRemoteMiner(peerID, minerType, profileID string, configOverride RawMessage) error {
	const replyTimeout = 30 * time.Second

	self := c.nodeManager.GetIdentity()
	if self == nil {
		return ErrorIdentityNotInitialized
	}

	// Checked after the identity so the identity error takes precedence.
	if minerType == "" {
		return core.E("Controller.StartRemoteMiner", "miner type is required", nil)
	}

	request, err := NewMessage(MessageStartMiner, self.ID, peerID, StartMinerPayload{
		MinerType: minerType,
		ProfileID: profileID,
		Config:    configOverride,
	})
	if err != nil {
		return core.E("Controller.StartRemoteMiner", "failed to create message", err)
	}

	reply, err := c.sendRequest(peerID, request, replyTimeout)
	if err != nil {
		return err
	}

	var result MinerAckPayload
	if err := ParseResponse(reply, MessageMinerAck, &result); err != nil {
		return err
	}

	if result.Success {
		return nil
	}
	return core.E("Controller.StartRemoteMiner", "miner start failed: "+result.Error, nil)
}
// StopRemoteMiner sends a MessageStopMiner request naming minerName to peerID
// and waits up to thirty seconds for a MessageMinerAck reply.
//
// It returns ErrorIdentityNotInitialized when the node manager has no
// identity, any request or parse error, and an error embedding ack.Error when
// the acknowledgement reports failure.
//
//	err := controller.StopRemoteMiner("worker-1", "xmrig-0")
func (c *Controller) StopRemoteMiner(peerID, minerName string) error {
	const replyTimeout = 30 * time.Second

	self := c.nodeManager.GetIdentity()
	if self == nil {
		return ErrorIdentityNotInitialized
	}

	request, err := NewMessage(MessageStopMiner, self.ID, peerID, StopMinerPayload{MinerName: minerName})
	if err != nil {
		return core.E("Controller.StopRemoteMiner", "failed to create message", err)
	}

	reply, err := c.sendRequest(peerID, request, replyTimeout)
	if err != nil {
		return err
	}

	var result MinerAckPayload
	if err := ParseResponse(reply, MessageMinerAck, &result); err != nil {
		return err
	}

	if result.Success {
		return nil
	}
	return core.E("Controller.StopRemoteMiner", "miner stop failed: "+result.Error, nil)
}
// GetRemoteLogs sends a MessageGetLogs request for minerName to peerID,
// passing lines through in the payload, and waits up to ten seconds for a
// MessageLogs reply. The Lines field of the parsed reply is returned.
//
// It returns ErrorIdentityNotInitialized when the node manager has no
// identity, and otherwise propagates message-creation, request and parse
// errors.
//
//	logs, err := controller.GetRemoteLogs("worker-1", "xmrig-0", 100)
func (c *Controller) GetRemoteLogs(peerID, minerName string, lines int) ([]string, error) {
	const replyTimeout = 10 * time.Second

	self := c.nodeManager.GetIdentity()
	if self == nil {
		return nil, ErrorIdentityNotInitialized
	}

	request, err := NewMessage(MessageGetLogs, self.ID, peerID, LogsRequestPayload{
		MinerName: minerName,
		Lines:     lines,
	})
	if err != nil {
		return nil, core.E("Controller.GetRemoteLogs", "failed to create message", err)
	}

	reply, err := c.sendRequest(peerID, request, replyTimeout)
	if err != nil {
		return nil, err
	}

	var result LogsPayload
	if err := ParseResponse(reply, MessageLogs, &result); err != nil {
		return nil, err
	}
	return result.Lines, nil
}
// GetAllStats calls GetRemoteStats concurrently for every peer yielded by the
// registry's ConnectedPeers and returns the successful results keyed by peer
// ID. Peers whose request fails are logged at debug level and left out of the
// map. The call returns once every request has finished.
//
//	statsByPeerID := controller.GetAllStats()
func (c *Controller) GetAllStats() map[string]*StatsPayload {
	var (
		collected = make(map[string]*StatsPayload)
		guard     sync.Mutex // protects collected
		inFlight  sync.WaitGroup
	)

	// fetch queries one peer and records its stats on success.
	fetch := func(target *Peer) {
		defer inFlight.Done()

		stats, err := c.GetRemoteStats(target.ID)
		if err == nil {
			guard.Lock()
			collected[target.ID] = stats
			guard.Unlock()
			return
		}

		// A failing peer is skipped rather than failing the whole call.
		logging.Debug("failed to get stats from peer", logging.Fields{
			"peer_id": target.ID,
			"peer":    target.Name,
			"error":   err.Error(),
		})
	}

	for peer := range c.peerRegistry.ConnectedPeers() {
		inFlight.Add(1)
		go fetch(peer)
	}
	inFlight.Wait()

	return collected
}
// PingPeer sends a MessagePing to peerID, waits up to five seconds for a
// MessagePong reply, and returns the elapsed time in milliseconds. When the
// peer is present in the registry its metrics are updated with the measured
// value, keeping its existing GeographicKilometres and Hops.
//
// The clock starts before the message is built and stops after the reply has
// been validated, so the figure also covers whatever sendRequest does before
// sending (including connecting to the peer when needed).
//
// It returns ErrorIdentityNotInitialized when the node manager has no
// identity, and otherwise propagates message-creation, request and validation
// errors.
//
//	rttMilliseconds, err := controller.PingPeer("worker-1")
func (c *Controller) PingPeer(peerID string) (float64, error) {
	const replyTimeout = 5 * time.Second

	self := c.nodeManager.GetIdentity()
	if self == nil {
		return 0, ErrorIdentityNotInitialized
	}

	started := time.Now()
	request, err := NewMessage(MessagePing, self.ID, peerID, PingPayload{SentAt: started.UnixMilli()})
	if err != nil {
		return 0, core.E("Controller.PingPeer", "failed to create message", err)
	}

	reply, err := c.sendRequest(peerID, request, replyTimeout)
	if err != nil {
		return 0, err
	}
	if err := ValidateResponse(reply, MessagePong); err != nil {
		return 0, err
	}

	// Round-trip time in milliseconds, fractional part included.
	elapsedMillis := time.Since(started).Seconds() * 1000

	// Only the RTT changes; distance and hop count are carried over as-is.
	if known := c.peerRegistry.GetPeer(peerID); known != nil {
		c.peerRegistry.UpdateMetrics(peerID, elapsedMillis, known.GeographicKilometres, known.Hops)
	}

	return elapsedMillis, nil
}
// ConnectToPeer looks up peerID in the peer registry and asks the transport
// to connect to it. It returns an error when the peer is not registered, or
// whatever error the transport's Connect reports.
//
//	err := controller.ConnectToPeer("worker-1")
func (c *Controller) ConnectToPeer(peerID string) error {
	if target := c.peerRegistry.GetPeer(peerID); target != nil {
		// The connection itself is not needed here, only the outcome.
		_, err := c.transport.Connect(target)
		return err
	}
	return core.E("Controller.ConnectToPeer", "peer not found: "+peerID, nil)
}
// DisconnectFromPeer closes the transport's connection to peerID. It returns
// an error when the transport has no connection for that peer, or whatever
// error Close reports.
//
//	err := controller.DisconnectFromPeer("worker-1")
func (c *Controller) DisconnectFromPeer(peerID string) error {
	if active := c.transport.GetConnection(peerID); active != nil {
		return active.Close()
	}
	return core.E("Controller.DisconnectFromPeer", "peer not connected: "+peerID, nil)
}