fix(proxy): preserve submitted job snapshots

Co-Authored-By: Virgil <virgil@lethean.io>
This commit is contained in:
Virgil 2026-04-04 11:44:43 +00:00
parent 96a0652235
commit 417b967d48
7 changed files with 157 additions and 27 deletions

View file

@ -75,6 +75,9 @@ func (w *ConfigWatcher) Start() {
go func() {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
if info, errorValue := os.Stat(w.path); errorValue == nil {
w.lastMod = info.ModTime()
}
for {
select {

View file

@ -32,6 +32,7 @@ type NonceMapper struct {
type SubmitContext struct {
RequestID int64 // JSON-RPC id from the miner's submit request
MinerID int64 // miner that submitted
Job proxy.Job
JobID string
Expired bool
SubmittedAt time.Time
@ -79,7 +80,7 @@ func (m *NonceMapper) Submit(event *proxy.SubmitEvent) {
return
}
valid, expired := m.storage.JobStatus(event.JobID)
job, valid, expired := m.storage.JobForID(event.JobID)
if !valid {
event.Miner.ReplyWithError(event.RequestID, "Invalid job id")
return
@ -96,6 +97,7 @@ func (m *NonceMapper) Submit(event *proxy.SubmitEvent) {
m.pending[sequence] = SubmitContext{
RequestID: event.RequestID,
MinerID: event.Miner.ID(),
Job: job,
JobID: event.JobID,
Expired: expired,
SubmittedAt: time.Now().UTC(),
@ -145,7 +147,6 @@ func (m *NonceMapper) OnResultAccepted(sequence int64, accepted bool, errorMessa
}
if m.events != nil {
jobCopy := m.storage.CurrentJob()
latency := uint16(0)
if !context.SubmittedAt.IsZero() {
elapsed := time.Since(context.SubmittedAt).Milliseconds()
@ -160,7 +161,7 @@ func (m *NonceMapper) OnResultAccepted(sequence int64, accepted bool, errorMessa
m.events.Dispatch(proxy.Event{
Type: eventType,
Miner: miner,
Job: jobCopy,
Job: jobPointer(context.Job),
Diff: miner.Diff(),
Error: errorMessage,
Latency: latency,
@ -200,3 +201,12 @@ func (m *NonceMapper) clearPending() {
m.pending = make(map[int64]SubmitContext)
m.mu.Unlock()
}
func jobPointer(job proxy.Job) *proxy.Job {
if !job.IsValid() {
return nil
}
jobCopy := job
return &jobCopy
}

View file

@ -118,23 +118,31 @@ func (s *NonceStorage) IsValidJobID(id string) bool {
return valid
}
// JobStatus returns whether the job ID is current or stale-but-still-acceptable.
// JobForID returns a copy of the current or previous job for the given ID.
//
// valid, expired := storage.JobStatus(submitJobID)
func (s *NonceStorage) JobStatus(id string) (valid bool, expired bool) {
// job, valid, expired := storage.JobForID(submitJobID)
func (s *NonceStorage) JobForID(id string) (job proxy.Job, valid bool, expired bool) {
s.mu.Lock()
defer s.mu.Unlock()
if id == "" {
return false, false
return proxy.Job{}, false, false
}
if id == s.job.JobID {
return true, false
return s.job, true, false
}
if id == s.prevJob.JobID && id != "" {
return true, true
return s.prevJob, true, true
}
return false, false
return proxy.Job{}, false, false
}
// JobStatus returns whether the job ID is current or stale-but-still-acceptable.
//
// valid, expired := storage.JobStatus(submitJobID)
func (s *NonceStorage) JobStatus(id string) (valid bool, expired bool) {
_, valid, expired = s.JobForID(id)
return valid, expired
}
// SlotCount returns free, dead, and active slot counts for monitoring output.

View file

@ -1,7 +1,9 @@
package nicehash
import (
"strings"
"testing"
"time"
"dappco.re/go/core/proxy"
)
@ -68,3 +70,39 @@ func TestNonceMapper_OnDisconnect_Ugly(t *testing.T) {
t.Fatalf("expected pending submits to be cleared, got %d", len(mapper.pending))
}
}
func TestNonceMapper_OnResultAccepted_Good(t *testing.T) {
bus := proxy.NewEventBus()
resultCh := make(chan proxy.Event, 1)
bus.Subscribe(proxy.EventAccept, func(event proxy.Event) {
resultCh <- event
})
miner := proxy.NewMiner(nil, 0, nil)
mapper := NewNonceMapper(1, &proxy.Config{}, nil)
mapper.events = bus
if !mapper.storage.Add(miner) {
t.Fatal("expected miner slot allocation")
}
mapper.storage.SetJob(proxy.Job{Blob: strings.Repeat("0", 160), JobID: "job-a", Target: "b88d0600"})
mapper.mu.Lock()
mapper.pending[1] = SubmitContext{
RequestID: 7,
MinerID: miner.ID(),
Job: proxy.Job{Blob: strings.Repeat("0", 160), JobID: "job-a", Target: "b88d0600"},
SubmittedAt: time.Now().UTC(),
}
mapper.mu.Unlock()
mapper.storage.SetJob(proxy.Job{Blob: strings.Repeat("1", 160), JobID: "job-b", Target: "b88d0600"})
mapper.OnResultAccepted(1, true, "")
select {
case event := <-resultCh:
if event.Job == nil || event.Job.JobID != "job-a" {
t.Fatalf("expected submitted job to be reported, got %#v", event.Job)
}
case <-time.After(time.Second):
t.Fatal("expected accept event")
}
}

View file

@ -28,6 +28,7 @@ type SimpleMapper struct {
type simpleSubmitContext struct {
RequestID int64
Job proxy.Job
Expired bool
SubmittedAt time.Time
}
@ -60,19 +61,24 @@ func (m *SimpleMapper) OnJob(job proxy.Job) {
}
func (m *SimpleMapper) JobStatus(id string) (valid bool, expired bool) {
_, valid, expired = m.JobForID(id)
return valid, expired
}
func (m *SimpleMapper) JobForID(id string) (proxy.Job, bool, bool) {
m.mu.Lock()
defer m.mu.Unlock()
if id == "" {
return false, false
return proxy.Job{}, false, false
}
if id == m.job.JobID {
return true, false
return m.job, true, false
}
if id == m.prevJob.JobID {
return true, true
return m.prevJob, true, true
}
return false, false
return proxy.Job{}, false, false
}
func (m *SimpleMapper) OnResultAccepted(sequence int64, accepted bool, errorMessage string) {
@ -93,7 +99,7 @@ func (m *SimpleMapper) OnResultAccepted(sequence int64, accepted bool, errorMess
if accepted {
latency := shareLatency(context.SubmittedAt)
if m.events != nil {
m.events.Dispatch(proxy.Event{Type: proxy.EventAccept, Miner: miner, Job: m.currentJob(), Diff: miner.Diff(), Latency: latency, Expired: context.Expired})
m.events.Dispatch(proxy.Event{Type: proxy.EventAccept, Miner: miner, Job: jobPointer(context.Job), Diff: miner.Diff(), Latency: latency, Expired: context.Expired})
}
miner.Success(context.RequestID, "OK")
return
@ -101,7 +107,7 @@ func (m *SimpleMapper) OnResultAccepted(sequence int64, accepted bool, errorMess
latency := shareLatency(context.SubmittedAt)
if m.events != nil {
m.events.Dispatch(proxy.Event{Type: proxy.EventReject, Miner: miner, Job: m.currentJob(), Diff: miner.Diff(), Error: errorMessage, Latency: latency, Expired: context.Expired})
m.events.Dispatch(proxy.Event{Type: proxy.EventReject, Miner: miner, Job: jobPointer(context.Job), Diff: miner.Diff(), Error: errorMessage, Latency: latency, Expired: context.Expired})
}
miner.ReplyWithError(context.RequestID, errorMessage)
}
@ -111,22 +117,21 @@ func (m *SimpleMapper) OnDisconnect() {
m.stopped = true
}
func (m *SimpleMapper) currentJob() *proxy.Job {
m.mu.Lock()
defer m.mu.Unlock()
if !m.job.IsValid() {
return nil
}
job := m.job
return &job
}
func (m *SimpleMapper) clearPending() {
m.mu.Lock()
m.pending = make(map[int64]simpleSubmitContext)
m.mu.Unlock()
}
func jobPointer(job proxy.Job) *proxy.Job {
if !job.IsValid() {
return nil
}
jobCopy := job
return &jobCopy
}
func shareLatency(submittedAt time.Time) uint16 {
if submittedAt.IsZero() {
return 0

View file

@ -113,7 +113,7 @@ func (s *SimpleSplitter) OnSubmit(event *proxy.SubmitEvent) {
return
}
valid, expired := mapper.JobStatus(event.JobID)
job, valid, expired := mapper.JobForID(event.JobID)
if !valid {
event.Miner.ReplyWithError(event.RequestID, "Invalid job id")
return
@ -128,6 +128,7 @@ func (s *SimpleSplitter) OnSubmit(event *proxy.SubmitEvent) {
mapper.mu.Lock()
mapper.pending[sequence] = simpleSubmitContext{
RequestID: event.RequestID,
Job: job,
Expired: expired,
SubmittedAt: time.Now().UTC(),
}

View file

@ -1,6 +1,8 @@
package simple
import (
"os"
"strings"
"testing"
"time"
@ -117,3 +119,66 @@ func TestSimpleSplitter_OnClose_Ugly(t *testing.T) {
t.Fatal("expected mapper to move to idle pool")
}
}
func TestSimpleMapper_OnResultAccepted_Good(t *testing.T) {
bus := proxy.NewEventBus()
resultCh := make(chan proxy.Event, 1)
bus.Subscribe(proxy.EventAccept, func(event proxy.Event) {
resultCh <- event
})
mapper := &SimpleMapper{
miner: &proxy.Miner{},
events: bus,
pending: make(map[int64]simpleSubmitContext),
job: proxy.Job{Blob: strings.Repeat("0", 160), JobID: "job-b", Target: "b88d0600"},
prevJob: proxy.Job{Blob: strings.Repeat("1", 160), JobID: "job-a", Target: "b88d0600"},
}
mapper.pending[1] = simpleSubmitContext{
RequestID: 7,
Job: proxy.Job{Blob: strings.Repeat("1", 160), JobID: "job-a", Target: "b88d0600"},
SubmittedAt: time.Now().UTC(),
}
mapper.OnResultAccepted(1, true, "")
select {
case event := <-resultCh:
if event.Job == nil || event.Job.JobID != "job-a" {
t.Fatalf("expected submitted job to be reported, got %#v", event.Job)
}
case <-time.After(time.Second):
t.Fatal("expected accept event")
}
}
func TestConfigWatcher_Start_Ugly(t *testing.T) {
path := t.TempDir() + "/config.json"
errorValue := os.WriteFile(path, []byte(`{"bind":[{"host":"127.0.0.1","port":3333}],"pools":[{"url":"pool-a:3333","enabled":true}]}`), 0o644)
if errorValue != nil {
t.Fatal(errorValue)
}
watcherTriggered := make(chan struct{}, 1)
watcher := proxy.NewConfigWatcher(path, func(cfg *proxy.Config) {
watcherTriggered <- struct{}{}
})
watcher.Start()
defer watcher.Stop()
select {
case <-watcherTriggered:
t.Fatal("expected watcher to stay quiet until the file changes")
case <-time.After(1200 * time.Millisecond):
}
if errorValue = os.WriteFile(path, []byte(`{"bind":[{"host":"127.0.0.1","port":3333}],"pools":[{"url":"pool-b:3333","enabled":true}]}`), 0o644); errorValue != nil {
t.Fatal(errorValue)
}
select {
case <-watcherTriggered:
case <-time.After(2 * time.Second):
t.Fatal("expected watcher to observe the modification")
}
}