fix(agent): harden sync fallback and state model

Co-Authored-By: Virgil <virgil@lethean.io>
This commit is contained in:
Virgil 2026-03-31 07:54:29 +00:00
parent acc647c24b
commit 073938ca6f
3 changed files with 260 additions and 150 deletions

View file

@ -9,22 +9,6 @@ use Illuminate\Database\Eloquent\Factories\HasFactory;
use Illuminate\Database\Eloquent\Model;
use Illuminate\Database\Eloquent\Relations\BelongsTo;
/**
* Workspace State Model
*
* Persistent key-value state storage for agent plans.
* Stores typed values shared across agent sessions within a plan,
* enabling context sharing and state recovery.
*
* @property int $id
* @property int $agent_plan_id
* @property string $key
* @property array $value
* @property string $type
* @property string|null $description
* @property \Carbon\Carbon|null $created_at
* @property \Carbon\Carbon|null $updated_at
*/
class WorkspaceState extends Model
{
use HasFactory;
@ -51,109 +35,23 @@ class WorkspaceState extends Model
'value' => 'array',
];
// Relationships
public function plan(): BelongsTo
{
return $this->belongsTo(AgentPlan::class, 'agent_plan_id');
}
// Scopes
public function scopeForPlan($query, AgentPlan|int $plan): mixed
public function scopeForPlan(Builder $query, AgentPlan|int $plan): Builder
{
$planId = $plan instanceof AgentPlan ? $plan->id : $plan;
<<<<<<< HEAD
/**
* Set typed value.
*/
public function setTypedValue(mixed $value): void
{
$storedValue = match ($this->type) {
self::TYPE_JSON => json_encode($value),
default => (string) $value,
};
$this->update(['value' => $storedValue]);
}
/**
* Get or create state for a plan.
*/
public static function getOrCreate(AgentPlan $plan, string $key, mixed $default = null, string $type = self::TYPE_JSON): self
{
$state = static::where('agent_plan_id', $plan->id)
->where('key', $key)
->first();
if (! $state) {
$value = match ($type) {
self::TYPE_JSON => json_encode($default),
default => (string) ($default ?? ''),
};
$state = static::create([
'agent_plan_id' => $plan->id,
'key' => $key,
'value' => $value,
'type' => $type,
]);
}
return $state;
}
/**
* Set state value for a plan.
*/
public static function set(AgentPlan $plan, string $key, mixed $value, string $type = self::TYPE_JSON): self
{
$storedValue = match ($type) {
self::TYPE_JSON => json_encode($value),
default => (string) $value,
};
return static::updateOrCreate(
['agent_plan_id' => $plan->id, 'key' => $key],
['value' => $storedValue, 'type' => $type]
);
}
/**
* Get state value for a plan.
*/
public static function get(AgentPlan $plan, string $key, mixed $default = null): mixed
{
$state = static::where('agent_plan_id', $plan->id)
->where('key', $key)
->first();
if (! $state) {
return $default;
}
return $state->getTypedValue();
}
/**
* Scope: for plan.
*/
public function scopeForPlan(Builder $query, int $planId): Builder
{
return $query->where('agent_plan_id', $planId);
}
/**
* Scope: by type.
*/
public function scopeByType(Builder $query, string $type): Builder
public function scopeOfType(Builder $query, string $type): Builder
{
return $query->where('type', $type);
}
// Type helpers
public function isJson(): bool
{
return $this->type === self::TYPE_JSON;
@ -176,37 +74,55 @@ class WorkspaceState extends Model
public function getFormattedValue(): string
{
if ($this->isMarkdown() || $this->isCode()) {
return is_string($this->value) ? $this->value : json_encode($this->value, JSON_PRETTY_PRINT);
$value = $this->value;
if (is_string($value)) {
return $value;
}
return json_encode($this->value, JSON_PRETTY_PRINT);
return (string) json_encode($value, JSON_PRETTY_PRINT);
}
// Static helpers
public static function getOrCreate(
AgentPlan $plan,
string $key,
mixed $default = null,
string $type = self::TYPE_JSON
): self {
return static::firstOrCreate(
['agent_plan_id' => $plan->id, 'key' => $key],
['value' => $default, 'type' => $type]
);
}
/**
* Get a state value for a plan, returning $default if not set.
*/
public static function getValue(AgentPlan $plan, string $key, mixed $default = null): mixed
{
$state = static::where('agent_plan_id', $plan->id)->where('key', $key)->first();
$state = static::forPlan($plan)->where('key', $key)->first();
return $state !== null ? $state->value : $default;
return $state?->value ?? $default;
}
/**
* Set (upsert) a state value for a plan.
*/
public static function setValue(AgentPlan $plan, string $key, mixed $value, string $type = self::TYPE_JSON): self
{
public static function setValue(
AgentPlan $plan,
string $key,
mixed $value,
string $type = self::TYPE_JSON
): self {
return static::updateOrCreate(
['agent_plan_id' => $plan->id, 'key' => $key],
['value' => $value, 'type' => $type]
);
}
// MCP output
public function setTypedValue(mixed $value): void
{
$this->update(['value' => $value]);
}
public function getTypedValue(): mixed
{
return $this->value;
}
public function toMcpContext(): array
{

View file

@ -4,6 +4,7 @@ package agentic
import (
"context"
"time"
core "dappco.re/go/core"
)
@ -27,6 +28,12 @@ type SyncPullOutput struct {
Context []map[string]any `json:"context"`
}
type syncQueuedPush struct {
AgentID string `json:"agent_id"`
Dispatches []map[string]any `json:"dispatches"`
QueuedAt time.Time `json:"queued_at"`
}
// result := c.Action("agent.sync.push").Run(ctx, core.NewOptions())
func (s *PrepSubsystem) handleSyncPush(ctx context.Context, options core.Options) core.Result {
output, err := s.syncPush(ctx, options.String("agent_id"))
@ -49,31 +56,38 @@ func (s *PrepSubsystem) syncPush(ctx context.Context, agentID string) (SyncPushO
if agentID == "" {
agentID = AgentName()
}
dispatches := collectSyncDispatches()
token := s.syncToken()
if token == "" {
return SyncPushOutput{}, core.E("agent.sync.push", "api token is required", nil)
}
dispatches := collectSyncDispatches()
if len(dispatches) == 0 {
return SyncPushOutput{Success: true, Count: 0}, nil
}
payload := map[string]any{
"agent_id": agentID,
"dispatches": dispatches,
queuedPushes := readSyncQueue()
if len(dispatches) > 0 {
queuedPushes = append(queuedPushes, syncQueuedPush{
AgentID: agentID,
Dispatches: dispatches,
QueuedAt: time.Now(),
})
}
if len(queuedPushes) == 0 {
return SyncPushOutput{Success: true, Count: 0}, nil
}
result := HTTPPost(ctx, core.Concat(s.syncAPIURL(), "/v1/agent/sync"), core.JSONMarshalString(payload), token, "Bearer")
if !result.OK {
err, _ := result.Value.(error)
if err == nil {
err = core.E("agent.sync.push", "sync push failed", nil)
synced := 0
for i, queued := range queuedPushes {
if len(queued.Dispatches) == 0 {
continue
}
return SyncPushOutput{}, err
if err := s.postSyncPush(ctx, queued.AgentID, queued.Dispatches, token); err != nil {
writeSyncQueue(queuedPushes[i:])
return SyncPushOutput{Success: true, Count: synced}, nil
}
synced += len(queued.Dispatches)
}
return SyncPushOutput{Success: true, Count: len(dispatches)}, nil
writeSyncQueue(nil)
return SyncPushOutput{Success: true, Count: synced}, nil
}
func (s *PrepSubsystem) syncPull(ctx context.Context, agentID string) (SyncPullOutput, error) {
@ -82,17 +96,15 @@ func (s *PrepSubsystem) syncPull(ctx context.Context, agentID string) (SyncPullO
}
token := s.syncToken()
if token == "" {
return SyncPullOutput{}, core.E("agent.sync.pull", "api token is required", nil)
cached := readSyncContext()
return SyncPullOutput{Success: true, Count: len(cached), Context: cached}, nil
}
endpoint := core.Concat(s.syncAPIURL(), "/v1/agent/context?agent_id=", agentID)
result := HTTPGet(ctx, endpoint, token, "Bearer")
if !result.OK {
err, _ := result.Value.(error)
if err == nil {
err = core.E("agent.sync.pull", "sync pull failed", nil)
}
return SyncPullOutput{}, err
cached := readSyncContext()
return SyncPullOutput{Success: true, Count: len(cached), Context: cached}, nil
}
var response struct {
@ -100,12 +112,10 @@ func (s *PrepSubsystem) syncPull(ctx context.Context, agentID string) (SyncPullO
}
parseResult := core.JSONUnmarshalString(result.Value.(string), &response)
if !parseResult.OK {
err, _ := parseResult.Value.(error)
if err == nil {
err = core.E("agent.sync.pull", "failed to parse sync response", nil)
}
return SyncPullOutput{}, err
cached := readSyncContext()
return SyncPullOutput{Success: true, Count: len(cached), Context: cached}, nil
}
writeSyncContext(response.Data)
return SyncPullOutput{
Success: true,
@ -172,3 +182,73 @@ func shouldSyncStatus(status string) bool {
}
return false
}
func (s *PrepSubsystem) postSyncPush(ctx context.Context, agentID string, dispatches []map[string]any, token string) error {
payload := map[string]any{
"agent_id": agentID,
"dispatches": dispatches,
}
result := HTTPPost(ctx, core.Concat(s.syncAPIURL(), "/v1/agent/sync"), core.JSONMarshalString(payload), token, "Bearer")
if result.OK {
return nil
}
err, _ := result.Value.(error)
if err == nil {
err = core.E("agent.sync.push", "sync push failed", nil)
}
return err
}
func syncStateDir() string {
return core.JoinPath(CoreRoot(), "sync")
}
func syncQueuePath() string {
return core.JoinPath(syncStateDir(), "queue.json")
}
func syncContextPath() string {
return core.JoinPath(syncStateDir(), "context.json")
}
func readSyncQueue() []syncQueuedPush {
var queued []syncQueuedPush
result := fs.Read(syncQueuePath())
if !result.OK {
return queued
}
parseResult := core.JSONUnmarshalString(result.Value.(string), &queued)
if !parseResult.OK {
return []syncQueuedPush{}
}
return queued
}
func writeSyncQueue(queued []syncQueuedPush) {
if len(queued) == 0 {
fs.Delete(syncQueuePath())
return
}
fs.EnsureDir(syncStateDir())
fs.WriteAtomic(syncQueuePath(), core.JSONMarshalString(queued))
}
func readSyncContext() []map[string]any {
var contextData []map[string]any
result := fs.Read(syncContextPath())
if !result.OK {
return contextData
}
parseResult := core.JSONUnmarshalString(result.Value.(string), &contextData)
if !parseResult.OK {
return []map[string]any{}
}
return contextData
}
func writeSyncContext(contextData []map[string]any) {
fs.EnsureDir(syncStateDir())
fs.WriteAtomic(syncContextPath(), core.JSONMarshalString(contextData))
}

View file

@ -61,9 +61,110 @@ func TestSync_HandleSyncPush_Good(t *testing.T) {
assert.Equal(t, 1, output.Count)
}
func TestSync_HandleSyncPull_Bad(t *testing.T) {
func TestSync_HandleSyncPush_Bad(t *testing.T) {
root := t.TempDir()
t.Setenv("CORE_WORKSPACE", root)
t.Setenv("CORE_AGENT_API_KEY", "")
workspaceDir := core.JoinPath(root, "workspace", "core", "go-io", "task-5")
fs.EnsureDir(workspaceDir)
writeStatusResult(workspaceDir, &WorkspaceStatus{
Status: "completed",
Agent: "codex",
Repo: "go-io",
Org: "core",
Task: "Fix tests",
Branch: "agent/fix-tests",
StartedAt: time.Now(),
UpdatedAt: time.Now(),
})
subsystem := &PrepSubsystem{
ServiceRuntime: core.NewServiceRuntime(testCore, AgentOptions{}),
}
output, err := subsystem.syncPush(context.Background(), "")
require.NoError(t, err)
assert.True(t, output.Success)
assert.Equal(t, 0, output.Count)
assert.Empty(t, readSyncQueue())
}
func TestSync_HandleSyncPush_Ugly(t *testing.T) {
root := t.TempDir()
t.Setenv("CORE_WORKSPACE", root)
t.Setenv("CORE_AGENT_API_KEY", "secret-token")
workspaceDir := core.JoinPath(root, "workspace", "core", "go-io", "task-5")
fs.EnsureDir(workspaceDir)
writeStatusResult(workspaceDir, &WorkspaceStatus{
Status: "completed",
Agent: "codex",
Repo: "go-io",
Org: "core",
Task: "Fix tests",
Branch: "agent/fix-tests",
StartedAt: time.Now(),
UpdatedAt: time.Now(),
})
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
require.Equal(t, "/v1/agent/sync", r.URL.Path)
w.WriteHeader(http.StatusServiceUnavailable)
_, _ = w.Write([]byte(`{"error":"unavailable"}`))
}))
defer server.Close()
subsystem := &PrepSubsystem{
ServiceRuntime: core.NewServiceRuntime(testCore, AgentOptions{}),
brainURL: server.URL,
}
output, err := subsystem.syncPush(context.Background(), "")
require.NoError(t, err)
assert.True(t, output.Success)
assert.Equal(t, 0, output.Count)
queued := readSyncQueue()
require.Len(t, queued, 1)
assert.Equal(t, AgentName(), queued[0].AgentID)
require.Len(t, queued[0].Dispatches, 1)
}
func TestSync_HandleSyncPull_Good(t *testing.T) {
root := t.TempDir()
t.Setenv("CORE_WORKSPACE", root)
t.Setenv("CORE_AGENT_API_KEY", "secret-token")
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
require.Equal(t, "/v1/agent/context", r.URL.Path)
w.Header().Set("Content-Type", "application/json")
_, _ = w.Write([]byte(`{"data":[{"id":"mem-1","content":"Known pattern"}]}`))
}))
defer server.Close()
subsystem := &PrepSubsystem{
ServiceRuntime: core.NewServiceRuntime(testCore, AgentOptions{}),
brainURL: server.URL,
}
output, err := subsystem.syncPull(context.Background(), "codex")
require.NoError(t, err)
assert.True(t, output.Success)
assert.Equal(t, 1, output.Count)
require.Len(t, output.Context, 1)
assert.Equal(t, "mem-1", output.Context[0]["id"])
cached := readSyncContext()
require.Len(t, cached, 1)
assert.Equal(t, "mem-1", cached[0]["id"])
}
func TestSync_HandleSyncPull_Bad(t *testing.T) {
root := t.TempDir()
t.Setenv("CORE_WORKSPACE", root)
t.Setenv("CORE_AGENT_API_KEY", "secret-token")
writeSyncContext([]map[string]any{
{"id": "cached-1", "content": "Cached context"},
})
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
require.Equal(t, "/v1/agent/context", r.URL.Path)
w.WriteHeader(http.StatusInternalServerError)
@ -75,12 +176,21 @@ func TestSync_HandleSyncPull_Bad(t *testing.T) {
ServiceRuntime: core.NewServiceRuntime(testCore, AgentOptions{}),
brainURL: server.URL,
}
_, err := subsystem.syncPull(context.Background(), "codex")
require.Error(t, err)
output, err := subsystem.syncPull(context.Background(), "codex")
require.NoError(t, err)
assert.True(t, output.Success)
assert.Equal(t, 1, output.Count)
require.Len(t, output.Context, 1)
assert.Equal(t, "cached-1", output.Context[0]["id"])
}
func TestSync_HandleSyncPull_Ugly(t *testing.T) {
root := t.TempDir()
t.Setenv("CORE_WORKSPACE", root)
t.Setenv("CORE_AGENT_API_KEY", "secret-token")
writeSyncContext([]map[string]any{
{"id": "cached-2", "content": "Fallback context"},
})
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
require.Equal(t, "/v1/agent/context", r.URL.Path)
@ -93,6 +203,10 @@ func TestSync_HandleSyncPull_Ugly(t *testing.T) {
ServiceRuntime: core.NewServiceRuntime(testCore, AgentOptions{}),
brainURL: server.URL,
}
_, err := subsystem.syncPull(context.Background(), "codex")
require.Error(t, err)
output, err := subsystem.syncPull(context.Background(), "codex")
require.NoError(t, err)
assert.True(t, output.Success)
assert.Equal(t, 1, output.Count)
require.Len(t, output.Context, 1)
assert.Equal(t, "cached-2", output.Context[0]["id"])
}