Merge 492e68761f into bce309b78d
This commit is contained in:
commit
575c9c8baa
12 changed files with 243 additions and 70 deletions
|
|
@ -150,9 +150,24 @@ This allows other nodes to connect, send commands, and receive stats.`,
|
||||||
|
|
||||||
transport := node.NewTransport(nm, pr, config)
|
transport := node.NewTransport(nm, pr, config)
|
||||||
|
|
||||||
|
// Create controller for outgoing requests
|
||||||
|
controller := node.NewController(nm, pr, transport)
|
||||||
|
|
||||||
// Create worker to handle incoming messages
|
// Create worker to handle incoming messages
|
||||||
worker := node.NewWorker(nm, transport)
|
worker := node.NewWorker(nm, transport)
|
||||||
worker.RegisterWithTransport()
|
|
||||||
|
// Use dispatcher to route messages to controller or worker
|
||||||
|
dispatcher := node.NewMessageDispatcher(controller, worker)
|
||||||
|
transport.OnMessage(dispatcher.Dispatch)
|
||||||
|
|
||||||
|
// Start IPC control server
|
||||||
|
controlListener, err := node.StartControlServer(controller)
|
||||||
|
if err != nil {
|
||||||
|
// Log error but continue (fail soft for control server)
|
||||||
|
fmt.Printf("Warning: failed to start control server: %v\n", err)
|
||||||
|
} else {
|
||||||
|
defer controlListener.Close()
|
||||||
|
}
|
||||||
|
|
||||||
if err := transport.Start(); err != nil {
|
if err := transport.Start(); err != nil {
|
||||||
return fmt.Errorf("failed to start transport: %w", err)
|
return fmt.Errorf("failed to start transport: %w", err)
|
||||||
|
|
@ -162,6 +177,9 @@ This allows other nodes to connect, send commands, and receive stats.`,
|
||||||
fmt.Printf("P2P server started on %s\n", config.ListenAddr)
|
fmt.Printf("P2P server started on %s\n", config.ListenAddr)
|
||||||
fmt.Printf("Node ID: %s (%s)\n", identity.ID, identity.Name)
|
fmt.Printf("Node ID: %s (%s)\n", identity.ID, identity.Name)
|
||||||
fmt.Printf("Role: %s\n", identity.Role)
|
fmt.Printf("Role: %s\n", identity.Role)
|
||||||
|
if controlListener != nil {
|
||||||
|
fmt.Printf("Control Socket: %s\n", controlListener.Addr())
|
||||||
|
}
|
||||||
fmt.Println()
|
fmt.Println()
|
||||||
fmt.Println("Press Ctrl+C to stop...")
|
fmt.Println("Press Ctrl+C to stop...")
|
||||||
|
|
||||||
|
|
@ -173,6 +191,10 @@ This allows other nodes to connect, send commands, and receive stats.`,
|
||||||
sig := <-sigChan
|
sig := <-sigChan
|
||||||
fmt.Printf("\nReceived signal %v, shutting down...\n", sig)
|
fmt.Printf("\nReceived signal %v, shutting down...\n", sig)
|
||||||
|
|
||||||
|
if controlListener != nil {
|
||||||
|
controlListener.Close()
|
||||||
|
}
|
||||||
|
|
||||||
// Graceful shutdown: stop transport and cleanup resources
|
// Graceful shutdown: stop transport and cleanup resources
|
||||||
if err := transport.Stop(); err != nil {
|
if err := transport.Stop(); err != nil {
|
||||||
fmt.Printf("Warning: error during transport shutdown: %v\n", err)
|
fmt.Printf("Warning: error during transport shutdown: %v\n", err)
|
||||||
|
|
|
||||||
|
|
@ -149,13 +149,24 @@ var peerPingCmd = &cobra.Command{
|
||||||
return fmt.Errorf("peer not found: %s", peerID)
|
return fmt.Errorf("peer not found: %s", peerID)
|
||||||
}
|
}
|
||||||
|
|
||||||
if !peer.Connected {
|
// Connect to the control service of the running node
|
||||||
return fmt.Errorf("peer not connected: %s", peer.Name)
|
client, err := node.NewControlClient()
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to connect to node: %w", err)
|
||||||
}
|
}
|
||||||
|
defer client.Close()
|
||||||
|
|
||||||
fmt.Printf("Pinging %s (%s)...\n", peer.Name, peer.Address)
|
fmt.Printf("Pinging %s (%s)...\n", peer.Name, peer.Address)
|
||||||
// TODO: Actually send ping via transport
|
|
||||||
fmt.Println("Ping functionality requires active connection via 'node serve'")
|
// Call the Ping method via RPC
|
||||||
|
pingArgs := &node.PingArgs{PeerID: peer.ID}
|
||||||
|
var reply node.PingReply
|
||||||
|
|
||||||
|
if err := client.Call("ControlService.Ping", pingArgs, &reply); err != nil {
|
||||||
|
return fmt.Errorf("ping failed: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Printf("Reply from %s: time=%.1f ms\n", peer.Name, reply.LatencyMS)
|
||||||
return nil
|
return nil
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -18,8 +18,22 @@ list(FILTER MINER_SOURCES EXCLUDE REGEX ".*_riscv.*")
|
||||||
if (WIN32)
|
if (WIN32)
|
||||||
list(FILTER MINER_SOURCES EXCLUDE REGEX ".*_unix\\.cpp$")
|
list(FILTER MINER_SOURCES EXCLUDE REGEX ".*_unix\\.cpp$")
|
||||||
list(FILTER MINER_SOURCES EXCLUDE REGEX ".*_linux\\.cpp$")
|
list(FILTER MINER_SOURCES EXCLUDE REGEX ".*_linux\\.cpp$")
|
||||||
|
list(FILTER MINER_SOURCES EXCLUDE REGEX ".*_mac\\.cpp$")
|
||||||
|
elseif (APPLE)
|
||||||
|
list(FILTER MINER_SOURCES EXCLUDE REGEX ".*_win\\.cpp$")
|
||||||
|
list(FILTER MINER_SOURCES EXCLUDE REGEX ".*_linux\\.cpp$")
|
||||||
else()
|
else()
|
||||||
list(FILTER MINER_SOURCES EXCLUDE REGEX ".*_win\\.cpp$")
|
list(FILTER MINER_SOURCES EXCLUDE REGEX ".*_win\\.cpp$")
|
||||||
|
list(FILTER MINER_SOURCES EXCLUDE REGEX ".*_mac\\.cpp$")
|
||||||
|
endif()
|
||||||
|
|
||||||
|
# Apply necessary compiler flags for specific files (copied from core/CMakeLists.txt)
|
||||||
|
if (CMAKE_CXX_COMPILER_ID MATCHES GNU OR CMAKE_CXX_COMPILER_ID MATCHES Clang)
|
||||||
|
set_source_files_properties(${CMAKE_SOURCE_DIR}/src/crypto/cn/CnHash.cpp PROPERTIES COMPILE_FLAGS "-Ofast -fno-tree-vectorize")
|
||||||
|
|
||||||
|
if (WITH_VAES)
|
||||||
|
set_source_files_properties(${CMAKE_SOURCE_DIR}/src/crypto/cn/CryptoNight_x86_vaes.cpp PROPERTIES COMPILE_FLAGS "-Ofast -fno-tree-vectorize -mavx2 -mvaes")
|
||||||
|
endif()
|
||||||
endif()
|
endif()
|
||||||
|
|
||||||
# Create a library with common test utilities and miner components
|
# Create a library with common test utilities and miner components
|
||||||
|
|
|
||||||
|
|
@ -162,7 +162,7 @@ if (CMAKE_SYSTEM_PROCESSOR MATCHES "^(x86_64|AMD64)$" AND CMAKE_SIZEOF_VOID_P EQ
|
||||||
add_definitions(/DRAPIDJSON_SSE2)
|
add_definitions(/DRAPIDJSON_SSE2)
|
||||||
endif()
|
endif()
|
||||||
|
|
||||||
set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} "${CMAKE_SOURCE_DIR}/cmake")
|
set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} "${CMAKE_SOURCE_DIR}/../core/cmake")
|
||||||
|
|
||||||
find_package(UV REQUIRED)
|
find_package(UV REQUIRED)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -151,7 +151,14 @@ func TestGetMiner_Good(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Case 1: Get an existing miner
|
// Case 1: Get an existing miner
|
||||||
startedMiner, _ := m.StartMiner(context.Background(), "xmrig", config)
|
startedMiner, err := m.StartMiner(context.Background(), "xmrig", config)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to start miner: %v", err)
|
||||||
|
}
|
||||||
|
if startedMiner == nil {
|
||||||
|
t.Fatal("StartMiner returned nil miner")
|
||||||
|
}
|
||||||
|
|
||||||
retrievedMiner, err := m.GetMiner(startedMiner.GetName())
|
retrievedMiner, err := m.GetMiner(startedMiner.GetName())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Expected to get miner, but got error: %v", err)
|
t.Fatalf("Expected to get miner, but got error: %v", err)
|
||||||
|
|
|
||||||
|
|
@ -2,6 +2,7 @@ package mining
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"crypto/rand"
|
||||||
"encoding/base64"
|
"encoding/base64"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
@ -203,9 +204,12 @@ func requestIDMiddleware() gin.HandlerFunc {
|
||||||
|
|
||||||
// generateRequestID creates a unique request ID using timestamp and random bytes
|
// generateRequestID creates a unique request ID using timestamp and random bytes
|
||||||
func generateRequestID() string {
|
func generateRequestID() string {
|
||||||
b := make([]byte, 8)
|
b := make([]byte, 4)
|
||||||
_, _ = base64.StdEncoding.Decode(b, []byte(fmt.Sprintf("%d", time.Now().UnixNano())))
|
if _, err := rand.Read(b); err != nil {
|
||||||
return fmt.Sprintf("%d-%x", time.Now().UnixMilli(), b[:4])
|
// Fallback if random source fails (unlikely)
|
||||||
|
return fmt.Sprintf("%d", time.Now().UnixNano())
|
||||||
|
}
|
||||||
|
return fmt.Sprintf("%d-%x", time.Now().UnixMilli(), b)
|
||||||
}
|
}
|
||||||
|
|
||||||
// getRequestID extracts the request ID from gin context
|
// getRequestID extracts the request ID from gin context
|
||||||
|
|
|
||||||
124
pkg/node/control.go
Normal file
124
pkg/node/control.go
Normal file
|
|
@ -0,0 +1,124 @@
|
||||||
|
package node
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"net"
|
||||||
|
"net/rpc"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/Snider/Mining/pkg/logging"
|
||||||
|
"github.com/adrg/xdg"
|
||||||
|
)
|
||||||
|
|
||||||
|
// ControlService exposes node commands via RPC.
|
||||||
|
type ControlService struct {
|
||||||
|
controller *Controller
|
||||||
|
}
|
||||||
|
|
||||||
|
// PingArgs represents arguments for the Ping command.
|
||||||
|
type PingArgs struct {
|
||||||
|
PeerID string
|
||||||
|
}
|
||||||
|
|
||||||
|
// PingReply represents the response from the Ping command.
|
||||||
|
type PingReply struct {
|
||||||
|
LatencyMS float64
|
||||||
|
}
|
||||||
|
|
||||||
|
// Ping sends a ping to the specified peer.
|
||||||
|
func (s *ControlService) Ping(args *PingArgs, reply *PingReply) error {
|
||||||
|
if s.controller == nil {
|
||||||
|
return fmt.Errorf("controller not initialized")
|
||||||
|
}
|
||||||
|
|
||||||
|
latency, err := s.controller.PingPeer(args.PeerID)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
reply.LatencyMS = latency
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// StartControlServer starts the RPC server on a Unix socket.
|
||||||
|
// Returns the listener, which should be closed by the caller.
|
||||||
|
func StartControlServer(controller *Controller) (net.Listener, error) {
|
||||||
|
service := &ControlService{controller: controller}
|
||||||
|
server := rpc.NewServer()
|
||||||
|
if err := server.Register(service); err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to register control service: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
sockPath, err := getControlSocketPath()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Remove stale socket file if it exists
|
||||||
|
if _, err := os.Stat(sockPath); err == nil {
|
||||||
|
// Try to connect to see if it's active
|
||||||
|
if conn, err := net.DialTimeout("unix", sockPath, 100*time.Millisecond); err == nil {
|
||||||
|
conn.Close()
|
||||||
|
return nil, fmt.Errorf("control socket already in use: %s", sockPath)
|
||||||
|
}
|
||||||
|
// Not active, remove it
|
||||||
|
if err := os.Remove(sockPath); err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to remove stale socket: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Ensure directory exists
|
||||||
|
if err := os.MkdirAll(filepath.Dir(sockPath), 0755); err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to create socket directory: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
listener, err := net.Listen("unix", sockPath)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to listen on socket %s: %w", sockPath, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set permissions so only user can access
|
||||||
|
if err := os.Chmod(sockPath, 0600); err != nil {
|
||||||
|
listener.Close()
|
||||||
|
return nil, fmt.Errorf("failed to set socket permissions: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
logging.Info("control server started", logging.Fields{"address": sockPath})
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
for {
|
||||||
|
conn, err := listener.Accept()
|
||||||
|
if err != nil {
|
||||||
|
// Check if listener was closed (ErrNetClosing is not exported, check string or type)
|
||||||
|
// Simply returning on error is usually fine for this use case
|
||||||
|
return
|
||||||
|
}
|
||||||
|
go server.ServeConn(conn)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
return listener, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewControlClient creates a new RPC client connected to the control server.
|
||||||
|
func NewControlClient() (*rpc.Client, error) {
|
||||||
|
sockPath, err := getControlSocketPath()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Use DialTimeout to fail fast if server is not running
|
||||||
|
conn, err := net.DialTimeout("unix", sockPath, 2*time.Second)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to connect to control socket (is 'node serve' running?): %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return rpc.NewClient(conn), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// getControlSocketPath returns the path to the control socket.
|
||||||
|
func getControlSocketPath() (string, error) {
|
||||||
|
return xdg.RuntimeFile("lethean-desktop/node.sock")
|
||||||
|
}
|
||||||
|
|
@ -30,14 +30,11 @@ func NewController(node *NodeManager, peers *PeerRegistry, transport *Transport)
|
||||||
pending: make(map[string]chan *Message),
|
pending: make(map[string]chan *Message),
|
||||||
}
|
}
|
||||||
|
|
||||||
// Register message handler for responses
|
|
||||||
transport.OnMessage(c.handleResponse)
|
|
||||||
|
|
||||||
return c
|
return c
|
||||||
}
|
}
|
||||||
|
|
||||||
// handleResponse processes incoming messages that are responses to our requests.
|
// HandleResponse processes incoming messages that are responses to our requests.
|
||||||
func (c *Controller) handleResponse(conn *PeerConnection, msg *Message) {
|
func (c *Controller) HandleResponse(conn *PeerConnection, msg *Message) {
|
||||||
if msg.ReplyTo == "" {
|
if msg.ReplyTo == "" {
|
||||||
return // Not a response, let worker handle it
|
return // Not a response, let worker handle it
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,34 +0,0 @@
|
||||||
// pkg/node/dispatcher.go
|
|
||||||
|
|
||||||
func (n *NodeManager) DispatchUEPS(pkt *ueps.ParsedPacket) error {
|
|
||||||
// 1. The "Threat" Circuit Breaker (L5 Guard)
|
|
||||||
if pkt.Header.ThreatScore > 50000 {
|
|
||||||
// High threat? Drop it. Don't even parse the payload.
|
|
||||||
// This protects the Agent from "semantic viruses"
|
|
||||||
return fmt.Errorf("packet rejected: threat score %d exceeds safety limit", pkt.Header.ThreatScore)
|
|
||||||
}
|
|
||||||
|
|
||||||
// 2. The "Intent" Router (L9 Semantic)
|
|
||||||
switch pkt.Header.IntentID {
|
|
||||||
|
|
||||||
case 0x01: // Handshake / Hello
|
|
||||||
return n.handleHandshake(pkt)
|
|
||||||
|
|
||||||
case 0x20: // Compute / Job Request
|
|
||||||
// "Hey, can you run this Docker container?"
|
|
||||||
// Check local resources first (Self-Validation)
|
|
||||||
return n.handleComputeRequest(pkt.Payload)
|
|
||||||
|
|
||||||
case 0x30: // Rehab / Intervention
|
|
||||||
// "Violet says you are hallucinating. Pause execution."
|
|
||||||
// This is the "Benevolent Intervention" Axiom.
|
|
||||||
return n.enterRehabMode(pkt.Payload)
|
|
||||||
|
|
||||||
case 0xFF: // Extended / Custom
|
|
||||||
// Check the payload for specific sub-protocols (e.g. your JSON blobs)
|
|
||||||
return n.handleApplicationData(pkt.Payload)
|
|
||||||
|
|
||||||
default:
|
|
||||||
return fmt.Errorf("unknown intent ID: 0x%X", pkt.Header.IntentID)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
29
pkg/node/msg_dispatcher.go
Normal file
29
pkg/node/msg_dispatcher.go
Normal file
|
|
@ -0,0 +1,29 @@
|
||||||
|
package node
|
||||||
|
|
||||||
|
// MessageDispatcher routes messages to either Controller or Worker
|
||||||
|
type MessageDispatcher struct {
|
||||||
|
controller *Controller
|
||||||
|
worker *Worker
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewMessageDispatcher creates a new MessageDispatcher
|
||||||
|
func NewMessageDispatcher(controller *Controller, worker *Worker) *MessageDispatcher {
|
||||||
|
return &MessageDispatcher{
|
||||||
|
controller: controller,
|
||||||
|
worker: worker,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Dispatch routes incoming messages to the appropriate handler
|
||||||
|
func (d *MessageDispatcher) Dispatch(conn *PeerConnection, msg *Message) {
|
||||||
|
// If ReplyTo is set, it's a response to a request initiated by Controller
|
||||||
|
if msg.ReplyTo != "" && d.controller != nil {
|
||||||
|
d.controller.HandleResponse(conn, msg)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Otherwise treat it as a new request for Worker
|
||||||
|
if d.worker != nil {
|
||||||
|
d.worker.HandleMessage(conn, msg)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -706,23 +706,23 @@ func (r *PeerRegistry) load() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Example usage inside a connection handler
|
// Example usage inside a connection handler
|
||||||
func (n *NodeManager) SendEthicalPacket(peerID string, intent uint8, data []byte) error {
|
// func (n *NodeManager) SendEthicalPacket(peerID string, intent uint8, data []byte) error {
|
||||||
// 1. Get the shared secret for this specific peer (derived from ECDH)
|
// // 1. Get the shared secret for this specific peer (derived from ECDH)
|
||||||
secret, err := n.DeriveSharedSecret(peerID)
|
// secret, err := n.DeriveSharedSecret(peerID)
|
||||||
if err != nil {
|
// if err != nil {
|
||||||
return err
|
// return err
|
||||||
}
|
// }
|
||||||
|
//
|
||||||
// 2. Construct the UEPS frame
|
// // 2. Construct the UEPS frame
|
||||||
// Intent 0x20 = e.g., "Distributed Compute"
|
// // Intent 0x20 = e.g., "Distributed Compute"
|
||||||
pkt := ueps.NewBuilder(intent, data)
|
// pkt := ueps.NewBuilder(intent, data)
|
||||||
|
//
|
||||||
// 3. Seal it
|
// // 3. Seal it
|
||||||
wireBytes, err := pkt.MarshalAndSign(secret)
|
// wireBytes, err := pkt.MarshalAndSign(secret)
|
||||||
if err != nil {
|
// if err != nil {
|
||||||
return err
|
// return err
|
||||||
}
|
// }
|
||||||
|
//
|
||||||
// 4. Send wireBytes over your TCP connection...
|
// // 4. Send wireBytes over your TCP connection...
|
||||||
return nil
|
// return nil
|
||||||
}
|
// }
|
||||||
|
|
|
||||||
|
|
@ -6,7 +6,6 @@ import (
|
||||||
"crypto/sha256"
|
"crypto/sha256"
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
|
||||||
"io"
|
"io"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue