fix(proxy): relax config validation and dedupe disconnects
Co-Authored-By: Virgil <virgil@lethean.io>
This commit is contained in:
parent
b63e7562de
commit
8579b0cc11
4 changed files with 74 additions and 22 deletions
|
|
@ -43,11 +43,6 @@ func (c *Config) Validate() error {
|
|||
if len(c.Pools) == 0 {
|
||||
return errors.New("pool list is empty")
|
||||
}
|
||||
switch c.Mode {
|
||||
case "nicehash", "simple":
|
||||
default:
|
||||
return errors.New("unsupported mode")
|
||||
}
|
||||
|
||||
for _, poolConfig := range c.Pools {
|
||||
if poolConfig.Enabled && strings.TrimSpace(poolConfig.URL) == "" {
|
||||
|
|
|
|||
|
|
@ -16,13 +16,12 @@ func TestConfig_Validate_Good(t *testing.T) {
|
|||
|
||||
func TestConfig_Validate_Bad(t *testing.T) {
|
||||
cfg := &Config{
|
||||
Mode: "noop",
|
||||
Bind: []BindAddr{{Host: "127.0.0.1", Port: 3333}},
|
||||
Pools: []PoolConfig{{URL: "pool-a:3333", Enabled: true}},
|
||||
}
|
||||
|
||||
if errorValue := cfg.Validate(); errorValue == nil {
|
||||
t.Fatal("expected unsupported mode to fail validation")
|
||||
if errorValue := cfg.Validate(); errorValue != nil {
|
||||
t.Fatalf("expected missing mode to be allowed, got %v", errorValue)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -27,14 +27,15 @@ import (
|
|||
// client := pool.NewStratumClient(poolCfg, listener)
|
||||
// client.Connect()
|
||||
type StratumClient struct {
|
||||
cfg proxy.PoolConfig
|
||||
listener StratumListener
|
||||
conn net.Conn
|
||||
tlsConn *tls.Conn // nil if plain TCP
|
||||
sessionID string // pool-assigned session id from login reply
|
||||
seq int64 // atomic JSON-RPC request id counter
|
||||
active bool // true once first job received
|
||||
sendMu sync.Mutex
|
||||
cfg proxy.PoolConfig
|
||||
listener StratumListener
|
||||
conn net.Conn
|
||||
tlsConn *tls.Conn // nil if plain TCP
|
||||
sessionID string // pool-assigned session id from login reply
|
||||
seq int64 // atomic JSON-RPC request id counter
|
||||
active bool // true once first job received
|
||||
disconnectOnce sync.Once
|
||||
sendMu sync.Mutex
|
||||
}
|
||||
|
||||
// StratumListener receives events from the pool connection.
|
||||
|
|
@ -128,6 +129,7 @@ func (c *StratumClient) Connect() error {
|
|||
}
|
||||
|
||||
c.conn = connection
|
||||
c.disconnectOnce = sync.Once{}
|
||||
go c.readLoop()
|
||||
return nil
|
||||
}
|
||||
|
|
@ -183,9 +185,7 @@ func (c *StratumClient) Disconnect() {
|
|||
if c.conn != nil {
|
||||
_ = c.conn.Close()
|
||||
}
|
||||
if c.listener != nil {
|
||||
c.listener.OnDisconnect()
|
||||
}
|
||||
c.notifyDisconnect()
|
||||
}
|
||||
|
||||
func (c *StratumClient) writeJSON(value interface{}) error {
|
||||
|
|
@ -209,9 +209,7 @@ func (c *StratumClient) readLoop() {
|
|||
for {
|
||||
line, errorValue := reader.ReadBytes('\n')
|
||||
if errorValue != nil {
|
||||
if c.listener != nil {
|
||||
c.listener.OnDisconnect()
|
||||
}
|
||||
c.notifyDisconnect()
|
||||
return
|
||||
}
|
||||
|
||||
|
|
@ -223,6 +221,14 @@ func (c *StratumClient) readLoop() {
|
|||
}
|
||||
}
|
||||
|
||||
func (c *StratumClient) notifyDisconnect() {
|
||||
c.disconnectOnce.Do(func() {
|
||||
if c.listener != nil {
|
||||
c.listener.OnDisconnect()
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func (c *StratumClient) handleMessage(response jsonRPCResponse) {
|
||||
if len(response.Result) > 0 {
|
||||
var loginResult struct {
|
||||
|
|
|
|||
52
pool/client_test.go
Normal file
52
pool/client_test.go
Normal file
|
|
@ -0,0 +1,52 @@
|
|||
package pool
|
||||
|
||||
import (
|
||||
"net"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"dappco.re/go/core/proxy"
|
||||
)
|
||||
|
||||
type disconnectCountingListener struct {
|
||||
mu sync.Mutex
|
||||
count int
|
||||
}
|
||||
|
||||
func (l *disconnectCountingListener) OnJob(job proxy.Job) {}
|
||||
|
||||
func (l *disconnectCountingListener) OnResultAccepted(sequence int64, accepted bool, errorMessage string) {
|
||||
}
|
||||
|
||||
func (l *disconnectCountingListener) OnDisconnect() {
|
||||
l.mu.Lock()
|
||||
l.count++
|
||||
l.mu.Unlock()
|
||||
}
|
||||
|
||||
func (l *disconnectCountingListener) Count() int {
|
||||
l.mu.Lock()
|
||||
defer l.mu.Unlock()
|
||||
return l.count
|
||||
}
|
||||
|
||||
func TestStratumClient_Disconnect_Good(t *testing.T) {
|
||||
serverConn, clientConn := net.Pipe()
|
||||
defer clientConn.Close()
|
||||
|
||||
listener := &disconnectCountingListener{}
|
||||
client := &StratumClient{
|
||||
listener: listener,
|
||||
conn: serverConn,
|
||||
}
|
||||
|
||||
go client.readLoop()
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
client.Disconnect()
|
||||
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
if got := listener.Count(); got != 1 {
|
||||
t.Fatalf("expected one disconnect callback, got %d", got)
|
||||
}
|
||||
}
|
||||
Loading…
Add table
Reference in a new issue