diff --git a/pkg/agentic/fetch_loop.go b/pkg/agentic/fetch_loop.go new file mode 100644 index 0000000..dfaea50 --- /dev/null +++ b/pkg/agentic/fetch_loop.go @@ -0,0 +1,289 @@ +// SPDX-License-Identifier: EUPL-1.2 + +package agentic + +import ( + "context" + "time" + + core "dappco.re/go/core" + "gopkg.in/yaml.v3" +) + +const fetchLoopDefaultInterval = 5 * time.Minute + +type fetchRepoRef struct { + Org string + Repo string +} + +// go s.runFetchLoop(ctx, 5*time.Minute) +func (s *PrepSubsystem) runFetchLoop(ctx context.Context, interval time.Duration) { + if s == nil || s.ServiceRuntime == nil || ctx == nil || interval <= 0 { + return + } + + ticker := time.NewTicker(interval) + defer ticker.Stop() + + s.runFetchLoopTicks(ctx, ticker.C) +} + +func (s *PrepSubsystem) runFetchLoopTicks(ctx context.Context, ticks <-chan time.Time) { + if s == nil || s.ServiceRuntime == nil || ctx == nil || ticks == nil { + return + } + + for { + select { + case <-ctx.Done(): + return + case <-ticks: + s.fetchRegisteredRepos(ctx) + } + } +} + +func (s *PrepSubsystem) fetchLoopInterval() time.Duration { + if s != nil && s.ServiceRuntime != nil { + if result := s.Core().Config().Get("agents.fetch_interval"); result.OK { + if interval := fetchLoopDuration(result.Value); interval > 0 { + return interval + } + } + } + + for _, path := range s.fetchLoopConfigPaths() { + raw := fetchLoopReadConfig(path) + if interval := fetchLoopDuration(raw["fetch_interval"]); interval > 0 { + return interval + } + if dispatch, ok := raw["dispatch"].(map[string]any); ok { + if interval := fetchLoopDuration(dispatch["fetch_interval"]); interval > 0 { + return interval + } + } + } + + return fetchLoopDefaultInterval +} + +func (s *PrepSubsystem) fetchRegisteredRepos(ctx context.Context) { + if s == nil || s.ServiceRuntime == nil || ctx == nil { + return + } + + seen := map[string]bool{} + for _, ref := range s.fetchLoopRepoRefs() { + if ctx.Err() != nil { + return + } + + name := fetchLoopRepoName(ref) + repoDir := s.localRepoDir(ref.Org, ref.Repo) + if repoDir == "" || !fs.IsDir(core.JoinPath(repoDir, ".git")) { + core.Warn("agentic fetch loop skipped repo", "repo", name, "path", repoDir) + continue + } + if seen[repoDir] { + continue + } + seen[repoDir] = true + + branch := s.DefaultBranch(repoDir) + args := []string{"git", "fetch", "origin"} + if branch != "" { + args = append(args, branch) + } + + result := s.Core().Process().RunIn(ctx, repoDir, args...) + if !result.OK { + core.Warn("agentic fetch loop failed", "repo", name, "branch", branch, "reason", result.Value) + continue + } + core.Info("agentic fetch loop fetched repo", "repo", name, "branch", branch) + } +} + +func (s *PrepSubsystem) fetchLoopRepoRefs() []fetchRepoRef { + seen := map[string]bool{} + refs := []fetchRepoRef{} + + add := func(org, repo string) { + orgName, ok := validateName(org) + if !ok { + return + } + repoName, ok := validateName(repo) + if !ok { + return + } + key := core.Concat(orgName, "/", repoName) + if seen[key] { + return + } + seen[key] = true + refs = append(refs, fetchRepoRef{Org: orgName, Repo: repoName}) + } + + if s != nil && s.ServiceRuntime != nil { + if result := s.Core().Config().Get("agents.fetch_repos"); result.OK { + fetchLoopCollectRepoRefs(result.Value, add) + } + } + + for _, path := range s.fetchLoopConfigPaths() { + raw := fetchLoopReadConfig(path) + fetchLoopCollectRepoRefs(raw["repos"], add) + if agents, ok := raw["agents"].(map[string]any); ok { + for _, value := range agents { + agent, ok := value.(map[string]any) + if !ok { + continue + } + fetchLoopCollectRepoRefs(agent["repos"], add) + } + } + } + + for _, repoDir := range core.PathGlob(core.JoinPath(WorkspaceRoot(), "*", "*")) { + if !fs.IsDir(repoDir) { + continue + } + org := core.PathBase(core.PathDir(repoDir)) + repo := core.PathBase(repoDir) + add(org, repo) + } + + return refs +} + +func (s *PrepSubsystem) fetchLoopConfigPaths() []string { + paths := []string{} + seen := map[string]bool{} + add := func(path string) { + clean := core.Trim(path) + if clean == "" || seen[clean] { + return + } + seen[clean] = true + paths = append(paths, clean) + } + + if s != nil && s.ServiceRuntime != nil { + if result := s.Core().Config().Get("agents.config_path"); result.OK { + if path, ok := result.Value.(string); ok { + add(path) + } + } + } + + add(core.JoinPath(CoreRoot(), "agents.yaml")) + if s != nil { + add(core.JoinPath(s.codePath, "core", "agent", "config", "agents.yaml")) + } + + return paths +} + +func fetchLoopReadConfig(path string) map[string]any { + readResult := fs.Read(path) + if !readResult.OK { + return map[string]any{} + } + + var raw map[string]any + if err := yaml.Unmarshal([]byte(readResult.Value.(string)), &raw); err != nil { + return map[string]any{} + } + + return raw +} + +func fetchLoopCollectRepoRefs(value any, add func(org, repo string)) { + appendRepo := func(raw string) { + org, repo, ok := fetchLoopParseRepo(raw) + if !ok { + return + } + add(org, repo) + } + + switch typed := value.(type) { + case string: + appendRepo(typed) + case []string: + for _, raw := range typed { + appendRepo(raw) + } + case []any: + for _, item := range typed { + if raw, ok := item.(string); ok { + appendRepo(raw) + } + } + case map[string]any: + for raw := range typed { + appendRepo(raw) + } + } +} + +func fetchLoopParseRepo(raw string) (string, string, bool) { + value := core.Trim(raw) + if value == "" { + return "", "", false + } + + parts := core.Split(value, "/") + switch len(parts) { + case 1: + org, ok := validateName("core") + if !ok { + return "", "", false + } + repo, ok := validateName(parts[0]) + return org, repo, ok + case 2: + org, ok := validateName(parts[0]) + if !ok { + return "", "", false + } + repo, ok := validateName(parts[1]) + return org, repo, ok + default: + return "", "", false + } +} + +func fetchLoopDuration(value any) time.Duration { + switch typed := value.(type) { + case time.Duration: + if typed > 0 { + return typed + } + case string: + parsed, err := time.ParseDuration(core.Trim(typed)) + if err == nil && parsed > 0 { + return parsed + } + case int: + if typed > 0 { + return time.Duration(typed) * time.Second + } + case int64: + if typed > 0 { + return time.Duration(typed) * time.Second + } + case float64: + if typed > 0 { + return time.Duration(typed * float64(time.Second)) + } + } + + return 0 +} + +func fetchLoopRepoName(ref fetchRepoRef) string { + return core.Concat(ref.Org, "/", ref.Repo) +} diff --git a/pkg/agentic/fetch_loop_test.go b/pkg/agentic/fetch_loop_test.go new file mode 100644 index 0000000..c84a655 --- /dev/null +++ b/pkg/agentic/fetch_loop_test.go @@ -0,0 +1,203 @@ +// SPDX-License-Identifier: EUPL-1.2 + +package agentic + +import ( + "context" + "testing" + "time" + + core "dappco.re/go/core" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestFetchLoop_RunFetchLoop_Good_TicksAtConfiguredInterval(t *testing.T) { + root := t.TempDir() + setTestWorkspace(t, root) + + codePath := t.TempDir() + logPath := core.JoinPath(t.TempDir(), "git.log") + fetchLoopWriteGitScript(t, logPath, "bad-repo") + fetchLoopCreateRepo(t, codePath, "core", "good-repo") + require.True(t, fs.Write(core.JoinPath(root, "agents.yaml"), core.Concat( + "version: 1\n", + "dispatch:\n", + " fetch_interval: 25ms\n", + "repos:\n", + " - good-repo\n", + )).OK) + + subsystem := fetchLoopTestPrep(codePath) + interval := subsystem.fetchLoopInterval() + assert.Equal(t, 25*time.Millisecond, interval) + + ctx, cancel := context.WithCancel(context.Background()) + done := make(chan struct{}) + go func() { + subsystem.runFetchLoop(ctx, interval) + close(done) + }() + + time.Sleep(10 * time.Millisecond) + assert.Equal(t, 0, fetchLoopLogCount(logPath, "good-repo", "fetch origin dev")) + + fetchLoopWaitForCount(t, logPath, "good-repo", "fetch origin dev", 2, 250*time.Millisecond) + + cancel() + fetchLoopWaitForDone(t, done) +} + +func TestFetchLoop_RunFetchLoop_Bad_SurvivesFailingFetch(t *testing.T) { + root := t.TempDir() + setTestWorkspace(t, root) + + codePath := t.TempDir() + logPath := core.JoinPath(t.TempDir(), "git.log") + fetchLoopWriteGitScript(t, logPath, "bad-repo") + fetchLoopCreateRepo(t, codePath, "core", "good-repo") + fetchLoopCreateRepo(t, codePath, "core", "bad-repo") + require.True(t, fs.Write(core.JoinPath(root, "agents.yaml"), core.Concat( + "version: 1\n", + "dispatch:\n", + " fetch_interval: 15ms\n", + "repos:\n", + " - good-repo\n", + " - bad-repo\n", + )).OK) + + subsystem := fetchLoopTestPrep(codePath) + ctx, cancel := context.WithCancel(context.Background()) + done := make(chan struct{}) + go func() { + subsystem.runFetchLoop(ctx, subsystem.fetchLoopInterval()) + close(done) + }() + + fetchLoopWaitForCount(t, logPath, "bad-repo", "fetch origin dev", 1, 250*time.Millisecond) + fetchLoopWaitForCount(t, logPath, "good-repo", "fetch origin dev", 2, 250*time.Millisecond) + + cancel() + fetchLoopWaitForDone(t, done) +} + +func TestFetchLoop_RunFetchLoop_Ugly_StopsOnContextCancel(t *testing.T) { + root := t.TempDir() + setTestWorkspace(t, root) + + codePath := t.TempDir() + logPath := core.JoinPath(t.TempDir(), "git.log") + fetchLoopWriteGitScript(t, logPath, "bad-repo") + fetchLoopCreateRepo(t, codePath, "core", "good-repo") + require.True(t, fs.Write(core.JoinPath(root, "agents.yaml"), core.Concat( + "version: 1\n", + "dispatch:\n", + " fetch_interval: 15ms\n", + "repos:\n", + " - good-repo\n", + )).OK) + + subsystem := fetchLoopTestPrep(codePath) + ctx, cancel := context.WithCancel(context.Background()) + done := make(chan struct{}) + go func() { + subsystem.runFetchLoop(ctx, subsystem.fetchLoopInterval()) + close(done) + }() + + fetchLoopWaitForCount(t, logPath, "good-repo", "fetch origin dev", 1, 250*time.Millisecond) + + cancel() + fetchLoopWaitForDone(t, done) + + countAfterCancel := fetchLoopLogCount(logPath, "good-repo", "fetch origin dev") + time.Sleep(50 * time.Millisecond) + assert.Equal(t, countAfterCancel, fetchLoopLogCount(logPath, "good-repo", "fetch origin dev")) +} + +func fetchLoopTestPrep(codePath string) *PrepSubsystem { + return &PrepSubsystem{ + ServiceRuntime: core.NewServiceRuntime(testCore, AgentOptions{}), + codePath: codePath, + backoff: make(map[string]time.Time), + failCount: make(map[string]int), + } +} + +func fetchLoopWriteGitScript(t *testing.T, logPath, badRepo string) { + t.Helper() + + binDir := t.TempDir() + gitPath := core.JoinPath(binDir, "git") + script := core.Concat( + "#!/bin/sh\n", + "repo=$(basename \"$PWD\")\n", + "printf '%s|%s\\n' \"$repo\" \"$*\" >> ", logPath, "\n", + "if [ \"$1\" = \"symbolic-ref\" ]; then\n", + " printf 'origin/dev\\n'\n", + " exit 0\n", + "fi\n", + "if [ \"$1\" = \"fetch\" ] && [ \"$repo\" = \"", badRepo, "\" ]; then\n", + " exit 1\n", + "fi\n", + "exit 0\n", + ) + require.True(t, fs.Write(gitPath, script).OK) + require.True(t, testCore.Process().RunIn(context.Background(), binDir, "chmod", "+x", gitPath).OK) + t.Setenv("PATH", core.Concat(binDir, ":", core.Env("PATH"))) +} + +func fetchLoopCreateRepo(t *testing.T, codePath, org, repo string) { + t.Helper() + repoDir := core.JoinPath(codePath, org, repo) + require.True(t, fs.EnsureDir(core.JoinPath(repoDir, ".git")).OK) +} + +func fetchLoopWaitForCount(t *testing.T, logPath, repo, snippet string, want int, timeout time.Duration) { + t.Helper() + + deadline := time.Now().Add(timeout) + for time.Now().Before(deadline) { + if fetchLoopLogCount(logPath, repo, snippet) >= want { + return + } + time.Sleep(5 * time.Millisecond) + } + + require.GreaterOrEqual(t, fetchLoopLogCount(logPath, repo, snippet), want) +} + +func fetchLoopWaitForDone(t *testing.T, done <-chan struct{}) { + t.Helper() + + select { + case <-done: + case <-time.After(250 * time.Millisecond): + t.Fatal("fetch loop did not stop after cancellation") + } +} + +func fetchLoopLogCount(logPath, repo, snippet string) int { + readResult := fs.Read(logPath) + if !readResult.OK { + return 0 + } + + content := core.Trim(readResult.Value.(string)) + if content == "" { + return 0 + } + + count := 0 + for _, line := range core.Split(content, "\n") { + if repo != "" && !core.HasPrefix(line, core.Concat(repo, "|")) { + continue + } + if snippet != "" && !core.Contains(line, snippet) { + continue + } + count++ + } + + return count +} diff --git a/pkg/agentic/prep.go b/pkg/agentic/prep.go index 3a562fe..48b5647 100644 --- a/pkg/agentic/prep.go +++ b/pkg/agentic/prep.go @@ -365,6 +365,7 @@ func (s *PrepSubsystem) OnStartup(ctx context.Context) core.Result { if s.syncToken() != "" { go s.runSyncFlushLoop(ctx, syncFlushScheduleInterval) } + go s.runFetchLoop(ctx, s.fetchLoopInterval()) c.RegisterQuery(s.handleWorkspaceQuery)