feat(agentic): add completion journal commit stage

Co-Authored-By: Virgil <virgil@lethean.io>
This commit is contained in:
Virgil 2026-04-01 22:07:42 +00:00
parent fdbf7f7e4d
commit 43e2a14b82
8 changed files with 323 additions and 7 deletions

View file

@ -49,5 +49,5 @@ func ExampleRegister_task() {
// Completion pipeline registered as a Task
t := c.Task("agent.completion")
core.Println(t.Description)
// Output: QA → PR → Verify → Ingest → Poke
// Output: QA → PR → Verify → Commit → Ingest → Poke
}

View file

@ -32,7 +32,7 @@ func (s *PrepSubsystem) registerCommands(ctx context.Context) {
c.Command("agentic:resume", core.Command{Description: "Resume a blocked or completed workspace", Action: s.cmdResume})
c.Command("generate", core.Command{Description: "Generate content from a prompt using the platform content pipeline", Action: s.cmdGenerate})
c.Command("agentic:generate", core.Command{Description: "Generate content from a prompt using the platform content pipeline", Action: s.cmdGenerate})
c.Command("complete", core.Command{Description: "Run the completion pipeline (QA → PR → Verify → Ingest → Poke)", Action: s.cmdComplete})
c.Command("complete", core.Command{Description: "Run the completion pipeline (QA → PR → Verify → Commit → Ingest → Poke)", Action: s.cmdComplete})
c.Command("scan", core.Command{Description: "Scan Forge repos for actionable issues", Action: s.cmdScan})
c.Command("agentic:scan", core.Command{Description: "Scan Forge repos for actionable issues", Action: s.cmdScan})
c.Command("mirror", core.Command{Description: "Mirror Forge repos to GitHub", Action: s.cmdMirror})

View file

@ -1037,7 +1037,7 @@ func TestCommands_CmdComplete_Good(t *testing.T) {
return core.Result{OK: true}
})
c.Task("agent.completion", core.Task{
Description: "QA → PR → Verify → Ingest → Poke",
Description: "QA → PR → Verify → Commit → Ingest → Poke",
Steps: []core.Step{
{Action: "noop"},
},

181
pkg/agentic/commit.go Normal file
View file

@ -0,0 +1,181 @@
// SPDX-License-Identifier: EUPL-1.2
package agentic
import (
"context"
"time"
core "dappco.re/go/core"
"github.com/modelcontextprotocol/go-sdk/mcp"
)
// input := agentic.CommitInput{Workspace: "core/go-io/task-42"}
type CommitInput struct {
Workspace string `json:"workspace"`
}
// out := agentic.CommitOutput{Success: true, Workspace: "core/go-io/task-42", JournalPath: "/srv/.core/workspace/core/go-io/task-42/.meta/journal.jsonl"}
type CommitOutput struct {
Success bool `json:"success"`
Workspace string `json:"workspace"`
JournalPath string `json:"journal_path,omitempty"`
MarkerPath string `json:"marker_path,omitempty"`
CommittedAt string `json:"committed_at,omitempty"`
Skipped bool `json:"skipped,omitempty"`
}
// result := c.Action("agentic.commit").Run(ctx, core.NewOptions(core.Option{Key: "workspace", Value: "core/go-io/task-42"}))
func (s *PrepSubsystem) handleCommit(_ context.Context, options core.Options) core.Result {
input := CommitInput{
Workspace: optionStringValue(options, "workspace"),
}
output, err := s.commitWorkspace(nil, input)
if err != nil {
return core.Result{Value: err, OK: false}
}
return core.Result{Value: output, OK: true}
}
func (s *PrepSubsystem) registerCommitTool(server *mcp.Server) {
mcp.AddTool(server, &mcp.Tool{
Name: "agentic_commit",
Description: "Write the final workspace dispatch record to the local journal after verify completes.",
}, s.commitTool)
}
func (s *PrepSubsystem) commitTool(ctx context.Context, _ *mcp.CallToolRequest, input CommitInput) (*mcp.CallToolResult, CommitOutput, error) {
output, err := s.commitWorkspace(ctx, input)
if err != nil {
return nil, CommitOutput{}, err
}
return nil, output, nil
}
func (s *PrepSubsystem) commitWorkspace(ctx context.Context, input CommitInput) (CommitOutput, error) {
workspaceDir := resolveWorkspace(input.Workspace)
if workspaceDir == "" {
return CommitOutput{}, core.E("commitWorkspace", core.Concat("workspace not found: ", input.Workspace), nil)
}
result := ReadStatusResult(workspaceDir)
workspaceStatus, ok := workspaceStatusValue(result)
if !ok {
err, _ := result.Value.(error)
if err == nil {
err = core.E("commitWorkspace", "status not found", nil)
}
return CommitOutput{}, err
}
metaDir := WorkspaceMetaDir(workspaceDir)
if r := fs.EnsureDir(metaDir); !r.OK {
err, _ := r.Value.(error)
if err == nil {
err = core.E("commitWorkspace", "failed to create metadata directory", nil)
}
return CommitOutput{}, err
}
journalPath := core.JoinPath(metaDir, "journal.jsonl")
markerPath := core.JoinPath(metaDir, "commit.json")
committedAt := time.Now().UTC().Format(time.RFC3339)
if existingCommit, ok := readCommitMarker(markerPath); ok && existingCommit.UpdatedAt == workspaceStatus.UpdatedAt && existingCommit.Runs == workspaceStatus.Runs {
return CommitOutput{
Success: true,
Workspace: input.Workspace,
JournalPath: journalPath,
MarkerPath: markerPath,
CommittedAt: existingCommit.CommittedAt,
Skipped: true,
}, nil
}
record := commitWorkspaceRecord(workspaceDir, workspaceStatus, committedAt)
line := core.Concat(core.JSONMarshalString(record), "\n")
appendHandle := fs.Append(journalPath)
if !appendHandle.OK {
err, _ := appendHandle.Value.(error)
if err == nil {
err = core.E("commitWorkspace", "failed to open journal", nil)
}
return CommitOutput{}, err
}
core.WriteAll(appendHandle.Value, line)
marker := commitMarker{
Workspace: WorkspaceName(workspaceDir),
UpdatedAt: workspaceStatus.UpdatedAt,
Runs: workspaceStatus.Runs,
CommittedAt: committedAt,
}
if r := fs.WriteAtomic(markerPath, core.JSONMarshalString(marker)); !r.OK {
err, _ := r.Value.(error)
if err == nil {
err = core.E("commitWorkspace", "failed to write commit marker", nil)
}
return CommitOutput{}, err
}
return CommitOutput{
Success: true,
Workspace: input.Workspace,
JournalPath: journalPath,
MarkerPath: markerPath,
CommittedAt: committedAt,
}, nil
}
type commitMarker struct {
Workspace string `json:"workspace"`
UpdatedAt time.Time `json:"updated_at"`
Runs int `json:"runs"`
CommittedAt string `json:"committed_at"`
}
func readCommitMarker(markerPath string) (commitMarker, bool) {
r := fs.Read(markerPath)
if !r.OK {
return commitMarker{}, false
}
var marker commitMarker
if parseResult := core.JSONUnmarshalString(r.Value.(string), &marker); !parseResult.OK {
return commitMarker{}, false
}
return marker, true
}
func commitWorkspaceRecord(workspaceDir string, workspaceStatus *WorkspaceStatus, committedAt string) map[string]any {
record := map[string]any{
"workspace": WorkspaceName(workspaceDir),
"repo": workspaceStatus.Repo,
"org": workspaceStatus.Org,
"task": workspaceStatus.Task,
"agent": workspaceStatus.Agent,
"branch": workspaceStatus.Branch,
"status": workspaceStatus.Status,
"question": workspaceStatus.Question,
"issue": workspaceStatus.Issue,
"runs": workspaceStatus.Runs,
"process_id": workspaceStatus.ProcessID,
"pr_url": workspaceStatus.PRURL,
"started_at": workspaceStatus.StartedAt,
"updated_at": workspaceStatus.UpdatedAt,
"committed_at": committedAt,
}
if report := readSyncWorkspaceReport(workspaceDir); len(report) > 0 {
record["report"] = report
if findings := anyMapSliceValue(report["findings"]); len(findings) > 0 {
record["findings"] = findings
}
if changes := anyMapValue(report["changes"]); len(changes) > 0 {
record["changes"] = changes
}
}
return record
}

View file

@ -0,0 +1,99 @@
// SPDX-License-Identifier: EUPL-1.2
package agentic
import (
"context"
"testing"
core "dappco.re/go/core"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestCommit_HandleCommit_Good_WritesJournal(t *testing.T) {
root := t.TempDir()
t.Setenv("CORE_WORKSPACE", root)
workspaceName := "core/go-io/task-42"
workspaceDir := core.JoinPath(WorkspaceRoot(), workspaceName)
metaDir := WorkspaceMetaDir(workspaceDir)
require.True(t, fs.EnsureDir(metaDir).OK)
require.True(t, writeStatus(workspaceDir, &WorkspaceStatus{
Status: "merged",
Agent: "codex",
Repo: "go-io",
Org: "core",
Task: "Fix tests",
Branch: "agent/fix-tests",
Runs: 3,
}) == nil)
require.True(t, fs.Write(core.JoinPath(metaDir, "report.json"), `{"findings":[{"file":"main.go"}],"changes":{"files_changed":1}}`).OK)
s := &PrepSubsystem{}
result := s.handleCommit(context.Background(), core.NewOptions(
core.Option{Key: "workspace", Value: workspaceName},
))
require.True(t, result.OK)
output, ok := result.Value.(CommitOutput)
require.True(t, ok)
assert.Equal(t, workspaceName, output.Workspace)
assert.False(t, output.Skipped)
assert.NotEmpty(t, output.CommittedAt)
journal := fs.Read(output.JournalPath)
require.True(t, journal.OK)
assert.Contains(t, journal.Value.(string), `"repo":"go-io"`)
assert.Contains(t, journal.Value.(string), `"committed_at"`)
marker := fs.Read(output.MarkerPath)
require.True(t, marker.OK)
assert.Contains(t, marker.Value.(string), `"workspace":"core/go-io/task-42"`)
}
func TestCommit_HandleCommit_Bad_MissingWorkspace(t *testing.T) {
s := &PrepSubsystem{}
result := s.handleCommit(context.Background(), core.NewOptions())
assert.False(t, result.OK)
assert.Error(t, result.Value.(error))
}
func TestCommit_HandleCommit_Ugly_Idempotent(t *testing.T) {
root := t.TempDir()
t.Setenv("CORE_WORKSPACE", root)
workspaceName := "core/go-io/task-43"
workspaceDir := core.JoinPath(WorkspaceRoot(), workspaceName)
require.True(t, fs.EnsureDir(WorkspaceMetaDir(workspaceDir)).OK)
require.True(t, writeStatus(workspaceDir, &WorkspaceStatus{
Status: "completed",
Agent: "codex",
Repo: "go-io",
Org: "core",
Task: "Fix tests",
Branch: "agent/fix-tests",
Runs: 1,
}) == nil)
s := &PrepSubsystem{}
first := s.handleCommit(context.Background(), core.NewOptions(
core.Option{Key: "workspace", Value: workspaceName},
))
require.True(t, first.OK)
second := s.handleCommit(context.Background(), core.NewOptions(
core.Option{Key: "workspace", Value: workspaceName},
))
require.True(t, second.OK)
output, ok := second.Value.(CommitOutput)
require.True(t, ok)
assert.True(t, output.Skipped)
journal := fs.Read(output.JournalPath)
require.True(t, journal.OK)
lines := len(core.Split(core.Trim(journal.Value.(string)), "\n"))
assert.Equal(t, 1, lines)
}

View file

@ -3,6 +3,8 @@
package agentic
import (
"context"
"dappco.re/go/agent/pkg/messages"
core "dappco.re/go/core"
)
@ -24,6 +26,9 @@ func RegisterHandlers(c *core.Core, s *PrepSubsystem) {
func(coreApp *core.Core, msg core.Message) core.Result {
return handleCompletionVerify(coreApp, msg)
},
func(coreApp *core.Core, msg core.Message) core.Result {
return handleCompletionCommit(coreApp, msg)
},
func(coreApp *core.Core, msg core.Message) core.Result {
return handleCompletionIngest(coreApp, msg)
},
@ -112,6 +117,27 @@ func handleCompletionVerify(c *core.Core, msg core.Message) core.Result {
return core.Result{OK: true}
}
func handleCompletionCommit(c *core.Core, msg core.Message) core.Result {
switch ev := msg.(type) {
case messages.PRMerged:
workspaceDir := findWorkspaceByPR(ev.Repo, "")
if workspaceDir != "" {
if c.Action("agentic.commit").Exists() {
c.Action("agentic.commit").Run(context.Background(), workspaceActionOptions(workspaceDir))
}
}
case messages.PRNeedsReview:
workspaceDir := findWorkspaceByPR(ev.Repo, "")
if workspaceDir != "" {
if c.Action("agentic.commit").Exists() {
c.Action("agentic.commit").Run(context.Background(), workspaceActionOptions(workspaceDir))
}
}
}
return core.Result{OK: true}
}
func handleCompletionIngest(c *core.Core, msg core.Message) core.Result {
ev, ok := msg.(messages.AgentCompleted)
if !ok || c == nil || !c.Config().Enabled("auto-ingest") {
@ -181,7 +207,13 @@ func findWorkspaceByPR(repo, branch string) string {
if !ok {
continue
}
if workspaceStatus.Repo == repo && workspaceStatus.Branch == branch {
if workspaceStatus.Repo != repo {
continue
}
if branch != "" && workspaceStatus.Branch != branch {
continue
}
if branch == "" || workspaceStatus.Branch == branch {
return workspaceDir
}
}

View file

@ -177,6 +177,7 @@ func (s *PrepSubsystem) OnStartup(ctx context.Context) core.Result {
c.Action("agentic.qa", s.handleQA).Description = "Run build + test QA checks on workspace"
c.Action("agentic.auto-pr", s.handleAutoPR).Description = "Create PR from completed workspace"
c.Action("agentic.verify", s.handleVerify).Description = "Verify PR and auto-merge if clean"
c.Action("agentic.commit", s.handleCommit).Description = "Write the final dispatch record to the workspace journal"
c.Action("agentic.ingest", s.handleIngest).Description = "Create issues from agent findings"
c.Action("agentic.poke", s.handlePoke).Description = "Drain next queued task from the queue"
c.Action("agentic.mirror", s.handleMirror).Description = "Mirror agent branches to GitHub"
@ -269,17 +270,18 @@ func (s *PrepSubsystem) OnStartup(ctx context.Context) core.Result {
c.Action("agentic.persona", s.handlePersona).Description = "Read a persona by path"
c.Task("agent.completion", core.Task{
Description: "QA → PR → Verify → Ingest → Poke",
Description: "QA → PR → Verify → Commit → Ingest → Poke",
Steps: []core.Step{
{Action: "agentic.qa"},
{Action: "agentic.auto-pr"},
{Action: "agentic.verify"},
{Action: "agentic.commit", Async: true},
{Action: "agentic.ingest", Async: true},
{Action: "agentic.poke", Async: true},
},
})
c.Action("agentic.complete", s.handleComplete).Description = "Run completion pipeline (QA → PR → Verify → Ingest → Poke) in background"
c.Action("agentic.complete", s.handleComplete).Description = "Run completion pipeline (QA → PR → Verify → Commit → Ingest → Poke) in background"
s.hydrateWorkspaces()
if planRetentionDays(core.NewOptions()) > 0 {
@ -370,8 +372,9 @@ func (s *PrepSubsystem) RegisterTools(server *mcp.Server) {
s.registerResumeTool(server)
mcp.AddTool(server, &mcp.Tool{
Name: "agentic_complete",
Description: "Run the completion pipeline (QA → PR → Verify → Ingest → Poke) in the background.",
Description: "Run the completion pipeline (QA → PR → Verify → Commit → Ingest → Poke) in the background.",
}, s.completeTool)
s.registerCommitTool(server)
s.registerCreatePRTool(server)
s.registerListPRsTool(server)
s.registerClosePRTool(server)

View file

@ -550,6 +550,7 @@ func TestPrep_OnStartup_Good_RegistersForgeActions(t *testing.T) {
assert.True(t, c.Action("agentic.pr.list").Exists())
assert.True(t, c.Action("agentic.pr.merge").Exists())
assert.True(t, c.Action("agentic.pr.close").Exists())
assert.True(t, c.Action("agentic.commit").Exists())
}
func TestPrep_OnStartup_Good_RegistersContentActions(t *testing.T) {