go-p2p/node/message.go
Virgil e3b66f7e8c refactor(node): align remaining AX naming and examples
Co-Authored-By: Virgil <virgil@lethean.io>
2026-03-31 15:39:28 +01:00

259 lines
7.7 KiB
Go

package node
import (
"slices"
"time"
core "dappco.re/go/core"
"github.com/google/uuid"
)
// Protocol version strings declared by this package.
const (
// ProtocolVersion is the protocol version string defined by this package.
//
//	version := ProtocolVersion
ProtocolVersion = "1.0"
// MinProtocolVersion is the minimum protocol version string.
// NOTE(review): presumably the oldest version a peer may speak; where it is
// enforced is not visible in this file.
//
//	minimumVersion := MinProtocolVersion
MinProtocolVersion = "1.0"
)
// SupportedProtocolVersions lists the version strings that
// IsProtocolVersionSupported reports as supported.
//
//	versions := SupportedProtocolVersions
var SupportedProtocolVersions = []string{"1.0"}
// RawMessage is a byte slice holding already-encoded JSON. Its MarshalJSON
// method hands the bytes back unchanged instead of encoding them again.
//
//	payload := RawMessage(`{"pool":"pool.example.com:3333"}`)
type RawMessage []byte

// MarshalJSON returns m verbatim as its own JSON encoding.
//
// A nil or zero-length message is encoded as the JSON literal null: an empty
// byte slice is not a valid JSON document, so returning it as-is would hand
// invalid output to the caller (encoding/json rejects such Marshaler output).
// The returned error is always nil.
//
//	data, err := RawMessage(`{"ok":true}`).MarshalJSON()
func (m RawMessage) MarshalJSON() ([]byte, error) {
	// len covers both the nil slice and the non-nil empty slice.
	if len(m) == 0 {
		return []byte("null"), nil
	}
	return m, nil
}
// UnmarshalJSON replaces the contents of m with a copy of data. The existing
// backing array of m is reused when it is large enough. An error is returned
// when the receiver pointer itself is nil.
//
//	var payload RawMessage
//	_ = payload.UnmarshalJSON([]byte(`{"ok":true}`))
func (m *RawMessage) UnmarshalJSON(data []byte) error {
	if m == nil {
		const op = "node.RawMessage.UnmarshalJSON"
		return core.E(op, "raw message target is nil", nil)
	}
	// Truncate to length zero first so the previous bytes are overwritten
	// rather than extended.
	reused := (*m)[:0]
	*m = append(reused, data...)
	return nil
}
// IsProtocolVersionSupported reports whether version is one of the entries in
// SupportedProtocolVersions. The comparison is an exact string match.
//
//	ok := IsProtocolVersionSupported("1.0")
func IsProtocolVersionSupported(version string) bool {
	// slices.Index yields -1 when version is absent.
	position := slices.Index(SupportedProtocolVersions, version)
	return position >= 0
}
// MessageType is the string identifier stored in Message.Type; the Message*
// constants in this file are its declared values.
//
//	messageType := MessagePing
type MessageType string
// Declared MessageType values, grouped by purpose. Each constant's string
// value is what Message.Type holds and what is serialized under the JSON key
// "type".
const (
// Connection lifecycle
MessageHandshake MessageType = "handshake"
MessageHandshakeAck MessageType = "handshake_ack"
MessagePing MessageType = "ping"
MessagePong MessageType = "pong"
MessageDisconnect MessageType = "disconnect"
// Miner operations
MessageGetStats MessageType = "get_stats"
MessageStats MessageType = "stats"
MessageStartMiner MessageType = "start_miner"
MessageStopMiner MessageType = "stop_miner"
MessageMinerAck MessageType = "miner_ack"
// Deployment
MessageDeploy MessageType = "deploy"
MessageDeployAck MessageType = "deploy_ack"
// Logs
MessageGetLogs MessageType = "get_logs"
MessageLogs MessageType = "logs"
// Error response (the type NewErrorMessage uses)
MessageError MessageType = "error"
)
// Message is the envelope produced by NewMessage: an ID, a type, sender and
// recipient IDs, a creation timestamp and a raw JSON payload. The json tags
// give the serialized field names; note that Timestamp is serialized as "ts".
//
//	message, err := NewMessage(MessagePing, "controller", "worker", PingPayload{SentAt: time.Now().UnixMilli()})
type Message struct {
ID string `json:"id"` // UUID string; NewMessage generates it with uuid.New()
Type MessageType `json:"type"`
From string `json:"from"` // Sender node ID
To string `json:"to"` // Recipient node ID (empty for broadcast)
// Timestamp is set to time.Now() by NewMessage.
Timestamp time.Time `json:"ts"`
// Payload holds the JSON-encoded payload; NewMessage leaves it nil when it
// is given a nil payload.
Payload RawMessage `json:"payload"`
ReplyTo string `json:"replyTo,omitempty"` // ID of message being replied to; omitted from JSON when empty
}
// NewMessage builds a Message of the given type addressed from one node ID to
// another. A non-nil payload is encoded with MarshalJSON and stored as the raw
// payload; a nil payload leaves Message.Payload nil. The ID is a freshly
// generated UUID string and the timestamp is the current time. If encoding the
// payload fails, the error is returned and no message is created.
//
//	message, err := NewMessage(MessagePing, "controller", "worker-1", PingPayload{SentAt: 42})
func NewMessage(messageType MessageType, from, to string, payload any) (*Message, error) {
	// Encode first so that a failing payload never consumes an ID.
	var encoded RawMessage
	if payload != nil {
		raw, err := MarshalJSON(payload)
		if err != nil {
			return nil, err
		}
		encoded = RawMessage(raw)
	}

	message := new(Message)
	message.ID = uuid.New().String()
	message.Type = messageType
	message.From = from
	message.To = to
	message.Timestamp = time.Now()
	message.Payload = encoded
	return message, nil
}
// Reply creates a response to m via NewMessage. The addresses are swapped (the
// reply is sent from m.To to m.From) and ReplyTo is set to m.ID. Any error
// from NewMessage is returned unchanged.
//
//	reply, err := message.Reply(MessagePong, PongPayload{SentAt: 42, ReceivedAt: 43})
func (m *Message) Reply(messageType MessageType, payload any) (*Message, error) {
	// The original recipient becomes the sender of the reply.
	sender, recipient := m.To, m.From

	response, err := NewMessage(messageType, sender, recipient, payload)
	if err != nil {
		return nil, err
	}

	response.ReplyTo = m.ID
	return response, nil
}
// ParsePayload decodes the message's raw JSON payload into target using
// core.JSONUnmarshal. target is passed through as-is; it should be a pointer
// to the value to fill.
//
// A nil payload is treated as "nothing to decode" and returns nil without
// touching target. When decoding fails, the error carried in the result is
// returned. If the failed result does not carry an error value, a generic
// error is returned instead of panicking on the type assertion.
//
//	var ping PingPayload
//	err := message.ParsePayload(&ping)
func (m *Message) ParsePayload(target any) error {
	if m.Payload == nil {
		return nil
	}

	result := core.JSONUnmarshal(m.Payload, target)
	if result.OK {
		return nil
	}

	// Checked assertion: a failed result with a nil or non-error Value must
	// not crash the caller.
	if err, ok := result.Value.(error); ok {
		return err
	}
	return core.E("node.Message.ParsePayload", "payload decode failed without an error value", nil)
}
// --- Payload Types ---
// HandshakePayload carries a node identity, an optional challenge and a
// protocol version string.
// NOTE(review): presumably the payload of a MessageHandshake message; the
// pairing is not shown in this file.
//
//	payload := HandshakePayload{Identity: NodeIdentity{Name: "worker-1"}, Version: ProtocolVersion}
type HandshakePayload struct {
Identity NodeIdentity `json:"identity"`
Challenge []byte `json:"challenge,omitempty"` // Random bytes for auth; omitted from JSON when empty
Version string `json:"version"` // Protocol version
}
// HandshakeAckPayload reports whether a handshake was accepted. It carries a
// node identity, an optional challenge response and, when the handshake was
// not accepted, a reason.
// NOTE(review): presumably the payload of a MessageHandshakeAck message; the
// pairing is not shown in this file.
//
//	ack := HandshakeAckPayload{Accepted: true}
type HandshakeAckPayload struct {
Identity NodeIdentity `json:"identity"`
ChallengeResponse []byte `json:"challengeResponse,omitempty"`
Accepted bool `json:"accepted"`
Reason string `json:"reason,omitempty"` // If not accepted
}
// PingPayload records when a ping was sent. The examples in this file pair it
// with MessagePing.
//
//	payload := PingPayload{SentAt: 42}
type PingPayload struct {
SentAt int64 `json:"sentAt"` // Unix timestamp in milliseconds
}
// PongPayload echoes the ping's send time and adds the time the ping was
// received. The examples in this file pair it with MessagePong.
//
//	payload := PongPayload{SentAt: 42, ReceivedAt: 43}
type PongPayload struct {
SentAt int64 `json:"sentAt"` // Echo of ping's sentAt
ReceivedAt int64 `json:"receivedAt"` // When ping was received (presumably Unix milliseconds like SentAt; confirm)
}
// StartMinerPayload names the miner type to start, with an optional profile ID
// and an optional raw JSON config.
// NOTE(review): presumably the payload of a MessageStartMiner message; the
// pairing is not shown in this file.
//
//	payload := StartMinerPayload{MinerType: "xmrig"}
type StartMinerPayload struct {
MinerType string `json:"minerType"` // Required: miner type (e.g., "xmrig", "tt-miner")
ProfileID string `json:"profileId,omitempty"`
Config RawMessage `json:"config,omitempty"` // Override profile config
}
// StopMinerPayload identifies a miner by name.
// NOTE(review): presumably the payload of a MessageStopMiner message; the
// pairing is not shown in this file.
//
//	payload := StopMinerPayload{MinerName: "xmrig-0"}
type StopMinerPayload struct {
MinerName string `json:"minerName"`
}
// MinerAckPayload reports the outcome of a miner operation: a success flag
// plus an optional miner name and an optional error text. The two optional
// fields are omitted from JSON when empty.
//
//	ack := MinerAckPayload{Success: true, MinerName: "xmrig-0"}
type MinerAckPayload struct {
Success bool `json:"success"`
MinerName string `json:"minerName,omitempty"`
Error string `json:"error,omitempty"`
}
// MinerStatsItem is the statistics record for a single miner; StatsPayload
// holds a slice of these in its Miners field.
// NOTE(review): the unit of Hashrate is not stated in this file.
//
//	miner := MinerStatsItem{Name: "xmrig-0", Hashrate: 1200}
type MinerStatsItem struct {
Name string `json:"name"`
Type string `json:"type"`
Hashrate float64 `json:"hashrate"`
Shares int `json:"shares"`
Rejected int `json:"rejected"`
Uptime int `json:"uptime"` // Seconds
Pool string `json:"pool"`
Algorithm string `json:"algorithm"`
CPUThreads int `json:"cpuThreads,omitempty"`
}
// StatsPayload identifies a node by ID and name and carries its uptime and one
// MinerStatsItem per miner.
// NOTE(review): presumably the payload of a MessageStats message; the pairing
// is not shown in this file.
//
//	stats := StatsPayload{NodeID: "worker-1"}
type StatsPayload struct {
NodeID string `json:"nodeId"`
NodeName string `json:"nodeName"`
Miners []MinerStatsItem `json:"miners"`
Uptime int64 `json:"uptime"` // Node uptime in seconds
}
// LogsRequestPayload asks for log lines of a named miner: how many lines to
// fetch and, optionally, a lower time bound.
// NOTE(review): presumably the payload of a MessageGetLogs message; the
// pairing is not shown in this file.
//
//	payload := LogsRequestPayload{MinerName: "xmrig-0", Lines: 100}
type LogsRequestPayload struct {
MinerName string `json:"minerName"`
Lines int `json:"lines"` // Number of lines to fetch
Since int64 `json:"since,omitempty"` // Unix timestamp, logs after this time (seconds vs. milliseconds not stated here; confirm)
}
// LogsPayload carries log lines for a named miner and a flag telling whether
// more logs are available.
// NOTE(review): presumably the payload of a MessageLogs message; the pairing
// is not shown in this file.
//
//	payload := LogsPayload{MinerName: "xmrig-0", Lines: []string{"started"}}
type LogsPayload struct {
MinerName string `json:"minerName"`
Lines []string `json:"lines"`
HasMore bool `json:"hasMore"` // More logs available
}
// DeployPayload carries a named bundle together with its type and checksum.
// Note that BundleType is serialized under the JSON key "type", not
// "bundleType".
//
//	payload := DeployPayload{Name: "xmrig", BundleType: string(BundleMiner)}
type DeployPayload struct {
BundleType string `json:"type"` // "profile" | "miner" | "full"
Data []byte `json:"data"` // STIM-encrypted bundle
Checksum string `json:"checksum"` // SHA-256 of Data
Name string `json:"name"` // Profile or miner name
}
// DeployAckPayload reports the outcome of a deployment: a success flag plus an
// optional name and an optional error text. The two optional fields are
// omitted from JSON when empty.
//
//	ack := DeployAckPayload{Success: true, Name: "xmrig"}
type DeployAckPayload struct {
Success bool `json:"success"`
Name string `json:"name,omitempty"`
Error string `json:"error,omitempty"`
}
// ErrorPayload is the payload NewErrorMessage places in a MessageError
// message: a numeric code, a message text and optional details.
// NewErrorMessage fills Code and Message only; Details is left empty.
//
//	payload := ErrorPayload{Code: ErrorCodeOperationFailed, Message: "start failed"}
type ErrorPayload struct {
// Code is a numeric error code; the examples in this file use the
// ErrorCode* constants.
Code int `json:"code"`
Message string `json:"message"`
Details string `json:"details,omitempty"`
}
// Numeric error codes; the examples in this file use them as ErrorPayload.Code
// values.
const (
ErrorCodeUnknown = 1000
ErrorCodeInvalidMessage = 1001
ErrorCodeUnauthorized = 1002
ErrorCodeNotFound = 1003
//	code := ErrorCodeOperationFailed
ErrorCodeOperationFailed = 1004
ErrorCodeTimeout = 1005
)
// NewErrorMessage builds a MessageError message from one node ID to another.
// Its payload is an ErrorPayload holding code and message, and its ReplyTo is
// set to replyTo. Any error from NewMessage is returned unchanged.
//
//	errorMessage, err := NewErrorMessage("worker-1", "controller-1", ErrorCodeOperationFailed, "miner start failed", "req-1")
func NewErrorMessage(from, to string, code int, message string, replyTo string) (*Message, error) {
	failure := ErrorPayload{Code: code, Message: message}

	envelope, err := NewMessage(MessageError, from, to, failure)
	if err != nil {
		return nil, err
	}

	envelope.ReplyTo = replyTo
	return envelope, nil
}