refactor(node): clarify AX-facing transport and worker names
All checks were successful
Security Scan / security (push) Successful in 11s
Test / test (push) Successful in 1m53s

Co-Authored-By: Virgil <virgil@lethean.io>
This commit is contained in:
Virgil 2026-03-30 22:54:45 +00:00
parent 885070d241
commit bd9efd1deb
6 changed files with 130 additions and 67 deletions

View file

@ -50,7 +50,7 @@ func makeWorkerServer(t *testing.T) (*NodeManager, string, *Transport) {
srv := NewTransport(nm, reg, cfg)
mux := http.NewServeMux()
mux.HandleFunc(cfg.WSPath, srv.handleWSUpgrade)
mux.HandleFunc(cfg.WebSocketPath, srv.handleWSUpgrade)
ts := httptest.NewServer(mux)
u, _ := url.Parse(ts.URL)

View file

@ -82,7 +82,7 @@ func TestIntegration_FullNodeLifecycle_Good(t *testing.T) {
// Start the worker transport behind httptest.
mux := http.NewServeMux()
mux.HandleFunc(workerCfg.WSPath, workerTransport.handleWSUpgrade)
mux.HandleFunc(workerCfg.WebSocketPath, workerTransport.handleWSUpgrade)
ts := httptest.NewServer(mux)
t.Cleanup(func() {
controllerTransport.Stop()

View file

@ -33,15 +33,23 @@ const DefaultMaxMessageSize int64 = 1 << 20 // 1MB
// agentUserAgentPrefix identifies this tool in request headers.
const agentUserAgentPrefix = "agent-go-p2p"
const (
defaultTransportListenAddress = ":9091"
defaultTransportWebSocketPath = "/ws"
defaultTransportMaximumConnections = 100
)
// TransportConfig configures the WebSocket transport.
//
// cfg := DefaultTransportConfig()
type TransportConfig struct {
ListenAddr string // ":9091" default
WSPath string // "/ws" - WebSocket endpoint path
WebSocketPath string // "/ws" - WebSocket endpoint path
WSPath string // Deprecated compatibility alias for WebSocketPath.
TLSCertPath string // Optional TLS for wss://
TLSKeyPath string
MaxConns int // Maximum concurrent connections
MaxConnections int // Maximum concurrent connections
MaxConns int // Deprecated compatibility alias for MaxConnections.
MaxMessageSize int64 // Maximum message size in bytes (0 = 1MB default)
PingInterval time.Duration // WebSocket keepalive interval
PongTimeout time.Duration // Timeout waiting for pong
@ -52,15 +60,61 @@ type TransportConfig struct {
// cfg := DefaultTransportConfig()
func DefaultTransportConfig() TransportConfig {
return TransportConfig{
ListenAddr: ":9091",
WSPath: "/ws",
MaxConns: 100,
ListenAddr: defaultTransportListenAddress,
WebSocketPath: defaultTransportWebSocketPath,
WSPath: defaultTransportWebSocketPath,
MaxConnections: defaultTransportMaximumConnections,
MaxConns: defaultTransportMaximumConnections,
MaxMessageSize: DefaultMaxMessageSize,
PingInterval: 30 * time.Second,
PongTimeout: 10 * time.Second,
}
}
// listenAddress returns the effective listen address, falling back to the
// default when the config leaves it empty.
func (c TransportConfig) listenAddress() string {
if c.ListenAddr != "" {
return c.ListenAddr
}
return defaultTransportListenAddress
}
// webSocketPath returns the effective WebSocket endpoint path, preferring the
// clearer WebSocketPath field and falling back to the compatibility alias.
func (c TransportConfig) webSocketPath() string {
switch {
case c.WebSocketPath != "" && c.WebSocketPath != defaultTransportWebSocketPath:
return c.WebSocketPath
case c.WSPath != "" && c.WSPath != defaultTransportWebSocketPath:
return c.WSPath
case c.WebSocketPath != "":
return c.WebSocketPath
case c.WSPath != "":
return c.WSPath
default:
return defaultTransportWebSocketPath
}
}
// maximumConnections returns the effective concurrent connection limit,
// preferring the clearer MaxConnections field and falling back to the
// compatibility alias.
func (c TransportConfig) maximumConnections() int {
switch {
case c.MaxConnections != 0 && c.MaxConnections != defaultTransportMaximumConnections:
return c.MaxConnections
case c.MaxConns != 0 && c.MaxConns != defaultTransportMaximumConnections:
return c.MaxConns
case c.MaxConnections != 0:
return c.MaxConnections
case c.MaxConns != 0:
return c.MaxConns
default:
return defaultTransportMaximumConnections
}
}
// MessageHandler processes incoming messages.
//
// var handler MessageHandler = func(conn *PeerConnection, msg *Message) {}
@ -282,10 +336,12 @@ func (t *Transport) agentUserAgent() string {
// Start opens the WebSocket listener and background maintenance loops.
func (t *Transport) Start() error {
mux := http.NewServeMux()
mux.HandleFunc(t.config.WSPath, t.handleWSUpgrade)
mux.HandleFunc(t.config.webSocketPath(), t.handleWSUpgrade)
listenAddress := t.config.listenAddress()
t.httpServer = &http.Server{
Addr: t.config.ListenAddr,
Addr: listenAddress,
Handler: mux,
ReadTimeout: 30 * time.Second,
WriteTimeout: 30 * time.Second,
@ -325,7 +381,7 @@ func (t *Transport) Start() error {
err = t.httpServer.ListenAndServe()
}
if err != nil && err != http.ErrServerClosed {
logging.Error("HTTP server error", logging.Fields{"error": err, "addr": t.config.ListenAddr})
logging.Error("HTTP server error", logging.Fields{"error": err, "addr": listenAddress})
}
})
@ -389,7 +445,7 @@ func (t *Transport) Connect(peer *Peer) (*PeerConnection, error) {
if t.config.TLSCertPath != "" {
scheme = "wss"
}
peerURL := url.URL{Scheme: scheme, Host: peer.Address, Path: t.config.WSPath}
peerURL := url.URL{Scheme: scheme, Host: peer.Address, Path: t.config.webSocketPath()}
userAgent := t.agentUserAgent()
// Dial the peer with timeout to prevent hanging on unresponsive peers
@ -509,14 +565,14 @@ func (t *Transport) Connection(peerID string) *PeerConnection {
func (t *Transport) handleWSUpgrade(w http.ResponseWriter, r *http.Request) {
userAgent := r.Header.Get("User-Agent")
// Enforce MaxConns limit (including pending connections during handshake)
// Enforce the maximum connection limit, including pending handshakes.
t.mutex.RLock()
currentConnections := len(t.connections)
t.mutex.RUnlock()
pendingHandshakeCount := int(t.pendingHandshakeCount.Load())
totalConnections := currentConnections + pendingHandshakeCount
if totalConnections >= t.config.MaxConns {
if totalConnections >= t.config.maximumConnections() {
http.Error(w, "Too many connections", http.StatusServiceUnavailable)
return
}

View file

@ -72,7 +72,7 @@ func setupTestTransportPairWithConfig(t *testing.T, serverCfg, clientCfg Transpo
// Use httptest.Server with the transport's WebSocket handler
mux := http.NewServeMux()
mux.HandleFunc(serverCfg.WSPath, serverTransport.handleWSUpgrade)
mux.HandleFunc(serverCfg.WebSocketPath, serverTransport.handleWSUpgrade)
ts := httptest.NewServer(mux)
u, _ := url.Parse(ts.URL)
@ -251,7 +251,7 @@ func TestTransport_ConnectSendsAgentUserAgent_Good(t *testing.T) {
var capturedUserAgent atomic.Value
mux := http.NewServeMux()
mux.HandleFunc(serverCfg.WSPath, func(w http.ResponseWriter, r *http.Request) {
mux.HandleFunc(serverCfg.WebSocketPath, func(w http.ResponseWriter, r *http.Request) {
capturedUserAgent.Store(r.Header.Get("User-Agent"))
serverTransport.handleWSUpgrade(w, r)
})
@ -472,17 +472,17 @@ func TestTransport_RateLimiting_Good(t *testing.T) {
}
}
func TestTransport_MaxConnsEnforcement_Good(t *testing.T) {
// Server with MaxConns=1
func TestTransport_MaxConnectionsEnforcement_Good(t *testing.T) {
// Server with MaxConnections=1
serverNM := newTestNodeManager(t, "maxconns-server", RoleWorker)
serverReg := newTestPeerRegistry(t)
serverCfg := DefaultTransportConfig()
serverCfg.MaxConns = 1
serverCfg.MaxConnections = 1
serverTransport := NewTransport(serverNM, serverReg, serverCfg)
mux := http.NewServeMux()
mux.HandleFunc(serverCfg.WSPath, serverTransport.handleWSUpgrade)
mux.HandleFunc(serverCfg.WebSocketPath, serverTransport.handleWSUpgrade)
ts := httptest.NewServer(mux)
t.Cleanup(func() {
serverTransport.Stop()
@ -509,7 +509,7 @@ func TestTransport_MaxConnsEnforcement_Good(t *testing.T) {
// Allow server to register the connection
time.Sleep(50 * time.Millisecond)
// Second client should be rejected (MaxConns=1 reached)
// Second client should be rejected (MaxConnections=1 reached)
client2NM := newTestNodeManager(t, "client2", RoleController)
client2Reg := newTestPeerRegistry(t)
client2Transport := NewTransport(client2NM, client2Reg, DefaultTransportConfig())
@ -520,7 +520,7 @@ func TestTransport_MaxConnsEnforcement_Good(t *testing.T) {
_, err = client2Transport.Connect(peer2)
if err == nil {
t.Fatal("second connection should be rejected when MaxConns=1")
t.Fatal("second connection should be rejected when MaxConnections=1")
}
}

View file

@ -43,13 +43,14 @@ type ProfileManager interface {
//
// worker := NewWorker(nodeManager, transport)
type Worker struct {
nodeManager *NodeManager
transport *Transport
minerManager MinerManager
profileManager ProfileManager
startedAt time.Time
DataDir string // Base directory for deployments (defaults to xdg.DataHome)
DataDirectory string // Deprecated compatibility alias for DataDir
nodeManager *NodeManager
transport *Transport
minerManager MinerManager
profileManager ProfileManager
startedAt time.Time
DeploymentDirectory string // Base directory for deployments (defaults to xdg.DataHome)
DataDir string // Deprecated compatibility alias for DeploymentDirectory
DataDirectory string // Deprecated compatibility alias for DeploymentDirectory
}
// NewWorker creates a new Worker instance.
@ -57,11 +58,12 @@ type Worker struct {
// worker := NewWorker(nodeManager, transport)
func NewWorker(nodeManager *NodeManager, transport *Transport) *Worker {
return &Worker{
nodeManager: nodeManager,
transport: transport,
startedAt: time.Now(),
DataDir: xdg.DataHome,
DataDirectory: xdg.DataHome,
nodeManager: nodeManager,
transport: transport,
startedAt: time.Now(),
DeploymentDirectory: xdg.DataHome,
DataDir: xdg.DataHome,
DataDirectory: xdg.DataHome,
}
}
@ -367,10 +369,9 @@ func (w *Worker) handleDeploy(conn *PeerConnection, msg *Message) (*Message, err
return msg.Reply(MessageDeployAck, ack)
case BundleMiner, BundleFull:
// Determine installation directory
// We use the configured deployment directory for
// lethean-desktop/miners/<bundle_name>.
minersDir := core.JoinPath(w.deploymentDir(), "lethean-desktop", "miners")
// Determine the installation directory under the configured deployment
// root for lethean-desktop/miners/<bundle_name>.
minersDir := core.JoinPath(w.deploymentDirectory(), "lethean-desktop", "miners")
installDir := core.JoinPath(minersDir, payload.Name)
logging.Info("deploying miner bundle", logging.Fields{
@ -428,16 +429,22 @@ func (w *Worker) RegisterWithTransport() {
w.RegisterOnTransport()
}
// deploymentDir resolves the active deployment directory, preferring DataDir
// unless a caller has only populated the legacy DataDirectory field.
func (w *Worker) deploymentDir() string {
// deploymentDirectory resolves the active deployment directory, preferring
// DeploymentDirectory unless a caller has only populated the legacy aliases.
func (w *Worker) deploymentDirectory() string {
switch {
case w.DeploymentDirectory != "" && w.DeploymentDirectory != xdg.DataHome:
return w.DeploymentDirectory
case w.DataDir != "" && w.DataDir != xdg.DataHome:
return w.DataDir
case w.DataDirectory != "" && w.DataDirectory != xdg.DataHome:
return w.DataDirectory
case w.DeploymentDirectory != "":
return w.DeploymentDirectory
case w.DataDir != "":
return w.DataDir
case w.DataDirectory != "":
return w.DataDirectory
case w.DataDir != "":
return w.DataDir
default:
return xdg.DataHome
}

View file

@ -35,7 +35,7 @@ func TestWorker_NewWorker_Good(t *testing.T) {
transport := NewTransport(nm, pr, DefaultTransportConfig())
worker := NewWorker(nm, transport)
worker.DataDirectory = t.TempDir()
worker.DeploymentDirectory = t.TempDir()
if worker == nil {
t.Fatal("NewWorker returned nil")
@ -68,7 +68,7 @@ func TestWorker_SetMinerManager_Good(t *testing.T) {
transport := NewTransport(nm, pr, DefaultTransportConfig())
worker := NewWorker(nm, transport)
worker.DataDirectory = t.TempDir()
worker.DeploymentDirectory = t.TempDir()
mockManager := &mockMinerManager{}
worker.SetMinerManager(mockManager)
@ -98,7 +98,7 @@ func TestWorker_SetProfileManager_Good(t *testing.T) {
transport := NewTransport(nm, pr, DefaultTransportConfig())
worker := NewWorker(nm, transport)
worker.DataDirectory = t.TempDir()
worker.DeploymentDirectory = t.TempDir()
mockProfile := &mockProfileManager{}
worker.SetProfileManager(mockProfile)
@ -128,7 +128,7 @@ func TestWorker_HandlePing_Good(t *testing.T) {
transport := NewTransport(nm, pr, DefaultTransportConfig())
worker := NewWorker(nm, transport)
worker.DataDirectory = t.TempDir()
worker.DeploymentDirectory = t.TempDir()
// Create a ping message
identity := nm.Identity()
@ -189,7 +189,7 @@ func TestWorker_HandleStats_Good(t *testing.T) {
transport := NewTransport(nm, pr, DefaultTransportConfig())
worker := NewWorker(nm, transport)
worker.DataDirectory = t.TempDir()
worker.DeploymentDirectory = t.TempDir()
// Create a stats request message.
identity := nm.Identity()
@ -249,7 +249,7 @@ func TestWorker_HandleStartMiner_NoManager_Bad(t *testing.T) {
transport := NewTransport(nm, pr, DefaultTransportConfig())
worker := NewWorker(nm, transport)
worker.DataDirectory = t.TempDir()
worker.DeploymentDirectory = t.TempDir()
// Create a start_miner message
identity := nm.Identity()
@ -289,7 +289,7 @@ func TestWorker_HandleStopMiner_NoManager_Bad(t *testing.T) {
transport := NewTransport(nm, pr, DefaultTransportConfig())
worker := NewWorker(nm, transport)
worker.DataDirectory = t.TempDir()
worker.DeploymentDirectory = t.TempDir()
// Create a stop_miner message
identity := nm.Identity()
@ -329,7 +329,7 @@ func TestWorker_HandleLogs_NoManager_Bad(t *testing.T) {
transport := NewTransport(nm, pr, DefaultTransportConfig())
worker := NewWorker(nm, transport)
worker.DataDirectory = t.TempDir()
worker.DeploymentDirectory = t.TempDir()
// Create a logs request message.
identity := nm.Identity()
@ -369,7 +369,7 @@ func TestWorker_HandleDeploy_Profile_Good(t *testing.T) {
transport := NewTransport(nm, pr, DefaultTransportConfig())
worker := NewWorker(nm, transport)
worker.DataDirectory = t.TempDir()
worker.DeploymentDirectory = t.TempDir()
// Create a deploy message for profile
identity := nm.Identity()
@ -413,7 +413,7 @@ func TestWorker_HandleDeploy_UnknownType_Bad(t *testing.T) {
transport := NewTransport(nm, pr, DefaultTransportConfig())
worker := NewWorker(nm, transport)
worker.DataDirectory = t.TempDir()
worker.DeploymentDirectory = t.TempDir()
// Create a deploy message with unknown type
identity := nm.Identity()
@ -620,7 +620,7 @@ func TestWorker_HandleStartMiner_WithManager_Good(t *testing.T) {
transport := NewTransport(nm, pr, DefaultTransportConfig())
worker := NewWorker(nm, transport)
worker.DataDirectory = t.TempDir()
worker.DeploymentDirectory = t.TempDir()
mm := &mockMinerManager{
miners: []MinerInstance{},
@ -631,7 +631,7 @@ func TestWorker_HandleStartMiner_WithManager_Good(t *testing.T) {
identity := nm.Identity()
t.Run("WithConfigOverride", func(t *testing.T) {
t.Run("ConfigOverride", func(t *testing.T) {
payload := StartMinerPayload{
MinerType: "xmrig",
Config: RawMessage(`{"pool":"test:3333"}`),
@ -678,7 +678,7 @@ func TestWorker_HandleStartMiner_WithManager_Good(t *testing.T) {
}
})
t.Run("WithProfileManager", func(t *testing.T) {
t.Run("ProfileManagerConfigured", func(t *testing.T) {
pm := &mockProfileManagerFull{
profiles: map[string]any{
"test-profile": map[string]any{"pool": "pool.test:3333"},
@ -787,7 +787,7 @@ func TestWorker_HandleStopMiner_WithManager_Good(t *testing.T) {
}
transport := NewTransport(nm, pr, DefaultTransportConfig())
worker := NewWorker(nm, transport)
worker.DataDirectory = t.TempDir()
worker.DeploymentDirectory = t.TempDir()
identity := nm.Identity()
t.Run("Success", func(t *testing.T) {
@ -851,7 +851,7 @@ func TestWorker_HandleLogs_WithManager_Good(t *testing.T) {
}
transport := NewTransport(nm, pr, DefaultTransportConfig())
worker := NewWorker(nm, transport)
worker.DataDirectory = t.TempDir()
worker.DeploymentDirectory = t.TempDir()
identity := nm.Identity()
t.Run("Success", func(t *testing.T) {
@ -958,7 +958,7 @@ func TestWorker_HandleStats_WithMinerManager_Good(t *testing.T) {
}
transport := NewTransport(nm, pr, DefaultTransportConfig())
worker := NewWorker(nm, transport)
worker.DataDirectory = t.TempDir()
worker.DeploymentDirectory = t.TempDir()
identity := nm.Identity()
// Set miner manager with miners that have real stats
@ -1019,7 +1019,7 @@ func TestWorker_HandleMessage_UnknownType_Bad(t *testing.T) {
}
transport := NewTransport(nm, pr, DefaultTransportConfig())
worker := NewWorker(nm, transport)
worker.DataDirectory = t.TempDir()
worker.DeploymentDirectory = t.TempDir()
identity := nm.Identity()
msg, _ := NewMessage("unknown_type", "sender-id", identity.ID, nil)
@ -1046,7 +1046,7 @@ func TestWorker_HandleDeploy_ProfileWithManager_Good(t *testing.T) {
}
transport := NewTransport(nm, pr, DefaultTransportConfig())
worker := NewWorker(nm, transport)
worker.DataDirectory = t.TempDir()
worker.DeploymentDirectory = t.TempDir()
pm := &mockProfileManagerFull{profiles: make(map[string]any)}
worker.SetProfileManager(pm)
@ -1101,7 +1101,7 @@ func TestWorker_HandleDeploy_ProfileSaveFails_Bad(t *testing.T) {
}
transport := NewTransport(nm, pr, DefaultTransportConfig())
worker := NewWorker(nm, transport)
worker.DataDirectory = t.TempDir()
worker.DeploymentDirectory = t.TempDir()
worker.SetProfileManager(&mockProfileManagerFailing{})
identity := nm.Identity()
@ -1147,7 +1147,7 @@ func TestWorker_HandleDeploy_MinerBundle_Good(t *testing.T) {
}
transport := NewTransport(nm, pr, DefaultTransportConfig())
worker := NewWorker(nm, transport)
worker.DataDirectory = t.TempDir()
worker.DeploymentDirectory = t.TempDir()
pm := &mockProfileManagerFull{profiles: make(map[string]any)}
worker.SetProfileManager(pm)
@ -1211,7 +1211,7 @@ func TestWorker_HandleDeploy_FullBundle_Good(t *testing.T) {
}
transport := NewTransport(nm, pr, DefaultTransportConfig())
worker := NewWorker(nm, transport)
worker.DataDirectory = t.TempDir()
worker.DeploymentDirectory = t.TempDir()
identity := nm.Identity()
@ -1249,7 +1249,7 @@ func TestWorker_HandleDeploy_FullBundle_Good(t *testing.T) {
}
}
func TestWorker_HandleDeploy_MinerBundle_WithProfileManager_Good(t *testing.T) {
func TestWorker_HandleDeploy_MinerBundle_ProfileManager_Good(t *testing.T) {
cleanup := setupTestEnvironment(t)
defer cleanup()
@ -1267,7 +1267,7 @@ func TestWorker_HandleDeploy_MinerBundle_WithProfileManager_Good(t *testing.T) {
}
transport := NewTransport(nm, pr, DefaultTransportConfig())
worker := NewWorker(nm, transport)
worker.DataDirectory = t.TempDir()
worker.DeploymentDirectory = t.TempDir()
// Set a failing profile manager to exercise the warn-and-continue path
worker.SetProfileManager(&mockProfileManagerFailing{})
@ -1320,7 +1320,7 @@ func TestWorker_HandleDeploy_InvalidPayload_Bad(t *testing.T) {
pr, _ := NewPeerRegistryFromPath(testJoinPath(t.TempDir(), "peers.json"))
transport := NewTransport(nm, pr, DefaultTransportConfig())
worker := NewWorker(nm, transport)
worker.DataDirectory = t.TempDir()
worker.DeploymentDirectory = t.TempDir()
identity := nm.Identity()
// Create a message with invalid payload
@ -1345,7 +1345,7 @@ func TestWorker_HandleStats_NoIdentity_Bad(t *testing.T) {
pr, _ := NewPeerRegistryFromPath(testJoinPath(t.TempDir(), "peers.json"))
transport := NewTransport(nm, pr, DefaultTransportConfig())
worker := NewWorker(nm, transport)
worker.DataDirectory = t.TempDir()
worker.DeploymentDirectory = t.TempDir()
msg, _ := NewMessage(MessageGetStats, "sender-id", "target-id", nil)
_, err := worker.handleStats(msg)