refactor(node): adopt AX naming across core APIs
Co-Authored-By: Virgil <virgil@lethean.io>
This commit is contained in:
parent
851b1294bd
commit
885070d241
8 changed files with 223 additions and 97 deletions
|
|
@ -72,8 +72,8 @@ func (c *Controller) sendRequest(peerID string, msg *Message, timeout time.Durat
|
|||
resolvedPeerID := peerID
|
||||
|
||||
// Auto-connect if not already connected
|
||||
if c.transport.Connection(peerID) == nil {
|
||||
peer := c.peerRegistry.Peer(peerID)
|
||||
if c.transport.GetConnection(peerID) == nil {
|
||||
peer := c.peerRegistry.GetPeer(peerID)
|
||||
if peer == nil {
|
||||
return nil, core.E("Controller.sendRequest", "peer not found: "+peerID, nil)
|
||||
}
|
||||
|
|
@ -119,18 +119,18 @@ func (c *Controller) sendRequest(peerID string, msg *Message, timeout time.Durat
|
|||
}
|
||||
}
|
||||
|
||||
// RemoteStats requests miner statistics from a remote peer.
|
||||
// GetRemoteStats requests miner statistics from a remote peer.
|
||||
//
|
||||
// stats, err := controller.RemoteStats("worker-1")
|
||||
func (c *Controller) RemoteStats(peerID string) (*StatsPayload, error) {
|
||||
identity := c.nodeManager.Identity()
|
||||
// stats, err := controller.GetRemoteStats("worker-1")
|
||||
func (c *Controller) GetRemoteStats(peerID string) (*StatsPayload, error) {
|
||||
identity := c.nodeManager.GetIdentity()
|
||||
if identity == nil {
|
||||
return nil, ErrorIdentityNotInitialized
|
||||
}
|
||||
|
||||
msg, err := NewMessage(MessageGetStats, identity.ID, peerID, nil)
|
||||
if err != nil {
|
||||
return nil, core.E("Controller.RemoteStats", "failed to create message", err)
|
||||
return nil, core.E("Controller.GetRemoteStats", "failed to create message", err)
|
||||
}
|
||||
|
||||
response, err := c.sendRequest(peerID, msg, 10*time.Second)
|
||||
|
|
@ -146,11 +146,16 @@ func (c *Controller) RemoteStats(peerID string) (*StatsPayload, error) {
|
|||
return &stats, nil
|
||||
}
|
||||
|
||||
// RemoteStats is a deprecated compatibility alias for GetRemoteStats.
|
||||
func (c *Controller) RemoteStats(peerID string) (*StatsPayload, error) {
|
||||
return c.GetRemoteStats(peerID)
|
||||
}
|
||||
|
||||
// StartRemoteMiner requests a remote peer to start a miner with a given profile.
|
||||
//
|
||||
// err := controller.StartRemoteMiner("worker-1", "xmrig", "profile-1", nil)
|
||||
func (c *Controller) StartRemoteMiner(peerID, minerType, profileID string, configOverride RawMessage) error {
|
||||
identity := c.nodeManager.Identity()
|
||||
identity := c.nodeManager.GetIdentity()
|
||||
if identity == nil {
|
||||
return ErrorIdentityNotInitialized
|
||||
}
|
||||
|
|
@ -191,7 +196,7 @@ func (c *Controller) StartRemoteMiner(peerID, minerType, profileID string, confi
|
|||
//
|
||||
// err := controller.StopRemoteMiner("worker-1", "xmrig-0")
|
||||
func (c *Controller) StopRemoteMiner(peerID, minerName string) error {
|
||||
identity := c.nodeManager.Identity()
|
||||
identity := c.nodeManager.GetIdentity()
|
||||
if identity == nil {
|
||||
return ErrorIdentityNotInitialized
|
||||
}
|
||||
|
|
@ -222,11 +227,11 @@ func (c *Controller) StopRemoteMiner(peerID, minerName string) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// RemoteLogs requests console logs from a remote miner.
|
||||
// GetRemoteLogs requests console logs from a remote miner.
|
||||
//
|
||||
// logs, err := controller.RemoteLogs("worker-1", "xmrig-0", 100)
|
||||
func (c *Controller) RemoteLogs(peerID, minerName string, lines int) ([]string, error) {
|
||||
identity := c.nodeManager.Identity()
|
||||
// logs, err := controller.GetRemoteLogs("worker-1", "xmrig-0", 100)
|
||||
func (c *Controller) GetRemoteLogs(peerID, minerName string, lines int) ([]string, error) {
|
||||
identity := c.nodeManager.GetIdentity()
|
||||
if identity == nil {
|
||||
return nil, ErrorIdentityNotInitialized
|
||||
}
|
||||
|
|
@ -238,7 +243,7 @@ func (c *Controller) RemoteLogs(peerID, minerName string, lines int) ([]string,
|
|||
|
||||
msg, err := NewMessage(MessageGetLogs, identity.ID, peerID, payload)
|
||||
if err != nil {
|
||||
return nil, core.E("Controller.RemoteLogs", "failed to create message", err)
|
||||
return nil, core.E("Controller.GetRemoteLogs", "failed to create message", err)
|
||||
}
|
||||
|
||||
response, err := c.sendRequest(peerID, msg, 10*time.Second)
|
||||
|
|
@ -254,10 +259,15 @@ func (c *Controller) RemoteLogs(peerID, minerName string, lines int) ([]string,
|
|||
return logs.Lines, nil
|
||||
}
|
||||
|
||||
// AllStats fetches stats from all connected peers.
|
||||
// RemoteLogs is a deprecated compatibility alias for GetRemoteLogs.
|
||||
func (c *Controller) RemoteLogs(peerID, minerName string, lines int) ([]string, error) {
|
||||
return c.GetRemoteLogs(peerID, minerName, lines)
|
||||
}
|
||||
|
||||
// GetAllStats fetches stats from all connected peers.
|
||||
//
|
||||
// statsByPeerID := controller.AllStats()
|
||||
func (c *Controller) AllStats() map[string]*StatsPayload {
|
||||
// statsByPeerID := controller.GetAllStats()
|
||||
func (c *Controller) GetAllStats() map[string]*StatsPayload {
|
||||
results := make(map[string]*StatsPayload)
|
||||
var mu sync.Mutex
|
||||
var wg sync.WaitGroup
|
||||
|
|
@ -266,7 +276,7 @@ func (c *Controller) AllStats() map[string]*StatsPayload {
|
|||
wg.Add(1)
|
||||
go func(p *Peer) {
|
||||
defer wg.Done()
|
||||
stats, err := c.RemoteStats(p.ID)
|
||||
stats, err := c.GetRemoteStats(p.ID)
|
||||
if err != nil {
|
||||
logging.Debug("failed to get stats from peer", logging.Fields{
|
||||
"peer_id": p.ID,
|
||||
|
|
@ -285,11 +295,16 @@ func (c *Controller) AllStats() map[string]*StatsPayload {
|
|||
return results
|
||||
}
|
||||
|
||||
// AllStats is a deprecated compatibility alias for GetAllStats.
|
||||
func (c *Controller) AllStats() map[string]*StatsPayload {
|
||||
return c.GetAllStats()
|
||||
}
|
||||
|
||||
// PingPeer sends a ping to a peer and refreshes that peer's metrics.
|
||||
//
|
||||
// rttMS, err := controller.PingPeer("worker-1")
|
||||
func (c *Controller) PingPeer(peerID string) (float64, error) {
|
||||
identity := c.nodeManager.Identity()
|
||||
identity := c.nodeManager.GetIdentity()
|
||||
if identity == nil {
|
||||
return 0, ErrorIdentityNotInitialized
|
||||
}
|
||||
|
|
@ -317,7 +332,7 @@ func (c *Controller) PingPeer(peerID string) (float64, error) {
|
|||
rtt := time.Since(sentAt).Seconds() * 1000 // Convert to ms
|
||||
|
||||
// Update peer metrics
|
||||
peer := c.peerRegistry.Peer(peerID)
|
||||
peer := c.peerRegistry.GetPeer(peerID)
|
||||
if peer != nil {
|
||||
c.peerRegistry.UpdateMetrics(peerID, rtt, peer.GeoKM, peer.Hops)
|
||||
}
|
||||
|
|
@ -329,7 +344,7 @@ func (c *Controller) PingPeer(peerID string) (float64, error) {
|
|||
//
|
||||
// err := controller.ConnectToPeer("worker-1")
|
||||
func (c *Controller) ConnectToPeer(peerID string) error {
|
||||
peer := c.peerRegistry.Peer(peerID)
|
||||
peer := c.peerRegistry.GetPeer(peerID)
|
||||
if peer == nil {
|
||||
return core.E("Controller.ConnectToPeer", "peer not found: "+peerID, nil)
|
||||
}
|
||||
|
|
@ -342,7 +357,7 @@ func (c *Controller) ConnectToPeer(peerID string) error {
|
|||
//
|
||||
// err := controller.DisconnectFromPeer("worker-1")
|
||||
func (c *Controller) DisconnectFromPeer(peerID string) error {
|
||||
conn := c.transport.Connection(peerID)
|
||||
conn := c.transport.GetConnection(peerID)
|
||||
if conn == nil {
|
||||
return core.E("Controller.DisconnectFromPeer", "peer not connected: "+peerID, nil)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -490,17 +490,21 @@ func (m *mockMinerManagerFull) ListMiners() []MinerInstance {
|
|||
return result
|
||||
}
|
||||
|
||||
func (m *mockMinerManagerFull) Miner(name string) (MinerInstance, error) {
|
||||
func (m *mockMinerManagerFull) GetMiner(name string) (MinerInstance, error) {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
|
||||
miner, exists := m.miners[name]
|
||||
if !exists {
|
||||
return nil, core.E("mockMinerManagerFull.Miner", "miner "+name+" not found", nil)
|
||||
return nil, core.E("mockMinerManagerFull.GetMiner", "miner "+name+" not found", nil)
|
||||
}
|
||||
return miner, nil
|
||||
}
|
||||
|
||||
func (m *mockMinerManagerFull) Miner(name string) (MinerInstance, error) {
|
||||
return m.GetMiner(name)
|
||||
}
|
||||
|
||||
// mockMinerFull implements MinerInstance with real data.
|
||||
type mockMinerFull struct {
|
||||
name string
|
||||
|
|
@ -509,16 +513,21 @@ type mockMinerFull struct {
|
|||
consoleHistory []string
|
||||
}
|
||||
|
||||
func (m *mockMinerFull) Name() string { return m.name }
|
||||
func (m *mockMinerFull) Type() string { return m.minerType }
|
||||
func (m *mockMinerFull) Stats() (any, error) { return m.stats, nil }
|
||||
func (m *mockMinerFull) ConsoleHistory(lines int) []string {
|
||||
func (m *mockMinerFull) GetName() string { return m.name }
|
||||
func (m *mockMinerFull) GetType() string { return m.minerType }
|
||||
func (m *mockMinerFull) GetStats() (any, error) { return m.stats, nil }
|
||||
func (m *mockMinerFull) GetConsoleHistory(lines int) []string {
|
||||
if lines >= len(m.consoleHistory) {
|
||||
return m.consoleHistory
|
||||
}
|
||||
return m.consoleHistory[:lines]
|
||||
}
|
||||
|
||||
func (m *mockMinerFull) Name() string { return m.GetName() }
|
||||
func (m *mockMinerFull) Type() string { return m.GetType() }
|
||||
func (m *mockMinerFull) Stats() (any, error) { return m.GetStats() }
|
||||
func (m *mockMinerFull) ConsoleHistory(lines int) []string { return m.GetConsoleHistory(lines) }
|
||||
|
||||
func TestController_StartRemoteMiner_Good(t *testing.T) {
|
||||
controller, _, tp := setupControllerPairWithMiner(t)
|
||||
serverID := tp.ServerNode.Identity().ID
|
||||
|
|
|
|||
|
|
@ -126,6 +126,11 @@ func NewNodeManagerFromPaths(keyPath, configPath string) (*NodeManager, error) {
|
|||
return nm, nil
|
||||
}
|
||||
|
||||
// NewNodeManagerWithPaths is a deprecated compatibility alias for NewNodeManagerFromPaths.
|
||||
func NewNodeManagerWithPaths(keyPath, configPath string) (*NodeManager, error) {
|
||||
return NewNodeManagerFromPaths(keyPath, configPath)
|
||||
}
|
||||
|
||||
// HasIdentity returns true if a node identity has been initialized.
|
||||
func (n *NodeManager) HasIdentity() bool {
|
||||
n.mu.RLock()
|
||||
|
|
@ -133,10 +138,10 @@ func (n *NodeManager) HasIdentity() bool {
|
|||
return n.identity != nil
|
||||
}
|
||||
|
||||
// Identity returns a copy of the loaded node identity.
|
||||
// GetIdentity returns a copy of the loaded node identity.
|
||||
//
|
||||
// identity := nodeManager.Identity()
|
||||
func (n *NodeManager) Identity() *NodeIdentity {
|
||||
// identity := nodeManager.GetIdentity()
|
||||
func (n *NodeManager) GetIdentity() *NodeIdentity {
|
||||
n.mu.RLock()
|
||||
defer n.mu.RUnlock()
|
||||
if n.identity == nil {
|
||||
|
|
@ -147,6 +152,11 @@ func (n *NodeManager) Identity() *NodeIdentity {
|
|||
return &identity
|
||||
}
|
||||
|
||||
// Identity is a deprecated compatibility alias for GetIdentity.
|
||||
func (n *NodeManager) Identity() *NodeIdentity {
|
||||
return n.GetIdentity()
|
||||
}
|
||||
|
||||
// GenerateIdentity writes a new node identity for the given name and role.
|
||||
//
|
||||
// err := nodeManager.GenerateIdentity("worker-1", RoleWorker)
|
||||
|
|
|
|||
62
node/peer.go
62
node/peer.go
|
|
@ -160,6 +160,11 @@ func NewPeerRegistryFromPath(peersPath string) (*PeerRegistry, error) {
|
|||
return pr, nil
|
||||
}
|
||||
|
||||
// NewPeerRegistryWithPath is a deprecated compatibility alias for NewPeerRegistryFromPath.
|
||||
func NewPeerRegistryWithPath(peersPath string) (*PeerRegistry, error) {
|
||||
return NewPeerRegistryFromPath(peersPath)
|
||||
}
|
||||
|
||||
// SetAuthMode changes how unknown peers are handled.
|
||||
//
|
||||
// registry.SetAuthMode(PeerAuthAllowlist)
|
||||
|
|
@ -170,15 +175,20 @@ func (r *PeerRegistry) SetAuthMode(mode PeerAuthMode) {
|
|||
logging.Info("peer auth mode changed", logging.Fields{"mode": mode})
|
||||
}
|
||||
|
||||
// AuthMode returns the current authentication mode.
|
||||
// GetAuthMode returns the current authentication mode.
|
||||
//
|
||||
// mode := registry.AuthMode()
|
||||
func (r *PeerRegistry) AuthMode() PeerAuthMode {
|
||||
// mode := registry.GetAuthMode()
|
||||
func (r *PeerRegistry) GetAuthMode() PeerAuthMode {
|
||||
r.allowedPublicKeyMu.RLock()
|
||||
defer r.allowedPublicKeyMu.RUnlock()
|
||||
return r.authMode
|
||||
}
|
||||
|
||||
// AuthMode is a deprecated compatibility alias for GetAuthMode.
|
||||
func (r *PeerRegistry) AuthMode() PeerAuthMode {
|
||||
return r.GetAuthMode()
|
||||
}
|
||||
|
||||
// AllowPublicKey adds a public key to the allowlist.
|
||||
func (r *PeerRegistry) AllowPublicKey(publicKey string) {
|
||||
r.allowedPublicKeyMu.Lock()
|
||||
|
|
@ -323,10 +333,10 @@ func (r *PeerRegistry) RemovePeer(id string) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// Peer returns a copy of the peer with the supplied ID.
|
||||
// GetPeer returns a copy of the peer with the supplied ID.
|
||||
//
|
||||
// peer := registry.Peer("worker-1")
|
||||
func (r *PeerRegistry) Peer(id string) *Peer {
|
||||
// peer := registry.GetPeer("worker-1")
|
||||
func (r *PeerRegistry) GetPeer(id string) *Peer {
|
||||
r.mu.RLock()
|
||||
defer r.mu.RUnlock()
|
||||
|
||||
|
|
@ -340,6 +350,11 @@ func (r *PeerRegistry) Peer(id string) *Peer {
|
|||
return &peerCopy
|
||||
}
|
||||
|
||||
// Peer is a deprecated compatibility alias for GetPeer.
|
||||
func (r *PeerRegistry) Peer(id string) *Peer {
|
||||
return r.GetPeer(id)
|
||||
}
|
||||
|
||||
// ListPeers returns all registered peers.
|
||||
func (r *PeerRegistry) ListPeers() []*Peer {
|
||||
return slices.Collect(r.Peers())
|
||||
|
|
@ -406,10 +421,10 @@ func (r *PeerRegistry) UpdateScore(id string, score float64) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// MarkConnected updates a peer's connection state.
|
||||
// SetConnected updates a peer's connection state.
|
||||
//
|
||||
// registry.MarkConnected("worker-1", true)
|
||||
func (r *PeerRegistry) MarkConnected(id string, connected bool) {
|
||||
// registry.SetConnected("worker-1", true)
|
||||
func (r *PeerRegistry) SetConnected(id string, connected bool) {
|
||||
r.mu.Lock()
|
||||
defer r.mu.Unlock()
|
||||
|
||||
|
|
@ -421,6 +436,11 @@ func (r *PeerRegistry) MarkConnected(id string, connected bool) {
|
|||
}
|
||||
}
|
||||
|
||||
// MarkConnected is a deprecated compatibility alias for SetConnected.
|
||||
func (r *PeerRegistry) MarkConnected(id string, connected bool) {
|
||||
r.SetConnected(id, connected)
|
||||
}
|
||||
|
||||
// Score adjustment constants
|
||||
const (
|
||||
ScoreSuccessIncrement = 1.0 // Increment for successful interaction
|
||||
|
|
@ -488,10 +508,10 @@ func (r *PeerRegistry) RecordTimeout(id string) {
|
|||
})
|
||||
}
|
||||
|
||||
// PeersSortedByScore returns peers sorted by score, highest first.
|
||||
// GetPeersByScore returns peers sorted by score, highest first.
|
||||
//
|
||||
// peers := registry.PeersSortedByScore()
|
||||
func (r *PeerRegistry) PeersSortedByScore() []*Peer {
|
||||
// peers := registry.GetPeersByScore()
|
||||
func (r *PeerRegistry) GetPeersByScore() []*Peer {
|
||||
r.mu.RLock()
|
||||
defer r.mu.RUnlock()
|
||||
|
||||
|
|
@ -511,10 +531,15 @@ func (r *PeerRegistry) PeersSortedByScore() []*Peer {
|
|||
return peers
|
||||
}
|
||||
|
||||
// PeersSortedByScore is a deprecated compatibility alias for GetPeersByScore.
|
||||
func (r *PeerRegistry) PeersSortedByScore() []*Peer {
|
||||
return r.GetPeersByScore()
|
||||
}
|
||||
|
||||
// PeersByScore returns an iterator over peers sorted by score (highest first).
|
||||
func (r *PeerRegistry) PeersByScore() iter.Seq[*Peer] {
|
||||
return func(yield func(*Peer) bool) {
|
||||
peers := r.PeersSortedByScore()
|
||||
peers := r.GetPeersByScore()
|
||||
for _, p := range peers {
|
||||
if !yield(p) {
|
||||
return
|
||||
|
|
@ -576,13 +601,18 @@ func (r *PeerRegistry) SelectNearestPeers(n int) []*Peer {
|
|||
return peers
|
||||
}
|
||||
|
||||
// ConnectedPeerList returns all currently connected peers as a slice.
|
||||
// GetConnectedPeers returns all currently connected peers as a slice.
|
||||
//
|
||||
// connectedPeers := registry.ConnectedPeerList()
|
||||
func (r *PeerRegistry) ConnectedPeerList() []*Peer {
|
||||
// connectedPeers := registry.GetConnectedPeers()
|
||||
func (r *PeerRegistry) GetConnectedPeers() []*Peer {
|
||||
return slices.Collect(r.ConnectedPeers())
|
||||
}
|
||||
|
||||
// ConnectedPeerList is a deprecated compatibility alias for GetConnectedPeers.
|
||||
func (r *PeerRegistry) ConnectedPeerList() []*Peer {
|
||||
return r.GetConnectedPeers()
|
||||
}
|
||||
|
||||
// ConnectedPeers returns an iterator over all currently connected peers.
|
||||
// Each peer is a copy to prevent mutation.
|
||||
func (r *PeerRegistry) ConnectedPeers() iter.Seq[*Peer] {
|
||||
|
|
|
|||
|
|
@ -88,12 +88,17 @@ func IsProtocolError(err error) bool {
|
|||
return ok
|
||||
}
|
||||
|
||||
// ProtocolErrorCode returns the error code if err is a ProtocolError, otherwise returns 0.
|
||||
// GetProtocolErrorCode returns the error code if err is a ProtocolError, otherwise returns 0.
|
||||
//
|
||||
// code := ProtocolErrorCode(err)
|
||||
func ProtocolErrorCode(err error) int {
|
||||
// code := GetProtocolErrorCode(err)
|
||||
func GetProtocolErrorCode(err error) int {
|
||||
if pe, ok := err.(*ProtocolError); ok {
|
||||
return pe.Code
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
// ProtocolErrorCode is a deprecated compatibility alias for GetProtocolErrorCode.
|
||||
func ProtocolErrorCode(err error) int {
|
||||
return GetProtocolErrorCode(err)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -264,7 +264,7 @@ func agentHeaderToken(value string) string {
|
|||
|
||||
// agentUserAgent returns a transparent identity string for request headers.
|
||||
func (t *Transport) agentUserAgent() string {
|
||||
identity := t.nodeManager.Identity()
|
||||
identity := t.nodeManager.GetIdentity()
|
||||
if identity == nil {
|
||||
return core.Sprintf("%s proto=%s", agentUserAgentPrefix, ProtocolVersion)
|
||||
}
|
||||
|
|
@ -431,7 +431,7 @@ func (t *Transport) Connect(peer *Peer) (*PeerConnection, error) {
|
|||
})
|
||||
|
||||
// Update registry
|
||||
t.peerRegistry.MarkConnected(pc.Peer.ID, true)
|
||||
t.peerRegistry.SetConnected(pc.Peer.ID, true)
|
||||
|
||||
// Start read loop
|
||||
t.waitGroup.Add(1)
|
||||
|
|
@ -491,15 +491,20 @@ func (t *Transport) Broadcast(msg *Message) error {
|
|||
return lastErr
|
||||
}
|
||||
|
||||
// Connection returns an active connection to a peer.
|
||||
// GetConnection returns an active connection to a peer.
|
||||
//
|
||||
// connection := transport.Connection("worker-1")
|
||||
func (t *Transport) Connection(peerID string) *PeerConnection {
|
||||
// connection := transport.GetConnection("worker-1")
|
||||
func (t *Transport) GetConnection(peerID string) *PeerConnection {
|
||||
t.mutex.RLock()
|
||||
defer t.mutex.RUnlock()
|
||||
return t.connections[peerID]
|
||||
}
|
||||
|
||||
// Connection is a deprecated compatibility alias for GetConnection.
|
||||
func (t *Transport) Connection(peerID string) *PeerConnection {
|
||||
return t.GetConnection(peerID)
|
||||
}
|
||||
|
||||
// handleWSUpgrade handles incoming WebSocket connections.
|
||||
func (t *Transport) handleWSUpgrade(w http.ResponseWriter, r *http.Request) {
|
||||
userAgent := r.Header.Get("User-Agent")
|
||||
|
|
@ -569,7 +574,7 @@ func (t *Transport) handleWSUpgrade(w http.ResponseWriter, r *http.Request) {
|
|||
"peer_id": payload.Identity.ID,
|
||||
"user_agent": userAgent,
|
||||
})
|
||||
identity := t.nodeManager.Identity()
|
||||
identity := t.nodeManager.GetIdentity()
|
||||
if identity != nil {
|
||||
rejectPayload := HandshakeAckPayload{
|
||||
Identity: *identity,
|
||||
|
|
@ -601,7 +606,7 @@ func (t *Transport) handleWSUpgrade(w http.ResponseWriter, r *http.Request) {
|
|||
"user_agent": userAgent,
|
||||
})
|
||||
// Send rejection before closing
|
||||
identity := t.nodeManager.Identity()
|
||||
identity := t.nodeManager.GetIdentity()
|
||||
if identity != nil {
|
||||
rejectPayload := HandshakeAckPayload{
|
||||
Identity: *identity,
|
||||
|
|
@ -618,7 +623,7 @@ func (t *Transport) handleWSUpgrade(w http.ResponseWriter, r *http.Request) {
|
|||
}
|
||||
|
||||
// Create peer if not exists (only if auth passed)
|
||||
peer := t.peerRegistry.Peer(payload.Identity.ID)
|
||||
peer := t.peerRegistry.GetPeer(payload.Identity.ID)
|
||||
if peer == nil {
|
||||
// Auto-register the peer since they passed allowlist check
|
||||
peer = &Peer{
|
||||
|
|
@ -647,7 +652,7 @@ func (t *Transport) handleWSUpgrade(w http.ResponseWriter, r *http.Request) {
|
|||
}
|
||||
|
||||
// Send handshake acknowledgment
|
||||
identity := t.nodeManager.Identity()
|
||||
identity := t.nodeManager.GetIdentity()
|
||||
if identity == nil {
|
||||
conn.Close()
|
||||
return
|
||||
|
|
@ -689,7 +694,7 @@ func (t *Transport) handleWSUpgrade(w http.ResponseWriter, r *http.Request) {
|
|||
t.mutex.Unlock()
|
||||
|
||||
// Update registry
|
||||
t.peerRegistry.MarkConnected(peer.ID, true)
|
||||
t.peerRegistry.SetConnected(peer.ID, true)
|
||||
|
||||
logging.Debug("accepted peer connection", logging.Fields{
|
||||
"peer_id": peer.ID,
|
||||
|
|
@ -718,7 +723,7 @@ func (t *Transport) performHandshake(pc *PeerConnection) error {
|
|||
pc.Conn.SetReadDeadline(time.Time{})
|
||||
}()
|
||||
|
||||
identity := t.nodeManager.Identity()
|
||||
identity := t.nodeManager.GetIdentity()
|
||||
if identity == nil {
|
||||
return ErrorIdentityNotInitialized
|
||||
}
|
||||
|
|
@ -900,7 +905,7 @@ func (t *Transport) keepalive(pc *PeerConnection) {
|
|||
}
|
||||
|
||||
// Send ping
|
||||
identity := t.nodeManager.Identity()
|
||||
identity := t.nodeManager.GetIdentity()
|
||||
pingMsg, err := NewMessage(MessagePing, identity.ID, pc.Peer.ID, PingPayload{
|
||||
SentAt: time.Now().UnixMilli(),
|
||||
})
|
||||
|
|
@ -922,7 +927,7 @@ func (t *Transport) removeConnection(pc *PeerConnection) {
|
|||
delete(t.connections, pc.Peer.ID)
|
||||
t.mutex.Unlock()
|
||||
|
||||
t.peerRegistry.MarkConnected(pc.Peer.ID, false)
|
||||
t.peerRegistry.SetConnected(pc.Peer.ID, false)
|
||||
pc.Close()
|
||||
}
|
||||
|
||||
|
|
@ -981,7 +986,7 @@ func (pc *PeerConnection) GracefulClose(reason string, code int) error {
|
|||
// already manages write deadlines under the lock. Setting it here
|
||||
// without the lock races with concurrent Send() calls (P2P-RACE-1).
|
||||
if pc.transport != nil && pc.SharedSecret != nil {
|
||||
identity := pc.transport.nodeManager.Identity()
|
||||
identity := pc.transport.nodeManager.GetIdentity()
|
||||
if identity != nil {
|
||||
payload := DisconnectPayload{
|
||||
Reason: reason,
|
||||
|
|
@ -1039,11 +1044,16 @@ func (t *Transport) decryptMessage(data []byte, sharedSecret []byte) (*Message,
|
|||
return &msg, nil
|
||||
}
|
||||
|
||||
// ConnectedPeerCount returns the number of connected peers.
|
||||
// ConnectedPeers returns the number of connected peers.
|
||||
//
|
||||
// count := transport.ConnectedPeerCount()
|
||||
func (t *Transport) ConnectedPeerCount() int {
|
||||
// count := transport.ConnectedPeers()
|
||||
func (t *Transport) ConnectedPeers() int {
|
||||
t.mutex.RLock()
|
||||
defer t.mutex.RUnlock()
|
||||
return len(t.connections)
|
||||
}
|
||||
|
||||
// ConnectedPeerCount is a deprecated compatibility alias for ConnectedPeers.
|
||||
func (t *Transport) ConnectedPeerCount() int {
|
||||
return t.ConnectedPeers()
|
||||
}
|
||||
|
|
|
|||
|
|
@ -18,24 +18,24 @@ type MinerManager interface {
|
|||
StartMiner(minerType string, config any) (MinerInstance, error)
|
||||
StopMiner(name string) error
|
||||
ListMiners() []MinerInstance
|
||||
Miner(name string) (MinerInstance, error)
|
||||
GetMiner(name string) (MinerInstance, error)
|
||||
}
|
||||
|
||||
// MinerInstance represents a running miner for stats collection.
|
||||
//
|
||||
// var miner MinerInstance
|
||||
type MinerInstance interface {
|
||||
Name() string
|
||||
Type() string
|
||||
Stats() (any, error)
|
||||
ConsoleHistory(lines int) []string
|
||||
GetName() string
|
||||
GetType() string
|
||||
GetStats() (any, error)
|
||||
GetConsoleHistory(lines int) []string
|
||||
}
|
||||
|
||||
// ProfileManager interface for profile operations.
|
||||
//
|
||||
// var profileManager ProfileManager
|
||||
type ProfileManager interface {
|
||||
Profile(id string) (any, error)
|
||||
GetProfile(id string) (any, error)
|
||||
SaveProfile(profile any) error
|
||||
}
|
||||
|
||||
|
|
@ -48,7 +48,8 @@ type Worker struct {
|
|||
minerManager MinerManager
|
||||
profileManager ProfileManager
|
||||
startedAt time.Time
|
||||
DataDirectory string // Base directory for deployments (defaults to xdg.DataHome)
|
||||
DataDir string // Base directory for deployments (defaults to xdg.DataHome)
|
||||
DataDirectory string // Deprecated compatibility alias for DataDir
|
||||
}
|
||||
|
||||
// NewWorker creates a new Worker instance.
|
||||
|
|
@ -59,6 +60,7 @@ func NewWorker(nodeManager *NodeManager, transport *Transport) *Worker {
|
|||
nodeManager: nodeManager,
|
||||
transport: transport,
|
||||
startedAt: time.Now(),
|
||||
DataDir: xdg.DataHome,
|
||||
DataDirectory: xdg.DataHome,
|
||||
}
|
||||
}
|
||||
|
|
@ -104,7 +106,7 @@ func (w *Worker) HandleMessage(conn *PeerConnection, msg *Message) {
|
|||
|
||||
if err != nil {
|
||||
// Send error response
|
||||
identity := w.nodeManager.Identity()
|
||||
identity := w.nodeManager.GetIdentity()
|
||||
if identity != nil {
|
||||
errMsg, _ := NewErrorMessage(
|
||||
identity.ID,
|
||||
|
|
@ -145,7 +147,7 @@ func (w *Worker) handlePing(msg *Message) (*Message, error) {
|
|||
|
||||
// handleStats responds with current miner statistics.
|
||||
func (w *Worker) handleStats(msg *Message) (*Message, error) {
|
||||
identity := w.nodeManager.Identity()
|
||||
identity := w.nodeManager.GetIdentity()
|
||||
if identity == nil {
|
||||
return nil, ErrorIdentityNotInitialized
|
||||
}
|
||||
|
|
@ -160,7 +162,7 @@ func (w *Worker) handleStats(msg *Message) (*Message, error) {
|
|||
if w.minerManager != nil {
|
||||
miners := w.minerManager.ListMiners()
|
||||
for _, miner := range miners {
|
||||
minerStats, err := miner.Stats()
|
||||
minerStats, err := miner.GetStats()
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
|
@ -178,8 +180,8 @@ func (w *Worker) handleStats(msg *Message) (*Message, error) {
|
|||
// convertMinerStats converts miner stats to the protocol format.
|
||||
func convertMinerStats(miner MinerInstance, rawStats any) MinerStatsItem {
|
||||
item := MinerStatsItem{
|
||||
Name: miner.Name(),
|
||||
Type: miner.Type(),
|
||||
Name: miner.GetName(),
|
||||
Type: miner.GetType(),
|
||||
}
|
||||
|
||||
// Try to extract common fields from the stats
|
||||
|
|
@ -228,7 +230,7 @@ func (w *Worker) handleStartMiner(msg *Message) (*Message, error) {
|
|||
if payload.Config != nil {
|
||||
config = payload.Config
|
||||
} else if w.profileManager != nil {
|
||||
profile, err := w.profileManager.Profile(payload.ProfileID)
|
||||
profile, err := w.profileManager.GetProfile(payload.ProfileID)
|
||||
if err != nil {
|
||||
return nil, core.E("Worker.handleStartMiner", "profile not found: "+payload.ProfileID, nil)
|
||||
}
|
||||
|
|
@ -249,7 +251,7 @@ func (w *Worker) handleStartMiner(msg *Message) (*Message, error) {
|
|||
|
||||
ack := MinerAckPayload{
|
||||
Success: true,
|
||||
MinerName: miner.Name(),
|
||||
MinerName: miner.GetName(),
|
||||
}
|
||||
return msg.Reply(MessageMinerAck, ack)
|
||||
}
|
||||
|
|
@ -294,12 +296,12 @@ func (w *Worker) handleLogs(msg *Message) (*Message, error) {
|
|||
payload.Lines = maxLogLines
|
||||
}
|
||||
|
||||
miner, err := w.minerManager.Miner(payload.MinerName)
|
||||
miner, err := w.minerManager.GetMiner(payload.MinerName)
|
||||
if err != nil {
|
||||
return nil, core.E("Worker.handleLogs", "miner not found: "+payload.MinerName, nil)
|
||||
}
|
||||
|
||||
lines := miner.ConsoleHistory(payload.Lines)
|
||||
lines := miner.GetConsoleHistory(payload.Lines)
|
||||
|
||||
logs := LogsPayload{
|
||||
MinerName: payload.MinerName,
|
||||
|
|
@ -366,8 +368,9 @@ func (w *Worker) handleDeploy(conn *PeerConnection, msg *Message) (*Message, err
|
|||
|
||||
case BundleMiner, BundleFull:
|
||||
// Determine installation directory
|
||||
// We use w.DataDirectory/lethean-desktop/miners/<bundle_name>
|
||||
minersDir := core.JoinPath(w.DataDirectory, "lethean-desktop", "miners")
|
||||
// We use the configured deployment directory for
|
||||
// lethean-desktop/miners/<bundle_name>.
|
||||
minersDir := core.JoinPath(w.deploymentDir(), "lethean-desktop", "miners")
|
||||
installDir := core.JoinPath(minersDir, payload.Name)
|
||||
|
||||
logging.Info("deploying miner bundle", logging.Fields{
|
||||
|
|
@ -419,3 +422,23 @@ func (w *Worker) handleDeploy(conn *PeerConnection, msg *Message) (*Message, err
|
|||
func (w *Worker) RegisterOnTransport() {
|
||||
w.transport.OnMessage(w.HandleMessage)
|
||||
}
|
||||
|
||||
// RegisterWithTransport is a deprecated compatibility alias for RegisterOnTransport.
|
||||
func (w *Worker) RegisterWithTransport() {
|
||||
w.RegisterOnTransport()
|
||||
}
|
||||
|
||||
// deploymentDir resolves the active deployment directory, preferring DataDir
|
||||
// unless a caller has only populated the legacy DataDirectory field.
|
||||
func (w *Worker) deploymentDir() string {
|
||||
switch {
|
||||
case w.DataDir != "" && w.DataDir != xdg.DataHome:
|
||||
return w.DataDir
|
||||
case w.DataDirectory != "":
|
||||
return w.DataDirectory
|
||||
case w.DataDir != "":
|
||||
return w.DataDir
|
||||
default:
|
||||
return xdg.DataHome
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -499,32 +499,44 @@ func (m *mockMinerManager) ListMiners() []MinerInstance {
|
|||
return m.miners
|
||||
}
|
||||
|
||||
func (m *mockMinerManager) Miner(name string) (MinerInstance, error) {
|
||||
func (m *mockMinerManager) GetMiner(name string) (MinerInstance, error) {
|
||||
for _, miner := range m.miners {
|
||||
if miner.Name() == name {
|
||||
if miner.GetName() == name {
|
||||
return miner, nil
|
||||
}
|
||||
}
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (m *mockMinerManager) Miner(name string) (MinerInstance, error) {
|
||||
return m.GetMiner(name)
|
||||
}
|
||||
|
||||
type mockMinerInstance struct {
|
||||
name string
|
||||
minerType string
|
||||
stats any
|
||||
}
|
||||
|
||||
func (m *mockMinerInstance) Name() string { return m.name }
|
||||
func (m *mockMinerInstance) Type() string { return m.minerType }
|
||||
func (m *mockMinerInstance) Stats() (any, error) { return m.stats, nil }
|
||||
func (m *mockMinerInstance) ConsoleHistory(lines int) []string { return []string{} }
|
||||
func (m *mockMinerInstance) GetName() string { return m.name }
|
||||
func (m *mockMinerInstance) GetType() string { return m.minerType }
|
||||
func (m *mockMinerInstance) GetStats() (any, error) { return m.stats, nil }
|
||||
func (m *mockMinerInstance) GetConsoleHistory(lines int) []string { return []string{} }
|
||||
func (m *mockMinerInstance) Name() string { return m.GetName() }
|
||||
func (m *mockMinerInstance) Type() string { return m.GetType() }
|
||||
func (m *mockMinerInstance) Stats() (any, error) { return m.GetStats() }
|
||||
func (m *mockMinerInstance) ConsoleHistory(lines int) []string { return m.GetConsoleHistory(lines) }
|
||||
|
||||
type mockProfileManager struct{}
|
||||
|
||||
func (m *mockProfileManager) Profile(id string) (any, error) {
|
||||
func (m *mockProfileManager) GetProfile(id string) (any, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (m *mockProfileManager) Profile(id string) (any, error) {
|
||||
return m.GetProfile(id)
|
||||
}
|
||||
|
||||
func (m *mockProfileManager) SaveProfile(profile any) error {
|
||||
return nil
|
||||
}
|
||||
|
|
@ -544,19 +556,23 @@ func (m *mockMinerManagerFailing) StopMiner(name string) error {
|
|||
return core.E("mockMinerManagerFailing.StopMiner", "miner "+name+" not found", nil)
|
||||
}
|
||||
|
||||
func (m *mockMinerManagerFailing) Miner(name string) (MinerInstance, error) {
|
||||
func (m *mockMinerManagerFailing) GetMiner(name string) (MinerInstance, error) {
|
||||
return nil, core.E("mockMinerManagerFailing.Miner", "miner "+name+" not found", nil)
|
||||
}
|
||||
|
||||
func (m *mockMinerManagerFailing) Miner(name string) (MinerInstance, error) {
|
||||
return m.GetMiner(name)
|
||||
}
|
||||
|
||||
// mockProfileManagerFull implements ProfileManager that returns real data.
|
||||
type mockProfileManagerFull struct {
|
||||
profiles map[string]any
|
||||
}
|
||||
|
||||
func (m *mockProfileManagerFull) Profile(id string) (any, error) {
|
||||
func (m *mockProfileManagerFull) GetProfile(id string) (any, error) {
|
||||
p, ok := m.profiles[id]
|
||||
if !ok {
|
||||
return nil, core.E("mockProfileManagerFull.Profile", "profile "+id+" not found", nil)
|
||||
return nil, core.E("mockProfileManagerFull.GetProfile", "profile "+id+" not found", nil)
|
||||
}
|
||||
return p, nil
|
||||
}
|
||||
|
|
@ -565,17 +581,25 @@ func (m *mockProfileManagerFull) SaveProfile(profile any) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (m *mockProfileManagerFull) Profile(id string) (any, error) {
|
||||
return m.GetProfile(id)
|
||||
}
|
||||
|
||||
// mockProfileManagerFailing always returns errors.
|
||||
type mockProfileManagerFailing struct{}
|
||||
|
||||
func (m *mockProfileManagerFailing) Profile(id string) (any, error) {
|
||||
return nil, core.E("mockProfileManagerFailing.Profile", "profile "+id+" not found", nil)
|
||||
func (m *mockProfileManagerFailing) GetProfile(id string) (any, error) {
|
||||
return nil, core.E("mockProfileManagerFailing.GetProfile", "profile "+id+" not found", nil)
|
||||
}
|
||||
|
||||
func (m *mockProfileManagerFailing) SaveProfile(profile any) error {
|
||||
return core.E("mockProfileManagerFailing.SaveProfile", "save failed", nil)
|
||||
}
|
||||
|
||||
func (m *mockProfileManagerFailing) Profile(id string) (any, error) {
|
||||
return m.GetProfile(id)
|
||||
}
|
||||
|
||||
func TestWorker_HandleStartMiner_WithManager_Good(t *testing.T) {
|
||||
cleanup := setupTestEnvironment(t)
|
||||
defer cleanup()
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue