package collect

import (
	"context"
	"fmt"
	"time"

	core "forge.lthn.ai/core/go/pkg/framework/core"
)

// Excavator runs multiple collectors as a coordinated operation.
// It provides sequential execution with rate limit respect, state tracking
// for resume support, and aggregated results.
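//
// A minimal usage sketch; repoCollector, issueCollector, ctx, and cfg are
// assumed to be constructed elsewhere (only Excavator itself is defined in
// this file):
//
//	exc := &Excavator{
//		Collectors: []Collector{repoCollector, issueCollector},
//		Resume:     true,
//	}
//	res, err := exc.Run(ctx, cfg)
//	if err != nil {
//		// fatal orchestration error: state load failure or cancellation
//	}
//	_ = res // aggregated items, errors, skips, and output files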
type Excavator struct {
	// Collectors is the list of collectors to run.
	Collectors []Collector

	// ScanOnly reports what would be collected without performing collection.
	ScanOnly bool

	// Resume enables incremental collection using saved state.
	Resume bool
}

// Name returns the orchestrator name.
func (e *Excavator) Name() string {
	return "excavator"
}

// Run executes all collectors sequentially, respecting rate limits and
// using state for resume support. Results are aggregated from all collectors.
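// A single collector failure is counted on the result and the run continues
// with the next collector; Run itself returns a non-nil error only when
// loading saved state fails or the context is cancelled.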
func (e *Excavator) Run(ctx context.Context, cfg *Config) (*Result, error) {
	result := &Result{Source: e.Name()}

	if len(e.Collectors) == 0 {
		return result, nil
	}

	if cfg.Dispatcher != nil {
		cfg.Dispatcher.EmitStart(e.Name(), fmt.Sprintf("Starting excavation with %d collectors", len(e.Collectors)))
	}

	// Load state if resuming
	if e.Resume && cfg.State != nil {
		if err := cfg.State.Load(); err != nil {
			return result, core.E("collect.Excavator.Run", "failed to load state", err)
		}
	}

	// If scan-only, just report what would be collected
	if e.ScanOnly {
		for _, c := range e.Collectors {
			if cfg.Dispatcher != nil {
				cfg.Dispatcher.EmitProgress(e.Name(), fmt.Sprintf("[scan] Would run collector: %s", c.Name()), nil)
			}
		}
		return result, nil
	}

	for i, c := range e.Collectors {
		if ctx.Err() != nil {
			return result, core.E("collect.Excavator.Run", "context cancelled", ctx.Err())
		}

		if cfg.Dispatcher != nil {
			cfg.Dispatcher.EmitProgress(e.Name(),
				fmt.Sprintf("Running collector %d/%d: %s", i+1, len(e.Collectors), c.Name()), nil)
		}

		// Check if we should skip (already completed in a previous run)
		if e.Resume && cfg.State != nil {
			if entry, ok := cfg.State.Get(c.Name()); ok {
				if entry.Items > 0 && !entry.LastRun.IsZero() {
					if cfg.Dispatcher != nil {
						cfg.Dispatcher.EmitProgress(e.Name(),
							fmt.Sprintf("Skipping %s (already collected %d items on %s)",
								c.Name(), entry.Items, entry.LastRun.Format(time.RFC3339)), nil)
					}
					result.Skipped++
					continue
				}
			}
		}

		collectorResult, err := c.Collect(ctx, cfg)
		if err != nil {
			result.Errors++
			if cfg.Dispatcher != nil {
				cfg.Dispatcher.EmitError(e.Name(),
					fmt.Sprintf("Collector %s failed: %v", c.Name(), err), nil)
			}
			continue
		}

		if collectorResult != nil {
			result.Items += collectorResult.Items
			result.Errors += collectorResult.Errors
			result.Skipped += collectorResult.Skipped
			result.Files = append(result.Files, collectorResult.Files...)

			// Update state
			if cfg.State != nil {
				cfg.State.Set(c.Name(), &StateEntry{
					Source:  c.Name(),
					LastRun: time.Now(),
					Items:   collectorResult.Items,
				})
			}
		}
	}

	// Save state
	if cfg.State != nil {
		if err := cfg.State.Save(); err != nil {
			if cfg.Dispatcher != nil {
				cfg.Dispatcher.EmitError(e.Name(), fmt.Sprintf("Failed to save state: %v", err), nil)
			}
		}
	}

	if cfg.Dispatcher != nil {
		cfg.Dispatcher.EmitComplete(e.Name(),
			fmt.Sprintf("Excavation complete: %d items, %d errors, %d skipped",
				result.Items, result.Errors, result.Skipped), result)
	}

	return result, nil
}
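
// A sketch of the minimal Collector shape that Excavator.Run drives,
// inferred from the calls above (Name and Collect); the authoritative
// Collector interface is declared elsewhere in this package:
//
//	type staticCollector struct{ name string }
//
//	func (s *staticCollector) Name() string { return s.name }
//
//	func (s *staticCollector) Collect(ctx context.Context, cfg *Config) (*Result, error) {
//		// A real collector would fetch items, record output files on the
//		// Result, and respect ctx cancellation.
//		return &Result{Source: s.name}, nil
//	}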