diff --git a/pkg/node/controller.go b/pkg/node/controller.go index 6647063..7bae0e7 100644 --- a/pkg/node/controller.go +++ b/pkg/node/controller.go @@ -39,18 +39,18 @@ func NewController(node *NodeManager, peers *PeerRegistry, transport *Transport) return c } -// transport.OnMessage(c.handleResponse) // registered in NewController -func (c *Controller) handleResponse(conn *PeerConnection, msg *Message) { +// transport.OnMessage(ctrl.handleResponse) // registered in NewController +func (ctrl *Controller) handleResponse(conn *PeerConnection, msg *Message) { if msg.ReplyTo == "" { return // Not a response, let worker handle it } - c.mutex.Lock() - channel, exists := c.pending[msg.ReplyTo] + ctrl.mutex.Lock() + channel, exists := ctrl.pending[msg.ReplyTo] if exists { - delete(c.pending, msg.ReplyTo) + delete(ctrl.pending, msg.ReplyTo) } - c.mutex.Unlock() + ctrl.mutex.Unlock() if exists && channel != nil { select { @@ -61,18 +61,18 @@ func (c *Controller) handleResponse(conn *PeerConnection, msg *Message) { } } -// resp, err := c.sendRequest("abc123def456", msg, 10*time.Second) +// resp, err := ctrl.sendRequest("abc123def456", msg, 10*time.Second) // if err != nil { return nil, err } -func (c *Controller) sendRequest(peerID string, msg *Message, timeout time.Duration) (*Message, error) { +func (ctrl *Controller) sendRequest(peerID string, msg *Message, timeout time.Duration) (*Message, error) { actualPeerID := peerID // Auto-connect if not already connected - if c.transport.GetConnection(peerID) == nil { - peer := c.peers.GetPeer(peerID) + if ctrl.transport.GetConnection(peerID) == nil { + peer := ctrl.peers.GetPeer(peerID) if peer == nil { return nil, fmt.Errorf("peer not found: %s", peerID) } - conn, err := c.transport.Connect(peer) + conn, err := ctrl.transport.Connect(peer) if err != nil { return nil, fmt.Errorf("failed to connect to peer: %w", err) } @@ -85,20 +85,20 @@ func (c *Controller) sendRequest(peerID string, msg *Message, timeout time.Durat // Create response channel responseChannel := make(chan *Message, 1) - c.mutex.Lock() - c.pending[msg.ID] = responseChannel - c.mutex.Unlock() + ctrl.mutex.Lock() + ctrl.pending[msg.ID] = responseChannel + ctrl.mutex.Unlock() // Clean up on exit - ensure channel is closed and removed from map defer func() { - c.mutex.Lock() - delete(c.pending, msg.ID) - c.mutex.Unlock() + ctrl.mutex.Lock() + delete(ctrl.pending, msg.ID) + ctrl.mutex.Unlock() close(responseChannel) // Close channel to allow garbage collection }() // Send the message - if err := c.transport.Send(actualPeerID, msg); err != nil { + if err := ctrl.transport.Send(actualPeerID, msg); err != nil { return nil, fmt.Errorf("failed to send message: %w", err) } @@ -116,8 +116,8 @@ func (c *Controller) sendRequest(peerID string, msg *Message, timeout time.Durat // stats, err := ctrl.GetRemoteStats("abc123def456") // if err == nil { log("miners:", len(stats.Miners)) } -func (c *Controller) GetRemoteStats(peerID string) (*StatsPayload, error) { - identity := c.node.GetIdentity() +func (ctrl *Controller) GetRemoteStats(peerID string) (*StatsPayload, error) { + identity := ctrl.node.GetIdentity() if identity == nil { return nil, fmt.Errorf("node identity not initialized") } @@ -127,7 +127,7 @@ func (c *Controller) GetRemoteStats(peerID string) (*StatsPayload, error) { return nil, fmt.Errorf("failed to create message: %w", err) } - resp, err := c.sendRequest(peerID, msg, 10*time.Second) + resp, err := ctrl.sendRequest(peerID, msg, 10*time.Second) if err != nil { return nil, err } @@ -142,8 +142,8 @@ func (c *Controller) GetRemoteStats(peerID string) (*StatsPayload, error) { // err := ctrl.StartRemoteMiner("abc123def456", "xmrig", "profile-1", nil) // err := ctrl.StartRemoteMiner("abc123def456", "xmrig", "", json.RawMessage(`{"pool":"stratum+tcp://pool.lthn.io:3333"}`)) -func (c *Controller) StartRemoteMiner(peerID, minerType, profileID string, configOverride json.RawMessage) error { - identity := c.node.GetIdentity() +func (ctrl *Controller) StartRemoteMiner(peerID, minerType, profileID string, configOverride json.RawMessage) error { + identity := ctrl.node.GetIdentity() if identity == nil { return fmt.Errorf("node identity not initialized") } @@ -163,7 +163,7 @@ func (c *Controller) StartRemoteMiner(peerID, minerType, profileID string, confi return fmt.Errorf("failed to create message: %w", err) } - resp, err := c.sendRequest(peerID, msg, 30*time.Second) + resp, err := ctrl.sendRequest(peerID, msg, 30*time.Second) if err != nil { return err } @@ -182,8 +182,8 @@ func (c *Controller) StartRemoteMiner(peerID, minerType, profileID string, confi // err := ctrl.StopRemoteMiner("abc123def456", "xmrig") // if err != nil { log("stop failed:", err) } -func (c *Controller) StopRemoteMiner(peerID, minerName string) error { - identity := c.node.GetIdentity() +func (ctrl *Controller) StopRemoteMiner(peerID, minerName string) error { + identity := ctrl.node.GetIdentity() if identity == nil { return fmt.Errorf("node identity not initialized") } @@ -197,7 +197,7 @@ func (c *Controller) StopRemoteMiner(peerID, minerName string) error { return fmt.Errorf("failed to create message: %w", err) } - resp, err := c.sendRequest(peerID, msg, 30*time.Second) + resp, err := ctrl.sendRequest(peerID, msg, 30*time.Second) if err != nil { return err } @@ -216,8 +216,8 @@ func (c *Controller) StopRemoteMiner(peerID, minerName string) error { // lines, err := ctrl.GetRemoteLogs("abc123def456", "xmrig", 100) // if err == nil { log("last log:", lines[len(lines)-1]) } -func (c *Controller) GetRemoteLogs(peerID, minerName string, lines int) ([]string, error) { - identity := c.node.GetIdentity() +func (ctrl *Controller) GetRemoteLogs(peerID, minerName string, lines int) ([]string, error) { + identity := ctrl.node.GetIdentity() if identity == nil { return nil, fmt.Errorf("node identity not initialized") } @@ -232,7 +232,7 @@ func (c *Controller) GetRemoteLogs(peerID, minerName string, lines int) ([]strin return nil, fmt.Errorf("failed to create message: %w", err) } - resp, err := c.sendRequest(peerID, msg, 10*time.Second) + resp, err := ctrl.sendRequest(peerID, msg, 10*time.Second) if err != nil { return nil, err } @@ -247,8 +247,8 @@ func (c *Controller) GetRemoteLogs(peerID, minerName string, lines int) ([]strin // allStats := ctrl.GetAllStats() // for peerID, stats := range allStats { log(peerID, stats.Miners) } -func (c *Controller) GetAllStats() map[string]*StatsPayload { - peers := c.peers.GetConnectedPeers() +func (ctrl *Controller) GetAllStats() map[string]*StatsPayload { + peers := ctrl.peers.GetConnectedPeers() results := make(map[string]*StatsPayload) var resultsMutex sync.Mutex var waitGroup sync.WaitGroup @@ -257,7 +257,7 @@ func (c *Controller) GetAllStats() map[string]*StatsPayload { waitGroup.Add(1) go func(connectedPeer *Peer) { defer waitGroup.Done() - stats, err := c.GetRemoteStats(connectedPeer.ID) + stats, err := ctrl.GetRemoteStats(connectedPeer.ID) if err != nil { logging.Debug("failed to get stats from peer", logging.Fields{ "peer_id": connectedPeer.ID, @@ -278,8 +278,8 @@ func (c *Controller) GetAllStats() map[string]*StatsPayload { // rtt, err := ctrl.PingPeer("abc123def456") // if err == nil { log("latency:", rtt, "ms") } -func (c *Controller) PingPeer(peerID string) (float64, error) { - identity := c.node.GetIdentity() +func (ctrl *Controller) PingPeer(peerID string) (float64, error) { + identity := ctrl.node.GetIdentity() if identity == nil { return 0, fmt.Errorf("node identity not initialized") } @@ -294,7 +294,7 @@ func (c *Controller) PingPeer(peerID string) (float64, error) { return 0, fmt.Errorf("failed to create message: %w", err) } - resp, err := c.sendRequest(peerID, msg, 5*time.Second) + resp, err := ctrl.sendRequest(peerID, msg, 5*time.Second) if err != nil { return 0, err } @@ -306,28 +306,28 @@ func (c *Controller) PingPeer(peerID string) (float64, error) { roundTripTime := time.Since(sentAt).Seconds() * 1000 // milliseconds // Update peer metrics - peer := c.peers.GetPeer(peerID) + peer := ctrl.peers.GetPeer(peerID) if peer != nil { - c.peers.UpdateMetrics(peerID, roundTripTime, peer.GeoKM, peer.Hops) + ctrl.peers.UpdateMetrics(peerID, roundTripTime, peer.GeoKM, peer.Hops) } return roundTripTime, nil } // if err := ctrl.ConnectToPeer("abc123def456"); err != nil { log(err) } -func (c *Controller) ConnectToPeer(peerID string) error { - peer := c.peers.GetPeer(peerID) +func (ctrl *Controller) ConnectToPeer(peerID string) error { + peer := ctrl.peers.GetPeer(peerID) if peer == nil { return fmt.Errorf("peer not found: %s", peerID) } - _, err := c.transport.Connect(peer) + _, err := ctrl.transport.Connect(peer) return err } // if err := ctrl.DisconnectFromPeer("abc123def456"); err != nil { log(err) } -func (c *Controller) DisconnectFromPeer(peerID string) error { - conn := c.transport.GetConnection(peerID) +func (ctrl *Controller) DisconnectFromPeer(peerID string) error { + conn := ctrl.transport.GetConnection(peerID) if conn == nil { return fmt.Errorf("peer not connected: %s", peerID) }