feat(agent/agentic): 5-min background git fetch loop for registered repos

fetch_loop.go starts at PrepSubsystem.OnStartup, ticks on configurable
interval (default 5min), respects ctx.Done() for clean shutdown.
Each tick runs `git fetch origin <DefaultBranch>` via s.Core().Process()
— no worktree mutation.

Repo discovery sources (priority):
1. agents.fetch_repos in runtime config
2. repos / agents.*.repos in agents.yaml
3. Fallback scan of WorkspaceRoot() per RFC §7

Interval config: agents.fetch_interval, dispatch.fetch_interval, or
top-level fetch_interval in agents.yaml.

Loop survives individual repo failures (logs + continues — one bad
repo can't kill the loop).

Tests cover tick timing, failure isolation, ctx cancellation.

Co-authored-by: Codex <noreply@openai.com>
Closes tasks.lthn.sh/view.php?id=165
This commit is contained in:
Snider 2026-04-25 20:23:50 +01:00
parent 820d33ebec
commit b42cf5a18c
3 changed files with 493 additions and 0 deletions

289
pkg/agentic/fetch_loop.go Normal file
View file

@ -0,0 +1,289 @@
// SPDX-License-Identifier: EUPL-1.2
package agentic
import (
"context"
"time"
core "dappco.re/go/core"
"gopkg.in/yaml.v3"
)
const fetchLoopDefaultInterval = 5 * time.Minute
type fetchRepoRef struct {
Org string
Repo string
}
// go s.runFetchLoop(ctx, 5*time.Minute)
func (s *PrepSubsystem) runFetchLoop(ctx context.Context, interval time.Duration) {
if s == nil || s.ServiceRuntime == nil || ctx == nil || interval <= 0 {
return
}
ticker := time.NewTicker(interval)
defer ticker.Stop()
s.runFetchLoopTicks(ctx, ticker.C)
}
func (s *PrepSubsystem) runFetchLoopTicks(ctx context.Context, ticks <-chan time.Time) {
if s == nil || s.ServiceRuntime == nil || ctx == nil || ticks == nil {
return
}
for {
select {
case <-ctx.Done():
return
case <-ticks:
s.fetchRegisteredRepos(ctx)
}
}
}
func (s *PrepSubsystem) fetchLoopInterval() time.Duration {
if s != nil && s.ServiceRuntime != nil {
if result := s.Core().Config().Get("agents.fetch_interval"); result.OK {
if interval := fetchLoopDuration(result.Value); interval > 0 {
return interval
}
}
}
for _, path := range s.fetchLoopConfigPaths() {
raw := fetchLoopReadConfig(path)
if interval := fetchLoopDuration(raw["fetch_interval"]); interval > 0 {
return interval
}
if dispatch, ok := raw["dispatch"].(map[string]any); ok {
if interval := fetchLoopDuration(dispatch["fetch_interval"]); interval > 0 {
return interval
}
}
}
return fetchLoopDefaultInterval
}
func (s *PrepSubsystem) fetchRegisteredRepos(ctx context.Context) {
if s == nil || s.ServiceRuntime == nil || ctx == nil {
return
}
seen := map[string]bool{}
for _, ref := range s.fetchLoopRepoRefs() {
if ctx.Err() != nil {
return
}
name := fetchLoopRepoName(ref)
repoDir := s.localRepoDir(ref.Org, ref.Repo)
if repoDir == "" || !fs.IsDir(core.JoinPath(repoDir, ".git")) {
core.Warn("agentic fetch loop skipped repo", "repo", name, "path", repoDir)
continue
}
if seen[repoDir] {
continue
}
seen[repoDir] = true
branch := s.DefaultBranch(repoDir)
args := []string{"git", "fetch", "origin"}
if branch != "" {
args = append(args, branch)
}
result := s.Core().Process().RunIn(ctx, repoDir, args...)
if !result.OK {
core.Warn("agentic fetch loop failed", "repo", name, "branch", branch, "reason", result.Value)
continue
}
core.Info("agentic fetch loop fetched repo", "repo", name, "branch", branch)
}
}
func (s *PrepSubsystem) fetchLoopRepoRefs() []fetchRepoRef {
seen := map[string]bool{}
refs := []fetchRepoRef{}
add := func(org, repo string) {
orgName, ok := validateName(org)
if !ok {
return
}
repoName, ok := validateName(repo)
if !ok {
return
}
key := core.Concat(orgName, "/", repoName)
if seen[key] {
return
}
seen[key] = true
refs = append(refs, fetchRepoRef{Org: orgName, Repo: repoName})
}
if s != nil && s.ServiceRuntime != nil {
if result := s.Core().Config().Get("agents.fetch_repos"); result.OK {
fetchLoopCollectRepoRefs(result.Value, add)
}
}
for _, path := range s.fetchLoopConfigPaths() {
raw := fetchLoopReadConfig(path)
fetchLoopCollectRepoRefs(raw["repos"], add)
if agents, ok := raw["agents"].(map[string]any); ok {
for _, value := range agents {
agent, ok := value.(map[string]any)
if !ok {
continue
}
fetchLoopCollectRepoRefs(agent["repos"], add)
}
}
}
for _, repoDir := range core.PathGlob(core.JoinPath(WorkspaceRoot(), "*", "*")) {
if !fs.IsDir(repoDir) {
continue
}
org := core.PathBase(core.PathDir(repoDir))
repo := core.PathBase(repoDir)
add(org, repo)
}
return refs
}
func (s *PrepSubsystem) fetchLoopConfigPaths() []string {
paths := []string{}
seen := map[string]bool{}
add := func(path string) {
clean := core.Trim(path)
if clean == "" || seen[clean] {
return
}
seen[clean] = true
paths = append(paths, clean)
}
if s != nil && s.ServiceRuntime != nil {
if result := s.Core().Config().Get("agents.config_path"); result.OK {
if path, ok := result.Value.(string); ok {
add(path)
}
}
}
add(core.JoinPath(CoreRoot(), "agents.yaml"))
if s != nil {
add(core.JoinPath(s.codePath, "core", "agent", "config", "agents.yaml"))
}
return paths
}
func fetchLoopReadConfig(path string) map[string]any {
readResult := fs.Read(path)
if !readResult.OK {
return map[string]any{}
}
var raw map[string]any
if err := yaml.Unmarshal([]byte(readResult.Value.(string)), &raw); err != nil {
return map[string]any{}
}
return raw
}
func fetchLoopCollectRepoRefs(value any, add func(org, repo string)) {
appendRepo := func(raw string) {
org, repo, ok := fetchLoopParseRepo(raw)
if !ok {
return
}
add(org, repo)
}
switch typed := value.(type) {
case string:
appendRepo(typed)
case []string:
for _, raw := range typed {
appendRepo(raw)
}
case []any:
for _, item := range typed {
if raw, ok := item.(string); ok {
appendRepo(raw)
}
}
case map[string]any:
for raw := range typed {
appendRepo(raw)
}
}
}
func fetchLoopParseRepo(raw string) (string, string, bool) {
value := core.Trim(raw)
if value == "" {
return "", "", false
}
parts := core.Split(value, "/")
switch len(parts) {
case 1:
org, ok := validateName("core")
if !ok {
return "", "", false
}
repo, ok := validateName(parts[0])
return org, repo, ok
case 2:
org, ok := validateName(parts[0])
if !ok {
return "", "", false
}
repo, ok := validateName(parts[1])
return org, repo, ok
default:
return "", "", false
}
}
func fetchLoopDuration(value any) time.Duration {
switch typed := value.(type) {
case time.Duration:
if typed > 0 {
return typed
}
case string:
parsed, err := time.ParseDuration(core.Trim(typed))
if err == nil && parsed > 0 {
return parsed
}
case int:
if typed > 0 {
return time.Duration(typed) * time.Second
}
case int64:
if typed > 0 {
return time.Duration(typed) * time.Second
}
case float64:
if typed > 0 {
return time.Duration(typed * float64(time.Second))
}
}
return 0
}
func fetchLoopRepoName(ref fetchRepoRef) string {
return core.Concat(ref.Org, "/", ref.Repo)
}

View file

@ -0,0 +1,203 @@
// SPDX-License-Identifier: EUPL-1.2
package agentic
import (
"context"
"testing"
"time"
core "dappco.re/go/core"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestFetchLoop_RunFetchLoop_Good_TicksAtConfiguredInterval(t *testing.T) {
root := t.TempDir()
setTestWorkspace(t, root)
codePath := t.TempDir()
logPath := core.JoinPath(t.TempDir(), "git.log")
fetchLoopWriteGitScript(t, logPath, "bad-repo")
fetchLoopCreateRepo(t, codePath, "core", "good-repo")
require.True(t, fs.Write(core.JoinPath(root, "agents.yaml"), core.Concat(
"version: 1\n",
"dispatch:\n",
" fetch_interval: 25ms\n",
"repos:\n",
" - good-repo\n",
)).OK)
subsystem := fetchLoopTestPrep(codePath)
interval := subsystem.fetchLoopInterval()
assert.Equal(t, 25*time.Millisecond, interval)
ctx, cancel := context.WithCancel(context.Background())
done := make(chan struct{})
go func() {
subsystem.runFetchLoop(ctx, interval)
close(done)
}()
time.Sleep(10 * time.Millisecond)
assert.Equal(t, 0, fetchLoopLogCount(logPath, "good-repo", "fetch origin dev"))
fetchLoopWaitForCount(t, logPath, "good-repo", "fetch origin dev", 2, 250*time.Millisecond)
cancel()
fetchLoopWaitForDone(t, done)
}
func TestFetchLoop_RunFetchLoop_Bad_SurvivesFailingFetch(t *testing.T) {
root := t.TempDir()
setTestWorkspace(t, root)
codePath := t.TempDir()
logPath := core.JoinPath(t.TempDir(), "git.log")
fetchLoopWriteGitScript(t, logPath, "bad-repo")
fetchLoopCreateRepo(t, codePath, "core", "good-repo")
fetchLoopCreateRepo(t, codePath, "core", "bad-repo")
require.True(t, fs.Write(core.JoinPath(root, "agents.yaml"), core.Concat(
"version: 1\n",
"dispatch:\n",
" fetch_interval: 15ms\n",
"repos:\n",
" - good-repo\n",
" - bad-repo\n",
)).OK)
subsystem := fetchLoopTestPrep(codePath)
ctx, cancel := context.WithCancel(context.Background())
done := make(chan struct{})
go func() {
subsystem.runFetchLoop(ctx, subsystem.fetchLoopInterval())
close(done)
}()
fetchLoopWaitForCount(t, logPath, "bad-repo", "fetch origin dev", 1, 250*time.Millisecond)
fetchLoopWaitForCount(t, logPath, "good-repo", "fetch origin dev", 2, 250*time.Millisecond)
cancel()
fetchLoopWaitForDone(t, done)
}
func TestFetchLoop_RunFetchLoop_Ugly_StopsOnContextCancel(t *testing.T) {
root := t.TempDir()
setTestWorkspace(t, root)
codePath := t.TempDir()
logPath := core.JoinPath(t.TempDir(), "git.log")
fetchLoopWriteGitScript(t, logPath, "bad-repo")
fetchLoopCreateRepo(t, codePath, "core", "good-repo")
require.True(t, fs.Write(core.JoinPath(root, "agents.yaml"), core.Concat(
"version: 1\n",
"dispatch:\n",
" fetch_interval: 15ms\n",
"repos:\n",
" - good-repo\n",
)).OK)
subsystem := fetchLoopTestPrep(codePath)
ctx, cancel := context.WithCancel(context.Background())
done := make(chan struct{})
go func() {
subsystem.runFetchLoop(ctx, subsystem.fetchLoopInterval())
close(done)
}()
fetchLoopWaitForCount(t, logPath, "good-repo", "fetch origin dev", 1, 250*time.Millisecond)
cancel()
fetchLoopWaitForDone(t, done)
countAfterCancel := fetchLoopLogCount(logPath, "good-repo", "fetch origin dev")
time.Sleep(50 * time.Millisecond)
assert.Equal(t, countAfterCancel, fetchLoopLogCount(logPath, "good-repo", "fetch origin dev"))
}
func fetchLoopTestPrep(codePath string) *PrepSubsystem {
return &PrepSubsystem{
ServiceRuntime: core.NewServiceRuntime(testCore, AgentOptions{}),
codePath: codePath,
backoff: make(map[string]time.Time),
failCount: make(map[string]int),
}
}
func fetchLoopWriteGitScript(t *testing.T, logPath, badRepo string) {
t.Helper()
binDir := t.TempDir()
gitPath := core.JoinPath(binDir, "git")
script := core.Concat(
"#!/bin/sh\n",
"repo=$(basename \"$PWD\")\n",
"printf '%s|%s\\n' \"$repo\" \"$*\" >> ", logPath, "\n",
"if [ \"$1\" = \"symbolic-ref\" ]; then\n",
" printf 'origin/dev\\n'\n",
" exit 0\n",
"fi\n",
"if [ \"$1\" = \"fetch\" ] && [ \"$repo\" = \"", badRepo, "\" ]; then\n",
" exit 1\n",
"fi\n",
"exit 0\n",
)
require.True(t, fs.Write(gitPath, script).OK)
require.True(t, testCore.Process().RunIn(context.Background(), binDir, "chmod", "+x", gitPath).OK)
t.Setenv("PATH", core.Concat(binDir, ":", core.Env("PATH")))
}
func fetchLoopCreateRepo(t *testing.T, codePath, org, repo string) {
t.Helper()
repoDir := core.JoinPath(codePath, org, repo)
require.True(t, fs.EnsureDir(core.JoinPath(repoDir, ".git")).OK)
}
func fetchLoopWaitForCount(t *testing.T, logPath, repo, snippet string, want int, timeout time.Duration) {
t.Helper()
deadline := time.Now().Add(timeout)
for time.Now().Before(deadline) {
if fetchLoopLogCount(logPath, repo, snippet) >= want {
return
}
time.Sleep(5 * time.Millisecond)
}
require.GreaterOrEqual(t, fetchLoopLogCount(logPath, repo, snippet), want)
}
func fetchLoopWaitForDone(t *testing.T, done <-chan struct{}) {
t.Helper()
select {
case <-done:
case <-time.After(250 * time.Millisecond):
t.Fatal("fetch loop did not stop after cancellation")
}
}
func fetchLoopLogCount(logPath, repo, snippet string) int {
readResult := fs.Read(logPath)
if !readResult.OK {
return 0
}
content := core.Trim(readResult.Value.(string))
if content == "" {
return 0
}
count := 0
for _, line := range core.Split(content, "\n") {
if repo != "" && !core.HasPrefix(line, core.Concat(repo, "|")) {
continue
}
if snippet != "" && !core.Contains(line, snippet) {
continue
}
count++
}
return count
}

View file

@ -365,6 +365,7 @@ func (s *PrepSubsystem) OnStartup(ctx context.Context) core.Result {
if s.syncToken() != "" {
go s.runSyncFlushLoop(ctx, syncFlushScheduleInterval)
}
go s.runFetchLoop(ctx, s.fetchLoopInterval())
c.RegisterQuery(s.handleWorkspaceQuery)