diff --git a/pkg/agentic/commands.go b/pkg/agentic/commands.go index c6ea7e4..70d3510 100644 --- a/pkg/agentic/commands.go +++ b/pkg/agentic/commands.go @@ -18,6 +18,7 @@ import ( func (s *PrepSubsystem) registerCommands(ctx context.Context) { s.startupContext = ctx c := s.Core() + s.registerRepoSyncSupport() c.Command("run/task", core.Command{Description: "Run a single task end-to-end", Action: s.cmdRunTask}) c.Command("agentic:run/task", core.Command{Description: "Run a single task end-to-end", Action: s.cmdRunTask}) c.Command("run/flow", core.Command{Description: "Show a flow definition from disk or the embedded library", Action: s.cmdRunFlow}) diff --git a/pkg/agentic/repo_sync.go b/pkg/agentic/repo_sync.go new file mode 100644 index 0000000..ae05d2b --- /dev/null +++ b/pkg/agentic/repo_sync.go @@ -0,0 +1,361 @@ +// SPDX-License-Identifier: EUPL-1.2 + +package agentic + +import ( + "context" + + "dappco.re/go/agent/pkg/messages" + core "dappco.re/go/core" +) + +// Repo sync closes the RFC ยง7 loop from a pushed workspace back to the local +// mirror. The 5-minute fallback fetch scheduler already lives in fetch_loop.go; +// this file wires the event-driven refresh and the manual command surface. + +type RepoSyncOutput struct { + Success bool `json:"success"` + Org string `json:"org,omitempty"` + Repo string `json:"repo,omitempty"` + Branch string `json:"branch,omitempty"` + RepoDir string `json:"repo_dir,omitempty"` + Reset bool `json:"reset,omitempty"` +} + +type RepoSyncCommandOutput struct { + Success bool `json:"success"` + Count int `json:"count"` + Synced []RepoSyncOutput `json:"synced,omitempty"` +} + +// s.registerRepoSyncSupport() +func (s *PrepSubsystem) registerRepoSyncSupport() { + if s == nil || s.ServiceRuntime == nil { + return + } + + c := s.Core() + if c == nil { + return + } + if c.Config().Bool("agentic.repo_sync.registered") { + return + } + + c.Config().Set("agentic.repo_sync.registered", true) + + if !c.Action("sync.fetch").Exists() { + c.Action("sync.fetch", s.handleRepoSyncFetch).Description = "Fetch a tracked local repo from origin" + } + if !c.Action("sync.reset").Exists() { + c.Action("sync.reset", s.handleRepoSyncReset).Description = "Reset a tracked local repo to origin/" + } + + c.RegisterAction(func(coreApp *core.Core, msg core.Message) core.Result { + return s.handleRepoSyncIPC(coreApp, msg) + }) + + if !c.Command("repo/sync").OK { + c.Command("repo/sync", core.Command{ + Description: "Fetch and optionally reset tracked local repos from origin", + Action: s.cmdRepoSyncLocal, + }) + } + if !c.Command("agentic:repo/sync").OK { + c.Command("agentic:repo/sync", core.Command{ + Description: "Fetch and optionally reset tracked local repos from origin", + Action: s.cmdRepoSyncLocal, + }) + } +} + +func (s *PrepSubsystem) handleRepoSyncIPC(_ *core.Core, msg core.Message) core.Result { + ev, ok := msg.(messages.WorkspacePushed) + if !ok { + return core.Result{OK: true} + } + + result := s.onWorkspacePushed(context.Background(), ev) + if !result.OK { + core.Warn( + "agentic repo sync failed", + "org", ev.Org, + "repo", ev.Repo, + "branch", ev.Branch, + "reason", result.Value, + ) + } + return result +} + +// result := s.onWorkspacePushed(ctx, messages.WorkspacePushed{Repo: "go-io", Branch: "main", Org: "core"}) +func (s *PrepSubsystem) onWorkspacePushed(ctx context.Context, ev messages.WorkspacePushed) core.Result { + target, err := repoSyncSingleTarget(ev.Org, ev.Repo) + if err != nil { + return core.Result{Value: err, OK: false} + } + return s.runRepoSync(ctx, target, ev.Branch, true) +} + +func (s *PrepSubsystem) cmdRepoSyncLocal(options core.Options) core.Result { + targets, err := s.repoSyncTargets(options) + if err != nil { + core.Print(nil, "usage: core-agent repo/sync [--repo=go-io] [--org=core] [--branch=main] [--reset]") + return core.Result{Value: err, OK: false} + } + + reset := optionBoolValue(options, "reset") + branch := optionStringValue(options, "branch") + ctx := s.commandContext() + + output := RepoSyncCommandOutput{ + Success: true, + Synced: []RepoSyncOutput{}, + } + + for _, target := range targets { + result := s.runRepoSync(ctx, target, branch, reset) + if !result.OK { + errValue, _ := result.Value.(error) + if errValue == nil { + errValue = core.E("agentic.cmdRepoSyncLocal", "repo sync failed", nil) + } + core.Print(nil, "error: %v", errValue) + return core.Result{Value: errValue, OK: false} + } + + syncOutput, ok := result.Value.(RepoSyncOutput) + if !ok { + err := core.E("agentic.cmdRepoSyncLocal", "invalid repo sync output", nil) + core.Print(nil, "error: %v", err) + return core.Result{Value: err, OK: false} + } + + output.Synced = append(output.Synced, syncOutput) + if syncOutput.Branch != "" { + core.Print(nil, "fetched %s/%s@%s", syncOutput.Org, syncOutput.Repo, syncOutput.Branch) + } else { + core.Print(nil, "fetched %s/%s", syncOutput.Org, syncOutput.Repo) + } + if syncOutput.Reset { + core.Print(nil, "reset %s to origin/%s", syncOutput.RepoDir, syncOutput.Branch) + } + } + + output.Count = len(output.Synced) + core.Print(nil, "count: %d", output.Count) + return core.Result{Value: output, OK: true} +} + +func (s *PrepSubsystem) runRepoSync(ctx context.Context, target fetchRepoRef, branch string, reset bool) core.Result { + s.registerRepoSyncSupport() + + repoDir, err := s.repoSyncRepoDir(target) + if err != nil { + return core.Result{Value: err, OK: false} + } + + fetchBranch := core.Trim(branch) + resetBranch := "" + if fetchBranch != "" || reset { + resetBranch, err = s.repoSyncBranch(repoDir, branch) + if err != nil { + return core.Result{Value: err, OK: false} + } + if fetchBranch == "" && reset { + fetchBranch = resetBranch + } + } + + fetchResult := s.Core().Action("sync.fetch").Run(ctx, repoSyncOptions(target, fetchBranch)) + if !fetchResult.OK { + return fetchResult + } + + if reset { + resetResult := s.Core().Action("sync.reset").Run(ctx, repoSyncOptions(target, resetBranch)) + if !resetResult.OK { + return resetResult + } + } + + return core.Result{Value: RepoSyncOutput{ + Success: true, + Org: target.Org, + Repo: target.Repo, + Branch: resetBranch, + RepoDir: repoDir, + Reset: reset, + }, OK: true} +} + +// result := c.Action("sync.fetch").Run(ctx, core.NewOptions(core.Option{Key: "repo", Value: "go-io"})) +func (s *PrepSubsystem) handleRepoSyncFetch(ctx context.Context, options core.Options) core.Result { + target, err := repoSyncTarget(options) + if err != nil { + return core.Result{Value: err, OK: false} + } + + repoDir, err := s.repoSyncRepoDir(target) + if err != nil { + return core.Result{Value: err, OK: false} + } + + args := []string{"git", "fetch", "origin"} + if branch := core.Trim(optionStringValue(options, "branch")); branch != "" { + args = append(args, branch) + } + + result := s.Core().Process().RunIn(repoSyncContext(ctx), repoDir, args...) + if !result.OK { + return core.Result{Value: core.E("agentic.handleRepoSyncFetch", "git fetch failed", resultErrorValue(result)), OK: false} + } + + return core.Result{Value: RepoSyncOutput{ + Success: true, + Org: target.Org, + Repo: target.Repo, + Branch: core.Trim(optionStringValue(options, "branch")), + RepoDir: repoDir, + }, OK: true} +} + +// result := c.Action("sync.reset").Run(ctx, core.NewOptions(core.Option{Key: "repo", Value: "go-io"}, core.Option{Key: "branch", Value: "main"})) +func (s *PrepSubsystem) handleRepoSyncReset(ctx context.Context, options core.Options) core.Result { + target, err := repoSyncTarget(options) + if err != nil { + return core.Result{Value: err, OK: false} + } + + repoDir, err := s.repoSyncRepoDir(target) + if err != nil { + return core.Result{Value: err, OK: false} + } + + branch, err := s.repoSyncBranch(repoDir, optionStringValue(options, "branch")) + if err != nil { + return core.Result{Value: err, OK: false} + } + + process := s.Core().Process() + if current := s.currentBranch(repoDir); current != "" && current != branch { + checkoutResult := process.RunIn(repoSyncContext(ctx), repoDir, "git", "checkout", "-B", branch, core.Concat("origin/", branch)) + if !checkoutResult.OK { + return core.Result{Value: core.E("agentic.handleRepoSyncReset", "git checkout failed", resultErrorValue(checkoutResult)), OK: false} + } + } + + resetResult := process.RunIn(repoSyncContext(ctx), repoDir, "git", "reset", "--hard", core.Concat("origin/", branch)) + if !resetResult.OK { + return core.Result{Value: core.E("agentic.handleRepoSyncReset", "git reset failed", resultErrorValue(resetResult)), OK: false} + } + + return core.Result{Value: RepoSyncOutput{ + Success: true, + Org: target.Org, + Repo: target.Repo, + Branch: branch, + RepoDir: repoDir, + Reset: true, + }, OK: true} +} + +func (s *PrepSubsystem) repoSyncTargets(options core.Options) ([]fetchRepoRef, error) { + if repo := optionStringValue(options, "repo", "_arg"); repo != "" { + target, err := repoSyncSingleTarget(optionStringValue(options, "org"), repo) + if err != nil { + return nil, err + } + return []fetchRepoRef{target}, nil + } + return s.fetchLoopRepoRefs(), nil +} + +func repoSyncTarget(options core.Options) (fetchRepoRef, error) { + return repoSyncSingleTarget(optionStringValue(options, "org"), optionStringValue(options, "repo", "_arg")) +} + +func repoSyncSingleTarget(orgValue, repoValue string) (fetchRepoRef, error) { + org := core.Trim(orgValue) + if org == "" { + org = "core" + } + + repoPath := core.Replace(core.Trim(repoValue), "\\", "/") + if repoPath == "" { + return fetchRepoRef{}, core.E("agentic.repoSyncSingleTarget", "repo is required", nil) + } + + if parts := core.Split(repoPath, "/"); len(parts) == 2 { + if parts[0] != "" { + org = parts[0] + } + repoPath = parts[1] + } + + validatedOrg, ok := validateName(org) + if !ok { + return fetchRepoRef{}, core.E("agentic.repoSyncSingleTarget", "invalid org name", nil) + } + validatedRepo, ok := validateName(core.PathBase(repoPath)) + if !ok { + return fetchRepoRef{}, core.E("agentic.repoSyncSingleTarget", "invalid repo name", nil) + } + + return fetchRepoRef{Org: validatedOrg, Repo: validatedRepo}, nil +} + +func repoSyncOptions(target fetchRepoRef, branch string) core.Options { + options := core.NewOptions( + core.Option{Key: "org", Value: target.Org}, + core.Option{Key: "repo", Value: target.Repo}, + ) + if core.Trim(branch) != "" { + options.Set("branch", core.Trim(branch)) + } + return options +} + +func repoSyncContext(ctx context.Context) context.Context { + if ctx != nil { + return ctx + } + return context.Background() +} + +func resultErrorValue(result core.Result) error { + if err, ok := result.Value.(error); ok && err != nil { + return err + } + return nil +} + +func (s *PrepSubsystem) repoSyncRepoDir(target fetchRepoRef) (string, error) { + if s == nil || s.ServiceRuntime == nil { + return "", core.E("agentic.repoSyncRepoDir", "prep subsystem is not initialised", nil) + } + + repoDir := s.localRepoDir(target.Org, target.Repo) + if repoDir == "" || !fs.Exists(repoDir) || fs.IsFile(repoDir) { + return "", core.E("agentic.repoSyncRepoDir", "local repo not found", nil) + } + + if !s.Core().Process().RunIn(context.Background(), repoDir, "git", "rev-parse", "--git-dir").OK { + return "", core.E("agentic.repoSyncRepoDir", "local repo is not a git checkout", nil) + } + return repoDir, nil +} + +func (s *PrepSubsystem) repoSyncBranch(repoDir, branch string) (string, error) { + targetBranch := core.Trim(branch) + if targetBranch == "" { + targetBranch = s.currentBranch(repoDir) + } + if targetBranch == "" { + targetBranch = s.DefaultBranch(repoDir) + } + if targetBranch == "" { + return "", core.E("agentic.repoSyncBranch", "branch is required", nil) + } + return targetBranch, nil +} diff --git a/pkg/agentic/repo_sync_test.go b/pkg/agentic/repo_sync_test.go new file mode 100644 index 0000000..dcbe9ed --- /dev/null +++ b/pkg/agentic/repo_sync_test.go @@ -0,0 +1,203 @@ +// SPDX-License-Identifier: EUPL-1.2 + +package agentic + +import ( + "context" + "io" + "os" + "testing" + "time" + + "dappco.re/go/agent/pkg/messages" + core "dappco.re/go/core" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestRepoSync_OnWorkspacePushed_Good(t *testing.T) { + s, c, _ := repoSyncTestPrep(t) + remoteDir, repoDir := repoSyncCreateTrackedRepo(t, c, s.codePath, "core", "test-repo") + _, remoteHead := repoSyncPushCommit(t, c, remoteDir, "main", "new.go", "package main\n") + + s.registerRepoSyncSupport() + c.ACTION(messages.WorkspacePushed{ + Repo: "test-repo", + Branch: "main", + Org: "core", + }) + + assert.True(t, fs.Exists(core.JoinPath(repoDir, "new.go"))) + assert.Equal(t, remoteHead, repoSyncGitOutput(t, c, repoDir, "rev-parse", "HEAD")) +} + +func TestRepoSync_OnWorkspacePushed_Bad(t *testing.T) { + s, _, _ := repoSyncTestPrep(t) + result := s.onWorkspacePushed(context.Background(), messages.WorkspacePushed{ + Repo: "missing-repo", + Branch: "main", + Org: "core", + }) + assert.False(t, result.OK) +} + +func TestRepoSync_OnWorkspacePushed_Ugly(t *testing.T) { + s, c, _ := repoSyncTestPrep(t) + remoteDir, repoDir := repoSyncCreateTrackedRepo(t, c, s.codePath, "core", "test-repo") + require.True(t, c.Process().RunIn(context.Background(), repoDir, "git", "checkout", "-b", "feature/wip").OK) + + _, remoteHead := repoSyncPushCommit(t, c, remoteDir, "main", "edge.go", "package edge\n") + + result := s.onWorkspacePushed(context.Background(), messages.WorkspacePushed{ + Repo: "test-repo", + Branch: "main", + Org: "core", + }) + require.True(t, result.OK) + + assert.Equal(t, "main", repoSyncGitOutput(t, c, repoDir, "rev-parse", "--abbrev-ref", "HEAD")) + assert.Equal(t, remoteHead, repoSyncGitOutput(t, c, repoDir, "rev-parse", "HEAD")) + assert.True(t, fs.Exists(core.JoinPath(repoDir, "edge.go"))) +} + +func TestRepoSync_BackgroundFetch_Good(t *testing.T) { + s, c, root := repoSyncTestPrep(t) + require.True(t, fs.Write(core.JoinPath(root, "agents.yaml"), core.Concat( + "version: 1\n", + "repos:\n", + " - test-repo\n", + )).OK) + + remoteDir, repoDir := repoSyncCreateTrackedRepo(t, c, s.codePath, "core", "test-repo") + _, remoteHead := repoSyncPushCommit(t, c, remoteDir, "main", "fetched.go", "package fetched\n") + + assert.Equal(t, fetchLoopDefaultInterval, s.fetchLoopInterval()) + + s.fetchRegisteredRepos(context.Background()) + + assert.Equal(t, remoteHead, repoSyncGitOutput(t, c, repoDir, "rev-parse", "origin/main")) + assert.NotEqual(t, remoteHead, repoSyncGitOutput(t, c, repoDir, "rev-parse", "HEAD")) + assert.False(t, fs.Exists(core.JoinPath(repoDir, "fetched.go"))) +} + +func TestRepoSync_Command_Good(t *testing.T) { + s, c, _ := repoSyncTestPrep(t) + remoteDir, repoDir := repoSyncCreateTrackedRepo(t, c, s.codePath, "core", "test-repo") + _, remoteHead := repoSyncPushCommit(t, c, remoteDir, "main", "command.go", "package command\n") + + s.registerCommands(context.Background()) + commandResult := c.Command("repo/sync") + require.True(t, commandResult.OK) + + output := repoSyncCaptureStdout(t, func() { + result := commandResult.Value.(*core.Command).Run(core.NewOptions( + core.Option{Key: "repo", Value: "test-repo"}, + core.Option{Key: "reset", Value: true}, + )) + require.True(t, result.OK) + + commandOutput, ok := result.Value.(RepoSyncCommandOutput) + require.True(t, ok) + assert.True(t, commandOutput.Success) + assert.Equal(t, 1, commandOutput.Count) + }) + + assert.Contains(t, output, "fetched core/test-repo@main") + assert.Contains(t, output, "count: 1") + assert.True(t, fs.Exists(core.JoinPath(repoDir, "command.go"))) + assert.Equal(t, remoteHead, repoSyncGitOutput(t, c, repoDir, "rev-parse", "HEAD")) +} + +func repoSyncTestPrep(t *testing.T) (*PrepSubsystem, *core.Core, string) { + t.Helper() + + root := t.TempDir() + setTestWorkspace(t, root) + + c := core.New( + core.WithService(ProcessRegister), + ) + require.True(t, c.ServiceStartup(context.Background(), nil).OK) + t.Cleanup(func() { + c.ServiceShutdown(context.Background()) + }) + + s := &PrepSubsystem{ + ServiceRuntime: core.NewServiceRuntime(c, AgentOptions{}), + codePath: t.TempDir(), + backoff: make(map[string]time.Time), + failCount: make(map[string]int), + } + return s, c, root +} + +func repoSyncCreateTrackedRepo(t *testing.T, c *core.Core, codePath, org, repo string) (string, string) { + t.Helper() + + remoteDir := core.JoinPath(t.TempDir(), "remote.git") + require.True(t, fs.EnsureDir(remoteDir).OK) + require.True(t, c.Process().Run(context.Background(), "git", "init", "--bare", remoteDir).OK) + + orgDir := core.JoinPath(codePath, org) + require.True(t, fs.EnsureDir(orgDir).OK) + require.True(t, c.Process().RunIn(context.Background(), orgDir, "git", "clone", remoteDir, repo).OK) + + repoDir := core.JoinPath(orgDir, repo) + repoSyncConfigureGit(t, c, repoDir) + require.True(t, c.Process().RunIn(context.Background(), repoDir, "git", "checkout", "-b", "main").OK) + require.True(t, fs.Write(core.JoinPath(repoDir, "README.md"), "# tracked repo\n").OK) + require.True(t, c.Process().RunIn(context.Background(), repoDir, "git", "add", ".").OK) + require.True(t, c.Process().RunIn(context.Background(), repoDir, "git", "commit", "-m", "init").OK) + require.True(t, c.Process().RunIn(context.Background(), repoDir, "git", "push", "-u", "origin", "main").OK) + + return remoteDir, repoDir +} + +func repoSyncPushCommit(t *testing.T, c *core.Core, remoteDir, branch, fileName, content string) (string, string) { + t.Helper() + + cloneParent := t.TempDir() + cloneDir := core.JoinPath(cloneParent, "push-clone") + require.True(t, c.Process().RunIn(context.Background(), cloneParent, "git", "clone", remoteDir, "push-clone").OK) + repoSyncConfigureGit(t, c, cloneDir) + require.True(t, c.Process().RunIn(context.Background(), cloneDir, "git", "checkout", branch).OK) + require.True(t, fs.Write(core.JoinPath(cloneDir, fileName), content).OK) + require.True(t, c.Process().RunIn(context.Background(), cloneDir, "git", "add", ".").OK) + require.True(t, c.Process().RunIn(context.Background(), cloneDir, "git", "commit", "-m", "agent work").OK) + require.True(t, c.Process().RunIn(context.Background(), cloneDir, "git", "push", "origin", branch).OK) + + return cloneDir, repoSyncGitOutput(t, c, cloneDir, "rev-parse", "HEAD") +} + +func repoSyncConfigureGit(t *testing.T, c *core.Core, repoDir string) { + t.Helper() + require.True(t, c.Process().RunIn(context.Background(), repoDir, "git", "config", "user.name", "Test").OK) + require.True(t, c.Process().RunIn(context.Background(), repoDir, "git", "config", "user.email", "test@example.com").OK) +} + +func repoSyncGitOutput(t *testing.T, c *core.Core, repoDir string, args ...string) string { + t.Helper() + result := c.Process().RunIn(context.Background(), repoDir, "git", args...) + require.True(t, result.OK) + return core.Trim(result.Value.(string)) +} + +func repoSyncCaptureStdout(t *testing.T, run func()) string { + t.Helper() + + old := os.Stdout + reader, writer, err := os.Pipe() + require.NoError(t, err) + os.Stdout = writer + defer func() { + os.Stdout = old + }() + + run() + + require.NoError(t, writer.Close()) + data, err := io.ReadAll(reader) + require.NoError(t, err) + require.NoError(t, reader.Close()) + return string(data) +} diff --git a/tests/cli/sync/Taskfile.yaml b/tests/cli/sync/Taskfile.yaml index f7cc385..cd2c6a2 100644 --- a/tests/cli/sync/Taskfile.yaml +++ b/tests/cli/sync/Taskfile.yaml @@ -4,3 +4,4 @@ tasks: test: cmds: - task -d status test + - task -d repo test diff --git a/tests/cli/sync/repo/Taskfile.yaml b/tests/cli/sync/repo/Taskfile.yaml new file mode 100644 index 0000000..8bd17db --- /dev/null +++ b/tests/cli/sync/repo/Taskfile.yaml @@ -0,0 +1,54 @@ +version: "3" + +tasks: + test: + cmds: + - | + bash <<'EOF' + set -euo pipefail + source ../../_lib/run.sh + + go build -trimpath -ldflags="-s -w" -o bin/core-agent ../../../../cmd/core-agent + + workspace="$(mktemp -d)" + code_path="$(mktemp -d)" + export CORE_WORKSPACE="$workspace" + export CORE_HOME="$workspace" + export DIR_HOME="$workspace" + export CODE_PATH="$code_path" + + remote_dir="$(mktemp -d)/remote.git" + git init --bare "$remote_dir" >/dev/null + + repo_parent="$code_path/core" + repo_dir="$repo_parent/go-io" + mkdir -p "$repo_parent" + git -C "$repo_parent" clone "$remote_dir" go-io >/dev/null + git -C "$repo_dir" config user.name "Test" + git -C "$repo_dir" config user.email "test@example.com" + git -C "$repo_dir" checkout -b main >/dev/null + printf '# repo sync smoke\n' >"$repo_dir/README.md" + git -C "$repo_dir" add . + git -C "$repo_dir" commit -m "init" >/dev/null + git -C "$repo_dir" push -u origin main >/dev/null + + push_clone="$(mktemp -d)/push-clone" + git clone "$remote_dir" "$push_clone" >/dev/null + git -C "$push_clone" config user.name "Test" + git -C "$push_clone" config user.email "test@example.com" + git -C "$push_clone" checkout main >/dev/null + printf 'package smoke\n' >"$push_clone/smoke.go" + git -C "$push_clone" add . + git -C "$push_clone" commit -m "agent work" >/dev/null + git -C "$push_clone" push origin main >/dev/null + remote_head="$(git -C "$push_clone" rev-parse HEAD)" + + output="$(mktemp)" + run_capture_all 0 "$output" ./bin/core-agent repo/sync --repo=go-io --reset + assert_contains "fetched core/go-io@main" "$output" + assert_contains "count: 1" "$output" + + test -f "$repo_dir/smoke.go" + local_head="$(git -C "$repo_dir" rev-parse HEAD)" + test "$local_head" = "$remote_head" + EOF