package node import ( "slices" "time" core "dappco.re/go/core" "github.com/google/uuid" ) // Protocol version constants. const ( // ProtocolVersion is the current protocol version. ProtocolVersion = "1.0" // MinProtocolVersion is the minimum supported version. MinProtocolVersion = "1.0" ) // SupportedProtocolVersions lists all protocol versions this node supports. // Used for version negotiation during handshake. // // versions := SupportedProtocolVersions var SupportedProtocolVersions = []string{"1.0"} // RawMessage stores an already-encoded JSON payload for deferred decoding. // // payload := RawMessage(`{"pool":"pool.example.com:3333"}`) type RawMessage []byte // MarshalJSON preserves the raw JSON payload when the message is encoded. // // data, err := RawMessage(`{"ok":true}`).MarshalJSON() func (m RawMessage) MarshalJSON() ([]byte, error) { if m == nil { return []byte("null"), nil } return m, nil } // UnmarshalJSON stores the raw JSON payload bytes without decoding them. // // var payload RawMessage // _ = payload.UnmarshalJSON([]byte(`{"ok":true}`)) func (m *RawMessage) UnmarshalJSON(data []byte) error { if m == nil { return core.E("node.RawMessage.UnmarshalJSON", "raw message target is nil", nil) } *m = append((*m)[:0], data...) return nil } // IsProtocolVersionSupported checks if a given version is supported. // // ok := IsProtocolVersionSupported("1.0") func IsProtocolVersionSupported(version string) bool { return slices.Contains(SupportedProtocolVersions, version) } // MessageType defines the type of P2P message. 
// // messageType := MsgPing type MessageType string const ( // Connection lifecycle MsgHandshake MessageType = "handshake" MsgHandshakeAck MessageType = "handshake_ack" MsgPing MessageType = "ping" MsgPong MessageType = "pong" MsgDisconnect MessageType = "disconnect" // Miner operations MsgGetStats MessageType = "get_stats" MsgStats MessageType = "stats" MsgStartMiner MessageType = "start_miner" MsgStopMiner MessageType = "stop_miner" MsgMinerAck MessageType = "miner_ack" // Deployment MsgDeploy MessageType = "deploy" MsgDeployAck MessageType = "deploy_ack" // Logs MsgGetLogs MessageType = "get_logs" MsgLogs MessageType = "logs" // Error response MsgError MessageType = "error" ) // Message represents a P2P message between nodes. // // message, err := NewMessage(MsgPing, "controller", "worker", PingPayload{SentAt: time.Now().UnixMilli()}) type Message struct { ID string `json:"id"` // UUID Type MessageType `json:"type"` From string `json:"from"` // Sender node ID To string `json:"to"` // Recipient node ID (empty for broadcast) Timestamp time.Time `json:"ts"` Payload RawMessage `json:"payload"` ReplyTo string `json:"replyTo,omitempty"` // ID of message being replied to } // NewMessage builds a message with a generated ID and timestamp. // // message, err := NewMessage(MsgPing, "controller", "worker-1", PingPayload{SentAt: 42}) func NewMessage(messageType MessageType, from, to string, payload any) (*Message, error) { var payloadBytes RawMessage if payload != nil { data, err := MarshalJSON(payload) if err != nil { return nil, err } payloadBytes = RawMessage(data) } return &Message{ ID: uuid.New().String(), Type: messageType, From: from, To: to, Timestamp: time.Now(), Payload: payloadBytes, }, nil } // Reply creates a response message that points back to the original. 
// // reply, err := message.Reply(MsgPong, PongPayload{SentAt: 42, ReceivedAt: 43}) func (m *Message) Reply(messageType MessageType, payload any) (*Message, error) { reply, err := NewMessage(messageType, m.To, m.From, payload) if err != nil { return nil, err } reply.ReplyTo = m.ID return reply, nil } // ParsePayload decodes the payload into the supplied target. // // var ping PingPayload // err := message.ParsePayload(&ping) func (m *Message) ParsePayload(target any) error { if m.Payload == nil { return nil } result := core.JSONUnmarshal(m.Payload, target) if !result.OK { return result.Value.(error) } return nil } // --- Payload Types --- // HandshakePayload is sent during connection establishment. // // payload := HandshakePayload{Identity: NodeIdentity{Name: "worker-1"}, Version: ProtocolVersion} type HandshakePayload struct { Identity NodeIdentity `json:"identity"` Challenge []byte `json:"challenge,omitempty"` // Random bytes for auth Version string `json:"version"` // Protocol version } // HandshakeAckPayload is the response to a handshake. // // ack := HandshakeAckPayload{Accepted: true} type HandshakeAckPayload struct { Identity NodeIdentity `json:"identity"` ChallengeResponse []byte `json:"challengeResponse,omitempty"` Accepted bool `json:"accepted"` Reason string `json:"reason,omitempty"` // If not accepted } // PingPayload for keepalive/latency measurement. // // payload := PingPayload{SentAt: 42} type PingPayload struct { SentAt int64 `json:"sentAt"` // Unix timestamp in milliseconds } // PongPayload response to ping. // // payload := PongPayload{SentAt: 42, ReceivedAt: 43} type PongPayload struct { SentAt int64 `json:"sentAt"` // Echo of ping's sentAt ReceivedAt int64 `json:"receivedAt"` // When ping was received } // StartMinerPayload requests starting a miner. 
// // payload := StartMinerPayload{MinerType: "xmrig"} type StartMinerPayload struct { MinerType string `json:"minerType"` // Required: miner type (e.g., "xmrig", "tt-miner") ProfileID string `json:"profileId,omitempty"` Config RawMessage `json:"config,omitempty"` // Override profile config } // StopMinerPayload requests stopping a miner. // // payload := StopMinerPayload{MinerName: "xmrig-0"} type StopMinerPayload struct { MinerName string `json:"minerName"` } // MinerAckPayload acknowledges a miner start/stop operation. // // ack := MinerAckPayload{Success: true, MinerName: "xmrig-0"} type MinerAckPayload struct { Success bool `json:"success"` MinerName string `json:"minerName,omitempty"` Error string `json:"error,omitempty"` } // MinerStatsItem represents stats for a single miner. // // miner := MinerStatsItem{Name: "xmrig-0", Hashrate: 1200} type MinerStatsItem struct { Name string `json:"name"` Type string `json:"type"` Hashrate float64 `json:"hashrate"` Shares int `json:"shares"` Rejected int `json:"rejected"` Uptime int `json:"uptime"` // Seconds Pool string `json:"pool"` Algorithm string `json:"algorithm"` CPUThreads int `json:"cpuThreads,omitempty"` } // StatsPayload contains miner statistics. // // stats := StatsPayload{NodeID: "worker-1"} type StatsPayload struct { NodeID string `json:"nodeId"` NodeName string `json:"nodeName"` Miners []MinerStatsItem `json:"miners"` Uptime int64 `json:"uptime"` // Node uptime in seconds } // GetLogsPayload requests console logs from a miner. // // payload := GetLogsPayload{MinerName: "xmrig-0", Lines: 100} type GetLogsPayload struct { MinerName string `json:"minerName"` Lines int `json:"lines"` // Number of lines to fetch Since int64 `json:"since,omitempty"` // Unix timestamp, logs after this time } // LogsPayload contains console log lines. 
//
//	payload := LogsPayload{MinerName: "xmrig-0", Lines: []string{"started"}}
type LogsPayload struct {
	MinerName string   `json:"minerName"`
	Lines     []string `json:"lines"`
	HasMore   bool     `json:"hasMore"` // More logs available
}

// DeployPayload carries a deployment bundle to the recipient.
//
//	payload := DeployPayload{Name: "xmrig", BundleType: string(BundleMiner)}
type DeployPayload struct {
	BundleType string `json:"type"`     // "profile" | "miner" | "full"
	Data       []byte `json:"data"`     // STIM-encrypted bundle
	Checksum   string `json:"checksum"` // SHA-256 of Data
	Name       string `json:"name"`     // Profile or miner name
}

// DeployAckPayload is the acknowledgement for a deployment.
//
//	ack := DeployAckPayload{Success: true, Name: "xmrig"}
type DeployAckPayload struct {
	Success bool   `json:"success"`
	Name    string `json:"name,omitempty"`
	Error   string `json:"error,omitempty"`
}

// ErrorPayload describes a failure: a numeric code, a message, and optional
// details that are omitted from the JSON when empty.
//
//	payload := ErrorPayload{Code: ErrorCodeOperationFailed, Message: "start failed"}
type ErrorPayload struct {
	Code    int    `json:"code"`
	Message string `json:"message"`
	Details string `json:"details,omitempty"`
}

// Error codes carried in ErrorPayload.Code. The ErrCode* names are deprecated
// aliases with the same values as their ErrorCode* counterparts.
const (
	ErrorCodeUnknown         = 1000
	ErrorCodeInvalidMessage  = 1001
	ErrorCodeUnauthorized    = 1002
	ErrorCodeNotFound        = 1003
	ErrorCodeOperationFailed = 1004
	ErrorCodeTimeout         = 1005

	// Deprecated: use ErrorCodeUnknown.
	ErrCodeUnknown = ErrorCodeUnknown
	// Deprecated: use ErrorCodeInvalidMessage.
	ErrCodeInvalidMessage = ErrorCodeInvalidMessage
	// Deprecated: use ErrorCodeUnauthorized.
	ErrCodeUnauthorized = ErrorCodeUnauthorized
	// Deprecated: use ErrorCodeNotFound.
	ErrCodeNotFound = ErrorCodeNotFound
	// Deprecated: use ErrorCodeOperationFailed.
	ErrCodeOperationFailed = ErrorCodeOperationFailed
	// Deprecated: use ErrorCodeTimeout.
	ErrCodeTimeout = ErrorCodeTimeout
)

// NewErrorMessage builds an error response message for an existing request.
// // msg, err := NewErrorMessage("worker-1", "controller-1", ErrorCodeOperationFailed, "miner start failed", "req-1") func NewErrorMessage(from, to string, code int, message string, replyTo string) (*Message, error) { msg, err := NewMessage(MsgError, from, to, ErrorPayload{ Code: code, Message: message, }) if err != nil { return nil, err } msg.ReplyTo = replyTo return msg, nil }