feat(proxy): add share sink fanout

Co-Authored-By: Virgil <virgil@lethean.io>
This commit is contained in:
Virgil 2026-04-05 01:08:23 +00:00
parent d0ae26a1a2
commit 3cd0909d74
3 changed files with 60 additions and 4 deletions

View file

@ -149,6 +149,48 @@ func (b *EventBus) Dispatch(e Event) {
}
}
type shareSinkGroup struct {
sinks []ShareSink
}
func newShareSinkGroup(sinks ...ShareSink) *shareSinkGroup {
group := &shareSinkGroup{sinks: make([]ShareSink, 0, len(sinks))}
for _, sink := range sinks {
if sink != nil {
group.sinks = append(group.sinks, sink)
}
}
return group
}
func (g *shareSinkGroup) OnAccept(e Event) {
if g == nil {
return
}
for _, sink := range g.sinks {
func() {
defer func() {
_ = recover()
}()
sink.OnAccept(e)
}()
}
}
func (g *shareSinkGroup) OnReject(e Event) {
if g == nil {
return
}
for _, sink := range g.sinks {
func() {
defer func() {
_ = recover()
}()
sink.OnReject(e)
}()
}
}
// IsValid returns true when the job contains a blob and job id.
//
// if !job.IsValid() {

View file

@ -25,6 +25,7 @@ type Proxy struct {
config *Config
configMu sync.RWMutex
splitter Splitter
shareSink ShareSink
stats *Stats
workers *Workers
events *EventBus
@ -66,6 +67,15 @@ type Splitter interface {
Upstreams() UpstreamStats
}
// ShareSink consumes share outcomes from the proxy event stream.
//
// sink.OnAccept(proxy.Event{Miner: miner, Diff: 100000})
// sink.OnReject(proxy.Event{Miner: miner, Error: "Invalid nonce"})
type ShareSink interface {
OnAccept(Event)
OnReject(Event)
}
// UpstreamStats reports pool connection counts.
//
// stats := proxy.UpstreamStats{Active: 1, Sleep: 0, Error: 0, Total: 1}

View file

@ -69,13 +69,17 @@ func New(config *Config) (*Proxy, Result) {
p.events.Subscribe(EventClose, p.stats.OnClose)
p.events.Subscribe(EventAccept, p.stats.OnAccept)
p.events.Subscribe(EventReject, p.stats.OnReject)
shareSinks := make([]ShareSink, 0, 2)
if p.shareLog != nil {
p.events.Subscribe(EventAccept, p.shareLog.OnAccept)
p.events.Subscribe(EventReject, p.shareLog.OnReject)
shareSinks = append(shareSinks, p.shareLog)
}
if p.customDiffBuckets != nil {
p.events.Subscribe(EventAccept, p.customDiffBuckets.OnAccept)
p.events.Subscribe(EventReject, p.customDiffBuckets.OnReject)
shareSinks = append(shareSinks, p.customDiffBuckets)
}
if len(shareSinks) > 0 {
p.shareSink = newShareSinkGroup(shareSinks...)
p.events.Subscribe(EventAccept, p.shareSink.OnAccept)
p.events.Subscribe(EventReject, p.shareSink.OnReject)
}
p.events.Subscribe(EventAccept, p.onShareSettled)
p.events.Subscribe(EventReject, p.onShareSettled)