Wire access log config into proxy
This commit is contained in:
parent
6f4d7019e2
commit
31a8ba558f
4 changed files with 176 additions and 0 deletions
100
accesslog_impl.go
Normal file
100
accesslog_impl.go
Normal file
|
|
@ -0,0 +1,100 @@
|
|||
package proxy
|
||||
|
||||
import (
|
||||
"os"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
type accessLogSink struct {
|
||||
path string
|
||||
file *os.File
|
||||
mu sync.Mutex
|
||||
}
|
||||
|
||||
func newAccessLogSink(path string) *accessLogSink {
|
||||
return &accessLogSink{path: path}
|
||||
}
|
||||
|
||||
func (l *accessLogSink) SetPath(path string) {
|
||||
if l == nil {
|
||||
return
|
||||
}
|
||||
l.mu.Lock()
|
||||
defer l.mu.Unlock()
|
||||
if l.path == path {
|
||||
return
|
||||
}
|
||||
l.path = path
|
||||
if l.file != nil {
|
||||
_ = l.file.Close()
|
||||
l.file = nil
|
||||
}
|
||||
}
|
||||
|
||||
func (l *accessLogSink) Close() {
|
||||
if l == nil {
|
||||
return
|
||||
}
|
||||
l.mu.Lock()
|
||||
defer l.mu.Unlock()
|
||||
if l.file != nil {
|
||||
_ = l.file.Close()
|
||||
l.file = nil
|
||||
}
|
||||
}
|
||||
|
||||
func (l *accessLogSink) OnLogin(e Event) {
|
||||
if l == nil || e.Miner == nil {
|
||||
return
|
||||
}
|
||||
l.writeLine("CONNECT", e.Miner.IP(), e.Miner.User(), e.Miner.Agent(), 0, 0)
|
||||
}
|
||||
|
||||
func (l *accessLogSink) OnClose(e Event) {
|
||||
if l == nil || e.Miner == nil {
|
||||
return
|
||||
}
|
||||
l.writeLine("CLOSE", e.Miner.IP(), e.Miner.User(), "", e.Miner.RX(), e.Miner.TX())
|
||||
}
|
||||
|
||||
func (l *accessLogSink) writeLine(kind, ip, user, agent string, rx, tx uint64) {
|
||||
l.mu.Lock()
|
||||
defer l.mu.Unlock()
|
||||
if strings.TrimSpace(l.path) == "" {
|
||||
return
|
||||
}
|
||||
if l.file == nil {
|
||||
file, err := os.OpenFile(l.path, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0o644)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
l.file = file
|
||||
}
|
||||
var builder strings.Builder
|
||||
builder.WriteString(time.Now().UTC().Format(time.RFC3339))
|
||||
builder.WriteByte(' ')
|
||||
builder.WriteString(kind)
|
||||
builder.WriteString(" ")
|
||||
builder.WriteString(ip)
|
||||
builder.WriteString(" ")
|
||||
builder.WriteString(user)
|
||||
if agent != "" {
|
||||
builder.WriteString(" ")
|
||||
builder.WriteString(agent)
|
||||
}
|
||||
if rx > 0 || tx > 0 {
|
||||
builder.WriteString(" rx=")
|
||||
builder.WriteString(formatUint(rx))
|
||||
builder.WriteString(" tx=")
|
||||
builder.WriteString(formatUint(tx))
|
||||
}
|
||||
builder.WriteByte('\n')
|
||||
_, _ = l.file.WriteString(builder.String())
|
||||
}
|
||||
|
||||
func formatUint(value uint64) string {
|
||||
return strconv.FormatUint(value, 10)
|
||||
}
|
||||
64
accesslog_test.go
Normal file
64
accesslog_test.go
Normal file
|
|
@ -0,0 +1,64 @@
|
|||
package proxy
|
||||
|
||||
import (
|
||||
"net"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestProxy_AccessLog_WritesLifecycleLines(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
path := filepath.Join(dir, "access.log")
|
||||
|
||||
cfg := &Config{
|
||||
Mode: "nicehash",
|
||||
Workers: WorkersByRigID,
|
||||
AccessLogFile: path,
|
||||
Bind: []BindAddr{{Host: "127.0.0.1", Port: 3333}},
|
||||
Pools: []PoolConfig{{URL: "pool.example:3333", Enabled: true}},
|
||||
}
|
||||
p, result := New(cfg)
|
||||
if !result.OK {
|
||||
t.Fatalf("expected valid proxy, got error: %v", result.Error)
|
||||
}
|
||||
|
||||
miner := &Miner{
|
||||
ip: "10.0.0.1",
|
||||
user: "WALLET",
|
||||
agent: "XMRig/6.21.0",
|
||||
rx: 512,
|
||||
tx: 4096,
|
||||
conn: noopConn{},
|
||||
state: MinerStateReady,
|
||||
rpcID: "session",
|
||||
}
|
||||
p.events.Dispatch(Event{Type: EventLogin, Miner: miner})
|
||||
p.events.Dispatch(Event{Type: EventClose, Miner: miner})
|
||||
p.Stop()
|
||||
|
||||
data, err := os.ReadFile(path)
|
||||
if err != nil {
|
||||
t.Fatalf("read access log: %v", err)
|
||||
}
|
||||
text := string(data)
|
||||
if !strings.Contains(text, "CONNECT 10.0.0.1 WALLET XMRig/6.21.0") {
|
||||
t.Fatalf("expected CONNECT line, got %q", text)
|
||||
}
|
||||
if !strings.Contains(text, "CLOSE 10.0.0.1 WALLET rx=512 tx=4096") {
|
||||
t.Fatalf("expected CLOSE line, got %q", text)
|
||||
}
|
||||
}
|
||||
|
||||
type noopConn struct{}
|
||||
|
||||
func (noopConn) Read([]byte) (int, error) { return 0, os.ErrClosed }
|
||||
func (noopConn) Write([]byte) (int, error) { return 0, os.ErrClosed }
|
||||
func (noopConn) Close() error { return nil }
|
||||
func (noopConn) LocalAddr() net.Addr { return nil }
|
||||
func (noopConn) RemoteAddr() net.Addr { return nil }
|
||||
func (noopConn) SetDeadline(time.Time) error { return nil }
|
||||
func (noopConn) SetReadDeadline(time.Time) error { return nil }
|
||||
func (noopConn) SetWriteDeadline(time.Time) error { return nil }
|
||||
1
proxy.go
1
proxy.go
|
|
@ -37,6 +37,7 @@ type Proxy struct {
|
|||
customDiff *CustomDiff
|
||||
rateLimit *RateLimiter
|
||||
httpServer *http.Server
|
||||
accessLog *accessLogSink
|
||||
}
|
||||
|
||||
// Splitter is the interface both NonceSplitter and SimpleSplitter satisfy.
|
||||
|
|
|
|||
|
|
@ -46,11 +46,16 @@ func New(cfg *Config) (*Proxy, Result) {
|
|||
miners: make(map[int64]*Miner),
|
||||
customDiff: NewCustomDiff(cfg.CustomDiff),
|
||||
rateLimit: NewRateLimiter(cfg.RateLimit),
|
||||
accessLog: newAccessLogSink(cfg.AccessLogFile),
|
||||
done: make(chan struct{}),
|
||||
}
|
||||
p.workers.bindEvents(p.events)
|
||||
|
||||
p.events.Subscribe(EventLogin, p.customDiff.OnLogin)
|
||||
if p.accessLog != nil {
|
||||
p.events.Subscribe(EventLogin, p.accessLog.OnLogin)
|
||||
p.events.Subscribe(EventClose, p.accessLog.OnClose)
|
||||
}
|
||||
p.events.Subscribe(EventLogin, p.stats.OnLogin)
|
||||
p.events.Subscribe(EventLogin, p.workers.OnLogin)
|
||||
p.events.Subscribe(EventClose, p.stats.OnClose)
|
||||
|
|
@ -229,6 +234,9 @@ func (p *Proxy) Stop() {
|
|||
defer cancel()
|
||||
_ = p.httpServer.Shutdown(ctx)
|
||||
}
|
||||
if p.accessLog != nil {
|
||||
p.accessLog.Close()
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
|
|
@ -254,6 +262,9 @@ func (p *Proxy) Reload(cfg *Config) {
|
|||
p.customDiff.globalDiff = cfg.CustomDiff
|
||||
}
|
||||
p.rateLimit = NewRateLimiter(cfg.RateLimit)
|
||||
if p.accessLog != nil {
|
||||
p.accessLog.SetPath(cfg.AccessLogFile)
|
||||
}
|
||||
}
|
||||
|
||||
func (p *Proxy) acceptMiner(conn net.Conn, localPort uint16) {
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue