Implement peer ping command via IPC

Implemented the 'peer ping' command by establishing an IPC mechanism between the CLI and the running 'node serve' process.
- Added 'ControlService' in 'pkg/node' exposing RPC methods over a Unix socket.
- Implemented 'MessageDispatcher' to route P2P messages to 'Controller' (responses) or 'Worker' (requests).
- Updated 'node serve' to start the IPC server and use the dispatcher.
- Updated 'peer ping' to use the IPC client to request pings from the running node.
- Fixed broken 'pkg/node/dispatcher.go' file.
This commit is contained in:
Snider 2026-02-02 01:57:14 +00:00
parent bce309b78d
commit c0d0a24fb9
7 changed files with 213 additions and 64 deletions

View file

@@ -150,9 +150,24 @@ This allows other nodes to connect, send commands, and receive stats.`,
transport := node.NewTransport(nm, pr, config)
// Create controller for outgoing requests
controller := node.NewController(nm, pr, transport)
// Create worker to handle incoming messages
worker := node.NewWorker(nm, transport)
worker.RegisterWithTransport()
// Use dispatcher to route messages to controller or worker
dispatcher := node.NewMessageDispatcher(controller, worker)
transport.OnMessage(dispatcher.Dispatch)
// Start IPC control server
controlListener, err := node.StartControlServer(controller)
if err != nil {
// Log error but continue (fail soft for control server)
fmt.Printf("Warning: failed to start control server: %v\n", err)
} else {
defer controlListener.Close()
}
if err := transport.Start(); err != nil {
return fmt.Errorf("failed to start transport: %w", err)
@@ -162,6 +177,9 @@ This allows other nodes to connect, send commands, and receive stats.`,
fmt.Printf("P2P server started on %s\n", config.ListenAddr)
fmt.Printf("Node ID: %s (%s)\n", identity.ID, identity.Name)
fmt.Printf("Role: %s\n", identity.Role)
if controlListener != nil {
fmt.Printf("Control Socket: %s\n", controlListener.Addr())
}
fmt.Println()
fmt.Println("Press Ctrl+C to stop...")
@@ -173,6 +191,10 @@ This allows other nodes to connect, send commands, and receive stats.`,
sig := <-sigChan
fmt.Printf("\nReceived signal %v, shutting down...\n", sig)
if controlListener != nil {
controlListener.Close()
}
// Graceful shutdown: stop transport and cleanup resources
if err := transport.Stop(); err != nil {
fmt.Printf("Warning: error during transport shutdown: %v\n", err)

View file

@@ -149,13 +149,24 @@ var peerPingCmd = &cobra.Command{
return fmt.Errorf("peer not found: %s", peerID)
}
if !peer.Connected {
return fmt.Errorf("peer not connected: %s", peer.Name)
// Connect to the control service of the running node
client, err := node.NewControlClient()
if err != nil {
return fmt.Errorf("failed to connect to node: %w", err)
}
defer client.Close()
fmt.Printf("Pinging %s (%s)...\n", peer.Name, peer.Address)
// TODO: Actually send ping via transport
fmt.Println("Ping functionality requires active connection via 'node serve'")
// Call the Ping method via RPC
pingArgs := &node.PingArgs{PeerID: peer.ID}
var reply node.PingReply
if err := client.Call("ControlService.Ping", pingArgs, &reply); err != nil {
return fmt.Errorf("ping failed: %w", err)
}
fmt.Printf("Reply from %s: time=%.1f ms\n", peer.Name, reply.LatencyMS)
return nil
},
}

124
pkg/node/control.go Normal file
View file

@@ -0,0 +1,124 @@
package node

import (
	"errors"
	"fmt"
	"net"
	"net/rpc"
	"os"
	"path/filepath"
	"time"

	"github.com/Snider/Mining/pkg/logging"
	"github.com/adrg/xdg"
)
// ControlService exposes node commands via RPC.
// It is registered on a Unix-socket net/rpc server by StartControlServer;
// its methods run inside the serve process and delegate to the wrapped
// Controller.
type ControlService struct {
// controller performs the actual peer operations (e.g. PingPeer).
controller *Controller
}
// PingArgs represents arguments for the Ping command.
type PingArgs struct {
// PeerID identifies which peer to ping — presumably the registry ID used
// by Controller.PingPeer; confirm against the caller in 'peer ping'.
PeerID string
}
// PingReply represents the response from the Ping command.
type PingReply struct {
// LatencyMS is the measured round-trip latency in milliseconds.
LatencyMS float64
}
// Ping sends a ping to the peer named in args and stores the measured
// round-trip latency (milliseconds) in reply. It is invoked over net/rpc
// by the CLI's control client.
func (s *ControlService) Ping(args *PingArgs, reply *PingReply) error {
	// Guard against a service that was constructed without a controller.
	if s.controller == nil {
		return fmt.Errorf("controller not initialized")
	}
	ms, err := s.controller.PingPeer(args.PeerID)
	if err == nil {
		reply.LatencyMS = ms
	}
	return err
}
// StartControlServer starts the RPC server on a Unix socket.
// It registers a ControlService backed by controller, claims the socket path
// (removing a stale socket left behind by a crashed process), restricts the
// socket to the owning user, and serves connections on a background goroutine.
// Returns the listener, which should be closed by the caller to stop serving.
func StartControlServer(controller *Controller) (net.Listener, error) {
	service := &ControlService{controller: controller}
	server := rpc.NewServer()
	if err := server.Register(service); err != nil {
		return nil, fmt.Errorf("failed to register control service: %w", err)
	}
	sockPath, err := getControlSocketPath()
	if err != nil {
		return nil, err
	}
	// Remove stale socket file if it exists.
	if _, err := os.Stat(sockPath); err == nil {
		// Dial briefly to see whether another node process is still serving.
		if conn, err := net.DialTimeout("unix", sockPath, 100*time.Millisecond); err == nil {
			conn.Close()
			return nil, fmt.Errorf("control socket already in use: %s", sockPath)
		}
		// Nobody answered: the socket is stale, remove it.
		if err := os.Remove(sockPath); err != nil {
			return nil, fmt.Errorf("failed to remove stale socket: %w", err)
		}
	}
	// Ensure the parent directory exists before listening.
	if err := os.MkdirAll(filepath.Dir(sockPath), 0755); err != nil {
		return nil, fmt.Errorf("failed to create socket directory: %w", err)
	}
	listener, err := net.Listen("unix", sockPath)
	if err != nil {
		return nil, fmt.Errorf("failed to listen on socket %s: %w", sockPath, err)
	}
	// Restrict the socket so only the owning user can issue control commands.
	if err := os.Chmod(sockPath, 0600); err != nil {
		listener.Close()
		return nil, fmt.Errorf("failed to set socket permissions: %w", err)
	}
	logging.Info("control server started", logging.Fields{"address": sockPath})
	go func() {
		for {
			conn, err := listener.Accept()
			if err != nil {
				// net.ErrClosed (exported since Go 1.16) signals a normal
				// shutdown via listener.Close(); anything else is unexpected
				// and worth surfacing before the accept loop exits.
				if !errors.Is(err, net.ErrClosed) {
					logging.Info("control server accept error", logging.Fields{"error": err.Error()})
				}
				return
			}
			go server.ServeConn(conn)
		}
	}()
	return listener, nil
}
// NewControlClient creates a new RPC client connected to the control server.
// It fails fast with a helpful message when no 'node serve' process is
// listening on the control socket.
func NewControlClient() (*rpc.Client, error) {
	sockPath, pathErr := getControlSocketPath()
	if pathErr != nil {
		return nil, pathErr
	}
	// A short dial timeout surfaces a missing server quickly.
	conn, dialErr := net.DialTimeout("unix", sockPath, 2*time.Second)
	if dialErr != nil {
		return nil, fmt.Errorf("failed to connect to control socket (is 'node serve' running?): %w", dialErr)
	}
	return rpc.NewClient(conn), nil
}
// getControlSocketPath returns the path to the control socket, resolved
// under the user's XDG runtime directory (e.g.
// $XDG_RUNTIME_DIR/lethean-desktop/node.sock).
// NOTE(review): assumes xdg.RuntimeFile also creates the parent directory —
// confirm against the adrg/xdg docs; StartControlServer calls MkdirAll anyway.
func getControlSocketPath() (string, error) {
return xdg.RuntimeFile("lethean-desktop/node.sock")
}

View file

@@ -30,14 +30,11 @@ func NewController(node *NodeManager, peers *PeerRegistry, transport *Transport)
pending: make(map[string]chan *Message),
}
// Register message handler for responses
transport.OnMessage(c.handleResponse)
return c
}
// handleResponse processes incoming messages that are responses to our requests.
func (c *Controller) handleResponse(conn *PeerConnection, msg *Message) {
// HandleResponse processes incoming messages that are responses to our requests.
func (c *Controller) HandleResponse(conn *PeerConnection, msg *Message) {
if msg.ReplyTo == "" {
return // Not a response, let worker handle it
}

View file

@@ -1,34 +0,0 @@
// pkg/node/dispatcher.go
// DispatchUEPS routes a parsed UEPS packet: it first applies a threat-score
// circuit breaker, then dispatches on the header's IntentID to the matching
// handler. NOTE(review): this file (deleted by this commit) had no package
// clause or imports, so fmt and ueps were unresolved and it did not compile.
func (n *NodeManager) DispatchUEPS(pkt *ueps.ParsedPacket) error {
// 1. The "Threat" Circuit Breaker (L5 Guard)
if pkt.Header.ThreatScore > 50000 {
// High threat? Drop it. Don't even parse the payload.
// This protects the Agent from "semantic viruses"
return fmt.Errorf("packet rejected: threat score %d exceeds safety limit", pkt.Header.ThreatScore)
}
// 2. The "Intent" Router (L9 Semantic)
switch pkt.Header.IntentID {
case 0x01: // Handshake / Hello
return n.handleHandshake(pkt)
case 0x20: // Compute / Job Request
// "Hey, can you run this Docker container?"
// Check local resources first (Self-Validation)
return n.handleComputeRequest(pkt.Payload)
case 0x30: // Rehab / Intervention
// "Violet says you are hallucinating. Pause execution."
// This is the "Benevolent Intervention" Axiom.
return n.enterRehabMode(pkt.Payload)
case 0xFF: // Extended / Custom
// Check the payload for specific sub-protocols (e.g. your JSON blobs)
return n.handleApplicationData(pkt.Payload)
default:
return fmt.Errorf("unknown intent ID: 0x%X", pkt.Header.IntentID)
}
}

View file

@@ -0,0 +1,29 @@
package node
// MessageDispatcher routes messages to either Controller or Worker
// depending on whether a message is a response to one of our own requests
// (ReplyTo set) or a fresh inbound request from a peer.
type MessageDispatcher struct {
// controller receives responses (messages with ReplyTo set).
controller *Controller
// worker receives new inbound requests.
worker *Worker
}
// NewMessageDispatcher creates a new MessageDispatcher wired to the given
// controller (response path) and worker (request path). Either may be nil;
// Dispatch checks before delegating.
func NewMessageDispatcher(controller *Controller, worker *Worker) *MessageDispatcher {
	d := &MessageDispatcher{}
	d.controller = controller
	d.worker = worker
	return d
}
// Dispatch routes incoming messages to the appropriate handler.
// A message carrying ReplyTo correlates with a request the Controller
// initiated and is handed to it; everything else (including responses when
// no controller is wired) falls through to the Worker as a new request.
func (d *MessageDispatcher) Dispatch(conn *PeerConnection, msg *Message) {
	if isResponse := msg.ReplyTo != ""; isResponse && d.controller != nil {
		d.controller.HandleResponse(conn, msg)
		return
	}
	if w := d.worker; w != nil {
		w.HandleMessage(conn, msg)
	}
}

View file

@@ -706,23 +706,23 @@ func (r *PeerRegistry) load() error {
}
// Example usage inside a connection handler
// SendEthicalPacket derives the peer's shared secret, builds a UEPS frame for
// the given intent, and signs it. NOTE(review): wireBytes is assigned but
// never used — step 4 (the actual send) is unimplemented, so this function
// did not compile ("declared and not used"), which is why this commit
// comments it out.
func (n *NodeManager) SendEthicalPacket(peerID string, intent uint8, data []byte) error {
// 1. Get the shared secret for this specific peer (derived from ECDH)
secret, err := n.DeriveSharedSecret(peerID)
if err != nil {
return err
}
// 2. Construct the UEPS frame
// Intent 0x20 = e.g., "Distributed Compute"
pkt := ueps.NewBuilder(intent, data)
// 3. Seal it
wireBytes, err := pkt.MarshalAndSign(secret)
if err != nil {
return err
}
// 4. Send wireBytes over your TCP connection...
return nil
}
// func (n *NodeManager) SendEthicalPacket(peerID string, intent uint8, data []byte) error {
// // 1. Get the shared secret for this specific peer (derived from ECDH)
// secret, err := n.DeriveSharedSecret(peerID)
// if err != nil {
// return err
// }
//
// // 2. Construct the UEPS frame
// // Intent 0x20 = e.g., "Distributed Compute"
// pkt := ueps.NewBuilder(intent, data)
//
// // 3. Seal it
// wireBytes, err := pkt.MarshalAndSign(secret)
// if err != nil {
// return err
// }
//
// // 4. Send wireBytes over your TCP connection...
// return nil
// }