feat(proxy): wire share log events

Co-Authored-By: Virgil <virgil@lethean.io>
This commit is contained in:
Virgil 2026-04-04 20:09:13 +00:00
parent d10a57e377
commit b3ad79d832
5 changed files with 154 additions and 0 deletions

View file

@ -16,6 +16,7 @@ type Config struct {
AlgoExtension bool `json:"algo-ext"` // forward algo field in jobs
Workers WorkersMode `json:"workers"` // "rig-id", "user", "password", "agent", "ip", "false"
AccessLogFile string `json:"access-log-file"` // "" = disabled
ShareLogFile string `json:"share-log-file"` // "" = disabled
ReuseTimeout int `json:"reuse-timeout"` // seconds; simple mode upstream reuse
Retries int `json:"retries"` // pool reconnect attempts
RetryPause int `json:"retry-pause"` // seconds between retries

View file

@ -40,6 +40,7 @@ type Proxy struct {
rateLimit *RateLimiter
httpServer *http.Server
accessLog *accessLogSink
shareLog *shareLogSink
submitCount atomic.Int64
}

95
sharelog_impl.go Normal file
View file

@ -0,0 +1,95 @@
package proxy
import (
"os"
"strings"
"sync"
"time"
)
type shareLogSink struct {
path string
file *os.File
mu sync.Mutex
}
func newShareLogSink(path string) *shareLogSink {
return &shareLogSink{path: path}
}
func (l *shareLogSink) SetPath(path string) {
if l == nil {
return
}
l.mu.Lock()
defer l.mu.Unlock()
if l.path == path {
return
}
l.path = path
if l.file != nil {
_ = l.file.Close()
l.file = nil
}
}
func (l *shareLogSink) Close() {
if l == nil {
return
}
l.mu.Lock()
defer l.mu.Unlock()
if l.file != nil {
_ = l.file.Close()
l.file = nil
}
}
func (l *shareLogSink) OnAccept(e Event) {
if l == nil || e.Miner == nil {
return
}
l.writeLine("ACCEPT", e.Miner.User(), e.Diff, e.Latency, "")
}
func (l *shareLogSink) OnReject(e Event) {
if l == nil || e.Miner == nil {
return
}
l.writeLine("REJECT", e.Miner.User(), 0, 0, e.Error)
}
func (l *shareLogSink) writeLine(kind, user string, diff uint64, latency uint16, reason string) {
l.mu.Lock()
defer l.mu.Unlock()
if strings.TrimSpace(l.path) == "" {
return
}
if l.file == nil {
file, err := os.OpenFile(l.path, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0o644)
if err != nil {
return
}
l.file = file
}
var builder strings.Builder
builder.WriteString(time.Now().UTC().Format(time.RFC3339))
builder.WriteByte(' ')
builder.WriteString(kind)
builder.WriteString(" ")
builder.WriteString(user)
switch kind {
case "ACCEPT":
builder.WriteString(" diff=")
builder.WriteString(formatUint(diff))
builder.WriteString(" latency=")
builder.WriteString(formatUint(uint64(latency)))
builder.WriteString("ms")
case "REJECT":
builder.WriteString(" reason=\"")
builder.WriteString(reason)
builder.WriteString("\"")
}
builder.WriteByte('\n')
_, _ = l.file.WriteString(builder.String())
}

46
sharelog_test.go Normal file
View file

@ -0,0 +1,46 @@
package proxy
import (
"os"
"path/filepath"
"strings"
"testing"
)
func TestProxy_ShareLog_WritesOutcomeLines(t *testing.T) {
dir := t.TempDir()
path := filepath.Join(dir, "shares.log")
cfg := &Config{
Mode: "nicehash",
Workers: WorkersByRigID,
ShareLogFile: path,
Bind: []BindAddr{{Host: "127.0.0.1", Port: 3333}},
Pools: []PoolConfig{{URL: "pool.example:3333", Enabled: true}},
}
p, result := New(cfg)
if !result.OK {
t.Fatalf("expected valid proxy, got error: %v", result.Error)
}
miner := &Miner{
user: "WALLET",
conn: noopConn{},
state: MinerStateReady,
}
p.events.Dispatch(Event{Type: EventAccept, Miner: miner, Diff: 1234, Latency: 56})
p.events.Dispatch(Event{Type: EventReject, Miner: miner, Error: "Invalid nonce"})
p.Stop()
data, err := os.ReadFile(path)
if err != nil {
t.Fatalf("read share log: %v", err)
}
text := string(data)
if !strings.Contains(text, "ACCEPT WALLET diff=1234 latency=56ms") {
t.Fatalf("expected ACCEPT line, got %q", text)
}
if !strings.Contains(text, "REJECT WALLET reason=\"Invalid nonce\"") {
t.Fatalf("expected REJECT line, got %q", text)
}
}

View file

@ -48,6 +48,7 @@ func New(cfg *Config) (*Proxy, Result) {
customDiffBuckets: NewCustomDiffBuckets(cfg.CustomDiffStats),
rateLimit: NewRateLimiter(cfg.RateLimit),
accessLog: newAccessLogSink(cfg.AccessLogFile),
shareLog: newShareLogSink(cfg.ShareLogFile),
done: make(chan struct{}),
}
p.workers.bindEvents(p.events)
@ -61,6 +62,10 @@ func New(cfg *Config) (*Proxy, Result) {
p.events.Subscribe(EventClose, p.stats.OnClose)
p.events.Subscribe(EventAccept, p.stats.OnAccept)
p.events.Subscribe(EventReject, p.stats.OnReject)
if p.shareLog != nil {
p.events.Subscribe(EventAccept, p.shareLog.OnAccept)
p.events.Subscribe(EventReject, p.shareLog.OnReject)
}
if p.customDiffBuckets != nil {
p.events.Subscribe(EventAccept, p.customDiffBuckets.OnAccept)
p.events.Subscribe(EventReject, p.customDiffBuckets.OnReject)
@ -260,6 +265,9 @@ func (p *Proxy) Stop() {
if p.accessLog != nil {
p.accessLog.Close()
}
if p.shareLog != nil {
p.shareLog.Close()
}
})
}
@ -313,6 +321,9 @@ func (p *Proxy) Reload(cfg *Config) {
if p.accessLog != nil {
p.accessLog.SetPath(cfg.AccessLogFile)
}
if p.shareLog != nil {
p.shareLog.SetPath(cfg.ShareLogFile)
}
}
func (p *Proxy) acceptMiner(conn net.Conn, localPort uint16) {