diff --git a/pkg/agentic/prep.go b/pkg/agentic/prep.go index 9fcf33f..6e3b29b 100644 --- a/pkg/agentic/prep.go +++ b/pkg/agentic/prep.go @@ -178,6 +178,11 @@ func (s *PrepSubsystem) OnStartup(ctx context.Context) core.Result { c.Action("plan.update", s.handlePlanUpdate).Description = "Update plan status, phases, notes, or agent assignment" c.Action("plan.delete", s.handlePlanDelete).Description = "Delete an implementation plan by ID" c.Action("plan.list", s.handlePlanList).Description = "List implementation plans with optional filters" + c.Action("session.start", s.handleSessionStart).Description = "Start an agent session for a plan" + c.Action("session.get", s.handleSessionGet).Description = "Read a session by session ID" + c.Action("session.list", s.handleSessionList).Description = "List sessions with optional plan or status filters" + c.Action("session.continue", s.handleSessionContinue).Description = "Continue a session from its latest saved context" + c.Action("session.end", s.handleSessionEnd).Description = "End a session with status and summary" c.Action("agentic.prompt", s.handlePrompt).Description = "Read a system prompt by slug" c.Action("agentic.task", s.handleTask).Description = "Read a task plan by slug" @@ -280,6 +285,7 @@ func (s *PrepSubsystem) RegisterTools(server *mcp.Server) { s.registerRemoteStatusTool(server) s.registerReviewQueueTool(server) s.registerShutdownTools(server) + s.registerSessionTools(server) mcp.AddTool(server, &mcp.Tool{ Name: "agentic_scan", diff --git a/pkg/agentic/prep_test.go b/pkg/agentic/prep_test.go index 563ccd9..6d52fa4 100644 --- a/pkg/agentic/prep_test.go +++ b/pkg/agentic/prep_test.go @@ -447,6 +447,22 @@ func TestPrep_OnStartup_Good_RegistersPlanActions(t *testing.T) { assert.True(t, c.Action("plan.list").Exists()) } +func TestPrep_OnStartup_Good_RegistersSessionActions(t *testing.T) { + t.Setenv("CORE_WORKSPACE", t.TempDir()) + t.Setenv("CORE_AGENT_DISPATCH", "") + + c := core.New(core.WithOption("name", "test")) + s := NewPrep() + s.ServiceRuntime = core.NewServiceRuntime(c, AgentOptions{}) + + require.True(t, s.OnStartup(context.Background()).OK) + assert.True(t, c.Action("session.start").Exists()) + assert.True(t, c.Action("session.get").Exists()) + assert.True(t, c.Action("session.list").Exists()) + assert.True(t, c.Action("session.continue").Exists()) + assert.True(t, c.Action("session.end").Exists()) +} + func TestPrep_OnStartup_Good_RegistersPlatformActionAliases(t *testing.T) { t.Setenv("CORE_WORKSPACE", t.TempDir()) t.Setenv("CORE_AGENT_DISPATCH", "") diff --git a/pkg/agentic/session.go b/pkg/agentic/session.go new file mode 100644 index 0000000..93bbd94 --- /dev/null +++ b/pkg/agentic/session.go @@ -0,0 +1,355 @@ +// SPDX-License-Identifier: EUPL-1.2 + +package agentic + +import ( + "context" + + core "dappco.re/go/core" + "github.com/modelcontextprotocol/go-sdk/mcp" +) + +// session := agentic.Session{SessionID: "ses_abc123", AgentType: "codex", Status: "active"} +type Session struct { + ID int `json:"id"` + SessionID string `json:"session_id"` + Plan string `json:"plan,omitempty"` + PlanSlug string `json:"plan_slug,omitempty"` + AgentType string `json:"agent_type"` + Status string `json:"status"` + ContextSummary map[string]any `json:"context_summary,omitempty"` + WorkLog []map[string]any `json:"work_log,omitempty"` + Artifacts []map[string]any `json:"artifacts,omitempty"` + Handoff map[string]any `json:"handoff,omitempty"` + Summary string `json:"summary,omitempty"` + CreatedAt string `json:"created_at,omitempty"` + UpdatedAt string `json:"updated_at,omitempty"` + EndedAt string `json:"ended_at,omitempty"` +} + +// input := agentic.SessionStartInput{AgentType: "codex", PlanSlug: "ax-follow-up"} +type SessionStartInput struct { + PlanSlug string `json:"plan_slug,omitempty"` + AgentType string `json:"agent_type"` + Context map[string]any `json:"context,omitempty"` +} + +// input := agentic.SessionGetInput{SessionID: "ses_abc123"} +type SessionGetInput struct { + SessionID string `json:"session_id"` +} + +// input := agentic.SessionListInput{PlanSlug: "ax-follow-up", Status: "active"} +type SessionListInput struct { + PlanSlug string `json:"plan_slug,omitempty"` + Status string `json:"status,omitempty"` + Limit int `json:"limit,omitempty"` +} + +// input := agentic.SessionContinueInput{SessionID: "ses_abc123", AgentType: "codex"} +type SessionContinueInput struct { + SessionID string `json:"session_id"` + AgentType string `json:"agent_type,omitempty"` + WorkLog []map[string]any `json:"work_log,omitempty"` + Context map[string]any `json:"context,omitempty"` +} + +// input := agentic.SessionEndInput{SessionID: "ses_abc123", Status: "completed"} +type SessionEndInput struct { + SessionID string `json:"session_id"` + Status string `json:"status,omitempty"` + Summary string `json:"summary,omitempty"` + Handoff map[string]any `json:"handoff,omitempty"` +} + +// out := agentic.SessionOutput{Success: true, Session: agentic.Session{SessionID: "ses_abc123"}} +type SessionOutput struct { + Success bool `json:"success"` + Session Session `json:"session"` +} + +// out := agentic.SessionListOutput{Success: true, Count: 1, Sessions: []agentic.Session{{SessionID: "ses_abc123"}}} +type SessionListOutput struct { + Success bool `json:"success"` + Count int `json:"count"` + Sessions []Session `json:"sessions"` +} + +// result := c.Action("session.start").Run(ctx, core.NewOptions( +// +// core.Option{Key: "agent_type", Value: "codex"}, +// core.Option{Key: "plan_slug", Value: "ax-follow-up"}, +// +// )) +func (s *PrepSubsystem) handleSessionStart(ctx context.Context, options core.Options) core.Result { + _, output, err := s.sessionStart(ctx, nil, SessionStartInput{ + PlanSlug: optionStringValue(options, "plan_slug", "plan"), + AgentType: optionStringValue(options, "agent_type", "agent"), + Context: optionAnyMapValue(options, "context"), + }) + if err != nil { + return core.Result{Value: err, OK: false} + } + return core.Result{Value: output, OK: true} +} + +// result := c.Action("session.get").Run(ctx, core.NewOptions(core.Option{Key: "session_id", Value: "ses_abc123"})) +func (s *PrepSubsystem) handleSessionGet(ctx context.Context, options core.Options) core.Result { + _, output, err := s.sessionGet(ctx, nil, SessionGetInput{ + SessionID: optionStringValue(options, "session_id", "id", "_arg"), + }) + if err != nil { + return core.Result{Value: err, OK: false} + } + return core.Result{Value: output, OK: true} +} + +// result := c.Action("session.list").Run(ctx, core.NewOptions(core.Option{Key: "status", Value: "active"})) +func (s *PrepSubsystem) handleSessionList(ctx context.Context, options core.Options) core.Result { + _, output, err := s.sessionList(ctx, nil, SessionListInput{ + PlanSlug: optionStringValue(options, "plan_slug", "plan"), + Status: optionStringValue(options, "status"), + Limit: optionIntValue(options, "limit"), + }) + if err != nil { + return core.Result{Value: err, OK: false} + } + return core.Result{Value: output, OK: true} +} + +// result := c.Action("session.continue").Run(ctx, core.NewOptions(core.Option{Key: "session_id", Value: "ses_abc123"})) +func (s *PrepSubsystem) handleSessionContinue(ctx context.Context, options core.Options) core.Result { + _, output, err := s.sessionContinue(ctx, nil, SessionContinueInput{ + SessionID: optionStringValue(options, "session_id", "id", "_arg"), + AgentType: optionStringValue(options, "agent_type", "agent"), + WorkLog: optionAnyMapSliceValue(options, "work_log"), + Context: optionAnyMapValue(options, "context"), + }) + if err != nil { + return core.Result{Value: err, OK: false} + } + return core.Result{Value: output, OK: true} +} + +// result := c.Action("session.end").Run(ctx, core.NewOptions(core.Option{Key: "session_id", Value: "ses_abc123"})) +func (s *PrepSubsystem) handleSessionEnd(ctx context.Context, options core.Options) core.Result { + _, output, err := s.sessionEnd(ctx, nil, SessionEndInput{ + SessionID: optionStringValue(options, "session_id", "id", "_arg"), + Status: optionStringValue(options, "status"), + Summary: optionStringValue(options, "summary"), + Handoff: optionAnyMapValue(options, "handoff"), + }) + if err != nil { + return core.Result{Value: err, OK: false} + } + return core.Result{Value: output, OK: true} +} + +func (s *PrepSubsystem) registerSessionTools(server *mcp.Server) { + mcp.AddTool(server, &mcp.Tool{ + Name: "session_start", + Description: "Start a new agent session for a plan and capture the initial context summary.", + }, s.sessionStart) + + mcp.AddTool(server, &mcp.Tool{ + Name: "session_get", + Description: "Read a session by session ID, including saved context, work log, and artifacts.", + }, s.sessionGet) + + mcp.AddTool(server, &mcp.Tool{ + Name: "session_list", + Description: "List sessions with optional plan and status filters.", + }, s.sessionList) + + mcp.AddTool(server, &mcp.Tool{ + Name: "session_continue", + Description: "Continue an existing session from its latest saved state.", + }, s.sessionContinue) + + mcp.AddTool(server, &mcp.Tool{ + Name: "session_end", + Description: "End a session with status, summary, and optional handoff notes.", + }, s.sessionEnd) +} + +func (s *PrepSubsystem) sessionStart(ctx context.Context, _ *mcp.CallToolRequest, input SessionStartInput) (*mcp.CallToolResult, SessionOutput, error) { + if input.AgentType == "" { + return nil, SessionOutput{}, core.E("sessionStart", "agent_type is required", nil) + } + + body := map[string]any{ + "agent_type": input.AgentType, + } + if input.PlanSlug != "" { + body["plan_slug"] = input.PlanSlug + } + if len(input.Context) > 0 { + body["context"] = input.Context + } + + result := s.platformPayload(ctx, "session.start", "POST", "/v1/sessions", body) + if !result.OK { + return nil, SessionOutput{}, resultErrorValue("session.start", result) + } + + return nil, SessionOutput{ + Success: true, + Session: parseSession(sessionDataMap(result.Value.(map[string]any))), + }, nil +} + +func (s *PrepSubsystem) sessionGet(ctx context.Context, _ *mcp.CallToolRequest, input SessionGetInput) (*mcp.CallToolResult, SessionOutput, error) { + if input.SessionID == "" { + return nil, SessionOutput{}, core.E("sessionGet", "session_id is required", nil) + } + + path := core.Concat("/v1/sessions/", input.SessionID) + result := s.platformPayload(ctx, "session.get", "GET", path, nil) + if !result.OK { + return nil, SessionOutput{}, resultErrorValue("session.get", result) + } + + return nil, SessionOutput{ + Success: true, + Session: parseSession(sessionDataMap(result.Value.(map[string]any))), + }, nil +} + +func (s *PrepSubsystem) sessionList(ctx context.Context, _ *mcp.CallToolRequest, input SessionListInput) (*mcp.CallToolResult, SessionListOutput, error) { + path := "/v1/sessions" + path = appendQueryParam(path, "plan_slug", input.PlanSlug) + path = appendQueryParam(path, "status", input.Status) + if input.Limit > 0 { + path = appendQueryParam(path, "limit", core.Sprint(input.Limit)) + } + + result := s.platformPayload(ctx, "session.list", "GET", path, nil) + if !result.OK { + return nil, SessionListOutput{}, resultErrorValue("session.list", result) + } + + return nil, parseSessionListOutput(result.Value.(map[string]any)), nil +} + +func (s *PrepSubsystem) sessionContinue(ctx context.Context, _ *mcp.CallToolRequest, input SessionContinueInput) (*mcp.CallToolResult, SessionOutput, error) { + if input.SessionID == "" { + return nil, SessionOutput{}, core.E("sessionContinue", "session_id is required", nil) + } + + body := map[string]any{} + if input.AgentType != "" { + body["agent_type"] = input.AgentType + } + if len(input.WorkLog) > 0 { + body["work_log"] = input.WorkLog + } + if len(input.Context) > 0 { + body["context"] = input.Context + } + + path := core.Concat("/v1/sessions/", input.SessionID, "/continue") + result := s.platformPayload(ctx, "session.continue", "POST", path, body) + if !result.OK { + return nil, SessionOutput{}, resultErrorValue("session.continue", result) + } + + return nil, SessionOutput{ + Success: true, + Session: parseSession(sessionDataMap(result.Value.(map[string]any))), + }, nil +} + +func (s *PrepSubsystem) sessionEnd(ctx context.Context, _ *mcp.CallToolRequest, input SessionEndInput) (*mcp.CallToolResult, SessionOutput, error) { + if input.SessionID == "" { + return nil, SessionOutput{}, core.E("sessionEnd", "session_id is required", nil) + } + + body := map[string]any{} + if input.Status != "" { + body["status"] = input.Status + } + if input.Summary != "" { + body["summary"] = input.Summary + } + if len(input.Handoff) > 0 { + body["handoff"] = input.Handoff + } + + path := core.Concat("/v1/sessions/", input.SessionID, "/end") + result := s.platformPayload(ctx, "session.end", "POST", path, body) + if !result.OK { + return nil, SessionOutput{}, resultErrorValue("session.end", result) + } + + return nil, SessionOutput{ + Success: true, + Session: parseSession(sessionDataMap(result.Value.(map[string]any))), + }, nil +} + +func sessionDataMap(payload map[string]any) map[string]any { + data := payloadDataMap(payload) + if len(data) > 0 { + return data + } + return payload +} + +func parseSession(values map[string]any) Session { + planSlug := stringValue(values["plan_slug"]) + if planSlug == "" { + planSlug = stringValue(values["plan"]) + } + + return Session{ + ID: intValue(values["id"]), + SessionID: stringValue(values["session_id"]), + Plan: stringValue(values["plan"]), + PlanSlug: planSlug, + AgentType: stringValue(values["agent_type"]), + Status: stringValue(values["status"]), + ContextSummary: anyMapValue(values["context_summary"]), + WorkLog: anyMapSliceValue(values["work_log"]), + Artifacts: anyMapSliceValue(values["artifacts"]), + Handoff: anyMapValue(values["handoff"]), + Summary: stringValue(values["summary"]), + CreatedAt: stringValue(values["created_at"]), + UpdatedAt: stringValue(values["updated_at"]), + EndedAt: stringValue(values["ended_at"]), + } +} + +func parseSessionListOutput(payload map[string]any) SessionListOutput { + sessionData := payloadDataSlice(payload) + sessions := make([]Session, 0, len(sessionData)) + for _, values := range sessionData { + sessions = append(sessions, parseSession(values)) + } + + count := intValue(payload["count"]) + if count == 0 { + count = intValue(payload["total"]) + } + if count == 0 { + count = len(sessions) + } + + return SessionListOutput{ + Success: true, + Count: count, + Sessions: sessions, + } +} + +func resultErrorValue(action string, result core.Result) error { + if err, ok := result.Value.(error); ok && err != nil { + return err + } + + message := stringValue(result.Value) + if message != "" { + return core.E(action, message, nil) + } + + return core.E(action, "request failed", nil) +} diff --git a/pkg/agentic/session_example_test.go b/pkg/agentic/session_example_test.go new file mode 100644 index 0000000..e1eccf8 --- /dev/null +++ b/pkg/agentic/session_example_test.go @@ -0,0 +1,17 @@ +// SPDX-License-Identifier: EUPL-1.2 + +package agentic + +import "fmt" + +func Example_parseSession() { + session := parseSession(map[string]any{ + "session_id": "ses_abc123", + "plan_slug": "ax-follow-up", + "agent_type": "codex", + "status": "active", + }) + + fmt.Println(session.SessionID, session.PlanSlug, session.AgentType, session.Status) + // Output: ses_abc123 ax-follow-up codex active +} diff --git a/pkg/agentic/session_test.go b/pkg/agentic/session_test.go new file mode 100644 index 0000000..badb579 --- /dev/null +++ b/pkg/agentic/session_test.go @@ -0,0 +1,176 @@ +// SPDX-License-Identifier: EUPL-1.2 + +package agentic + +import ( + "context" + "net/http" + "net/http/httptest" + "testing" + + core "dappco.re/go/core" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestSession_HandleSessionStart_Good(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + require.Equal(t, "/v1/sessions", r.URL.Path) + require.Equal(t, http.MethodPost, r.Method) + require.Equal(t, "Bearer secret-token", r.Header.Get("Authorization")) + + bodyResult := core.ReadAll(r.Body) + require.True(t, bodyResult.OK) + + var payload map[string]any + parseResult := core.JSONUnmarshalString(bodyResult.Value.(string), &payload) + require.True(t, parseResult.OK) + require.Equal(t, "codex", payload["agent_type"]) + require.Equal(t, "ax-follow-up", payload["plan_slug"]) + + _, _ = w.Write([]byte(`{"data":{"id":1,"session_id":"ses_abc123","plan_slug":"ax-follow-up","agent_type":"codex","status":"active","context_summary":{"repo":"core/go"}}}`)) + })) + defer server.Close() + + subsystem := testPrepWithPlatformServer(t, server, "secret-token") + result := subsystem.handleSessionStart(context.Background(), core.NewOptions( + core.Option{Key: "agent_type", Value: "codex"}, + core.Option{Key: "plan_slug", Value: "ax-follow-up"}, + core.Option{Key: "context", Value: `{"repo":"core/go"}`}, + )) + require.True(t, result.OK) + + output, ok := result.Value.(SessionOutput) + require.True(t, ok) + assert.Equal(t, "ses_abc123", output.Session.SessionID) + assert.Equal(t, "active", output.Session.Status) + assert.Equal(t, "codex", output.Session.AgentType) +} + +func TestSession_HandleSessionStart_Bad(t *testing.T) { + subsystem := testPrepWithPlatformServer(t, nil, "secret-token") + + result := subsystem.handleSessionStart(context.Background(), core.NewOptions()) + assert.False(t, result.OK) +} + +func TestSession_HandleSessionStart_Ugly(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + _, _ = w.Write([]byte(`{"data":`)) + })) + defer server.Close() + + subsystem := testPrepWithPlatformServer(t, server, "secret-token") + result := subsystem.handleSessionStart(context.Background(), core.NewOptions( + core.Option{Key: "agent_type", Value: "codex"}, + )) + assert.False(t, result.OK) +} + +func TestSession_HandleSessionGet_Good(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + require.Equal(t, "/v1/sessions/ses_abc123", r.URL.Path) + require.Equal(t, http.MethodGet, r.Method) + _, _ = w.Write([]byte(`{"data":{"session_id":"ses_abc123","plan":"ax-follow-up","agent_type":"codex","status":"active"}}`)) + })) + defer server.Close() + + subsystem := testPrepWithPlatformServer(t, server, "secret-token") + result := subsystem.handleSessionGet(context.Background(), core.NewOptions( + core.Option{Key: "session_id", Value: "ses_abc123"}, + )) + require.True(t, result.OK) + + output, ok := result.Value.(SessionOutput) + require.True(t, ok) + assert.Equal(t, "ses_abc123", output.Session.SessionID) + assert.Equal(t, "ax-follow-up", output.Session.Plan) +} + +func TestSession_HandleSessionList_Good(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + require.Equal(t, "/v1/sessions", r.URL.Path) + require.Equal(t, "ax-follow-up", r.URL.Query().Get("plan_slug")) + require.Equal(t, "active", r.URL.Query().Get("status")) + require.Equal(t, "5", r.URL.Query().Get("limit")) + _, _ = w.Write([]byte(`{"data":[{"session_id":"ses_1","agent_type":"codex","status":"active"},{"session_id":"ses_2","agent_type":"claude","status":"completed"}],"count":2}`)) + })) + defer server.Close() + + subsystem := testPrepWithPlatformServer(t, server, "secret-token") + result := subsystem.handleSessionList(context.Background(), core.NewOptions( + core.Option{Key: "plan_slug", Value: "ax-follow-up"}, + core.Option{Key: "status", Value: "active"}, + core.Option{Key: "limit", Value: 5}, + )) + require.True(t, result.OK) + + output, ok := result.Value.(SessionListOutput) + require.True(t, ok) + assert.Equal(t, 2, output.Count) + require.Len(t, output.Sessions, 2) + assert.Equal(t, "ses_1", output.Sessions[0].SessionID) +} + +func TestSession_HandleSessionContinue_Good(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + require.Equal(t, "/v1/sessions/ses_abc123/continue", r.URL.Path) + require.Equal(t, http.MethodPost, r.Method) + + bodyResult := core.ReadAll(r.Body) + require.True(t, bodyResult.OK) + + var payload map[string]any + parseResult := core.JSONUnmarshalString(bodyResult.Value.(string), &payload) + require.True(t, parseResult.OK) + require.Equal(t, "codex", payload["agent_type"]) + + _, _ = w.Write([]byte(`{"data":{"session_id":"ses_abc123","agent_type":"codex","status":"active","work_log":[{"type":"checkpoint","message":"continue"}]}}`)) + })) + defer server.Close() + + subsystem := testPrepWithPlatformServer(t, server, "secret-token") + result := subsystem.handleSessionContinue(context.Background(), core.NewOptions( + core.Option{Key: "session_id", Value: "ses_abc123"}, + core.Option{Key: "agent_type", Value: "codex"}, + core.Option{Key: "work_log", Value: `[{"type":"checkpoint","message":"continue"}]`}, + )) + require.True(t, result.OK) + + output, ok := result.Value.(SessionOutput) + require.True(t, ok) + require.Len(t, output.Session.WorkLog, 1) + assert.Equal(t, "active", output.Session.Status) +} + +func TestSession_HandleSessionEnd_Good(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + require.Equal(t, "/v1/sessions/ses_abc123/end", r.URL.Path) + require.Equal(t, http.MethodPost, r.Method) + + bodyResult := core.ReadAll(r.Body) + require.True(t, bodyResult.OK) + + var payload map[string]any + parseResult := core.JSONUnmarshalString(bodyResult.Value.(string), &payload) + require.True(t, parseResult.OK) + require.Equal(t, "completed", payload["status"]) + require.Equal(t, "All green", payload["summary"]) + + _, _ = w.Write([]byte(`{"data":{"session_id":"ses_abc123","agent_type":"codex","status":"completed","summary":"All green","ended_at":"2026-03-31T12:00:00Z"}}`)) + })) + defer server.Close() + + subsystem := testPrepWithPlatformServer(t, server, "secret-token") + result := subsystem.handleSessionEnd(context.Background(), core.NewOptions( + core.Option{Key: "session_id", Value: "ses_abc123"}, + core.Option{Key: "status", Value: "completed"}, + core.Option{Key: "summary", Value: "All green"}, + )) + require.True(t, result.OK) + + output, ok := result.Value.(SessionOutput) + require.True(t, ok) + assert.Equal(t, "completed", output.Session.Status) + assert.Equal(t, "All green", output.Session.Summary) +}