feat(proxy): implement runtime HTTP and logging hooks

Co-Authored-By: Virgil <virgil@lethean.io>
This commit is contained in:
Virgil 2026-04-04 11:03:54 +00:00
parent 7d2d309529
commit 36fb1232d5
12 changed files with 426 additions and 8 deletions

View file

@ -16,6 +16,7 @@ type Config struct {
AlgoExtension bool `json:"algo-ext"` // forward algo field in jobs
Workers WorkersMode `json:"workers"` // "rig-id", "user", "password", "agent", "ip", "false"
AccessLogFile string `json:"access-log-file"` // "" = disabled
ShareLogFile string `json:"share-log-file"` // "" = disabled
ReuseTimeout int `json:"reuse-timeout"` // seconds; simple mode upstream reuse
Retries int `json:"retries"` // pool reconnect attempts
RetryPause int `json:"retry-pause"` // seconds between retries

View file

@ -30,6 +30,7 @@ type Miner struct {
state MinerState
stateMu sync.RWMutex
extAlgo bool // miner sent algo list in login params
algoExtension bool // config allows forwarding algo negotiation
extNH bool // NiceHash mode active (fixed byte splitting)
ip string // remote IP (without port, for logging)
localPort uint16

View file

@ -75,7 +75,7 @@ func (m *Miner) ForwardJob(job Job, algo string) {
"target": job.Target,
"id": m.rpcID,
}
if m.extAlgo && algo != "" {
if m.algoExtension && m.extAlgo && algo != "" {
params["algo"] = algo
}
@ -229,17 +229,17 @@ func (m *Miner) handleLogin(request minerRequest) {
"target": jobCopy.Target,
"id": m.rpcID,
}
if m.extAlgo && jobCopy.Algo != "" {
if m.algoExtension && m.extAlgo && jobCopy.Algo != "" {
jobResult["algo"] = jobCopy.Algo
}
result["job"] = jobResult
if m.extAlgo {
if m.algoExtension && m.extAlgo {
result["extensions"] = []string{"algo"}
}
m.SetState(MinerStateReady)
} else {
m.SetState(MinerStateWaitReady)
if m.extAlgo {
if m.algoExtension && m.extAlgo {
result["extensions"] = []string{"algo"}
}
}

View file

@ -127,3 +127,45 @@ func TestMiner_Submit_Ugly(t *testing.T) {
t.Fatalf("expected invalid nonce error, got %#v", response)
}
}
func TestMiner_Login_Ugly(t *testing.T) {
serverConn, clientConn := net.Pipe()
defer clientConn.Close()
miner := NewMiner(serverConn, 3333, nil)
miner.algoExtension = true
miner.Start()
defer miner.Close()
encoder := json.NewEncoder(clientConn)
if err := encoder.Encode(map[string]interface{}{
"id": 4,
"jsonrpc": "2.0",
"method": "login",
"params": map[string]interface{}{
"login": "wallet",
"pass": "x",
"agent": "xmrig",
"algo": []string{"cn/r"},
},
}); err != nil {
t.Fatal(err)
}
clientConn.SetReadDeadline(time.Now().Add(time.Second))
line, err := bufio.NewReader(clientConn).ReadBytes('\n')
if err != nil {
t.Fatal(err)
}
var response map[string]interface{}
if err := json.Unmarshal(line, &response); err != nil {
t.Fatal(err)
}
result := response["result"].(map[string]interface{})
extensions, ok := result["extensions"].([]interface{})
if !ok || len(extensions) != 1 || extensions[0] != "algo" {
t.Fatalf("expected algo extension to be advertised, got %#v", response)
}
}

View file

@ -11,6 +11,7 @@
package proxy
import (
"net/http"
"sync"
"time"
)
@ -31,6 +32,7 @@ type Proxy struct {
miners map[int64]*Miner
minerMu sync.RWMutex
servers []*Server
httpServer *http.Server
ticker *time.Ticker
watcher *ConfigWatcher
done chan struct{}

191
proxy_http_runtime.go Normal file
View file

@ -0,0 +1,191 @@
package proxy
import (
"context"
"encoding/json"
"net"
"net/http"
"strconv"
"strings"
"time"
)
const proxyAPIVersion = "1.0.0"
func startHTTPServer(p *Proxy) {
if p == nil || p.config == nil || !p.config.HTTP.Enabled || p.httpServer != nil {
return
}
mux := http.NewServeMux()
registerMonitoringRoutes(mux, p)
address := net.JoinHostPort(p.config.HTTP.Host, strconv.Itoa(int(p.config.HTTP.Port)))
listener, errorValue := net.Listen("tcp", address)
if errorValue != nil {
return
}
server := &http.Server{
Handler: mux,
}
p.httpServer = server
go func() {
_ = server.Serve(listener)
}()
}
func stopHTTPServer(p *Proxy) {
if p == nil || p.httpServer == nil {
return
}
server := p.httpServer
p.httpServer = nil
shutdownContext, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_ = server.Shutdown(shutdownContext)
}
func registerMonitoringRoutes(router *http.ServeMux, proxyValue *Proxy) {
if router == nil || proxyValue == nil {
return
}
router.HandleFunc("/1/summary", func(writer http.ResponseWriter, request *http.Request) {
if !allowHTTPRequest(writer, request, proxyValue.HTTPConfig()) {
return
}
summary := proxyValue.Summary()
response := map[string]interface{}{
"version": proxyAPIVersion,
"mode": proxyValue.Mode(),
"hashrate": map[string]interface{}{
"total": summary.Hashrate,
},
"miners": map[string]uint64{
"now": proxyValue.CurrentMiners(),
"max": proxyValue.MaxMiners(),
},
"workers": uint64(len(proxyValue.Workers())),
"upstreams": func() map[string]interface{} {
upstreams := proxyValue.Upstreams()
ratio := 0.0
if upstreams.Total > 0 {
ratio = float64(proxyValue.CurrentMiners()) / float64(upstreams.Total)
}
return map[string]interface{}{
"active": upstreams.Active,
"sleep": upstreams.Sleep,
"error": upstreams.Error,
"total": upstreams.Total,
"ratio": ratio,
}
}(),
"results": map[string]interface{}{
"accepted": summary.Accepted,
"rejected": summary.Rejected,
"invalid": summary.Invalid,
"expired": summary.Expired,
"avg_time": summary.AvgTime,
"latency": summary.AvgLatency,
"hashes_total": summary.Hashes,
"best": summary.TopDiff,
},
}
writeHTTPJSON(writer, response)
})
router.HandleFunc("/1/workers", func(writer http.ResponseWriter, request *http.Request) {
if !allowHTTPRequest(writer, request, proxyValue.HTTPConfig()) {
return
}
records := proxyValue.Workers()
rows := make([][]interface{}, 0, len(records))
for _, record := range records {
rows = append(rows, []interface{}{
record.Name,
record.LastIP,
record.Connections,
record.Accepted,
record.Rejected,
record.Invalid,
record.Hashes,
record.LastHashAt.Unix(),
record.Hashrate(60),
record.Hashrate(600),
record.Hashrate(3600),
record.Hashrate(43200),
record.Hashrate(86400),
})
}
writeHTTPJSON(writer, map[string]interface{}{
"mode": proxyValue.WorkersMode(),
"workers": rows,
})
})
router.HandleFunc("/1/miners", func(writer http.ResponseWriter, request *http.Request) {
if !allowHTTPRequest(writer, request, proxyValue.HTTPConfig()) {
return
}
miners := proxyValue.Miners()
rows := make([][]interface{}, 0, len(miners))
for _, miner := range miners {
ip := ""
if remote := miner.RemoteAddr(); remote != nil {
ip = remote.String()
}
rows = append(rows, []interface{}{
miner.ID(),
ip,
miner.TX(),
miner.RX(),
miner.State(),
miner.Diff(),
miner.User(),
"********",
miner.RigID(),
miner.Agent(),
})
}
writeHTTPJSON(writer, map[string]interface{}{
"format": []string{"id", "ip", "tx", "rx", "state", "diff", "user", "password", "rig_id", "agent"},
"miners": rows,
})
})
}
func allowHTTPRequest(writer http.ResponseWriter, request *http.Request, config HTTPConfig) bool {
if request == nil {
return false
}
if config.AccessToken != "" {
header := request.Header.Get("Authorization")
prefix := "Bearer "
if !strings.HasPrefix(header, prefix) || strings.TrimSpace(strings.TrimPrefix(header, prefix)) != config.AccessToken {
writer.Header().Set("WWW-Authenticate", "Bearer")
http.Error(writer, http.StatusText(http.StatusUnauthorized), http.StatusUnauthorized)
return false
}
}
if config.Restricted && request.Method != http.MethodGet {
http.Error(writer, http.StatusText(http.StatusMethodNotAllowed), http.StatusMethodNotAllowed)
return false
}
return true
}
func writeHTTPJSON(writer http.ResponseWriter, value interface{}) {
writer.Header().Set("Content-Type", "application/json")
_ = json.NewEncoder(writer).Encode(value)
}

97
proxy_logging_runtime.go Normal file
View file

@ -0,0 +1,97 @@
package proxy
import (
"fmt"
"os"
"sync"
"time"
)
type appendLineLogger struct {
path string
mu sync.Mutex
file *os.File
}
func newAppendLineLogger(path string) *appendLineLogger {
return &appendLineLogger{path: path}
}
func (l *appendLineLogger) writeLine(line string) {
if l == nil || l.path == "" {
return
}
l.mu.Lock()
defer l.mu.Unlock()
if l.file == nil {
file, errorValue := os.OpenFile(l.path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o644)
if errorValue != nil {
return
}
l.file = file
}
_, _ = l.file.WriteString(line)
}
func subscribeAccessLog(events *EventBus, path string) {
if events == nil || path == "" {
return
}
logger := newAppendLineLogger(path)
events.Subscribe(EventLogin, func(event Event) {
if event.Miner == nil {
return
}
logger.writeLine(fmt.Sprintf("%s CONNECT %s %s %s\n",
time.Now().UTC().Format(time.RFC3339),
event.Miner.IP(),
event.Miner.User(),
event.Miner.Agent(),
))
})
events.Subscribe(EventClose, func(event Event) {
if event.Miner == nil {
return
}
logger.writeLine(fmt.Sprintf("%s CLOSE %s %s rx=%d tx=%d\n",
time.Now().UTC().Format(time.RFC3339),
event.Miner.IP(),
event.Miner.User(),
event.Miner.RX(),
event.Miner.TX(),
))
})
}
func subscribeShareLog(events *EventBus, path string) {
if events == nil || path == "" {
return
}
logger := newAppendLineLogger(path)
events.Subscribe(EventAccept, func(event Event) {
if event.Miner == nil {
return
}
logger.writeLine(fmt.Sprintf("%s ACCEPT %s diff=%d latency=%dms\n",
time.Now().UTC().Format(time.RFC3339),
event.Miner.User(),
event.Diff,
event.Latency,
))
})
events.Subscribe(EventReject, func(event Event) {
if event.Miner == nil {
return
}
logger.writeLine(fmt.Sprintf("%s REJECT %s reason=%q\n",
time.Now().UTC().Format(time.RFC3339),
event.Miner.User(),
event.Error,
))
})
}

View file

@ -36,6 +36,9 @@ func New(cfg *Config) (*Proxy, error) {
done: make(chan struct{}),
}
subscribeAccessLog(events, cfg.AccessLogFile)
subscribeShareLog(events, cfg.ShareLogFile)
events.Subscribe(EventLogin, func(event Event) {
if event.Miner != nil {
proxyValue.minerMu.Lock()
@ -104,6 +107,10 @@ func (p *Proxy) Start() {
server.Start()
}
if p.config != nil && p.config.HTTP.Enabled {
startHTTPServer(p)
}
go func() {
var ticks uint64
for {
@ -123,6 +130,8 @@ func (p *Proxy) Start() {
}
}
}()
<-p.done
}
type noopSplitter struct{}
@ -149,6 +158,7 @@ func (p *Proxy) Stop() {
for _, server := range p.servers {
server.Stop()
}
stopHTTPServer(p)
if p.watcher != nil {
p.watcher.Stop()
}
@ -254,6 +264,7 @@ func (p *Proxy) acceptConn(conn net.Conn, localPort uint16) {
miner.splitter = p.splitter
if p.config != nil {
miner.accessPassword = p.config.AccessPassword
miner.algoExtension = p.config.AlgoExtension
}
miner.Start()
}

View file

@ -156,12 +156,13 @@ func (cd *CustomDiff) OnLogin(event Event) {
user := event.Miner.User()
index := strings.LastIndex(user, "+")
if index > 0 {
if index > 0 && index < len(user)-1 {
if value, errorValue := strconv.ParseUint(user[index+1:], 10, 64); errorValue == nil {
event.Miner.SetUser(user[:index])
event.Miner.SetCustomDiff(value)
return
}
return
}
if cd == nil {

View file

@ -51,7 +51,7 @@ func TestCustomDiff_OnLogin_Bad(t *testing.T) {
func TestCustomDiff_OnLogin_Ugly(t *testing.T) {
miner := &Miner{user: "wallet+bad"}
NewCustomDiff(100).OnLogin(Event{Miner: miner})
if miner.User() != "wallet+bad" || miner.CustomDiff() != 100 {
t.Fatalf("expected invalid suffix to preserve user and apply global diff, got user=%q diff=%d", miner.User(), miner.CustomDiff())
if miner.User() != "wallet+bad" || miner.CustomDiff() != 0 {
t.Fatalf("expected invalid suffix to preserve user and leave diff unset, got user=%q diff=%d", miner.User(), miner.CustomDiff())
}
}

View file

@ -53,6 +53,13 @@ func (s *SimpleSplitter) OnLogin(event *proxy.LoginEvent) {
var mapper *SimpleMapper
for mapperID, idleMapper := range s.idle {
if idleMapper == nil || idleMapper.stopped || idleMapper.strategy == nil || !idleMapper.strategy.IsActive() {
if idleMapper != nil && idleMapper.strategy != nil {
idleMapper.strategy.Disconnect()
}
delete(s.idle, mapperID)
continue
}
mapper = idleMapper
delete(s.idle, mapperID)
break
@ -156,7 +163,11 @@ func (s *SimpleSplitter) GC() {
now := time.Now().UTC()
for mapperID, mapper := range s.idle {
if timeout == 0 || (!mapper.idleAt.IsZero() && now.Sub(mapper.idleAt) > timeout) || mapper.stopped {
if mapper == nil {
delete(s.idle, mapperID)
continue
}
if mapper.stopped || mapper.strategy == nil || !mapper.strategy.IsActive() || timeout == 0 || (!mapper.idleAt.IsZero() && now.Sub(mapper.idleAt) > timeout) {
if mapper.strategy != nil {
mapper.strategy.Disconnect()
}

View file

@ -0,0 +1,61 @@
package simple
import (
"testing"
"time"
"dappco.re/go/core/proxy"
"dappco.re/go/core/proxy/pool"
)
type fakeStrategy struct {
active bool
connects int
disconnects int
}
func (s *fakeStrategy) Connect() {}
func (s *fakeStrategy) Submit(jobID, nonce, result, algo string) int64 { return 1 }
func (s *fakeStrategy) Disconnect() {
s.disconnects++
s.active = false
}
func (s *fakeStrategy) IsActive() bool { return s.active }
func TestSimpleSplitter_OnLogin_Ugly(t *testing.T) {
deadStrategy := &fakeStrategy{active: false}
liveStrategy := &fakeStrategy{active: true}
splitter := &SimpleSplitter{
active: make(map[int64]*SimpleMapper),
idle: map[int64]*SimpleMapper{
1: {
id: 1,
strategy: deadStrategy,
idleAt: time.Now().UTC(),
},
},
cfg: &proxy.Config{ReuseTimeout: 60},
factory: func(listener pool.StratumListener) pool.Strategy {
return liveStrategy
},
}
miner := &proxy.Miner{}
splitter.OnLogin(&proxy.LoginEvent{Miner: miner})
if len(splitter.idle) != 0 {
t.Fatalf("expected dead idle mapper to be discarded, got %d idle mappers", len(splitter.idle))
}
if len(splitter.active) != 1 {
t.Fatalf("expected one active mapper, got %d", len(splitter.active))
}
if deadStrategy.disconnects != 1 {
t.Fatalf("expected dead mapper to be disconnected once, got %d", deadStrategy.disconnects)
}
if miner.RouteID() == 0 {
t.Fatal("expected miner to receive a route ID")
}
}