ax(node): expand ctrl receiver to controller in Controller methods
AX Principle 1 — predictable names over short names. `ctrl` is an abbreviation that requires a comment to explain; `controller` is self-documenting and consistent with the `manager` pattern used in NodeManager and PeerRegistry receivers. Co-Authored-By: Charon <charon@lethean.io>
This commit is contained in:
parent
f0d988b980
commit
cd8d17cc78
1 changed files with 57 additions and 57 deletions
|
|
@ -10,9 +10,9 @@ import (
|
|||
"forge.lthn.ai/Snider/Mining/pkg/logging"
|
||||
)
|
||||
|
||||
// ctrl := node.NewController(nodeManager, peerRegistry, transport)
|
||||
// rtt, _ := ctrl.PingPeer("abc123def456")
|
||||
// stats, _ := ctrl.GetRemoteStats("abc123def456")
|
||||
// controller := node.NewController(nodeManager, peerRegistry, transport)
|
||||
// rtt, _ := controller.PingPeer("abc123def456")
|
||||
// stats, _ := controller.GetRemoteStats("abc123def456")
|
||||
type Controller struct {
|
||||
node *NodeManager
|
||||
peers *PeerRegistry
|
||||
|
|
@ -22,8 +22,8 @@ type Controller struct {
|
|||
pendingRequests map[string]chan *Message
|
||||
}
|
||||
|
||||
// ctrl := node.NewController(nodeManager, peerRegistry, transport)
|
||||
// rtt, err := ctrl.PingPeer("abc123def456")
|
||||
// controller := node.NewController(nodeManager, peerRegistry, transport)
|
||||
// rtt, err := controller.PingPeer("abc123def456")
|
||||
func NewController(node *NodeManager, peers *PeerRegistry, transport *Transport) *Controller {
|
||||
controller := &Controller{
|
||||
node: node,
|
||||
|
|
@ -38,18 +38,18 @@ func NewController(node *NodeManager, peers *PeerRegistry, transport *Transport)
|
|||
return controller
|
||||
}
|
||||
|
||||
// transport.OnMessage(ctrl.handleResponse) // registered in NewController
|
||||
func (ctrl *Controller) handleResponse(conn *PeerConnection, msg *Message) {
|
||||
// transport.OnMessage(controller.handleResponse) // registered in NewController
|
||||
func (controller *Controller) handleResponse(conn *PeerConnection, msg *Message) {
|
||||
if msg.ReplyTo == "" {
|
||||
return // Not a response, let worker handle it
|
||||
}
|
||||
|
||||
ctrl.mutex.Lock()
|
||||
responseChannel, exists := ctrl.pendingRequests[msg.ReplyTo]
|
||||
controller.mutex.Lock()
|
||||
responseChannel, exists := controller.pendingRequests[msg.ReplyTo]
|
||||
if exists {
|
||||
delete(ctrl.pendingRequests, msg.ReplyTo)
|
||||
delete(controller.pendingRequests, msg.ReplyTo)
|
||||
}
|
||||
ctrl.mutex.Unlock()
|
||||
controller.mutex.Unlock()
|
||||
|
||||
if exists && responseChannel != nil {
|
||||
select {
|
||||
|
|
@ -60,18 +60,18 @@ func (ctrl *Controller) handleResponse(conn *PeerConnection, msg *Message) {
|
|||
}
|
||||
}
|
||||
|
||||
// resp, err := ctrl.sendRequest("abc123def456", msg, 10*time.Second)
|
||||
// resp, err := controller.sendRequest("abc123def456", msg, 10*time.Second)
|
||||
// if err != nil { return nil, err }
|
||||
func (ctrl *Controller) sendRequest(peerID string, msg *Message, timeout time.Duration) (*Message, error) {
|
||||
func (controller *Controller) sendRequest(peerID string, msg *Message, timeout time.Duration) (*Message, error) {
|
||||
actualPeerID := peerID
|
||||
|
||||
// Auto-connect if not already connected
|
||||
if ctrl.transport.GetConnection(peerID) == nil {
|
||||
peer := ctrl.peers.GetPeer(peerID)
|
||||
if controller.transport.GetConnection(peerID) == nil {
|
||||
peer := controller.peers.GetPeer(peerID)
|
||||
if peer == nil {
|
||||
return nil, fmt.Errorf("peer not found: %s", peerID)
|
||||
}
|
||||
conn, err := ctrl.transport.Connect(peer)
|
||||
conn, err := controller.transport.Connect(peer)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to connect to peer: %w", err)
|
||||
}
|
||||
|
|
@ -84,20 +84,20 @@ func (ctrl *Controller) sendRequest(peerID string, msg *Message, timeout time.Du
|
|||
// Create response channel
|
||||
responseChannel := make(chan *Message, 1)
|
||||
|
||||
ctrl.mutex.Lock()
|
||||
ctrl.pendingRequests[msg.ID] = responseChannel
|
||||
ctrl.mutex.Unlock()
|
||||
controller.mutex.Lock()
|
||||
controller.pendingRequests[msg.ID] = responseChannel
|
||||
controller.mutex.Unlock()
|
||||
|
||||
// Clean up on exit - ensure channel is closed and removed from map
|
||||
defer func() {
|
||||
ctrl.mutex.Lock()
|
||||
delete(ctrl.pendingRequests, msg.ID)
|
||||
ctrl.mutex.Unlock()
|
||||
controller.mutex.Lock()
|
||||
delete(controller.pendingRequests, msg.ID)
|
||||
controller.mutex.Unlock()
|
||||
close(responseChannel) // Close channel to allow garbage collection
|
||||
}()
|
||||
|
||||
// Send the message
|
||||
if err := ctrl.transport.Send(actualPeerID, msg); err != nil {
|
||||
if err := controller.transport.Send(actualPeerID, msg); err != nil {
|
||||
return nil, fmt.Errorf("failed to send message: %w", err)
|
||||
}
|
||||
|
||||
|
|
@ -113,10 +113,10 @@ func (ctrl *Controller) sendRequest(peerID string, msg *Message, timeout time.Du
|
|||
}
|
||||
}
|
||||
|
||||
// stats, err := ctrl.GetRemoteStats("abc123def456")
|
||||
// stats, err := controller.GetRemoteStats("abc123def456")
|
||||
// if err == nil { log("miners:", len(stats.Miners)) }
|
||||
func (ctrl *Controller) GetRemoteStats(peerID string) (*StatsPayload, error) {
|
||||
identity := ctrl.node.GetIdentity()
|
||||
func (controller *Controller) GetRemoteStats(peerID string) (*StatsPayload, error) {
|
||||
identity := controller.node.GetIdentity()
|
||||
if identity == nil {
|
||||
return nil, fmt.Errorf("node identity not initialized")
|
||||
}
|
||||
|
|
@ -126,7 +126,7 @@ func (ctrl *Controller) GetRemoteStats(peerID string) (*StatsPayload, error) {
|
|||
return nil, fmt.Errorf("failed to create message: %w", err)
|
||||
}
|
||||
|
||||
resp, err := ctrl.sendRequest(peerID, msg, 10*time.Second)
|
||||
resp, err := controller.sendRequest(peerID, msg, 10*time.Second)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
@ -139,10 +139,10 @@ func (ctrl *Controller) GetRemoteStats(peerID string) (*StatsPayload, error) {
|
|||
return &stats, nil
|
||||
}
|
||||
|
||||
// err := ctrl.StartRemoteMiner("abc123def456", "xmrig", "profile-1", nil)
|
||||
// err := ctrl.StartRemoteMiner("abc123def456", "xmrig", "", json.RawMessage(`{"pool":"stratum+tcp://pool.lthn.io:3333"}`))
|
||||
func (ctrl *Controller) StartRemoteMiner(peerID, minerType, profileID string, configOverride json.RawMessage) error {
|
||||
identity := ctrl.node.GetIdentity()
|
||||
// err := controller.StartRemoteMiner("abc123def456", "xmrig", "profile-1", nil)
|
||||
// err := controller.StartRemoteMiner("abc123def456", "xmrig", "", json.RawMessage(`{"pool":"stratum+tcp://pool.lthn.io:3333"}`))
|
||||
func (controller *Controller) StartRemoteMiner(peerID, minerType, profileID string, configOverride json.RawMessage) error {
|
||||
identity := controller.node.GetIdentity()
|
||||
if identity == nil {
|
||||
return fmt.Errorf("node identity not initialized")
|
||||
}
|
||||
|
|
@ -162,7 +162,7 @@ func (ctrl *Controller) StartRemoteMiner(peerID, minerType, profileID string, co
|
|||
return fmt.Errorf("failed to create message: %w", err)
|
||||
}
|
||||
|
||||
resp, err := ctrl.sendRequest(peerID, msg, 30*time.Second)
|
||||
resp, err := controller.sendRequest(peerID, msg, 30*time.Second)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
@ -179,10 +179,10 @@ func (ctrl *Controller) StartRemoteMiner(peerID, minerType, profileID string, co
|
|||
return nil
|
||||
}
|
||||
|
||||
// err := ctrl.StopRemoteMiner("abc123def456", "xmrig")
|
||||
// err := controller.StopRemoteMiner("abc123def456", "xmrig")
|
||||
// if err != nil { log("stop failed:", err) }
|
||||
func (ctrl *Controller) StopRemoteMiner(peerID, minerName string) error {
|
||||
identity := ctrl.node.GetIdentity()
|
||||
func (controller *Controller) StopRemoteMiner(peerID, minerName string) error {
|
||||
identity := controller.node.GetIdentity()
|
||||
if identity == nil {
|
||||
return fmt.Errorf("node identity not initialized")
|
||||
}
|
||||
|
|
@ -196,7 +196,7 @@ func (ctrl *Controller) StopRemoteMiner(peerID, minerName string) error {
|
|||
return fmt.Errorf("failed to create message: %w", err)
|
||||
}
|
||||
|
||||
resp, err := ctrl.sendRequest(peerID, msg, 30*time.Second)
|
||||
resp, err := controller.sendRequest(peerID, msg, 30*time.Second)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
@ -213,10 +213,10 @@ func (ctrl *Controller) StopRemoteMiner(peerID, minerName string) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// lines, err := ctrl.GetRemoteLogs("abc123def456", "xmrig", 100)
|
||||
// lines, err := controller.GetRemoteLogs("abc123def456", "xmrig", 100)
|
||||
// if err == nil { log("last log:", lines[len(lines)-1]) }
|
||||
func (ctrl *Controller) GetRemoteLogs(peerID, minerName string, lines int) ([]string, error) {
|
||||
identity := ctrl.node.GetIdentity()
|
||||
func (controller *Controller) GetRemoteLogs(peerID, minerName string, lines int) ([]string, error) {
|
||||
identity := controller.node.GetIdentity()
|
||||
if identity == nil {
|
||||
return nil, fmt.Errorf("node identity not initialized")
|
||||
}
|
||||
|
|
@ -231,7 +231,7 @@ func (ctrl *Controller) GetRemoteLogs(peerID, minerName string, lines int) ([]st
|
|||
return nil, fmt.Errorf("failed to create message: %w", err)
|
||||
}
|
||||
|
||||
resp, err := ctrl.sendRequest(peerID, msg, 10*time.Second)
|
||||
resp, err := controller.sendRequest(peerID, msg, 10*time.Second)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
@ -244,10 +244,10 @@ func (ctrl *Controller) GetRemoteLogs(peerID, minerName string, lines int) ([]st
|
|||
return logs.Lines, nil
|
||||
}
|
||||
|
||||
// allStats := ctrl.GetAllStats()
|
||||
// allStats := controller.GetAllStats()
|
||||
// for peerID, stats := range allStats { log(peerID, stats.Miners) }
|
||||
func (ctrl *Controller) GetAllStats() map[string]*StatsPayload {
|
||||
peers := ctrl.peers.GetConnectedPeers()
|
||||
func (controller *Controller) GetAllStats() map[string]*StatsPayload {
|
||||
peers := controller.peers.GetConnectedPeers()
|
||||
results := make(map[string]*StatsPayload)
|
||||
var resultsMutex sync.Mutex
|
||||
var waitGroup sync.WaitGroup
|
||||
|
|
@ -256,7 +256,7 @@ func (ctrl *Controller) GetAllStats() map[string]*StatsPayload {
|
|||
waitGroup.Add(1)
|
||||
go func(connectedPeer *Peer) {
|
||||
defer waitGroup.Done()
|
||||
stats, err := ctrl.GetRemoteStats(connectedPeer.ID)
|
||||
stats, err := controller.GetRemoteStats(connectedPeer.ID)
|
||||
if err != nil {
|
||||
logging.Debug("failed to get stats from peer", logging.Fields{
|
||||
"peer_id": connectedPeer.ID,
|
||||
|
|
@ -275,10 +275,10 @@ func (ctrl *Controller) GetAllStats() map[string]*StatsPayload {
|
|||
return results
|
||||
}
|
||||
|
||||
// rtt, err := ctrl.PingPeer("abc123def456")
|
||||
// rtt, err := controller.PingPeer("abc123def456")
|
||||
// if err == nil { log("latency:", rtt, "ms") }
|
||||
func (ctrl *Controller) PingPeer(peerID string) (float64, error) {
|
||||
identity := ctrl.node.GetIdentity()
|
||||
func (controller *Controller) PingPeer(peerID string) (float64, error) {
|
||||
identity := controller.node.GetIdentity()
|
||||
if identity == nil {
|
||||
return 0, fmt.Errorf("node identity not initialized")
|
||||
}
|
||||
|
|
@ -293,7 +293,7 @@ func (ctrl *Controller) PingPeer(peerID string) (float64, error) {
|
|||
return 0, fmt.Errorf("failed to create message: %w", err)
|
||||
}
|
||||
|
||||
resp, err := ctrl.sendRequest(peerID, msg, 5*time.Second)
|
||||
resp, err := controller.sendRequest(peerID, msg, 5*time.Second)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
|
@ -305,28 +305,28 @@ func (ctrl *Controller) PingPeer(peerID string) (float64, error) {
|
|||
roundTripTime := time.Since(sentAt).Seconds() * 1000 // milliseconds
|
||||
|
||||
// Update peer metrics
|
||||
peer := ctrl.peers.GetPeer(peerID)
|
||||
peer := controller.peers.GetPeer(peerID)
|
||||
if peer != nil {
|
||||
ctrl.peers.UpdateMetrics(peerID, roundTripTime, peer.GeoKM, peer.Hops)
|
||||
controller.peers.UpdateMetrics(peerID, roundTripTime, peer.GeoKM, peer.Hops)
|
||||
}
|
||||
|
||||
return roundTripTime, nil
|
||||
}
|
||||
|
||||
// if err := ctrl.ConnectToPeer("abc123def456"); err != nil { log(err) }
|
||||
func (ctrl *Controller) ConnectToPeer(peerID string) error {
|
||||
peer := ctrl.peers.GetPeer(peerID)
|
||||
// if err := controller.ConnectToPeer("abc123def456"); err != nil { log(err) }
|
||||
func (controller *Controller) ConnectToPeer(peerID string) error {
|
||||
peer := controller.peers.GetPeer(peerID)
|
||||
if peer == nil {
|
||||
return fmt.Errorf("peer not found: %s", peerID)
|
||||
}
|
||||
|
||||
_, err := ctrl.transport.Connect(peer)
|
||||
_, err := controller.transport.Connect(peer)
|
||||
return err
|
||||
}
|
||||
|
||||
// if err := ctrl.DisconnectFromPeer("abc123def456"); err != nil { log(err) }
|
||||
func (ctrl *Controller) DisconnectFromPeer(peerID string) error {
|
||||
conn := ctrl.transport.GetConnection(peerID)
|
||||
// if err := controller.DisconnectFromPeer("abc123def456"); err != nil { log(err) }
|
||||
func (controller *Controller) DisconnectFromPeer(peerID string) error {
|
||||
conn := controller.transport.GetConnection(peerID)
|
||||
if conn == nil {
|
||||
return fmt.Errorf("peer not connected: %s", peerID)
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue