agent/pkg/agentic/fetch_loop.go
Snider 83df8ad71a fix(agent): address CodeRabbit + SonarCloud findings on PR #6
20+ CHANGES_REQUESTED dispositions across PHP MCP services, Go pkg/agentic,
hermes_runner_mcp Python server, plugin shell scripts.

Highlights:
- DatabaseSchema.php: identifier quoting
- AwardCredits.php: task row locking order
- CreditTransaction.php: fail-fast row decoding
- OpenApiGenerator.php: YAML parse handling + uri query params
- CaptureDispatchResultJob.php: AgentProfile namespace fix
- CreditsController.php: missing workspace_id fail-closed
- QueryAuditService.php: prose query false positives + unbounded aggregation
- McpHealthService.php: proc_close after timeout + env var resolution
- CreditLedger.php + FleetOverview.php: workspace agent + dispatch target validation
- McpAgentServerCommand.php: quota burn on failed tool calls
- McpMetricsService.php: N-day window consistency
- hermes_runner_mcp: API key off command line + invalid method+id + run_id encoding
- CircuitBreaker.php: extracted CircuitOpenException class with autoload-correct placement
- pkg/agentic + brain + flow: SonarCloud sendMessage/fetchLoopRepoRefs/commitWorkspace/Connect annotations
- shell scripts: removed [[ usage for portability

43 files modified, 1 new (CircuitOpenException.php).

Verification: gofmt -w + php -l + python3 -m py_compile + bash -n all clean.
Touched-package go test passes (pkg/lib/flow, pkg/lib).
Full go test ./... blocked by pre-existing dappco.re module graph drift, out of scope.

Parked for separate work:
- Mantis #1062: go.mod local replace removal (cross-repo architectural)
- Mantis #1063: Sonar residual line-length / duplication quality-gate cluster

Closes findings on https://github.com/dAppCore/agent/pull/6

Co-authored-by: Codex <noreply@openai.com>
2026-04-27 13:39:24 +01:00

301 lines
6.5 KiB
Go

// SPDX-License-Identifier: EUPL-1.2
package agentic
import (
"context"
"time"
core "dappco.re/go/core"
"gopkg.in/yaml.v3"
)
// fetchLoopDefaultInterval is the tick period fetchLoopInterval falls back to
// when no configured source yields a positive duration (five minutes).
const fetchLoopDefaultInterval = 300 * time.Second

// fetchRepoRef identifies one repository by organisation and repository name.
//
//	ref := fetchRepoRef{Org: "core", Repo: "agent"}
type fetchRepoRef struct {
	Org, Repo string
}
// runFetchLoop drives runFetchLoopTicks from a time.Ticker that fires every
// interval, and stops the ticker when the loop returns. Because a ticker only
// fires after its period has elapsed, nothing is fetched at start-up.
//
// It is a no-op when the receiver, its ServiceRuntime or ctx is nil, or when
// interval is not positive.
//
//	go s.runFetchLoop(ctx, 5*time.Minute)
func (s *PrepSubsystem) runFetchLoop(ctx context.Context, interval time.Duration) {
	if s == nil || s.ServiceRuntime == nil {
		return
	}
	if ctx == nil || interval <= 0 {
		// time.NewTicker panics on a non-positive period, so reject it here.
		return
	}

	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	s.runFetchLoopTicks(ctx, ticker.C)
}
// runFetchLoopTicks calls fetchRegisteredRepos once for every value received
// from ticks. Taking the tick channel as a parameter lets callers supply their
// own tick source instead of a real ticker.
//
// The loop ends when ctx is cancelled or when ticks is closed. It returns
// immediately when the receiver, its ServiceRuntime, ctx or ticks is nil.
//
//	go s.runFetchLoopTicks(ctx, ticker.C)
func (s *PrepSubsystem) runFetchLoopTicks(ctx context.Context, ticks <-chan time.Time) {
	if s == nil || s.ServiceRuntime == nil || ctx == nil || ticks == nil {
		return
	}
	for {
		select {
		case <-ctx.Done():
			return
		case _, open := <-ticks:
			if !open {
				// A closed channel is always ready to receive; without this
				// exit the loop would spin and fetch continuously.
				return
			}
			s.fetchRegisteredRepos(ctx)
		}
	}
}
// fetchLoopInterval resolves how often the fetch loop should tick. Sources are
// consulted in this order and the first positive duration wins:
//
//  1. the runtime config key "agents.fetch_interval" (only when the receiver
//     and its ServiceRuntime are non-nil);
//  2. for each file from fetchLoopConfigPaths, the top-level "fetch_interval"
//     key, then "fetch_interval" inside the "dispatch" mapping;
//  3. fetchLoopDefaultInterval.
//
//	interval := s.fetchLoopInterval()
func (s *PrepSubsystem) fetchLoopInterval() time.Duration {
	if s != nil && s.ServiceRuntime != nil {
		configured := s.Core().Config().Get("agents.fetch_interval")
		if configured.OK {
			if d := fetchLoopDuration(configured.Value); d > 0 {
				return d
			}
		}
	}

	for _, configPath := range s.fetchLoopConfigPaths() {
		doc := fetchLoopReadConfig(configPath)

		// Top-level key first, then the nested dispatch section if present.
		candidates := []any{doc["fetch_interval"]}
		if section, isMap := doc["dispatch"].(map[string]any); isMap {
			candidates = append(candidates, section["fetch_interval"])
		}

		for _, candidate := range candidates {
			if d := fetchLoopDuration(candidate); d > 0 {
				return d
			}
		}
	}

	return fetchLoopDefaultInterval
}
// fetchRegisteredRepos runs "git fetch origin [branch]" in the local checkout
// of every repository returned by fetchLoopRepoRefs, where branch is whatever
// DefaultBranch reports for that checkout (omitted when empty).
//
// A repository is skipped with a warning when it has no local directory or
// that directory has no ".git" directory. Each local directory is fetched at
// most once per call. Fetch failures are logged and do not stop the pass; the
// pass stops early once ctx reports an error.
//
//	s.fetchRegisteredRepos(ctx)
func (s *PrepSubsystem) fetchRegisteredRepos(ctx context.Context) {
	if s == nil || s.ServiceRuntime == nil || ctx == nil {
		return
	}

	visited := make(map[string]bool)
	for _, ref := range s.fetchLoopRepoRefs() {
		if ctx.Err() != nil {
			return
		}

		label := fetchLoopRepoName(ref)
		checkout := s.localRepoDir(ref.Org, ref.Repo)
		if checkout == "" || !fs.IsDir(core.JoinPath(checkout, ".git")) {
			core.Warn("agentic fetch loop skipped repo", "repo", label, "path", checkout)
			continue
		}

		// Different refs may resolve to the same directory; fetch it once.
		if visited[checkout] {
			continue
		}
		visited[checkout] = true

		branch := s.DefaultBranch(checkout)
		command := []string{"git", "fetch", "origin"}
		if branch != "" {
			command = append(command, branch)
		}

		outcome := s.Core().Process().RunIn(ctx, checkout, command...)
		if outcome.OK {
			core.Info("agentic fetch loop fetched repo", "repo", label, "branch", branch)
		} else {
			core.Warn("agentic fetch loop failed", "repo", label, "branch", branch, "reason", outcome.Value)
		}
	}
}
// fetchLoopRepoRefs returns the de-duplicated list of repositories the fetch
// loop should visit: configured repositories first, then repositories found
// under the workspace root. The result is never nil.
//
//	refs := s.fetchLoopRepoRefs()
func (s *PrepSubsystem) fetchLoopRepoRefs() []fetchRepoRef {
	collected := make([]fetchRepoRef, 0)
	known := make(map[string]bool)

	// Both collectors share one sink so duplicates across sources collapse.
	record := func(org, repo string) {
		fetchLoopAppendRepoRef(known, &collected, org, repo)
	}

	s.fetchLoopCollectConfiguredRepoRefs(record)
	s.fetchLoopCollectWorkspaceRepoRefs(record)

	return collected
}
// fetchLoopAppendRepoRef appends org/repo to *refs unless either name is
// rejected by validateName or the pair has been recorded before. seen tracks
// recorded pairs under the key "org/repo", built from the names validateName
// returns, and is updated in place.
//
//	fetchLoopAppendRepoRef(seen, &refs, "core", "agent")
func fetchLoopAppendRepoRef(seen map[string]bool, refs *[]fetchRepoRef, org, repo string) {
	cleanOrg, orgOK := validateName(org)
	if !orgOK {
		return
	}
	cleanRepo, repoOK := validateName(repo)
	if !repoOK {
		return
	}

	entry := fetchRepoRef{Org: cleanOrg, Repo: cleanRepo}
	id := core.Concat(entry.Org, "/", entry.Repo)
	if !seen[id] {
		seen[id] = true
		*refs = append(*refs, entry)
	}
}
// fetchLoopCollectConfiguredRepoRefs reports every configured repository to
// add. It reads the runtime config key "agents.fetch_repos" first (only when
// the receiver and its ServiceRuntime are non-nil), then each file listed by
// fetchLoopConfigPaths.
//
//	s.fetchLoopCollectConfiguredRepoRefs(add)
func (s *PrepSubsystem) fetchLoopCollectConfiguredRepoRefs(add func(org, repo string)) {
	if s != nil && s.ServiceRuntime != nil {
		configured := s.Core().Config().Get("agents.fetch_repos")
		if configured.OK {
			fetchLoopCollectRepoRefs(configured.Value, add)
		}
	}

	configPaths := s.fetchLoopConfigPaths()
	for i := range configPaths {
		doc := fetchLoopReadConfig(configPaths[i])
		fetchLoopCollectConfigRepoRefs(doc, add)
	}
}
// fetchLoopCollectConfigRepoRefs reports the repositories named in one decoded
// config document: the top-level "repos" value, then the "repos" value of
// every mapping found under "agents". Entries under "agents" that are not
// mappings are ignored. Agents are visited in map iteration order, which Go
// does not define.
//
//	fetchLoopCollectConfigRepoRefs(fetchLoopReadConfig(path), add)
func fetchLoopCollectConfigRepoRefs(raw map[string]any, add func(org, repo string)) {
	fetchLoopCollectRepoRefs(raw["repos"], add)

	agentsSection, hasAgents := raw["agents"].(map[string]any)
	if !hasAgents {
		return
	}
	for _, entry := range agentsSection {
		if agentConfig, isMapping := entry.(map[string]any); isMapping {
			fetchLoopCollectRepoRefs(agentConfig["repos"], add)
		}
	}
}
// fetchLoopCollectWorkspaceRepoRefs reports every directory matching
// "<WorkspaceRoot>/*/*" to add, using the parent directory name as the
// organisation and the directory's own name as the repository. Matches that
// are not directories are ignored.
//
//	s.fetchLoopCollectWorkspaceRepoRefs(add)
func (s *PrepSubsystem) fetchLoopCollectWorkspaceRepoRefs(add func(org, repo string)) {
	pattern := core.JoinPath(WorkspaceRoot(), "*", "*")
	for _, candidate := range core.PathGlob(pattern) {
		if fs.IsDir(candidate) {
			orgDir := core.PathDir(candidate)
			add(core.PathBase(orgDir), core.PathBase(candidate))
		}
	}
}
// fetchLoopConfigPaths lists the config files the fetch loop consults, in
// priority order:
//
//  1. the string stored under the runtime config key "agents.config_path"
//     (only when the receiver and its ServiceRuntime are non-nil);
//  2. "agents.yaml" under CoreRoot();
//  3. "core/agent/config/agents.yaml" under the receiver's codePath (only when
//     the receiver is non-nil).
//
// Each path is trimmed with core.Trim; empty and repeated paths are dropped.
// The result is never nil.
//
//	paths := s.fetchLoopConfigPaths()
func (s *PrepSubsystem) fetchLoopConfigPaths() []string {
	candidates := make([]string, 0, 3)

	if s != nil && s.ServiceRuntime != nil {
		if configured := s.Core().Config().Get("agents.config_path"); configured.OK {
			if text, isString := configured.Value.(string); isString {
				candidates = append(candidates, text)
			}
		}
	}
	candidates = append(candidates, core.JoinPath(CoreRoot(), "agents.yaml"))
	if s != nil {
		candidates = append(candidates, core.JoinPath(s.codePath, "core", "agent", "config", "agents.yaml"))
	}

	// Normalise and de-duplicate while keeping the priority order above.
	unique := make([]string, 0, len(candidates))
	known := make(map[string]bool, len(candidates))
	for _, candidate := range candidates {
		trimmed := core.Trim(candidate)
		if trimmed == "" || known[trimmed] {
			continue
		}
		known[trimmed] = true
		unique = append(unique, trimmed)
	}
	return unique
}
// fetchLoopReadConfig reads the file at path and decodes it as a YAML mapping.
//
// It always returns a non-nil map. The map is empty when the read does not
// succeed, when the read result is not a string, when the content is not
// valid YAML for a mapping, or when the document is empty (yaml.Unmarshal then
// leaves the target nil). Failures are deliberately silent: config files are
// optional and callers fall through to the next source.
//
//	raw := fetchLoopReadConfig("/path/to/agents.yaml")
func fetchLoopReadConfig(path string) map[string]any {
	readResult := fs.Read(path)
	if !readResult.OK {
		return map[string]any{}
	}

	// Guard the assertion so an unexpected payload type cannot panic the
	// background fetch loop.
	content, isString := readResult.Value.(string)
	if !isString {
		return map[string]any{}
	}

	var raw map[string]any
	if err := yaml.Unmarshal([]byte(content), &raw); err != nil || raw == nil {
		return map[string]any{}
	}
	return raw
}
// fetchLoopCollectRepoRefs extracts repository names from a loosely typed
// config value and reports each one that fetchLoopParseRepo accepts to add.
//
// Accepted shapes:
//   - string: a single repository name;
//   - []string: a list of repository names;
//   - []any: a list whose string items are repository names (other items are
//     ignored);
//   - map[string]any: the keys are repository names (values are ignored).
//
// Any other type, including nil, yields nothing.
//
//	fetchLoopCollectRepoRefs([]any{"core/agent", "go-io"}, add)
func fetchLoopCollectRepoRefs(value any, add func(org, repo string)) {
	var names []string

	switch typed := value.(type) {
	case string:
		names = []string{typed}
	case []string:
		names = typed
	case []any:
		for _, item := range typed {
			if text, isString := item.(string); isString {
				names = append(names, text)
			}
		}
	case map[string]any:
		for key := range typed {
			names = append(names, key)
		}
	}

	for _, name := range names {
		if org, repo, ok := fetchLoopParseRepo(name); ok {
			add(org, repo)
		}
	}
}
// fetchLoopParseRepo splits a repository reference into organisation and
// repository name. raw is trimmed with core.Trim first. A reference without a
// slash is placed in the "core" organisation; "org/repo" is split on the
// slash. Empty input and references with more than one slash are rejected.
// Both parts must pass validateName, and the returned strings are the ones
// validateName returns.
//
//	org, repo, ok := fetchLoopParseRepo("core/agent")
func fetchLoopParseRepo(raw string) (string, string, bool) {
	trimmed := core.Trim(raw)
	if trimmed == "" {
		return "", "", false
	}

	segments := core.Split(trimmed, "/")
	orgPart, repoPart := "core", ""
	switch len(segments) {
	case 1:
		repoPart = segments[0]
	case 2:
		orgPart, repoPart = segments[0], segments[1]
	default:
		return "", "", false
	}

	org, ok := validateName(orgPart)
	if !ok {
		return "", "", false
	}
	repo, ok := validateName(repoPart)
	return org, repo, ok
}
// fetchLoopDuration converts a loosely typed config value into a positive
// duration, returning 0 when the value is unusable.
//
// Accepted shapes:
//   - time.Duration: used as is;
//   - string: parsed with time.ParseDuration after core.Trim (e.g. "5m");
//   - int, int64, float64: interpreted as a number of seconds.
//
// Zero, negative and unparseable values, and values of any other type, yield
// 0. Numeric values too large for a time.Duration saturate at the maximum
// duration instead of overflowing into a negative or arbitrary interval.
//
//	interval := fetchLoopDuration("5m") // 5 * time.Minute
//	interval := fetchLoopDuration(300)  // 5 * time.Minute
func fetchLoopDuration(value any) time.Duration {
	const (
		maxDuration     = time.Duration(1<<63 - 1)
		maxWholeSeconds = int64(maxDuration / time.Second)
	)

	// fromSeconds converts whole seconds, clamping instead of overflowing.
	fromSeconds := func(seconds int64) time.Duration {
		switch {
		case seconds <= 0:
			return 0
		case seconds > maxWholeSeconds:
			return maxDuration
		default:
			return time.Duration(seconds) * time.Second
		}
	}

	switch typed := value.(type) {
	case time.Duration:
		if typed > 0 {
			return typed
		}
	case string:
		parsed, err := time.ParseDuration(core.Trim(typed))
		if err == nil && parsed > 0 {
			return parsed
		}
	case int:
		return fromSeconds(int64(typed))
	case int64:
		return fromSeconds(typed)
	case float64:
		// NaN fails this comparison and falls through to the zero return.
		if typed > 0 {
			nanos := typed * float64(time.Second)
			// float64(maxDuration) rounds up to 2^63; converting a float at
			// or beyond that (including +Inf) to int64 is not well defined.
			if nanos >= float64(maxDuration) {
				return maxDuration
			}
			return time.Duration(nanos)
		}
	}
	return 0
}
// fetchLoopRepoName formats ref as "org/repo" for log output.
//
//	name := fetchLoopRepoName(fetchRepoRef{Org: "core", Repo: "agent"}) // "core/agent"
func fetchLoopRepoName(ref fetchRepoRef) string {
	org, repo := ref.Org, ref.Repo
	return core.Concat(org, "/", repo)
}