cli/pkg/collect/excavate.go
Snider 32a3613a3a feat: add collect, config, crypt, plugin packages and fix all lint issues
Add four new infrastructure packages with CLI commands:
- pkg/config: layered configuration (defaults → file → env → flags)
- pkg/crypt: crypto primitives (Argon2id, AES-GCM, ChaCha20, HMAC, checksums)
- pkg/plugin: plugin system with GitHub-based install/update/remove
- pkg/collect: collection subsystem (GitHub, BitcoinTalk, market, papers, excavate)

Fix all golangci-lint issues across the entire codebase (~100 errcheck,
staticcheck SA1012/SA1019/ST1005, unused, ineffassign fixes) so that
`core go qa` passes with 0 issues.

Closes #167, #168, #170, #250, #251, #252, #253, #254, #255, #256

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-04 11:32:41 +00:00

package collect

import (
	"context"
	"fmt"
	"time"

	core "github.com/host-uk/core/pkg/framework/core"
)

// Excavator runs multiple collectors as a coordinated operation.
// It runs collectors sequentially while respecting their rate limits,
// tracks state for resume support, and aggregates their results.
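//
// A minimal usage sketch (the collector variables, ctx, and cfg are
// illustrative; any values implementing Collector from this package work):
//
//	exc := &Excavator{
//		Collectors: []Collector{ghCollector, papersCollector},
//		Resume:     true,
//	}
//	res, err := exc.Run(ctx, cfg)
//	if err != nil {
//		// handle orchestration failure
//	}
//	fmt.Printf("collected %d items (%d skipped)\n", res.Items, res.Skipped)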
type Excavator struct {
	// Collectors is the list of collectors to run.
	Collectors []Collector

	// ScanOnly reports what would be collected without performing collection.
	ScanOnly bool

	// Resume enables incremental collection using saved state.
	Resume bool
}

// Name returns the orchestrator name.
func (e *Excavator) Name() string {
	return "excavator"
}

// Run executes all collectors sequentially, respecting rate limits and
// using state for resume support. Results are aggregated from all collectors.
func (e *Excavator) Run(ctx context.Context, cfg *Config) (*Result, error) {
	result := &Result{Source: e.Name()}
	if len(e.Collectors) == 0 {
		return result, nil
	}

	if cfg.Dispatcher != nil {
		cfg.Dispatcher.EmitStart(e.Name(), fmt.Sprintf("Starting excavation with %d collectors", len(e.Collectors)))
	}

	// Load state if resuming
	if e.Resume && cfg.State != nil {
		if err := cfg.State.Load(); err != nil {
			return result, core.E("collect.Excavator.Run", "failed to load state", err)
		}
	}

	// If scan-only, just report what would be collected
	if e.ScanOnly {
		for _, c := range e.Collectors {
			if cfg.Dispatcher != nil {
				cfg.Dispatcher.EmitProgress(e.Name(), fmt.Sprintf("[scan] Would run collector: %s", c.Name()), nil)
			}
		}
		return result, nil
	}

	for i, c := range e.Collectors {
		if ctx.Err() != nil {
			return result, core.E("collect.Excavator.Run", "context cancelled", ctx.Err())
		}

		if cfg.Dispatcher != nil {
			cfg.Dispatcher.EmitProgress(e.Name(),
				fmt.Sprintf("Running collector %d/%d: %s", i+1, len(e.Collectors), c.Name()), nil)
		}

		// Check if we should skip (already completed in a previous run)
		if e.Resume && cfg.State != nil {
			if entry, ok := cfg.State.Get(c.Name()); ok {
				if entry.Items > 0 && !entry.LastRun.IsZero() {
					if cfg.Dispatcher != nil {
						cfg.Dispatcher.EmitProgress(e.Name(),
							fmt.Sprintf("Skipping %s (already collected %d items on %s)",
								c.Name(), entry.Items, entry.LastRun.Format(time.RFC3339)), nil)
					}
					result.Skipped++
					continue
				}
			}
		}

		collectorResult, err := c.Collect(ctx, cfg)
		if err != nil {
			result.Errors++
			if cfg.Dispatcher != nil {
				cfg.Dispatcher.EmitError(e.Name(),
					fmt.Sprintf("Collector %s failed: %v", c.Name(), err), nil)
			}
			continue
		}

		if collectorResult != nil {
			result.Items += collectorResult.Items
			result.Errors += collectorResult.Errors
			result.Skipped += collectorResult.Skipped
			result.Files = append(result.Files, collectorResult.Files...)

			// Update state
			if cfg.State != nil {
				cfg.State.Set(c.Name(), &StateEntry{
					Source:  c.Name(),
					LastRun: time.Now(),
					Items:   collectorResult.Items,
				})
			}
		}
	}

	// Save state
	if cfg.State != nil {
		if err := cfg.State.Save(); err != nil {
			if cfg.Dispatcher != nil {
				cfg.Dispatcher.EmitError(e.Name(), fmt.Sprintf("Failed to save state: %v", err), nil)
			}
		}
	}

	if cfg.Dispatcher != nil {
		cfg.Dispatcher.EmitComplete(e.Name(),
			fmt.Sprintf("Excavation complete: %d items, %d errors, %d skipped",
				result.Items, result.Errors, result.Skipped), result)
	}

	return result, nil
}
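
// A minimal Collector stub, shown here as a sketch for tests or wiring checks.
// It assumes the Collector interface requires only Name and Collect, which is
// all the Excavator above relies on:
//
//	type nopCollector struct{ name string }
//
//	func (n *nopCollector) Name() string { return n.name }
//
//	func (n *nopCollector) Collect(_ context.Context, _ *Config) (*Result, error) {
//		return &Result{Source: n.name}, nil
//	}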