`node` is ambiguous in the `node` package — callers cannot distinguish the field from the package name without tracing. `nodeManager` names the type it holds and removes all mapping overhead. Co-Authored-By: Charon <charon@lethean.io>
334 lines
9.5 KiB
Go
334 lines
9.5 KiB
Go
package node
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"sync"
|
|
"time"
|
|
|
|
"forge.lthn.ai/Snider/Mining/pkg/logging"
|
|
)
|
|
|
|
// controller := node.NewController(nodeManager, peerRegistry, transport)
|
|
// rtt, _ := controller.PingPeer("abc123def456")
|
|
// stats, _ := controller.GetRemoteStats("abc123def456")
|
|
type Controller struct {
|
|
nodeManager *NodeManager
|
|
peers *PeerRegistry
|
|
transport *Transport
|
|
mutex sync.RWMutex
|
|
|
|
pendingRequests map[string]chan *Message
|
|
}
|
|
|
|
// controller := node.NewController(nodeManager, peerRegistry, transport)
|
|
// rtt, err := controller.PingPeer("abc123def456")
|
|
func NewController(nodeManager *NodeManager, peers *PeerRegistry, transport *Transport) *Controller {
|
|
controller := &Controller{
|
|
nodeManager: nodeManager,
|
|
peers: peers,
|
|
transport: transport,
|
|
pendingRequests: make(map[string]chan *Message),
|
|
}
|
|
|
|
// transport.OnMessage(controller.handleResponse) // registered in NewController; routes replies to pending request channels
|
|
transport.OnMessage(controller.handleResponse)
|
|
|
|
return controller
|
|
}
|
|
|
|
// transport.OnMessage(controller.handleResponse) // registered in NewController
|
|
func (controller *Controller) handleResponse(conn *PeerConnection, msg *Message) {
|
|
if msg.ReplyTo == "" {
|
|
return // Not a response, let worker handle it
|
|
}
|
|
|
|
controller.mutex.Lock()
|
|
responseChannel, exists := controller.pendingRequests[msg.ReplyTo]
|
|
if exists {
|
|
delete(controller.pendingRequests, msg.ReplyTo)
|
|
}
|
|
controller.mutex.Unlock()
|
|
|
|
if exists && responseChannel != nil {
|
|
select {
|
|
case responseChannel <- msg:
|
|
default:
|
|
// Channel full or closed
|
|
}
|
|
}
|
|
}
|
|
|
|
// response, err := controller.sendRequest("abc123def456", msg, 10*time.Second)
|
|
// if err != nil { return nil, err }
|
|
func (controller *Controller) sendRequest(peerID string, msg *Message, timeout time.Duration) (*Message, error) {
|
|
actualPeerID := peerID
|
|
|
|
// Auto-connect if not already connected
|
|
if controller.transport.GetConnection(peerID) == nil {
|
|
peer := controller.peers.GetPeer(peerID)
|
|
if peer == nil {
|
|
return nil, &ProtocolError{Code: ErrCodeNotFound, Message: "peer not found: " + peerID}
|
|
}
|
|
conn, err := controller.transport.Connect(peer)
|
|
if err != nil {
|
|
return nil, &ProtocolError{Code: ErrCodeOperationFailed, Message: "failed to connect to peer: " + err.Error()}
|
|
}
|
|
// Use the real peer ID after handshake (it may have changed)
|
|
actualPeerID = conn.Peer.ID
|
|
// Update the message destination
|
|
msg.To = actualPeerID
|
|
}
|
|
|
|
// Create response channel
|
|
responseChannel := make(chan *Message, 1)
|
|
|
|
controller.mutex.Lock()
|
|
controller.pendingRequests[msg.ID] = responseChannel
|
|
controller.mutex.Unlock()
|
|
|
|
// Clean up on exit - ensure channel is closed and removed from map
|
|
defer func() {
|
|
controller.mutex.Lock()
|
|
delete(controller.pendingRequests, msg.ID)
|
|
controller.mutex.Unlock()
|
|
close(responseChannel) // Close channel to allow garbage collection
|
|
}()
|
|
|
|
// Send the message
|
|
if err := controller.transport.Send(actualPeerID, msg); err != nil {
|
|
return nil, &ProtocolError{Code: ErrCodeOperationFailed, Message: "failed to send message: " + err.Error()}
|
|
}
|
|
|
|
// Wait for response
|
|
requestContext, cancel := context.WithTimeout(context.Background(), timeout)
|
|
defer cancel()
|
|
|
|
select {
|
|
case response := <-responseChannel:
|
|
return response, nil
|
|
case <-requestContext.Done():
|
|
return nil, &ProtocolError{Code: ErrCodeTimeout, Message: "request timeout"}
|
|
}
|
|
}
|
|
|
|
// stats, err := controller.GetRemoteStats("abc123def456")
|
|
// if err == nil { log("miners:", len(stats.Miners)) }
|
|
func (controller *Controller) GetRemoteStats(peerID string) (*StatsPayload, error) {
|
|
identity := controller.nodeManager.GetIdentity()
|
|
if identity == nil {
|
|
return nil, &ProtocolError{Code: ErrCodeUnauthorized, Message: "node identity not initialized"}
|
|
}
|
|
|
|
message, err := NewMessage(MsgGetStats, identity.ID, peerID, nil)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
response, err := controller.sendRequest(peerID, message, 10*time.Second)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var stats StatsPayload
|
|
if err := ParseResponse(response, MsgStats, &stats); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &stats, nil
|
|
}
|
|
|
|
// err := controller.StartRemoteMiner("abc123def456", "xmrig", "profile-1", nil)
|
|
// err := controller.StartRemoteMiner("abc123def456", "xmrig", "", json.RawMessage(`{"pool":"stratum+tcp://pool.lthn.io:3333"}`))
|
|
func (controller *Controller) StartRemoteMiner(peerID, minerType, profileID string, configOverride json.RawMessage) error {
|
|
identity := controller.nodeManager.GetIdentity()
|
|
if identity == nil {
|
|
return &ProtocolError{Code: ErrCodeUnauthorized, Message: "node identity not initialized"}
|
|
}
|
|
|
|
if minerType == "" {
|
|
return &ProtocolError{Code: ErrCodeInvalidMessage, Message: "miner type is required"}
|
|
}
|
|
|
|
payload := StartMinerPayload{
|
|
MinerType: minerType,
|
|
ProfileID: profileID,
|
|
Config: configOverride,
|
|
}
|
|
|
|
message, err := NewMessage(MsgStartMiner, identity.ID, peerID, payload)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
response, err := controller.sendRequest(peerID, message, 30*time.Second)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
var acknowledgement MinerAckPayload
|
|
if err := ParseResponse(response, MsgMinerAck, &acknowledgement); err != nil {
|
|
return err
|
|
}
|
|
|
|
if !acknowledgement.Success {
|
|
return &ProtocolError{Code: ErrCodeOperationFailed, Message: "miner start failed: " + acknowledgement.Error}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// err := controller.StopRemoteMiner("abc123def456", "xmrig")
|
|
// if err != nil { log("stop failed:", err) }
|
|
func (controller *Controller) StopRemoteMiner(peerID, minerName string) error {
|
|
identity := controller.nodeManager.GetIdentity()
|
|
if identity == nil {
|
|
return &ProtocolError{Code: ErrCodeUnauthorized, Message: "node identity not initialized"}
|
|
}
|
|
|
|
payload := StopMinerPayload{
|
|
MinerName: minerName,
|
|
}
|
|
|
|
message, err := NewMessage(MsgStopMiner, identity.ID, peerID, payload)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
response, err := controller.sendRequest(peerID, message, 30*time.Second)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
var acknowledgement MinerAckPayload
|
|
if err := ParseResponse(response, MsgMinerAck, &acknowledgement); err != nil {
|
|
return err
|
|
}
|
|
|
|
if !acknowledgement.Success {
|
|
return &ProtocolError{Code: ErrCodeOperationFailed, Message: "miner stop failed: " + acknowledgement.Error}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// lines, err := controller.GetRemoteLogs("abc123def456", "xmrig", 100)
|
|
// if err == nil { log("last log:", lines[len(lines)-1]) }
|
|
func (controller *Controller) GetRemoteLogs(peerID, minerName string, lines int) ([]string, error) {
|
|
identity := controller.nodeManager.GetIdentity()
|
|
if identity == nil {
|
|
return nil, &ProtocolError{Code: ErrCodeUnauthorized, Message: "node identity not initialized"}
|
|
}
|
|
|
|
payload := GetLogsPayload{
|
|
MinerName: minerName,
|
|
Lines: lines,
|
|
}
|
|
|
|
message, err := NewMessage(MsgGetLogs, identity.ID, peerID, payload)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
response, err := controller.sendRequest(peerID, message, 10*time.Second)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var logs LogsPayload
|
|
if err := ParseResponse(response, MsgLogs, &logs); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return logs.Lines, nil
|
|
}
|
|
|
|
// allStats := controller.GetAllStats()
|
|
// for peerID, stats := range allStats { log(peerID, stats.Miners) }
|
|
func (controller *Controller) GetAllStats() map[string]*StatsPayload {
|
|
peers := controller.peers.GetConnectedPeers()
|
|
results := make(map[string]*StatsPayload)
|
|
var resultsMutex sync.Mutex
|
|
var waitGroup sync.WaitGroup
|
|
|
|
for _, peer := range peers {
|
|
waitGroup.Add(1)
|
|
go func(connectedPeer *Peer) {
|
|
defer waitGroup.Done()
|
|
stats, err := controller.GetRemoteStats(connectedPeer.ID)
|
|
if err != nil {
|
|
logging.Debug("failed to get stats from peer", logging.Fields{
|
|
"peer_id": connectedPeer.ID,
|
|
"peer": connectedPeer.Name,
|
|
"error": err.Error(),
|
|
})
|
|
return // Skip failed peers
|
|
}
|
|
resultsMutex.Lock()
|
|
results[connectedPeer.ID] = stats
|
|
resultsMutex.Unlock()
|
|
}(peer)
|
|
}
|
|
|
|
waitGroup.Wait()
|
|
return results
|
|
}
|
|
|
|
// rtt, err := controller.PingPeer("abc123def456")
|
|
// if err == nil { log("latency:", rtt, "ms") }
|
|
func (controller *Controller) PingPeer(peerID string) (float64, error) {
|
|
identity := controller.nodeManager.GetIdentity()
|
|
if identity == nil {
|
|
return 0, &ProtocolError{Code: ErrCodeUnauthorized, Message: "node identity not initialized"}
|
|
}
|
|
sentAt := time.Now()
|
|
|
|
payload := PingPayload{
|
|
SentAt: sentAt.UnixMilli(),
|
|
}
|
|
|
|
message, err := NewMessage(MsgPing, identity.ID, peerID, payload)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
response, err := controller.sendRequest(peerID, message, 5*time.Second)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
if err := ValidateResponse(response, MsgPong); err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
roundTripTime := time.Since(sentAt).Seconds() * 1000 // milliseconds
|
|
|
|
// Update peer metrics
|
|
peer := controller.peers.GetPeer(peerID)
|
|
if peer != nil {
|
|
controller.peers.UpdateMetrics(peerID, roundTripTime, peer.GeoKM, peer.Hops)
|
|
}
|
|
|
|
return roundTripTime, nil
|
|
}
|
|
|
|
// if err := controller.ConnectToPeer("abc123def456"); err != nil { log(err) }
|
|
func (controller *Controller) ConnectToPeer(peerID string) error {
|
|
peer := controller.peers.GetPeer(peerID)
|
|
if peer == nil {
|
|
return &ProtocolError{Code: ErrCodeNotFound, Message: "peer not found: " + peerID}
|
|
}
|
|
|
|
_, err := controller.transport.Connect(peer)
|
|
return err
|
|
}
|
|
|
|
// if err := controller.DisconnectFromPeer("abc123def456"); err != nil { log(err) }
|
|
func (controller *Controller) DisconnectFromPeer(peerID string) error {
|
|
conn := controller.transport.GetConnection(peerID)
|
|
if conn == nil {
|
|
return &ProtocolError{Code: ErrCodeNotFound, Message: "peer not connected: " + peerID}
|
|
}
|
|
|
|
return conn.Close()
|
|
}
|