From 417b967d489ad42961d8328de45573f91be1f217 Mon Sep 17 00:00:00 2001 From: Virgil Date: Sat, 4 Apr 2026 11:44:43 +0000 Subject: [PATCH] fix(proxy): preserve submitted job snapshots Co-Authored-By: Virgil --- config_runtime.go | 3 ++ splitter/nicehash/mapper.go | 16 ++++++-- splitter/nicehash/storage.go | 22 +++++++---- splitter/nicehash/storage_test.go | 38 ++++++++++++++++++ splitter/simple/mapper.go | 37 ++++++++++-------- splitter/simple/splitter.go | 3 +- splitter/simple/splitter_test.go | 65 +++++++++++++++++++++++++++++++ 7 files changed, 157 insertions(+), 27 deletions(-) diff --git a/config_runtime.go b/config_runtime.go index 6af1e7a..6288549 100644 --- a/config_runtime.go +++ b/config_runtime.go @@ -75,6 +75,9 @@ func (w *ConfigWatcher) Start() { go func() { ticker := time.NewTicker(time.Second) defer ticker.Stop() + if info, errorValue := os.Stat(w.path); errorValue == nil { + w.lastMod = info.ModTime() + } for { select { diff --git a/splitter/nicehash/mapper.go b/splitter/nicehash/mapper.go index 22b2d56..9b19f83 100644 --- a/splitter/nicehash/mapper.go +++ b/splitter/nicehash/mapper.go @@ -32,6 +32,7 @@ type NonceMapper struct { type SubmitContext struct { RequestID int64 // JSON-RPC id from the miner's submit request MinerID int64 // miner that submitted + Job proxy.Job JobID string Expired bool SubmittedAt time.Time @@ -79,7 +80,7 @@ func (m *NonceMapper) Submit(event *proxy.SubmitEvent) { return } - valid, expired := m.storage.JobStatus(event.JobID) + job, valid, expired := m.storage.JobForID(event.JobID) if !valid { event.Miner.ReplyWithError(event.RequestID, "Invalid job id") return @@ -96,6 +97,7 @@ func (m *NonceMapper) Submit(event *proxy.SubmitEvent) { m.pending[sequence] = SubmitContext{ RequestID: event.RequestID, MinerID: event.Miner.ID(), + Job: job, JobID: event.JobID, Expired: expired, SubmittedAt: time.Now().UTC(), @@ -145,7 +147,6 @@ func (m *NonceMapper) OnResultAccepted(sequence int64, accepted bool, errorMessa } if m.events != nil { - jobCopy := m.storage.CurrentJob() latency := uint16(0) if !context.SubmittedAt.IsZero() { elapsed := time.Since(context.SubmittedAt).Milliseconds() @@ -160,7 +161,7 @@ func (m *NonceMapper) OnResultAccepted(sequence int64, accepted bool, errorMessa m.events.Dispatch(proxy.Event{ Type: eventType, Miner: miner, - Job: jobCopy, + Job: jobPointer(context.Job), Diff: miner.Diff(), Error: errorMessage, Latency: latency, @@ -200,3 +201,12 @@ func (m *NonceMapper) clearPending() { m.pending = make(map[int64]SubmitContext) m.mu.Unlock() } + +func jobPointer(job proxy.Job) *proxy.Job { + if !job.IsValid() { + return nil + } + + jobCopy := job + return &jobCopy +} diff --git a/splitter/nicehash/storage.go b/splitter/nicehash/storage.go index d44cee4..6ecf124 100644 --- a/splitter/nicehash/storage.go +++ b/splitter/nicehash/storage.go @@ -118,23 +118,31 @@ func (s *NonceStorage) IsValidJobID(id string) bool { return valid } -// JobStatus returns whether the job ID is current or stale-but-still-acceptable. +// JobForID returns a copy of the current or previous job for the given ID. // -// valid, expired := storage.JobStatus(submitJobID) -func (s *NonceStorage) JobStatus(id string) (valid bool, expired bool) { +// job, valid, expired := storage.JobForID(submitJobID) +func (s *NonceStorage) JobForID(id string) (job proxy.Job, valid bool, expired bool) { s.mu.Lock() defer s.mu.Unlock() if id == "" { - return false, false + return proxy.Job{}, false, false } if id == s.job.JobID { - return true, false + return s.job, true, false } if id == s.prevJob.JobID && id != "" { - return true, true + return s.prevJob, true, true } - return false, false + return proxy.Job{}, false, false +} + +// JobStatus returns whether the job ID is current or stale-but-still-acceptable. +// +// valid, expired := storage.JobStatus(submitJobID) +func (s *NonceStorage) JobStatus(id string) (valid bool, expired bool) { + _, valid, expired = s.JobForID(id) + return valid, expired } // SlotCount returns free, dead, and active slot counts for monitoring output. diff --git a/splitter/nicehash/storage_test.go b/splitter/nicehash/storage_test.go index 57064a1..b1aaaa3 100644 --- a/splitter/nicehash/storage_test.go +++ b/splitter/nicehash/storage_test.go @@ -1,7 +1,9 @@ package nicehash import ( + "strings" "testing" + "time" "dappco.re/go/core/proxy" ) @@ -68,3 +70,39 @@ func TestNonceMapper_OnDisconnect_Ugly(t *testing.T) { t.Fatalf("expected pending submits to be cleared, got %d", len(mapper.pending)) } } + +func TestNonceMapper_OnResultAccepted_Good(t *testing.T) { + bus := proxy.NewEventBus() + resultCh := make(chan proxy.Event, 1) + bus.Subscribe(proxy.EventAccept, func(event proxy.Event) { + resultCh <- event + }) + + miner := proxy.NewMiner(nil, 0, nil) + mapper := NewNonceMapper(1, &proxy.Config{}, nil) + mapper.events = bus + if !mapper.storage.Add(miner) { + t.Fatal("expected miner slot allocation") + } + mapper.storage.SetJob(proxy.Job{Blob: strings.Repeat("0", 160), JobID: "job-a", Target: "b88d0600"}) + mapper.mu.Lock() + mapper.pending[1] = SubmitContext{ + RequestID: 7, + MinerID: miner.ID(), + Job: proxy.Job{Blob: strings.Repeat("0", 160), JobID: "job-a", Target: "b88d0600"}, + SubmittedAt: time.Now().UTC(), + } + mapper.mu.Unlock() + mapper.storage.SetJob(proxy.Job{Blob: strings.Repeat("1", 160), JobID: "job-b", Target: "b88d0600"}) + + mapper.OnResultAccepted(1, true, "") + + select { + case event := <-resultCh: + if event.Job == nil || event.Job.JobID != "job-a" { + t.Fatalf("expected submitted job to be reported, got %#v", event.Job) + } + case <-time.After(time.Second): + t.Fatal("expected accept event") + } +} diff --git a/splitter/simple/mapper.go b/splitter/simple/mapper.go index a6f4863..2af9a38 100644 --- a/splitter/simple/mapper.go +++ b/splitter/simple/mapper.go @@ -28,6 +28,7 @@ type SimpleMapper struct { type simpleSubmitContext struct { RequestID int64 + Job proxy.Job Expired bool SubmittedAt time.Time } @@ -60,19 +61,24 @@ func (m *SimpleMapper) OnJob(job proxy.Job) { } func (m *SimpleMapper) JobStatus(id string) (valid bool, expired bool) { + _, valid, expired = m.JobForID(id) + return valid, expired +} + +func (m *SimpleMapper) JobForID(id string) (proxy.Job, bool, bool) { m.mu.Lock() defer m.mu.Unlock() if id == "" { - return false, false + return proxy.Job{}, false, false } if id == m.job.JobID { - return true, false + return m.job, true, false } if id == m.prevJob.JobID { - return true, true + return m.prevJob, true, true } - return false, false + return proxy.Job{}, false, false } func (m *SimpleMapper) OnResultAccepted(sequence int64, accepted bool, errorMessage string) { @@ -93,7 +99,7 @@ func (m *SimpleMapper) OnResultAccepted(sequence int64, accepted bool, errorMess if accepted { latency := shareLatency(context.SubmittedAt) if m.events != nil { - m.events.Dispatch(proxy.Event{Type: proxy.EventAccept, Miner: miner, Job: m.currentJob(), Diff: miner.Diff(), Latency: latency, Expired: context.Expired}) + m.events.Dispatch(proxy.Event{Type: proxy.EventAccept, Miner: miner, Job: jobPointer(context.Job), Diff: miner.Diff(), Latency: latency, Expired: context.Expired}) } miner.Success(context.RequestID, "OK") return @@ -101,7 +107,7 @@ func (m *SimpleMapper) OnResultAccepted(sequence int64, accepted bool, errorMess latency := shareLatency(context.SubmittedAt) if m.events != nil { - m.events.Dispatch(proxy.Event{Type: proxy.EventReject, Miner: miner, Job: m.currentJob(), Diff: miner.Diff(), Error: errorMessage, Latency: latency, Expired: context.Expired}) + m.events.Dispatch(proxy.Event{Type: proxy.EventReject, Miner: miner, Job: jobPointer(context.Job), Diff: miner.Diff(), Error: errorMessage, Latency: latency, Expired: context.Expired}) } miner.ReplyWithError(context.RequestID, errorMessage) } @@ -111,22 +117,21 @@ func (m *SimpleMapper) OnDisconnect() { m.stopped = true } -func (m *SimpleMapper) currentJob() *proxy.Job { - m.mu.Lock() - defer m.mu.Unlock() - if !m.job.IsValid() { - return nil - } - job := m.job - return &job -} - func (m *SimpleMapper) clearPending() { m.mu.Lock() m.pending = make(map[int64]simpleSubmitContext) m.mu.Unlock() } +func jobPointer(job proxy.Job) *proxy.Job { + if !job.IsValid() { + return nil + } + + jobCopy := job + return &jobCopy +} + func shareLatency(submittedAt time.Time) uint16 { if submittedAt.IsZero() { return 0 diff --git a/splitter/simple/splitter.go b/splitter/simple/splitter.go index 56501de..98246c3 100644 --- a/splitter/simple/splitter.go +++ b/splitter/simple/splitter.go @@ -113,7 +113,7 @@ func (s *SimpleSplitter) OnSubmit(event *proxy.SubmitEvent) { return } - valid, expired := mapper.JobStatus(event.JobID) + job, valid, expired := mapper.JobForID(event.JobID) if !valid { event.Miner.ReplyWithError(event.RequestID, "Invalid job id") return @@ -128,6 +128,7 @@ func (s *SimpleSplitter) OnSubmit(event *proxy.SubmitEvent) { mapper.mu.Lock() mapper.pending[sequence] = simpleSubmitContext{ RequestID: event.RequestID, + Job: job, Expired: expired, SubmittedAt: time.Now().UTC(), } diff --git a/splitter/simple/splitter_test.go b/splitter/simple/splitter_test.go index eb6cf07..383a1ab 100644 --- a/splitter/simple/splitter_test.go +++ b/splitter/simple/splitter_test.go @@ -1,6 +1,8 @@ package simple import ( + "os" + "strings" "testing" "time" @@ -117,3 +119,66 @@ func TestSimpleSplitter_OnClose_Ugly(t *testing.T) { t.Fatal("expected mapper to move to idle pool") } } + +func TestSimpleMapper_OnResultAccepted_Good(t *testing.T) { + bus := proxy.NewEventBus() + resultCh := make(chan proxy.Event, 1) + bus.Subscribe(proxy.EventAccept, func(event proxy.Event) { + resultCh <- event + }) + + mapper := &SimpleMapper{ + miner: &proxy.Miner{}, + events: bus, + pending: make(map[int64]simpleSubmitContext), + job: proxy.Job{Blob: strings.Repeat("0", 160), JobID: "job-b", Target: "b88d0600"}, + prevJob: proxy.Job{Blob: strings.Repeat("1", 160), JobID: "job-a", Target: "b88d0600"}, + } + mapper.pending[1] = simpleSubmitContext{ + RequestID: 7, + Job: proxy.Job{Blob: strings.Repeat("1", 160), JobID: "job-a", Target: "b88d0600"}, + SubmittedAt: time.Now().UTC(), + } + + mapper.OnResultAccepted(1, true, "") + + select { + case event := <-resultCh: + if event.Job == nil || event.Job.JobID != "job-a" { + t.Fatalf("expected submitted job to be reported, got %#v", event.Job) + } + case <-time.After(time.Second): + t.Fatal("expected accept event") + } +} + +func TestConfigWatcher_Start_Ugly(t *testing.T) { + path := t.TempDir() + "/config.json" + errorValue := os.WriteFile(path, []byte(`{"bind":[{"host":"127.0.0.1","port":3333}],"pools":[{"url":"pool-a:3333","enabled":true}]}`), 0o644) + if errorValue != nil { + t.Fatal(errorValue) + } + + watcherTriggered := make(chan struct{}, 1) + watcher := proxy.NewConfigWatcher(path, func(cfg *proxy.Config) { + watcherTriggered <- struct{}{} + }) + watcher.Start() + defer watcher.Stop() + + select { + case <-watcherTriggered: + t.Fatal("expected watcher to stay quiet until the file changes") + case <-time.After(1200 * time.Millisecond): + } + + if errorValue = os.WriteFile(path, []byte(`{"bind":[{"host":"127.0.0.1","port":3333}],"pools":[{"url":"pool-b:3333","enabled":true}]}`), 0o644); errorValue != nil { + t.Fatal(errorValue) + } + + select { + case <-watcherTriggered: + case <-time.After(2 * time.Second): + t.Fatal("expected watcher to observe the modification") + } +}