diff --git a/php/Models/WorkspaceState.php b/php/Models/WorkspaceState.php index 091fa36..52fc547 100644 --- a/php/Models/WorkspaceState.php +++ b/php/Models/WorkspaceState.php @@ -9,22 +9,6 @@ use Illuminate\Database\Eloquent\Factories\HasFactory; use Illuminate\Database\Eloquent\Model; use Illuminate\Database\Eloquent\Relations\BelongsTo; -/** - * Workspace State Model - * - * Persistent key-value state storage for agent plans. - * Stores typed values shared across agent sessions within a plan, - * enabling context sharing and state recovery. - * - * @property int $id - * @property int $agent_plan_id - * @property string $key - * @property array $value - * @property string $type - * @property string|null $description - * @property \Carbon\Carbon|null $created_at - * @property \Carbon\Carbon|null $updated_at - */ class WorkspaceState extends Model { use HasFactory; @@ -51,109 +35,23 @@ class WorkspaceState extends Model 'value' => 'array', ]; - // Relationships - public function plan(): BelongsTo { return $this->belongsTo(AgentPlan::class, 'agent_plan_id'); } - // Scopes - - public function scopeForPlan($query, AgentPlan|int $plan): mixed + public function scopeForPlan(Builder $query, AgentPlan|int $plan): Builder { $planId = $plan instanceof AgentPlan ? $plan->id : $plan; -<<<<<<< HEAD - /** - * Set typed value. - */ - public function setTypedValue(mixed $value): void - { - $storedValue = match ($this->type) { - self::TYPE_JSON => json_encode($value), - default => (string) $value, - }; - - $this->update(['value' => $storedValue]); - } - - /** - * Get or create state for a plan. - */ - public static function getOrCreate(AgentPlan $plan, string $key, mixed $default = null, string $type = self::TYPE_JSON): self - { - $state = static::where('agent_plan_id', $plan->id) - ->where('key', $key) - ->first(); - - if (! $state) { - $value = match ($type) { - self::TYPE_JSON => json_encode($default), - default => (string) ($default ?? ''), - }; - - $state = static::create([ - 'agent_plan_id' => $plan->id, - 'key' => $key, - 'value' => $value, - 'type' => $type, - ]); - } - - return $state; - } - - /** - * Set state value for a plan. - */ - public static function set(AgentPlan $plan, string $key, mixed $value, string $type = self::TYPE_JSON): self - { - $storedValue = match ($type) { - self::TYPE_JSON => json_encode($value), - default => (string) $value, - }; - - return static::updateOrCreate( - ['agent_plan_id' => $plan->id, 'key' => $key], - ['value' => $storedValue, 'type' => $type] - ); - } - - /** - * Get state value for a plan. - */ - public static function get(AgentPlan $plan, string $key, mixed $default = null): mixed - { - $state = static::where('agent_plan_id', $plan->id) - ->where('key', $key) - ->first(); - - if (! $state) { - return $default; - } - - return $state->getTypedValue(); - } - - /** - * Scope: for plan. - */ - public function scopeForPlan(Builder $query, int $planId): Builder - { return $query->where('agent_plan_id', $planId); } - /** - * Scope: by type. - */ - public function scopeByType(Builder $query, string $type): Builder + public function scopeOfType(Builder $query, string $type): Builder { return $query->where('type', $type); } - // Type helpers - public function isJson(): bool { return $this->type === self::TYPE_JSON; @@ -176,37 +74,55 @@ class WorkspaceState extends Model public function getFormattedValue(): string { - if ($this->isMarkdown() || $this->isCode()) { - return is_string($this->value) ? $this->value : json_encode($this->value, JSON_PRETTY_PRINT); + $value = $this->value; + + if (is_string($value)) { + return $value; } - return json_encode($this->value, JSON_PRETTY_PRINT); + return (string) json_encode($value, JSON_PRETTY_PRINT); } - // Static helpers + public static function getOrCreate( + AgentPlan $plan, + string $key, + mixed $default = null, + string $type = self::TYPE_JSON + ): self { + return static::firstOrCreate( + ['agent_plan_id' => $plan->id, 'key' => $key], + ['value' => $default, 'type' => $type] + ); + } - /** - * Get a state value for a plan, returning $default if not set. - */ public static function getValue(AgentPlan $plan, string $key, mixed $default = null): mixed { - $state = static::where('agent_plan_id', $plan->id)->where('key', $key)->first(); + $state = static::forPlan($plan)->where('key', $key)->first(); - return $state !== null ? $state->value : $default; + return $state?->value ?? $default; } - /** - * Set (upsert) a state value for a plan. - */ - public static function setValue(AgentPlan $plan, string $key, mixed $value, string $type = self::TYPE_JSON): self - { + public static function setValue( + AgentPlan $plan, + string $key, + mixed $value, + string $type = self::TYPE_JSON + ): self { return static::updateOrCreate( ['agent_plan_id' => $plan->id, 'key' => $key], ['value' => $value, 'type' => $type] ); } - // MCP output + public function setTypedValue(mixed $value): void + { + $this->update(['value' => $value]); + } + + public function getTypedValue(): mixed + { + return $this->value; + } public function toMcpContext(): array { diff --git a/pkg/agentic/sync.go b/pkg/agentic/sync.go index 1835b1d..632e679 100644 --- a/pkg/agentic/sync.go +++ b/pkg/agentic/sync.go @@ -4,6 +4,7 @@ package agentic import ( "context" + "time" core "dappco.re/go/core" ) @@ -27,6 +28,12 @@ type SyncPullOutput struct { Context []map[string]any `json:"context"` } +type syncQueuedPush struct { + AgentID string `json:"agent_id"` + Dispatches []map[string]any `json:"dispatches"` + QueuedAt time.Time `json:"queued_at"` +} + // result := c.Action("agent.sync.push").Run(ctx, core.NewOptions()) func (s *PrepSubsystem) handleSyncPush(ctx context.Context, options core.Options) core.Result { output, err := s.syncPush(ctx, options.String("agent_id")) @@ -49,31 +56,38 @@ func (s *PrepSubsystem) syncPush(ctx context.Context, agentID string) (SyncPushO if agentID == "" { agentID = AgentName() } + dispatches := collectSyncDispatches() token := s.syncToken() if token == "" { - return SyncPushOutput{}, core.E("agent.sync.push", "api token is required", nil) - } - - dispatches := collectSyncDispatches() - if len(dispatches) == 0 { return SyncPushOutput{Success: true, Count: 0}, nil } - payload := map[string]any{ - "agent_id": agentID, - "dispatches": dispatches, + queuedPushes := readSyncQueue() + if len(dispatches) > 0 { + queuedPushes = append(queuedPushes, syncQueuedPush{ + AgentID: agentID, + Dispatches: dispatches, + QueuedAt: time.Now(), + }) + } + if len(queuedPushes) == 0 { + return SyncPushOutput{Success: true, Count: 0}, nil } - result := HTTPPost(ctx, core.Concat(s.syncAPIURL(), "/v1/agent/sync"), core.JSONMarshalString(payload), token, "Bearer") - if !result.OK { - err, _ := result.Value.(error) - if err == nil { - err = core.E("agent.sync.push", "sync push failed", nil) + synced := 0 + for i, queued := range queuedPushes { + if len(queued.Dispatches) == 0 { + continue } - return SyncPushOutput{}, err + if err := s.postSyncPush(ctx, queued.AgentID, queued.Dispatches, token); err != nil { + writeSyncQueue(queuedPushes[i:]) + return SyncPushOutput{Success: true, Count: synced}, nil + } + synced += len(queued.Dispatches) } - return SyncPushOutput{Success: true, Count: len(dispatches)}, nil + writeSyncQueue(nil) + return SyncPushOutput{Success: true, Count: synced}, nil } func (s *PrepSubsystem) syncPull(ctx context.Context, agentID string) (SyncPullOutput, error) { @@ -82,17 +96,15 @@ func (s *PrepSubsystem) syncPull(ctx context.Context, agentID string) (SyncPullO } token := s.syncToken() if token == "" { - return SyncPullOutput{}, core.E("agent.sync.pull", "api token is required", nil) + cached := readSyncContext() + return SyncPullOutput{Success: true, Count: len(cached), Context: cached}, nil } endpoint := core.Concat(s.syncAPIURL(), "/v1/agent/context?agent_id=", agentID) result := HTTPGet(ctx, endpoint, token, "Bearer") if !result.OK { - err, _ := result.Value.(error) - if err == nil { - err = core.E("agent.sync.pull", "sync pull failed", nil) - } - return SyncPullOutput{}, err + cached := readSyncContext() + return SyncPullOutput{Success: true, Count: len(cached), Context: cached}, nil } var response struct { @@ -100,12 +112,10 @@ func (s *PrepSubsystem) syncPull(ctx context.Context, agentID string) (SyncPullO } parseResult := core.JSONUnmarshalString(result.Value.(string), &response) if !parseResult.OK { - err, _ := parseResult.Value.(error) - if err == nil { - err = core.E("agent.sync.pull", "failed to parse sync response", nil) - } - return SyncPullOutput{}, err + cached := readSyncContext() + return SyncPullOutput{Success: true, Count: len(cached), Context: cached}, nil } + writeSyncContext(response.Data) return SyncPullOutput{ Success: true, @@ -172,3 +182,73 @@ func shouldSyncStatus(status string) bool { } return false } + +func (s *PrepSubsystem) postSyncPush(ctx context.Context, agentID string, dispatches []map[string]any, token string) error { + payload := map[string]any{ + "agent_id": agentID, + "dispatches": dispatches, + } + + result := HTTPPost(ctx, core.Concat(s.syncAPIURL(), "/v1/agent/sync"), core.JSONMarshalString(payload), token, "Bearer") + if result.OK { + return nil + } + + err, _ := result.Value.(error) + if err == nil { + err = core.E("agent.sync.push", "sync push failed", nil) + } + return err +} + +func syncStateDir() string { + return core.JoinPath(CoreRoot(), "sync") +} + +func syncQueuePath() string { + return core.JoinPath(syncStateDir(), "queue.json") +} + +func syncContextPath() string { + return core.JoinPath(syncStateDir(), "context.json") +} + +func readSyncQueue() []syncQueuedPush { + var queued []syncQueuedPush + result := fs.Read(syncQueuePath()) + if !result.OK { + return queued + } + parseResult := core.JSONUnmarshalString(result.Value.(string), &queued) + if !parseResult.OK { + return []syncQueuedPush{} + } + return queued +} + +func writeSyncQueue(queued []syncQueuedPush) { + if len(queued) == 0 { + fs.Delete(syncQueuePath()) + return + } + fs.EnsureDir(syncStateDir()) + fs.WriteAtomic(syncQueuePath(), core.JSONMarshalString(queued)) +} + +func readSyncContext() []map[string]any { + var contextData []map[string]any + result := fs.Read(syncContextPath()) + if !result.OK { + return contextData + } + parseResult := core.JSONUnmarshalString(result.Value.(string), &contextData) + if !parseResult.OK { + return []map[string]any{} + } + return contextData +} + +func writeSyncContext(contextData []map[string]any) { + fs.EnsureDir(syncStateDir()) + fs.WriteAtomic(syncContextPath(), core.JSONMarshalString(contextData)) +} diff --git a/pkg/agentic/sync_test.go b/pkg/agentic/sync_test.go index 1f76a55..190edb0 100644 --- a/pkg/agentic/sync_test.go +++ b/pkg/agentic/sync_test.go @@ -61,9 +61,110 @@ func TestSync_HandleSyncPush_Good(t *testing.T) { assert.Equal(t, 1, output.Count) } -func TestSync_HandleSyncPull_Bad(t *testing.T) { +func TestSync_HandleSyncPush_Bad(t *testing.T) { + root := t.TempDir() + t.Setenv("CORE_WORKSPACE", root) + t.Setenv("CORE_AGENT_API_KEY", "") + + workspaceDir := core.JoinPath(root, "workspace", "core", "go-io", "task-5") + fs.EnsureDir(workspaceDir) + writeStatusResult(workspaceDir, &WorkspaceStatus{ + Status: "completed", + Agent: "codex", + Repo: "go-io", + Org: "core", + Task: "Fix tests", + Branch: "agent/fix-tests", + StartedAt: time.Now(), + UpdatedAt: time.Now(), + }) + + subsystem := &PrepSubsystem{ + ServiceRuntime: core.NewServiceRuntime(testCore, AgentOptions{}), + } + output, err := subsystem.syncPush(context.Background(), "") + require.NoError(t, err) + assert.True(t, output.Success) + assert.Equal(t, 0, output.Count) + assert.Empty(t, readSyncQueue()) +} + +func TestSync_HandleSyncPush_Ugly(t *testing.T) { + root := t.TempDir() + t.Setenv("CORE_WORKSPACE", root) t.Setenv("CORE_AGENT_API_KEY", "secret-token") + workspaceDir := core.JoinPath(root, "workspace", "core", "go-io", "task-5") + fs.EnsureDir(workspaceDir) + writeStatusResult(workspaceDir, &WorkspaceStatus{ + Status: "completed", + Agent: "codex", + Repo: "go-io", + Org: "core", + Task: "Fix tests", + Branch: "agent/fix-tests", + StartedAt: time.Now(), + UpdatedAt: time.Now(), + }) + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + require.Equal(t, "/v1/agent/sync", r.URL.Path) + w.WriteHeader(http.StatusServiceUnavailable) + _, _ = w.Write([]byte(`{"error":"unavailable"}`)) + })) + defer server.Close() + + subsystem := &PrepSubsystem{ + ServiceRuntime: core.NewServiceRuntime(testCore, AgentOptions{}), + brainURL: server.URL, + } + output, err := subsystem.syncPush(context.Background(), "") + require.NoError(t, err) + assert.True(t, output.Success) + assert.Equal(t, 0, output.Count) + + queued := readSyncQueue() + require.Len(t, queued, 1) + assert.Equal(t, AgentName(), queued[0].AgentID) + require.Len(t, queued[0].Dispatches, 1) +} + +func TestSync_HandleSyncPull_Good(t *testing.T) { + root := t.TempDir() + t.Setenv("CORE_WORKSPACE", root) + t.Setenv("CORE_AGENT_API_KEY", "secret-token") + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + require.Equal(t, "/v1/agent/context", r.URL.Path) + w.Header().Set("Content-Type", "application/json") + _, _ = w.Write([]byte(`{"data":[{"id":"mem-1","content":"Known pattern"}]}`)) + })) + defer server.Close() + + subsystem := &PrepSubsystem{ + ServiceRuntime: core.NewServiceRuntime(testCore, AgentOptions{}), + brainURL: server.URL, + } + output, err := subsystem.syncPull(context.Background(), "codex") + require.NoError(t, err) + assert.True(t, output.Success) + assert.Equal(t, 1, output.Count) + require.Len(t, output.Context, 1) + assert.Equal(t, "mem-1", output.Context[0]["id"]) + + cached := readSyncContext() + require.Len(t, cached, 1) + assert.Equal(t, "mem-1", cached[0]["id"]) +} + +func TestSync_HandleSyncPull_Bad(t *testing.T) { + root := t.TempDir() + t.Setenv("CORE_WORKSPACE", root) + t.Setenv("CORE_AGENT_API_KEY", "secret-token") + writeSyncContext([]map[string]any{ + {"id": "cached-1", "content": "Cached context"}, + }) + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { require.Equal(t, "/v1/agent/context", r.URL.Path) w.WriteHeader(http.StatusInternalServerError) @@ -75,12 +176,21 @@ func TestSync_HandleSyncPull_Bad(t *testing.T) { ServiceRuntime: core.NewServiceRuntime(testCore, AgentOptions{}), brainURL: server.URL, } - _, err := subsystem.syncPull(context.Background(), "codex") - require.Error(t, err) + output, err := subsystem.syncPull(context.Background(), "codex") + require.NoError(t, err) + assert.True(t, output.Success) + assert.Equal(t, 1, output.Count) + require.Len(t, output.Context, 1) + assert.Equal(t, "cached-1", output.Context[0]["id"]) } func TestSync_HandleSyncPull_Ugly(t *testing.T) { + root := t.TempDir() + t.Setenv("CORE_WORKSPACE", root) t.Setenv("CORE_AGENT_API_KEY", "secret-token") + writeSyncContext([]map[string]any{ + {"id": "cached-2", "content": "Fallback context"}, + }) server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { require.Equal(t, "/v1/agent/context", r.URL.Path) @@ -93,6 +203,10 @@ func TestSync_HandleSyncPull_Ugly(t *testing.T) { ServiceRuntime: core.NewServiceRuntime(testCore, AgentOptions{}), brainURL: server.URL, } - _, err := subsystem.syncPull(context.Background(), "codex") - require.Error(t, err) + output, err := subsystem.syncPull(context.Background(), "codex") + require.NoError(t, err) + assert.True(t, output.Success) + assert.Equal(t, 1, output.Count) + require.Len(t, output.Context, 1) + assert.Equal(t, "cached-2", output.Context[0]["id"]) }