diff --git a/node/bench_test.go b/node/bench_test.go index b6872b8..f96cefb 100644 --- a/node/bench_test.go +++ b/node/bench_test.go @@ -206,13 +206,13 @@ func BenchmarkPeerScoring(b *testing.B) { // Add 50 peers with varied metrics for i := range 50 { peer := &Peer{ - ID: testJoinPath("peer", string(rune('A'+i%26)), string(rune('0'+i/26))), - Name: "peer", - PingMS: float64(i*10 + 5), - Hops: i%5 + 1, - GeoKM: float64(i * 100), - Score: float64(50 + i%50), - AddedAt: time.Now(), + ID: testJoinPath("peer", string(rune('A'+i%26)), string(rune('0'+i/26))), + Name: "peer", + PingMilliseconds: float64(i*10 + 5), + Hops: i%5 + 1, + GeographicKilometres: float64(i * 100), + Score: float64(50 + i%50), + AddedAt: time.Now(), } // Bypass AddPeer's duplicate check by adding directly reg.mu.Lock() diff --git a/node/controller.go b/node/controller.go index 2bec4d5..a683738 100644 --- a/node/controller.go +++ b/node/controller.go @@ -41,34 +41,34 @@ func NewController(nodeManager *NodeManager, peerRegistry *PeerRegistry, transpo } // handleResponse processes incoming replies and routes them to the waiting request. -func (c *Controller) handleResponse(_ *PeerConnection, msg *Message) { - if msg.ReplyTo == "" { +func (c *Controller) handleResponse(_ *PeerConnection, message *Message) { + if message.ReplyTo == "" { return // Not a response, let worker handle it } c.mutex.Lock() - responseChannel, hasPendingRequest := c.pendingRequests[msg.ReplyTo] + responseChannel, hasPendingRequest := c.pendingRequests[message.ReplyTo] if hasPendingRequest { - delete(c.pendingRequests, msg.ReplyTo) + delete(c.pendingRequests, message.ReplyTo) } c.mutex.Unlock() if hasPendingRequest && responseChannel != nil { select { - case responseChannel <- msg: + case responseChannel <- message: default: // Late duplicate response; drop it. } } } -// sendRequest registers a temporary response channel, sends msg, and waits +// sendRequest registers a temporary response channel, sends message, and waits // for the matching reply or timeout. // // The response channel is intentionally never closed. Removing it from the // pending map is enough to stop future routing, and it avoids a late-response // close/send race after the caller has already timed out. -func (c *Controller) sendRequest(peerID string, msg *Message, timeout time.Duration) (*Message, error) { +func (c *Controller) sendRequest(peerID string, message *Message, timeout time.Duration) (*Message, error) { resolvedPeerID := peerID // Auto-connect if not already connected @@ -84,26 +84,26 @@ func (c *Controller) sendRequest(peerID string, msg *Message, timeout time.Durat // Use the real peer ID after handshake (it may have changed) resolvedPeerID = conn.Peer.ID // Update the message destination - msg.To = resolvedPeerID + message.To = resolvedPeerID } // Create response channel responseChannel := make(chan *Message, 1) c.mutex.Lock() - c.pendingRequests[msg.ID] = responseChannel + c.pendingRequests[message.ID] = responseChannel c.mutex.Unlock() // Clean up on exit. Deleting the pending entry is enough because // handleResponse only routes through the map. defer func() { c.mutex.Lock() - delete(c.pendingRequests, msg.ID) + delete(c.pendingRequests, message.ID) c.mutex.Unlock() }() // Send the message - if err := c.transport.Send(resolvedPeerID, msg); err != nil { + if err := c.transport.Send(resolvedPeerID, message); err != nil { return nil, core.E("Controller.sendRequest", "failed to send message", err) } @@ -128,12 +128,12 @@ func (c *Controller) GetRemoteStats(peerID string) (*StatsPayload, error) { return nil, ErrorIdentityNotInitialized } - msg, err := NewMessage(MessageGetStats, identity.ID, peerID, nil) + requestMessage, err := NewMessage(MessageGetStats, identity.ID, peerID, nil) if err != nil { return nil, core.E("Controller.GetRemoteStats", "failed to create message", err) } - response, err := c.sendRequest(peerID, msg, 10*time.Second) + response, err := c.sendRequest(peerID, requestMessage, 10*time.Second) if err != nil { return nil, err } @@ -165,12 +165,12 @@ func (c *Controller) StartRemoteMiner(peerID, minerType, profileID string, confi Config: configOverride, } - msg, err := NewMessage(MessageStartMiner, identity.ID, peerID, payload) + requestMessage, err := NewMessage(MessageStartMiner, identity.ID, peerID, payload) if err != nil { return core.E("Controller.StartRemoteMiner", "failed to create message", err) } - response, err := c.sendRequest(peerID, msg, 30*time.Second) + response, err := c.sendRequest(peerID, requestMessage, 30*time.Second) if err != nil { return err } @@ -200,12 +200,12 @@ func (c *Controller) StopRemoteMiner(peerID, minerName string) error { MinerName: minerName, } - msg, err := NewMessage(MessageStopMiner, identity.ID, peerID, payload) + requestMessage, err := NewMessage(MessageStopMiner, identity.ID, peerID, payload) if err != nil { return core.E("Controller.StopRemoteMiner", "failed to create message", err) } - response, err := c.sendRequest(peerID, msg, 30*time.Second) + response, err := c.sendRequest(peerID, requestMessage, 30*time.Second) if err != nil { return err } @@ -236,12 +236,12 @@ func (c *Controller) GetRemoteLogs(peerID, minerName string, lines int) ([]strin Lines: lines, } - msg, err := NewMessage(MessageGetLogs, identity.ID, peerID, payload) + requestMessage, err := NewMessage(MessageGetLogs, identity.ID, peerID, payload) if err != nil { return nil, core.E("Controller.GetRemoteLogs", "failed to create message", err) } - response, err := c.sendRequest(peerID, msg, 10*time.Second) + response, err := c.sendRequest(peerID, requestMessage, 10*time.Second) if err != nil { return nil, err } @@ -287,7 +287,7 @@ func (c *Controller) GetAllStats() map[string]*StatsPayload { // PingPeer sends a ping to a peer and refreshes that peer's metrics. // -// rttMS, err := controller.PingPeer("worker-1") +// rttMilliseconds, err := controller.PingPeer("worker-1") func (c *Controller) PingPeer(peerID string) (float64, error) { identity := c.nodeManager.GetIdentity() if identity == nil { @@ -299,12 +299,12 @@ func (c *Controller) PingPeer(peerID string) (float64, error) { SentAt: sentAt.UnixMilli(), } - msg, err := NewMessage(MessagePing, identity.ID, peerID, payload) + requestMessage, err := NewMessage(MessagePing, identity.ID, peerID, payload) if err != nil { return 0, core.E("Controller.PingPeer", "failed to create message", err) } - response, err := c.sendRequest(peerID, msg, 5*time.Second) + response, err := c.sendRequest(peerID, requestMessage, 5*time.Second) if err != nil { return 0, err } @@ -313,13 +313,13 @@ func (c *Controller) PingPeer(peerID string) (float64, error) { return 0, err } - // Calculate round-trip time - rtt := time.Since(sentAt).Seconds() * 1000 // Convert to ms + // Calculate round-trip time in milliseconds. + rtt := time.Since(sentAt).Seconds() * 1000 // Update peer metrics peer := c.peerRegistry.GetPeer(peerID) if peer != nil { - c.peerRegistry.UpdateMetrics(peerID, rtt, peer.GeoKM, peer.Hops) + c.peerRegistry.UpdateMetrics(peerID, rtt, peer.GeographicKilometres, peer.Hops) } return rtt, nil diff --git a/node/controller_test.go b/node/controller_test.go index c0881ef..dd05231 100644 --- a/node/controller_test.go +++ b/node/controller_test.go @@ -199,7 +199,7 @@ func TestController_PingPeerRTT_Good(t *testing.T) { // Record initial peer metrics. peerBefore := tp.ClientReg.GetPeer(serverID) require.NotNil(t, peerBefore, "server peer should exist in the client registry") - initialPingMS := peerBefore.PingMS + initialPingMilliseconds := peerBefore.PingMilliseconds // Send a ping. rtt, err := controller.PingPeer(serverID) @@ -210,9 +210,9 @@ func TestController_PingPeerRTT_Good(t *testing.T) { // Verify the peer registry was updated with the measured latency. peerAfter := tp.ClientReg.GetPeer(serverID) require.NotNil(t, peerAfter, "server peer should still exist after ping") - assert.NotEqual(t, initialPingMS, peerAfter.PingMS, - "PingMS should be updated after a successful ping") - assert.Greater(t, peerAfter.PingMS, 0.0, "PingMS should be positive") + assert.NotEqual(t, initialPingMilliseconds, peerAfter.PingMilliseconds, + "PingMilliseconds should be updated after a successful ping") + assert.Greater(t, peerAfter.PingMilliseconds, 0.0, "PingMilliseconds should be positive") } func TestController_ConcurrentRequests_Ugly(t *testing.T) { diff --git a/node/dispatcher.go b/node/dispatcher.go index 85d1110..ac78c49 100644 --- a/node/dispatcher.go +++ b/node/dispatcher.go @@ -29,8 +29,8 @@ const ( // IntentHandler processes a UEPS packet that has been routed by intent. // Implementations receive the fully parsed and HMAC-verified packet. // -// var handler IntentHandler = func(pkt *ueps.ParsedPacket) error { return nil } -type IntentHandler func(pkt *ueps.ParsedPacket) error +// var handler IntentHandler = func(packet *ueps.ParsedPacket) error { return nil } +type IntentHandler func(packet *ueps.ParsedPacket) error // Dispatcher routes verified UEPS packets to registered intent handlers. // It enforces a threat circuit breaker before routing: any packet whose @@ -107,36 +107,36 @@ func (d *Dispatcher) Handlers() iter.Seq2[byte, IntentHandler] { // - Returns nil on successful delivery to a handler, or any error the // handler itself returns. // - A nil packet returns ErrorNilPacket immediately. -func (d *Dispatcher) Dispatch(pkt *ueps.ParsedPacket) error { - if pkt == nil { +func (d *Dispatcher) Dispatch(packet *ueps.ParsedPacket) error { + if packet == nil { return ErrorNilPacket } // 1. Threat circuit breaker (L5 guard) - if pkt.Header.ThreatScore > ThreatScoreThreshold { + if packet.Header.ThreatScore > ThreatScoreThreshold { d.log.Warn("packet dropped: threat score exceeds safety threshold", logging.Fields{ - "threat_score": pkt.Header.ThreatScore, + "threat_score": packet.Header.ThreatScore, "threshold": ThreatScoreThreshold, - "intent_id": core.Sprintf("0x%02X", pkt.Header.IntentID), - "version": pkt.Header.Version, + "intent_id": core.Sprintf("0x%02X", packet.Header.IntentID), + "version": packet.Header.Version, }) return ErrorThreatScoreExceeded } // 2. Intent routing (L9 semantic) d.mu.RLock() - handler, exists := d.handlers[pkt.Header.IntentID] + handler, exists := d.handlers[packet.Header.IntentID] d.mu.RUnlock() if !exists { d.log.Warn("packet dropped: unknown intent", logging.Fields{ - "intent_id": core.Sprintf("0x%02X", pkt.Header.IntentID), - "version": pkt.Header.Version, + "intent_id": core.Sprintf("0x%02X", packet.Header.IntentID), + "version": packet.Header.Version, }) return ErrorUnknownIntent } - return handler(pkt) + return handler(packet) } // Sentinel errors returned by Dispatch. diff --git a/node/integration_test.go b/node/integration_test.go index 4fd8f2e..bd45880 100644 --- a/node/integration_test.go +++ b/node/integration_test.go @@ -138,7 +138,7 @@ func TestIntegration_FullNodeLifecycle_Good(t *testing.T) { // Verify registry metrics were updated. peerAfterPing := controllerReg.GetPeer(serverPeerID) require.NotNil(t, peerAfterPing) - assert.Greater(t, peerAfterPing.PingMS, 0.0, "PingMS should be updated") + assert.Greater(t, peerAfterPing.PingMilliseconds, 0.0, "PingMilliseconds should be updated") // ---------------------------------------------------------------- // Step 5: Encrypted message exchange — RemoteStats diff --git a/node/message.go b/node/message.go index 8b7f37c..4931d8f 100644 --- a/node/message.go +++ b/node/message.go @@ -297,15 +297,15 @@ const ( // NewErrorMessage builds an error response message for an existing request. // -// msg, err := NewErrorMessage("worker-1", "controller-1", ErrorCodeOperationFailed, "miner start failed", "req-1") +// errorMessage, err := NewErrorMessage("worker-1", "controller-1", ErrorCodeOperationFailed, "miner start failed", "req-1") func NewErrorMessage(from, to string, code int, message string, replyTo string) (*Message, error) { - msg, err := NewMessage(MessageError, from, to, ErrorPayload{ + errorMessage, err := NewMessage(MessageError, from, to, ErrorPayload{ Code: code, Message: message, }) if err != nil { return nil, err } - msg.ReplyTo = replyTo - return msg, nil + errorMessage.ReplyTo = replyTo + return errorMessage, nil } diff --git a/node/peer.go b/node/peer.go index 8d7458a..7776f7d 100644 --- a/node/peer.go +++ b/node/peer.go @@ -17,7 +17,14 @@ import ( // Peer represents a known remote node. // -// peer := &Peer{ID: "worker-1", Address: "127.0.0.1:9101"} +// peer := &Peer{ +// ID: "worker-1", +// Name: "Worker 1", +// Address: "127.0.0.1:9101", +// PingMilliseconds: 42.5, +// GeographicKilometres: 100, +// Score: 80, +// } type Peer struct { ID string `json:"id"` Name string `json:"name"` @@ -28,10 +35,10 @@ type Peer struct { LastSeen time.Time `json:"lastSeen"` // Poindexter metrics (updated dynamically) - PingMS float64 `json:"pingMs"` // Latency in milliseconds - Hops int `json:"hops"` // Network hop count - GeoKM float64 `json:"geoKm"` // Geographic distance in kilometers - Score float64 `json:"score"` // Reliability score 0-100 + PingMilliseconds float64 `json:"pingMs"` // Latency in milliseconds + Hops int `json:"hops"` // Network hop count + GeographicKilometres float64 `json:"geoKm"` // Geographic distance in kilometres + Score float64 `json:"score"` // Reliability score 0-100 // Connection state (not persisted) Connected bool `json:"-"` @@ -111,13 +118,13 @@ type PeerRegistry struct { saveMutex sync.Mutex // Protects pending save state } -// Dimension weights for peer selection -// Lower ping, hops, geo are better; higher score is better +// Dimension weights for peer selection. +// Lower ping, hops, and geographic distance are better; higher score is better. var ( - pingWeight = 1.0 - hopsWeight = 0.7 - geoWeight = 0.2 - scoreWeight = 1.2 + pingWeight = 1.0 + hopsWeight = 0.7 + geographicWeight = 0.2 + scoreWeight = 1.2 ) // NewPeerRegistry loads the default peer registry. @@ -362,8 +369,11 @@ func (r *PeerRegistry) Peers() iter.Seq[*Peer] { } // UpdateMetrics updates a peer's performance metrics. +// +// registry.UpdateMetrics("worker-1", 42.5, 100, 3) +// // Note: Persistence is debounced. Call Close() to flush before shutdown. -func (r *PeerRegistry) UpdateMetrics(id string, pingMilliseconds, geoKilometres float64, hopCount int) error { +func (r *PeerRegistry) UpdateMetrics(id string, pingMilliseconds, geographicKilometres float64, hopCount int) error { r.mu.Lock() peer, exists := r.peers[id] @@ -372,8 +382,8 @@ func (r *PeerRegistry) UpdateMetrics(id string, pingMilliseconds, geoKilometres return core.E("PeerRegistry.UpdateMetrics", "peer "+id+" not found", nil) } - peer.PingMS = pingMilliseconds - peer.GeoKM = geoKilometres + peer.PingMilliseconds = pingMilliseconds + peer.GeographicKilometres = geographicKilometres peer.Hops = hopCount peer.LastSeen = time.Now() @@ -533,7 +543,7 @@ func (r *PeerRegistry) SelectOptimalPeer() *Peer { return nil } - // Target: ideal peer (0 ping, 0 hops, 0 geo, 100 score) + // Target: ideal peer (0 ping, 0 hops, 0 geographic distance, 100 score) // Score is inverted (100 - score) so lower is better in the tree target := []float64{0, 0, 0, 0} @@ -618,14 +628,14 @@ func (r *PeerRegistry) rebuildKDTree() { points := make([]poindexter.KDPoint[string], 0, len(r.peers)) for _, peer := range r.peers { - // Build 4D point with weighted, normalized values + // Build a 4D point with weighted, normalised values. // Invert score so that higher score = lower value (better) point := poindexter.KDPoint[string]{ ID: peer.ID, Coords: []float64{ - peer.PingMS * pingWeight, + peer.PingMilliseconds * pingWeight, float64(peer.Hops) * hopsWeight, - peer.GeoKM * geoWeight, + peer.GeographicKilometres * geographicWeight, (100 - peer.Score) * scoreWeight, // Invert score }, Value: peer.ID, diff --git a/node/peer_test.go b/node/peer_test.go index 6a79c79..b10979c 100644 --- a/node/peer_test.go +++ b/node/peer_test.go @@ -177,11 +177,11 @@ func TestPeer_Registry_UpdateMetrics_Good(t *testing.T) { if updated == nil { t.Fatal("expected peer to exist") } - if updated.PingMS != 50.5 { - t.Errorf("expected ping 50.5, got %f", updated.PingMS) + if updated.PingMilliseconds != 50.5 { + t.Errorf("expected ping 50.5, got %f", updated.PingMilliseconds) } - if updated.GeoKM != 100.2 { - t.Errorf("expected geo 100.2, got %f", updated.GeoKM) + if updated.GeographicKilometres != 100.2 { + t.Errorf("expected geographic distance 100.2, got %f", updated.GeographicKilometres) } if updated.Hops != 3 { t.Errorf("expected hops 3, got %d", updated.Hops) @@ -306,9 +306,9 @@ func TestPeer_Registry_SelectOptimalPeer_Good(t *testing.T) { // Add peers with different metrics peers := []*Peer{ - {ID: "opt-1", Name: "Slow Peer", PingMS: 200, Hops: 5, GeoKM: 1000, Score: 50}, - {ID: "opt-2", Name: "Fast Peer", PingMS: 10, Hops: 1, GeoKM: 50, Score: 90}, - {ID: "opt-3", Name: "Medium Peer", PingMS: 50, Hops: 2, GeoKM: 200, Score: 70}, + {ID: "opt-1", Name: "Slow Peer", PingMilliseconds: 200, Hops: 5, GeographicKilometres: 1000, Score: 50}, + {ID: "opt-2", Name: "Fast Peer", PingMilliseconds: 10, Hops: 1, GeographicKilometres: 50, Score: 90}, + {ID: "opt-3", Name: "Medium Peer", PingMilliseconds: 50, Hops: 2, GeographicKilometres: 200, Score: 70}, } for _, p := range peers { @@ -331,10 +331,10 @@ func TestPeer_Registry_SelectNearestPeers_Good(t *testing.T) { defer cleanup() peers := []*Peer{ - {ID: "near-1", Name: "Peer 1", PingMS: 100, Score: 50}, - {ID: "near-2", Name: "Peer 2", PingMS: 10, Score: 90}, - {ID: "near-3", Name: "Peer 3", PingMS: 50, Score: 70}, - {ID: "near-4", Name: "Peer 4", PingMS: 200, Score: 30}, + {ID: "near-1", Name: "Peer 1", PingMilliseconds: 100, Score: 50}, + {ID: "near-2", Name: "Peer 2", PingMilliseconds: 10, Score: 90}, + {ID: "near-3", Name: "Peer 3", PingMilliseconds: 50, Score: 70}, + {ID: "near-4", Name: "Peer 4", PingMilliseconds: 200, Score: 30}, } for _, p := range peers { diff --git a/node/protocol.go b/node/protocol.go index bc11502..34f255d 100644 --- a/node/protocol.go +++ b/node/protocol.go @@ -68,14 +68,14 @@ var DefaultResponseHandler = &ResponseHandler{} // ValidateResponse is a convenience function using the default handler. // -// err := ValidateResponse(msg, MessageStats) +// err := ValidateResponse(message, MessageStats) func ValidateResponse(resp *Message, expectedType MessageType) error { return DefaultResponseHandler.ValidateResponse(resp, expectedType) } // ParseResponse is a convenience function using the default handler. // -// err := ParseResponse(msg, MessageStats, &stats) +// err := ParseResponse(message, MessageStats, &stats) func ParseResponse(resp *Message, expectedType MessageType, target any) error { return DefaultResponseHandler.ParseResponse(resp, expectedType, target) } diff --git a/node/transport.go b/node/transport.go index a0755d0..55871f5 100644 --- a/node/transport.go +++ b/node/transport.go @@ -94,8 +94,8 @@ func (c TransportConfig) maximumConnections() int { // MessageHandler processes incoming messages. // -// var handler MessageHandler = func(conn *PeerConnection, msg *Message) {} -type MessageHandler func(conn *PeerConnection, msg *Message) +// var handler MessageHandler = func(peerConnection *PeerConnection, message *Message) {} +type MessageHandler func(peerConnection *PeerConnection, message *Message) // MessageDeduplicator tracks recent message IDs to prevent duplicate processing. // diff --git a/node/worker.go b/node/worker.go index 8997327..9c0669d 100644 --- a/node/worker.go +++ b/node/worker.go @@ -79,24 +79,24 @@ func (w *Worker) SetProfileManager(manager ProfileManager) { // HandleMessage routes an incoming message to the correct worker handler. // -// worker.HandleMessage(conn, msg) -func (w *Worker) HandleMessage(conn *PeerConnection, msg *Message) { +// worker.HandleMessage(peerConnection, message) +func (w *Worker) HandleMessage(peerConnection *PeerConnection, message *Message) { var response *Message var err error - switch msg.Type { + switch message.Type { case MessagePing: - response, err = w.handlePing(msg) + response, err = w.handlePing(message) case MessageGetStats: - response, err = w.handleStats(msg) + response, err = w.handleStats(message) case MessageStartMiner: - response, err = w.handleStartMiner(msg) + response, err = w.handleStartMiner(message) case MessageStopMiner: - response, err = w.handleStopMiner(msg) + response, err = w.handleStopMiner(message) case MessageGetLogs: - response, err = w.handleLogs(msg) + response, err = w.handleLogs(message) case MessageDeploy: - response, err = w.handleDeploy(conn, msg) + response, err = w.handleDeploy(peerConnection, message) default: // Unknown message type - ignore or send error return @@ -108,19 +108,19 @@ func (w *Worker) HandleMessage(conn *PeerConnection, msg *Message) { if identity != nil { errMsg, _ := NewErrorMessage( identity.ID, - msg.From, + message.From, ErrorCodeOperationFailed, err.Error(), - msg.ID, + message.ID, ) - conn.Send(errMsg) + peerConnection.Send(errMsg) } return } if response != nil { - logging.Debug("sending response", logging.Fields{"type": response.Type, "to": msg.From}) - if err := conn.Send(response); err != nil { + logging.Debug("sending response", logging.Fields{"type": response.Type, "to": message.From}) + if err := peerConnection.Send(response); err != nil { logging.Error("failed to send response", logging.Fields{"error": err}) } else { logging.Debug("response sent successfully") @@ -129,9 +129,9 @@ func (w *Worker) HandleMessage(conn *PeerConnection, msg *Message) { } // handlePing responds to ping requests. -func (w *Worker) handlePing(msg *Message) (*Message, error) { +func (w *Worker) handlePing(message *Message) (*Message, error) { var ping PingPayload - if err := msg.ParsePayload(&ping); err != nil { + if err := message.ParsePayload(&ping); err != nil { return nil, core.E("Worker.handlePing", "invalid ping payload", err) } @@ -140,11 +140,11 @@ func (w *Worker) handlePing(msg *Message) (*Message, error) { ReceivedAt: time.Now().UnixMilli(), } - return msg.Reply(MessagePong, pong) + return message.Reply(MessagePong, pong) } // handleStats responds with current miner statistics. -func (w *Worker) handleStats(msg *Message) (*Message, error) { +func (w *Worker) handleStats(message *Message) (*Message, error) { identity := w.nodeManager.GetIdentity() if identity == nil { return nil, ErrorIdentityNotInitialized @@ -172,7 +172,7 @@ func (w *Worker) handleStats(msg *Message) (*Message, error) { } } - return msg.Reply(MessageStats, stats) + return message.Reply(MessageStats, stats) } // convertMinerStats converts miner stats to the protocol format. @@ -208,13 +208,13 @@ func convertMinerStats(miner MinerInstance, rawStats any) MinerStatsItem { } // handleStartMiner starts a miner with the given profile. -func (w *Worker) handleStartMiner(msg *Message) (*Message, error) { +func (w *Worker) handleStartMiner(message *Message) (*Message, error) { if w.minerManager == nil { return nil, ErrorMinerManagerNotConfigured } var payload StartMinerPayload - if err := msg.ParsePayload(&payload); err != nil { + if err := message.ParsePayload(&payload); err != nil { return nil, core.E("Worker.handleStartMiner", "invalid start miner payload", err) } @@ -244,24 +244,24 @@ func (w *Worker) handleStartMiner(msg *Message) (*Message, error) { Success: false, Error: err.Error(), } - return msg.Reply(MessageMinerAck, ack) + return message.Reply(MessageMinerAck, ack) } ack := MinerAckPayload{ Success: true, MinerName: miner.GetName(), } - return msg.Reply(MessageMinerAck, ack) + return message.Reply(MessageMinerAck, ack) } // handleStopMiner stops a running miner. -func (w *Worker) handleStopMiner(msg *Message) (*Message, error) { +func (w *Worker) handleStopMiner(message *Message) (*Message, error) { if w.minerManager == nil { return nil, ErrorMinerManagerNotConfigured } var payload StopMinerPayload - if err := msg.ParsePayload(&payload); err != nil { + if err := message.ParsePayload(&payload); err != nil { return nil, core.E("Worker.handleStopMiner", "invalid stop miner payload", err) } @@ -274,17 +274,17 @@ func (w *Worker) handleStopMiner(msg *Message) (*Message, error) { ack.Error = err.Error() } - return msg.Reply(MessageMinerAck, ack) + return message.Reply(MessageMinerAck, ack) } // handleLogs returns console logs from a miner. -func (w *Worker) handleLogs(msg *Message) (*Message, error) { +func (w *Worker) handleLogs(message *Message) (*Message, error) { if w.minerManager == nil { return nil, ErrorMinerManagerNotConfigured } var payload LogsRequestPayload - if err := msg.ParsePayload(&payload); err != nil { + if err := message.ParsePayload(&payload); err != nil { return nil, core.E("Worker.handleLogs", "invalid logs payload", err) } @@ -307,13 +307,13 @@ func (w *Worker) handleLogs(msg *Message) (*Message, error) { HasMore: len(lines) >= payload.Lines, } - return msg.Reply(MessageLogs, logs) + return message.Reply(MessageLogs, logs) } // handleDeploy handles deployment of profiles or miner bundles. -func (w *Worker) handleDeploy(conn *PeerConnection, msg *Message) (*Message, error) { +func (w *Worker) handleDeploy(peerConnection *PeerConnection, message *Message) (*Message, error) { var payload DeployPayload - if err := msg.ParsePayload(&payload); err != nil { + if err := message.ParsePayload(&payload); err != nil { return nil, core.E("Worker.handleDeploy", "invalid deploy payload", err) } @@ -327,8 +327,8 @@ func (w *Worker) handleDeploy(conn *PeerConnection, msg *Message) (*Message, err // Use shared secret as password (base64 encoded) password := "" - if conn != nil && len(conn.SharedSecret) > 0 { - password = base64.StdEncoding.EncodeToString(conn.SharedSecret) + if peerConnection != nil && len(peerConnection.SharedSecret) > 0 { + password = base64.StdEncoding.EncodeToString(peerConnection.SharedSecret) } switch bundle.Type { @@ -355,14 +355,14 @@ func (w *Worker) handleDeploy(conn *PeerConnection, msg *Message) (*Message, err Name: payload.Name, Error: err.Error(), } - return msg.Reply(MessageDeployAck, ack) + return message.Reply(MessageDeployAck, ack) } ack := DeployAckPayload{ Success: true, Name: payload.Name, } - return msg.Reply(MessageDeployAck, ack) + return message.Reply(MessageDeployAck, ack) case BundleMiner, BundleFull: // Determine the installation directory under the configured deployment @@ -406,7 +406,7 @@ func (w *Worker) handleDeploy(conn *PeerConnection, msg *Message) (*Message, err "miner_path": minerPath, }) - return msg.Reply(MessageDeployAck, ack) + return message.Reply(MessageDeployAck, ack) default: return nil, core.E("Worker.handleDeploy", "unknown bundle type: "+payload.BundleType, nil)