From 3cd0909d74ff3773cda69f709225d872aaa48d4c Mon Sep 17 00:00:00 2001 From: Virgil Date: Sun, 5 Apr 2026 01:08:23 +0000 Subject: [PATCH] feat(proxy): add share sink fanout Co-Authored-By: Virgil --- core_impl.go | 42 ++++++++++++++++++++++++++++++++++++++++++ proxy.go | 10 ++++++++++ state_impl.go | 12 ++++++++---- 3 files changed, 60 insertions(+), 4 deletions(-) diff --git a/core_impl.go b/core_impl.go index 54e28d6..7233498 100644 --- a/core_impl.go +++ b/core_impl.go @@ -149,6 +149,48 @@ func (b *EventBus) Dispatch(e Event) { } } +type shareSinkGroup struct { + sinks []ShareSink +} + +func newShareSinkGroup(sinks ...ShareSink) *shareSinkGroup { + group := &shareSinkGroup{sinks: make([]ShareSink, 0, len(sinks))} + for _, sink := range sinks { + if sink != nil { + group.sinks = append(group.sinks, sink) + } + } + return group +} + +func (g *shareSinkGroup) OnAccept(e Event) { + if g == nil { + return + } + for _, sink := range g.sinks { + func() { + defer func() { + _ = recover() + }() + sink.OnAccept(e) + }() + } +} + +func (g *shareSinkGroup) OnReject(e Event) { + if g == nil { + return + } + for _, sink := range g.sinks { + func() { + defer func() { + _ = recover() + }() + sink.OnReject(e) + }() + } +} + // IsValid returns true when the job contains a blob and job id. // // if !job.IsValid() { diff --git a/proxy.go b/proxy.go index 83fc0cc..73bdb3c 100644 --- a/proxy.go +++ b/proxy.go @@ -25,6 +25,7 @@ type Proxy struct { config *Config configMu sync.RWMutex splitter Splitter + shareSink ShareSink stats *Stats workers *Workers events *EventBus @@ -66,6 +67,15 @@ type Splitter interface { Upstreams() UpstreamStats } +// ShareSink consumes share outcomes from the proxy event stream. +// +// sink.OnAccept(proxy.Event{Miner: miner, Diff: 100000}) +// sink.OnReject(proxy.Event{Miner: miner, Error: "Invalid nonce"}) +type ShareSink interface { + OnAccept(Event) + OnReject(Event) +} + // UpstreamStats reports pool connection counts. // // stats := proxy.UpstreamStats{Active: 1, Sleep: 0, Error: 0, Total: 1} diff --git a/state_impl.go b/state_impl.go index d8559fe..d0069a5 100644 --- a/state_impl.go +++ b/state_impl.go @@ -69,13 +69,17 @@ func New(config *Config) (*Proxy, Result) { p.events.Subscribe(EventClose, p.stats.OnClose) p.events.Subscribe(EventAccept, p.stats.OnAccept) p.events.Subscribe(EventReject, p.stats.OnReject) + shareSinks := make([]ShareSink, 0, 2) if p.shareLog != nil { - p.events.Subscribe(EventAccept, p.shareLog.OnAccept) - p.events.Subscribe(EventReject, p.shareLog.OnReject) + shareSinks = append(shareSinks, p.shareLog) } if p.customDiffBuckets != nil { - p.events.Subscribe(EventAccept, p.customDiffBuckets.OnAccept) - p.events.Subscribe(EventReject, p.customDiffBuckets.OnReject) + shareSinks = append(shareSinks, p.customDiffBuckets) + } + if len(shareSinks) > 0 { + p.shareSink = newShareSinkGroup(shareSinks...) + p.events.Subscribe(EventAccept, p.shareSink.OnAccept) + p.events.Subscribe(EventReject, p.shareSink.OnReject) } p.events.Subscribe(EventAccept, p.onShareSettled) p.events.Subscribe(EventReject, p.onShareSettled)