feat(bugseti): implement pending operations queue with disk persistence
Replace no-op stubs with real implementations for queueOp, drainPendingOps, savePendingOps, and loadPendingOps. Operations are persisted to hub_pending.json and replayed on next hub connection — 5xx/transport errors are retried, 4xx responses are dropped as stale. Adds PendingCount() for queue inspection. Co-Authored-By: Virgil <virgil@lethean.io>
This commit is contained in:
parent
5d0b6c3a71
commit
2a8b5c207f
2 changed files with 180 additions and 7 deletions
|
|
@ -11,6 +11,8 @@ import (
|
|||
"log"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"runtime"
|
||||
"sync"
|
||||
"time"
|
||||
|
|
@ -31,7 +33,7 @@ type HubService struct {
|
|||
type PendingOp struct {
|
||||
Method string `json:"method"`
|
||||
Path string `json:"path"`
|
||||
Body interface{} `json:"body,omitempty"`
|
||||
Body json.RawMessage `json:"body,omitempty"`
|
||||
CreatedAt time.Time `json:"createdAt"`
|
||||
}
|
||||
|
||||
|
|
@ -207,14 +209,120 @@ func (h *HubService) doJSON(method, path string, body, dest interface{}) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// loadPendingOps is a no-op placeholder (disk persistence comes in Task 7).
|
||||
func (h *HubService) loadPendingOps() {}
|
||||
// queueOp marshals body to JSON and appends a PendingOp to the queue.
|
||||
func (h *HubService) queueOp(method, path string, body interface{}) {
|
||||
var raw json.RawMessage
|
||||
if body != nil {
|
||||
data, err := json.Marshal(body)
|
||||
if err != nil {
|
||||
log.Printf("BugSETI: queueOp marshal error: %v", err)
|
||||
return
|
||||
}
|
||||
raw = data
|
||||
}
|
||||
|
||||
// savePendingOps is a no-op placeholder (disk persistence comes in Task 7).
|
||||
func (h *HubService) savePendingOps() {}
|
||||
h.mu.Lock()
|
||||
h.pending = append(h.pending, PendingOp{
|
||||
Method: method,
|
||||
Path: path,
|
||||
Body: raw,
|
||||
CreatedAt: time.Now(),
|
||||
})
|
||||
h.mu.Unlock()
|
||||
|
||||
// drainPendingOps replays queued operations (no-op until Task 7).
|
||||
func (h *HubService) drainPendingOps() {}
|
||||
h.savePendingOps()
|
||||
}
|
||||
|
||||
// drainPendingOps replays queued operations against the hub.
|
||||
// 5xx/transport errors are kept for retry; 4xx responses are dropped (stale).
|
||||
func (h *HubService) drainPendingOps() {
|
||||
h.mu.Lock()
|
||||
ops := h.pending
|
||||
h.pending = make([]PendingOp, 0)
|
||||
h.mu.Unlock()
|
||||
|
||||
if len(ops) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
var failed []PendingOp
|
||||
for _, op := range ops {
|
||||
var body interface{}
|
||||
if len(op.Body) > 0 {
|
||||
body = json.RawMessage(op.Body)
|
||||
}
|
||||
|
||||
resp, err := h.doRequest(op.Method, op.Path, body)
|
||||
if err != nil {
|
||||
// Transport error — keep for retry.
|
||||
failed = append(failed, op)
|
||||
continue
|
||||
}
|
||||
resp.Body.Close()
|
||||
|
||||
if resp.StatusCode >= 500 {
|
||||
// Server error — keep for retry.
|
||||
failed = append(failed, op)
|
||||
} // 4xx are dropped (stale).
|
||||
}
|
||||
|
||||
if len(failed) > 0 {
|
||||
h.mu.Lock()
|
||||
h.pending = append(failed, h.pending...)
|
||||
h.mu.Unlock()
|
||||
}
|
||||
|
||||
h.savePendingOps()
|
||||
}
|
||||
|
||||
// savePendingOps persists the pending operations queue to disk.
|
||||
func (h *HubService) savePendingOps() {
|
||||
dataDir := h.config.GetDataDir()
|
||||
if dataDir == "" {
|
||||
return
|
||||
}
|
||||
|
||||
h.mu.RLock()
|
||||
data, err := json.Marshal(h.pending)
|
||||
h.mu.RUnlock()
|
||||
if err != nil {
|
||||
log.Printf("BugSETI: savePendingOps marshal error: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
path := filepath.Join(dataDir, "hub_pending.json")
|
||||
if err := os.WriteFile(path, data, 0600); err != nil {
|
||||
log.Printf("BugSETI: savePendingOps write error: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// loadPendingOps loads the pending operations queue from disk.
|
||||
// Errors are silently ignored (the file may not exist yet).
|
||||
func (h *HubService) loadPendingOps() {
|
||||
dataDir := h.config.GetDataDir()
|
||||
if dataDir == "" {
|
||||
return
|
||||
}
|
||||
|
||||
path := filepath.Join(dataDir, "hub_pending.json")
|
||||
data, err := os.ReadFile(path)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
var ops []PendingOp
|
||||
if err := json.Unmarshal(data, &ops); err != nil {
|
||||
return
|
||||
}
|
||||
h.pending = ops
|
||||
}
|
||||
|
||||
// PendingCount returns the number of queued pending operations.
|
||||
func (h *HubService) PendingCount() int {
|
||||
h.mu.RLock()
|
||||
defer h.mu.RUnlock()
|
||||
return len(h.pending)
|
||||
}
|
||||
|
||||
// ---- Task 4: Auto-Register via Forge Token ----
|
||||
|
||||
|
|
|
|||
|
|
@ -491,3 +491,68 @@ func TestGetGlobalStats_Good(t *testing.T) {
|
|||
assert.Equal(t, 25, stats.ActiveClaims)
|
||||
assert.Equal(t, 150, stats.IssuesAvailable)
|
||||
}
|
||||
|
||||
// ---- Task 7: Pending Operations Queue ----
|
||||
|
||||
func TestPendingOps_Good_QueueAndDrain(t *testing.T) {
|
||||
var callCount int32
|
||||
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
callCount++
|
||||
w.WriteHeader(http.StatusOK)
|
||||
}))
|
||||
defer srv.Close()
|
||||
|
||||
cfg := testConfigService(t, nil, nil)
|
||||
cfg.config.HubURL = srv.URL
|
||||
cfg.config.HubToken = "tok"
|
||||
h := NewHubService(cfg)
|
||||
|
||||
// Manually queue a pending op (simulates a previous failed request).
|
||||
h.queueOp("POST", "/heartbeat", map[string]string{"client_id": "test"})
|
||||
assert.Equal(t, 1, h.PendingCount())
|
||||
|
||||
// Register() calls drainPendingOps() first, then sends its own request.
|
||||
err := h.Register()
|
||||
require.NoError(t, err)
|
||||
|
||||
// At least 2 calls: 1 from drain (the queued heartbeat) + 1 from Register itself.
|
||||
assert.GreaterOrEqual(t, callCount, int32(2))
|
||||
assert.Equal(t, 0, h.PendingCount())
|
||||
}
|
||||
|
||||
func TestPendingOps_Good_PersistAndLoad(t *testing.T) {
|
||||
cfg1 := testConfigService(t, nil, nil)
|
||||
cfg1.config.HubURL = "https://hub.example.com"
|
||||
cfg1.config.HubToken = "tok"
|
||||
h1 := NewHubService(cfg1)
|
||||
|
||||
// Queue an op — this also calls savePendingOps.
|
||||
h1.queueOp("POST", "/heartbeat", map[string]string{"client_id": "test"})
|
||||
assert.Equal(t, 1, h1.PendingCount())
|
||||
|
||||
// Create a second HubService with the same data dir.
|
||||
// NewHubService calls loadPendingOps() in its constructor.
|
||||
cfg2 := testConfigService(t, nil, nil)
|
||||
cfg2.config.DataDir = cfg1.config.DataDir // Share the same data dir.
|
||||
cfg2.config.HubURL = "https://hub.example.com"
|
||||
cfg2.config.HubToken = "tok"
|
||||
h2 := NewHubService(cfg2)
|
||||
|
||||
assert.Equal(t, 1, h2.PendingCount())
|
||||
}
|
||||
|
||||
func TestPendingCount_Good(t *testing.T) {
|
||||
cfg := testConfigService(t, nil, nil)
|
||||
cfg.config.HubURL = "https://hub.example.com"
|
||||
cfg.config.HubToken = "tok"
|
||||
h := NewHubService(cfg)
|
||||
|
||||
assert.Equal(t, 0, h.PendingCount())
|
||||
|
||||
h.queueOp("POST", "/test1", nil)
|
||||
assert.Equal(t, 1, h.PendingCount())
|
||||
|
||||
h.queueOp("POST", "/test2", map[string]string{"key": "val"})
|
||||
assert.Equal(t, 2, h.PendingCount())
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue