Mining/pkg/node/message.go
Claude aba728ff83
Some checks are pending
Security Scan / security (push) Waiting to run
Test / test (push) Waiting to run
ax(node): rename msg to errorMessage in NewErrorMessage
AX Principle 1: Predictable names over short names.
`msg` is an abbreviation; `errorMessage` names what it is.

Co-Authored-By: Charon <charon@lethean.io>
2026-04-02 17:55:08 +01:00

267 lines
9.1 KiB
Go

package node
import (
"encoding/json"
"time"
"github.com/google/uuid"
)
// if payload.Version != ProtocolVersion { return errVersionMismatch }
// if payload.Version < MinProtocolVersion { return errUnsupportedVersion }
const (
ProtocolVersion = "1.0"
MinProtocolVersion = "1.0"
)
// if !IsProtocolVersionSupported(peer.Version) { return errUnsupportedVersion }
var SupportedProtocolVersions = []string{"1.0"}
// joinVersions(SupportedProtocolVersions) // "1.0, 1.1"
func joinVersions(versions []string) string {
result := ""
for i, version := range versions {
if i > 0 {
result += ", "
}
result += version
}
return result
}
// if node.IsProtocolVersionSupported(peerVersion) { proceed() }
// if !node.IsProtocolVersionSupported("2.0") { return errUnsupportedVersion }
func IsProtocolVersionSupported(version string) bool {
for _, supported := range SupportedProtocolVersions {
if supported == version {
return true
}
}
return false
}
// msg, err := NewMessage(MsgPing, identity.ID, peer.ID, PingPayload{SentAt: time.Now().UnixMilli()})
// if msg.Type == MsgHandshake { handleHandshake(msg) }
type MessageType string
const (
// Connection lifecycle
MsgHandshake MessageType = "handshake"
MsgHandshakeAck MessageType = "handshake_ack"
MsgPing MessageType = "ping"
MsgPong MessageType = "pong"
MsgDisconnect MessageType = "disconnect"
// Miner operations
MsgGetStats MessageType = "get_stats"
MsgStats MessageType = "stats"
MsgStartMiner MessageType = "start_miner"
MsgStopMiner MessageType = "stop_miner"
MsgMinerAck MessageType = "miner_ack"
// Deployment
MsgDeploy MessageType = "deploy"
MsgDeployAck MessageType = "deploy_ack"
// Logs
MsgGetLogs MessageType = "get_logs"
MsgLogs MessageType = "logs"
// Error response
MsgError MessageType = "error"
)
// msg, err := NewMessage(MsgPing, identity.ID, peer.ID, PingPayload{SentAt: time.Now().UnixMilli()})
// reply, err := msg.Reply(MsgPong, PongPayload{SentAt: msg.Payload.SentAt, ReceivedAt: time.Now().UnixMilli()})
type Message struct {
ID string `json:"id"` // UUID
Type MessageType `json:"type"`
From string `json:"from"` // Sender node ID
To string `json:"to"` // Recipient node ID (empty for broadcast)
Timestamp time.Time `json:"timestamp"`
Payload json.RawMessage `json:"payload"`
ReplyTo string `json:"replyTo,omitempty"` // ID of message being replied to
}
// msg, err := NewMessage(MsgPing, identity.ID, peer.ID, PingPayload{SentAt: time.Now().UnixMilli()})
// if err != nil { return 0, err }
func NewMessage(msgType MessageType, from, to string, payload interface{}) (*Message, error) {
var payloadBytes json.RawMessage
if payload != nil {
data, err := MarshalJSON(payload)
if err != nil {
return nil, err
}
payloadBytes = data
}
return &Message{
ID: uuid.New().String(),
Type: msgType,
From: from,
To: to,
Timestamp: time.Now(),
Payload: payloadBytes,
}, nil
}
// response, err := msg.Reply(MsgPong, PongPayload{SentAt: ping.SentAt, ReceivedAt: time.Now().UnixMilli()})
// if err != nil { return nil, err }
func (message *Message) Reply(msgType MessageType, payload interface{}) (*Message, error) {
reply, err := NewMessage(msgType, message.To, message.From, payload)
if err != nil {
return nil, err
}
reply.ReplyTo = message.ID
return reply, nil
}
// var ping PingPayload
// if err := msg.ParsePayload(&ping); err != nil { return nil, err }
func (message *Message) ParsePayload(target interface{}) error {
if message.Payload == nil {
return nil
}
return UnmarshalJSON(message.Payload, target)
}
// --- Payload Types ---
// payload := HandshakePayload{Identity: *identity, Challenge: challenge, Version: ProtocolVersion}
// msg, err := NewMessage(MsgHandshake, identity.ID, peer.ID, payload)
type HandshakePayload struct {
Identity NodeIdentity `json:"identity"`
Challenge []byte `json:"challenge,omitempty"` // Random bytes for auth
Version string `json:"version"` // Protocol version
}
// ack := HandshakeAckPayload{Identity: *identity, ChallengeResponse: sig, Accepted: true}
// ack := HandshakeAckPayload{Identity: *identity, Accepted: false, Reason: "peer not authorized"}
type HandshakeAckPayload struct {
Identity NodeIdentity `json:"identity"`
ChallengeResponse []byte `json:"challengeResponse,omitempty"`
Accepted bool `json:"accepted"`
Reason string `json:"reason,omitempty"` // If not accepted
}
// payload := PingPayload{SentAt: time.Now().UnixMilli()}
// msg, err := NewMessage(MsgPing, identity.ID, peer.ID, payload)
type PingPayload struct {
SentAt int64 `json:"sentAt"` // Unix timestamp in milliseconds
}
// pong := PongPayload{SentAt: ping.SentAt, ReceivedAt: time.Now().UnixMilli()}
// return msg.Reply(MsgPong, pong)
type PongPayload struct {
SentAt int64 `json:"sentAt"` // Echo of ping's sentAt
ReceivedAt int64 `json:"receivedAt"` // When ping was received
}
// payload := StartMinerPayload{MinerType: "xmrig", ProfileID: "profile-1"}
// payload := StartMinerPayload{MinerType: "xmrig", Config: json.RawMessage(`{"pool":"stratum+tcp://pool.lthn.io:3333"}`)}
type StartMinerPayload struct {
MinerType string `json:"minerType"` // Required: miner type (e.g., "xmrig", "tt-miner")
ProfileID string `json:"profileId,omitempty"`
Config json.RawMessage `json:"config,omitempty"` // Override profile config
}
// payload := StopMinerPayload{MinerName: "xmrig"}
// msg, err := NewMessage(MsgStopMiner, identity.ID, peer.ID, payload)
type StopMinerPayload struct {
MinerName string `json:"minerName"`
}
// ack := MinerAckPayload{Success: true, MinerName: miner.GetName()}
// ack := MinerAckPayload{Success: false, Error: err.Error()}
type MinerAckPayload struct {
Success bool `json:"success"`
MinerName string `json:"minerName,omitempty"`
Error string `json:"error,omitempty"`
}
// item := MinerStatsItem{Name: "xmrig", Type: "xmrig", Hashrate: 1234.5, Pool: "stratum+tcp://pool.lthn.io:3333"}
type MinerStatsItem struct {
Name string `json:"name"`
Type string `json:"type"`
Hashrate float64 `json:"hashrate"`
Shares int `json:"shares"`
Rejected int `json:"rejected"`
Uptime int `json:"uptime"` // Seconds
Pool string `json:"pool"`
Algorithm string `json:"algorithm"`
CPUThreads int `json:"cpuThreads,omitempty"`
}
// stats := StatsPayload{NodeID: identity.ID, NodeName: identity.Name, Miners: items, Uptime: int64(time.Since(start).Seconds())}
// return msg.Reply(MsgStats, stats)
type StatsPayload struct {
NodeID string `json:"nodeId"`
NodeName string `json:"nodeName"`
Miners []MinerStatsItem `json:"miners"`
Uptime int64 `json:"uptime"` // Node uptime in seconds
}
// payload := GetLogsPayload{MinerName: "xmrig", Lines: 100}
// msg, err := NewMessage(MsgGetLogs, identity.ID, peer.ID, payload)
type GetLogsPayload struct {
MinerName string `json:"minerName"`
Lines int `json:"lines"` // Number of lines to fetch
Since int64 `json:"since,omitempty"` // Unix timestamp, logs after this time
}
// logs := LogsPayload{MinerName: "xmrig", Lines: lines, HasMore: len(lines) >= requested}
// return msg.Reply(MsgLogs, logs)
type LogsPayload struct {
MinerName string `json:"minerName"`
Lines []string `json:"lines"`
HasMore bool `json:"hasMore"` // More logs available
}
// payload := DeployPayload{BundleType: "profile", Data: encryptedBytes, Checksum: sha256hex, Name: "pool-main"}
// msg, err := NewMessage(MsgDeploy, identity.ID, peer.ID, payload)
type DeployPayload struct {
BundleType string `json:"type"` // "profile" | "miner" | "full"
Data []byte `json:"data"` // STIM-encrypted bundle
Checksum string `json:"checksum"` // SHA-256 of Data
Name string `json:"name"` // Profile or miner name
}
// ack := DeployAckPayload{Success: true, Name: payload.Name}
// ack := DeployAckPayload{Success: false, Name: payload.Name, Error: err.Error()}
type DeployAckPayload struct {
Success bool `json:"success"`
Name string `json:"name,omitempty"`
Error string `json:"error,omitempty"`
}
// payload := ErrorPayload{Code: ErrCodeOperationFailed, Message: "miner not found", Details: minerName}
// msg, err := NewMessage(MsgError, from, to, payload)
type ErrorPayload struct {
Code int `json:"code"`
Message string `json:"message"`
Details string `json:"details,omitempty"`
}
// Common error codes
const (
ErrCodeUnknown = 1000
ErrCodeInvalidMessage = 1001
ErrCodeUnauthorized = 1002
ErrCodeNotFound = 1003
ErrCodeOperationFailed = 1004
ErrCodeTimeout = 1005
)
// errorMessage, _ := NewErrorMessage(identity.ID, msg.From, ErrCodeOperationFailed, err.Error(), msg.ID)
// conn.Send(errorMessage)
func NewErrorMessage(from, to string, code int, message string, replyTo string) (*Message, error) {
errorMessage, err := NewMessage(MsgError, from, to, ErrorPayload{
Code: code,
Message: message,
})
if err != nil {
return nil, err
}
errorMessage.ReplyTo = replyTo
return errorMessage, nil
}