From b75fa9dd3fb9ac5cc68c70dd2029dc0f38614fc1 Mon Sep 17 00:00:00 2001 From: Snider Date: Fri, 13 Feb 2026 21:36:08 +0000 Subject: [PATCH] feat(bugseti): implement pending operations queue with disk persistence MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace no-op stubs with real implementations for queueOp, drainPendingOps, savePendingOps, and loadPendingOps. Operations are persisted to hub_pending.json and replayed on next hub connection — 5xx/transport errors are retried, 4xx responses are dropped as stale. Adds PendingCount() for queue inspection. Co-Authored-By: Virgil --- internal/bugseti/hub.go | 122 +++++++++++++++++++++++++++++++++-- internal/bugseti/hub_test.go | 65 +++++++++++++++++++ 2 files changed, 180 insertions(+), 7 deletions(-) diff --git a/internal/bugseti/hub.go b/internal/bugseti/hub.go index 70ed111..83f8367 100644 --- a/internal/bugseti/hub.go +++ b/internal/bugseti/hub.go @@ -11,6 +11,8 @@ import ( "log" "net/http" "net/url" + "os" + "path/filepath" "runtime" "sync" "time" @@ -31,7 +33,7 @@ type HubService struct { type PendingOp struct { Method string `json:"method"` Path string `json:"path"` - Body interface{} `json:"body,omitempty"` + Body json.RawMessage `json:"body,omitempty"` CreatedAt time.Time `json:"createdAt"` } @@ -207,14 +209,120 @@ func (h *HubService) doJSON(method, path string, body, dest interface{}) error { return nil } -// loadPendingOps is a no-op placeholder (disk persistence comes in Task 7). -func (h *HubService) loadPendingOps() {} +// queueOp marshals body to JSON and appends a PendingOp to the queue. +func (h *HubService) queueOp(method, path string, body interface{}) { + var raw json.RawMessage + if body != nil { + data, err := json.Marshal(body) + if err != nil { + log.Printf("BugSETI: queueOp marshal error: %v", err) + return + } + raw = data + } -// savePendingOps is a no-op placeholder (disk persistence comes in Task 7). -func (h *HubService) savePendingOps() {} + h.mu.Lock() + h.pending = append(h.pending, PendingOp{ + Method: method, + Path: path, + Body: raw, + CreatedAt: time.Now(), + }) + h.mu.Unlock() -// drainPendingOps replays queued operations (no-op until Task 7). -func (h *HubService) drainPendingOps() {} + h.savePendingOps() +} + +// drainPendingOps replays queued operations against the hub. +// 5xx/transport errors are kept for retry; 4xx responses are dropped (stale). +func (h *HubService) drainPendingOps() { + h.mu.Lock() + ops := h.pending + h.pending = make([]PendingOp, 0) + h.mu.Unlock() + + if len(ops) == 0 { + return + } + + var failed []PendingOp + for _, op := range ops { + var body interface{} + if len(op.Body) > 0 { + body = json.RawMessage(op.Body) + } + + resp, err := h.doRequest(op.Method, op.Path, body) + if err != nil { + // Transport error — keep for retry. + failed = append(failed, op) + continue + } + resp.Body.Close() + + if resp.StatusCode >= 500 { + // Server error — keep for retry. + failed = append(failed, op) + } // 4xx are dropped (stale). + } + + if len(failed) > 0 { + h.mu.Lock() + h.pending = append(failed, h.pending...) + h.mu.Unlock() + } + + h.savePendingOps() +} + +// savePendingOps persists the pending operations queue to disk. +func (h *HubService) savePendingOps() { + dataDir := h.config.GetDataDir() + if dataDir == "" { + return + } + + h.mu.RLock() + data, err := json.Marshal(h.pending) + h.mu.RUnlock() + if err != nil { + log.Printf("BugSETI: savePendingOps marshal error: %v", err) + return + } + + path := filepath.Join(dataDir, "hub_pending.json") + if err := os.WriteFile(path, data, 0600); err != nil { + log.Printf("BugSETI: savePendingOps write error: %v", err) + } +} + +// loadPendingOps loads the pending operations queue from disk. +// Errors are silently ignored (the file may not exist yet). +func (h *HubService) loadPendingOps() { + dataDir := h.config.GetDataDir() + if dataDir == "" { + return + } + + path := filepath.Join(dataDir, "hub_pending.json") + data, err := os.ReadFile(path) + if err != nil { + return + } + + var ops []PendingOp + if err := json.Unmarshal(data, &ops); err != nil { + return + } + h.pending = ops +} + +// PendingCount returns the number of queued pending operations. +func (h *HubService) PendingCount() int { + h.mu.RLock() + defer h.mu.RUnlock() + return len(h.pending) +} // ---- Task 4: Auto-Register via Forge Token ---- diff --git a/internal/bugseti/hub_test.go b/internal/bugseti/hub_test.go index 206b34b..e5236da 100644 --- a/internal/bugseti/hub_test.go +++ b/internal/bugseti/hub_test.go @@ -491,3 +491,68 @@ func TestGetGlobalStats_Good(t *testing.T) { assert.Equal(t, 25, stats.ActiveClaims) assert.Equal(t, 150, stats.IssuesAvailable) } + +// ---- Task 7: Pending Operations Queue ---- + +func TestPendingOps_Good_QueueAndDrain(t *testing.T) { + var callCount int32 + + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + callCount++ + w.WriteHeader(http.StatusOK) + })) + defer srv.Close() + + cfg := testConfigService(t, nil, nil) + cfg.config.HubURL = srv.URL + cfg.config.HubToken = "tok" + h := NewHubService(cfg) + + // Manually queue a pending op (simulates a previous failed request). + h.queueOp("POST", "/heartbeat", map[string]string{"client_id": "test"}) + assert.Equal(t, 1, h.PendingCount()) + + // Register() calls drainPendingOps() first, then sends its own request. + err := h.Register() + require.NoError(t, err) + + // At least 2 calls: 1 from drain (the queued heartbeat) + 1 from Register itself. + assert.GreaterOrEqual(t, callCount, int32(2)) + assert.Equal(t, 0, h.PendingCount()) +} + +func TestPendingOps_Good_PersistAndLoad(t *testing.T) { + cfg1 := testConfigService(t, nil, nil) + cfg1.config.HubURL = "https://hub.example.com" + cfg1.config.HubToken = "tok" + h1 := NewHubService(cfg1) + + // Queue an op — this also calls savePendingOps. + h1.queueOp("POST", "/heartbeat", map[string]string{"client_id": "test"}) + assert.Equal(t, 1, h1.PendingCount()) + + // Create a second HubService with the same data dir. + // NewHubService calls loadPendingOps() in its constructor. + cfg2 := testConfigService(t, nil, nil) + cfg2.config.DataDir = cfg1.config.DataDir // Share the same data dir. + cfg2.config.HubURL = "https://hub.example.com" + cfg2.config.HubToken = "tok" + h2 := NewHubService(cfg2) + + assert.Equal(t, 1, h2.PendingCount()) +} + +func TestPendingCount_Good(t *testing.T) { + cfg := testConfigService(t, nil, nil) + cfg.config.HubURL = "https://hub.example.com" + cfg.config.HubToken = "tok" + h := NewHubService(cfg) + + assert.Equal(t, 0, h.PendingCount()) + + h.queueOp("POST", "/test1", nil) + assert.Equal(t, 1, h.PendingCount()) + + h.queueOp("POST", "/test2", map[string]string{"key": "val"}) + assert.Equal(t, 2, h.PendingCount()) +}