Compare commits
20 commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
520d0f5728 | ||
|
|
c823c46bb2 | ||
|
|
56bd30d3d2 | ||
|
|
3eeaf90d38 | ||
|
|
d5a962996b | ||
|
|
572970d255 | ||
|
|
ee623a7343 | ||
|
|
8d1caa3a59 | ||
| 727b5fdb8d | |||
|
|
6fd3fe1cd2 | ||
|
|
23994a66ac | ||
|
|
eaa919af89 | ||
| 3f1f9a7d60 | |||
|
|
b334cb4909 | ||
| 36f0582bfc | |||
|
|
3ea407c115 | ||
|
|
bc47006152 | ||
|
|
66bc0b862f | ||
|
|
c2d2d5d126 | ||
|
|
6da95f3ed4 |
27 changed files with 556 additions and 226 deletions
4
.gitignore
vendored
Normal file
4
.gitignore
vendored
Normal file
|
|
@ -0,0 +1,4 @@
|
||||||
|
.idea/
|
||||||
|
.vscode/
|
||||||
|
*.log
|
||||||
|
.core/
|
||||||
|
|
@ -80,6 +80,8 @@ type ProfileManager interface {
|
||||||
- Licence: EUPL-1.2 — new files need `// SPDX-License-Identifier: EUPL-1.2`
|
- Licence: EUPL-1.2 — new files need `// SPDX-License-Identifier: EUPL-1.2`
|
||||||
- Security-first: do not weaken HMAC, challenge-response, Zip Slip defence, or rate limiting
|
- Security-first: do not weaken HMAC, challenge-response, Zip Slip defence, or rate limiting
|
||||||
- Use `logging` package only — no `fmt.Println` or `log.Printf` in library code
|
- Use `logging` package only — no `fmt.Println` or `log.Printf` in library code
|
||||||
|
- Error handling: use `coreerr.E()` from `go-log` — never `fmt.Errorf` or `errors.New` in library code
|
||||||
|
- File I/O: use `coreio.Local` from `go-io` — never `os.ReadFile`/`os.WriteFile` in library code (exception: `os.OpenFile` for streaming writes where `coreio` lacks support)
|
||||||
- Hot-path debug logging uses sampling pattern: `if counter.Add(1)%interval == 0`
|
- Hot-path debug logging uses sampling pattern: `if counter.Add(1)%interval == 0`
|
||||||
|
|
||||||
### Transport test helper
|
### Transport test helper
|
||||||
|
|
|
||||||
|
|
@ -98,7 +98,7 @@ The `Transport` manages a WebSocket server (gorilla/websocket) and outbound conn
|
||||||
| Timeout | −3.0 (floored at 0) |
|
| Timeout | −3.0 (floored at 0) |
|
||||||
| Default (new peer) | 50.0 |
|
| Default (new peer) | 50.0 |
|
||||||
|
|
||||||
**Peer name validation**: Names must be 1–64 characters, start and end with an alphanumeric character, and contain only alphanumeric, hyphen, underscore, or space characters.
|
**Peer name validation**: Empty names are permitted. Non-empty names must be 1–64 characters, start and end with an alphanumeric character, and contain only alphanumeric, hyphen, underscore, or space characters.
|
||||||
|
|
||||||
### message.go — Protocol Messages
|
### message.go — Protocol Messages
|
||||||
|
|
||||||
|
|
|
||||||
18
go.mod
18
go.mod
|
|
@ -1,10 +1,12 @@
|
||||||
module forge.lthn.ai/core/go-p2p
|
module dappco.re/go/core/p2p
|
||||||
|
|
||||||
go 1.26.0
|
go 1.26.0
|
||||||
|
|
||||||
require (
|
require (
|
||||||
forge.lthn.ai/Snider/Borg v0.2.1
|
dappco.re/go/core/io v0.2.0
|
||||||
forge.lthn.ai/Snider/Poindexter v0.0.2
|
dappco.re/go/core/log v0.1.0
|
||||||
|
forge.lthn.ai/Snider/Borg v0.3.1
|
||||||
|
forge.lthn.ai/Snider/Poindexter v0.0.3
|
||||||
github.com/adrg/xdg v0.5.3
|
github.com/adrg/xdg v0.5.3
|
||||||
github.com/google/uuid v1.6.0
|
github.com/google/uuid v1.6.0
|
||||||
github.com/gorilla/websocket v1.5.3
|
github.com/gorilla/websocket v1.5.3
|
||||||
|
|
@ -13,15 +15,13 @@ require (
|
||||||
|
|
||||||
require (
|
require (
|
||||||
forge.lthn.ai/Snider/Enchantrix v0.0.4 // indirect
|
forge.lthn.ai/Snider/Enchantrix v0.0.4 // indirect
|
||||||
github.com/ProtonMail/go-crypto v1.3.0 // indirect
|
forge.lthn.ai/core/go-log v0.0.4 // indirect
|
||||||
|
github.com/ProtonMail/go-crypto v1.4.0 // indirect
|
||||||
github.com/cloudflare/circl v1.6.3 // indirect
|
github.com/cloudflare/circl v1.6.3 // indirect
|
||||||
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
|
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
|
||||||
github.com/klauspost/compress v1.18.4 // indirect
|
github.com/klauspost/compress v1.18.4 // indirect
|
||||||
github.com/kr/pretty v0.3.1 // indirect
|
|
||||||
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
|
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
|
||||||
github.com/rogpeppe/go-internal v1.14.1 // indirect
|
golang.org/x/crypto v0.49.0 // indirect
|
||||||
golang.org/x/crypto v0.48.0 // indirect
|
golang.org/x/sys v0.42.0 // indirect
|
||||||
golang.org/x/sys v0.41.0 // indirect
|
|
||||||
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
|
|
||||||
gopkg.in/yaml.v3 v3.0.1 // indirect
|
gopkg.in/yaml.v3 v3.0.1 // indirect
|
||||||
)
|
)
|
||||||
|
|
|
||||||
32
go.sum
32
go.sum
|
|
@ -1,16 +1,21 @@
|
||||||
forge.lthn.ai/Snider/Borg v0.2.1 h1:Uf/YtUJLL8jlxTCjvP4J+5GHe3LLeALGtbh7zj8d8Qc=
|
dappco.re/go/core/io v0.2.0 h1:zuudgIiTsQQ5ipVt97saWdGLROovbEB/zdVyy9/l+I4=
|
||||||
forge.lthn.ai/Snider/Borg v0.2.1/go.mod h1:MVfolb7F6/A2LOIijcbBhWImu5db5NSMcSjvShMoMCA=
|
dappco.re/go/core/io v0.2.0/go.mod h1:1QnQV6X9LNgFKfm8SkOtR9LLaj3bDcsOIeJOOyjbL5E=
|
||||||
|
dappco.re/go/core/log v0.1.0 h1:pa71Vq2TD2aoEUQWFKwNcaJ3GBY8HbaNGqtE688Unyc=
|
||||||
|
dappco.re/go/core/log v0.1.0/go.mod h1:Nkqb8gsXhZAO8VLpx7B8i1iAmohhzqA20b9Zr8VUcJs=
|
||||||
|
forge.lthn.ai/Snider/Borg v0.3.1 h1:gfC1ZTpLoZai07oOWJiVeQ8+qJYK8A795tgVGJHbVL8=
|
||||||
|
forge.lthn.ai/Snider/Borg v0.3.1/go.mod h1:Z7DJD0yHXsxSyM7Mjl6/g4gH1NBsIz44Bf5AFlV76Wg=
|
||||||
forge.lthn.ai/Snider/Enchantrix v0.0.4 h1:biwpix/bdedfyc0iVeK15awhhJKH6TEMYOTXzHXx5TI=
|
forge.lthn.ai/Snider/Enchantrix v0.0.4 h1:biwpix/bdedfyc0iVeK15awhhJKH6TEMYOTXzHXx5TI=
|
||||||
forge.lthn.ai/Snider/Enchantrix v0.0.4/go.mod h1:OGCwuVeZPq3OPe2h6TX/ZbgEjHU6B7owpIBeXQGbSe0=
|
forge.lthn.ai/Snider/Enchantrix v0.0.4/go.mod h1:OGCwuVeZPq3OPe2h6TX/ZbgEjHU6B7owpIBeXQGbSe0=
|
||||||
forge.lthn.ai/Snider/Poindexter v0.0.2 h1:XXzSKFjO6MeftQAnB9qR+IkOTp9f57Tg4sIx8Qzi/II=
|
forge.lthn.ai/Snider/Poindexter v0.0.3 h1:cx5wRhuLRKBM8riIZyNVAT2a8rwRhn1dodFBktocsVE=
|
||||||
forge.lthn.ai/Snider/Poindexter v0.0.2/go.mod h1:ddzGia98k3HKkR0gl58IDzqz+MmgW2cQJOCNLfuWPpo=
|
forge.lthn.ai/Snider/Poindexter v0.0.3/go.mod h1:ddzGia98k3HKkR0gl58IDzqz+MmgW2cQJOCNLfuWPpo=
|
||||||
github.com/ProtonMail/go-crypto v1.3.0 h1:ILq8+Sf5If5DCpHQp4PbZdS1J7HDFRXz/+xKBiRGFrw=
|
forge.lthn.ai/core/go-log v0.0.4 h1:KTuCEPgFmuM8KJfnyQ8vPOU1Jg654W74h8IJvfQMfv0=
|
||||||
github.com/ProtonMail/go-crypto v1.3.0/go.mod h1:9whxjD8Rbs29b4XWbB8irEcE8KHMqaR2e7GWU1R+/PE=
|
forge.lthn.ai/core/go-log v0.0.4/go.mod h1:r14MXKOD3LF/sI8XUJQhRk/SZHBE7jAFVuCfgkXoZPw=
|
||||||
|
github.com/ProtonMail/go-crypto v1.4.0 h1:Zq/pbM3F5DFgJiMouxEdSVY44MVoQNEKp5d5QxIQceQ=
|
||||||
|
github.com/ProtonMail/go-crypto v1.4.0/go.mod h1:e1OaTyu5SYVrO9gKOEhTc+5UcXtTUa+P3uLudwcgPqo=
|
||||||
github.com/adrg/xdg v0.5.3 h1:xRnxJXne7+oWDatRhR1JLnvuccuIeCoBu2rtuLqQB78=
|
github.com/adrg/xdg v0.5.3 h1:xRnxJXne7+oWDatRhR1JLnvuccuIeCoBu2rtuLqQB78=
|
||||||
github.com/adrg/xdg v0.5.3/go.mod h1:nlTsY+NNiCBGCK2tpm09vRqfVzrc2fLmXGpBLF0zlTQ=
|
github.com/adrg/xdg v0.5.3/go.mod h1:nlTsY+NNiCBGCK2tpm09vRqfVzrc2fLmXGpBLF0zlTQ=
|
||||||
github.com/cloudflare/circl v1.6.3 h1:9GPOhQGF9MCYUeXyMYlqTR6a5gTrgR/fBLXvUgtVcg8=
|
github.com/cloudflare/circl v1.6.3 h1:9GPOhQGF9MCYUeXyMYlqTR6a5gTrgR/fBLXvUgtVcg8=
|
||||||
github.com/cloudflare/circl v1.6.3/go.mod h1:2eXP6Qfat4O/Yhh8BznvKnJ+uzEoTQ6jVKJRn81BiS4=
|
github.com/cloudflare/circl v1.6.3/go.mod h1:2eXP6Qfat4O/Yhh8BznvKnJ+uzEoTQ6jVKJRn81BiS4=
|
||||||
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
|
|
||||||
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
|
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
|
||||||
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||||
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
|
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
|
||||||
|
|
@ -19,25 +24,20 @@ github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aN
|
||||||
github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
|
github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
|
||||||
github.com/klauspost/compress v1.18.4 h1:RPhnKRAQ4Fh8zU2FY/6ZFDwTVTxgJ/EMydqSTzE9a2c=
|
github.com/klauspost/compress v1.18.4 h1:RPhnKRAQ4Fh8zU2FY/6ZFDwTVTxgJ/EMydqSTzE9a2c=
|
||||||
github.com/klauspost/compress v1.18.4/go.mod h1:R0h/fSBs8DE4ENlcrlib3PsXS61voFxhIs2DeRhCvJ4=
|
github.com/klauspost/compress v1.18.4/go.mod h1:R0h/fSBs8DE4ENlcrlib3PsXS61voFxhIs2DeRhCvJ4=
|
||||||
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
|
|
||||||
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
|
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
|
||||||
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
|
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
|
||||||
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
|
|
||||||
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
|
|
||||||
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
|
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
|
||||||
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
|
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
|
||||||
github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
|
|
||||||
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
|
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
|
||||||
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||||
github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
|
|
||||||
github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ=
|
github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ=
|
||||||
github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc=
|
github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc=
|
||||||
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
|
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
|
||||||
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
|
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
|
||||||
golang.org/x/crypto v0.48.0 h1:/VRzVqiRSggnhY7gNRxPauEQ5Drw9haKdM0jqfcCFts=
|
golang.org/x/crypto v0.49.0 h1:+Ng2ULVvLHnJ/ZFEq4KdcDd/cfjrrjjNSXNzxg0Y4U4=
|
||||||
golang.org/x/crypto v0.48.0/go.mod h1:r0kV5h3qnFPlQnBSrULhlsRfryS2pmewsg+XfMgkVos=
|
golang.org/x/crypto v0.49.0/go.mod h1:ErX4dUh2UM+CFYiXZRTcMpEcN8b/1gxEuv3nODoYtCA=
|
||||||
golang.org/x/sys v0.41.0 h1:Ivj+2Cp/ylzLiEU89QhWblYnOE9zerudt9Ftecq2C6k=
|
golang.org/x/sys v0.42.0 h1:omrd2nAlyT5ESRdCLYdm3+fMfNFE/+Rf4bDIQImRJeo=
|
||||||
golang.org/x/sys v0.41.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
|
golang.org/x/sys v0.42.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw=
|
||||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||||
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
|
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
|
||||||
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
|
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
|
||||||
|
|
|
||||||
|
|
@ -9,6 +9,8 @@ import (
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
coreerr "dappco.re/go/core/log"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Level represents the severity of a log message.
|
// Level represents the severity of a log message.
|
||||||
|
|
@ -278,6 +280,6 @@ func ParseLevel(s string) (Level, error) {
|
||||||
case "ERROR":
|
case "ERROR":
|
||||||
return LevelError, nil
|
return LevelError, nil
|
||||||
default:
|
default:
|
||||||
return LevelInfo, fmt.Errorf("unknown log level: %s", s)
|
return LevelInfo, coreerr.E("logging.ParseLevel", "unknown log level: "+s, nil)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -6,13 +6,14 @@ import (
|
||||||
"crypto/sha256"
|
"crypto/sha256"
|
||||||
"encoding/hex"
|
"encoding/hex"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
|
||||||
"fmt"
|
|
||||||
"io"
|
"io"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
|
coreio "dappco.re/go/core/io"
|
||||||
|
coreerr "dappco.re/go/core/log"
|
||||||
|
|
||||||
"forge.lthn.ai/Snider/Borg/pkg/datanode"
|
"forge.lthn.ai/Snider/Borg/pkg/datanode"
|
||||||
"forge.lthn.ai/Snider/Borg/pkg/tim"
|
"forge.lthn.ai/Snider/Borg/pkg/tim"
|
||||||
)
|
)
|
||||||
|
|
@ -49,14 +50,14 @@ func CreateProfileBundle(profileJSON []byte, name string, password string) (*Bun
|
||||||
// Create a TIM with just the profile config
|
// Create a TIM with just the profile config
|
||||||
t, err := tim.New()
|
t, err := tim.New()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to create TIM: %w", err)
|
return nil, coreerr.E("CreateProfileBundle", "failed to create TIM", err)
|
||||||
}
|
}
|
||||||
t.Config = profileJSON
|
t.Config = profileJSON
|
||||||
|
|
||||||
// Encrypt to STIM format
|
// Encrypt to STIM format
|
||||||
stimData, err := t.ToSigil(password)
|
stimData, err := t.ToSigil(password)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to encrypt bundle: %w", err)
|
return nil, coreerr.E("CreateProfileBundle", "failed to encrypt bundle", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Calculate checksum
|
// Calculate checksum
|
||||||
|
|
@ -85,29 +86,30 @@ func CreateProfileBundleUnencrypted(profileJSON []byte, name string) (*Bundle, e
|
||||||
// CreateMinerBundle creates an encrypted bundle containing a miner binary and optional profile.
|
// CreateMinerBundle creates an encrypted bundle containing a miner binary and optional profile.
|
||||||
func CreateMinerBundle(minerPath string, profileJSON []byte, name string, password string) (*Bundle, error) {
|
func CreateMinerBundle(minerPath string, profileJSON []byte, name string, password string) (*Bundle, error) {
|
||||||
// Read miner binary
|
// Read miner binary
|
||||||
minerData, err := os.ReadFile(minerPath)
|
minerContent, err := coreio.Local.Read(minerPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to read miner binary: %w", err)
|
return nil, coreerr.E("CreateMinerBundle", "failed to read miner binary", err)
|
||||||
}
|
}
|
||||||
|
minerData := []byte(minerContent)
|
||||||
|
|
||||||
// Create a tarball with the miner binary
|
// Create a tarball with the miner binary
|
||||||
tarData, err := createTarball(map[string][]byte{
|
tarData, err := createTarball(map[string][]byte{
|
||||||
filepath.Base(minerPath): minerData,
|
filepath.Base(minerPath): minerData,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to create tarball: %w", err)
|
return nil, coreerr.E("CreateMinerBundle", "failed to create tarball", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create DataNode from tarball
|
// Create DataNode from tarball
|
||||||
dn, err := datanode.FromTar(tarData)
|
dn, err := datanode.FromTar(tarData)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to create datanode: %w", err)
|
return nil, coreerr.E("CreateMinerBundle", "failed to create datanode", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create TIM from DataNode
|
// Create TIM from DataNode
|
||||||
t, err := tim.FromDataNode(dn)
|
t, err := tim.FromDataNode(dn)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to create TIM: %w", err)
|
return nil, coreerr.E("CreateMinerBundle", "failed to create TIM", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Set profile as config if provided
|
// Set profile as config if provided
|
||||||
|
|
@ -118,7 +120,7 @@ func CreateMinerBundle(minerPath string, profileJSON []byte, name string, passwo
|
||||||
// Encrypt to STIM format
|
// Encrypt to STIM format
|
||||||
stimData, err := t.ToSigil(password)
|
stimData, err := t.ToSigil(password)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to encrypt bundle: %w", err)
|
return nil, coreerr.E("CreateMinerBundle", "failed to encrypt bundle", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
checksum := calculateChecksum(stimData)
|
checksum := calculateChecksum(stimData)
|
||||||
|
|
@ -135,7 +137,7 @@ func CreateMinerBundle(minerPath string, profileJSON []byte, name string, passwo
|
||||||
func ExtractProfileBundle(bundle *Bundle, password string) ([]byte, error) {
|
func ExtractProfileBundle(bundle *Bundle, password string) ([]byte, error) {
|
||||||
// Verify checksum first
|
// Verify checksum first
|
||||||
if calculateChecksum(bundle.Data) != bundle.Checksum {
|
if calculateChecksum(bundle.Data) != bundle.Checksum {
|
||||||
return nil, errors.New("checksum mismatch - bundle may be corrupted")
|
return nil, coreerr.E("ExtractProfileBundle", "checksum mismatch - bundle may be corrupted", nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
// If it's unencrypted JSON, just return it
|
// If it's unencrypted JSON, just return it
|
||||||
|
|
@ -146,7 +148,7 @@ func ExtractProfileBundle(bundle *Bundle, password string) ([]byte, error) {
|
||||||
// Decrypt STIM format
|
// Decrypt STIM format
|
||||||
t, err := tim.FromSigil(bundle.Data, password)
|
t, err := tim.FromSigil(bundle.Data, password)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to decrypt bundle: %w", err)
|
return nil, coreerr.E("ExtractProfileBundle", "failed to decrypt bundle", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return t.Config, nil
|
return t.Config, nil
|
||||||
|
|
@ -156,25 +158,25 @@ func ExtractProfileBundle(bundle *Bundle, password string) ([]byte, error) {
|
||||||
func ExtractMinerBundle(bundle *Bundle, password string, destDir string) (string, []byte, error) {
|
func ExtractMinerBundle(bundle *Bundle, password string, destDir string) (string, []byte, error) {
|
||||||
// Verify checksum
|
// Verify checksum
|
||||||
if calculateChecksum(bundle.Data) != bundle.Checksum {
|
if calculateChecksum(bundle.Data) != bundle.Checksum {
|
||||||
return "", nil, errors.New("checksum mismatch - bundle may be corrupted")
|
return "", nil, coreerr.E("ExtractMinerBundle", "checksum mismatch - bundle may be corrupted", nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Decrypt STIM format
|
// Decrypt STIM format
|
||||||
t, err := tim.FromSigil(bundle.Data, password)
|
t, err := tim.FromSigil(bundle.Data, password)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", nil, fmt.Errorf("failed to decrypt bundle: %w", err)
|
return "", nil, coreerr.E("ExtractMinerBundle", "failed to decrypt bundle", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Convert rootfs to tarball and extract
|
// Convert rootfs to tarball and extract
|
||||||
tarData, err := t.RootFS.ToTar()
|
tarData, err := t.RootFS.ToTar()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", nil, fmt.Errorf("failed to convert rootfs to tar: %w", err)
|
return "", nil, coreerr.E("ExtractMinerBundle", "failed to convert rootfs to tar", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Extract tarball to destination
|
// Extract tarball to destination
|
||||||
minerPath, err := extractTarball(tarData, destDir)
|
minerPath, err := extractTarball(tarData, destDir)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", nil, fmt.Errorf("failed to extract tarball: %w", err)
|
return "", nil, coreerr.E("ExtractMinerBundle", "failed to extract tarball", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return minerPath, t.Config, nil
|
return minerPath, t.Config, nil
|
||||||
|
|
@ -254,11 +256,11 @@ func extractTarball(tarData []byte, destDir string) (string, error) {
|
||||||
// Ensure destDir is an absolute, clean path for security checks
|
// Ensure destDir is an absolute, clean path for security checks
|
||||||
absDestDir, err := filepath.Abs(destDir)
|
absDestDir, err := filepath.Abs(destDir)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", fmt.Errorf("failed to resolve destination directory: %w", err)
|
return "", coreerr.E("extractTarball", "failed to resolve destination directory", err)
|
||||||
}
|
}
|
||||||
absDestDir = filepath.Clean(absDestDir)
|
absDestDir = filepath.Clean(absDestDir)
|
||||||
|
|
||||||
if err := os.MkdirAll(absDestDir, 0755); err != nil {
|
if err := coreio.Local.EnsureDir(absDestDir); err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -279,12 +281,12 @@ func extractTarball(tarData []byte, destDir string) (string, error) {
|
||||||
|
|
||||||
// Reject absolute paths
|
// Reject absolute paths
|
||||||
if filepath.IsAbs(cleanName) {
|
if filepath.IsAbs(cleanName) {
|
||||||
return "", fmt.Errorf("invalid tar entry: absolute path not allowed: %s", hdr.Name)
|
return "", coreerr.E("extractTarball", "invalid tar entry: absolute path not allowed: "+hdr.Name, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Reject paths that escape the destination directory
|
// Reject paths that escape the destination directory
|
||||||
if strings.HasPrefix(cleanName, ".."+string(os.PathSeparator)) || cleanName == ".." {
|
if strings.HasPrefix(cleanName, ".."+string(os.PathSeparator)) || cleanName == ".." {
|
||||||
return "", fmt.Errorf("invalid tar entry: path traversal attempt: %s", hdr.Name)
|
return "", coreerr.E("extractTarball", "invalid tar entry: path traversal attempt: "+hdr.Name, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Build the full path and verify it's within destDir
|
// Build the full path and verify it's within destDir
|
||||||
|
|
@ -293,23 +295,26 @@ func extractTarball(tarData []byte, destDir string) (string, error) {
|
||||||
|
|
||||||
// Final security check: ensure the path is still within destDir
|
// Final security check: ensure the path is still within destDir
|
||||||
if !strings.HasPrefix(fullPath, absDestDir+string(os.PathSeparator)) && fullPath != absDestDir {
|
if !strings.HasPrefix(fullPath, absDestDir+string(os.PathSeparator)) && fullPath != absDestDir {
|
||||||
return "", fmt.Errorf("invalid tar entry: path escape attempt: %s", hdr.Name)
|
return "", coreerr.E("extractTarball", "invalid tar entry: path escape attempt: "+hdr.Name, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
switch hdr.Typeflag {
|
switch hdr.Typeflag {
|
||||||
case tar.TypeDir:
|
case tar.TypeDir:
|
||||||
if err := os.MkdirAll(fullPath, os.FileMode(hdr.Mode)); err != nil {
|
if err := coreio.Local.EnsureDir(fullPath); err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
case tar.TypeReg:
|
case tar.TypeReg:
|
||||||
// Ensure parent directory exists
|
// Ensure parent directory exists
|
||||||
if err := os.MkdirAll(filepath.Dir(fullPath), 0755); err != nil {
|
if err := coreio.Local.EnsureDir(filepath.Dir(fullPath)); err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// os.OpenFile is used deliberately here instead of coreio.Local.Create/Write
|
||||||
|
// because coreio hardcodes file permissions (0644) and we need to preserve
|
||||||
|
// the tar header's mode bits — executable binaries require 0755.
|
||||||
f, err := os.OpenFile(fullPath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, os.FileMode(hdr.Mode))
|
f, err := os.OpenFile(fullPath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, os.FileMode(hdr.Mode))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", coreerr.E("extractTarball", "failed to create file "+hdr.Name, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Limit file size to prevent decompression bombs (100MB max per file)
|
// Limit file size to prevent decompression bombs (100MB max per file)
|
||||||
|
|
@ -318,11 +323,11 @@ func extractTarball(tarData []byte, destDir string) (string, error) {
|
||||||
written, err := io.Copy(f, limitedReader)
|
written, err := io.Copy(f, limitedReader)
|
||||||
f.Close()
|
f.Close()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", coreerr.E("extractTarball", "failed to write file "+hdr.Name, err)
|
||||||
}
|
}
|
||||||
if written > maxFileSize {
|
if written > maxFileSize {
|
||||||
os.Remove(fullPath)
|
coreio.Local.Delete(fullPath)
|
||||||
return "", fmt.Errorf("file %s exceeds maximum size of %d bytes", hdr.Name, maxFileSize)
|
return "", coreerr.E("extractTarball", "file "+hdr.Name+" exceeds maximum size", nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Track first executable
|
// Track first executable
|
||||||
|
|
|
||||||
|
|
@ -3,12 +3,12 @@ package node
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
|
||||||
"fmt"
|
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"forge.lthn.ai/core/go-p2p/logging"
|
coreerr "dappco.re/go/core/log"
|
||||||
|
|
||||||
|
"dappco.re/go/core/p2p/logging"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Controller manages remote peer operations from a controller node.
|
// Controller manages remote peer operations from a controller node.
|
||||||
|
|
@ -67,11 +67,11 @@ func (c *Controller) sendRequest(peerID string, msg *Message, timeout time.Durat
|
||||||
if c.transport.GetConnection(peerID) == nil {
|
if c.transport.GetConnection(peerID) == nil {
|
||||||
peer := c.peers.GetPeer(peerID)
|
peer := c.peers.GetPeer(peerID)
|
||||||
if peer == nil {
|
if peer == nil {
|
||||||
return nil, fmt.Errorf("peer not found: %s", peerID)
|
return nil, coreerr.E("Controller.sendRequest", "peer not found: "+peerID, nil)
|
||||||
}
|
}
|
||||||
conn, err := c.transport.Connect(peer)
|
conn, err := c.transport.Connect(peer)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to connect to peer: %w", err)
|
return nil, coreerr.E("Controller.sendRequest", "failed to connect to peer", err)
|
||||||
}
|
}
|
||||||
// Use the real peer ID after handshake (it may have changed)
|
// Use the real peer ID after handshake (it may have changed)
|
||||||
actualPeerID = conn.Peer.ID
|
actualPeerID = conn.Peer.ID
|
||||||
|
|
@ -96,7 +96,7 @@ func (c *Controller) sendRequest(peerID string, msg *Message, timeout time.Durat
|
||||||
|
|
||||||
// Send the message
|
// Send the message
|
||||||
if err := c.transport.Send(actualPeerID, msg); err != nil {
|
if err := c.transport.Send(actualPeerID, msg); err != nil {
|
||||||
return nil, fmt.Errorf("failed to send message: %w", err)
|
return nil, coreerr.E("Controller.sendRequest", "failed to send message", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Wait for response
|
// Wait for response
|
||||||
|
|
@ -107,7 +107,7 @@ func (c *Controller) sendRequest(peerID string, msg *Message, timeout time.Durat
|
||||||
case resp := <-respCh:
|
case resp := <-respCh:
|
||||||
return resp, nil
|
return resp, nil
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return nil, errors.New("request timeout")
|
return nil, coreerr.E("Controller.sendRequest", "request timeout", nil)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -120,7 +120,7 @@ func (c *Controller) GetRemoteStats(peerID string) (*StatsPayload, error) {
|
||||||
|
|
||||||
msg, err := NewMessage(MsgGetStats, identity.ID, peerID, nil)
|
msg, err := NewMessage(MsgGetStats, identity.ID, peerID, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to create message: %w", err)
|
return nil, coreerr.E("Controller.GetRemoteStats", "failed to create message", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
resp, err := c.sendRequest(peerID, msg, 10*time.Second)
|
resp, err := c.sendRequest(peerID, msg, 10*time.Second)
|
||||||
|
|
@ -144,7 +144,7 @@ func (c *Controller) StartRemoteMiner(peerID, minerType, profileID string, confi
|
||||||
}
|
}
|
||||||
|
|
||||||
if minerType == "" {
|
if minerType == "" {
|
||||||
return errors.New("miner type is required")
|
return coreerr.E("Controller.StartRemoteMiner", "miner type is required", nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
payload := StartMinerPayload{
|
payload := StartMinerPayload{
|
||||||
|
|
@ -155,7 +155,7 @@ func (c *Controller) StartRemoteMiner(peerID, minerType, profileID string, confi
|
||||||
|
|
||||||
msg, err := NewMessage(MsgStartMiner, identity.ID, peerID, payload)
|
msg, err := NewMessage(MsgStartMiner, identity.ID, peerID, payload)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to create message: %w", err)
|
return coreerr.E("Controller.StartRemoteMiner", "failed to create message", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
resp, err := c.sendRequest(peerID, msg, 30*time.Second)
|
resp, err := c.sendRequest(peerID, msg, 30*time.Second)
|
||||||
|
|
@ -169,7 +169,7 @@ func (c *Controller) StartRemoteMiner(peerID, minerType, profileID string, confi
|
||||||
}
|
}
|
||||||
|
|
||||||
if !ack.Success {
|
if !ack.Success {
|
||||||
return fmt.Errorf("miner start failed: %s", ack.Error)
|
return coreerr.E("Controller.StartRemoteMiner", "miner start failed: "+ack.Error, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
|
@ -188,7 +188,7 @@ func (c *Controller) StopRemoteMiner(peerID, minerName string) error {
|
||||||
|
|
||||||
msg, err := NewMessage(MsgStopMiner, identity.ID, peerID, payload)
|
msg, err := NewMessage(MsgStopMiner, identity.ID, peerID, payload)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to create message: %w", err)
|
return coreerr.E("Controller.StopRemoteMiner", "failed to create message", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
resp, err := c.sendRequest(peerID, msg, 30*time.Second)
|
resp, err := c.sendRequest(peerID, msg, 30*time.Second)
|
||||||
|
|
@ -202,7 +202,7 @@ func (c *Controller) StopRemoteMiner(peerID, minerName string) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
if !ack.Success {
|
if !ack.Success {
|
||||||
return fmt.Errorf("miner stop failed: %s", ack.Error)
|
return coreerr.E("Controller.StopRemoteMiner", "miner stop failed: "+ack.Error, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
|
@ -210,6 +210,11 @@ func (c *Controller) StopRemoteMiner(peerID, minerName string) error {
|
||||||
|
|
||||||
// GetRemoteLogs requests console logs from a remote miner.
|
// GetRemoteLogs requests console logs from a remote miner.
|
||||||
func (c *Controller) GetRemoteLogs(peerID, minerName string, lines int) ([]string, error) {
|
func (c *Controller) GetRemoteLogs(peerID, minerName string, lines int) ([]string, error) {
|
||||||
|
return c.GetRemoteLogsSince(peerID, minerName, lines, time.Time{})
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetRemoteLogsSince requests console logs from a remote miner after a point in time.
|
||||||
|
func (c *Controller) GetRemoteLogsSince(peerID, minerName string, lines int, since time.Time) ([]string, error) {
|
||||||
identity := c.node.GetIdentity()
|
identity := c.node.GetIdentity()
|
||||||
if identity == nil {
|
if identity == nil {
|
||||||
return nil, ErrIdentityNotInitialized
|
return nil, ErrIdentityNotInitialized
|
||||||
|
|
@ -219,10 +224,13 @@ func (c *Controller) GetRemoteLogs(peerID, minerName string, lines int) ([]strin
|
||||||
MinerName: minerName,
|
MinerName: minerName,
|
||||||
Lines: lines,
|
Lines: lines,
|
||||||
}
|
}
|
||||||
|
if !since.IsZero() {
|
||||||
|
payload.Since = since.UnixMilli()
|
||||||
|
}
|
||||||
|
|
||||||
msg, err := NewMessage(MsgGetLogs, identity.ID, peerID, payload)
|
msg, err := NewMessage(MsgGetLogs, identity.ID, peerID, payload)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to create message: %w", err)
|
return nil, coreerr.E("Controller.GetRemoteLogsSince", "failed to create message", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
resp, err := c.sendRequest(peerID, msg, 10*time.Second)
|
resp, err := c.sendRequest(peerID, msg, 10*time.Second)
|
||||||
|
|
@ -281,7 +289,7 @@ func (c *Controller) PingPeer(peerID string) (float64, error) {
|
||||||
|
|
||||||
msg, err := NewMessage(MsgPing, identity.ID, peerID, payload)
|
msg, err := NewMessage(MsgPing, identity.ID, peerID, payload)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, fmt.Errorf("failed to create message: %w", err)
|
return 0, coreerr.E("Controller.PingPeer", "failed to create message", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
resp, err := c.sendRequest(peerID, msg, 5*time.Second)
|
resp, err := c.sendRequest(peerID, msg, 5*time.Second)
|
||||||
|
|
@ -309,7 +317,7 @@ func (c *Controller) PingPeer(peerID string) (float64, error) {
|
||||||
func (c *Controller) ConnectToPeer(peerID string) error {
|
func (c *Controller) ConnectToPeer(peerID string) error {
|
||||||
peer := c.peers.GetPeer(peerID)
|
peer := c.peers.GetPeer(peerID)
|
||||||
if peer == nil {
|
if peer == nil {
|
||||||
return fmt.Errorf("peer not found: %s", peerID)
|
return coreerr.E("Controller.ConnectToPeer", "peer not found: "+peerID, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err := c.transport.Connect(peer)
|
_, err := c.transport.Connect(peer)
|
||||||
|
|
@ -320,7 +328,7 @@ func (c *Controller) ConnectToPeer(peerID string) error {
|
||||||
func (c *Controller) DisconnectFromPeer(peerID string) error {
|
func (c *Controller) DisconnectFromPeer(peerID string) error {
|
||||||
conn := c.transport.GetConnection(peerID)
|
conn := c.transport.GetConnection(peerID)
|
||||||
if conn == nil {
|
if conn == nil {
|
||||||
return fmt.Errorf("peer not connected: %s", peerID)
|
return coreerr.E("Controller.DisconnectFromPeer", "peer not connected: "+peerID, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
return conn.Close()
|
return conn.Close()
|
||||||
|
|
|
||||||
|
|
@ -7,6 +7,7 @@ import (
|
||||||
"net/http/httptest"
|
"net/http/httptest"
|
||||||
"net/url"
|
"net/url"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
@ -514,6 +515,40 @@ type mockMinerFull struct {
|
||||||
func (m *mockMinerFull) GetName() string { return m.name }
|
func (m *mockMinerFull) GetName() string { return m.name }
|
||||||
func (m *mockMinerFull) GetType() string { return m.minerType }
|
func (m *mockMinerFull) GetType() string { return m.minerType }
|
||||||
func (m *mockMinerFull) GetStats() (any, error) { return m.stats, nil }
|
func (m *mockMinerFull) GetStats() (any, error) { return m.stats, nil }
|
||||||
|
func (m *mockMinerFull) GetConsoleHistorySince(lines int, since time.Time) []string {
|
||||||
|
if since.IsZero() {
|
||||||
|
if lines >= len(m.consoleHistory) {
|
||||||
|
return m.consoleHistory
|
||||||
|
}
|
||||||
|
return m.consoleHistory[:lines]
|
||||||
|
}
|
||||||
|
|
||||||
|
filtered := make([]string, 0, len(m.consoleHistory))
|
||||||
|
for _, line := range m.consoleHistory {
|
||||||
|
if lineAfter(line, since) {
|
||||||
|
filtered = append(filtered, line)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if lines >= len(filtered) {
|
||||||
|
return filtered
|
||||||
|
}
|
||||||
|
return filtered[:lines]
|
||||||
|
}
|
||||||
|
|
||||||
|
func lineAfter(line string, since time.Time) bool {
|
||||||
|
start := strings.IndexByte(line, '[')
|
||||||
|
end := strings.IndexByte(line, ']')
|
||||||
|
if start != 0 || end <= start+1 {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
ts, err := time.Parse("2006-01-02 15:04:05", line[start+1:end])
|
||||||
|
if err != nil {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
return ts.After(since) || ts.Equal(since)
|
||||||
|
}
|
||||||
|
|
||||||
func (m *mockMinerFull) GetConsoleHistory(lines int) []string {
|
func (m *mockMinerFull) GetConsoleHistory(lines int) []string {
|
||||||
if lines >= len(m.consoleHistory) {
|
if lines >= len(m.consoleHistory) {
|
||||||
return m.consoleHistory
|
return m.consoleHistory
|
||||||
|
|
@ -616,6 +651,20 @@ func TestController_GetRemoteLogs_LimitedLines(t *testing.T) {
|
||||||
assert.Len(t, lines, 1, "should return only 1 line")
|
assert.Len(t, lines, 1, "should return only 1 line")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestController_GetRemoteLogsSince(t *testing.T) {
|
||||||
|
controller, _, tp := setupControllerPairWithMiner(t)
|
||||||
|
serverID := tp.ServerNode.GetIdentity().ID
|
||||||
|
|
||||||
|
since, err := time.Parse("2006-01-02 15:04:05", "2026-02-20 10:00:01")
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
lines, err := controller.GetRemoteLogsSince(serverID, "running-miner", 10, since)
|
||||||
|
require.NoError(t, err, "GetRemoteLogsSince should succeed")
|
||||||
|
require.Len(t, lines, 2, "should return only log lines on or after the requested timestamp")
|
||||||
|
assert.Contains(t, lines[0], "connected to pool")
|
||||||
|
assert.Contains(t, lines[1], "new job received")
|
||||||
|
}
|
||||||
|
|
||||||
func TestController_GetRemoteLogs_NoIdentity(t *testing.T) {
|
func TestController_GetRemoteLogs_NoIdentity(t *testing.T) {
|
||||||
tp := setupTestTransportPair(t)
|
tp := setupTestTransportPair(t)
|
||||||
nmNoID, err := NewNodeManagerWithPaths(
|
nmNoID, err := NewNodeManagerWithPaths(
|
||||||
|
|
|
||||||
|
|
@ -1,13 +1,14 @@
|
||||||
package node
|
package node
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"iter"
|
"iter"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"forge.lthn.ai/core/go-p2p/logging"
|
coreerr "dappco.re/go/core/log"
|
||||||
"forge.lthn.ai/core/go-p2p/ueps"
|
|
||||||
|
"dappco.re/go/core/p2p/logging"
|
||||||
|
"dappco.re/go/core/p2p/ueps"
|
||||||
)
|
)
|
||||||
|
|
||||||
// ThreatScoreThreshold is the maximum allowable threat score. Packets exceeding
|
// ThreatScoreThreshold is the maximum allowable threat score. Packets exceeding
|
||||||
|
|
@ -133,12 +134,12 @@ func (d *Dispatcher) Dispatch(pkt *ueps.ParsedPacket) error {
|
||||||
var (
|
var (
|
||||||
// ErrThreatScoreExceeded is returned when a packet's ThreatScore exceeds
|
// ErrThreatScoreExceeded is returned when a packet's ThreatScore exceeds
|
||||||
// the safety threshold.
|
// the safety threshold.
|
||||||
ErrThreatScoreExceeded = fmt.Errorf("packet rejected: threat score exceeds safety threshold (%d)", ThreatScoreThreshold)
|
ErrThreatScoreExceeded = coreerr.E("Dispatcher.Dispatch", fmt.Sprintf("packet rejected: threat score exceeds safety threshold (%d)", ThreatScoreThreshold), nil)
|
||||||
|
|
||||||
// ErrUnknownIntent is returned when no handler is registered for the
|
// ErrUnknownIntent is returned when no handler is registered for the
|
||||||
// packet's IntentID.
|
// packet's IntentID.
|
||||||
ErrUnknownIntent = errors.New("packet dropped: unknown intent")
|
ErrUnknownIntent = coreerr.E("Dispatcher.Dispatch", "packet dropped: unknown intent", nil)
|
||||||
|
|
||||||
// ErrNilPacket is returned when a nil packet is passed to Dispatch.
|
// ErrNilPacket is returned when a nil packet is passed to Dispatch.
|
||||||
ErrNilPacket = errors.New("dispatch: nil packet")
|
ErrNilPacket = coreerr.E("Dispatcher.Dispatch", "nil packet", nil)
|
||||||
)
|
)
|
||||||
|
|
|
||||||
|
|
@ -6,7 +6,7 @@ import (
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"forge.lthn.ai/core/go-p2p/ueps"
|
"dappco.re/go/core/p2p/ueps"
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
|
||||||
|
|
@ -1,14 +1,14 @@
|
||||||
package node
|
package node
|
||||||
|
|
||||||
import "errors"
|
import coreerr "dappco.re/go/core/log"
|
||||||
|
|
||||||
// Sentinel errors shared across the node package.
|
// Sentinel errors shared across the node package.
|
||||||
var (
|
var (
|
||||||
// ErrIdentityNotInitialized is returned when a node operation requires
|
// ErrIdentityNotInitialized is returned when a node operation requires
|
||||||
// a node identity but none has been generated or loaded.
|
// a node identity but none has been generated or loaded.
|
||||||
ErrIdentityNotInitialized = errors.New("node identity not initialized")
|
ErrIdentityNotInitialized = coreerr.E("node", "node identity not initialized", nil)
|
||||||
|
|
||||||
// ErrMinerManagerNotConfigured is returned when a miner operation is
|
// ErrMinerManagerNotConfigured is returned when a miner operation is
|
||||||
// attempted but no MinerManager has been set on the Worker.
|
// attempted but no MinerManager has been set on the Worker.
|
||||||
ErrMinerManagerNotConfigured = errors.New("miner manager not configured")
|
ErrMinerManagerNotConfigured = coreerr.E("node", "miner manager not configured", nil)
|
||||||
)
|
)
|
||||||
|
|
|
||||||
118
node/identity.go
118
node/identity.go
|
|
@ -8,12 +8,14 @@ import (
|
||||||
"crypto/sha256"
|
"crypto/sha256"
|
||||||
"encoding/hex"
|
"encoding/hex"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
coreio "dappco.re/go/core/io"
|
||||||
|
coreerr "dappco.re/go/core/log"
|
||||||
|
|
||||||
"forge.lthn.ai/Snider/Borg/pkg/stmf"
|
"forge.lthn.ai/Snider/Borg/pkg/stmf"
|
||||||
"github.com/adrg/xdg"
|
"github.com/adrg/xdg"
|
||||||
)
|
)
|
||||||
|
|
@ -25,7 +27,7 @@ const ChallengeSize = 32
|
||||||
func GenerateChallenge() ([]byte, error) {
|
func GenerateChallenge() ([]byte, error) {
|
||||||
challenge := make([]byte, ChallengeSize)
|
challenge := make([]byte, ChallengeSize)
|
||||||
if _, err := rand.Read(challenge); err != nil {
|
if _, err := rand.Read(challenge); err != nil {
|
||||||
return nil, fmt.Errorf("failed to generate challenge: %w", err)
|
return nil, coreerr.E("GenerateChallenge", "failed to generate challenge", err)
|
||||||
}
|
}
|
||||||
return challenge, nil
|
return challenge, nil
|
||||||
}
|
}
|
||||||
|
|
@ -79,12 +81,12 @@ type NodeManager struct {
|
||||||
func NewNodeManager() (*NodeManager, error) {
|
func NewNodeManager() (*NodeManager, error) {
|
||||||
keyPath, err := xdg.DataFile("lethean-desktop/node/private.key")
|
keyPath, err := xdg.DataFile("lethean-desktop/node/private.key")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to get key path: %w", err)
|
return nil, coreerr.E("NodeManager.New", "failed to get key path", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
configPath, err := xdg.ConfigFile("lethean-desktop/node.json")
|
configPath, err := xdg.ConfigFile("lethean-desktop/node.json")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to get config path: %w", err)
|
return nil, coreerr.E("NodeManager.New", "failed to get config path", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return NewNodeManagerWithPaths(keyPath, configPath)
|
return NewNodeManagerWithPaths(keyPath, configPath)
|
||||||
|
|
@ -107,6 +109,48 @@ func NewNodeManagerWithPaths(keyPath, configPath string) (*NodeManager, error) {
|
||||||
return nm, nil
|
return nm, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// LoadOrCreateIdentity loads the node identity from the default XDG paths or
|
||||||
|
// generates a new dual-role identity when none exists yet.
|
||||||
|
func LoadOrCreateIdentity() (*NodeManager, error) {
|
||||||
|
keyPath, err := xdg.DataFile("lethean-desktop/node/private.key")
|
||||||
|
if err != nil {
|
||||||
|
return nil, coreerr.E("LoadOrCreateIdentity", "failed to get key path", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
configPath, err := xdg.ConfigFile("lethean-desktop/node.json")
|
||||||
|
if err != nil {
|
||||||
|
return nil, coreerr.E("LoadOrCreateIdentity", "failed to get config path", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return LoadOrCreateIdentityWithPaths(keyPath, configPath)
|
||||||
|
}
|
||||||
|
|
||||||
|
// LoadOrCreateIdentityWithPaths loads an existing identity from the supplied
|
||||||
|
// paths or creates a new dual-role identity if no persisted identity exists.
|
||||||
|
// The generated identity name falls back to the host name, then a stable
|
||||||
|
// project-specific default if the host name cannot be determined.
|
||||||
|
func LoadOrCreateIdentityWithPaths(keyPath, configPath string) (*NodeManager, error) {
|
||||||
|
nm, err := NewNodeManagerWithPaths(keyPath, configPath)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if nm.HasIdentity() {
|
||||||
|
return nm, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
name, err := os.Hostname()
|
||||||
|
if err != nil || name == "" {
|
||||||
|
name = "lethean-node"
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := nm.GenerateIdentity(name, RoleDual); err != nil {
|
||||||
|
return nil, coreerr.E("LoadOrCreateIdentityWithPaths", "failed to generate identity", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nm, nil
|
||||||
|
}
|
||||||
|
|
||||||
// HasIdentity returns true if a node identity has been initialized.
|
// HasIdentity returns true if a node identity has been initialized.
|
||||||
func (n *NodeManager) HasIdentity() bool {
|
func (n *NodeManager) HasIdentity() bool {
|
||||||
n.mu.RLock()
|
n.mu.RLock()
|
||||||
|
|
@ -134,7 +178,7 @@ func (n *NodeManager) GenerateIdentity(name string, role NodeRole) error {
|
||||||
// Generate X25519 keypair using STMF
|
// Generate X25519 keypair using STMF
|
||||||
keyPair, err := stmf.GenerateKeyPair()
|
keyPair, err := stmf.GenerateKeyPair()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to generate keypair: %w", err)
|
return coreerr.E("NodeManager.GenerateIdentity", "failed to generate keypair", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Derive node ID from public key (first 16 bytes as hex = 32 char ID)
|
// Derive node ID from public key (first 16 bytes as hex = 32 char ID)
|
||||||
|
|
@ -155,12 +199,12 @@ func (n *NodeManager) GenerateIdentity(name string, role NodeRole) error {
|
||||||
|
|
||||||
// Save private key
|
// Save private key
|
||||||
if err := n.savePrivateKey(); err != nil {
|
if err := n.savePrivateKey(); err != nil {
|
||||||
return fmt.Errorf("failed to save private key: %w", err)
|
return coreerr.E("NodeManager.GenerateIdentity", "failed to save private key", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Save identity config
|
// Save identity config
|
||||||
if err := n.saveIdentity(); err != nil {
|
if err := n.saveIdentity(); err != nil {
|
||||||
return fmt.Errorf("failed to save identity: %w", err)
|
return coreerr.E("NodeManager.GenerateIdentity", "failed to save identity", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
|
@ -179,19 +223,19 @@ func (n *NodeManager) DeriveSharedSecret(peerPubKeyBase64 string) ([]byte, error
|
||||||
// Load peer's public key
|
// Load peer's public key
|
||||||
peerPubKey, err := stmf.LoadPublicKeyBase64(peerPubKeyBase64)
|
peerPubKey, err := stmf.LoadPublicKeyBase64(peerPubKeyBase64)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to load peer public key: %w", err)
|
return nil, coreerr.E("NodeManager.DeriveSharedSecret", "failed to load peer public key", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Load our private key
|
// Load our private key
|
||||||
privateKey, err := ecdh.X25519().NewPrivateKey(n.privateKey)
|
privateKey, err := ecdh.X25519().NewPrivateKey(n.privateKey)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to load private key: %w", err)
|
return nil, coreerr.E("NodeManager.DeriveSharedSecret", "failed to load private key", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Derive shared secret using ECDH
|
// Derive shared secret using ECDH
|
||||||
sharedSecret, err := privateKey.ECDH(peerPubKey)
|
sharedSecret, err := privateKey.ECDH(peerPubKey)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to derive shared secret: %w", err)
|
return nil, coreerr.E("NodeManager.DeriveSharedSecret", "failed to derive shared secret", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Hash the shared secret using SHA-256 (same pattern as Borg/trix)
|
// Hash the shared secret using SHA-256 (same pattern as Borg/trix)
|
||||||
|
|
@ -203,13 +247,16 @@ func (n *NodeManager) DeriveSharedSecret(peerPubKeyBase64 string) ([]byte, error
|
||||||
func (n *NodeManager) savePrivateKey() error {
|
func (n *NodeManager) savePrivateKey() error {
|
||||||
// Ensure directory exists
|
// Ensure directory exists
|
||||||
dir := filepath.Dir(n.keyPath)
|
dir := filepath.Dir(n.keyPath)
|
||||||
if err := os.MkdirAll(dir, 0700); err != nil {
|
if err := coreio.Local.EnsureDir(dir); err != nil {
|
||||||
return fmt.Errorf("failed to create key directory: %w", err)
|
return coreerr.E("NodeManager.savePrivateKey", "failed to create key directory", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Write private key with restricted permissions (0600)
|
// Write private key and then tighten permissions explicitly.
|
||||||
if err := os.WriteFile(n.keyPath, n.privateKey, 0600); err != nil {
|
if err := coreio.Local.Write(n.keyPath, string(n.privateKey)); err != nil {
|
||||||
return fmt.Errorf("failed to write private key: %w", err)
|
return coreerr.E("NodeManager.savePrivateKey", "failed to write private key", err)
|
||||||
|
}
|
||||||
|
if err := os.Chmod(n.keyPath, 0600); err != nil {
|
||||||
|
return coreerr.E("NodeManager.savePrivateKey", "failed to set private key permissions", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
|
@ -219,17 +266,17 @@ func (n *NodeManager) savePrivateKey() error {
|
||||||
func (n *NodeManager) saveIdentity() error {
|
func (n *NodeManager) saveIdentity() error {
|
||||||
// Ensure directory exists
|
// Ensure directory exists
|
||||||
dir := filepath.Dir(n.configPath)
|
dir := filepath.Dir(n.configPath)
|
||||||
if err := os.MkdirAll(dir, 0755); err != nil {
|
if err := coreio.Local.EnsureDir(dir); err != nil {
|
||||||
return fmt.Errorf("failed to create config directory: %w", err)
|
return coreerr.E("NodeManager.saveIdentity", "failed to create config directory", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
data, err := json.MarshalIndent(n.identity, "", " ")
|
data, err := json.MarshalIndent(n.identity, "", " ")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to marshal identity: %w", err)
|
return coreerr.E("NodeManager.saveIdentity", "failed to marshal identity", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := os.WriteFile(n.configPath, data, 0644); err != nil {
|
if err := coreio.Local.Write(n.configPath, string(data)); err != nil {
|
||||||
return fmt.Errorf("failed to write identity: %w", err)
|
return coreerr.E("NodeManager.saveIdentity", "failed to write identity", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
|
@ -238,26 +285,27 @@ func (n *NodeManager) saveIdentity() error {
|
||||||
// loadIdentity loads the node identity from disk.
|
// loadIdentity loads the node identity from disk.
|
||||||
func (n *NodeManager) loadIdentity() error {
|
func (n *NodeManager) loadIdentity() error {
|
||||||
// Load identity config
|
// Load identity config
|
||||||
data, err := os.ReadFile(n.configPath)
|
content, err := coreio.Local.Read(n.configPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to read identity: %w", err)
|
return coreerr.E("NodeManager.loadIdentity", "failed to read identity", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
var identity NodeIdentity
|
var identity NodeIdentity
|
||||||
if err := json.Unmarshal(data, &identity); err != nil {
|
if err := json.Unmarshal([]byte(content), &identity); err != nil {
|
||||||
return fmt.Errorf("failed to unmarshal identity: %w", err)
|
return coreerr.E("NodeManager.loadIdentity", "failed to unmarshal identity", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Load private key
|
// Load private key
|
||||||
privateKey, err := os.ReadFile(n.keyPath)
|
keyContent, err := coreio.Local.Read(n.keyPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to read private key: %w", err)
|
return coreerr.E("NodeManager.loadIdentity", "failed to read private key", err)
|
||||||
}
|
}
|
||||||
|
privateKey := []byte(keyContent)
|
||||||
|
|
||||||
// Reconstruct keypair from private key
|
// Reconstruct keypair from private key
|
||||||
keyPair, err := stmf.LoadKeyPair(privateKey)
|
keyPair, err := stmf.LoadKeyPair(privateKey)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to load keypair: %w", err)
|
return coreerr.E("NodeManager.loadIdentity", "failed to load keypair", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
n.identity = &identity
|
n.identity = &identity
|
||||||
|
|
@ -272,14 +320,18 @@ func (n *NodeManager) Delete() error {
|
||||||
n.mu.Lock()
|
n.mu.Lock()
|
||||||
defer n.mu.Unlock()
|
defer n.mu.Unlock()
|
||||||
|
|
||||||
// Remove private key
|
// Remove private key (ignore if already absent)
|
||||||
if err := os.Remove(n.keyPath); err != nil && !os.IsNotExist(err) {
|
if coreio.Local.Exists(n.keyPath) {
|
||||||
return fmt.Errorf("failed to remove private key: %w", err)
|
if err := coreio.Local.Delete(n.keyPath); err != nil {
|
||||||
|
return coreerr.E("NodeManager.Delete", "failed to remove private key", err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Remove identity config
|
// Remove identity config (ignore if already absent)
|
||||||
if err := os.Remove(n.configPath); err != nil && !os.IsNotExist(err) {
|
if coreio.Local.Exists(n.configPath) {
|
||||||
return fmt.Errorf("failed to remove identity: %w", err)
|
if err := coreio.Local.Delete(n.configPath); err != nil {
|
||||||
|
return coreerr.E("NodeManager.Delete", "failed to remove identity", err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
n.identity = nil
|
n.identity = nil
|
||||||
|
|
|
||||||
|
|
@ -74,6 +74,25 @@ func TestNodeIdentity(t *testing.T) {
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
|
t.Run("PrivateKeyPermissions", func(t *testing.T) {
|
||||||
|
nm, cleanup := setupTestNodeManager(t)
|
||||||
|
defer cleanup()
|
||||||
|
|
||||||
|
err := nm.GenerateIdentity("permission-test", RoleDual)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("failed to generate identity: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
info, err := os.Stat(nm.keyPath)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("failed to stat private key: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if got := info.Mode().Perm(); got != 0600 {
|
||||||
|
t.Fatalf("expected private key permissions 0600, got %04o", got)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
t.Run("LoadExistingIdentity", func(t *testing.T) {
|
t.Run("LoadExistingIdentity", func(t *testing.T) {
|
||||||
tmpDir, err := os.MkdirTemp("", "node-load-test")
|
tmpDir, err := os.MkdirTemp("", "node-load-test")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
@ -196,6 +215,47 @@ func TestNodeIdentity(t *testing.T) {
|
||||||
t.Error("should not have identity after delete")
|
t.Error("should not have identity after delete")
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
|
t.Run("LoadOrCreateIdentityWithPaths", func(t *testing.T) {
|
||||||
|
tmpDir, err := os.MkdirTemp("", "node-load-or-create-test")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("failed to create temp dir: %v", err)
|
||||||
|
}
|
||||||
|
defer os.RemoveAll(tmpDir)
|
||||||
|
|
||||||
|
keyPath := filepath.Join(tmpDir, "private.key")
|
||||||
|
configPath := filepath.Join(tmpDir, "node.json")
|
||||||
|
|
||||||
|
nm, err := LoadOrCreateIdentityWithPaths(keyPath, configPath)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("failed to load or create identity: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !nm.HasIdentity() {
|
||||||
|
t.Fatal("expected identity to be initialised")
|
||||||
|
}
|
||||||
|
|
||||||
|
identity := nm.GetIdentity()
|
||||||
|
if identity == nil {
|
||||||
|
t.Fatal("identity should not be nil")
|
||||||
|
}
|
||||||
|
|
||||||
|
if identity.Name == "" {
|
||||||
|
t.Error("identity name should be populated")
|
||||||
|
}
|
||||||
|
|
||||||
|
if identity.Role != RoleDual {
|
||||||
|
t.Errorf("expected default role dual, got %s", identity.Role)
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err := os.Stat(keyPath); err != nil {
|
||||||
|
t.Fatalf("expected private key to be persisted: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err := os.Stat(configPath); err != nil {
|
||||||
|
t.Fatalf("expected identity config to be persisted: %v", err)
|
||||||
|
}
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestNodeRoles(t *testing.T) {
|
func TestNodeRoles(t *testing.T) {
|
||||||
|
|
|
||||||
|
|
@ -13,7 +13,7 @@ import (
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"forge.lthn.ai/core/go-p2p/ueps"
|
"dappco.re/go/core/p2p/ueps"
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
|
||||||
|
|
@ -7,7 +7,8 @@ package levin
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
"errors"
|
|
||||||
|
coreerr "dappco.re/go/core/log"
|
||||||
)
|
)
|
||||||
|
|
||||||
// HeaderSize is the exact byte length of a serialised Levin header.
|
// HeaderSize is the exact byte length of a serialised Levin header.
|
||||||
|
|
@ -42,8 +43,8 @@ const (
|
||||||
|
|
||||||
// Sentinel errors returned by DecodeHeader.
|
// Sentinel errors returned by DecodeHeader.
|
||||||
var (
|
var (
|
||||||
ErrBadSignature = errors.New("levin: bad signature")
|
ErrBadSignature = coreerr.E("levin", "bad signature", nil)
|
||||||
ErrPayloadTooBig = errors.New("levin: payload exceeds maximum size")
|
ErrPayloadTooBig = coreerr.E("levin", "payload exceeds maximum size", nil)
|
||||||
)
|
)
|
||||||
|
|
||||||
// Header is the 33-byte packed header that prefixes every Levin message.
|
// Header is the 33-byte packed header that prefixes every Levin message.
|
||||||
|
|
|
||||||
|
|
@ -5,11 +5,12 @@ package levin
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
"errors"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"maps"
|
"maps"
|
||||||
"math"
|
"math"
|
||||||
"slices"
|
"slices"
|
||||||
|
|
||||||
|
coreerr "dappco.re/go/core/log"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Portable storage signatures and version (9-byte header).
|
// Portable storage signatures and version (9-byte header).
|
||||||
|
|
@ -40,12 +41,12 @@ const (
|
||||||
|
|
||||||
// Sentinel errors for storage encoding and decoding.
|
// Sentinel errors for storage encoding and decoding.
|
||||||
var (
|
var (
|
||||||
ErrStorageBadSignature = errors.New("levin: bad storage signature")
|
ErrStorageBadSignature = coreerr.E("levin.storage", "bad storage signature", nil)
|
||||||
ErrStorageTruncated = errors.New("levin: truncated storage data")
|
ErrStorageTruncated = coreerr.E("levin.storage", "truncated storage data", nil)
|
||||||
ErrStorageBadVersion = errors.New("levin: unsupported storage version")
|
ErrStorageBadVersion = coreerr.E("levin.storage", "unsupported storage version", nil)
|
||||||
ErrStorageNameTooLong = errors.New("levin: entry name exceeds 255 bytes")
|
ErrStorageNameTooLong = coreerr.E("levin.storage", "entry name exceeds 255 bytes", nil)
|
||||||
ErrStorageTypeMismatch = errors.New("levin: value type mismatch")
|
ErrStorageTypeMismatch = coreerr.E("levin.storage", "value type mismatch", nil)
|
||||||
ErrStorageUnknownType = errors.New("levin: unknown type tag")
|
ErrStorageUnknownType = coreerr.E("levin.storage", "unknown type tag", nil)
|
||||||
)
|
)
|
||||||
|
|
||||||
// Section is an ordered map of named values forming a portable storage section.
|
// Section is an ordered map of named values forming a portable storage section.
|
||||||
|
|
@ -393,7 +394,7 @@ func encodeValue(buf []byte, v Value) ([]byte, error) {
|
||||||
return encodeSection(buf, v.objectVal)
|
return encodeSection(buf, v.objectVal)
|
||||||
|
|
||||||
default:
|
default:
|
||||||
return nil, fmt.Errorf("%w: 0x%02x", ErrStorageUnknownType, v.Type)
|
return nil, coreerr.E("levin.encodeValue", fmt.Sprintf("unknown type tag: 0x%02x", v.Type), ErrStorageUnknownType)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -440,7 +441,7 @@ func encodeArray(buf []byte, v Value) ([]byte, error) {
|
||||||
return buf, nil
|
return buf, nil
|
||||||
|
|
||||||
default:
|
default:
|
||||||
return nil, fmt.Errorf("%w: array of 0x%02x", ErrStorageUnknownType, elemType)
|
return nil, coreerr.E("levin.encodeArray", fmt.Sprintf("unknown type tag: array of 0x%02x", elemType), ErrStorageUnknownType)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -475,7 +476,7 @@ func DecodeStorage(data []byte) (Section, error) {
|
||||||
func decodeSection(buf []byte) (Section, int, error) {
|
func decodeSection(buf []byte) (Section, int, error) {
|
||||||
count, n, err := UnpackVarint(buf)
|
count, n, err := UnpackVarint(buf)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, 0, fmt.Errorf("section entry count: %w", err)
|
return nil, 0, coreerr.E("levin.decodeSection", "section entry count", err)
|
||||||
}
|
}
|
||||||
off := n
|
off := n
|
||||||
|
|
||||||
|
|
@ -506,7 +507,7 @@ func decodeSection(buf []byte) (Section, int, error) {
|
||||||
// Value.
|
// Value.
|
||||||
val, consumed, err := decodeValue(buf[off:], tag)
|
val, consumed, err := decodeValue(buf[off:], tag)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, 0, fmt.Errorf("field %q: %w", name, err)
|
return nil, 0, coreerr.E("levin.decodeSection", "field "+name, err)
|
||||||
}
|
}
|
||||||
off += consumed
|
off += consumed
|
||||||
|
|
||||||
|
|
@ -612,7 +613,7 @@ func decodeValue(buf []byte, tag uint8) (Value, int, error) {
|
||||||
return Value{Type: TypeObject, objectVal: sec}, consumed, nil
|
return Value{Type: TypeObject, objectVal: sec}, consumed, nil
|
||||||
|
|
||||||
default:
|
default:
|
||||||
return Value{}, 0, fmt.Errorf("%w: 0x%02x", ErrStorageUnknownType, tag)
|
return Value{}, 0, coreerr.E("levin.decodeValue", fmt.Sprintf("unknown type tag: 0x%02x", tag), ErrStorageUnknownType)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -680,6 +681,6 @@ func decodeArray(buf []byte, tag uint8) (Value, int, error) {
|
||||||
return Value{Type: tag, objectArray: arr}, off, nil
|
return Value{Type: tag, objectArray: arr}, off, nil
|
||||||
|
|
||||||
default:
|
default:
|
||||||
return Value{}, 0, fmt.Errorf("%w: array of 0x%02x", ErrStorageUnknownType, elemType)
|
return Value{}, 0, coreerr.E("levin.decodeArray", fmt.Sprintf("unknown type tag: array of 0x%02x", elemType), ErrStorageUnknownType)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -5,7 +5,8 @@ package levin
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
"errors"
|
|
||||||
|
coreerr "dappco.re/go/core/log"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Size-mark bits occupying the two lowest bits of the first byte.
|
// Size-mark bits occupying the two lowest bits of the first byte.
|
||||||
|
|
@ -22,10 +23,10 @@ const (
|
||||||
)
|
)
|
||||||
|
|
||||||
// ErrVarintTruncated is returned when the buffer is too short.
|
// ErrVarintTruncated is returned when the buffer is too short.
|
||||||
var ErrVarintTruncated = errors.New("levin: truncated varint")
|
var ErrVarintTruncated = coreerr.E("levin", "truncated varint", nil)
|
||||||
|
|
||||||
// ErrVarintOverflow is returned when the value is too large to encode.
|
// ErrVarintOverflow is returned when the value is too large to encode.
|
||||||
var ErrVarintOverflow = errors.New("levin: varint overflow")
|
var ErrVarintOverflow = coreerr.E("levin", "varint overflow", nil)
|
||||||
|
|
||||||
// PackVarint encodes v using the epee portable-storage varint scheme.
|
// PackVarint encodes v using the epee portable-storage varint scheme.
|
||||||
// The low two bits of the first byte indicate the total encoded width;
|
// The low two bits of the first byte indicate the total encoded width;
|
||||||
|
|
|
||||||
150
node/peer.go
150
node/peer.go
|
|
@ -2,19 +2,19 @@ package node
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
|
||||||
"fmt"
|
|
||||||
"iter"
|
"iter"
|
||||||
"maps"
|
"maps"
|
||||||
"os"
|
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"regexp"
|
"regexp"
|
||||||
"slices"
|
"slices"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
coreio "dappco.re/go/core/io"
|
||||||
|
coreerr "dappco.re/go/core/log"
|
||||||
|
"dappco.re/go/core/p2p/logging"
|
||||||
|
|
||||||
poindexter "forge.lthn.ai/Snider/Poindexter"
|
poindexter "forge.lthn.ai/Snider/Poindexter"
|
||||||
"forge.lthn.ai/core/go-p2p/logging"
|
|
||||||
"github.com/adrg/xdg"
|
"github.com/adrg/xdg"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -51,9 +51,8 @@ const (
|
||||||
PeerAuthAllowlist
|
PeerAuthAllowlist
|
||||||
)
|
)
|
||||||
|
|
||||||
// Peer name validation constants
|
// Peer name validation constants.
|
||||||
const (
|
const (
|
||||||
PeerNameMinLength = 1
|
|
||||||
PeerNameMaxLength = 64
|
PeerNameMaxLength = 64
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -72,20 +71,18 @@ func safeKeyPrefix(key string) string {
|
||||||
}
|
}
|
||||||
|
|
||||||
// validatePeerName checks if a peer name is valid.
|
// validatePeerName checks if a peer name is valid.
|
||||||
// Peer names must be 1-64 characters, start and end with alphanumeric,
|
// Empty names are permitted. Non-empty names must be 1-64 characters,
|
||||||
// and contain only alphanumeric, hyphens, underscores, and spaces.
|
// start and end with alphanumeric, and contain only alphanumeric,
|
||||||
|
// hyphens, underscores, and spaces.
|
||||||
func validatePeerName(name string) error {
|
func validatePeerName(name string) error {
|
||||||
if name == "" {
|
if name == "" {
|
||||||
return nil // Empty names are allowed (optional field)
|
return nil
|
||||||
}
|
|
||||||
if len(name) < PeerNameMinLength {
|
|
||||||
return fmt.Errorf("peer name too short (min %d characters)", PeerNameMinLength)
|
|
||||||
}
|
}
|
||||||
if len(name) > PeerNameMaxLength {
|
if len(name) > PeerNameMaxLength {
|
||||||
return fmt.Errorf("peer name too long (max %d characters)", PeerNameMaxLength)
|
return coreerr.E("validatePeerName", "peer name too long", nil)
|
||||||
}
|
}
|
||||||
if !peerNameRegex.MatchString(name) {
|
if !peerNameRegex.MatchString(name) {
|
||||||
return errors.New("peer name contains invalid characters (use alphanumeric, hyphens, underscores, spaces)")
|
return coreerr.E("validatePeerName", "peer name contains invalid characters (use alphanumeric, hyphens, underscores, spaces)", nil)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
@ -101,6 +98,7 @@ type PeerRegistry struct {
|
||||||
authMode PeerAuthMode // How to handle unknown peers
|
authMode PeerAuthMode // How to handle unknown peers
|
||||||
allowedPublicKeys map[string]bool // Allowlist of public keys (when authMode is Allowlist)
|
allowedPublicKeys map[string]bool // Allowlist of public keys (when authMode is Allowlist)
|
||||||
allowedPublicKeyMu sync.RWMutex // Protects allowedPublicKeys
|
allowedPublicKeyMu sync.RWMutex // Protects allowedPublicKeys
|
||||||
|
allowlistPath string // Sidecar file for persisted allowlist keys
|
||||||
|
|
||||||
// Debounce disk writes
|
// Debounce disk writes
|
||||||
dirty bool // Whether there are unsaved changes
|
dirty bool // Whether there are unsaved changes
|
||||||
|
|
@ -123,7 +121,7 @@ var (
|
||||||
func NewPeerRegistry() (*PeerRegistry, error) {
|
func NewPeerRegistry() (*PeerRegistry, error) {
|
||||||
peersPath, err := xdg.ConfigFile("lethean-desktop/peers.json")
|
peersPath, err := xdg.ConfigFile("lethean-desktop/peers.json")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to get peers path: %w", err)
|
return nil, coreerr.E("PeerRegistry.New", "failed to get peers path", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return NewPeerRegistryWithPath(peersPath)
|
return NewPeerRegistryWithPath(peersPath)
|
||||||
|
|
@ -135,6 +133,7 @@ func NewPeerRegistryWithPath(peersPath string) (*PeerRegistry, error) {
|
||||||
pr := &PeerRegistry{
|
pr := &PeerRegistry{
|
||||||
peers: make(map[string]*Peer),
|
peers: make(map[string]*Peer),
|
||||||
path: peersPath,
|
path: peersPath,
|
||||||
|
allowlistPath: peersPath + ".allowlist.json",
|
||||||
stopChan: make(chan struct{}),
|
stopChan: make(chan struct{}),
|
||||||
authMode: PeerAuthOpen, // Default to open for backward compatibility
|
authMode: PeerAuthOpen, // Default to open for backward compatibility
|
||||||
allowedPublicKeys: make(map[string]bool),
|
allowedPublicKeys: make(map[string]bool),
|
||||||
|
|
@ -144,7 +143,12 @@ func NewPeerRegistryWithPath(peersPath string) (*PeerRegistry, error) {
|
||||||
if err := pr.load(); err != nil {
|
if err := pr.load(); err != nil {
|
||||||
// No existing peers, that's ok
|
// No existing peers, that's ok
|
||||||
pr.rebuildKDTree()
|
pr.rebuildKDTree()
|
||||||
return pr, nil
|
}
|
||||||
|
|
||||||
|
// Load any persisted allowlist entries. This is best effort so that a
|
||||||
|
// missing or corrupt sidecar does not block peer registry startup.
|
||||||
|
if err := pr.loadAllowedPublicKeys(); err != nil {
|
||||||
|
logging.Warn("failed to load peer allowlist", logging.Fields{"error": err})
|
||||||
}
|
}
|
||||||
|
|
||||||
pr.rebuildKDTree()
|
pr.rebuildKDTree()
|
||||||
|
|
@ -169,17 +173,25 @@ func (r *PeerRegistry) GetAuthMode() PeerAuthMode {
|
||||||
// AllowPublicKey adds a public key to the allowlist.
|
// AllowPublicKey adds a public key to the allowlist.
|
||||||
func (r *PeerRegistry) AllowPublicKey(publicKey string) {
|
func (r *PeerRegistry) AllowPublicKey(publicKey string) {
|
||||||
r.allowedPublicKeyMu.Lock()
|
r.allowedPublicKeyMu.Lock()
|
||||||
defer r.allowedPublicKeyMu.Unlock()
|
|
||||||
r.allowedPublicKeys[publicKey] = true
|
r.allowedPublicKeys[publicKey] = true
|
||||||
|
r.allowedPublicKeyMu.Unlock()
|
||||||
logging.Debug("public key added to allowlist", logging.Fields{"key": safeKeyPrefix(publicKey)})
|
logging.Debug("public key added to allowlist", logging.Fields{"key": safeKeyPrefix(publicKey)})
|
||||||
|
|
||||||
|
if err := r.saveAllowedPublicKeys(); err != nil {
|
||||||
|
logging.Warn("failed to persist peer allowlist", logging.Fields{"error": err})
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// RevokePublicKey removes a public key from the allowlist.
|
// RevokePublicKey removes a public key from the allowlist.
|
||||||
func (r *PeerRegistry) RevokePublicKey(publicKey string) {
|
func (r *PeerRegistry) RevokePublicKey(publicKey string) {
|
||||||
r.allowedPublicKeyMu.Lock()
|
r.allowedPublicKeyMu.Lock()
|
||||||
defer r.allowedPublicKeyMu.Unlock()
|
|
||||||
delete(r.allowedPublicKeys, publicKey)
|
delete(r.allowedPublicKeys, publicKey)
|
||||||
|
r.allowedPublicKeyMu.Unlock()
|
||||||
logging.Debug("public key removed from allowlist", logging.Fields{"key": safeKeyPrefix(publicKey)})
|
logging.Debug("public key removed from allowlist", logging.Fields{"key": safeKeyPrefix(publicKey)})
|
||||||
|
|
||||||
|
if err := r.saveAllowedPublicKeys(); err != nil {
|
||||||
|
logging.Warn("failed to persist peer allowlist", logging.Fields{"error": err})
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// IsPublicKeyAllowed checks if a public key is in the allowlist.
|
// IsPublicKeyAllowed checks if a public key is in the allowlist.
|
||||||
|
|
@ -244,7 +256,7 @@ func (r *PeerRegistry) AddPeer(peer *Peer) error {
|
||||||
|
|
||||||
if peer.ID == "" {
|
if peer.ID == "" {
|
||||||
r.mu.Unlock()
|
r.mu.Unlock()
|
||||||
return errors.New("peer ID is required")
|
return coreerr.E("PeerRegistry.AddPeer", "peer ID is required", nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Validate peer name (P2P-LOW-3)
|
// Validate peer name (P2P-LOW-3)
|
||||||
|
|
@ -255,7 +267,7 @@ func (r *PeerRegistry) AddPeer(peer *Peer) error {
|
||||||
|
|
||||||
if _, exists := r.peers[peer.ID]; exists {
|
if _, exists := r.peers[peer.ID]; exists {
|
||||||
r.mu.Unlock()
|
r.mu.Unlock()
|
||||||
return fmt.Errorf("peer %s already exists", peer.ID)
|
return coreerr.E("PeerRegistry.AddPeer", "peer "+peer.ID+" already exists", nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Set defaults
|
// Set defaults
|
||||||
|
|
@ -280,7 +292,7 @@ func (r *PeerRegistry) UpdatePeer(peer *Peer) error {
|
||||||
|
|
||||||
if _, exists := r.peers[peer.ID]; !exists {
|
if _, exists := r.peers[peer.ID]; !exists {
|
||||||
r.mu.Unlock()
|
r.mu.Unlock()
|
||||||
return fmt.Errorf("peer %s not found", peer.ID)
|
return coreerr.E("PeerRegistry.UpdatePeer", "peer "+peer.ID+" not found", nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
r.peers[peer.ID] = peer
|
r.peers[peer.ID] = peer
|
||||||
|
|
@ -297,7 +309,7 @@ func (r *PeerRegistry) RemovePeer(id string) error {
|
||||||
|
|
||||||
if _, exists := r.peers[id]; !exists {
|
if _, exists := r.peers[id]; !exists {
|
||||||
r.mu.Unlock()
|
r.mu.Unlock()
|
||||||
return fmt.Errorf("peer %s not found", id)
|
return coreerr.E("PeerRegistry.RemovePeer", "peer "+id+" not found", nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
delete(r.peers, id)
|
delete(r.peers, id)
|
||||||
|
|
@ -351,7 +363,7 @@ func (r *PeerRegistry) UpdateMetrics(id string, pingMS, geoKM float64, hops int)
|
||||||
peer, exists := r.peers[id]
|
peer, exists := r.peers[id]
|
||||||
if !exists {
|
if !exists {
|
||||||
r.mu.Unlock()
|
r.mu.Unlock()
|
||||||
return fmt.Errorf("peer %s not found", id)
|
return coreerr.E("PeerRegistry.UpdateMetrics", "peer "+id+" not found", nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
peer.PingMS = pingMS
|
peer.PingMS = pingMS
|
||||||
|
|
@ -373,7 +385,7 @@ func (r *PeerRegistry) UpdateScore(id string, score float64) error {
|
||||||
peer, exists := r.peers[id]
|
peer, exists := r.peers[id]
|
||||||
if !exists {
|
if !exists {
|
||||||
r.mu.Unlock()
|
r.mu.Unlock()
|
||||||
return fmt.Errorf("peer %s not found", id)
|
return coreerr.E("PeerRegistry.UpdateScore", "peer "+id+" not found", nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Clamp score to 0-100
|
// Clamp score to 0-100
|
||||||
|
|
@ -656,8 +668,8 @@ func (r *PeerRegistry) scheduleSave() {
|
||||||
func (r *PeerRegistry) saveNow() error {
|
func (r *PeerRegistry) saveNow() error {
|
||||||
// Ensure directory exists
|
// Ensure directory exists
|
||||||
dir := filepath.Dir(r.path)
|
dir := filepath.Dir(r.path)
|
||||||
if err := os.MkdirAll(dir, 0755); err != nil {
|
if err := coreio.Local.EnsureDir(dir); err != nil {
|
||||||
return fmt.Errorf("failed to create peers directory: %w", err)
|
return coreerr.E("PeerRegistry.saveNow", "failed to create peers directory", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Convert to slice for JSON
|
// Convert to slice for JSON
|
||||||
|
|
@ -665,18 +677,18 @@ func (r *PeerRegistry) saveNow() error {
|
||||||
|
|
||||||
data, err := json.MarshalIndent(peers, "", " ")
|
data, err := json.MarshalIndent(peers, "", " ")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to marshal peers: %w", err)
|
return coreerr.E("PeerRegistry.saveNow", "failed to marshal peers", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Use atomic write pattern: write to temp file, then rename
|
// Use atomic write pattern: write to temp file, then rename
|
||||||
tmpPath := r.path + ".tmp"
|
tmpPath := r.path + ".tmp"
|
||||||
if err := os.WriteFile(tmpPath, data, 0644); err != nil {
|
if err := coreio.Local.Write(tmpPath, string(data)); err != nil {
|
||||||
return fmt.Errorf("failed to write peers temp file: %w", err)
|
return coreerr.E("PeerRegistry.saveNow", "failed to write peers temp file", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := os.Rename(tmpPath, r.path); err != nil {
|
if err := coreio.Local.Rename(tmpPath, r.path); err != nil {
|
||||||
os.Remove(tmpPath) // Clean up temp file
|
coreio.Local.Delete(tmpPath) // Clean up temp file
|
||||||
return fmt.Errorf("failed to rename peers file: %w", err)
|
return coreerr.E("PeerRegistry.saveNow", "failed to rename peers file", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
|
@ -708,6 +720,72 @@ func (r *PeerRegistry) Close() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// saveAllowedPublicKeys persists the allowlist to disk immediately.
|
||||||
|
// It keeps the allowlist in a separate sidecar file so peer persistence remains
|
||||||
|
// backwards compatible with the existing peers.json array format.
|
||||||
|
func (r *PeerRegistry) saveAllowedPublicKeys() error {
|
||||||
|
r.allowedPublicKeyMu.RLock()
|
||||||
|
keys := make([]string, 0, len(r.allowedPublicKeys))
|
||||||
|
for key := range r.allowedPublicKeys {
|
||||||
|
keys = append(keys, key)
|
||||||
|
}
|
||||||
|
r.allowedPublicKeyMu.RUnlock()
|
||||||
|
|
||||||
|
slices.Sort(keys)
|
||||||
|
|
||||||
|
dir := filepath.Dir(r.allowlistPath)
|
||||||
|
if err := coreio.Local.EnsureDir(dir); err != nil {
|
||||||
|
return coreerr.E("PeerRegistry.saveAllowedPublicKeys", "failed to create allowlist directory", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
data, err := json.MarshalIndent(keys, "", " ")
|
||||||
|
if err != nil {
|
||||||
|
return coreerr.E("PeerRegistry.saveAllowedPublicKeys", "failed to marshal allowlist", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
tmpPath := r.allowlistPath + ".tmp"
|
||||||
|
if err := coreio.Local.Write(tmpPath, string(data)); err != nil {
|
||||||
|
return coreerr.E("PeerRegistry.saveAllowedPublicKeys", "failed to write allowlist temp file", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := coreio.Local.Rename(tmpPath, r.allowlistPath); err != nil {
|
||||||
|
coreio.Local.Delete(tmpPath)
|
||||||
|
return coreerr.E("PeerRegistry.saveAllowedPublicKeys", "failed to rename allowlist file", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// loadAllowedPublicKeys loads the allowlist from disk.
|
||||||
|
func (r *PeerRegistry) loadAllowedPublicKeys() error {
|
||||||
|
if !coreio.Local.Exists(r.allowlistPath) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
content, err := coreio.Local.Read(r.allowlistPath)
|
||||||
|
if err != nil {
|
||||||
|
return coreerr.E("PeerRegistry.loadAllowedPublicKeys", "failed to read allowlist", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var keys []string
|
||||||
|
if err := json.Unmarshal([]byte(content), &keys); err != nil {
|
||||||
|
return coreerr.E("PeerRegistry.loadAllowedPublicKeys", "failed to unmarshal allowlist", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
r.allowedPublicKeyMu.Lock()
|
||||||
|
defer r.allowedPublicKeyMu.Unlock()
|
||||||
|
|
||||||
|
r.allowedPublicKeys = make(map[string]bool, len(keys))
|
||||||
|
for _, key := range keys {
|
||||||
|
if key == "" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
r.allowedPublicKeys[key] = true
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// save is a helper that schedules a debounced save.
|
// save is a helper that schedules a debounced save.
|
||||||
// Kept for backward compatibility but now debounces writes.
|
// Kept for backward compatibility but now debounces writes.
|
||||||
// Must NOT be called with r.mu held.
|
// Must NOT be called with r.mu held.
|
||||||
|
|
@ -718,14 +796,14 @@ func (r *PeerRegistry) save() error {
|
||||||
|
|
||||||
// load reads peers from disk.
|
// load reads peers from disk.
|
||||||
func (r *PeerRegistry) load() error {
|
func (r *PeerRegistry) load() error {
|
||||||
data, err := os.ReadFile(r.path)
|
content, err := coreio.Local.Read(r.path)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to read peers: %w", err)
|
return coreerr.E("PeerRegistry.load", "failed to read peers", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
var peers []*Peer
|
var peers []*Peer
|
||||||
if err := json.Unmarshal(data, &peers); err != nil {
|
if err := json.Unmarshal([]byte(content), &peers); err != nil {
|
||||||
return fmt.Errorf("failed to unmarshal peers: %w", err)
|
return coreerr.E("PeerRegistry.load", "failed to unmarshal peers", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
r.peers = make(map[string]*Peer)
|
r.peers = make(map[string]*Peer)
|
||||||
|
|
|
||||||
|
|
@ -389,6 +389,39 @@ func TestPeerRegistry_Persistence(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestPeerRegistry_AllowlistPersistence(t *testing.T) {
|
||||||
|
tmpDir, _ := os.MkdirTemp("", "allowlist-persist-test")
|
||||||
|
defer os.RemoveAll(tmpDir)
|
||||||
|
|
||||||
|
peersPath := filepath.Join(tmpDir, "peers.json")
|
||||||
|
|
||||||
|
pr1, err := NewPeerRegistryWithPath(peersPath)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("failed to create first registry: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
key := "allowlist-key-1234567890"
|
||||||
|
pr1.AllowPublicKey(key)
|
||||||
|
|
||||||
|
if err := pr1.Close(); err != nil {
|
||||||
|
t.Fatalf("failed to close first registry: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
pr2, err := NewPeerRegistryWithPath(peersPath)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("failed to create second registry: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !pr2.IsPublicKeyAllowed(key) {
|
||||||
|
t.Fatal("expected allowlisted key to survive reload")
|
||||||
|
}
|
||||||
|
|
||||||
|
keys := pr2.ListAllowedPublicKeys()
|
||||||
|
if !slices.Contains(keys, key) {
|
||||||
|
t.Fatalf("expected allowlisted key to be listed after reload, got %v", keys)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// --- Security Feature Tests ---
|
// --- Security Feature Tests ---
|
||||||
|
|
||||||
func TestPeerRegistry_AuthMode(t *testing.T) {
|
func TestPeerRegistry_AuthMode(t *testing.T) {
|
||||||
|
|
|
||||||
|
|
@ -1,8 +1,9 @@
|
||||||
package node
|
package node
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
|
coreerr "dappco.re/go/core/log"
|
||||||
)
|
)
|
||||||
|
|
||||||
// ProtocolError represents an error from the remote peer.
|
// ProtocolError represents an error from the remote peer.
|
||||||
|
|
@ -25,7 +26,7 @@ type ResponseHandler struct{}
|
||||||
// 3. If response type matches expected (returns error if not)
|
// 3. If response type matches expected (returns error if not)
|
||||||
func (h *ResponseHandler) ValidateResponse(resp *Message, expectedType MessageType) error {
|
func (h *ResponseHandler) ValidateResponse(resp *Message, expectedType MessageType) error {
|
||||||
if resp == nil {
|
if resp == nil {
|
||||||
return errors.New("nil response")
|
return coreerr.E("ResponseHandler.ValidateResponse", "nil response", nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check for error response
|
// Check for error response
|
||||||
|
|
@ -39,7 +40,7 @@ func (h *ResponseHandler) ValidateResponse(resp *Message, expectedType MessageTy
|
||||||
|
|
||||||
// Check expected type
|
// Check expected type
|
||||||
if resp.Type != expectedType {
|
if resp.Type != expectedType {
|
||||||
return fmt.Errorf("unexpected response type: expected %s, got %s", expectedType, resp.Type)
|
return coreerr.E("ResponseHandler.ValidateResponse", "unexpected response type: expected "+string(expectedType)+", got "+string(resp.Type), nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
|
@ -54,7 +55,7 @@ func (h *ResponseHandler) ParseResponse(resp *Message, expectedType MessageType,
|
||||||
|
|
||||||
if target != nil {
|
if target != nil {
|
||||||
if err := resp.ParsePayload(target); err != nil {
|
if err := resp.ParsePayload(target); err != nil {
|
||||||
return fmt.Errorf("failed to parse %s payload: %w", expectedType, err)
|
return coreerr.E("ResponseHandler.ParseResponse", "failed to parse "+string(expectedType)+" payload", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -5,7 +5,6 @@ import (
|
||||||
"crypto/tls"
|
"crypto/tls"
|
||||||
"encoding/base64"
|
"encoding/base64"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"iter"
|
"iter"
|
||||||
"maps"
|
"maps"
|
||||||
|
|
@ -16,8 +15,10 @@ import (
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
coreerr "dappco.re/go/core/log"
|
||||||
|
"dappco.re/go/core/p2p/logging"
|
||||||
|
|
||||||
"forge.lthn.ai/Snider/Borg/pkg/smsg"
|
"forge.lthn.ai/Snider/Borg/pkg/smsg"
|
||||||
"forge.lthn.ai/core/go-p2p/logging"
|
|
||||||
"github.com/gorilla/websocket"
|
"github.com/gorilla/websocket"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -75,10 +76,20 @@ func NewMessageDeduplicator(ttl time.Duration) *MessageDeduplicator {
|
||||||
|
|
||||||
// IsDuplicate checks if a message ID has been seen recently
|
// IsDuplicate checks if a message ID has been seen recently
|
||||||
func (d *MessageDeduplicator) IsDuplicate(msgID string) bool {
|
func (d *MessageDeduplicator) IsDuplicate(msgID string) bool {
|
||||||
d.mu.RLock()
|
d.mu.Lock()
|
||||||
_, exists := d.seen[msgID]
|
defer d.mu.Unlock()
|
||||||
d.mu.RUnlock()
|
|
||||||
return exists
|
seenAt, exists := d.seen[msgID]
|
||||||
|
if !exists {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
if d.ttl > 0 && time.Since(seenAt) > d.ttl {
|
||||||
|
delete(d.seen, msgID)
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
// Mark records a message ID as seen
|
// Mark records a message ID as seen
|
||||||
|
|
@ -289,7 +300,7 @@ func (t *Transport) Stop() error {
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
if err := t.server.Shutdown(ctx); err != nil {
|
if err := t.server.Shutdown(ctx); err != nil {
|
||||||
return fmt.Errorf("server shutdown error: %w", err)
|
return coreerr.E("Transport.Stop", "server shutdown error", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -320,7 +331,7 @@ func (t *Transport) Connect(peer *Peer) (*PeerConnection, error) {
|
||||||
}
|
}
|
||||||
conn, _, err := dialer.Dial(u.String(), nil)
|
conn, _, err := dialer.Dial(u.String(), nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to connect to peer: %w", err)
|
return nil, coreerr.E("Transport.Connect", "failed to connect to peer", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
pc := &PeerConnection{
|
pc := &PeerConnection{
|
||||||
|
|
@ -335,7 +346,7 @@ func (t *Transport) Connect(peer *Peer) (*PeerConnection, error) {
|
||||||
// This also derives and stores the shared secret in pc.SharedSecret
|
// This also derives and stores the shared secret in pc.SharedSecret
|
||||||
if err := t.performHandshake(pc); err != nil {
|
if err := t.performHandshake(pc); err != nil {
|
||||||
conn.Close()
|
conn.Close()
|
||||||
return nil, fmt.Errorf("handshake failed: %w", err)
|
return nil, coreerr.E("Transport.Connect", "handshake failed", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Store connection using the real peer ID from handshake
|
// Store connection using the real peer ID from handshake
|
||||||
|
|
@ -368,7 +379,7 @@ func (t *Transport) Send(peerID string, msg *Message) error {
|
||||||
t.mu.RUnlock()
|
t.mu.RUnlock()
|
||||||
|
|
||||||
if !exists {
|
if !exists {
|
||||||
return fmt.Errorf("peer %s not connected", peerID)
|
return coreerr.E("Transport.Send", "peer "+peerID+" not connected", nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
return pc.Send(msg)
|
return pc.Send(msg)
|
||||||
|
|
@ -628,7 +639,7 @@ func (t *Transport) performHandshake(pc *PeerConnection) error {
|
||||||
// Generate challenge for the server to prove it has the matching private key
|
// Generate challenge for the server to prove it has the matching private key
|
||||||
challenge, err := GenerateChallenge()
|
challenge, err := GenerateChallenge()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("generate challenge: %w", err)
|
return coreerr.E("Transport.performHandshake", "generate challenge", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
payload := HandshakePayload{
|
payload := HandshakePayload{
|
||||||
|
|
@ -639,41 +650,41 @@ func (t *Transport) performHandshake(pc *PeerConnection) error {
|
||||||
|
|
||||||
msg, err := NewMessage(MsgHandshake, identity.ID, pc.Peer.ID, payload)
|
msg, err := NewMessage(MsgHandshake, identity.ID, pc.Peer.ID, payload)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("create handshake message: %w", err)
|
return coreerr.E("Transport.performHandshake", "create handshake message", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// First message is unencrypted (peer needs our public key)
|
// First message is unencrypted (peer needs our public key)
|
||||||
data, err := MarshalJSON(msg)
|
data, err := MarshalJSON(msg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("marshal handshake message: %w", err)
|
return coreerr.E("Transport.performHandshake", "marshal handshake message", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := pc.Conn.WriteMessage(websocket.TextMessage, data); err != nil {
|
if err := pc.Conn.WriteMessage(websocket.TextMessage, data); err != nil {
|
||||||
return fmt.Errorf("send handshake: %w", err)
|
return coreerr.E("Transport.performHandshake", "send handshake", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Wait for ack
|
// Wait for ack
|
||||||
_, ackData, err := pc.Conn.ReadMessage()
|
_, ackData, err := pc.Conn.ReadMessage()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("read handshake ack: %w", err)
|
return coreerr.E("Transport.performHandshake", "read handshake ack", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
var ackMsg Message
|
var ackMsg Message
|
||||||
if err := json.Unmarshal(ackData, &ackMsg); err != nil {
|
if err := json.Unmarshal(ackData, &ackMsg); err != nil {
|
||||||
return fmt.Errorf("unmarshal handshake ack: %w", err)
|
return coreerr.E("Transport.performHandshake", "unmarshal handshake ack", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if ackMsg.Type != MsgHandshakeAck {
|
if ackMsg.Type != MsgHandshakeAck {
|
||||||
return fmt.Errorf("expected handshake_ack, got %s", ackMsg.Type)
|
return coreerr.E("Transport.performHandshake", "expected handshake_ack, got "+string(ackMsg.Type), nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
var ackPayload HandshakeAckPayload
|
var ackPayload HandshakeAckPayload
|
||||||
if err := ackMsg.ParsePayload(&ackPayload); err != nil {
|
if err := ackMsg.ParsePayload(&ackPayload); err != nil {
|
||||||
return fmt.Errorf("parse handshake ack payload: %w", err)
|
return coreerr.E("Transport.performHandshake", "parse handshake ack payload", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if !ackPayload.Accepted {
|
if !ackPayload.Accepted {
|
||||||
return fmt.Errorf("handshake rejected: %s", ackPayload.Reason)
|
return coreerr.E("Transport.performHandshake", "handshake rejected: "+ackPayload.Reason, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Update peer with the received identity info
|
// Update peer with the received identity info
|
||||||
|
|
@ -685,15 +696,15 @@ func (t *Transport) performHandshake(pc *PeerConnection) error {
|
||||||
// Verify challenge response - derive shared secret first using the peer's public key
|
// Verify challenge response - derive shared secret first using the peer's public key
|
||||||
sharedSecret, err := t.node.DeriveSharedSecret(pc.Peer.PublicKey)
|
sharedSecret, err := t.node.DeriveSharedSecret(pc.Peer.PublicKey)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("derive shared secret for challenge verification: %w", err)
|
return coreerr.E("Transport.performHandshake", "derive shared secret for challenge verification", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Verify the server's response to our challenge
|
// Verify the server's response to our challenge
|
||||||
if len(ackPayload.ChallengeResponse) == 0 {
|
if len(ackPayload.ChallengeResponse) == 0 {
|
||||||
return errors.New("server did not provide challenge response")
|
return coreerr.E("Transport.performHandshake", "server did not provide challenge response", nil)
|
||||||
}
|
}
|
||||||
if !VerifyChallenge(challenge, ackPayload.ChallengeResponse, sharedSecret) {
|
if !VerifyChallenge(challenge, ackPayload.ChallengeResponse, sharedSecret) {
|
||||||
return errors.New("challenge response verification failed: server may not have matching private key")
|
return coreerr.E("Transport.performHandshake", "challenge response verification failed: server may not have matching private key", nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Store the shared secret for later use
|
// Store the shared secret for later use
|
||||||
|
|
@ -840,7 +851,7 @@ func (pc *PeerConnection) Send(msg *Message) error {
|
||||||
|
|
||||||
// Set write deadline to prevent blocking forever
|
// Set write deadline to prevent blocking forever
|
||||||
if err := pc.Conn.SetWriteDeadline(time.Now().Add(10 * time.Second)); err != nil {
|
if err := pc.Conn.SetWriteDeadline(time.Now().Add(10 * time.Second)); err != nil {
|
||||||
return fmt.Errorf("failed to set write deadline: %w", err)
|
return coreerr.E("PeerConnection.Send", "failed to set write deadline", err)
|
||||||
}
|
}
|
||||||
defer pc.Conn.SetWriteDeadline(time.Time{}) // Reset deadline after send
|
defer pc.Conn.SetWriteDeadline(time.Time{}) // Reset deadline after send
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -159,6 +159,17 @@ func TestMessageDeduplicator(t *testing.T) {
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
|
t.Run("ExpiredEntriesAreNotDuplicates", func(t *testing.T) {
|
||||||
|
d := NewMessageDeduplicator(25 * time.Millisecond)
|
||||||
|
d.Mark("msg-expired")
|
||||||
|
|
||||||
|
time.Sleep(40 * time.Millisecond)
|
||||||
|
|
||||||
|
if d.IsDuplicate("msg-expired") {
|
||||||
|
t.Error("expired message should not remain a duplicate")
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
t.Run("ConcurrentAccess", func(t *testing.T) {
|
t.Run("ConcurrentAccess", func(t *testing.T) {
|
||||||
d := NewMessageDeduplicator(5 * time.Minute)
|
d := NewMessageDeduplicator(5 * time.Minute)
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
|
|
|
||||||
|
|
@ -3,12 +3,12 @@ package node
|
||||||
import (
|
import (
|
||||||
"encoding/base64"
|
"encoding/base64"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
|
||||||
"fmt"
|
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"forge.lthn.ai/core/go-p2p/logging"
|
coreerr "dappco.re/go/core/log"
|
||||||
|
|
||||||
|
"dappco.re/go/core/p2p/logging"
|
||||||
"github.com/adrg/xdg"
|
"github.com/adrg/xdg"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -26,7 +26,7 @@ type MinerInstance interface {
|
||||||
GetName() string
|
GetName() string
|
||||||
GetType() string
|
GetType() string
|
||||||
GetStats() (any, error)
|
GetStats() (any, error)
|
||||||
GetConsoleHistory(lines int) []string
|
GetConsoleHistorySince(lines int, since time.Time) []string
|
||||||
}
|
}
|
||||||
|
|
||||||
// ProfileManager interface for profile operations.
|
// ProfileManager interface for profile operations.
|
||||||
|
|
@ -55,7 +55,6 @@ func NewWorker(node *NodeManager, transport *Transport) *Worker {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
// SetMinerManager sets the miner manager for handling miner operations.
|
// SetMinerManager sets the miner manager for handling miner operations.
|
||||||
func (w *Worker) SetMinerManager(manager MinerManager) {
|
func (w *Worker) SetMinerManager(manager MinerManager) {
|
||||||
w.minerManager = manager
|
w.minerManager = manager
|
||||||
|
|
@ -119,7 +118,7 @@ func (w *Worker) HandleMessage(conn *PeerConnection, msg *Message) {
|
||||||
func (w *Worker) handlePing(msg *Message) (*Message, error) {
|
func (w *Worker) handlePing(msg *Message) (*Message, error) {
|
||||||
var ping PingPayload
|
var ping PingPayload
|
||||||
if err := msg.ParsePayload(&ping); err != nil {
|
if err := msg.ParsePayload(&ping); err != nil {
|
||||||
return nil, fmt.Errorf("invalid ping payload: %w", err)
|
return nil, coreerr.E("Worker.handlePing", "invalid ping payload", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
pong := PongPayload{
|
pong := PongPayload{
|
||||||
|
|
@ -202,12 +201,12 @@ func (w *Worker) handleStartMiner(msg *Message) (*Message, error) {
|
||||||
|
|
||||||
var payload StartMinerPayload
|
var payload StartMinerPayload
|
||||||
if err := msg.ParsePayload(&payload); err != nil {
|
if err := msg.ParsePayload(&payload); err != nil {
|
||||||
return nil, fmt.Errorf("invalid start miner payload: %w", err)
|
return nil, coreerr.E("Worker.handleStartMiner", "invalid start miner payload", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Validate miner type is provided
|
// Validate miner type is provided
|
||||||
if payload.MinerType == "" {
|
if payload.MinerType == "" {
|
||||||
return nil, errors.New("miner type is required")
|
return nil, coreerr.E("Worker.handleStartMiner", "miner type is required", nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get the config from the profile or use the override
|
// Get the config from the profile or use the override
|
||||||
|
|
@ -217,11 +216,11 @@ func (w *Worker) handleStartMiner(msg *Message) (*Message, error) {
|
||||||
} else if w.profileManager != nil {
|
} else if w.profileManager != nil {
|
||||||
profile, err := w.profileManager.GetProfile(payload.ProfileID)
|
profile, err := w.profileManager.GetProfile(payload.ProfileID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("profile not found: %s", payload.ProfileID)
|
return nil, coreerr.E("Worker.handleStartMiner", "profile not found: "+payload.ProfileID, nil)
|
||||||
}
|
}
|
||||||
config = profile
|
config = profile
|
||||||
} else {
|
} else {
|
||||||
return nil, errors.New("no config provided and no profile manager configured")
|
return nil, coreerr.E("Worker.handleStartMiner", "no config provided and no profile manager configured", nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start the miner
|
// Start the miner
|
||||||
|
|
@ -249,7 +248,7 @@ func (w *Worker) handleStopMiner(msg *Message) (*Message, error) {
|
||||||
|
|
||||||
var payload StopMinerPayload
|
var payload StopMinerPayload
|
||||||
if err := msg.ParsePayload(&payload); err != nil {
|
if err := msg.ParsePayload(&payload); err != nil {
|
||||||
return nil, fmt.Errorf("invalid stop miner payload: %w", err)
|
return nil, coreerr.E("Worker.handleStopMiner", "invalid stop miner payload", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
err := w.minerManager.StopMiner(payload.MinerName)
|
err := w.minerManager.StopMiner(payload.MinerName)
|
||||||
|
|
@ -272,7 +271,7 @@ func (w *Worker) handleGetLogs(msg *Message) (*Message, error) {
|
||||||
|
|
||||||
var payload GetLogsPayload
|
var payload GetLogsPayload
|
||||||
if err := msg.ParsePayload(&payload); err != nil {
|
if err := msg.ParsePayload(&payload); err != nil {
|
||||||
return nil, fmt.Errorf("invalid get logs payload: %w", err)
|
return nil, coreerr.E("Worker.handleGetLogs", "invalid get logs payload", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Validate and limit the Lines parameter to prevent resource exhaustion
|
// Validate and limit the Lines parameter to prevent resource exhaustion
|
||||||
|
|
@ -283,10 +282,15 @@ func (w *Worker) handleGetLogs(msg *Message) (*Message, error) {
|
||||||
|
|
||||||
miner, err := w.minerManager.GetMiner(payload.MinerName)
|
miner, err := w.minerManager.GetMiner(payload.MinerName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("miner not found: %s", payload.MinerName)
|
return nil, coreerr.E("Worker.handleGetLogs", "miner not found: "+payload.MinerName, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
lines := miner.GetConsoleHistory(payload.Lines)
|
var since time.Time
|
||||||
|
if payload.Since > 0 {
|
||||||
|
since = time.UnixMilli(payload.Since)
|
||||||
|
}
|
||||||
|
|
||||||
|
lines := miner.GetConsoleHistorySince(payload.Lines, since)
|
||||||
|
|
||||||
logs := LogsPayload{
|
logs := LogsPayload{
|
||||||
MinerName: payload.MinerName,
|
MinerName: payload.MinerName,
|
||||||
|
|
@ -301,7 +305,7 @@ func (w *Worker) handleGetLogs(msg *Message) (*Message, error) {
|
||||||
func (w *Worker) handleDeploy(conn *PeerConnection, msg *Message) (*Message, error) {
|
func (w *Worker) handleDeploy(conn *PeerConnection, msg *Message) (*Message, error) {
|
||||||
var payload DeployPayload
|
var payload DeployPayload
|
||||||
if err := msg.ParsePayload(&payload); err != nil {
|
if err := msg.ParsePayload(&payload); err != nil {
|
||||||
return nil, fmt.Errorf("invalid deploy payload: %w", err)
|
return nil, coreerr.E("Worker.handleDeploy", "invalid deploy payload", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Reconstruct Bundle object from payload
|
// Reconstruct Bundle object from payload
|
||||||
|
|
@ -321,19 +325,19 @@ func (w *Worker) handleDeploy(conn *PeerConnection, msg *Message) (*Message, err
|
||||||
switch bundle.Type {
|
switch bundle.Type {
|
||||||
case BundleProfile:
|
case BundleProfile:
|
||||||
if w.profileManager == nil {
|
if w.profileManager == nil {
|
||||||
return nil, errors.New("profile manager not configured")
|
return nil, coreerr.E("Worker.handleDeploy", "profile manager not configured", nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Decrypt and extract profile data
|
// Decrypt and extract profile data
|
||||||
profileData, err := ExtractProfileBundle(bundle, password)
|
profileData, err := ExtractProfileBundle(bundle, password)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to extract profile bundle: %w", err)
|
return nil, coreerr.E("Worker.handleDeploy", "failed to extract profile bundle", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Unmarshal into interface{} to pass to ProfileManager
|
// Unmarshal into interface{} to pass to ProfileManager
|
||||||
var profile any
|
var profile any
|
||||||
if err := json.Unmarshal(profileData, &profile); err != nil {
|
if err := json.Unmarshal(profileData, &profile); err != nil {
|
||||||
return nil, fmt.Errorf("invalid profile data JSON: %w", err)
|
return nil, coreerr.E("Worker.handleDeploy", "invalid profile data JSON", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := w.profileManager.SaveProfile(profile); err != nil {
|
if err := w.profileManager.SaveProfile(profile); err != nil {
|
||||||
|
|
@ -366,7 +370,7 @@ func (w *Worker) handleDeploy(conn *PeerConnection, msg *Message) (*Message, err
|
||||||
// Extract miner bundle
|
// Extract miner bundle
|
||||||
minerPath, profileData, err := ExtractMinerBundle(bundle, password, installDir)
|
minerPath, profileData, err := ExtractMinerBundle(bundle, password, installDir)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to extract miner bundle: %w", err)
|
return nil, coreerr.E("Worker.handleDeploy", "failed to extract miner bundle", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// If the bundle contained a profile config, save it
|
// If the bundle contained a profile config, save it
|
||||||
|
|
@ -396,7 +400,7 @@ func (w *Worker) handleDeploy(conn *PeerConnection, msg *Message) (*Message, err
|
||||||
return msg.Reply(MsgDeployAck, ack)
|
return msg.Reply(MsgDeployAck, ack)
|
||||||
|
|
||||||
default:
|
default:
|
||||||
return nil, fmt.Errorf("unknown bundle type: %s", payload.BundleType)
|
return nil, coreerr.E("Worker.handleDeploy", "unknown bundle type: "+payload.BundleType, nil)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -550,10 +550,14 @@ type mockMinerInstance struct {
|
||||||
stats any
|
stats any
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *mockMinerInstance) GetName() string { return m.name }
|
func (m *mockMinerInstance) GetName() string { return m.name }
|
||||||
func (m *mockMinerInstance) GetType() string { return m.minerType }
|
func (m *mockMinerInstance) GetType() string { return m.minerType }
|
||||||
func (m *mockMinerInstance) GetStats() (any, error) { return m.stats, nil }
|
func (m *mockMinerInstance) GetStats() (any, error) {
|
||||||
func (m *mockMinerInstance) GetConsoleHistory(lines int) []string { return []string{} }
|
return m.stats, nil
|
||||||
|
}
|
||||||
|
func (m *mockMinerInstance) GetConsoleHistorySince(lines int, since time.Time) []string {
|
||||||
|
return []string{}
|
||||||
|
}
|
||||||
|
|
||||||
type mockProfileManager struct{}
|
type mockProfileManager struct{}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -5,8 +5,9 @@ import (
|
||||||
"crypto/hmac"
|
"crypto/hmac"
|
||||||
"crypto/sha256"
|
"crypto/sha256"
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
"errors"
|
|
||||||
"io"
|
"io"
|
||||||
|
|
||||||
|
coreerr "dappco.re/go/core/log"
|
||||||
)
|
)
|
||||||
|
|
||||||
// TLV Types
|
// TLV Types
|
||||||
|
|
@ -104,7 +105,7 @@ func (p *PacketBuilder) MarshalAndSign(sharedSecret []byte) ([]byte, error) {
|
||||||
func writeTLV(w io.Writer, tag uint8, value []byte) error {
|
func writeTLV(w io.Writer, tag uint8, value []byte) error {
|
||||||
// Check length constraint (2 byte length = max 65535 bytes)
|
// Check length constraint (2 byte length = max 65535 bytes)
|
||||||
if len(value) > 65535 {
|
if len(value) > 65535 {
|
||||||
return errors.New("TLV value too large for 2-byte length header")
|
return coreerr.E("ueps.writeTLV", "TLV value too large for 2-byte length header", nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
if _, err := w.Write([]byte{tag}); err != nil {
|
if _, err := w.Write([]byte{tag}); err != nil {
|
||||||
|
|
|
||||||
|
|
@ -6,8 +6,9 @@ import (
|
||||||
"crypto/hmac"
|
"crypto/hmac"
|
||||||
"crypto/sha256"
|
"crypto/sha256"
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
"errors"
|
|
||||||
"io"
|
"io"
|
||||||
|
|
||||||
|
coreerr "dappco.re/go/core/log"
|
||||||
)
|
)
|
||||||
|
|
||||||
// ParsedPacket holds the verified data
|
// ParsedPacket holds the verified data
|
||||||
|
|
@ -92,7 +93,7 @@ func ReadAndVerify(r *bufio.Reader, sharedSecret []byte) (*ParsedPacket, error)
|
||||||
|
|
||||||
verify:
|
verify:
|
||||||
if len(signature) == 0 {
|
if len(signature) == 0 {
|
||||||
return nil, errors.New("UEPS packet missing HMAC signature")
|
return nil, coreerr.E("ueps.ReadAndVerify", "UEPS packet missing HMAC signature", nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
// 5. Verify HMAC
|
// 5. Verify HMAC
|
||||||
|
|
@ -103,7 +104,7 @@ verify:
|
||||||
expectedMAC := mac.Sum(nil)
|
expectedMAC := mac.Sum(nil)
|
||||||
|
|
||||||
if !hmac.Equal(signature, expectedMAC) {
|
if !hmac.Equal(signature, expectedMAC) {
|
||||||
return nil, errors.New("integrity violation: HMAC mismatch (ThreatScore +100)")
|
return nil, coreerr.E("ueps.ReadAndVerify", "integrity violation: HMAC mismatch (ThreatScore +100)", nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
return &ParsedPacket{
|
return &ParsedPacket{
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue