Compare commits
14 commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
520d0f5728 | ||
|
|
c823c46bb2 | ||
|
|
56bd30d3d2 | ||
|
|
3eeaf90d38 | ||
|
|
d5a962996b | ||
|
|
572970d255 | ||
|
|
ee623a7343 | ||
|
|
8d1caa3a59 | ||
| 727b5fdb8d | |||
|
|
6fd3fe1cd2 | ||
| 3f1f9a7d60 | |||
|
|
b334cb4909 | ||
| 36f0582bfc | |||
|
|
3ea407c115 |
27 changed files with 376 additions and 62 deletions
4
.gitignore
vendored
4
.gitignore
vendored
|
|
@ -1,2 +1,4 @@
|
|||
.core/
|
||||
.idea/
|
||||
.vscode/
|
||||
*.log
|
||||
.core/
|
||||
|
|
|
|||
|
|
@ -80,6 +80,8 @@ type ProfileManager interface {
|
|||
- Licence: EUPL-1.2 — new files need `// SPDX-License-Identifier: EUPL-1.2`
|
||||
- Security-first: do not weaken HMAC, challenge-response, Zip Slip defence, or rate limiting
|
||||
- Use `logging` package only — no `fmt.Println` or `log.Printf` in library code
|
||||
- Error handling: use `coreerr.E()` from `go-log` — never `fmt.Errorf` or `errors.New` in library code
|
||||
- File I/O: use `coreio.Local` from `go-io` — never `os.ReadFile`/`os.WriteFile` in library code (exception: `os.OpenFile` for streaming writes where `coreio` lacks support)
|
||||
- Hot-path debug logging uses sampling pattern: `if counter.Add(1)%interval == 0`
|
||||
|
||||
### Transport test helper
|
||||
|
|
|
|||
|
|
@ -98,7 +98,7 @@ The `Transport` manages a WebSocket server (gorilla/websocket) and outbound conn
|
|||
| Timeout | −3.0 (floored at 0) |
|
||||
| Default (new peer) | 50.0 |
|
||||
|
||||
**Peer name validation**: Names must be 1–64 characters, start and end with an alphanumeric character, and contain only alphanumeric, hyphen, underscore, or space characters.
|
||||
**Peer name validation**: Empty names are permitted. Non-empty names must be 1–64 characters, start and end with an alphanumeric character, and contain only alphanumeric, hyphen, underscore, or space characters.
|
||||
|
||||
### message.go — Protocol Messages
|
||||
|
||||
|
|
|
|||
9
go.mod
9
go.mod
|
|
@ -1,12 +1,12 @@
|
|||
module forge.lthn.ai/core/go-p2p
|
||||
module dappco.re/go/core/p2p
|
||||
|
||||
go 1.26.0
|
||||
|
||||
require (
|
||||
dappco.re/go/core/io v0.2.0
|
||||
dappco.re/go/core/log v0.1.0
|
||||
forge.lthn.ai/Snider/Borg v0.3.1
|
||||
forge.lthn.ai/Snider/Poindexter v0.0.3
|
||||
forge.lthn.ai/core/go-io v0.1.7
|
||||
forge.lthn.ai/core/go-log v0.0.4
|
||||
github.com/adrg/xdg v0.5.3
|
||||
github.com/google/uuid v1.6.0
|
||||
github.com/gorilla/websocket v1.5.3
|
||||
|
|
@ -14,7 +14,8 @@ require (
|
|||
)
|
||||
|
||||
require (
|
||||
forge.lthn.ai/Snider/Enchantrix v0.0.5 // indirect
|
||||
forge.lthn.ai/Snider/Enchantrix v0.0.4 // indirect
|
||||
forge.lthn.ai/core/go-log v0.0.4 // indirect
|
||||
github.com/ProtonMail/go-crypto v1.4.0 // indirect
|
||||
github.com/cloudflare/circl v1.6.3 // indirect
|
||||
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
|
||||
|
|
|
|||
10
go.sum
10
go.sum
|
|
@ -1,11 +1,13 @@
|
|||
dappco.re/go/core/io v0.2.0 h1:zuudgIiTsQQ5ipVt97saWdGLROovbEB/zdVyy9/l+I4=
|
||||
dappco.re/go/core/io v0.2.0/go.mod h1:1QnQV6X9LNgFKfm8SkOtR9LLaj3bDcsOIeJOOyjbL5E=
|
||||
dappco.re/go/core/log v0.1.0 h1:pa71Vq2TD2aoEUQWFKwNcaJ3GBY8HbaNGqtE688Unyc=
|
||||
dappco.re/go/core/log v0.1.0/go.mod h1:Nkqb8gsXhZAO8VLpx7B8i1iAmohhzqA20b9Zr8VUcJs=
|
||||
forge.lthn.ai/Snider/Borg v0.3.1 h1:gfC1ZTpLoZai07oOWJiVeQ8+qJYK8A795tgVGJHbVL8=
|
||||
forge.lthn.ai/Snider/Borg v0.3.1/go.mod h1:Z7DJD0yHXsxSyM7Mjl6/g4gH1NBsIz44Bf5AFlV76Wg=
|
||||
forge.lthn.ai/Snider/Enchantrix v0.0.5 h1:Yam0z+3AOvCUCHAMP68Ty8qHr2e4MMs7j2FjMM2JWc8=
|
||||
forge.lthn.ai/Snider/Enchantrix v0.0.5/go.mod h1:/YcjKMNpC4Ze/fz7zbTx3djN0CJmSM83YiR2KaMK6zQ=
|
||||
forge.lthn.ai/Snider/Enchantrix v0.0.4 h1:biwpix/bdedfyc0iVeK15awhhJKH6TEMYOTXzHXx5TI=
|
||||
forge.lthn.ai/Snider/Enchantrix v0.0.4/go.mod h1:OGCwuVeZPq3OPe2h6TX/ZbgEjHU6B7owpIBeXQGbSe0=
|
||||
forge.lthn.ai/Snider/Poindexter v0.0.3 h1:cx5wRhuLRKBM8riIZyNVAT2a8rwRhn1dodFBktocsVE=
|
||||
forge.lthn.ai/Snider/Poindexter v0.0.3/go.mod h1:ddzGia98k3HKkR0gl58IDzqz+MmgW2cQJOCNLfuWPpo=
|
||||
forge.lthn.ai/core/go-io v0.1.7 h1:Tdb6sqh+zz1lsGJaNX9RFWM6MJ/RhSAyxfulLXrJsbk=
|
||||
forge.lthn.ai/core/go-io v0.1.7/go.mod h1:8lRLFk4Dnp5cR/Cyzh9WclD5566TbpdRgwcH7UZLWn4=
|
||||
forge.lthn.ai/core/go-log v0.0.4 h1:KTuCEPgFmuM8KJfnyQ8vPOU1Jg654W74h8IJvfQMfv0=
|
||||
forge.lthn.ai/core/go-log v0.0.4/go.mod h1:r14MXKOD3LF/sI8XUJQhRk/SZHBE7jAFVuCfgkXoZPw=
|
||||
github.com/ProtonMail/go-crypto v1.4.0 h1:Zq/pbM3F5DFgJiMouxEdSVY44MVoQNEKp5d5QxIQceQ=
|
||||
|
|
|
|||
|
|
@ -10,7 +10,7 @@ import (
|
|||
"sync"
|
||||
"time"
|
||||
|
||||
coreerr "forge.lthn.ai/core/go-log"
|
||||
coreerr "dappco.re/go/core/log"
|
||||
)
|
||||
|
||||
// Level represents the severity of a log message.
|
||||
|
|
|
|||
|
|
@ -11,8 +11,8 @@ import (
|
|||
"path/filepath"
|
||||
"strings"
|
||||
|
||||
coreio "forge.lthn.ai/core/go-io"
|
||||
coreerr "forge.lthn.ai/core/go-log"
|
||||
coreio "dappco.re/go/core/io"
|
||||
coreerr "dappco.re/go/core/log"
|
||||
|
||||
"forge.lthn.ai/Snider/Borg/pkg/datanode"
|
||||
"forge.lthn.ai/Snider/Borg/pkg/tim"
|
||||
|
|
@ -309,9 +309,12 @@ func extractTarball(tarData []byte, destDir string) (string, error) {
|
|||
return "", err
|
||||
}
|
||||
|
||||
// os.OpenFile is used deliberately here instead of coreio.Local.Create/Write
|
||||
// because coreio hardcodes file permissions (0644) and we need to preserve
|
||||
// the tar header's mode bits — executable binaries require 0755.
|
||||
f, err := os.OpenFile(fullPath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, os.FileMode(hdr.Mode))
|
||||
if err != nil {
|
||||
return "", err
|
||||
return "", coreerr.E("extractTarball", "failed to create file "+hdr.Name, err)
|
||||
}
|
||||
|
||||
// Limit file size to prevent decompression bombs (100MB max per file)
|
||||
|
|
@ -320,7 +323,7 @@ func extractTarball(tarData []byte, destDir string) (string, error) {
|
|||
written, err := io.Copy(f, limitedReader)
|
||||
f.Close()
|
||||
if err != nil {
|
||||
return "", err
|
||||
return "", coreerr.E("extractTarball", "failed to write file "+hdr.Name, err)
|
||||
}
|
||||
if written > maxFileSize {
|
||||
coreio.Local.Delete(fullPath)
|
||||
|
|
|
|||
|
|
@ -6,9 +6,9 @@ import (
|
|||
"sync"
|
||||
"time"
|
||||
|
||||
coreerr "forge.lthn.ai/core/go-log"
|
||||
coreerr "dappco.re/go/core/log"
|
||||
|
||||
"forge.lthn.ai/core/go-p2p/logging"
|
||||
"dappco.re/go/core/p2p/logging"
|
||||
)
|
||||
|
||||
// Controller manages remote peer operations from a controller node.
|
||||
|
|
@ -210,6 +210,11 @@ func (c *Controller) StopRemoteMiner(peerID, minerName string) error {
|
|||
|
||||
// GetRemoteLogs requests console logs from a remote miner.
|
||||
func (c *Controller) GetRemoteLogs(peerID, minerName string, lines int) ([]string, error) {
|
||||
return c.GetRemoteLogsSince(peerID, minerName, lines, time.Time{})
|
||||
}
|
||||
|
||||
// GetRemoteLogsSince requests console logs from a remote miner after a point in time.
|
||||
func (c *Controller) GetRemoteLogsSince(peerID, minerName string, lines int, since time.Time) ([]string, error) {
|
||||
identity := c.node.GetIdentity()
|
||||
if identity == nil {
|
||||
return nil, ErrIdentityNotInitialized
|
||||
|
|
@ -219,10 +224,13 @@ func (c *Controller) GetRemoteLogs(peerID, minerName string, lines int) ([]strin
|
|||
MinerName: minerName,
|
||||
Lines: lines,
|
||||
}
|
||||
if !since.IsZero() {
|
||||
payload.Since = since.UnixMilli()
|
||||
}
|
||||
|
||||
msg, err := NewMessage(MsgGetLogs, identity.ID, peerID, payload)
|
||||
if err != nil {
|
||||
return nil, coreerr.E("Controller.GetRemoteLogs", "failed to create message", err)
|
||||
return nil, coreerr.E("Controller.GetRemoteLogsSince", "failed to create message", err)
|
||||
}
|
||||
|
||||
resp, err := c.sendRequest(peerID, msg, 10*time.Second)
|
||||
|
|
|
|||
|
|
@ -7,6 +7,7 @@ import (
|
|||
"net/http/httptest"
|
||||
"net/url"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
|
|
@ -514,6 +515,40 @@ type mockMinerFull struct {
|
|||
func (m *mockMinerFull) GetName() string { return m.name }
|
||||
func (m *mockMinerFull) GetType() string { return m.minerType }
|
||||
func (m *mockMinerFull) GetStats() (any, error) { return m.stats, nil }
|
||||
func (m *mockMinerFull) GetConsoleHistorySince(lines int, since time.Time) []string {
|
||||
if since.IsZero() {
|
||||
if lines >= len(m.consoleHistory) {
|
||||
return m.consoleHistory
|
||||
}
|
||||
return m.consoleHistory[:lines]
|
||||
}
|
||||
|
||||
filtered := make([]string, 0, len(m.consoleHistory))
|
||||
for _, line := range m.consoleHistory {
|
||||
if lineAfter(line, since) {
|
||||
filtered = append(filtered, line)
|
||||
}
|
||||
}
|
||||
if lines >= len(filtered) {
|
||||
return filtered
|
||||
}
|
||||
return filtered[:lines]
|
||||
}
|
||||
|
||||
func lineAfter(line string, since time.Time) bool {
|
||||
start := strings.IndexByte(line, '[')
|
||||
end := strings.IndexByte(line, ']')
|
||||
if start != 0 || end <= start+1 {
|
||||
return true
|
||||
}
|
||||
|
||||
ts, err := time.Parse("2006-01-02 15:04:05", line[start+1:end])
|
||||
if err != nil {
|
||||
return true
|
||||
}
|
||||
return ts.After(since) || ts.Equal(since)
|
||||
}
|
||||
|
||||
func (m *mockMinerFull) GetConsoleHistory(lines int) []string {
|
||||
if lines >= len(m.consoleHistory) {
|
||||
return m.consoleHistory
|
||||
|
|
@ -616,6 +651,20 @@ func TestController_GetRemoteLogs_LimitedLines(t *testing.T) {
|
|||
assert.Len(t, lines, 1, "should return only 1 line")
|
||||
}
|
||||
|
||||
func TestController_GetRemoteLogsSince(t *testing.T) {
|
||||
controller, _, tp := setupControllerPairWithMiner(t)
|
||||
serverID := tp.ServerNode.GetIdentity().ID
|
||||
|
||||
since, err := time.Parse("2006-01-02 15:04:05", "2026-02-20 10:00:01")
|
||||
require.NoError(t, err)
|
||||
|
||||
lines, err := controller.GetRemoteLogsSince(serverID, "running-miner", 10, since)
|
||||
require.NoError(t, err, "GetRemoteLogsSince should succeed")
|
||||
require.Len(t, lines, 2, "should return only log lines on or after the requested timestamp")
|
||||
assert.Contains(t, lines[0], "connected to pool")
|
||||
assert.Contains(t, lines[1], "new job received")
|
||||
}
|
||||
|
||||
func TestController_GetRemoteLogs_NoIdentity(t *testing.T) {
|
||||
tp := setupTestTransportPair(t)
|
||||
nmNoID, err := NewNodeManagerWithPaths(
|
||||
|
|
|
|||
|
|
@ -5,10 +5,10 @@ import (
|
|||
"iter"
|
||||
"sync"
|
||||
|
||||
coreerr "forge.lthn.ai/core/go-log"
|
||||
coreerr "dappco.re/go/core/log"
|
||||
|
||||
"forge.lthn.ai/core/go-p2p/logging"
|
||||
"forge.lthn.ai/core/go-p2p/ueps"
|
||||
"dappco.re/go/core/p2p/logging"
|
||||
"dappco.re/go/core/p2p/ueps"
|
||||
)
|
||||
|
||||
// ThreatScoreThreshold is the maximum allowable threat score. Packets exceeding
|
||||
|
|
|
|||
|
|
@ -6,7 +6,7 @@ import (
|
|||
"sync/atomic"
|
||||
"testing"
|
||||
|
||||
"forge.lthn.ai/core/go-p2p/ueps"
|
||||
"dappco.re/go/core/p2p/ueps"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
|
|
|||
|
|
@ -1,6 +1,6 @@
|
|||
package node
|
||||
|
||||
import coreerr "forge.lthn.ai/core/go-log"
|
||||
import coreerr "dappco.re/go/core/log"
|
||||
|
||||
// Sentinel errors shared across the node package.
|
||||
var (
|
||||
|
|
|
|||
|
|
@ -8,12 +8,13 @@ import (
|
|||
"crypto/sha256"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
coreio "forge.lthn.ai/core/go-io"
|
||||
coreerr "forge.lthn.ai/core/go-log"
|
||||
coreio "dappco.re/go/core/io"
|
||||
coreerr "dappco.re/go/core/log"
|
||||
|
||||
"forge.lthn.ai/Snider/Borg/pkg/stmf"
|
||||
"github.com/adrg/xdg"
|
||||
|
|
@ -108,6 +109,48 @@ func NewNodeManagerWithPaths(keyPath, configPath string) (*NodeManager, error) {
|
|||
return nm, nil
|
||||
}
|
||||
|
||||
// LoadOrCreateIdentity loads the node identity from the default XDG paths or
|
||||
// generates a new dual-role identity when none exists yet.
|
||||
func LoadOrCreateIdentity() (*NodeManager, error) {
|
||||
keyPath, err := xdg.DataFile("lethean-desktop/node/private.key")
|
||||
if err != nil {
|
||||
return nil, coreerr.E("LoadOrCreateIdentity", "failed to get key path", err)
|
||||
}
|
||||
|
||||
configPath, err := xdg.ConfigFile("lethean-desktop/node.json")
|
||||
if err != nil {
|
||||
return nil, coreerr.E("LoadOrCreateIdentity", "failed to get config path", err)
|
||||
}
|
||||
|
||||
return LoadOrCreateIdentityWithPaths(keyPath, configPath)
|
||||
}
|
||||
|
||||
// LoadOrCreateIdentityWithPaths loads an existing identity from the supplied
|
||||
// paths or creates a new dual-role identity if no persisted identity exists.
|
||||
// The generated identity name falls back to the host name, then a stable
|
||||
// project-specific default if the host name cannot be determined.
|
||||
func LoadOrCreateIdentityWithPaths(keyPath, configPath string) (*NodeManager, error) {
|
||||
nm, err := NewNodeManagerWithPaths(keyPath, configPath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if nm.HasIdentity() {
|
||||
return nm, nil
|
||||
}
|
||||
|
||||
name, err := os.Hostname()
|
||||
if err != nil || name == "" {
|
||||
name = "lethean-node"
|
||||
}
|
||||
|
||||
if err := nm.GenerateIdentity(name, RoleDual); err != nil {
|
||||
return nil, coreerr.E("LoadOrCreateIdentityWithPaths", "failed to generate identity", err)
|
||||
}
|
||||
|
||||
return nm, nil
|
||||
}
|
||||
|
||||
// HasIdentity returns true if a node identity has been initialized.
|
||||
func (n *NodeManager) HasIdentity() bool {
|
||||
n.mu.RLock()
|
||||
|
|
@ -208,10 +251,13 @@ func (n *NodeManager) savePrivateKey() error {
|
|||
return coreerr.E("NodeManager.savePrivateKey", "failed to create key directory", err)
|
||||
}
|
||||
|
||||
// Write private key
|
||||
// Write private key and then tighten permissions explicitly.
|
||||
if err := coreio.Local.Write(n.keyPath, string(n.privateKey)); err != nil {
|
||||
return coreerr.E("NodeManager.savePrivateKey", "failed to write private key", err)
|
||||
}
|
||||
if err := os.Chmod(n.keyPath, 0600); err != nil {
|
||||
return coreerr.E("NodeManager.savePrivateKey", "failed to set private key permissions", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -74,6 +74,25 @@ func TestNodeIdentity(t *testing.T) {
|
|||
}
|
||||
})
|
||||
|
||||
t.Run("PrivateKeyPermissions", func(t *testing.T) {
|
||||
nm, cleanup := setupTestNodeManager(t)
|
||||
defer cleanup()
|
||||
|
||||
err := nm.GenerateIdentity("permission-test", RoleDual)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to generate identity: %v", err)
|
||||
}
|
||||
|
||||
info, err := os.Stat(nm.keyPath)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to stat private key: %v", err)
|
||||
}
|
||||
|
||||
if got := info.Mode().Perm(); got != 0600 {
|
||||
t.Fatalf("expected private key permissions 0600, got %04o", got)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("LoadExistingIdentity", func(t *testing.T) {
|
||||
tmpDir, err := os.MkdirTemp("", "node-load-test")
|
||||
if err != nil {
|
||||
|
|
@ -196,6 +215,47 @@ func TestNodeIdentity(t *testing.T) {
|
|||
t.Error("should not have identity after delete")
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("LoadOrCreateIdentityWithPaths", func(t *testing.T) {
|
||||
tmpDir, err := os.MkdirTemp("", "node-load-or-create-test")
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create temp dir: %v", err)
|
||||
}
|
||||
defer os.RemoveAll(tmpDir)
|
||||
|
||||
keyPath := filepath.Join(tmpDir, "private.key")
|
||||
configPath := filepath.Join(tmpDir, "node.json")
|
||||
|
||||
nm, err := LoadOrCreateIdentityWithPaths(keyPath, configPath)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to load or create identity: %v", err)
|
||||
}
|
||||
|
||||
if !nm.HasIdentity() {
|
||||
t.Fatal("expected identity to be initialised")
|
||||
}
|
||||
|
||||
identity := nm.GetIdentity()
|
||||
if identity == nil {
|
||||
t.Fatal("identity should not be nil")
|
||||
}
|
||||
|
||||
if identity.Name == "" {
|
||||
t.Error("identity name should be populated")
|
||||
}
|
||||
|
||||
if identity.Role != RoleDual {
|
||||
t.Errorf("expected default role dual, got %s", identity.Role)
|
||||
}
|
||||
|
||||
if _, err := os.Stat(keyPath); err != nil {
|
||||
t.Fatalf("expected private key to be persisted: %v", err)
|
||||
}
|
||||
|
||||
if _, err := os.Stat(configPath); err != nil {
|
||||
t.Fatalf("expected identity config to be persisted: %v", err)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestNodeRoles(t *testing.T) {
|
||||
|
|
|
|||
|
|
@ -13,7 +13,7 @@ import (
|
|||
"testing"
|
||||
"time"
|
||||
|
||||
"forge.lthn.ai/core/go-p2p/ueps"
|
||||
"dappco.re/go/core/p2p/ueps"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
|
|
|||
|
|
@ -8,7 +8,7 @@ package levin
|
|||
import (
|
||||
"encoding/binary"
|
||||
|
||||
coreerr "forge.lthn.ai/core/go-log"
|
||||
coreerr "dappco.re/go/core/log"
|
||||
)
|
||||
|
||||
// HeaderSize is the exact byte length of a serialised Levin header.
|
||||
|
|
|
|||
|
|
@ -10,7 +10,7 @@ import (
|
|||
"math"
|
||||
"slices"
|
||||
|
||||
coreerr "forge.lthn.ai/core/go-log"
|
||||
coreerr "dappco.re/go/core/log"
|
||||
)
|
||||
|
||||
// Portable storage signatures and version (9-byte header).
|
||||
|
|
|
|||
|
|
@ -6,7 +6,7 @@ package levin
|
|||
import (
|
||||
"encoding/binary"
|
||||
|
||||
coreerr "forge.lthn.ai/core/go-log"
|
||||
coreerr "dappco.re/go/core/log"
|
||||
)
|
||||
|
||||
// Size-mark bits occupying the two lowest bits of the first byte.
|
||||
|
|
|
|||
107
node/peer.go
107
node/peer.go
|
|
@ -10,10 +10,11 @@ import (
|
|||
"sync"
|
||||
"time"
|
||||
|
||||
coreio "dappco.re/go/core/io"
|
||||
coreerr "dappco.re/go/core/log"
|
||||
"dappco.re/go/core/p2p/logging"
|
||||
|
||||
poindexter "forge.lthn.ai/Snider/Poindexter"
|
||||
coreio "forge.lthn.ai/core/go-io"
|
||||
coreerr "forge.lthn.ai/core/go-log"
|
||||
"forge.lthn.ai/core/go-p2p/logging"
|
||||
"github.com/adrg/xdg"
|
||||
)
|
||||
|
||||
|
|
@ -50,9 +51,8 @@ const (
|
|||
PeerAuthAllowlist
|
||||
)
|
||||
|
||||
// Peer name validation constants
|
||||
// Peer name validation constants.
|
||||
const (
|
||||
PeerNameMinLength = 1
|
||||
PeerNameMaxLength = 64
|
||||
)
|
||||
|
||||
|
|
@ -71,14 +71,12 @@ func safeKeyPrefix(key string) string {
|
|||
}
|
||||
|
||||
// validatePeerName checks if a peer name is valid.
|
||||
// Peer names must be 1-64 characters, start and end with alphanumeric,
|
||||
// and contain only alphanumeric, hyphens, underscores, and spaces.
|
||||
// Empty names are permitted. Non-empty names must be 1-64 characters,
|
||||
// start and end with alphanumeric, and contain only alphanumeric,
|
||||
// hyphens, underscores, and spaces.
|
||||
func validatePeerName(name string) error {
|
||||
if name == "" {
|
||||
return nil // Empty names are allowed (optional field)
|
||||
}
|
||||
if len(name) < PeerNameMinLength {
|
||||
return coreerr.E("validatePeerName", "peer name too short", nil)
|
||||
return nil
|
||||
}
|
||||
if len(name) > PeerNameMaxLength {
|
||||
return coreerr.E("validatePeerName", "peer name too long", nil)
|
||||
|
|
@ -100,6 +98,7 @@ type PeerRegistry struct {
|
|||
authMode PeerAuthMode // How to handle unknown peers
|
||||
allowedPublicKeys map[string]bool // Allowlist of public keys (when authMode is Allowlist)
|
||||
allowedPublicKeyMu sync.RWMutex // Protects allowedPublicKeys
|
||||
allowlistPath string // Sidecar file for persisted allowlist keys
|
||||
|
||||
// Debounce disk writes
|
||||
dirty bool // Whether there are unsaved changes
|
||||
|
|
@ -134,6 +133,7 @@ func NewPeerRegistryWithPath(peersPath string) (*PeerRegistry, error) {
|
|||
pr := &PeerRegistry{
|
||||
peers: make(map[string]*Peer),
|
||||
path: peersPath,
|
||||
allowlistPath: peersPath + ".allowlist.json",
|
||||
stopChan: make(chan struct{}),
|
||||
authMode: PeerAuthOpen, // Default to open for backward compatibility
|
||||
allowedPublicKeys: make(map[string]bool),
|
||||
|
|
@ -143,7 +143,12 @@ func NewPeerRegistryWithPath(peersPath string) (*PeerRegistry, error) {
|
|||
if err := pr.load(); err != nil {
|
||||
// No existing peers, that's ok
|
||||
pr.rebuildKDTree()
|
||||
return pr, nil
|
||||
}
|
||||
|
||||
// Load any persisted allowlist entries. This is best effort so that a
|
||||
// missing or corrupt sidecar does not block peer registry startup.
|
||||
if err := pr.loadAllowedPublicKeys(); err != nil {
|
||||
logging.Warn("failed to load peer allowlist", logging.Fields{"error": err})
|
||||
}
|
||||
|
||||
pr.rebuildKDTree()
|
||||
|
|
@ -168,17 +173,25 @@ func (r *PeerRegistry) GetAuthMode() PeerAuthMode {
|
|||
// AllowPublicKey adds a public key to the allowlist.
|
||||
func (r *PeerRegistry) AllowPublicKey(publicKey string) {
|
||||
r.allowedPublicKeyMu.Lock()
|
||||
defer r.allowedPublicKeyMu.Unlock()
|
||||
r.allowedPublicKeys[publicKey] = true
|
||||
r.allowedPublicKeyMu.Unlock()
|
||||
logging.Debug("public key added to allowlist", logging.Fields{"key": safeKeyPrefix(publicKey)})
|
||||
|
||||
if err := r.saveAllowedPublicKeys(); err != nil {
|
||||
logging.Warn("failed to persist peer allowlist", logging.Fields{"error": err})
|
||||
}
|
||||
}
|
||||
|
||||
// RevokePublicKey removes a public key from the allowlist.
|
||||
func (r *PeerRegistry) RevokePublicKey(publicKey string) {
|
||||
r.allowedPublicKeyMu.Lock()
|
||||
defer r.allowedPublicKeyMu.Unlock()
|
||||
delete(r.allowedPublicKeys, publicKey)
|
||||
r.allowedPublicKeyMu.Unlock()
|
||||
logging.Debug("public key removed from allowlist", logging.Fields{"key": safeKeyPrefix(publicKey)})
|
||||
|
||||
if err := r.saveAllowedPublicKeys(); err != nil {
|
||||
logging.Warn("failed to persist peer allowlist", logging.Fields{"error": err})
|
||||
}
|
||||
}
|
||||
|
||||
// IsPublicKeyAllowed checks if a public key is in the allowlist.
|
||||
|
|
@ -707,6 +720,72 @@ func (r *PeerRegistry) Close() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// saveAllowedPublicKeys persists the allowlist to disk immediately.
|
||||
// It keeps the allowlist in a separate sidecar file so peer persistence remains
|
||||
// backwards compatible with the existing peers.json array format.
|
||||
func (r *PeerRegistry) saveAllowedPublicKeys() error {
|
||||
r.allowedPublicKeyMu.RLock()
|
||||
keys := make([]string, 0, len(r.allowedPublicKeys))
|
||||
for key := range r.allowedPublicKeys {
|
||||
keys = append(keys, key)
|
||||
}
|
||||
r.allowedPublicKeyMu.RUnlock()
|
||||
|
||||
slices.Sort(keys)
|
||||
|
||||
dir := filepath.Dir(r.allowlistPath)
|
||||
if err := coreio.Local.EnsureDir(dir); err != nil {
|
||||
return coreerr.E("PeerRegistry.saveAllowedPublicKeys", "failed to create allowlist directory", err)
|
||||
}
|
||||
|
||||
data, err := json.MarshalIndent(keys, "", " ")
|
||||
if err != nil {
|
||||
return coreerr.E("PeerRegistry.saveAllowedPublicKeys", "failed to marshal allowlist", err)
|
||||
}
|
||||
|
||||
tmpPath := r.allowlistPath + ".tmp"
|
||||
if err := coreio.Local.Write(tmpPath, string(data)); err != nil {
|
||||
return coreerr.E("PeerRegistry.saveAllowedPublicKeys", "failed to write allowlist temp file", err)
|
||||
}
|
||||
|
||||
if err := coreio.Local.Rename(tmpPath, r.allowlistPath); err != nil {
|
||||
coreio.Local.Delete(tmpPath)
|
||||
return coreerr.E("PeerRegistry.saveAllowedPublicKeys", "failed to rename allowlist file", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// loadAllowedPublicKeys loads the allowlist from disk.
|
||||
func (r *PeerRegistry) loadAllowedPublicKeys() error {
|
||||
if !coreio.Local.Exists(r.allowlistPath) {
|
||||
return nil
|
||||
}
|
||||
|
||||
content, err := coreio.Local.Read(r.allowlistPath)
|
||||
if err != nil {
|
||||
return coreerr.E("PeerRegistry.loadAllowedPublicKeys", "failed to read allowlist", err)
|
||||
}
|
||||
|
||||
var keys []string
|
||||
if err := json.Unmarshal([]byte(content), &keys); err != nil {
|
||||
return coreerr.E("PeerRegistry.loadAllowedPublicKeys", "failed to unmarshal allowlist", err)
|
||||
}
|
||||
|
||||
r.allowedPublicKeyMu.Lock()
|
||||
defer r.allowedPublicKeyMu.Unlock()
|
||||
|
||||
r.allowedPublicKeys = make(map[string]bool, len(keys))
|
||||
for _, key := range keys {
|
||||
if key == "" {
|
||||
continue
|
||||
}
|
||||
r.allowedPublicKeys[key] = true
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// save is a helper that schedules a debounced save.
|
||||
// Kept for backward compatibility but now debounces writes.
|
||||
// Must NOT be called with r.mu held.
|
||||
|
|
|
|||
|
|
@ -389,6 +389,39 @@ func TestPeerRegistry_Persistence(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestPeerRegistry_AllowlistPersistence(t *testing.T) {
|
||||
tmpDir, _ := os.MkdirTemp("", "allowlist-persist-test")
|
||||
defer os.RemoveAll(tmpDir)
|
||||
|
||||
peersPath := filepath.Join(tmpDir, "peers.json")
|
||||
|
||||
pr1, err := NewPeerRegistryWithPath(peersPath)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create first registry: %v", err)
|
||||
}
|
||||
|
||||
key := "allowlist-key-1234567890"
|
||||
pr1.AllowPublicKey(key)
|
||||
|
||||
if err := pr1.Close(); err != nil {
|
||||
t.Fatalf("failed to close first registry: %v", err)
|
||||
}
|
||||
|
||||
pr2, err := NewPeerRegistryWithPath(peersPath)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create second registry: %v", err)
|
||||
}
|
||||
|
||||
if !pr2.IsPublicKeyAllowed(key) {
|
||||
t.Fatal("expected allowlisted key to survive reload")
|
||||
}
|
||||
|
||||
keys := pr2.ListAllowedPublicKeys()
|
||||
if !slices.Contains(keys, key) {
|
||||
t.Fatalf("expected allowlisted key to be listed after reload, got %v", keys)
|
||||
}
|
||||
}
|
||||
|
||||
// --- Security Feature Tests ---
|
||||
|
||||
func TestPeerRegistry_AuthMode(t *testing.T) {
|
||||
|
|
|
|||
|
|
@ -3,7 +3,7 @@ package node
|
|||
import (
|
||||
"fmt"
|
||||
|
||||
coreerr "forge.lthn.ai/core/go-log"
|
||||
coreerr "dappco.re/go/core/log"
|
||||
)
|
||||
|
||||
// ProtocolError represents an error from the remote peer.
|
||||
|
|
|
|||
|
|
@ -15,10 +15,10 @@ import (
|
|||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
coreerr "forge.lthn.ai/core/go-log"
|
||||
coreerr "dappco.re/go/core/log"
|
||||
"dappco.re/go/core/p2p/logging"
|
||||
|
||||
"forge.lthn.ai/Snider/Borg/pkg/smsg"
|
||||
"forge.lthn.ai/core/go-p2p/logging"
|
||||
"github.com/gorilla/websocket"
|
||||
)
|
||||
|
||||
|
|
@ -76,10 +76,20 @@ func NewMessageDeduplicator(ttl time.Duration) *MessageDeduplicator {
|
|||
|
||||
// IsDuplicate checks if a message ID has been seen recently
|
||||
func (d *MessageDeduplicator) IsDuplicate(msgID string) bool {
|
||||
d.mu.RLock()
|
||||
_, exists := d.seen[msgID]
|
||||
d.mu.RUnlock()
|
||||
return exists
|
||||
d.mu.Lock()
|
||||
defer d.mu.Unlock()
|
||||
|
||||
seenAt, exists := d.seen[msgID]
|
||||
if !exists {
|
||||
return false
|
||||
}
|
||||
|
||||
if d.ttl > 0 && time.Since(seenAt) > d.ttl {
|
||||
delete(d.seen, msgID)
|
||||
return false
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
// Mark records a message ID as seen
|
||||
|
|
|
|||
|
|
@ -159,6 +159,17 @@ func TestMessageDeduplicator(t *testing.T) {
|
|||
}
|
||||
})
|
||||
|
||||
t.Run("ExpiredEntriesAreNotDuplicates", func(t *testing.T) {
|
||||
d := NewMessageDeduplicator(25 * time.Millisecond)
|
||||
d.Mark("msg-expired")
|
||||
|
||||
time.Sleep(40 * time.Millisecond)
|
||||
|
||||
if d.IsDuplicate("msg-expired") {
|
||||
t.Error("expired message should not remain a duplicate")
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("ConcurrentAccess", func(t *testing.T) {
|
||||
d := NewMessageDeduplicator(5 * time.Minute)
|
||||
var wg sync.WaitGroup
|
||||
|
|
|
|||
|
|
@ -6,9 +6,9 @@ import (
|
|||
"path/filepath"
|
||||
"time"
|
||||
|
||||
coreerr "forge.lthn.ai/core/go-log"
|
||||
coreerr "dappco.re/go/core/log"
|
||||
|
||||
"forge.lthn.ai/core/go-p2p/logging"
|
||||
"dappco.re/go/core/p2p/logging"
|
||||
"github.com/adrg/xdg"
|
||||
)
|
||||
|
||||
|
|
@ -26,7 +26,7 @@ type MinerInstance interface {
|
|||
GetName() string
|
||||
GetType() string
|
||||
GetStats() (any, error)
|
||||
GetConsoleHistory(lines int) []string
|
||||
GetConsoleHistorySince(lines int, since time.Time) []string
|
||||
}
|
||||
|
||||
// ProfileManager interface for profile operations.
|
||||
|
|
@ -55,7 +55,6 @@ func NewWorker(node *NodeManager, transport *Transport) *Worker {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
// SetMinerManager sets the miner manager for handling miner operations.
|
||||
func (w *Worker) SetMinerManager(manager MinerManager) {
|
||||
w.minerManager = manager
|
||||
|
|
@ -286,7 +285,12 @@ func (w *Worker) handleGetLogs(msg *Message) (*Message, error) {
|
|||
return nil, coreerr.E("Worker.handleGetLogs", "miner not found: "+payload.MinerName, nil)
|
||||
}
|
||||
|
||||
lines := miner.GetConsoleHistory(payload.Lines)
|
||||
var since time.Time
|
||||
if payload.Since > 0 {
|
||||
since = time.UnixMilli(payload.Since)
|
||||
}
|
||||
|
||||
lines := miner.GetConsoleHistorySince(payload.Lines, since)
|
||||
|
||||
logs := LogsPayload{
|
||||
MinerName: payload.MinerName,
|
||||
|
|
|
|||
|
|
@ -550,10 +550,14 @@ type mockMinerInstance struct {
|
|||
stats any
|
||||
}
|
||||
|
||||
func (m *mockMinerInstance) GetName() string { return m.name }
|
||||
func (m *mockMinerInstance) GetType() string { return m.minerType }
|
||||
func (m *mockMinerInstance) GetStats() (any, error) { return m.stats, nil }
|
||||
func (m *mockMinerInstance) GetConsoleHistory(lines int) []string { return []string{} }
|
||||
func (m *mockMinerInstance) GetName() string { return m.name }
|
||||
func (m *mockMinerInstance) GetType() string { return m.minerType }
|
||||
func (m *mockMinerInstance) GetStats() (any, error) {
|
||||
return m.stats, nil
|
||||
}
|
||||
func (m *mockMinerInstance) GetConsoleHistorySince(lines int, since time.Time) []string {
|
||||
return []string{}
|
||||
}
|
||||
|
||||
type mockProfileManager struct{}
|
||||
|
||||
|
|
|
|||
|
|
@ -7,7 +7,7 @@ import (
|
|||
"encoding/binary"
|
||||
"io"
|
||||
|
||||
coreerr "forge.lthn.ai/core/go-log"
|
||||
coreerr "dappco.re/go/core/log"
|
||||
)
|
||||
|
||||
// TLV Types
|
||||
|
|
|
|||
|
|
@ -8,7 +8,7 @@ import (
|
|||
"encoding/binary"
|
||||
"io"
|
||||
|
||||
coreerr "forge.lthn.ai/core/go-log"
|
||||
coreerr "dappco.re/go/core/log"
|
||||
)
|
||||
|
||||
// ParsedPacket holds the verified data
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue