From 43e2a14b823ae400604af5e0121bf8c14b018e45 Mon Sep 17 00:00:00 2001 From: Virgil Date: Wed, 1 Apr 2026 22:07:42 +0000 Subject: [PATCH] feat(agentic): add completion journal commit stage Co-Authored-By: Virgil --- pkg/agentic/actions_example_test.go | 2 +- pkg/agentic/commands.go | 2 +- pkg/agentic/commands_test.go | 2 +- pkg/agentic/commit.go | 181 ++++++++++++++++++++++++++++ pkg/agentic/commit_test.go | 99 +++++++++++++++ pkg/agentic/handlers.go | 34 +++++- pkg/agentic/prep.go | 9 +- pkg/agentic/prep_test.go | 1 + 8 files changed, 323 insertions(+), 7 deletions(-) create mode 100644 pkg/agentic/commit.go create mode 100644 pkg/agentic/commit_test.go diff --git a/pkg/agentic/actions_example_test.go b/pkg/agentic/actions_example_test.go index 394731b..1596f88 100644 --- a/pkg/agentic/actions_example_test.go +++ b/pkg/agentic/actions_example_test.go @@ -49,5 +49,5 @@ func ExampleRegister_task() { // Completion pipeline registered as a Task t := c.Task("agent.completion") core.Println(t.Description) - // Output: QA → PR → Verify → Ingest → Poke + // Output: QA → PR → Verify → Commit → Ingest → Poke } diff --git a/pkg/agentic/commands.go b/pkg/agentic/commands.go index 1c3d8c9..25ef074 100644 --- a/pkg/agentic/commands.go +++ b/pkg/agentic/commands.go @@ -32,7 +32,7 @@ func (s *PrepSubsystem) registerCommands(ctx context.Context) { c.Command("agentic:resume", core.Command{Description: "Resume a blocked or completed workspace", Action: s.cmdResume}) c.Command("generate", core.Command{Description: "Generate content from a prompt using the platform content pipeline", Action: s.cmdGenerate}) c.Command("agentic:generate", core.Command{Description: "Generate content from a prompt using the platform content pipeline", Action: s.cmdGenerate}) - c.Command("complete", core.Command{Description: "Run the completion pipeline (QA → PR → Verify → Ingest → Poke)", Action: s.cmdComplete}) + c.Command("complete", core.Command{Description: "Run the completion pipeline (QA → PR → Verify → Commit → Ingest → Poke)", Action: s.cmdComplete}) c.Command("scan", core.Command{Description: "Scan Forge repos for actionable issues", Action: s.cmdScan}) c.Command("agentic:scan", core.Command{Description: "Scan Forge repos for actionable issues", Action: s.cmdScan}) c.Command("mirror", core.Command{Description: "Mirror Forge repos to GitHub", Action: s.cmdMirror}) diff --git a/pkg/agentic/commands_test.go b/pkg/agentic/commands_test.go index ee99aba..358a437 100644 --- a/pkg/agentic/commands_test.go +++ b/pkg/agentic/commands_test.go @@ -1037,7 +1037,7 @@ func TestCommands_CmdComplete_Good(t *testing.T) { return core.Result{OK: true} }) c.Task("agent.completion", core.Task{ - Description: "QA → PR → Verify → Ingest → Poke", + Description: "QA → PR → Verify → Commit → Ingest → Poke", Steps: []core.Step{ {Action: "noop"}, }, diff --git a/pkg/agentic/commit.go b/pkg/agentic/commit.go new file mode 100644 index 0000000..24b0aac --- /dev/null +++ b/pkg/agentic/commit.go @@ -0,0 +1,181 @@ +// SPDX-License-Identifier: EUPL-1.2 + +package agentic + +import ( + "context" + "time" + + core "dappco.re/go/core" + "github.com/modelcontextprotocol/go-sdk/mcp" +) + +// input := agentic.CommitInput{Workspace: "core/go-io/task-42"} +type CommitInput struct { + Workspace string `json:"workspace"` +} + +// out := agentic.CommitOutput{Success: true, Workspace: "core/go-io/task-42", JournalPath: "/srv/.core/workspace/core/go-io/task-42/.meta/journal.jsonl"} +type CommitOutput struct { + Success bool `json:"success"` + Workspace string `json:"workspace"` + JournalPath string `json:"journal_path,omitempty"` + MarkerPath string `json:"marker_path,omitempty"` + CommittedAt string `json:"committed_at,omitempty"` + Skipped bool `json:"skipped,omitempty"` +} + +// result := c.Action("agentic.commit").Run(ctx, core.NewOptions(core.Option{Key: "workspace", Value: "core/go-io/task-42"})) +func (s *PrepSubsystem) handleCommit(_ context.Context, options core.Options) core.Result { + input := CommitInput{ + Workspace: optionStringValue(options, "workspace"), + } + output, err := s.commitWorkspace(nil, input) + if err != nil { + return core.Result{Value: err, OK: false} + } + return core.Result{Value: output, OK: true} +} + +func (s *PrepSubsystem) registerCommitTool(server *mcp.Server) { + mcp.AddTool(server, &mcp.Tool{ + Name: "agentic_commit", + Description: "Write the final workspace dispatch record to the local journal after verify completes.", + }, s.commitTool) +} + +func (s *PrepSubsystem) commitTool(ctx context.Context, _ *mcp.CallToolRequest, input CommitInput) (*mcp.CallToolResult, CommitOutput, error) { + output, err := s.commitWorkspace(ctx, input) + if err != nil { + return nil, CommitOutput{}, err + } + return nil, output, nil +} + +func (s *PrepSubsystem) commitWorkspace(ctx context.Context, input CommitInput) (CommitOutput, error) { + workspaceDir := resolveWorkspace(input.Workspace) + if workspaceDir == "" { + return CommitOutput{}, core.E("commitWorkspace", core.Concat("workspace not found: ", input.Workspace), nil) + } + + result := ReadStatusResult(workspaceDir) + workspaceStatus, ok := workspaceStatusValue(result) + if !ok { + err, _ := result.Value.(error) + if err == nil { + err = core.E("commitWorkspace", "status not found", nil) + } + return CommitOutput{}, err + } + + metaDir := WorkspaceMetaDir(workspaceDir) + if r := fs.EnsureDir(metaDir); !r.OK { + err, _ := r.Value.(error) + if err == nil { + err = core.E("commitWorkspace", "failed to create metadata directory", nil) + } + return CommitOutput{}, err + } + + journalPath := core.JoinPath(metaDir, "journal.jsonl") + markerPath := core.JoinPath(metaDir, "commit.json") + + committedAt := time.Now().UTC().Format(time.RFC3339) + if existingCommit, ok := readCommitMarker(markerPath); ok && existingCommit.UpdatedAt == workspaceStatus.UpdatedAt && existingCommit.Runs == workspaceStatus.Runs { + return CommitOutput{ + Success: true, + Workspace: input.Workspace, + JournalPath: journalPath, + MarkerPath: markerPath, + CommittedAt: existingCommit.CommittedAt, + Skipped: true, + }, nil + } + + record := commitWorkspaceRecord(workspaceDir, workspaceStatus, committedAt) + line := core.Concat(core.JSONMarshalString(record), "\n") + + appendHandle := fs.Append(journalPath) + if !appendHandle.OK { + err, _ := appendHandle.Value.(error) + if err == nil { + err = core.E("commitWorkspace", "failed to open journal", nil) + } + return CommitOutput{}, err + } + core.WriteAll(appendHandle.Value, line) + + marker := commitMarker{ + Workspace: WorkspaceName(workspaceDir), + UpdatedAt: workspaceStatus.UpdatedAt, + Runs: workspaceStatus.Runs, + CommittedAt: committedAt, + } + if r := fs.WriteAtomic(markerPath, core.JSONMarshalString(marker)); !r.OK { + err, _ := r.Value.(error) + if err == nil { + err = core.E("commitWorkspace", "failed to write commit marker", nil) + } + return CommitOutput{}, err + } + + return CommitOutput{ + Success: true, + Workspace: input.Workspace, + JournalPath: journalPath, + MarkerPath: markerPath, + CommittedAt: committedAt, + }, nil +} + +type commitMarker struct { + Workspace string `json:"workspace"` + UpdatedAt time.Time `json:"updated_at"` + Runs int `json:"runs"` + CommittedAt string `json:"committed_at"` +} + +func readCommitMarker(markerPath string) (commitMarker, bool) { + r := fs.Read(markerPath) + if !r.OK { + return commitMarker{}, false + } + + var marker commitMarker + if parseResult := core.JSONUnmarshalString(r.Value.(string), &marker); !parseResult.OK { + return commitMarker{}, false + } + return marker, true +} + +func commitWorkspaceRecord(workspaceDir string, workspaceStatus *WorkspaceStatus, committedAt string) map[string]any { + record := map[string]any{ + "workspace": WorkspaceName(workspaceDir), + "repo": workspaceStatus.Repo, + "org": workspaceStatus.Org, + "task": workspaceStatus.Task, + "agent": workspaceStatus.Agent, + "branch": workspaceStatus.Branch, + "status": workspaceStatus.Status, + "question": workspaceStatus.Question, + "issue": workspaceStatus.Issue, + "runs": workspaceStatus.Runs, + "process_id": workspaceStatus.ProcessID, + "pr_url": workspaceStatus.PRURL, + "started_at": workspaceStatus.StartedAt, + "updated_at": workspaceStatus.UpdatedAt, + "committed_at": committedAt, + } + + if report := readSyncWorkspaceReport(workspaceDir); len(report) > 0 { + record["report"] = report + if findings := anyMapSliceValue(report["findings"]); len(findings) > 0 { + record["findings"] = findings + } + if changes := anyMapValue(report["changes"]); len(changes) > 0 { + record["changes"] = changes + } + } + + return record +} diff --git a/pkg/agentic/commit_test.go b/pkg/agentic/commit_test.go new file mode 100644 index 0000000..59598d1 --- /dev/null +++ b/pkg/agentic/commit_test.go @@ -0,0 +1,99 @@ +// SPDX-License-Identifier: EUPL-1.2 + +package agentic + +import ( + "context" + "testing" + + core "dappco.re/go/core" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestCommit_HandleCommit_Good_WritesJournal(t *testing.T) { + root := t.TempDir() + t.Setenv("CORE_WORKSPACE", root) + + workspaceName := "core/go-io/task-42" + workspaceDir := core.JoinPath(WorkspaceRoot(), workspaceName) + metaDir := WorkspaceMetaDir(workspaceDir) + require.True(t, fs.EnsureDir(metaDir).OK) + require.True(t, writeStatus(workspaceDir, &WorkspaceStatus{ + Status: "merged", + Agent: "codex", + Repo: "go-io", + Org: "core", + Task: "Fix tests", + Branch: "agent/fix-tests", + Runs: 3, + }) == nil) + require.True(t, fs.Write(core.JoinPath(metaDir, "report.json"), `{"findings":[{"file":"main.go"}],"changes":{"files_changed":1}}`).OK) + + s := &PrepSubsystem{} + result := s.handleCommit(context.Background(), core.NewOptions( + core.Option{Key: "workspace", Value: workspaceName}, + )) + + require.True(t, result.OK) + output, ok := result.Value.(CommitOutput) + require.True(t, ok) + assert.Equal(t, workspaceName, output.Workspace) + assert.False(t, output.Skipped) + assert.NotEmpty(t, output.CommittedAt) + + journal := fs.Read(output.JournalPath) + require.True(t, journal.OK) + assert.Contains(t, journal.Value.(string), `"repo":"go-io"`) + assert.Contains(t, journal.Value.(string), `"committed_at"`) + + marker := fs.Read(output.MarkerPath) + require.True(t, marker.OK) + assert.Contains(t, marker.Value.(string), `"workspace":"core/go-io/task-42"`) +} + +func TestCommit_HandleCommit_Bad_MissingWorkspace(t *testing.T) { + s := &PrepSubsystem{} + result := s.handleCommit(context.Background(), core.NewOptions()) + + assert.False(t, result.OK) + assert.Error(t, result.Value.(error)) +} + +func TestCommit_HandleCommit_Ugly_Idempotent(t *testing.T) { + root := t.TempDir() + t.Setenv("CORE_WORKSPACE", root) + + workspaceName := "core/go-io/task-43" + workspaceDir := core.JoinPath(WorkspaceRoot(), workspaceName) + require.True(t, fs.EnsureDir(WorkspaceMetaDir(workspaceDir)).OK) + require.True(t, writeStatus(workspaceDir, &WorkspaceStatus{ + Status: "completed", + Agent: "codex", + Repo: "go-io", + Org: "core", + Task: "Fix tests", + Branch: "agent/fix-tests", + Runs: 1, + }) == nil) + + s := &PrepSubsystem{} + first := s.handleCommit(context.Background(), core.NewOptions( + core.Option{Key: "workspace", Value: workspaceName}, + )) + require.True(t, first.OK) + + second := s.handleCommit(context.Background(), core.NewOptions( + core.Option{Key: "workspace", Value: workspaceName}, + )) + require.True(t, second.OK) + + output, ok := second.Value.(CommitOutput) + require.True(t, ok) + assert.True(t, output.Skipped) + + journal := fs.Read(output.JournalPath) + require.True(t, journal.OK) + lines := len(core.Split(core.Trim(journal.Value.(string)), "\n")) + assert.Equal(t, 1, lines) +} diff --git a/pkg/agentic/handlers.go b/pkg/agentic/handlers.go index 00b0cc2..f0e61c6 100644 --- a/pkg/agentic/handlers.go +++ b/pkg/agentic/handlers.go @@ -3,6 +3,8 @@ package agentic import ( + "context" + "dappco.re/go/agent/pkg/messages" core "dappco.re/go/core" ) @@ -24,6 +26,9 @@ func RegisterHandlers(c *core.Core, s *PrepSubsystem) { func(coreApp *core.Core, msg core.Message) core.Result { return handleCompletionVerify(coreApp, msg) }, + func(coreApp *core.Core, msg core.Message) core.Result { + return handleCompletionCommit(coreApp, msg) + }, func(coreApp *core.Core, msg core.Message) core.Result { return handleCompletionIngest(coreApp, msg) }, @@ -112,6 +117,27 @@ func handleCompletionVerify(c *core.Core, msg core.Message) core.Result { return core.Result{OK: true} } +func handleCompletionCommit(c *core.Core, msg core.Message) core.Result { + switch ev := msg.(type) { + case messages.PRMerged: + workspaceDir := findWorkspaceByPR(ev.Repo, "") + if workspaceDir != "" { + if c.Action("agentic.commit").Exists() { + c.Action("agentic.commit").Run(context.Background(), workspaceActionOptions(workspaceDir)) + } + } + case messages.PRNeedsReview: + workspaceDir := findWorkspaceByPR(ev.Repo, "") + if workspaceDir != "" { + if c.Action("agentic.commit").Exists() { + c.Action("agentic.commit").Run(context.Background(), workspaceActionOptions(workspaceDir)) + } + } + } + + return core.Result{OK: true} +} + func handleCompletionIngest(c *core.Core, msg core.Message) core.Result { ev, ok := msg.(messages.AgentCompleted) if !ok || c == nil || !c.Config().Enabled("auto-ingest") { @@ -181,7 +207,13 @@ func findWorkspaceByPR(repo, branch string) string { if !ok { continue } - if workspaceStatus.Repo == repo && workspaceStatus.Branch == branch { + if workspaceStatus.Repo != repo { + continue + } + if branch != "" && workspaceStatus.Branch != branch { + continue + } + if branch == "" || workspaceStatus.Branch == branch { return workspaceDir } } diff --git a/pkg/agentic/prep.go b/pkg/agentic/prep.go index 11ee344..3bc2a54 100644 --- a/pkg/agentic/prep.go +++ b/pkg/agentic/prep.go @@ -177,6 +177,7 @@ func (s *PrepSubsystem) OnStartup(ctx context.Context) core.Result { c.Action("agentic.qa", s.handleQA).Description = "Run build + test QA checks on workspace" c.Action("agentic.auto-pr", s.handleAutoPR).Description = "Create PR from completed workspace" c.Action("agentic.verify", s.handleVerify).Description = "Verify PR and auto-merge if clean" + c.Action("agentic.commit", s.handleCommit).Description = "Write the final dispatch record to the workspace journal" c.Action("agentic.ingest", s.handleIngest).Description = "Create issues from agent findings" c.Action("agentic.poke", s.handlePoke).Description = "Drain next queued task from the queue" c.Action("agentic.mirror", s.handleMirror).Description = "Mirror agent branches to GitHub" @@ -269,17 +270,18 @@ func (s *PrepSubsystem) OnStartup(ctx context.Context) core.Result { c.Action("agentic.persona", s.handlePersona).Description = "Read a persona by path" c.Task("agent.completion", core.Task{ - Description: "QA → PR → Verify → Ingest → Poke", + Description: "QA → PR → Verify → Commit → Ingest → Poke", Steps: []core.Step{ {Action: "agentic.qa"}, {Action: "agentic.auto-pr"}, {Action: "agentic.verify"}, + {Action: "agentic.commit", Async: true}, {Action: "agentic.ingest", Async: true}, {Action: "agentic.poke", Async: true}, }, }) - c.Action("agentic.complete", s.handleComplete).Description = "Run completion pipeline (QA → PR → Verify → Ingest → Poke) in background" + c.Action("agentic.complete", s.handleComplete).Description = "Run completion pipeline (QA → PR → Verify → Commit → Ingest → Poke) in background" s.hydrateWorkspaces() if planRetentionDays(core.NewOptions()) > 0 { @@ -370,8 +372,9 @@ func (s *PrepSubsystem) RegisterTools(server *mcp.Server) { s.registerResumeTool(server) mcp.AddTool(server, &mcp.Tool{ Name: "agentic_complete", - Description: "Run the completion pipeline (QA → PR → Verify → Ingest → Poke) in the background.", + Description: "Run the completion pipeline (QA → PR → Verify → Commit → Ingest → Poke) in the background.", }, s.completeTool) + s.registerCommitTool(server) s.registerCreatePRTool(server) s.registerListPRsTool(server) s.registerClosePRTool(server) diff --git a/pkg/agentic/prep_test.go b/pkg/agentic/prep_test.go index 31ca135..9c3be76 100644 --- a/pkg/agentic/prep_test.go +++ b/pkg/agentic/prep_test.go @@ -550,6 +550,7 @@ func TestPrep_OnStartup_Good_RegistersForgeActions(t *testing.T) { assert.True(t, c.Action("agentic.pr.list").Exists()) assert.True(t, c.Action("agentic.pr.merge").Exists()) assert.True(t, c.Action("agentic.pr.close").Exists()) + assert.True(t, c.Action("agentic.commit").Exists()) } func TestPrep_OnStartup_Good_RegistersContentActions(t *testing.T) {