From c0d0a24fb9cf870d913f94d738056bbc5007d0d3 Mon Sep 17 00:00:00 2001 From: Snider <631881+Snider@users.noreply.github.com> Date: Mon, 2 Feb 2026 01:57:14 +0000 Subject: [PATCH] Implement peer ping command via IPC Implemented the 'peer ping' command by establishing an IPC mechanism between the CLI and the running 'node serve' process. - Added 'ControlService' in 'pkg/node' exposing RPC methods over a Unix socket. - Implemented 'MessageDispatcher' to route P2P messages to 'Controller' (responses) or 'Worker' (requests). - Updated 'node serve' to start the IPC server and use the dispatcher. - Updated 'peer ping' to use the IPC client to request pings from the running node. - Fixed broken 'pkg/node/dispatcher.go' file. --- cmd/mining/cmd/node.go | 24 ++++++- cmd/mining/cmd/peer.go | 19 ++++-- pkg/node/control.go | 124 +++++++++++++++++++++++++++++++++++++ pkg/node/controller.go | 7 +-- pkg/node/dispatcher.go | 34 ---------- pkg/node/msg_dispatcher.go | 29 +++++++++ pkg/node/peer.go | 40 ++++++------ 7 files changed, 213 insertions(+), 64 deletions(-) create mode 100644 pkg/node/control.go delete mode 100644 pkg/node/dispatcher.go create mode 100644 pkg/node/msg_dispatcher.go diff --git a/cmd/mining/cmd/node.go b/cmd/mining/cmd/node.go index 00d1d6f..9b5fbb6 100644 --- a/cmd/mining/cmd/node.go +++ b/cmd/mining/cmd/node.go @@ -150,9 +150,24 @@ This allows other nodes to connect, send commands, and receive stats.`, transport := node.NewTransport(nm, pr, config) + // Create controller for outgoing requests + controller := node.NewController(nm, pr, transport) + // Create worker to handle incoming messages worker := node.NewWorker(nm, transport) - worker.RegisterWithTransport() + + // Use dispatcher to route messages to controller or worker + dispatcher := node.NewMessageDispatcher(controller, worker) + transport.OnMessage(dispatcher.Dispatch) + + // Start IPC control server + controlListener, err := node.StartControlServer(controller) + if err != nil { + // Log error but continue (fail soft for control server) + fmt.Printf("Warning: failed to start control server: %v\n", err) + } else { + defer controlListener.Close() + } if err := transport.Start(); err != nil { return fmt.Errorf("failed to start transport: %w", err) @@ -162,6 +177,9 @@ This allows other nodes to connect, send commands, and receive stats.`, fmt.Printf("P2P server started on %s\n", config.ListenAddr) fmt.Printf("Node ID: %s (%s)\n", identity.ID, identity.Name) fmt.Printf("Role: %s\n", identity.Role) + if controlListener != nil { + fmt.Printf("Control Socket: %s\n", controlListener.Addr()) + } fmt.Println() fmt.Println("Press Ctrl+C to stop...") @@ -173,6 +191,10 @@ This allows other nodes to connect, send commands, and receive stats.`, sig := <-sigChan fmt.Printf("\nReceived signal %v, shutting down...\n", sig) + if controlListener != nil { + controlListener.Close() + } + // Graceful shutdown: stop transport and cleanup resources if err := transport.Stop(); err != nil { fmt.Printf("Warning: error during transport shutdown: %v\n", err) diff --git a/cmd/mining/cmd/peer.go b/cmd/mining/cmd/peer.go index 086d8a2..fb40c41 100644 --- a/cmd/mining/cmd/peer.go +++ b/cmd/mining/cmd/peer.go @@ -149,13 +149,24 @@ var peerPingCmd = &cobra.Command{ return fmt.Errorf("peer not found: %s", peerID) } - if !peer.Connected { - return fmt.Errorf("peer not connected: %s", peer.Name) + // Connect to the control service of the running node + client, err := node.NewControlClient() + if err != nil { + return fmt.Errorf("failed to connect to node: %w", err) } + defer client.Close() fmt.Printf("Pinging %s (%s)...\n", peer.Name, peer.Address) - // TODO: Actually send ping via transport - fmt.Println("Ping functionality requires active connection via 'node serve'") + + // Call the Ping method via RPC + pingArgs := &node.PingArgs{PeerID: peer.ID} + var reply node.PingReply + + if err := client.Call("ControlService.Ping", pingArgs, &reply); err != nil { + return fmt.Errorf("ping failed: %w", err) + } + + fmt.Printf("Reply from %s: time=%.1f ms\n", peer.Name, reply.LatencyMS) return nil }, } diff --git a/pkg/node/control.go b/pkg/node/control.go new file mode 100644 index 0000000..cf0dd3b --- /dev/null +++ b/pkg/node/control.go @@ -0,0 +1,124 @@ +package node + +import ( + "fmt" + "net" + "net/rpc" + "os" + "path/filepath" + "time" + + "github.com/Snider/Mining/pkg/logging" + "github.com/adrg/xdg" +) + +// ControlService exposes node commands via RPC. +type ControlService struct { + controller *Controller +} + +// PingArgs represents arguments for the Ping command. +type PingArgs struct { + PeerID string +} + +// PingReply represents the response from the Ping command. +type PingReply struct { + LatencyMS float64 +} + +// Ping sends a ping to the specified peer. +func (s *ControlService) Ping(args *PingArgs, reply *PingReply) error { + if s.controller == nil { + return fmt.Errorf("controller not initialized") + } + + latency, err := s.controller.PingPeer(args.PeerID) + if err != nil { + return err + } + + reply.LatencyMS = latency + return nil +} + +// StartControlServer starts the RPC server on a Unix socket. +// Returns the listener, which should be closed by the caller. +func StartControlServer(controller *Controller) (net.Listener, error) { + service := &ControlService{controller: controller} + server := rpc.NewServer() + if err := server.Register(service); err != nil { + return nil, fmt.Errorf("failed to register control service: %w", err) + } + + sockPath, err := getControlSocketPath() + if err != nil { + return nil, err + } + + // Remove stale socket file if it exists + if _, err := os.Stat(sockPath); err == nil { + // Try to connect to see if it's active + if conn, err := net.DialTimeout("unix", sockPath, 100*time.Millisecond); err == nil { + conn.Close() + return nil, fmt.Errorf("control socket already in use: %s", sockPath) + } + // Not active, remove it + if err := os.Remove(sockPath); err != nil { + return nil, fmt.Errorf("failed to remove stale socket: %w", err) + } + } + + // Ensure directory exists + if err := os.MkdirAll(filepath.Dir(sockPath), 0755); err != nil { + return nil, fmt.Errorf("failed to create socket directory: %w", err) + } + + listener, err := net.Listen("unix", sockPath) + if err != nil { + return nil, fmt.Errorf("failed to listen on socket %s: %w", sockPath, err) + } + + // Set permissions so only user can access + if err := os.Chmod(sockPath, 0600); err != nil { + listener.Close() + return nil, fmt.Errorf("failed to set socket permissions: %w", err) + } + + logging.Info("control server started", logging.Fields{"address": sockPath}) + + go func() { + for { + conn, err := listener.Accept() + if err != nil { + // Check if listener was closed (ErrNetClosing is not exported, check string or type) + // Simply returning on error is usually fine for this use case + return + } + go server.ServeConn(conn) + } + }() + + return listener, nil +} + +// NewControlClient creates a new RPC client connected to the control server. +func NewControlClient() (*rpc.Client, error) { + sockPath, err := getControlSocketPath() + if err != nil { + return nil, err + } + + // Use DialTimeout to fail fast if server is not running + conn, err := net.DialTimeout("unix", sockPath, 2*time.Second) + if err != nil { + return nil, fmt.Errorf("failed to connect to control socket (is 'node serve' running?): %w", err) + } + + return rpc.NewClient(conn), nil +} + +// getControlSocketPath returns the path to the control socket. +func getControlSocketPath() (string, error) { + return xdg.RuntimeFile("lethean-desktop/node.sock") +} diff --git a/pkg/node/controller.go b/pkg/node/controller.go index 299c631..6c8d3aa 100644 --- a/pkg/node/controller.go +++ b/pkg/node/controller.go @@ -30,14 +30,11 @@ func NewController(node *NodeManager, peers *PeerRegistry, transport *Transport) pending: make(map[string]chan *Message), } - // Register message handler for responses - transport.OnMessage(c.handleResponse) - return c } -// handleResponse processes incoming messages that are responses to our requests. -func (c *Controller) handleResponse(conn *PeerConnection, msg *Message) { +// HandleResponse processes incoming messages that are responses to our requests. +func (c *Controller) HandleResponse(conn *PeerConnection, msg *Message) { if msg.ReplyTo == "" { return // Not a response, let worker handle it } diff --git a/pkg/node/dispatcher.go b/pkg/node/dispatcher.go deleted file mode 100644 index f6b4124..0000000 --- a/pkg/node/dispatcher.go +++ /dev/null @@ -1,34 +0,0 @@ -// pkg/node/dispatcher.go - -func (n *NodeManager) DispatchUEPS(pkt *ueps.ParsedPacket) error { - // 1. The "Threat" Circuit Breaker (L5 Guard) - if pkt.Header.ThreatScore > 50000 { - // High threat? Drop it. Don't even parse the payload. - // This protects the Agent from "semantic viruses" - return fmt.Errorf("packet rejected: threat score %d exceeds safety limit", pkt.Header.ThreatScore) - } - - // 2. The "Intent" Router (L9 Semantic) - switch pkt.Header.IntentID { - - case 0x01: // Handshake / Hello - return n.handleHandshake(pkt) - - case 0x20: // Compute / Job Request - // "Hey, can you run this Docker container?" - // Check local resources first (Self-Validation) - return n.handleComputeRequest(pkt.Payload) - - case 0x30: // Rehab / Intervention - // "Violet says you are hallucinating. Pause execution." - // This is the "Benevolent Intervention" Axiom. - return n.enterRehabMode(pkt.Payload) - - case 0xFF: // Extended / Custom - // Check the payload for specific sub-protocols (e.g. your JSON blobs) - return n.handleApplicationData(pkt.Payload) - - default: - return fmt.Errorf("unknown intent ID: 0x%X", pkt.Header.IntentID) - } -} diff --git a/pkg/node/msg_dispatcher.go b/pkg/node/msg_dispatcher.go new file mode 100644 index 0000000..bd99286 --- /dev/null +++ b/pkg/node/msg_dispatcher.go @@ -0,0 +1,29 @@ +package node + +// MessageDispatcher routes messages to either Controller or Worker +type MessageDispatcher struct { + controller *Controller + worker *Worker +} + +// NewMessageDispatcher creates a new MessageDispatcher +func NewMessageDispatcher(controller *Controller, worker *Worker) *MessageDispatcher { + return &MessageDispatcher{ + controller: controller, + worker: worker, + } +} + +// Dispatch routes incoming messages to the appropriate handler +func (d *MessageDispatcher) Dispatch(conn *PeerConnection, msg *Message) { + // If ReplyTo is set, it's a response to a request initiated by Controller + if msg.ReplyTo != "" && d.controller != nil { + d.controller.HandleResponse(conn, msg) + return + } + + // Otherwise treat it as a new request for Worker + if d.worker != nil { + d.worker.HandleMessage(conn, msg) + } +} diff --git a/pkg/node/peer.go b/pkg/node/peer.go index 4c76962..5d541c9 100644 --- a/pkg/node/peer.go +++ b/pkg/node/peer.go @@ -706,23 +706,23 @@ func (r *PeerRegistry) load() error { } // Example usage inside a connection handler -func (n *NodeManager) SendEthicalPacket(peerID string, intent uint8, data []byte) error { - // 1. Get the shared secret for this specific peer (derived from ECDH) - secret, err := n.DeriveSharedSecret(peerID) - if err != nil { - return err - } - - // 2. Construct the UEPS frame - // Intent 0x20 = e.g., "Distributed Compute" - pkt := ueps.NewBuilder(intent, data) - - // 3. Seal it - wireBytes, err := pkt.MarshalAndSign(secret) - if err != nil { - return err - } - - // 4. Send wireBytes over your TCP connection... - return nil -} +// func (n *NodeManager) SendEthicalPacket(peerID string, intent uint8, data []byte) error { +// // 1. Get the shared secret for this specific peer (derived from ECDH) +// secret, err := n.DeriveSharedSecret(peerID) +// if err != nil { +// return err +// } +// +// // 2. Construct the UEPS frame +// // Intent 0x20 = e.g., "Distributed Compute" +// pkt := ueps.NewBuilder(intent, data) +// +// // 3. Seal it +// wireBytes, err := pkt.MarshalAndSign(secret) +// if err != nil { +// return err +// } +// +// // 4. Send wireBytes over your TCP connection... +// return nil +// }