feat(bugseti): implement pending operations queue with disk persistence

Replace no-op stubs with real implementations for queueOp, drainPendingOps,
savePendingOps, and loadPendingOps. Operations are persisted to hub_pending.json
and replayed on next hub connection — 5xx/transport errors are retried, 4xx
responses are dropped as stale. Adds PendingCount() for queue inspection.

Co-Authored-By: Virgil <virgil@lethean.io>
Snider 2026-02-13 21:36:08 +00:00 committed by Snider
parent 7a92fe0040
commit a567568bbf
2 changed files with 180 additions and 7 deletions
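
Not part of this commit: the diff below adds the queue plumbing but does not show the caller that feeds it. A minimal sketch of the intended usage, where Heartbeat is a purely hypothetical write method (the /heartbeat path and client_id field are borrowed from the tests), might look like this: a hub write goes through doJSON, and on failure the same method, path, and body are handed to queueOp so drainPendingOps can replay them on the next hub connection.

// Illustrative only: how a hub write might fall back to the pending queue.
// Heartbeat is hypothetical; the queue/drain plumbing is what this commit adds.
func (h *HubService) Heartbeat(clientID string) error {
	body := map[string]string{"client_id": clientID}
	if err := h.doJSON("POST", "/heartbeat", body, nil); err != nil {
		// Hub unreachable or erroring: queue the op for replay on reconnect.
		h.queueOp("POST", "/heartbeat", body)
		return err
	}
	return nil
}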

@@ -11,6 +11,8 @@ import (
 	"log"
 	"net/http"
 	"net/url"
+	"os"
+	"path/filepath"
 	"runtime"
 	"sync"
 	"time"
@@ -31,7 +33,7 @@ type HubService struct {
 type PendingOp struct {
 	Method    string          `json:"method"`
 	Path      string          `json:"path"`
-	Body      interface{}     `json:"body,omitempty"`
+	Body      json.RawMessage `json:"body,omitempty"`
 	CreatedAt time.Time       `json:"createdAt"`
 }
@@ -207,14 +209,120 @@ func (h *HubService) doJSON(method, path string, body, dest interface{}) error {
 	return nil
 }
 
-// loadPendingOps is a no-op placeholder (disk persistence comes in Task 7).
-func (h *HubService) loadPendingOps() {}
+// queueOp marshals body to JSON and appends a PendingOp to the queue.
+func (h *HubService) queueOp(method, path string, body interface{}) {
+	var raw json.RawMessage
+	if body != nil {
+		data, err := json.Marshal(body)
+		if err != nil {
+			log.Printf("BugSETI: queueOp marshal error: %v", err)
+			return
+		}
+		raw = data
+	}
 
-// savePendingOps is a no-op placeholder (disk persistence comes in Task 7).
-func (h *HubService) savePendingOps() {}
+	h.mu.Lock()
+	h.pending = append(h.pending, PendingOp{
+		Method:    method,
+		Path:      path,
+		Body:      raw,
+		CreatedAt: time.Now(),
+	})
+	h.mu.Unlock()
 
-// drainPendingOps replays queued operations (no-op until Task 7).
-func (h *HubService) drainPendingOps() {}
+	h.savePendingOps()
+}
+
+// drainPendingOps replays queued operations against the hub.
+// 5xx/transport errors are kept for retry; 4xx responses are dropped (stale).
+func (h *HubService) drainPendingOps() {
+	h.mu.Lock()
+	ops := h.pending
+	h.pending = make([]PendingOp, 0)
+	h.mu.Unlock()
+
+	if len(ops) == 0 {
+		return
+	}
+
+	var failed []PendingOp
+	for _, op := range ops {
+		var body interface{}
+		if len(op.Body) > 0 {
+			body = json.RawMessage(op.Body)
+		}
+		resp, err := h.doRequest(op.Method, op.Path, body)
+		if err != nil {
+			// Transport error — keep for retry.
+			failed = append(failed, op)
+			continue
+		}
+		resp.Body.Close()
+		if resp.StatusCode >= 500 {
+			// Server error — keep for retry.
+			failed = append(failed, op)
+		} // 4xx are dropped (stale).
+	}
+
+	if len(failed) > 0 {
+		h.mu.Lock()
+		h.pending = append(failed, h.pending...)
+		h.mu.Unlock()
+	}
+	h.savePendingOps()
+}
+
+// savePendingOps persists the pending operations queue to disk.
+func (h *HubService) savePendingOps() {
+	dataDir := h.config.GetDataDir()
+	if dataDir == "" {
+		return
+	}
+
+	h.mu.RLock()
+	data, err := json.Marshal(h.pending)
+	h.mu.RUnlock()
+	if err != nil {
+		log.Printf("BugSETI: savePendingOps marshal error: %v", err)
+		return
+	}
+
+	path := filepath.Join(dataDir, "hub_pending.json")
+	if err := os.WriteFile(path, data, 0600); err != nil {
+		log.Printf("BugSETI: savePendingOps write error: %v", err)
+	}
+}
+
+// loadPendingOps loads the pending operations queue from disk.
+// Errors are silently ignored (the file may not exist yet).
+func (h *HubService) loadPendingOps() {
+	dataDir := h.config.GetDataDir()
+	if dataDir == "" {
+		return
+	}
+
+	path := filepath.Join(dataDir, "hub_pending.json")
+	data, err := os.ReadFile(path)
+	if err != nil {
+		return
+	}
+
+	var ops []PendingOp
+	if err := json.Unmarshal(data, &ops); err != nil {
+		return
+	}
+	h.pending = ops
+}
+
+// PendingCount returns the number of queued pending operations.
+func (h *HubService) PendingCount() int {
+	h.mu.RLock()
+	defer h.mu.RUnlock()
+	return len(h.pending)
+}
 
 // ---- Task 4: Auto-Register via Forge Token ----

@@ -491,3 +491,68 @@ func TestGetGlobalStats_Good(t *testing.T) {
 	assert.Equal(t, 25, stats.ActiveClaims)
 	assert.Equal(t, 150, stats.IssuesAvailable)
 }
+
+// ---- Task 7: Pending Operations Queue ----
+
+func TestPendingOps_Good_QueueAndDrain(t *testing.T) {
+	var callCount int32
+	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		atomic.AddInt32(&callCount, 1)
+		w.WriteHeader(http.StatusOK)
+	}))
+	defer srv.Close()
+
+	cfg := testConfigService(t, nil, nil)
+	cfg.config.HubURL = srv.URL
+	cfg.config.HubToken = "tok"
+	h := NewHubService(cfg)
+
+	// Manually queue a pending op (simulates a previous failed request).
+	h.queueOp("POST", "/heartbeat", map[string]string{"client_id": "test"})
+	assert.Equal(t, 1, h.PendingCount())
+
+	// Register() calls drainPendingOps() first, then sends its own request.
+	err := h.Register()
+	require.NoError(t, err)
+
+	// At least 2 calls: 1 from drain (the queued heartbeat) + 1 from Register itself.
+	assert.GreaterOrEqual(t, atomic.LoadInt32(&callCount), int32(2))
+	assert.Equal(t, 0, h.PendingCount())
+}
+
+func TestPendingOps_Good_PersistAndLoad(t *testing.T) {
+	cfg1 := testConfigService(t, nil, nil)
+	cfg1.config.HubURL = "https://hub.example.com"
+	cfg1.config.HubToken = "tok"
+	h1 := NewHubService(cfg1)
+
+	// Queue an op — this also calls savePendingOps.
+	h1.queueOp("POST", "/heartbeat", map[string]string{"client_id": "test"})
+	assert.Equal(t, 1, h1.PendingCount())
+
+	// Create a second HubService with the same data dir.
+	// NewHubService calls loadPendingOps() in its constructor.
+	cfg2 := testConfigService(t, nil, nil)
+	cfg2.config.DataDir = cfg1.config.DataDir // Share the same data dir.
+	cfg2.config.HubURL = "https://hub.example.com"
+	cfg2.config.HubToken = "tok"
+	h2 := NewHubService(cfg2)
+	assert.Equal(t, 1, h2.PendingCount())
+}
+
+func TestPendingCount_Good(t *testing.T) {
+	cfg := testConfigService(t, nil, nil)
+	cfg.config.HubURL = "https://hub.example.com"
+	cfg.config.HubToken = "tok"
+	h := NewHubService(cfg)
+
+	assert.Equal(t, 0, h.PendingCount())
+	h.queueOp("POST", "/test1", nil)
+	assert.Equal(t, 1, h.PendingCount())
+	h.queueOp("POST", "/test2", map[string]string{"key": "val"})
+	assert.Equal(t, 2, h.PendingCount())
+}