diff --git a/node/controller_test.go b/node/controller_test.go index 8a38c84..03b121e 100644 --- a/node/controller_test.go +++ b/node/controller_test.go @@ -50,7 +50,7 @@ func makeWorkerServer(t *testing.T) (*NodeManager, string, *Transport) { srv := NewTransport(nm, reg, cfg) mux := http.NewServeMux() - mux.HandleFunc(cfg.WSPath, srv.handleWSUpgrade) + mux.HandleFunc(cfg.WebSocketPath, srv.handleWSUpgrade) ts := httptest.NewServer(mux) u, _ := url.Parse(ts.URL) diff --git a/node/integration_test.go b/node/integration_test.go index 96de5d2..9eadf12 100644 --- a/node/integration_test.go +++ b/node/integration_test.go @@ -82,7 +82,7 @@ func TestIntegration_FullNodeLifecycle_Good(t *testing.T) { // Start the worker transport behind httptest. mux := http.NewServeMux() - mux.HandleFunc(workerCfg.WSPath, workerTransport.handleWSUpgrade) + mux.HandleFunc(workerCfg.WebSocketPath, workerTransport.handleWSUpgrade) ts := httptest.NewServer(mux) t.Cleanup(func() { controllerTransport.Stop() diff --git a/node/transport.go b/node/transport.go index 29bfc99..b0a72fa 100644 --- a/node/transport.go +++ b/node/transport.go @@ -33,15 +33,23 @@ const DefaultMaxMessageSize int64 = 1 << 20 // 1MB // agentUserAgentPrefix identifies this tool in request headers. const agentUserAgentPrefix = "agent-go-p2p" +const ( + defaultTransportListenAddress = ":9091" + defaultTransportWebSocketPath = "/ws" + defaultTransportMaximumConnections = 100 +) + // TransportConfig configures the WebSocket transport. // // cfg := DefaultTransportConfig() type TransportConfig struct { ListenAddr string // ":9091" default - WSPath string // "/ws" - WebSocket endpoint path + WebSocketPath string // "/ws" - WebSocket endpoint path + WSPath string // Deprecated compatibility alias for WebSocketPath. TLSCertPath string // Optional TLS for wss:// TLSKeyPath string - MaxConns int // Maximum concurrent connections + MaxConnections int // Maximum concurrent connections + MaxConns int // Deprecated compatibility alias for MaxConnections. MaxMessageSize int64 // Maximum message size in bytes (0 = 1MB default) PingInterval time.Duration // WebSocket keepalive interval PongTimeout time.Duration // Timeout waiting for pong @@ -52,15 +60,61 @@ type TransportConfig struct { // cfg := DefaultTransportConfig() func DefaultTransportConfig() TransportConfig { return TransportConfig{ - ListenAddr: ":9091", - WSPath: "/ws", - MaxConns: 100, + ListenAddr: defaultTransportListenAddress, + WebSocketPath: defaultTransportWebSocketPath, + WSPath: defaultTransportWebSocketPath, + MaxConnections: defaultTransportMaximumConnections, + MaxConns: defaultTransportMaximumConnections, MaxMessageSize: DefaultMaxMessageSize, PingInterval: 30 * time.Second, PongTimeout: 10 * time.Second, } } +// listenAddress returns the effective listen address, falling back to the +// default when the config leaves it empty. +func (c TransportConfig) listenAddress() string { + if c.ListenAddr != "" { + return c.ListenAddr + } + return defaultTransportListenAddress +} + +// webSocketPath returns the effective WebSocket endpoint path, preferring the +// clearer WebSocketPath field and falling back to the compatibility alias. +func (c TransportConfig) webSocketPath() string { + switch { + case c.WebSocketPath != "" && c.WebSocketPath != defaultTransportWebSocketPath: + return c.WebSocketPath + case c.WSPath != "" && c.WSPath != defaultTransportWebSocketPath: + return c.WSPath + case c.WebSocketPath != "": + return c.WebSocketPath + case c.WSPath != "": + return c.WSPath + default: + return defaultTransportWebSocketPath + } +} + +// maximumConnections returns the effective concurrent connection limit, +// preferring the clearer MaxConnections field and falling back to the +// compatibility alias. +func (c TransportConfig) maximumConnections() int { + switch { + case c.MaxConnections != 0 && c.MaxConnections != defaultTransportMaximumConnections: + return c.MaxConnections + case c.MaxConns != 0 && c.MaxConns != defaultTransportMaximumConnections: + return c.MaxConns + case c.MaxConnections != 0: + return c.MaxConnections + case c.MaxConns != 0: + return c.MaxConns + default: + return defaultTransportMaximumConnections + } +} + // MessageHandler processes incoming messages. // // var handler MessageHandler = func(conn *PeerConnection, msg *Message) {} @@ -282,10 +336,12 @@ func (t *Transport) agentUserAgent() string { // Start opens the WebSocket listener and background maintenance loops. func (t *Transport) Start() error { mux := http.NewServeMux() - mux.HandleFunc(t.config.WSPath, t.handleWSUpgrade) + mux.HandleFunc(t.config.webSocketPath(), t.handleWSUpgrade) + + listenAddress := t.config.listenAddress() t.httpServer = &http.Server{ - Addr: t.config.ListenAddr, + Addr: listenAddress, Handler: mux, ReadTimeout: 30 * time.Second, WriteTimeout: 30 * time.Second, @@ -325,7 +381,7 @@ func (t *Transport) Start() error { err = t.httpServer.ListenAndServe() } if err != nil && err != http.ErrServerClosed { - logging.Error("HTTP server error", logging.Fields{"error": err, "addr": t.config.ListenAddr}) + logging.Error("HTTP server error", logging.Fields{"error": err, "addr": listenAddress}) } }) @@ -389,7 +445,7 @@ func (t *Transport) Connect(peer *Peer) (*PeerConnection, error) { if t.config.TLSCertPath != "" { scheme = "wss" } - peerURL := url.URL{Scheme: scheme, Host: peer.Address, Path: t.config.WSPath} + peerURL := url.URL{Scheme: scheme, Host: peer.Address, Path: t.config.webSocketPath()} userAgent := t.agentUserAgent() // Dial the peer with timeout to prevent hanging on unresponsive peers @@ -509,14 +565,14 @@ func (t *Transport) Connection(peerID string) *PeerConnection { func (t *Transport) handleWSUpgrade(w http.ResponseWriter, r *http.Request) { userAgent := r.Header.Get("User-Agent") - // Enforce MaxConns limit (including pending connections during handshake) + // Enforce the maximum connection limit, including pending handshakes. t.mutex.RLock() currentConnections := len(t.connections) t.mutex.RUnlock() pendingHandshakeCount := int(t.pendingHandshakeCount.Load()) totalConnections := currentConnections + pendingHandshakeCount - if totalConnections >= t.config.MaxConns { + if totalConnections >= t.config.maximumConnections() { http.Error(w, "Too many connections", http.StatusServiceUnavailable) return } diff --git a/node/transport_test.go b/node/transport_test.go index fa90ab0..6416995 100644 --- a/node/transport_test.go +++ b/node/transport_test.go @@ -72,7 +72,7 @@ func setupTestTransportPairWithConfig(t *testing.T, serverCfg, clientCfg Transpo // Use httptest.Server with the transport's WebSocket handler mux := http.NewServeMux() - mux.HandleFunc(serverCfg.WSPath, serverTransport.handleWSUpgrade) + mux.HandleFunc(serverCfg.WebSocketPath, serverTransport.handleWSUpgrade) ts := httptest.NewServer(mux) u, _ := url.Parse(ts.URL) @@ -251,7 +251,7 @@ func TestTransport_ConnectSendsAgentUserAgent_Good(t *testing.T) { var capturedUserAgent atomic.Value mux := http.NewServeMux() - mux.HandleFunc(serverCfg.WSPath, func(w http.ResponseWriter, r *http.Request) { + mux.HandleFunc(serverCfg.WebSocketPath, func(w http.ResponseWriter, r *http.Request) { capturedUserAgent.Store(r.Header.Get("User-Agent")) serverTransport.handleWSUpgrade(w, r) }) @@ -472,17 +472,17 @@ func TestTransport_RateLimiting_Good(t *testing.T) { } } -func TestTransport_MaxConnsEnforcement_Good(t *testing.T) { - // Server with MaxConns=1 +func TestTransport_MaxConnectionsEnforcement_Good(t *testing.T) { + // Server with MaxConnections=1 serverNM := newTestNodeManager(t, "maxconns-server", RoleWorker) serverReg := newTestPeerRegistry(t) serverCfg := DefaultTransportConfig() - serverCfg.MaxConns = 1 + serverCfg.MaxConnections = 1 serverTransport := NewTransport(serverNM, serverReg, serverCfg) mux := http.NewServeMux() - mux.HandleFunc(serverCfg.WSPath, serverTransport.handleWSUpgrade) + mux.HandleFunc(serverCfg.WebSocketPath, serverTransport.handleWSUpgrade) ts := httptest.NewServer(mux) t.Cleanup(func() { serverTransport.Stop() @@ -509,7 +509,7 @@ func TestTransport_MaxConnsEnforcement_Good(t *testing.T) { // Allow server to register the connection time.Sleep(50 * time.Millisecond) - // Second client should be rejected (MaxConns=1 reached) + // Second client should be rejected (MaxConnections=1 reached) client2NM := newTestNodeManager(t, "client2", RoleController) client2Reg := newTestPeerRegistry(t) client2Transport := NewTransport(client2NM, client2Reg, DefaultTransportConfig()) @@ -520,7 +520,7 @@ func TestTransport_MaxConnsEnforcement_Good(t *testing.T) { _, err = client2Transport.Connect(peer2) if err == nil { - t.Fatal("second connection should be rejected when MaxConns=1") + t.Fatal("second connection should be rejected when MaxConnections=1") } } diff --git a/node/worker.go b/node/worker.go index e29e5aa..dd034db 100644 --- a/node/worker.go +++ b/node/worker.go @@ -43,13 +43,14 @@ type ProfileManager interface { // // worker := NewWorker(nodeManager, transport) type Worker struct { - nodeManager *NodeManager - transport *Transport - minerManager MinerManager - profileManager ProfileManager - startedAt time.Time - DataDir string // Base directory for deployments (defaults to xdg.DataHome) - DataDirectory string // Deprecated compatibility alias for DataDir + nodeManager *NodeManager + transport *Transport + minerManager MinerManager + profileManager ProfileManager + startedAt time.Time + DeploymentDirectory string // Base directory for deployments (defaults to xdg.DataHome) + DataDir string // Deprecated compatibility alias for DeploymentDirectory + DataDirectory string // Deprecated compatibility alias for DeploymentDirectory } // NewWorker creates a new Worker instance. @@ -57,11 +58,12 @@ type Worker struct { // worker := NewWorker(nodeManager, transport) func NewWorker(nodeManager *NodeManager, transport *Transport) *Worker { return &Worker{ - nodeManager: nodeManager, - transport: transport, - startedAt: time.Now(), - DataDir: xdg.DataHome, - DataDirectory: xdg.DataHome, + nodeManager: nodeManager, + transport: transport, + startedAt: time.Now(), + DeploymentDirectory: xdg.DataHome, + DataDir: xdg.DataHome, + DataDirectory: xdg.DataHome, } } @@ -367,10 +369,9 @@ func (w *Worker) handleDeploy(conn *PeerConnection, msg *Message) (*Message, err return msg.Reply(MessageDeployAck, ack) case BundleMiner, BundleFull: - // Determine installation directory - // We use the configured deployment directory for - // lethean-desktop/miners/. - minersDir := core.JoinPath(w.deploymentDir(), "lethean-desktop", "miners") + // Determine the installation directory under the configured deployment + // root for lethean-desktop/miners/. + minersDir := core.JoinPath(w.deploymentDirectory(), "lethean-desktop", "miners") installDir := core.JoinPath(minersDir, payload.Name) logging.Info("deploying miner bundle", logging.Fields{ @@ -428,16 +429,22 @@ func (w *Worker) RegisterWithTransport() { w.RegisterOnTransport() } -// deploymentDir resolves the active deployment directory, preferring DataDir -// unless a caller has only populated the legacy DataDirectory field. -func (w *Worker) deploymentDir() string { +// deploymentDirectory resolves the active deployment directory, preferring +// DeploymentDirectory unless a caller has only populated the legacy aliases. +func (w *Worker) deploymentDirectory() string { switch { + case w.DeploymentDirectory != "" && w.DeploymentDirectory != xdg.DataHome: + return w.DeploymentDirectory case w.DataDir != "" && w.DataDir != xdg.DataHome: return w.DataDir + case w.DataDirectory != "" && w.DataDirectory != xdg.DataHome: + return w.DataDirectory + case w.DeploymentDirectory != "": + return w.DeploymentDirectory + case w.DataDir != "": + return w.DataDir case w.DataDirectory != "": return w.DataDirectory - case w.DataDir != "": - return w.DataDir default: return xdg.DataHome } diff --git a/node/worker_test.go b/node/worker_test.go index 4fc8265..24b39fe 100644 --- a/node/worker_test.go +++ b/node/worker_test.go @@ -35,7 +35,7 @@ func TestWorker_NewWorker_Good(t *testing.T) { transport := NewTransport(nm, pr, DefaultTransportConfig()) worker := NewWorker(nm, transport) - worker.DataDirectory = t.TempDir() + worker.DeploymentDirectory = t.TempDir() if worker == nil { t.Fatal("NewWorker returned nil") @@ -68,7 +68,7 @@ func TestWorker_SetMinerManager_Good(t *testing.T) { transport := NewTransport(nm, pr, DefaultTransportConfig()) worker := NewWorker(nm, transport) - worker.DataDirectory = t.TempDir() + worker.DeploymentDirectory = t.TempDir() mockManager := &mockMinerManager{} worker.SetMinerManager(mockManager) @@ -98,7 +98,7 @@ func TestWorker_SetProfileManager_Good(t *testing.T) { transport := NewTransport(nm, pr, DefaultTransportConfig()) worker := NewWorker(nm, transport) - worker.DataDirectory = t.TempDir() + worker.DeploymentDirectory = t.TempDir() mockProfile := &mockProfileManager{} worker.SetProfileManager(mockProfile) @@ -128,7 +128,7 @@ func TestWorker_HandlePing_Good(t *testing.T) { transport := NewTransport(nm, pr, DefaultTransportConfig()) worker := NewWorker(nm, transport) - worker.DataDirectory = t.TempDir() + worker.DeploymentDirectory = t.TempDir() // Create a ping message identity := nm.Identity() @@ -189,7 +189,7 @@ func TestWorker_HandleStats_Good(t *testing.T) { transport := NewTransport(nm, pr, DefaultTransportConfig()) worker := NewWorker(nm, transport) - worker.DataDirectory = t.TempDir() + worker.DeploymentDirectory = t.TempDir() // Create a stats request message. identity := nm.Identity() @@ -249,7 +249,7 @@ func TestWorker_HandleStartMiner_NoManager_Bad(t *testing.T) { transport := NewTransport(nm, pr, DefaultTransportConfig()) worker := NewWorker(nm, transport) - worker.DataDirectory = t.TempDir() + worker.DeploymentDirectory = t.TempDir() // Create a start_miner message identity := nm.Identity() @@ -289,7 +289,7 @@ func TestWorker_HandleStopMiner_NoManager_Bad(t *testing.T) { transport := NewTransport(nm, pr, DefaultTransportConfig()) worker := NewWorker(nm, transport) - worker.DataDirectory = t.TempDir() + worker.DeploymentDirectory = t.TempDir() // Create a stop_miner message identity := nm.Identity() @@ -329,7 +329,7 @@ func TestWorker_HandleLogs_NoManager_Bad(t *testing.T) { transport := NewTransport(nm, pr, DefaultTransportConfig()) worker := NewWorker(nm, transport) - worker.DataDirectory = t.TempDir() + worker.DeploymentDirectory = t.TempDir() // Create a logs request message. identity := nm.Identity() @@ -369,7 +369,7 @@ func TestWorker_HandleDeploy_Profile_Good(t *testing.T) { transport := NewTransport(nm, pr, DefaultTransportConfig()) worker := NewWorker(nm, transport) - worker.DataDirectory = t.TempDir() + worker.DeploymentDirectory = t.TempDir() // Create a deploy message for profile identity := nm.Identity() @@ -413,7 +413,7 @@ func TestWorker_HandleDeploy_UnknownType_Bad(t *testing.T) { transport := NewTransport(nm, pr, DefaultTransportConfig()) worker := NewWorker(nm, transport) - worker.DataDirectory = t.TempDir() + worker.DeploymentDirectory = t.TempDir() // Create a deploy message with unknown type identity := nm.Identity() @@ -620,7 +620,7 @@ func TestWorker_HandleStartMiner_WithManager_Good(t *testing.T) { transport := NewTransport(nm, pr, DefaultTransportConfig()) worker := NewWorker(nm, transport) - worker.DataDirectory = t.TempDir() + worker.DeploymentDirectory = t.TempDir() mm := &mockMinerManager{ miners: []MinerInstance{}, @@ -631,7 +631,7 @@ func TestWorker_HandleStartMiner_WithManager_Good(t *testing.T) { identity := nm.Identity() - t.Run("WithConfigOverride", func(t *testing.T) { + t.Run("ConfigOverride", func(t *testing.T) { payload := StartMinerPayload{ MinerType: "xmrig", Config: RawMessage(`{"pool":"test:3333"}`), @@ -678,7 +678,7 @@ func TestWorker_HandleStartMiner_WithManager_Good(t *testing.T) { } }) - t.Run("WithProfileManager", func(t *testing.T) { + t.Run("ProfileManagerConfigured", func(t *testing.T) { pm := &mockProfileManagerFull{ profiles: map[string]any{ "test-profile": map[string]any{"pool": "pool.test:3333"}, @@ -787,7 +787,7 @@ func TestWorker_HandleStopMiner_WithManager_Good(t *testing.T) { } transport := NewTransport(nm, pr, DefaultTransportConfig()) worker := NewWorker(nm, transport) - worker.DataDirectory = t.TempDir() + worker.DeploymentDirectory = t.TempDir() identity := nm.Identity() t.Run("Success", func(t *testing.T) { @@ -851,7 +851,7 @@ func TestWorker_HandleLogs_WithManager_Good(t *testing.T) { } transport := NewTransport(nm, pr, DefaultTransportConfig()) worker := NewWorker(nm, transport) - worker.DataDirectory = t.TempDir() + worker.DeploymentDirectory = t.TempDir() identity := nm.Identity() t.Run("Success", func(t *testing.T) { @@ -958,7 +958,7 @@ func TestWorker_HandleStats_WithMinerManager_Good(t *testing.T) { } transport := NewTransport(nm, pr, DefaultTransportConfig()) worker := NewWorker(nm, transport) - worker.DataDirectory = t.TempDir() + worker.DeploymentDirectory = t.TempDir() identity := nm.Identity() // Set miner manager with miners that have real stats @@ -1019,7 +1019,7 @@ func TestWorker_HandleMessage_UnknownType_Bad(t *testing.T) { } transport := NewTransport(nm, pr, DefaultTransportConfig()) worker := NewWorker(nm, transport) - worker.DataDirectory = t.TempDir() + worker.DeploymentDirectory = t.TempDir() identity := nm.Identity() msg, _ := NewMessage("unknown_type", "sender-id", identity.ID, nil) @@ -1046,7 +1046,7 @@ func TestWorker_HandleDeploy_ProfileWithManager_Good(t *testing.T) { } transport := NewTransport(nm, pr, DefaultTransportConfig()) worker := NewWorker(nm, transport) - worker.DataDirectory = t.TempDir() + worker.DeploymentDirectory = t.TempDir() pm := &mockProfileManagerFull{profiles: make(map[string]any)} worker.SetProfileManager(pm) @@ -1101,7 +1101,7 @@ func TestWorker_HandleDeploy_ProfileSaveFails_Bad(t *testing.T) { } transport := NewTransport(nm, pr, DefaultTransportConfig()) worker := NewWorker(nm, transport) - worker.DataDirectory = t.TempDir() + worker.DeploymentDirectory = t.TempDir() worker.SetProfileManager(&mockProfileManagerFailing{}) identity := nm.Identity() @@ -1147,7 +1147,7 @@ func TestWorker_HandleDeploy_MinerBundle_Good(t *testing.T) { } transport := NewTransport(nm, pr, DefaultTransportConfig()) worker := NewWorker(nm, transport) - worker.DataDirectory = t.TempDir() + worker.DeploymentDirectory = t.TempDir() pm := &mockProfileManagerFull{profiles: make(map[string]any)} worker.SetProfileManager(pm) @@ -1211,7 +1211,7 @@ func TestWorker_HandleDeploy_FullBundle_Good(t *testing.T) { } transport := NewTransport(nm, pr, DefaultTransportConfig()) worker := NewWorker(nm, transport) - worker.DataDirectory = t.TempDir() + worker.DeploymentDirectory = t.TempDir() identity := nm.Identity() @@ -1249,7 +1249,7 @@ func TestWorker_HandleDeploy_FullBundle_Good(t *testing.T) { } } -func TestWorker_HandleDeploy_MinerBundle_WithProfileManager_Good(t *testing.T) { +func TestWorker_HandleDeploy_MinerBundle_ProfileManager_Good(t *testing.T) { cleanup := setupTestEnvironment(t) defer cleanup() @@ -1267,7 +1267,7 @@ func TestWorker_HandleDeploy_MinerBundle_WithProfileManager_Good(t *testing.T) { } transport := NewTransport(nm, pr, DefaultTransportConfig()) worker := NewWorker(nm, transport) - worker.DataDirectory = t.TempDir() + worker.DeploymentDirectory = t.TempDir() // Set a failing profile manager to exercise the warn-and-continue path worker.SetProfileManager(&mockProfileManagerFailing{}) @@ -1320,7 +1320,7 @@ func TestWorker_HandleDeploy_InvalidPayload_Bad(t *testing.T) { pr, _ := NewPeerRegistryFromPath(testJoinPath(t.TempDir(), "peers.json")) transport := NewTransport(nm, pr, DefaultTransportConfig()) worker := NewWorker(nm, transport) - worker.DataDirectory = t.TempDir() + worker.DeploymentDirectory = t.TempDir() identity := nm.Identity() // Create a message with invalid payload @@ -1345,7 +1345,7 @@ func TestWorker_HandleStats_NoIdentity_Bad(t *testing.T) { pr, _ := NewPeerRegistryFromPath(testJoinPath(t.TempDir(), "peers.json")) transport := NewTransport(nm, pr, DefaultTransportConfig()) worker := NewWorker(nm, transport) - worker.DataDirectory = t.TempDir() + worker.DeploymentDirectory = t.TempDir() msg, _ := NewMessage(MessageGetStats, "sender-id", "target-id", nil) _, err := worker.handleStats(msg)