feat(agent/sync): WorkspacePushed IPC → go-scm local repo sync (#546)

Per RFC §7 Post-Completion Repo Sync: workspace push → IPC event →
local clone fetch+reset so the supervisor's working tree always
matches Forge.

Lands:
* pkg/agentic/repo_sync.go — registers sync.fetch + sync.reset core
  actions; subscribes to WorkspacePushed IPC; exposes core-agent
  repo/sync --repo <name> [--reset] manual command
* commands.go — wires the subscriber + actions at startup
* pkg/agentic/repo_sync_test.go — AX-10: WorkspacePushed handler,
  branch-switch/reset behaviour, command path
* tests/cli/sync/{Taskfile.yaml,repo/Taskfile.yaml} — end-to-end
  smoke proving local clone matches Forge HEAD after sync

The existing 5min fallback fetch loop in fetch_loop.go is reused
unchanged — this lane fills the event-driven half of the contract.

Sandbox blocked from go test / go build by pre-existing go.work
dappco.re/go/api replacement conflict; supervisor's clean workspace
catches.

Co-authored-by: Codex <noreply@openai.com>
Closes tasks.lthn.sh/view.php?id=546
This commit is contained in:
Snider 2026-04-25 23:33:36 +01:00
parent 5c942a8928
commit 92b433ad76
5 changed files with 620 additions and 0 deletions

View file

@ -18,6 +18,7 @@ import (
func (s *PrepSubsystem) registerCommands(ctx context.Context) {
s.startupContext = ctx
c := s.Core()
s.registerRepoSyncSupport()
c.Command("run/task", core.Command{Description: "Run a single task end-to-end", Action: s.cmdRunTask})
c.Command("agentic:run/task", core.Command{Description: "Run a single task end-to-end", Action: s.cmdRunTask})
c.Command("run/flow", core.Command{Description: "Show a flow definition from disk or the embedded library", Action: s.cmdRunFlow})

361
pkg/agentic/repo_sync.go Normal file
View file

@ -0,0 +1,361 @@
// SPDX-License-Identifier: EUPL-1.2
package agentic
import (
"context"
"dappco.re/go/agent/pkg/messages"
core "dappco.re/go/core"
)
// Repo sync closes the RFC §7 loop from a pushed workspace back to the local
// mirror. The 5-minute fallback fetch scheduler already lives in fetch_loop.go;
// this file wires the event-driven refresh and the manual command surface.
type RepoSyncOutput struct {
Success bool `json:"success"`
Org string `json:"org,omitempty"`
Repo string `json:"repo,omitempty"`
Branch string `json:"branch,omitempty"`
RepoDir string `json:"repo_dir,omitempty"`
Reset bool `json:"reset,omitempty"`
}
type RepoSyncCommandOutput struct {
Success bool `json:"success"`
Count int `json:"count"`
Synced []RepoSyncOutput `json:"synced,omitempty"`
}
// s.registerRepoSyncSupport()
func (s *PrepSubsystem) registerRepoSyncSupport() {
if s == nil || s.ServiceRuntime == nil {
return
}
c := s.Core()
if c == nil {
return
}
if c.Config().Bool("agentic.repo_sync.registered") {
return
}
c.Config().Set("agentic.repo_sync.registered", true)
if !c.Action("sync.fetch").Exists() {
c.Action("sync.fetch", s.handleRepoSyncFetch).Description = "Fetch a tracked local repo from origin"
}
if !c.Action("sync.reset").Exists() {
c.Action("sync.reset", s.handleRepoSyncReset).Description = "Reset a tracked local repo to origin/<branch>"
}
c.RegisterAction(func(coreApp *core.Core, msg core.Message) core.Result {
return s.handleRepoSyncIPC(coreApp, msg)
})
if !c.Command("repo/sync").OK {
c.Command("repo/sync", core.Command{
Description: "Fetch and optionally reset tracked local repos from origin",
Action: s.cmdRepoSyncLocal,
})
}
if !c.Command("agentic:repo/sync").OK {
c.Command("agentic:repo/sync", core.Command{
Description: "Fetch and optionally reset tracked local repos from origin",
Action: s.cmdRepoSyncLocal,
})
}
}
func (s *PrepSubsystem) handleRepoSyncIPC(_ *core.Core, msg core.Message) core.Result {
ev, ok := msg.(messages.WorkspacePushed)
if !ok {
return core.Result{OK: true}
}
result := s.onWorkspacePushed(context.Background(), ev)
if !result.OK {
core.Warn(
"agentic repo sync failed",
"org", ev.Org,
"repo", ev.Repo,
"branch", ev.Branch,
"reason", result.Value,
)
}
return result
}
// result := s.onWorkspacePushed(ctx, messages.WorkspacePushed{Repo: "go-io", Branch: "main", Org: "core"})
func (s *PrepSubsystem) onWorkspacePushed(ctx context.Context, ev messages.WorkspacePushed) core.Result {
target, err := repoSyncSingleTarget(ev.Org, ev.Repo)
if err != nil {
return core.Result{Value: err, OK: false}
}
return s.runRepoSync(ctx, target, ev.Branch, true)
}
func (s *PrepSubsystem) cmdRepoSyncLocal(options core.Options) core.Result {
targets, err := s.repoSyncTargets(options)
if err != nil {
core.Print(nil, "usage: core-agent repo/sync [--repo=go-io] [--org=core] [--branch=main] [--reset]")
return core.Result{Value: err, OK: false}
}
reset := optionBoolValue(options, "reset")
branch := optionStringValue(options, "branch")
ctx := s.commandContext()
output := RepoSyncCommandOutput{
Success: true,
Synced: []RepoSyncOutput{},
}
for _, target := range targets {
result := s.runRepoSync(ctx, target, branch, reset)
if !result.OK {
errValue, _ := result.Value.(error)
if errValue == nil {
errValue = core.E("agentic.cmdRepoSyncLocal", "repo sync failed", nil)
}
core.Print(nil, "error: %v", errValue)
return core.Result{Value: errValue, OK: false}
}
syncOutput, ok := result.Value.(RepoSyncOutput)
if !ok {
err := core.E("agentic.cmdRepoSyncLocal", "invalid repo sync output", nil)
core.Print(nil, "error: %v", err)
return core.Result{Value: err, OK: false}
}
output.Synced = append(output.Synced, syncOutput)
if syncOutput.Branch != "" {
core.Print(nil, "fetched %s/%s@%s", syncOutput.Org, syncOutput.Repo, syncOutput.Branch)
} else {
core.Print(nil, "fetched %s/%s", syncOutput.Org, syncOutput.Repo)
}
if syncOutput.Reset {
core.Print(nil, "reset %s to origin/%s", syncOutput.RepoDir, syncOutput.Branch)
}
}
output.Count = len(output.Synced)
core.Print(nil, "count: %d", output.Count)
return core.Result{Value: output, OK: true}
}
func (s *PrepSubsystem) runRepoSync(ctx context.Context, target fetchRepoRef, branch string, reset bool) core.Result {
s.registerRepoSyncSupport()
repoDir, err := s.repoSyncRepoDir(target)
if err != nil {
return core.Result{Value: err, OK: false}
}
fetchBranch := core.Trim(branch)
resetBranch := ""
if fetchBranch != "" || reset {
resetBranch, err = s.repoSyncBranch(repoDir, branch)
if err != nil {
return core.Result{Value: err, OK: false}
}
if fetchBranch == "" && reset {
fetchBranch = resetBranch
}
}
fetchResult := s.Core().Action("sync.fetch").Run(ctx, repoSyncOptions(target, fetchBranch))
if !fetchResult.OK {
return fetchResult
}
if reset {
resetResult := s.Core().Action("sync.reset").Run(ctx, repoSyncOptions(target, resetBranch))
if !resetResult.OK {
return resetResult
}
}
return core.Result{Value: RepoSyncOutput{
Success: true,
Org: target.Org,
Repo: target.Repo,
Branch: resetBranch,
RepoDir: repoDir,
Reset: reset,
}, OK: true}
}
// result := c.Action("sync.fetch").Run(ctx, core.NewOptions(core.Option{Key: "repo", Value: "go-io"}))
func (s *PrepSubsystem) handleRepoSyncFetch(ctx context.Context, options core.Options) core.Result {
target, err := repoSyncTarget(options)
if err != nil {
return core.Result{Value: err, OK: false}
}
repoDir, err := s.repoSyncRepoDir(target)
if err != nil {
return core.Result{Value: err, OK: false}
}
args := []string{"git", "fetch", "origin"}
if branch := core.Trim(optionStringValue(options, "branch")); branch != "" {
args = append(args, branch)
}
result := s.Core().Process().RunIn(repoSyncContext(ctx), repoDir, args...)
if !result.OK {
return core.Result{Value: core.E("agentic.handleRepoSyncFetch", "git fetch failed", resultErrorValue(result)), OK: false}
}
return core.Result{Value: RepoSyncOutput{
Success: true,
Org: target.Org,
Repo: target.Repo,
Branch: core.Trim(optionStringValue(options, "branch")),
RepoDir: repoDir,
}, OK: true}
}
// result := c.Action("sync.reset").Run(ctx, core.NewOptions(core.Option{Key: "repo", Value: "go-io"}, core.Option{Key: "branch", Value: "main"}))
func (s *PrepSubsystem) handleRepoSyncReset(ctx context.Context, options core.Options) core.Result {
target, err := repoSyncTarget(options)
if err != nil {
return core.Result{Value: err, OK: false}
}
repoDir, err := s.repoSyncRepoDir(target)
if err != nil {
return core.Result{Value: err, OK: false}
}
branch, err := s.repoSyncBranch(repoDir, optionStringValue(options, "branch"))
if err != nil {
return core.Result{Value: err, OK: false}
}
process := s.Core().Process()
if current := s.currentBranch(repoDir); current != "" && current != branch {
checkoutResult := process.RunIn(repoSyncContext(ctx), repoDir, "git", "checkout", "-B", branch, core.Concat("origin/", branch))
if !checkoutResult.OK {
return core.Result{Value: core.E("agentic.handleRepoSyncReset", "git checkout failed", resultErrorValue(checkoutResult)), OK: false}
}
}
resetResult := process.RunIn(repoSyncContext(ctx), repoDir, "git", "reset", "--hard", core.Concat("origin/", branch))
if !resetResult.OK {
return core.Result{Value: core.E("agentic.handleRepoSyncReset", "git reset failed", resultErrorValue(resetResult)), OK: false}
}
return core.Result{Value: RepoSyncOutput{
Success: true,
Org: target.Org,
Repo: target.Repo,
Branch: branch,
RepoDir: repoDir,
Reset: true,
}, OK: true}
}
func (s *PrepSubsystem) repoSyncTargets(options core.Options) ([]fetchRepoRef, error) {
if repo := optionStringValue(options, "repo", "_arg"); repo != "" {
target, err := repoSyncSingleTarget(optionStringValue(options, "org"), repo)
if err != nil {
return nil, err
}
return []fetchRepoRef{target}, nil
}
return s.fetchLoopRepoRefs(), nil
}
func repoSyncTarget(options core.Options) (fetchRepoRef, error) {
return repoSyncSingleTarget(optionStringValue(options, "org"), optionStringValue(options, "repo", "_arg"))
}
func repoSyncSingleTarget(orgValue, repoValue string) (fetchRepoRef, error) {
org := core.Trim(orgValue)
if org == "" {
org = "core"
}
repoPath := core.Replace(core.Trim(repoValue), "\\", "/")
if repoPath == "" {
return fetchRepoRef{}, core.E("agentic.repoSyncSingleTarget", "repo is required", nil)
}
if parts := core.Split(repoPath, "/"); len(parts) == 2 {
if parts[0] != "" {
org = parts[0]
}
repoPath = parts[1]
}
validatedOrg, ok := validateName(org)
if !ok {
return fetchRepoRef{}, core.E("agentic.repoSyncSingleTarget", "invalid org name", nil)
}
validatedRepo, ok := validateName(core.PathBase(repoPath))
if !ok {
return fetchRepoRef{}, core.E("agentic.repoSyncSingleTarget", "invalid repo name", nil)
}
return fetchRepoRef{Org: validatedOrg, Repo: validatedRepo}, nil
}
func repoSyncOptions(target fetchRepoRef, branch string) core.Options {
options := core.NewOptions(
core.Option{Key: "org", Value: target.Org},
core.Option{Key: "repo", Value: target.Repo},
)
if core.Trim(branch) != "" {
options.Set("branch", core.Trim(branch))
}
return options
}
func repoSyncContext(ctx context.Context) context.Context {
if ctx != nil {
return ctx
}
return context.Background()
}
func resultErrorValue(result core.Result) error {
if err, ok := result.Value.(error); ok && err != nil {
return err
}
return nil
}
func (s *PrepSubsystem) repoSyncRepoDir(target fetchRepoRef) (string, error) {
if s == nil || s.ServiceRuntime == nil {
return "", core.E("agentic.repoSyncRepoDir", "prep subsystem is not initialised", nil)
}
repoDir := s.localRepoDir(target.Org, target.Repo)
if repoDir == "" || !fs.Exists(repoDir) || fs.IsFile(repoDir) {
return "", core.E("agentic.repoSyncRepoDir", "local repo not found", nil)
}
if !s.Core().Process().RunIn(context.Background(), repoDir, "git", "rev-parse", "--git-dir").OK {
return "", core.E("agentic.repoSyncRepoDir", "local repo is not a git checkout", nil)
}
return repoDir, nil
}
func (s *PrepSubsystem) repoSyncBranch(repoDir, branch string) (string, error) {
targetBranch := core.Trim(branch)
if targetBranch == "" {
targetBranch = s.currentBranch(repoDir)
}
if targetBranch == "" {
targetBranch = s.DefaultBranch(repoDir)
}
if targetBranch == "" {
return "", core.E("agentic.repoSyncBranch", "branch is required", nil)
}
return targetBranch, nil
}

View file

@ -0,0 +1,203 @@
// SPDX-License-Identifier: EUPL-1.2
package agentic
import (
"context"
"io"
"os"
"testing"
"time"
"dappco.re/go/agent/pkg/messages"
core "dappco.re/go/core"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestRepoSync_OnWorkspacePushed_Good(t *testing.T) {
s, c, _ := repoSyncTestPrep(t)
remoteDir, repoDir := repoSyncCreateTrackedRepo(t, c, s.codePath, "core", "test-repo")
_, remoteHead := repoSyncPushCommit(t, c, remoteDir, "main", "new.go", "package main\n")
s.registerRepoSyncSupport()
c.ACTION(messages.WorkspacePushed{
Repo: "test-repo",
Branch: "main",
Org: "core",
})
assert.True(t, fs.Exists(core.JoinPath(repoDir, "new.go")))
assert.Equal(t, remoteHead, repoSyncGitOutput(t, c, repoDir, "rev-parse", "HEAD"))
}
func TestRepoSync_OnWorkspacePushed_Bad(t *testing.T) {
s, _, _ := repoSyncTestPrep(t)
result := s.onWorkspacePushed(context.Background(), messages.WorkspacePushed{
Repo: "missing-repo",
Branch: "main",
Org: "core",
})
assert.False(t, result.OK)
}
func TestRepoSync_OnWorkspacePushed_Ugly(t *testing.T) {
s, c, _ := repoSyncTestPrep(t)
remoteDir, repoDir := repoSyncCreateTrackedRepo(t, c, s.codePath, "core", "test-repo")
require.True(t, c.Process().RunIn(context.Background(), repoDir, "git", "checkout", "-b", "feature/wip").OK)
_, remoteHead := repoSyncPushCommit(t, c, remoteDir, "main", "edge.go", "package edge\n")
result := s.onWorkspacePushed(context.Background(), messages.WorkspacePushed{
Repo: "test-repo",
Branch: "main",
Org: "core",
})
require.True(t, result.OK)
assert.Equal(t, "main", repoSyncGitOutput(t, c, repoDir, "rev-parse", "--abbrev-ref", "HEAD"))
assert.Equal(t, remoteHead, repoSyncGitOutput(t, c, repoDir, "rev-parse", "HEAD"))
assert.True(t, fs.Exists(core.JoinPath(repoDir, "edge.go")))
}
func TestRepoSync_BackgroundFetch_Good(t *testing.T) {
s, c, root := repoSyncTestPrep(t)
require.True(t, fs.Write(core.JoinPath(root, "agents.yaml"), core.Concat(
"version: 1\n",
"repos:\n",
" - test-repo\n",
)).OK)
remoteDir, repoDir := repoSyncCreateTrackedRepo(t, c, s.codePath, "core", "test-repo")
_, remoteHead := repoSyncPushCommit(t, c, remoteDir, "main", "fetched.go", "package fetched\n")
assert.Equal(t, fetchLoopDefaultInterval, s.fetchLoopInterval())
s.fetchRegisteredRepos(context.Background())
assert.Equal(t, remoteHead, repoSyncGitOutput(t, c, repoDir, "rev-parse", "origin/main"))
assert.NotEqual(t, remoteHead, repoSyncGitOutput(t, c, repoDir, "rev-parse", "HEAD"))
assert.False(t, fs.Exists(core.JoinPath(repoDir, "fetched.go")))
}
func TestRepoSync_Command_Good(t *testing.T) {
s, c, _ := repoSyncTestPrep(t)
remoteDir, repoDir := repoSyncCreateTrackedRepo(t, c, s.codePath, "core", "test-repo")
_, remoteHead := repoSyncPushCommit(t, c, remoteDir, "main", "command.go", "package command\n")
s.registerCommands(context.Background())
commandResult := c.Command("repo/sync")
require.True(t, commandResult.OK)
output := repoSyncCaptureStdout(t, func() {
result := commandResult.Value.(*core.Command).Run(core.NewOptions(
core.Option{Key: "repo", Value: "test-repo"},
core.Option{Key: "reset", Value: true},
))
require.True(t, result.OK)
commandOutput, ok := result.Value.(RepoSyncCommandOutput)
require.True(t, ok)
assert.True(t, commandOutput.Success)
assert.Equal(t, 1, commandOutput.Count)
})
assert.Contains(t, output, "fetched core/test-repo@main")
assert.Contains(t, output, "count: 1")
assert.True(t, fs.Exists(core.JoinPath(repoDir, "command.go")))
assert.Equal(t, remoteHead, repoSyncGitOutput(t, c, repoDir, "rev-parse", "HEAD"))
}
func repoSyncTestPrep(t *testing.T) (*PrepSubsystem, *core.Core, string) {
t.Helper()
root := t.TempDir()
setTestWorkspace(t, root)
c := core.New(
core.WithService(ProcessRegister),
)
require.True(t, c.ServiceStartup(context.Background(), nil).OK)
t.Cleanup(func() {
c.ServiceShutdown(context.Background())
})
s := &PrepSubsystem{
ServiceRuntime: core.NewServiceRuntime(c, AgentOptions{}),
codePath: t.TempDir(),
backoff: make(map[string]time.Time),
failCount: make(map[string]int),
}
return s, c, root
}
func repoSyncCreateTrackedRepo(t *testing.T, c *core.Core, codePath, org, repo string) (string, string) {
t.Helper()
remoteDir := core.JoinPath(t.TempDir(), "remote.git")
require.True(t, fs.EnsureDir(remoteDir).OK)
require.True(t, c.Process().Run(context.Background(), "git", "init", "--bare", remoteDir).OK)
orgDir := core.JoinPath(codePath, org)
require.True(t, fs.EnsureDir(orgDir).OK)
require.True(t, c.Process().RunIn(context.Background(), orgDir, "git", "clone", remoteDir, repo).OK)
repoDir := core.JoinPath(orgDir, repo)
repoSyncConfigureGit(t, c, repoDir)
require.True(t, c.Process().RunIn(context.Background(), repoDir, "git", "checkout", "-b", "main").OK)
require.True(t, fs.Write(core.JoinPath(repoDir, "README.md"), "# tracked repo\n").OK)
require.True(t, c.Process().RunIn(context.Background(), repoDir, "git", "add", ".").OK)
require.True(t, c.Process().RunIn(context.Background(), repoDir, "git", "commit", "-m", "init").OK)
require.True(t, c.Process().RunIn(context.Background(), repoDir, "git", "push", "-u", "origin", "main").OK)
return remoteDir, repoDir
}
func repoSyncPushCommit(t *testing.T, c *core.Core, remoteDir, branch, fileName, content string) (string, string) {
t.Helper()
cloneParent := t.TempDir()
cloneDir := core.JoinPath(cloneParent, "push-clone")
require.True(t, c.Process().RunIn(context.Background(), cloneParent, "git", "clone", remoteDir, "push-clone").OK)
repoSyncConfigureGit(t, c, cloneDir)
require.True(t, c.Process().RunIn(context.Background(), cloneDir, "git", "checkout", branch).OK)
require.True(t, fs.Write(core.JoinPath(cloneDir, fileName), content).OK)
require.True(t, c.Process().RunIn(context.Background(), cloneDir, "git", "add", ".").OK)
require.True(t, c.Process().RunIn(context.Background(), cloneDir, "git", "commit", "-m", "agent work").OK)
require.True(t, c.Process().RunIn(context.Background(), cloneDir, "git", "push", "origin", branch).OK)
return cloneDir, repoSyncGitOutput(t, c, cloneDir, "rev-parse", "HEAD")
}
func repoSyncConfigureGit(t *testing.T, c *core.Core, repoDir string) {
t.Helper()
require.True(t, c.Process().RunIn(context.Background(), repoDir, "git", "config", "user.name", "Test").OK)
require.True(t, c.Process().RunIn(context.Background(), repoDir, "git", "config", "user.email", "test@example.com").OK)
}
func repoSyncGitOutput(t *testing.T, c *core.Core, repoDir string, args ...string) string {
t.Helper()
result := c.Process().RunIn(context.Background(), repoDir, "git", args...)
require.True(t, result.OK)
return core.Trim(result.Value.(string))
}
func repoSyncCaptureStdout(t *testing.T, run func()) string {
t.Helper()
old := os.Stdout
reader, writer, err := os.Pipe()
require.NoError(t, err)
os.Stdout = writer
defer func() {
os.Stdout = old
}()
run()
require.NoError(t, writer.Close())
data, err := io.ReadAll(reader)
require.NoError(t, err)
require.NoError(t, reader.Close())
return string(data)
}

View file

@ -4,3 +4,4 @@ tasks:
test:
cmds:
- task -d status test
- task -d repo test

View file

@ -0,0 +1,54 @@
version: "3"
tasks:
test:
cmds:
- |
bash <<'EOF'
set -euo pipefail
source ../../_lib/run.sh
go build -trimpath -ldflags="-s -w" -o bin/core-agent ../../../../cmd/core-agent
workspace="$(mktemp -d)"
code_path="$(mktemp -d)"
export CORE_WORKSPACE="$workspace"
export CORE_HOME="$workspace"
export DIR_HOME="$workspace"
export CODE_PATH="$code_path"
remote_dir="$(mktemp -d)/remote.git"
git init --bare "$remote_dir" >/dev/null
repo_parent="$code_path/core"
repo_dir="$repo_parent/go-io"
mkdir -p "$repo_parent"
git -C "$repo_parent" clone "$remote_dir" go-io >/dev/null
git -C "$repo_dir" config user.name "Test"
git -C "$repo_dir" config user.email "test@example.com"
git -C "$repo_dir" checkout -b main >/dev/null
printf '# repo sync smoke\n' >"$repo_dir/README.md"
git -C "$repo_dir" add .
git -C "$repo_dir" commit -m "init" >/dev/null
git -C "$repo_dir" push -u origin main >/dev/null
push_clone="$(mktemp -d)/push-clone"
git clone "$remote_dir" "$push_clone" >/dev/null
git -C "$push_clone" config user.name "Test"
git -C "$push_clone" config user.email "test@example.com"
git -C "$push_clone" checkout main >/dev/null
printf 'package smoke\n' >"$push_clone/smoke.go"
git -C "$push_clone" add .
git -C "$push_clone" commit -m "agent work" >/dev/null
git -C "$push_clone" push origin main >/dev/null
remote_head="$(git -C "$push_clone" rev-parse HEAD)"
output="$(mktemp)"
run_capture_all 0 "$output" ./bin/core-agent repo/sync --repo=go-io --reset
assert_contains "fetched core/go-io@main" "$output"
assert_contains "count: 1" "$output"
test -f "$repo_dir/smoke.go"
local_head="$(git -C "$repo_dir" rev-parse HEAD)"
test "$local_head" = "$remote_head"
EOF