package pool import ( "sync" "time" "dappco.re/go/core/proxy" ) // FailoverStrategy wraps an ordered slice of PoolConfig entries. // It connects to the first enabled pool and fails over in order on error. // On reconnect it always retries from the primary first. // // strategy := pool.NewFailoverStrategy(cfg.Pools, listener, cfg) // strategy.Connect() type FailoverStrategy struct { pools []proxy.PoolConfig current int client *StratumClient listener StratumListener config *proxy.Config closed bool running bool mu sync.Mutex } // StrategyFactory creates a new FailoverStrategy for a given StratumListener. // Used by splitters to create per-mapper strategies without coupling to Config. // // factory := pool.NewStrategyFactory(cfg) // strategy := factory(listener) // each mapper calls this type StrategyFactory func(listener StratumListener) Strategy // Strategy is the interface the splitters use to submit shares and check pool state. type Strategy interface { Connect() Submit(jobID, nonce, result, algo string) int64 Disconnect() IsActive() bool } // NewStrategyFactory captures the live pool list and retry settings. // // factory := pool.NewStrategyFactory(proxy.Config{Pools: []proxy.PoolConfig{{URL: "pool.lthn.io:3333", Enabled: true}}}) func NewStrategyFactory(cfg *proxy.Config) StrategyFactory { return func(listener StratumListener) Strategy { if cfg == nil { return NewFailoverStrategy(nil, listener, nil) } return NewFailoverStrategy(cfg.Pools, listener, cfg) } } // NewFailoverStrategy builds one failover client stack. // // strategy := pool.NewFailoverStrategy([]proxy.PoolConfig{{URL: "pool.lthn.io:3333", Enabled: true}}, listener, cfg) func NewFailoverStrategy(pools []proxy.PoolConfig, listener StratumListener, cfg *proxy.Config) *FailoverStrategy { return &FailoverStrategy{ pools: append([]proxy.PoolConfig(nil), pools...), listener: listener, config: cfg, } } // Connect dials the first enabled pool and rotates through fallbacks on failure. // // strategy.Connect() func (s *FailoverStrategy) Connect() { s.mu.Lock() s.closed = false s.mu.Unlock() s.connectFrom(0) } func (s *FailoverStrategy) connectFrom(start int) { s.mu.Lock() if s.running || s.closed { s.mu.Unlock() return } s.running = true s.mu.Unlock() defer func() { s.mu.Lock() s.running = false s.mu.Unlock() }() pools := s.pools if s.config != nil && len(s.config.Pools) > 0 { pools = s.config.Pools } if len(pools) == 0 { return } retries := 1 pause := time.Duration(0) if s.config != nil { if s.config.Retries > 0 { retries = s.config.Retries } if s.config.RetryPause > 0 { pause = time.Duration(s.config.RetryPause) * time.Second } } for attempt := 0; attempt < retries; attempt++ { for offset := 0; offset < len(pools); offset++ { index := (start + offset) % len(pools) poolConfig := pools[index] if !poolConfig.Enabled { continue } client := NewStratumClient(poolConfig, s) if errorValue := client.Connect(); errorValue == nil { s.mu.Lock() if s.closed { s.mu.Unlock() client.Disconnect() return } s.client = client s.current = index s.mu.Unlock() client.Login() return } } if pause > 0 && attempt < retries-1 { time.Sleep(pause) } } } func (s *FailoverStrategy) Submit(jobID string, nonce string, result string, algo string) int64 { s.mu.Lock() client := s.client s.mu.Unlock() if client == nil { return 0 } return client.Submit(jobID, nonce, result, algo) } func (s *FailoverStrategy) Disconnect() { s.mu.Lock() s.closed = true client := s.client s.client = nil s.mu.Unlock() if client != nil { client.Disconnect() } } func (s *FailoverStrategy) IsActive() bool { s.mu.Lock() defer s.mu.Unlock() return s.client != nil && s.client.active } func (s *FailoverStrategy) OnJob(job proxy.Job) { if s.listener != nil { s.listener.OnJob(job) } } func (s *FailoverStrategy) OnResultAccepted(sequence int64, accepted bool, errorMessage string) { if s.listener != nil { s.listener.OnResultAccepted(sequence, accepted, errorMessage) } } func (s *FailoverStrategy) OnDisconnect() { s.mu.Lock() client := s.client s.client = nil closed := s.closed s.mu.Unlock() if s.listener != nil { s.listener.OnDisconnect() } if closed { return } if client != nil { client.active = false } go func() { time.Sleep(10 * time.Millisecond) s.connectFrom(0) }() }