feat(agentic): add session replay and shared state tools
Co-Authored-By: Virgil <virgil@lethean.io>
This commit is contained in:
parent
bad6d66abf
commit
c27af1435d
8 changed files with 1352 additions and 5 deletions
|
|
@ -566,6 +566,17 @@ func optionStringMapValue(options core.Options, keys ...string) map[string]strin
|
|||
return nil
|
||||
}
|
||||
|
||||
func optionAnyValue(options core.Options, keys ...string) any {
|
||||
for _, key := range keys {
|
||||
result := options.Get(key)
|
||||
if !result.OK {
|
||||
continue
|
||||
}
|
||||
return normaliseOptionValue(result.Value)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func stringValue(value any) string {
|
||||
switch typed := value.(type) {
|
||||
case string:
|
||||
|
|
@ -618,6 +629,40 @@ func stringSliceValue(value any) []string {
|
|||
return nil
|
||||
}
|
||||
|
||||
func normaliseOptionValue(value any) any {
|
||||
switch typed := value.(type) {
|
||||
case string:
|
||||
trimmed := core.Trim(typed)
|
||||
if trimmed == "" {
|
||||
return ""
|
||||
}
|
||||
if core.HasPrefix(trimmed, "{") {
|
||||
var values map[string]any
|
||||
if result := core.JSONUnmarshalString(trimmed, &values); result.OK {
|
||||
return values
|
||||
}
|
||||
}
|
||||
if core.HasPrefix(trimmed, "[") {
|
||||
var values []any
|
||||
if result := core.JSONUnmarshalString(trimmed, &values); result.OK {
|
||||
return values
|
||||
}
|
||||
}
|
||||
switch core.Lower(trimmed) {
|
||||
case "true":
|
||||
return true
|
||||
case "false":
|
||||
return false
|
||||
}
|
||||
if parsed := parseInt(trimmed); parsed != 0 || trimmed == "0" {
|
||||
return parsed
|
||||
}
|
||||
return typed
|
||||
default:
|
||||
return value
|
||||
}
|
||||
}
|
||||
|
||||
func stringMapValue(value any) map[string]string {
|
||||
switch typed := value.(type) {
|
||||
case map[string]string:
|
||||
|
|
|
|||
|
|
@ -183,6 +183,14 @@ func (s *PrepSubsystem) OnStartup(ctx context.Context) core.Result {
|
|||
c.Action("session.list", s.handleSessionList).Description = "List sessions with optional plan or status filters"
|
||||
c.Action("session.continue", s.handleSessionContinue).Description = "Continue a session from its latest saved context"
|
||||
c.Action("session.end", s.handleSessionEnd).Description = "End a session with status and summary"
|
||||
c.Action("session.log", s.handleSessionLog).Description = "Append a typed work-log entry to a stored session"
|
||||
c.Action("session.artifact", s.handleSessionArtifact).Description = "Record a created, modified, deleted, or reviewed artifact for a session"
|
||||
c.Action("session.handoff", s.handleSessionHandoff).Description = "Pause a session with handoff notes for the next agent"
|
||||
c.Action("session.resume", s.handleSessionResume).Description = "Resume a paused or handed-off session from local cache"
|
||||
c.Action("session.replay", s.handleSessionReplay).Description = "Build replay context for a session from work logs and artifacts"
|
||||
c.Action("state.set", s.handleStateSet).Description = "Store shared plan state for later sessions"
|
||||
c.Action("state.get", s.handleStateGet).Description = "Read shared plan state by key"
|
||||
c.Action("state.list", s.handleStateList).Description = "List shared plan state for a plan"
|
||||
|
||||
c.Action("agentic.prompt", s.handlePrompt).Description = "Read a system prompt by slug"
|
||||
c.Action("agentic.task", s.handleTask).Description = "Read a task plan by slug"
|
||||
|
|
@ -286,6 +294,7 @@ func (s *PrepSubsystem) RegisterTools(server *mcp.Server) {
|
|||
s.registerReviewQueueTool(server)
|
||||
s.registerShutdownTools(server)
|
||||
s.registerSessionTools(server)
|
||||
s.registerStateTools(server)
|
||||
|
||||
mcp.AddTool(server, &mcp.Tool{
|
||||
Name: "agentic_scan",
|
||||
|
|
|
|||
|
|
@ -461,6 +461,14 @@ func TestPrep_OnStartup_Good_RegistersSessionActions(t *testing.T) {
|
|||
assert.True(t, c.Action("session.list").Exists())
|
||||
assert.True(t, c.Action("session.continue").Exists())
|
||||
assert.True(t, c.Action("session.end").Exists())
|
||||
assert.True(t, c.Action("session.log").Exists())
|
||||
assert.True(t, c.Action("session.artifact").Exists())
|
||||
assert.True(t, c.Action("session.handoff").Exists())
|
||||
assert.True(t, c.Action("session.resume").Exists())
|
||||
assert.True(t, c.Action("session.replay").Exists())
|
||||
assert.True(t, c.Action("state.set").Exists())
|
||||
assert.True(t, c.Action("state.get").Exists())
|
||||
assert.True(t, c.Action("state.list").Exists())
|
||||
}
|
||||
|
||||
func TestPrep_OnStartup_Good_RegistersPlatformActionAliases(t *testing.T) {
|
||||
|
|
|
|||
|
|
@ -4,6 +4,7 @@ package agentic
|
|||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
core "dappco.re/go/core"
|
||||
"github.com/modelcontextprotocol/go-sdk/mcp"
|
||||
|
|
@ -75,6 +76,75 @@ type SessionListOutput struct {
|
|||
Sessions []Session `json:"sessions"`
|
||||
}
|
||||
|
||||
// input := agentic.SessionLogInput{SessionID: "ses_abc123", Message: "Checked build", Type: "checkpoint"}
|
||||
type SessionLogInput struct {
|
||||
SessionID string `json:"session_id"`
|
||||
Message string `json:"message"`
|
||||
Type string `json:"type,omitempty"`
|
||||
Data map[string]any `json:"data,omitempty"`
|
||||
}
|
||||
|
||||
// out := agentic.SessionLogOutput{Success: true, Logged: "Checked build"}
|
||||
type SessionLogOutput struct {
|
||||
Success bool `json:"success"`
|
||||
Logged string `json:"logged"`
|
||||
}
|
||||
|
||||
// input := agentic.SessionArtifactInput{SessionID: "ses_abc123", Path: "pkg/agentic/session.go", Action: "modified"}
|
||||
type SessionArtifactInput struct {
|
||||
SessionID string `json:"session_id"`
|
||||
Path string `json:"path"`
|
||||
Action string `json:"action"`
|
||||
Metadata map[string]any `json:"metadata,omitempty"`
|
||||
Description string `json:"description,omitempty"`
|
||||
}
|
||||
|
||||
// out := agentic.SessionArtifactOutput{Success: true, Artifact: "pkg/agentic/session.go"}
|
||||
type SessionArtifactOutput struct {
|
||||
Success bool `json:"success"`
|
||||
Artifact string `json:"artifact"`
|
||||
}
|
||||
|
||||
// input := agentic.SessionHandoffInput{SessionID: "ses_abc123", Summary: "Ready for review"}
|
||||
type SessionHandoffInput struct {
|
||||
SessionID string `json:"session_id"`
|
||||
Summary string `json:"summary"`
|
||||
NextSteps []string `json:"next_steps,omitempty"`
|
||||
Blockers []string `json:"blockers,omitempty"`
|
||||
ContextForNext map[string]any `json:"context_for_next,omitempty"`
|
||||
}
|
||||
|
||||
// out := agentic.SessionHandoffOutput{Success: true}
|
||||
type SessionHandoffOutput struct {
|
||||
Success bool `json:"success"`
|
||||
HandoffContext map[string]any `json:"handoff_context"`
|
||||
}
|
||||
|
||||
// input := agentic.SessionResumeInput{SessionID: "ses_abc123"}
|
||||
type SessionResumeInput struct {
|
||||
SessionID string `json:"session_id"`
|
||||
}
|
||||
|
||||
// out := agentic.SessionResumeOutput{Success: true, Session: agentic.Session{SessionID: "ses_abc123"}}
|
||||
type SessionResumeOutput struct {
|
||||
Success bool `json:"success"`
|
||||
Session Session `json:"session"`
|
||||
HandoffContext map[string]any `json:"handoff_context,omitempty"`
|
||||
RecentActions []map[string]any `json:"recent_actions,omitempty"`
|
||||
Artifacts []map[string]any `json:"artifacts,omitempty"`
|
||||
}
|
||||
|
||||
// input := agentic.SessionReplayInput{SessionID: "ses_abc123"}
|
||||
type SessionReplayInput struct {
|
||||
SessionID string `json:"session_id"`
|
||||
}
|
||||
|
||||
// out := agentic.SessionReplayOutput{Success: true}
|
||||
type SessionReplayOutput struct {
|
||||
Success bool `json:"success"`
|
||||
ReplayContext map[string]any `json:"replay_context"`
|
||||
}
|
||||
|
||||
// result := c.Action("session.start").Run(ctx, core.NewOptions(
|
||||
//
|
||||
// core.Option{Key: "agent_type", Value: "codex"},
|
||||
|
|
@ -145,6 +215,87 @@ func (s *PrepSubsystem) handleSessionEnd(ctx context.Context, options core.Optio
|
|||
return core.Result{Value: output, OK: true}
|
||||
}
|
||||
|
||||
// result := c.Action("session.log").Run(ctx, core.NewOptions(
|
||||
//
|
||||
// core.Option{Key: "session_id", Value: "ses_abc123"},
|
||||
// core.Option{Key: "message", Value: "Checked build"},
|
||||
//
|
||||
// ))
|
||||
func (s *PrepSubsystem) handleSessionLog(ctx context.Context, options core.Options) core.Result {
|
||||
_, output, err := s.sessionLog(ctx, nil, SessionLogInput{
|
||||
SessionID: optionStringValue(options, "session_id", "id", "_arg"),
|
||||
Message: optionStringValue(options, "message"),
|
||||
Type: optionStringValue(options, "type"),
|
||||
Data: optionAnyMapValue(options, "data"),
|
||||
})
|
||||
if err != nil {
|
||||
return core.Result{Value: err, OK: false}
|
||||
}
|
||||
return core.Result{Value: output, OK: true}
|
||||
}
|
||||
|
||||
// result := c.Action("session.artifact").Run(ctx, core.NewOptions(
|
||||
//
|
||||
// core.Option{Key: "session_id", Value: "ses_abc123"},
|
||||
// core.Option{Key: "path", Value: "pkg/agentic/session.go"},
|
||||
//
|
||||
// ))
|
||||
func (s *PrepSubsystem) handleSessionArtifact(ctx context.Context, options core.Options) core.Result {
|
||||
_, output, err := s.sessionArtifact(ctx, nil, SessionArtifactInput{
|
||||
SessionID: optionStringValue(options, "session_id", "id", "_arg"),
|
||||
Path: optionStringValue(options, "path"),
|
||||
Action: optionStringValue(options, "action"),
|
||||
Metadata: optionAnyMapValue(options, "metadata"),
|
||||
Description: optionStringValue(options, "description"),
|
||||
})
|
||||
if err != nil {
|
||||
return core.Result{Value: err, OK: false}
|
||||
}
|
||||
return core.Result{Value: output, OK: true}
|
||||
}
|
||||
|
||||
// result := c.Action("session.handoff").Run(ctx, core.NewOptions(
|
||||
//
|
||||
// core.Option{Key: "session_id", Value: "ses_abc123"},
|
||||
// core.Option{Key: "summary", Value: "Ready for review"},
|
||||
//
|
||||
// ))
|
||||
func (s *PrepSubsystem) handleSessionHandoff(ctx context.Context, options core.Options) core.Result {
|
||||
_, output, err := s.sessionHandoff(ctx, nil, SessionHandoffInput{
|
||||
SessionID: optionStringValue(options, "session_id", "id", "_arg"),
|
||||
Summary: optionStringValue(options, "summary"),
|
||||
NextSteps: optionStringSliceValue(options, "next_steps", "next-steps"),
|
||||
Blockers: optionStringSliceValue(options, "blockers"),
|
||||
ContextForNext: optionAnyMapValue(options, "context_for_next", "context-for-next"),
|
||||
})
|
||||
if err != nil {
|
||||
return core.Result{Value: err, OK: false}
|
||||
}
|
||||
return core.Result{Value: output, OK: true}
|
||||
}
|
||||
|
||||
// result := c.Action("session.resume").Run(ctx, core.NewOptions(core.Option{Key: "session_id", Value: "ses_abc123"}))
|
||||
func (s *PrepSubsystem) handleSessionResume(ctx context.Context, options core.Options) core.Result {
|
||||
_, output, err := s.sessionResume(ctx, nil, SessionResumeInput{
|
||||
SessionID: optionStringValue(options, "session_id", "id", "_arg"),
|
||||
})
|
||||
if err != nil {
|
||||
return core.Result{Value: err, OK: false}
|
||||
}
|
||||
return core.Result{Value: output, OK: true}
|
||||
}
|
||||
|
||||
// result := c.Action("session.replay").Run(ctx, core.NewOptions(core.Option{Key: "session_id", Value: "ses_abc123"}))
|
||||
func (s *PrepSubsystem) handleSessionReplay(ctx context.Context, options core.Options) core.Result {
|
||||
_, output, err := s.sessionReplay(ctx, nil, SessionReplayInput{
|
||||
SessionID: optionStringValue(options, "session_id", "id", "_arg"),
|
||||
})
|
||||
if err != nil {
|
||||
return core.Result{Value: err, OK: false}
|
||||
}
|
||||
return core.Result{Value: output, OK: true}
|
||||
}
|
||||
|
||||
func (s *PrepSubsystem) registerSessionTools(server *mcp.Server) {
|
||||
mcp.AddTool(server, &mcp.Tool{
|
||||
Name: "session_start",
|
||||
|
|
@ -170,6 +321,31 @@ func (s *PrepSubsystem) registerSessionTools(server *mcp.Server) {
|
|||
Name: "session_end",
|
||||
Description: "End a session with status, summary, and optional handoff notes.",
|
||||
}, s.sessionEnd)
|
||||
|
||||
mcp.AddTool(server, &mcp.Tool{
|
||||
Name: "session_log",
|
||||
Description: "Add a typed work log entry to a stored session.",
|
||||
}, s.sessionLog)
|
||||
|
||||
mcp.AddTool(server, &mcp.Tool{
|
||||
Name: "session_artifact",
|
||||
Description: "Record a created, modified, deleted, or reviewed artifact for a stored session.",
|
||||
}, s.sessionArtifact)
|
||||
|
||||
mcp.AddTool(server, &mcp.Tool{
|
||||
Name: "session_handoff",
|
||||
Description: "Prepare a stored session for handoff and pause it with summary, blockers, and next-step context.",
|
||||
}, s.sessionHandoff)
|
||||
|
||||
mcp.AddTool(server, &mcp.Tool{
|
||||
Name: "session_resume",
|
||||
Description: "Resume a paused or handed-off stored session and return handoff context.",
|
||||
}, s.sessionResume)
|
||||
|
||||
mcp.AddTool(server, &mcp.Tool{
|
||||
Name: "session_replay",
|
||||
Description: "Build replay context for a stored session from its work log, checkpoints, errors, and artifacts.",
|
||||
}, s.sessionReplay)
|
||||
}
|
||||
|
||||
func (s *PrepSubsystem) sessionStart(ctx context.Context, _ *mcp.CallToolRequest, input SessionStartInput) (*mcp.CallToolResult, SessionOutput, error) {
|
||||
|
|
@ -194,7 +370,7 @@ func (s *PrepSubsystem) sessionStart(ctx context.Context, _ *mcp.CallToolRequest
|
|||
|
||||
return nil, SessionOutput{
|
||||
Success: true,
|
||||
Session: parseSession(sessionDataMap(result.Value.(map[string]any))),
|
||||
Session: s.storeSession(sessionFromInput(parseSession(sessionDataMap(result.Value.(map[string]any))), input)),
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
|
@ -211,7 +387,7 @@ func (s *PrepSubsystem) sessionGet(ctx context.Context, _ *mcp.CallToolRequest,
|
|||
|
||||
return nil, SessionOutput{
|
||||
Success: true,
|
||||
Session: parseSession(sessionDataMap(result.Value.(map[string]any))),
|
||||
Session: s.storeSession(parseSession(sessionDataMap(result.Value.(map[string]any)))),
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
|
@ -228,7 +404,12 @@ func (s *PrepSubsystem) sessionList(ctx context.Context, _ *mcp.CallToolRequest,
|
|||
return nil, SessionListOutput{}, resultErrorValue("session.list", result)
|
||||
}
|
||||
|
||||
return nil, parseSessionListOutput(result.Value.(map[string]any)), nil
|
||||
output := parseSessionListOutput(result.Value.(map[string]any))
|
||||
for i := range output.Sessions {
|
||||
output.Sessions[i] = s.storeSession(output.Sessions[i])
|
||||
}
|
||||
|
||||
return nil, output, nil
|
||||
}
|
||||
|
||||
func (s *PrepSubsystem) sessionContinue(ctx context.Context, _ *mcp.CallToolRequest, input SessionContinueInput) (*mcp.CallToolResult, SessionOutput, error) {
|
||||
|
|
@ -255,7 +436,7 @@ func (s *PrepSubsystem) sessionContinue(ctx context.Context, _ *mcp.CallToolRequ
|
|||
|
||||
return nil, SessionOutput{
|
||||
Success: true,
|
||||
Session: parseSession(sessionDataMap(result.Value.(map[string]any))),
|
||||
Session: s.storeSession(parseSession(sessionDataMap(result.Value.(map[string]any)))),
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
|
@ -283,7 +464,168 @@ func (s *PrepSubsystem) sessionEnd(ctx context.Context, _ *mcp.CallToolRequest,
|
|||
|
||||
return nil, SessionOutput{
|
||||
Success: true,
|
||||
Session: parseSession(sessionDataMap(result.Value.(map[string]any))),
|
||||
Session: s.storeSession(sessionEndFromInput(parseSession(sessionDataMap(result.Value.(map[string]any))), input)),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *PrepSubsystem) sessionLog(ctx context.Context, _ *mcp.CallToolRequest, input SessionLogInput) (*mcp.CallToolResult, SessionLogOutput, error) {
|
||||
if input.SessionID == "" {
|
||||
return nil, SessionLogOutput{}, core.E("sessionLog", "session_id is required", nil)
|
||||
}
|
||||
if input.Message == "" {
|
||||
return nil, SessionLogOutput{}, core.E("sessionLog", "message is required", nil)
|
||||
}
|
||||
|
||||
session, err := s.loadSession(ctx, input.SessionID)
|
||||
if err != nil {
|
||||
return nil, SessionLogOutput{}, err
|
||||
}
|
||||
|
||||
entryType := input.Type
|
||||
if entryType == "" {
|
||||
entryType = "info"
|
||||
}
|
||||
|
||||
entry := map[string]any{
|
||||
"message": input.Message,
|
||||
"type": entryType,
|
||||
"timestamp": time.Now().Format(time.RFC3339),
|
||||
}
|
||||
if len(input.Data) > 0 {
|
||||
entry["data"] = input.Data
|
||||
}
|
||||
|
||||
session.WorkLog = append(session.WorkLog, entry)
|
||||
session.UpdatedAt = time.Now().Format(time.RFC3339)
|
||||
|
||||
if err := writeSessionCache(&session); err != nil {
|
||||
return nil, SessionLogOutput{}, err
|
||||
}
|
||||
|
||||
return nil, SessionLogOutput{
|
||||
Success: true,
|
||||
Logged: input.Message,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *PrepSubsystem) sessionArtifact(ctx context.Context, _ *mcp.CallToolRequest, input SessionArtifactInput) (*mcp.CallToolResult, SessionArtifactOutput, error) {
|
||||
if input.SessionID == "" {
|
||||
return nil, SessionArtifactOutput{}, core.E("sessionArtifact", "session_id is required", nil)
|
||||
}
|
||||
if input.Path == "" {
|
||||
return nil, SessionArtifactOutput{}, core.E("sessionArtifact", "path is required", nil)
|
||||
}
|
||||
if input.Action == "" {
|
||||
return nil, SessionArtifactOutput{}, core.E("sessionArtifact", "action is required", nil)
|
||||
}
|
||||
|
||||
session, err := s.loadSession(ctx, input.SessionID)
|
||||
if err != nil {
|
||||
return nil, SessionArtifactOutput{}, err
|
||||
}
|
||||
|
||||
artifact := map[string]any{
|
||||
"path": input.Path,
|
||||
"action": input.Action,
|
||||
"timestamp": time.Now().Format(time.RFC3339),
|
||||
}
|
||||
metadata := input.Metadata
|
||||
if metadata == nil {
|
||||
metadata = map[string]any{}
|
||||
}
|
||||
if input.Description != "" {
|
||||
metadata["description"] = input.Description
|
||||
}
|
||||
if len(metadata) > 0 {
|
||||
artifact["metadata"] = metadata
|
||||
}
|
||||
|
||||
session.Artifacts = append(session.Artifacts, artifact)
|
||||
session.UpdatedAt = time.Now().Format(time.RFC3339)
|
||||
|
||||
if err := writeSessionCache(&session); err != nil {
|
||||
return nil, SessionArtifactOutput{}, err
|
||||
}
|
||||
|
||||
return nil, SessionArtifactOutput{
|
||||
Success: true,
|
||||
Artifact: input.Path,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *PrepSubsystem) sessionHandoff(ctx context.Context, _ *mcp.CallToolRequest, input SessionHandoffInput) (*mcp.CallToolResult, SessionHandoffOutput, error) {
|
||||
if input.SessionID == "" {
|
||||
return nil, SessionHandoffOutput{}, core.E("sessionHandoff", "session_id is required", nil)
|
||||
}
|
||||
if input.Summary == "" {
|
||||
return nil, SessionHandoffOutput{}, core.E("sessionHandoff", "summary is required", nil)
|
||||
}
|
||||
|
||||
session, err := s.loadSession(ctx, input.SessionID)
|
||||
if err != nil {
|
||||
return nil, SessionHandoffOutput{}, err
|
||||
}
|
||||
|
||||
session.Handoff = map[string]any{
|
||||
"summary": input.Summary,
|
||||
"next_steps": cleanStrings(input.NextSteps),
|
||||
"blockers": cleanStrings(input.Blockers),
|
||||
"context_for_next": input.ContextForNext,
|
||||
}
|
||||
session.Status = "paused"
|
||||
session.UpdatedAt = time.Now().Format(time.RFC3339)
|
||||
|
||||
if err := writeSessionCache(&session); err != nil {
|
||||
return nil, SessionHandoffOutput{}, err
|
||||
}
|
||||
|
||||
return nil, SessionHandoffOutput{
|
||||
Success: true,
|
||||
HandoffContext: sessionHandoffContext(session),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *PrepSubsystem) sessionResume(ctx context.Context, _ *mcp.CallToolRequest, input SessionResumeInput) (*mcp.CallToolResult, SessionResumeOutput, error) {
|
||||
if input.SessionID == "" {
|
||||
return nil, SessionResumeOutput{}, core.E("sessionResume", "session_id is required", nil)
|
||||
}
|
||||
|
||||
session, err := s.loadSession(ctx, input.SessionID)
|
||||
if err != nil {
|
||||
return nil, SessionResumeOutput{}, err
|
||||
}
|
||||
|
||||
if session.Status == "" || session.Status == "paused" || session.Status == "handed_off" {
|
||||
session.Status = "active"
|
||||
}
|
||||
session.UpdatedAt = time.Now().Format(time.RFC3339)
|
||||
|
||||
if err := writeSessionCache(&session); err != nil {
|
||||
return nil, SessionResumeOutput{}, err
|
||||
}
|
||||
|
||||
return nil, SessionResumeOutput{
|
||||
Success: true,
|
||||
Session: session,
|
||||
HandoffContext: session.Handoff,
|
||||
RecentActions: recentSessionActions(session.WorkLog, 20),
|
||||
Artifacts: session.Artifacts,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *PrepSubsystem) sessionReplay(ctx context.Context, _ *mcp.CallToolRequest, input SessionReplayInput) (*mcp.CallToolResult, SessionReplayOutput, error) {
|
||||
if input.SessionID == "" {
|
||||
return nil, SessionReplayOutput{}, core.E("sessionReplay", "session_id is required", nil)
|
||||
}
|
||||
|
||||
session, err := s.loadSession(ctx, input.SessionID)
|
||||
if err != nil {
|
||||
return nil, SessionReplayOutput{}, err
|
||||
}
|
||||
|
||||
return nil, SessionReplayOutput{
|
||||
Success: true,
|
||||
ReplayContext: sessionReplayContext(session),
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
|
@ -319,6 +661,313 @@ func parseSession(values map[string]any) Session {
|
|||
}
|
||||
}
|
||||
|
||||
func (s *PrepSubsystem) loadSession(ctx context.Context, sessionID string) (Session, error) {
|
||||
if cached, err := readSessionCache(sessionID); err == nil && cached != nil {
|
||||
return *cached, nil
|
||||
}
|
||||
|
||||
_, output, err := s.sessionGet(ctx, nil, SessionGetInput{SessionID: sessionID})
|
||||
if err != nil {
|
||||
return Session{}, err
|
||||
}
|
||||
|
||||
return output.Session, nil
|
||||
}
|
||||
|
||||
func (s *PrepSubsystem) storeSession(session Session) Session {
|
||||
merged, err := mergeSessionCache(session)
|
||||
if err != nil {
|
||||
return session
|
||||
}
|
||||
if err := writeSessionCache(&merged); err != nil {
|
||||
return merged
|
||||
}
|
||||
return merged
|
||||
}
|
||||
|
||||
func sessionFromInput(session Session, input SessionStartInput) Session {
|
||||
if session.PlanSlug == "" {
|
||||
session.PlanSlug = input.PlanSlug
|
||||
}
|
||||
if session.Plan == "" {
|
||||
session.Plan = input.PlanSlug
|
||||
}
|
||||
if session.AgentType == "" {
|
||||
session.AgentType = input.AgentType
|
||||
}
|
||||
if len(session.ContextSummary) == 0 && len(input.Context) > 0 {
|
||||
session.ContextSummary = input.Context
|
||||
}
|
||||
return session
|
||||
}
|
||||
|
||||
func sessionEndFromInput(session Session, input SessionEndInput) Session {
|
||||
if session.SessionID == "" {
|
||||
session.SessionID = input.SessionID
|
||||
}
|
||||
if session.Status == "" {
|
||||
session.Status = input.Status
|
||||
}
|
||||
if session.Summary == "" {
|
||||
session.Summary = input.Summary
|
||||
}
|
||||
if len(session.Handoff) == 0 && len(input.Handoff) > 0 {
|
||||
session.Handoff = input.Handoff
|
||||
}
|
||||
if session.Status == "completed" || session.Status == "failed" || session.Status == "handed_off" {
|
||||
if session.EndedAt == "" {
|
||||
session.EndedAt = time.Now().Format(time.RFC3339)
|
||||
}
|
||||
}
|
||||
return session
|
||||
}
|
||||
|
||||
func sessionCacheRoot() string {
|
||||
return core.JoinPath(CoreRoot(), "sessions")
|
||||
}
|
||||
|
||||
func sessionCachePath(sessionID string) string {
|
||||
return core.JoinPath(sessionCacheRoot(), core.Concat(core.SanitisePath(sessionID), ".json"))
|
||||
}
|
||||
|
||||
func readSessionCache(sessionID string) (*Session, error) {
|
||||
if sessionID == "" {
|
||||
return nil, core.E("readSessionCache", "session_id is required", nil)
|
||||
}
|
||||
|
||||
result := fs.Read(sessionCachePath(sessionID))
|
||||
if !result.OK {
|
||||
err, _ := result.Value.(error)
|
||||
if err == nil {
|
||||
return nil, core.E("readSessionCache", core.Concat("session not found: ", sessionID), nil)
|
||||
}
|
||||
return nil, core.E("readSessionCache", core.Concat("session not found: ", sessionID), err)
|
||||
}
|
||||
|
||||
var session Session
|
||||
if parseResult := core.JSONUnmarshalString(result.Value.(string), &session); !parseResult.OK {
|
||||
err, _ := parseResult.Value.(error)
|
||||
return nil, core.E("readSessionCache", "failed to parse session cache", err)
|
||||
}
|
||||
|
||||
return &session, nil
|
||||
}
|
||||
|
||||
func writeSessionCache(session *Session) error {
|
||||
if session == nil {
|
||||
return core.E("writeSessionCache", "session is required", nil)
|
||||
}
|
||||
if session.SessionID == "" {
|
||||
return core.E("writeSessionCache", "session_id is required", nil)
|
||||
}
|
||||
|
||||
now := time.Now().Format(time.RFC3339)
|
||||
if session.CreatedAt == "" {
|
||||
session.CreatedAt = now
|
||||
}
|
||||
if session.UpdatedAt == "" {
|
||||
session.UpdatedAt = now
|
||||
}
|
||||
|
||||
if ensureDirResult := fs.EnsureDir(sessionCacheRoot()); !ensureDirResult.OK {
|
||||
err, _ := ensureDirResult.Value.(error)
|
||||
return core.E("writeSessionCache", "failed to create session cache directory", err)
|
||||
}
|
||||
|
||||
if writeResult := fs.WriteAtomic(sessionCachePath(session.SessionID), core.JSONMarshalString(session)); !writeResult.OK {
|
||||
err, _ := writeResult.Value.(error)
|
||||
return core.E("writeSessionCache", "failed to write session cache", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func mergeSessionCache(session Session) (Session, error) {
|
||||
existing, err := readSessionCache(session.SessionID)
|
||||
if err == nil && existing != nil {
|
||||
if session.ID == 0 {
|
||||
session.ID = existing.ID
|
||||
}
|
||||
if session.Plan == "" {
|
||||
session.Plan = existing.Plan
|
||||
}
|
||||
if session.PlanSlug == "" {
|
||||
session.PlanSlug = existing.PlanSlug
|
||||
}
|
||||
if session.AgentType == "" {
|
||||
session.AgentType = existing.AgentType
|
||||
}
|
||||
if session.Status == "" {
|
||||
session.Status = existing.Status
|
||||
}
|
||||
if len(session.ContextSummary) == 0 {
|
||||
session.ContextSummary = existing.ContextSummary
|
||||
}
|
||||
if len(session.WorkLog) == 0 {
|
||||
session.WorkLog = existing.WorkLog
|
||||
}
|
||||
if len(session.Artifacts) == 0 {
|
||||
session.Artifacts = existing.Artifacts
|
||||
}
|
||||
if len(session.Handoff) == 0 {
|
||||
session.Handoff = existing.Handoff
|
||||
}
|
||||
if session.Summary == "" {
|
||||
session.Summary = existing.Summary
|
||||
}
|
||||
if session.CreatedAt == "" {
|
||||
session.CreatedAt = existing.CreatedAt
|
||||
}
|
||||
if session.EndedAt == "" {
|
||||
session.EndedAt = existing.EndedAt
|
||||
}
|
||||
}
|
||||
|
||||
if session.SessionID == "" {
|
||||
return session, core.E("mergeSessionCache", "session_id is required", nil)
|
||||
}
|
||||
|
||||
if session.CreatedAt == "" {
|
||||
session.CreatedAt = time.Now().Format(time.RFC3339)
|
||||
}
|
||||
session.UpdatedAt = time.Now().Format(time.RFC3339)
|
||||
|
||||
return session, nil
|
||||
}
|
||||
|
||||
func sessionHandoffContext(session Session) map[string]any {
|
||||
context := map[string]any{
|
||||
"session_id": session.SessionID,
|
||||
"agent_type": session.AgentType,
|
||||
"context_summary": session.ContextSummary,
|
||||
"recent_actions": recentSessionActions(session.WorkLog, 20),
|
||||
"artifacts": session.Artifacts,
|
||||
"handoff_notes": session.Handoff,
|
||||
}
|
||||
if session.PlanSlug != "" || session.Plan != "" {
|
||||
context["plan"] = map[string]any{
|
||||
"slug": sessionPlanSlug(session),
|
||||
}
|
||||
}
|
||||
if session.CreatedAt != "" {
|
||||
context["started_at"] = session.CreatedAt
|
||||
}
|
||||
if session.UpdatedAt != "" {
|
||||
context["last_active_at"] = session.UpdatedAt
|
||||
}
|
||||
return context
|
||||
}
|
||||
|
||||
func sessionReplayContext(session Session) map[string]any {
|
||||
checkpoints := filterSessionEntries(session.WorkLog, "checkpoint")
|
||||
decisions := filterSessionEntries(session.WorkLog, "decision")
|
||||
errors := filterSessionEntries(session.WorkLog, "error")
|
||||
lastCheckpoint := map[string]any(nil)
|
||||
if len(checkpoints) > 0 {
|
||||
lastCheckpoint = checkpoints[len(checkpoints)-1]
|
||||
}
|
||||
|
||||
return map[string]any{
|
||||
"session_id": session.SessionID,
|
||||
"status": session.Status,
|
||||
"agent_type": session.AgentType,
|
||||
"plan": map[string]any{"slug": sessionPlanSlug(session)},
|
||||
"started_at": session.CreatedAt,
|
||||
"last_active_at": session.UpdatedAt,
|
||||
"context_summary": session.ContextSummary,
|
||||
"progress_summary": sessionProgressSummary(session.WorkLog),
|
||||
"last_checkpoint": lastCheckpoint,
|
||||
"checkpoints": checkpoints,
|
||||
"decisions": decisions,
|
||||
"errors": errors,
|
||||
"artifacts": session.Artifacts,
|
||||
"artifacts_by_action": map[string]any{
|
||||
"created": filterArtifactsByAction(session.Artifacts, "created"),
|
||||
"modified": filterArtifactsByAction(session.Artifacts, "modified"),
|
||||
"deleted": filterArtifactsByAction(session.Artifacts, "deleted"),
|
||||
"reviewed": filterArtifactsByAction(session.Artifacts, "reviewed"),
|
||||
},
|
||||
"recent_actions": recentSessionActions(session.WorkLog, 20),
|
||||
"total_actions": len(session.WorkLog),
|
||||
"handoff_notes": session.Handoff,
|
||||
"final_summary": session.Summary,
|
||||
}
|
||||
}
|
||||
|
||||
func filterSessionEntries(entries []map[string]any, entryType string) []map[string]any {
|
||||
filtered := make([]map[string]any, 0, len(entries))
|
||||
for _, entry := range entries {
|
||||
if stringValue(entry["type"]) == entryType {
|
||||
filtered = append(filtered, entry)
|
||||
}
|
||||
}
|
||||
return filtered
|
||||
}
|
||||
|
||||
func recentSessionActions(entries []map[string]any, limit int) []map[string]any {
|
||||
if len(entries) == 0 || limit <= 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
recent := make([]map[string]any, 0, limit)
|
||||
for i := len(entries) - 1; i >= 0 && len(recent) < limit; i-- {
|
||||
recent = append(recent, entries[i])
|
||||
}
|
||||
return recent
|
||||
}
|
||||
|
||||
func sessionProgressSummary(entries []map[string]any) map[string]any {
|
||||
if len(entries) == 0 {
|
||||
return map[string]any{
|
||||
"completed_steps": 0,
|
||||
"last_action": nil,
|
||||
"summary": "No work recorded",
|
||||
}
|
||||
}
|
||||
|
||||
lastEntry := entries[len(entries)-1]
|
||||
checkpointCount := len(filterSessionEntries(entries, "checkpoint"))
|
||||
errorCount := len(filterSessionEntries(entries, "error"))
|
||||
lastAction := stringValue(lastEntry["action"])
|
||||
if lastAction == "" {
|
||||
lastAction = stringValue(lastEntry["message"])
|
||||
}
|
||||
if lastAction == "" {
|
||||
lastAction = "Unknown"
|
||||
}
|
||||
|
||||
return map[string]any{
|
||||
"completed_steps": len(entries),
|
||||
"checkpoint_count": checkpointCount,
|
||||
"error_count": errorCount,
|
||||
"last_action": lastAction,
|
||||
"last_action_at": stringValue(lastEntry["timestamp"]),
|
||||
"summary": core.Sprintf(
|
||||
"%d actions recorded, %d checkpoints, %d errors",
|
||||
len(entries),
|
||||
checkpointCount,
|
||||
errorCount,
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
func filterArtifactsByAction(artifacts []map[string]any, action string) []map[string]any {
|
||||
filtered := make([]map[string]any, 0, len(artifacts))
|
||||
for _, artifact := range artifacts {
|
||||
if stringValue(artifact["action"]) == action {
|
||||
filtered = append(filtered, artifact)
|
||||
}
|
||||
}
|
||||
return filtered
|
||||
}
|
||||
|
||||
func sessionPlanSlug(session Session) string {
|
||||
if session.PlanSlug != "" {
|
||||
return session.PlanSlug
|
||||
}
|
||||
return session.Plan
|
||||
}
|
||||
|
||||
func parseSessionListOutput(payload map[string]any) SessionListOutput {
|
||||
sessionData := payloadDataSlice(payload, "sessions")
|
||||
sessions := make([]Session, 0, len(sessionData))
|
||||
|
|
|
|||
|
|
@ -209,3 +209,216 @@ func TestSession_HandleSessionEnd_Good(t *testing.T) {
|
|||
assert.Equal(t, "completed", output.Session.Status)
|
||||
assert.Equal(t, "All green", output.Session.Summary)
|
||||
}
|
||||
|
||||
func TestSession_HandleSessionLog_Good(t *testing.T) {
|
||||
subsystem := testPrepWithPlatformServer(t, nil, "")
|
||||
require.NoError(t, writeSessionCache(&Session{
|
||||
SessionID: "ses_log",
|
||||
AgentType: "codex",
|
||||
Status: "active",
|
||||
}))
|
||||
|
||||
result := subsystem.handleSessionLog(context.Background(), core.NewOptions(
|
||||
core.Option{Key: "session_id", Value: "ses_log"},
|
||||
core.Option{Key: "message", Value: "Checked build"},
|
||||
core.Option{Key: "type", Value: "checkpoint"},
|
||||
core.Option{Key: "data", Value: `{"repo":"core/go"}`},
|
||||
))
|
||||
require.True(t, result.OK)
|
||||
|
||||
session, err := readSessionCache("ses_log")
|
||||
require.NoError(t, err)
|
||||
require.Len(t, session.WorkLog, 1)
|
||||
assert.Equal(t, "Checked build", session.WorkLog[0]["message"])
|
||||
assert.Equal(t, "checkpoint", session.WorkLog[0]["type"])
|
||||
}
|
||||
|
||||
func TestSession_HandleSessionLog_Bad(t *testing.T) {
|
||||
subsystem := testPrepWithPlatformServer(t, nil, "")
|
||||
result := subsystem.handleSessionLog(context.Background(), core.NewOptions(
|
||||
core.Option{Key: "session_id", Value: "ses_log"},
|
||||
))
|
||||
assert.False(t, result.OK)
|
||||
}
|
||||
|
||||
func TestSession_HandleSessionLog_Ugly_MissingSession(t *testing.T) {
|
||||
subsystem := testPrepWithPlatformServer(t, nil, "")
|
||||
result := subsystem.handleSessionLog(context.Background(), core.NewOptions(
|
||||
core.Option{Key: "session_id", Value: "ses_missing"},
|
||||
core.Option{Key: "message", Value: "Checked build"},
|
||||
))
|
||||
assert.False(t, result.OK)
|
||||
}
|
||||
|
||||
func TestSession_HandleSessionArtifact_Good(t *testing.T) {
|
||||
subsystem := testPrepWithPlatformServer(t, nil, "")
|
||||
require.NoError(t, writeSessionCache(&Session{
|
||||
SessionID: "ses_artifact",
|
||||
AgentType: "codex",
|
||||
Status: "active",
|
||||
}))
|
||||
|
||||
result := subsystem.handleSessionArtifact(context.Background(), core.NewOptions(
|
||||
core.Option{Key: "session_id", Value: "ses_artifact"},
|
||||
core.Option{Key: "path", Value: "pkg/agentic/session.go"},
|
||||
core.Option{Key: "action", Value: "modified"},
|
||||
core.Option{Key: "metadata", Value: `{"insertions":12}`},
|
||||
))
|
||||
require.True(t, result.OK)
|
||||
|
||||
session, err := readSessionCache("ses_artifact")
|
||||
require.NoError(t, err)
|
||||
require.Len(t, session.Artifacts, 1)
|
||||
assert.Equal(t, "pkg/agentic/session.go", session.Artifacts[0]["path"])
|
||||
assert.Equal(t, "modified", session.Artifacts[0]["action"])
|
||||
}
|
||||
|
||||
func TestSession_HandleSessionArtifact_Bad(t *testing.T) {
|
||||
subsystem := testPrepWithPlatformServer(t, nil, "")
|
||||
result := subsystem.handleSessionArtifact(context.Background(), core.NewOptions(
|
||||
core.Option{Key: "session_id", Value: "ses_artifact"},
|
||||
))
|
||||
assert.False(t, result.OK)
|
||||
}
|
||||
|
||||
func TestSession_HandleSessionArtifact_Ugly_MissingSession(t *testing.T) {
|
||||
subsystem := testPrepWithPlatformServer(t, nil, "")
|
||||
result := subsystem.handleSessionArtifact(context.Background(), core.NewOptions(
|
||||
core.Option{Key: "session_id", Value: "ses_artifact"},
|
||||
core.Option{Key: "path", Value: "pkg/agentic/session.go"},
|
||||
core.Option{Key: "action", Value: "modified"},
|
||||
))
|
||||
assert.False(t, result.OK)
|
||||
}
|
||||
|
||||
func TestSession_HandleSessionHandoff_Good(t *testing.T) {
|
||||
subsystem := testPrepWithPlatformServer(t, nil, "")
|
||||
require.NoError(t, writeSessionCache(&Session{
|
||||
SessionID: "ses_handoff",
|
||||
AgentType: "codex",
|
||||
Status: "active",
|
||||
WorkLog: []map[string]any{
|
||||
{"message": "Checked build", "type": "checkpoint", "timestamp": "2026-03-31T10:00:00Z"},
|
||||
},
|
||||
}))
|
||||
|
||||
result := subsystem.handleSessionHandoff(context.Background(), core.NewOptions(
|
||||
core.Option{Key: "session_id", Value: "ses_handoff"},
|
||||
core.Option{Key: "summary", Value: "Ready for review"},
|
||||
core.Option{Key: "next_steps", Value: `["Run verify"]`},
|
||||
core.Option{Key: "blockers", Value: `["Need CI"]`},
|
||||
))
|
||||
require.True(t, result.OK)
|
||||
|
||||
session, err := readSessionCache("ses_handoff")
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, "paused", session.Status)
|
||||
assert.Equal(t, "Ready for review", session.Handoff["summary"])
|
||||
}
|
||||
|
||||
func TestSession_HandleSessionHandoff_Bad(t *testing.T) {
|
||||
subsystem := testPrepWithPlatformServer(t, nil, "")
|
||||
result := subsystem.handleSessionHandoff(context.Background(), core.NewOptions(
|
||||
core.Option{Key: "session_id", Value: "ses_handoff"},
|
||||
))
|
||||
assert.False(t, result.OK)
|
||||
}
|
||||
|
||||
func TestSession_HandleSessionHandoff_Ugly_MissingSession(t *testing.T) {
|
||||
subsystem := testPrepWithPlatformServer(t, nil, "")
|
||||
result := subsystem.handleSessionHandoff(context.Background(), core.NewOptions(
|
||||
core.Option{Key: "session_id", Value: "ses_handoff"},
|
||||
core.Option{Key: "summary", Value: "Ready for review"},
|
||||
))
|
||||
assert.False(t, result.OK)
|
||||
}
|
||||
|
||||
func TestSession_HandleSessionResume_Good(t *testing.T) {
|
||||
subsystem := testPrepWithPlatformServer(t, nil, "")
|
||||
require.NoError(t, writeSessionCache(&Session{
|
||||
SessionID: "ses_resume",
|
||||
AgentType: "codex",
|
||||
Status: "paused",
|
||||
Handoff: map[string]any{
|
||||
"summary": "Ready for review",
|
||||
},
|
||||
WorkLog: []map[string]any{
|
||||
{"message": "Checked build", "type": "checkpoint", "timestamp": "2026-03-31T10:00:00Z"},
|
||||
},
|
||||
Artifacts: []map[string]any{
|
||||
{"path": "pkg/agentic/session.go", "action": "modified"},
|
||||
},
|
||||
}))
|
||||
|
||||
result := subsystem.handleSessionResume(context.Background(), core.NewOptions(
|
||||
core.Option{Key: "session_id", Value: "ses_resume"},
|
||||
))
|
||||
require.True(t, result.OK)
|
||||
|
||||
output, ok := result.Value.(SessionResumeOutput)
|
||||
require.True(t, ok)
|
||||
assert.Equal(t, "active", output.Session.Status)
|
||||
assert.Equal(t, "Ready for review", output.HandoffContext["summary"])
|
||||
require.Len(t, output.RecentActions, 1)
|
||||
}
|
||||
|
||||
func TestSession_HandleSessionResume_Bad(t *testing.T) {
|
||||
subsystem := testPrepWithPlatformServer(t, nil, "")
|
||||
result := subsystem.handleSessionResume(context.Background(), core.NewOptions())
|
||||
assert.False(t, result.OK)
|
||||
}
|
||||
|
||||
func TestSession_HandleSessionResume_Ugly_MissingSession(t *testing.T) {
|
||||
subsystem := testPrepWithPlatformServer(t, nil, "")
|
||||
result := subsystem.handleSessionResume(context.Background(), core.NewOptions(
|
||||
core.Option{Key: "session_id", Value: "ses_resume"},
|
||||
))
|
||||
assert.False(t, result.OK)
|
||||
}
|
||||
|
||||
func TestSession_HandleSessionReplay_Good(t *testing.T) {
|
||||
subsystem := testPrepWithPlatformServer(t, nil, "")
|
||||
require.NoError(t, writeSessionCache(&Session{
|
||||
SessionID: "ses_replay",
|
||||
AgentType: "codex",
|
||||
Status: "completed",
|
||||
PlanSlug: "ax-follow-up",
|
||||
WorkLog: []map[string]any{
|
||||
{"message": "Checked build", "type": "checkpoint", "timestamp": "2026-03-31T10:00:00Z"},
|
||||
{"message": "Chose pattern", "type": "decision", "timestamp": "2026-03-31T10:10:00Z"},
|
||||
{"message": "CI failed", "type": "error", "timestamp": "2026-03-31T10:15:00Z"},
|
||||
},
|
||||
Artifacts: []map[string]any{
|
||||
{"path": "pkg/agentic/session.go", "action": "modified"},
|
||||
},
|
||||
Handoff: map[string]any{
|
||||
"summary": "Ready for review",
|
||||
},
|
||||
Summary: "Completed work",
|
||||
}))
|
||||
|
||||
result := subsystem.handleSessionReplay(context.Background(), core.NewOptions(
|
||||
core.Option{Key: "session_id", Value: "ses_replay"},
|
||||
))
|
||||
require.True(t, result.OK)
|
||||
|
||||
output, ok := result.Value.(SessionReplayOutput)
|
||||
require.True(t, ok)
|
||||
assert.Equal(t, "ses_replay", output.ReplayContext["session_id"])
|
||||
assert.Equal(t, 3, output.ReplayContext["total_actions"])
|
||||
require.Len(t, output.ReplayContext["checkpoints"].([]map[string]any), 1)
|
||||
}
|
||||
|
||||
func TestSession_HandleSessionReplay_Bad(t *testing.T) {
|
||||
subsystem := testPrepWithPlatformServer(t, nil, "")
|
||||
result := subsystem.handleSessionReplay(context.Background(), core.NewOptions())
|
||||
assert.False(t, result.OK)
|
||||
}
|
||||
|
||||
func TestSession_HandleSessionReplay_Ugly_MissingSession(t *testing.T) {
|
||||
subsystem := testPrepWithPlatformServer(t, nil, "")
|
||||
result := subsystem.handleSessionReplay(context.Background(), core.NewOptions(
|
||||
core.Option{Key: "session_id", Value: "ses_replay"},
|
||||
))
|
||||
assert.False(t, result.OK)
|
||||
}
|
||||
|
|
|
|||
272
pkg/agentic/state.go
Normal file
272
pkg/agentic/state.go
Normal file
|
|
@ -0,0 +1,272 @@
|
|||
// SPDX-License-Identifier: EUPL-1.2
|
||||
|
||||
package agentic
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
core "dappco.re/go/core"
|
||||
"github.com/modelcontextprotocol/go-sdk/mcp"
|
||||
)
|
||||
|
||||
// state := agentic.PlanState{Key: "pattern", Value: "observer", Category: "general"}
|
||||
type PlanState struct {
|
||||
Key string `json:"key"`
|
||||
Value any `json:"value"`
|
||||
Category string `json:"category,omitempty"`
|
||||
UpdatedAt string `json:"updated_at,omitempty"`
|
||||
}
|
||||
|
||||
// input := agentic.StateSetInput{PlanSlug: "ax-follow-up", Key: "pattern", Value: "observer"}
|
||||
type StateSetInput struct {
|
||||
PlanSlug string `json:"plan_slug"`
|
||||
Key string `json:"key"`
|
||||
Value any `json:"value"`
|
||||
Category string `json:"category,omitempty"`
|
||||
}
|
||||
|
||||
// input := agentic.StateGetInput{PlanSlug: "ax-follow-up", Key: "pattern"}
|
||||
type StateGetInput struct {
|
||||
PlanSlug string `json:"plan_slug"`
|
||||
Key string `json:"key"`
|
||||
}
|
||||
|
||||
// input := agentic.StateListInput{PlanSlug: "ax-follow-up", Category: "general"}
|
||||
type StateListInput struct {
|
||||
PlanSlug string `json:"plan_slug"`
|
||||
Category string `json:"category,omitempty"`
|
||||
}
|
||||
|
||||
// out := agentic.StateOutput{Success: true, State: agentic.PlanState{Key: "pattern"}}
|
||||
type StateOutput struct {
|
||||
Success bool `json:"success"`
|
||||
State PlanState `json:"state"`
|
||||
}
|
||||
|
||||
// out := agentic.StateListOutput{Success: true, Total: 1, States: []agentic.PlanState{{Key: "pattern"}}}
|
||||
type StateListOutput struct {
|
||||
Success bool `json:"success"`
|
||||
Total int `json:"total"`
|
||||
States []PlanState `json:"states"`
|
||||
}
|
||||
|
||||
// result := c.Action("state.set").Run(ctx, core.NewOptions(
|
||||
//
|
||||
// core.Option{Key: "plan_slug", Value: "ax-follow-up"},
|
||||
// core.Option{Key: "key", Value: "pattern"},
|
||||
// core.Option{Key: "value", Value: "observer"},
|
||||
//
|
||||
// ))
|
||||
func (s *PrepSubsystem) handleStateSet(ctx context.Context, options core.Options) core.Result {
|
||||
_, output, err := s.stateSet(ctx, nil, StateSetInput{
|
||||
PlanSlug: optionStringValue(options, "plan_slug", "plan"),
|
||||
Key: optionStringValue(options, "key"),
|
||||
Value: optionAnyValue(options, "value"),
|
||||
Category: optionStringValue(options, "category"),
|
||||
})
|
||||
if err != nil {
|
||||
return core.Result{Value: err, OK: false}
|
||||
}
|
||||
return core.Result{Value: output, OK: true}
|
||||
}
|
||||
|
||||
// result := c.Action("state.get").Run(ctx, core.NewOptions(
|
||||
//
|
||||
// core.Option{Key: "plan_slug", Value: "ax-follow-up"},
|
||||
// core.Option{Key: "key", Value: "pattern"},
|
||||
//
|
||||
// ))
|
||||
func (s *PrepSubsystem) handleStateGet(ctx context.Context, options core.Options) core.Result {
|
||||
_, output, err := s.stateGet(ctx, nil, StateGetInput{
|
||||
PlanSlug: optionStringValue(options, "plan_slug", "plan"),
|
||||
Key: optionStringValue(options, "key"),
|
||||
})
|
||||
if err != nil {
|
||||
return core.Result{Value: err, OK: false}
|
||||
}
|
||||
return core.Result{Value: output, OK: true}
|
||||
}
|
||||
|
||||
// result := c.Action("state.list").Run(ctx, core.NewOptions(core.Option{Key: "plan_slug", Value: "ax-follow-up"}))
|
||||
func (s *PrepSubsystem) handleStateList(ctx context.Context, options core.Options) core.Result {
|
||||
_, output, err := s.stateList(ctx, nil, StateListInput{
|
||||
PlanSlug: optionStringValue(options, "plan_slug", "plan"),
|
||||
Category: optionStringValue(options, "category"),
|
||||
})
|
||||
if err != nil {
|
||||
return core.Result{Value: err, OK: false}
|
||||
}
|
||||
return core.Result{Value: output, OK: true}
|
||||
}
|
||||
|
||||
func (s *PrepSubsystem) registerStateTools(server *mcp.Server) {
|
||||
mcp.AddTool(server, &mcp.Tool{
|
||||
Name: "state_set",
|
||||
Description: "Set a workspace state value for a plan so later sessions can reuse shared context.",
|
||||
}, s.stateSet)
|
||||
|
||||
mcp.AddTool(server, &mcp.Tool{
|
||||
Name: "state_get",
|
||||
Description: "Get a workspace state value for a plan by key.",
|
||||
}, s.stateGet)
|
||||
|
||||
mcp.AddTool(server, &mcp.Tool{
|
||||
Name: "state_list",
|
||||
Description: "List all stored workspace state values for a plan, with optional category filtering.",
|
||||
}, s.stateList)
|
||||
}
|
||||
|
||||
func (s *PrepSubsystem) stateSet(_ context.Context, _ *mcp.CallToolRequest, input StateSetInput) (*mcp.CallToolResult, StateOutput, error) {
|
||||
if input.PlanSlug == "" {
|
||||
return nil, StateOutput{}, core.E("stateSet", "plan_slug is required", nil)
|
||||
}
|
||||
if input.Key == "" {
|
||||
return nil, StateOutput{}, core.E("stateSet", "key is required", nil)
|
||||
}
|
||||
if input.Value == nil {
|
||||
return nil, StateOutput{}, core.E("stateSet", "value is required", nil)
|
||||
}
|
||||
|
||||
states, err := readPlanStates(input.PlanSlug)
|
||||
if err != nil {
|
||||
return nil, StateOutput{}, err
|
||||
}
|
||||
|
||||
now := time.Now().Format(time.RFC3339)
|
||||
state := PlanState{
|
||||
Key: input.Key,
|
||||
Value: input.Value,
|
||||
Category: input.Category,
|
||||
UpdatedAt: now,
|
||||
}
|
||||
if state.Category == "" {
|
||||
state.Category = "general"
|
||||
}
|
||||
|
||||
found := false
|
||||
for i := range states {
|
||||
if states[i].Key == input.Key {
|
||||
states[i] = state
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
states = append(states, state)
|
||||
}
|
||||
|
||||
if err := writePlanStates(input.PlanSlug, states); err != nil {
|
||||
return nil, StateOutput{}, err
|
||||
}
|
||||
|
||||
return nil, StateOutput{
|
||||
Success: true,
|
||||
State: state,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *PrepSubsystem) stateGet(_ context.Context, _ *mcp.CallToolRequest, input StateGetInput) (*mcp.CallToolResult, StateOutput, error) {
|
||||
if input.PlanSlug == "" {
|
||||
return nil, StateOutput{}, core.E("stateGet", "plan_slug is required", nil)
|
||||
}
|
||||
if input.Key == "" {
|
||||
return nil, StateOutput{}, core.E("stateGet", "key is required", nil)
|
||||
}
|
||||
|
||||
states, err := readPlanStates(input.PlanSlug)
|
||||
if err != nil {
|
||||
return nil, StateOutput{}, err
|
||||
}
|
||||
|
||||
for _, state := range states {
|
||||
if state.Key == input.Key {
|
||||
if state.Category == "" {
|
||||
state.Category = "general"
|
||||
}
|
||||
return nil, StateOutput{
|
||||
Success: true,
|
||||
State: state,
|
||||
}, nil
|
||||
}
|
||||
}
|
||||
|
||||
return nil, StateOutput{}, core.E("stateGet", core.Concat("state not found: ", input.Key), nil)
|
||||
}
|
||||
|
||||
func (s *PrepSubsystem) stateList(_ context.Context, _ *mcp.CallToolRequest, input StateListInput) (*mcp.CallToolResult, StateListOutput, error) {
|
||||
if input.PlanSlug == "" {
|
||||
return nil, StateListOutput{}, core.E("stateList", "plan_slug is required", nil)
|
||||
}
|
||||
|
||||
states, err := readPlanStates(input.PlanSlug)
|
||||
if err != nil {
|
||||
return nil, StateListOutput{}, err
|
||||
}
|
||||
|
||||
filtered := make([]PlanState, 0, len(states))
|
||||
for _, state := range states {
|
||||
if state.Category == "" {
|
||||
state.Category = "general"
|
||||
}
|
||||
if input.Category != "" && state.Category != input.Category {
|
||||
continue
|
||||
}
|
||||
filtered = append(filtered, state)
|
||||
}
|
||||
|
||||
return nil, StateListOutput{
|
||||
Success: true,
|
||||
Total: len(filtered),
|
||||
States: filtered,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func stateRoot() string {
|
||||
return core.JoinPath(CoreRoot(), "state")
|
||||
}
|
||||
|
||||
func statePath(planSlug string) string {
|
||||
return core.JoinPath(stateRoot(), core.Concat(core.SanitisePath(planSlug), ".json"))
|
||||
}
|
||||
|
||||
func readPlanStates(planSlug string) ([]PlanState, error) {
|
||||
result := fs.Read(statePath(planSlug))
|
||||
if !result.OK {
|
||||
err, _ := result.Value.(error)
|
||||
if err == nil {
|
||||
return []PlanState{}, nil
|
||||
}
|
||||
if core.Contains(err.Error(), "no such file") {
|
||||
return []PlanState{}, nil
|
||||
}
|
||||
return nil, core.E("readPlanStates", "failed to read state file", err)
|
||||
}
|
||||
|
||||
content := core.Trim(result.Value.(string))
|
||||
if content == "" {
|
||||
return []PlanState{}, nil
|
||||
}
|
||||
|
||||
var states []PlanState
|
||||
if parseResult := core.JSONUnmarshalString(content, &states); !parseResult.OK {
|
||||
err, _ := parseResult.Value.(error)
|
||||
return nil, core.E("readPlanStates", "failed to parse state file", err)
|
||||
}
|
||||
|
||||
return states, nil
|
||||
}
|
||||
|
||||
func writePlanStates(planSlug string, states []PlanState) error {
|
||||
if ensureDirResult := fs.EnsureDir(stateRoot()); !ensureDirResult.OK {
|
||||
err, _ := ensureDirResult.Value.(error)
|
||||
return core.E("writePlanStates", "failed to create state directory", err)
|
||||
}
|
||||
|
||||
if writeResult := fs.WriteAtomic(statePath(planSlug), core.JSONMarshalString(states)); !writeResult.OK {
|
||||
err, _ := writeResult.Value.(error)
|
||||
return core.E("writePlanStates", "failed to write state file", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
14
pkg/agentic/state_example_test.go
Normal file
14
pkg/agentic/state_example_test.go
Normal file
|
|
@ -0,0 +1,14 @@
|
|||
// SPDX-License-Identifier: EUPL-1.2
|
||||
|
||||
package agentic
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
core "dappco.re/go/core"
|
||||
)
|
||||
|
||||
func Example_statePath() {
|
||||
fmt.Println(core.PathBase(statePath("ax-follow-up")))
|
||||
// Output: ax-follow-up.json
|
||||
}
|
||||
137
pkg/agentic/state_test.go
Normal file
137
pkg/agentic/state_test.go
Normal file
|
|
@ -0,0 +1,137 @@
|
|||
// SPDX-License-Identifier: EUPL-1.2
|
||||
|
||||
package agentic
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
core "dappco.re/go/core"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestState_HandleStateSet_Good(t *testing.T) {
|
||||
subsystem := testPrepWithPlatformServer(t, nil, "")
|
||||
result := subsystem.handleStateSet(context.Background(), core.NewOptions(
|
||||
core.Option{Key: "plan_slug", Value: "ax-follow-up"},
|
||||
core.Option{Key: "key", Value: "pattern"},
|
||||
core.Option{Key: "value", Value: `{"name":"observer"}`},
|
||||
))
|
||||
require.True(t, result.OK)
|
||||
|
||||
output, ok := result.Value.(StateOutput)
|
||||
require.True(t, ok)
|
||||
assert.Equal(t, "pattern", output.State.Key)
|
||||
|
||||
states, err := readPlanStates("ax-follow-up")
|
||||
require.NoError(t, err)
|
||||
require.Len(t, states, 1)
|
||||
assert.Equal(t, "observer", anyMapValue(states[0].Value)["name"])
|
||||
}
|
||||
|
||||
func TestState_HandleStateSet_Bad(t *testing.T) {
|
||||
subsystem := testPrepWithPlatformServer(t, nil, "")
|
||||
result := subsystem.handleStateSet(context.Background(), core.NewOptions(
|
||||
core.Option{Key: "plan_slug", Value: "ax-follow-up"},
|
||||
))
|
||||
assert.False(t, result.OK)
|
||||
}
|
||||
|
||||
func TestState_HandleStateSet_Ugly_Upsert(t *testing.T) {
|
||||
subsystem := testPrepWithPlatformServer(t, nil, "")
|
||||
first := subsystem.handleStateSet(context.Background(), core.NewOptions(
|
||||
core.Option{Key: "plan_slug", Value: "ax-follow-up"},
|
||||
core.Option{Key: "key", Value: "pattern"},
|
||||
core.Option{Key: "value", Value: "observer"},
|
||||
))
|
||||
require.True(t, first.OK)
|
||||
|
||||
second := subsystem.handleStateSet(context.Background(), core.NewOptions(
|
||||
core.Option{Key: "plan_slug", Value: "ax-follow-up"},
|
||||
core.Option{Key: "key", Value: "pattern"},
|
||||
core.Option{Key: "value", Value: "pipeline"},
|
||||
))
|
||||
require.True(t, second.OK)
|
||||
|
||||
states, err := readPlanStates("ax-follow-up")
|
||||
require.NoError(t, err)
|
||||
require.Len(t, states, 1)
|
||||
assert.Equal(t, "pipeline", stringValue(states[0].Value))
|
||||
}
|
||||
|
||||
func TestState_HandleStateGet_Good(t *testing.T) {
|
||||
subsystem := testPrepWithPlatformServer(t, nil, "")
|
||||
require.NoError(t, writePlanStates("ax-follow-up", []PlanState{{
|
||||
Key: "pattern",
|
||||
Value: "observer",
|
||||
Category: "general",
|
||||
}}))
|
||||
|
||||
result := subsystem.handleStateGet(context.Background(), core.NewOptions(
|
||||
core.Option{Key: "plan_slug", Value: "ax-follow-up"},
|
||||
core.Option{Key: "key", Value: "pattern"},
|
||||
))
|
||||
require.True(t, result.OK)
|
||||
|
||||
output, ok := result.Value.(StateOutput)
|
||||
require.True(t, ok)
|
||||
assert.Equal(t, "observer", stringValue(output.State.Value))
|
||||
}
|
||||
|
||||
func TestState_HandleStateGet_Bad(t *testing.T) {
|
||||
subsystem := testPrepWithPlatformServer(t, nil, "")
|
||||
result := subsystem.handleStateGet(context.Background(), core.NewOptions(
|
||||
core.Option{Key: "plan_slug", Value: "ax-follow-up"},
|
||||
))
|
||||
assert.False(t, result.OK)
|
||||
}
|
||||
|
||||
func TestState_HandleStateGet_Ugly_CorruptStateFile(t *testing.T) {
|
||||
subsystem := testPrepWithPlatformServer(t, nil, "")
|
||||
require.True(t, fs.EnsureDir(stateRoot()).OK)
|
||||
require.True(t, fs.Write(statePath("ax-follow-up"), `[{broken`).OK)
|
||||
|
||||
result := subsystem.handleStateGet(context.Background(), core.NewOptions(
|
||||
core.Option{Key: "plan_slug", Value: "ax-follow-up"},
|
||||
core.Option{Key: "key", Value: "pattern"},
|
||||
))
|
||||
assert.False(t, result.OK)
|
||||
}
|
||||
|
||||
func TestState_HandleStateList_Good(t *testing.T) {
|
||||
subsystem := testPrepWithPlatformServer(t, nil, "")
|
||||
require.NoError(t, writePlanStates("ax-follow-up", []PlanState{
|
||||
{Key: "pattern", Value: "observer", Category: "general"},
|
||||
{Key: "risk", Value: "auth", Category: "security"},
|
||||
}))
|
||||
|
||||
result := subsystem.handleStateList(context.Background(), core.NewOptions(
|
||||
core.Option{Key: "plan_slug", Value: "ax-follow-up"},
|
||||
core.Option{Key: "category", Value: "security"},
|
||||
))
|
||||
require.True(t, result.OK)
|
||||
|
||||
output, ok := result.Value.(StateListOutput)
|
||||
require.True(t, ok)
|
||||
assert.Equal(t, 1, output.Total)
|
||||
require.Len(t, output.States, 1)
|
||||
assert.Equal(t, "risk", output.States[0].Key)
|
||||
}
|
||||
|
||||
func TestState_HandleStateList_Bad(t *testing.T) {
|
||||
subsystem := testPrepWithPlatformServer(t, nil, "")
|
||||
result := subsystem.handleStateList(context.Background(), core.NewOptions())
|
||||
assert.False(t, result.OK)
|
||||
}
|
||||
|
||||
func TestState_HandleStateList_Ugly_CorruptStateFile(t *testing.T) {
|
||||
subsystem := testPrepWithPlatformServer(t, nil, "")
|
||||
require.True(t, fs.EnsureDir(stateRoot()).OK)
|
||||
require.True(t, fs.Write(statePath("ax-follow-up"), `{broken`).OK)
|
||||
|
||||
result := subsystem.handleStateList(context.Background(), core.NewOptions(
|
||||
core.Option{Key: "plan_slug", Value: "ax-follow-up"},
|
||||
))
|
||||
assert.False(t, result.OK)
|
||||
}
|
||||
Loading…
Add table
Reference in a new issue