diff --git a/cmd/mining/cmd/node.go b/cmd/mining/cmd/node.go index 00d1d6f..9b5fbb6 100644 --- a/cmd/mining/cmd/node.go +++ b/cmd/mining/cmd/node.go @@ -150,9 +150,24 @@ This allows other nodes to connect, send commands, and receive stats.`, transport := node.NewTransport(nm, pr, config) + // Create controller for outgoing requests + controller := node.NewController(nm, pr, transport) + // Create worker to handle incoming messages worker := node.NewWorker(nm, transport) - worker.RegisterWithTransport() + + // Use dispatcher to route messages to controller or worker + dispatcher := node.NewMessageDispatcher(controller, worker) + transport.OnMessage(dispatcher.Dispatch) + + // Start IPC control server + controlListener, err := node.StartControlServer(controller) + if err != nil { + // Log error but continue (fail soft for control server) + fmt.Printf("Warning: failed to start control server: %v\n", err) + } else { + defer controlListener.Close() + } if err := transport.Start(); err != nil { return fmt.Errorf("failed to start transport: %w", err) @@ -162,6 +177,9 @@ This allows other nodes to connect, send commands, and receive stats.`, fmt.Printf("P2P server started on %s\n", config.ListenAddr) fmt.Printf("Node ID: %s (%s)\n", identity.ID, identity.Name) fmt.Printf("Role: %s\n", identity.Role) + if controlListener != nil { + fmt.Printf("Control Socket: %s\n", controlListener.Addr()) + } fmt.Println() fmt.Println("Press Ctrl+C to stop...") @@ -173,6 +191,10 @@ This allows other nodes to connect, send commands, and receive stats.`, sig := <-sigChan fmt.Printf("\nReceived signal %v, shutting down...\n", sig) + if controlListener != nil { + controlListener.Close() + } + // Graceful shutdown: stop transport and cleanup resources if err := transport.Stop(); err != nil { fmt.Printf("Warning: error during transport shutdown: %v\n", err) diff --git a/cmd/mining/cmd/peer.go b/cmd/mining/cmd/peer.go index 086d8a2..fb40c41 100644 --- a/cmd/mining/cmd/peer.go +++ b/cmd/mining/cmd/peer.go @@ -149,13 +149,24 @@ var peerPingCmd = &cobra.Command{ return fmt.Errorf("peer not found: %s", peerID) } - if !peer.Connected { - return fmt.Errorf("peer not connected: %s", peer.Name) + // Connect to the control service of the running node + client, err := node.NewControlClient() + if err != nil { + return fmt.Errorf("failed to connect to node: %w", err) } + defer client.Close() fmt.Printf("Pinging %s (%s)...\n", peer.Name, peer.Address) - // TODO: Actually send ping via transport - fmt.Println("Ping functionality requires active connection via 'node serve'") + + // Call the Ping method via RPC + pingArgs := &node.PingArgs{PeerID: peer.ID} + var reply node.PingReply + + if err := client.Call("ControlService.Ping", pingArgs, &reply); err != nil { + return fmt.Errorf("ping failed: %w", err) + } + + fmt.Printf("Reply from %s: time=%.1f ms\n", peer.Name, reply.LatencyMS) return nil }, } diff --git a/miner/core/tests/CMakeLists.txt b/miner/core/tests/CMakeLists.txt index e2d349c..2089a9a 100644 --- a/miner/core/tests/CMakeLists.txt +++ b/miner/core/tests/CMakeLists.txt @@ -18,8 +18,22 @@ list(FILTER MINER_SOURCES EXCLUDE REGEX ".*_riscv.*") if (WIN32) list(FILTER MINER_SOURCES EXCLUDE REGEX ".*_unix\\.cpp$") list(FILTER MINER_SOURCES EXCLUDE REGEX ".*_linux\\.cpp$") + list(FILTER MINER_SOURCES EXCLUDE REGEX ".*_mac\\.cpp$") +elseif (APPLE) + list(FILTER MINER_SOURCES EXCLUDE REGEX ".*_win\\.cpp$") + list(FILTER MINER_SOURCES EXCLUDE REGEX ".*_linux\\.cpp$") else() list(FILTER MINER_SOURCES EXCLUDE REGEX ".*_win\\.cpp$") + list(FILTER MINER_SOURCES EXCLUDE REGEX ".*_mac\\.cpp$") +endif() + +# Apply necessary compiler flags for specific files (copied from core/CMakeLists.txt) +if (CMAKE_CXX_COMPILER_ID MATCHES GNU OR CMAKE_CXX_COMPILER_ID MATCHES Clang) + set_source_files_properties(${CMAKE_SOURCE_DIR}/src/crypto/cn/CnHash.cpp PROPERTIES COMPILE_FLAGS "-Ofast -fno-tree-vectorize") + + if (WITH_VAES) + set_source_files_properties(${CMAKE_SOURCE_DIR}/src/crypto/cn/CryptoNight_x86_vaes.cpp PROPERTIES COMPILE_FLAGS "-Ofast -fno-tree-vectorize -mavx2 -mvaes") + endif() endif() # Create a library with common test utilities and miner components diff --git a/miner/proxy/CMakeLists.txt b/miner/proxy/CMakeLists.txt index a12d2b4..66003e4 100644 --- a/miner/proxy/CMakeLists.txt +++ b/miner/proxy/CMakeLists.txt @@ -162,7 +162,7 @@ if (CMAKE_SYSTEM_PROCESSOR MATCHES "^(x86_64|AMD64)$" AND CMAKE_SIZEOF_VOID_P EQ add_definitions(/DRAPIDJSON_SSE2) endif() -set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} "${CMAKE_SOURCE_DIR}/cmake") +set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} "${CMAKE_SOURCE_DIR}/../core/cmake") find_package(UV REQUIRED) diff --git a/pkg/mining/manager_test.go b/pkg/mining/manager_test.go index 94a46f8..329f0b9 100644 --- a/pkg/mining/manager_test.go +++ b/pkg/mining/manager_test.go @@ -151,7 +151,14 @@ func TestGetMiner_Good(t *testing.T) { } // Case 1: Get an existing miner - startedMiner, _ := m.StartMiner(context.Background(), "xmrig", config) + startedMiner, err := m.StartMiner(context.Background(), "xmrig", config) + if err != nil { + t.Fatalf("Failed to start miner: %v", err) + } + if startedMiner == nil { + t.Fatal("StartMiner returned nil miner") + } + retrievedMiner, err := m.GetMiner(startedMiner.GetName()) if err != nil { t.Fatalf("Expected to get miner, but got error: %v", err) diff --git a/pkg/mining/service.go b/pkg/mining/service.go index f65ea40..ddee4bd 100644 --- a/pkg/mining/service.go +++ b/pkg/mining/service.go @@ -2,6 +2,7 @@ package mining import ( "context" + "crypto/rand" "encoding/base64" "encoding/json" "fmt" @@ -203,9 +204,12 @@ func requestIDMiddleware() gin.HandlerFunc { // generateRequestID creates a unique request ID using timestamp and random bytes func generateRequestID() string { - b := make([]byte, 8) - _, _ = base64.StdEncoding.Decode(b, []byte(fmt.Sprintf("%d", time.Now().UnixNano()))) - return fmt.Sprintf("%d-%x", time.Now().UnixMilli(), b[:4]) + b := make([]byte, 4) + if _, err := rand.Read(b); err != nil { + // Fallback if random source fails (unlikely) + return fmt.Sprintf("%d", time.Now().UnixNano()) + } + return fmt.Sprintf("%d-%x", time.Now().UnixMilli(), b) } // getRequestID extracts the request ID from gin context diff --git a/pkg/node/control.go b/pkg/node/control.go new file mode 100644 index 0000000..cf0dd3b --- /dev/null +++ b/pkg/node/control.go @@ -0,0 +1,124 @@ +package node + +import ( + "fmt" + "net" + "net/rpc" + "os" + "path/filepath" + "time" + + "github.com/Snider/Mining/pkg/logging" + "github.com/adrg/xdg" +) + +// ControlService exposes node commands via RPC. +type ControlService struct { + controller *Controller +} + +// PingArgs represents arguments for the Ping command. +type PingArgs struct { + PeerID string +} + +// PingReply represents the response from the Ping command. +type PingReply struct { + LatencyMS float64 +} + +// Ping sends a ping to the specified peer. +func (s *ControlService) Ping(args *PingArgs, reply *PingReply) error { + if s.controller == nil { + return fmt.Errorf("controller not initialized") + } + + latency, err := s.controller.PingPeer(args.PeerID) + if err != nil { + return err + } + + reply.LatencyMS = latency + return nil +} + +// StartControlServer starts the RPC server on a Unix socket. +// Returns the listener, which should be closed by the caller. +func StartControlServer(controller *Controller) (net.Listener, error) { + service := &ControlService{controller: controller} + server := rpc.NewServer() + if err := server.Register(service); err != nil { + return nil, fmt.Errorf("failed to register control service: %w", err) + } + + sockPath, err := getControlSocketPath() + if err != nil { + return nil, err + } + + // Remove stale socket file if it exists + if _, err := os.Stat(sockPath); err == nil { + // Try to connect to see if it's active + if conn, err := net.DialTimeout("unix", sockPath, 100*time.Millisecond); err == nil { + conn.Close() + return nil, fmt.Errorf("control socket already in use: %s", sockPath) + } + // Not active, remove it + if err := os.Remove(sockPath); err != nil { + return nil, fmt.Errorf("failed to remove stale socket: %w", err) + } + } + + // Ensure directory exists + if err := os.MkdirAll(filepath.Dir(sockPath), 0755); err != nil { + return nil, fmt.Errorf("failed to create socket directory: %w", err) + } + + listener, err := net.Listen("unix", sockPath) + if err != nil { + return nil, fmt.Errorf("failed to listen on socket %s: %w", sockPath, err) + } + + // Set permissions so only user can access + if err := os.Chmod(sockPath, 0600); err != nil { + listener.Close() + return nil, fmt.Errorf("failed to set socket permissions: %w", err) + } + + logging.Info("control server started", logging.Fields{"address": sockPath}) + + go func() { + for { + conn, err := listener.Accept() + if err != nil { + // Check if listener was closed (ErrNetClosing is not exported, check string or type) + // Simply returning on error is usually fine for this use case + return + } + go server.ServeConn(conn) + } + }() + + return listener, nil +} + +// NewControlClient creates a new RPC client connected to the control server. +func NewControlClient() (*rpc.Client, error) { + sockPath, err := getControlSocketPath() + if err != nil { + return nil, err + } + + // Use DialTimeout to fail fast if server is not running + conn, err := net.DialTimeout("unix", sockPath, 2*time.Second) + if err != nil { + return nil, fmt.Errorf("failed to connect to control socket (is 'node serve' running?): %w", err) + } + + return rpc.NewClient(conn), nil +} + +// getControlSocketPath returns the path to the control socket. +func getControlSocketPath() (string, error) { + return xdg.RuntimeFile("lethean-desktop/node.sock") +} diff --git a/pkg/node/controller.go b/pkg/node/controller.go index 299c631..6c8d3aa 100644 --- a/pkg/node/controller.go +++ b/pkg/node/controller.go @@ -30,14 +30,11 @@ func NewController(node *NodeManager, peers *PeerRegistry, transport *Transport) pending: make(map[string]chan *Message), } - // Register message handler for responses - transport.OnMessage(c.handleResponse) - return c } -// handleResponse processes incoming messages that are responses to our requests. -func (c *Controller) handleResponse(conn *PeerConnection, msg *Message) { +// HandleResponse processes incoming messages that are responses to our requests. +func (c *Controller) HandleResponse(conn *PeerConnection, msg *Message) { if msg.ReplyTo == "" { return // Not a response, let worker handle it } diff --git a/pkg/node/dispatcher.go b/pkg/node/dispatcher.go deleted file mode 100644 index f6b4124..0000000 --- a/pkg/node/dispatcher.go +++ /dev/null @@ -1,34 +0,0 @@ -// pkg/node/dispatcher.go - -func (n *NodeManager) DispatchUEPS(pkt *ueps.ParsedPacket) error { - // 1. The "Threat" Circuit Breaker (L5 Guard) - if pkt.Header.ThreatScore > 50000 { - // High threat? Drop it. Don't even parse the payload. - // This protects the Agent from "semantic viruses" - return fmt.Errorf("packet rejected: threat score %d exceeds safety limit", pkt.Header.ThreatScore) - } - - // 2. The "Intent" Router (L9 Semantic) - switch pkt.Header.IntentID { - - case 0x01: // Handshake / Hello - return n.handleHandshake(pkt) - - case 0x20: // Compute / Job Request - // "Hey, can you run this Docker container?" - // Check local resources first (Self-Validation) - return n.handleComputeRequest(pkt.Payload) - - case 0x30: // Rehab / Intervention - // "Violet says you are hallucinating. Pause execution." - // This is the "Benevolent Intervention" Axiom. - return n.enterRehabMode(pkt.Payload) - - case 0xFF: // Extended / Custom - // Check the payload for specific sub-protocols (e.g. your JSON blobs) - return n.handleApplicationData(pkt.Payload) - - default: - return fmt.Errorf("unknown intent ID: 0x%X", pkt.Header.IntentID) - } -} diff --git a/pkg/node/msg_dispatcher.go b/pkg/node/msg_dispatcher.go new file mode 100644 index 0000000..bd99286 --- /dev/null +++ b/pkg/node/msg_dispatcher.go @@ -0,0 +1,29 @@ +package node + +// MessageDispatcher routes messages to either Controller or Worker +type MessageDispatcher struct { + controller *Controller + worker *Worker +} + +// NewMessageDispatcher creates a new MessageDispatcher +func NewMessageDispatcher(controller *Controller, worker *Worker) *MessageDispatcher { + return &MessageDispatcher{ + controller: controller, + worker: worker, + } +} + +// Dispatch routes incoming messages to the appropriate handler +func (d *MessageDispatcher) Dispatch(conn *PeerConnection, msg *Message) { + // If ReplyTo is set, it's a response to a request initiated by Controller + if msg.ReplyTo != "" && d.controller != nil { + d.controller.HandleResponse(conn, msg) + return + } + + // Otherwise treat it as a new request for Worker + if d.worker != nil { + d.worker.HandleMessage(conn, msg) + } +} diff --git a/pkg/node/peer.go b/pkg/node/peer.go index 4c76962..5d541c9 100644 --- a/pkg/node/peer.go +++ b/pkg/node/peer.go @@ -706,23 +706,23 @@ func (r *PeerRegistry) load() error { } // Example usage inside a connection handler -func (n *NodeManager) SendEthicalPacket(peerID string, intent uint8, data []byte) error { - // 1. Get the shared secret for this specific peer (derived from ECDH) - secret, err := n.DeriveSharedSecret(peerID) - if err != nil { - return err - } - - // 2. Construct the UEPS frame - // Intent 0x20 = e.g., "Distributed Compute" - pkt := ueps.NewBuilder(intent, data) - - // 3. Seal it - wireBytes, err := pkt.MarshalAndSign(secret) - if err != nil { - return err - } - - // 4. Send wireBytes over your TCP connection... - return nil -} +// func (n *NodeManager) SendEthicalPacket(peerID string, intent uint8, data []byte) error { +// // 1. Get the shared secret for this specific peer (derived from ECDH) +// secret, err := n.DeriveSharedSecret(peerID) +// if err != nil { +// return err +// } +// +// // 2. Construct the UEPS frame +// // Intent 0x20 = e.g., "Distributed Compute" +// pkt := ueps.NewBuilder(intent, data) +// +// // 3. Seal it +// wireBytes, err := pkt.MarshalAndSign(secret) +// if err != nil { +// return err +// } +// +// // 4. Send wireBytes over your TCP connection... +// return nil +// } diff --git a/pkg/ueps/packet.go b/pkg/ueps/packet.go index 8331202..7c75334 100644 --- a/pkg/ueps/packet.go +++ b/pkg/ueps/packet.go @@ -6,7 +6,6 @@ import ( "crypto/sha256" "encoding/binary" "errors" - "fmt" "io" )