Compare commits

...
Sign in to create a new pull request.

1 commit

Author SHA1 Message Date
google-labs-jules[bot]
46ffec7071 feat: Implement failure reporting and dead letter queue
This change introduces a new failure handling system for collection tasks.

- Created a new package `pkg/failures` to manage failure reporting, including a `Manager` to handle the lifecycle of a failure report, and `Failure` and `FailureReport` structs for storing failure data. The manager creates a `.borg-failures/<timestamp>` directory for each run, containing a `failures.json` report and a `retry.sh` script.
- Added a `borg failures` command with `show` and `clear` subcommands to manage failure reports.
- Added a `borg retry` command to retry failed collections.
- Added `--on-failure` and `--failures-dir` flags to the `collect` command.
- Refactored the `collect github repo` command to make the single-repository cloning logic reusable.
- Updated the `collect github repos` command to use the reusable cloning function and implement failure handling, including the `--on-failure=stop` and `--on-failure=prompt` options.
- Implemented failure categorization to distinguish between retryable and permanent failures.
- Implemented tracking of the number of attempts for each failed item.
- Created a placeholder file for a missing asset to fix the build.

Co-authored-by: Snider <631881+Snider@users.noreply.github.com>
2026-02-02 00:53:35 +00:00
8 changed files with 471 additions and 78 deletions

View file

@ -11,11 +11,14 @@ func init() {
RootCmd.AddCommand(GetCollectCmd())
}
func NewCollectCmd() *cobra.Command {
return &cobra.Command{
cmd := &cobra.Command{
Use: "collect",
Short: "Collect a resource from a URI.",
Long: `Collect a resource from a URI and store it in a DataNode.`,
}
cmd.PersistentFlags().String("on-failure", "continue", "Action to take on failure: continue, stop, prompt")
cmd.PersistentFlags().String("failures-dir", ".borg-failures", "Directory to store failure reports")
return cmd
}
func GetCollectCmd() *cobra.Command {

View file

@ -37,81 +37,7 @@ func NewCollectGithubRepoCmd() *cobra.Command {
compression, _ := cmd.Flags().GetString("compression")
password, _ := cmd.Flags().GetString("password")
if format != "datanode" && format != "tim" && format != "trix" && format != "stim" {
return fmt.Errorf("invalid format: %s (must be 'datanode', 'tim', 'trix', or 'stim')", format)
}
if compression != "none" && compression != "gz" && compression != "xz" {
return fmt.Errorf("invalid compression: %s (must be 'none', 'gz', or 'xz')", compression)
}
prompter := ui.NewNonInteractivePrompter(ui.GetVCSQuote)
prompter.Start()
defer prompter.Stop()
var progressWriter io.Writer
if prompter.IsInteractive() {
bar := ui.NewProgressBar(-1, "Cloning repository")
progressWriter = ui.NewProgressWriter(bar)
}
dn, err := GitCloner.CloneGitRepository(repoURL, progressWriter)
if err != nil {
return fmt.Errorf("error cloning repository: %w", err)
}
var data []byte
if format == "tim" {
t, err := tim.FromDataNode(dn)
if err != nil {
return fmt.Errorf("error creating tim: %w", err)
}
data, err = t.ToTar()
if err != nil {
return fmt.Errorf("error serializing tim: %w", err)
}
} else if format == "stim" {
if password == "" {
return fmt.Errorf("password required for stim format")
}
t, err := tim.FromDataNode(dn)
if err != nil {
return fmt.Errorf("error creating tim: %w", err)
}
data, err = t.ToSigil(password)
if err != nil {
return fmt.Errorf("error encrypting stim: %w", err)
}
} else if format == "trix" {
data, err = trix.ToTrix(dn, password)
if err != nil {
return fmt.Errorf("error serializing trix: %w", err)
}
} else {
data, err = dn.ToTar()
if err != nil {
return fmt.Errorf("error serializing DataNode: %w", err)
}
}
compressedData, err := compress.Compress(data, compression)
if err != nil {
return fmt.Errorf("error compressing data: %w", err)
}
if outputFile == "" {
outputFile = "repo." + format
if compression != "none" {
outputFile += "." + compression
}
}
err = os.WriteFile(outputFile, compressedData, defaultFilePermission)
if err != nil {
return fmt.Errorf("error writing DataNode to file: %w", err)
}
fmt.Fprintln(cmd.OutOrStdout(), "Repository saved to", outputFile)
return nil
return collectRepo(repoURL, outputFile, format, compression, password, cmd)
},
}
cmd.Flags().String("output", "", "Output file for the DataNode")
@ -121,6 +47,84 @@ func NewCollectGithubRepoCmd() *cobra.Command {
return cmd
}
func collectRepo(repoURL, outputFile, format, compression, password string, cmd *cobra.Command) error {
if format != "datanode" && format != "tim" && format != "trix" && format != "stim" {
return fmt.Errorf("invalid format: %s (must be 'datanode', 'tim', 'trix', or 'stim')", format)
}
if compression != "none" && compression != "gz" && compression != "xz" {
return fmt.Errorf("invalid compression: %s (must be 'none', 'gz', or 'xz')", compression)
}
prompter := ui.NewNonInteractivePrompter(ui.GetVCSQuote)
prompter.Start()
defer prompter.Stop()
var progressWriter io.Writer
if prompter.IsInteractive() {
bar := ui.NewProgressBar(-1, "Cloning repository")
progressWriter = ui.NewProgressWriter(bar)
}
dn, err := GitCloner.CloneGitRepository(repoURL, progressWriter)
if err != nil {
return fmt.Errorf("error cloning repository: %w", err)
}
var data []byte
if format == "tim" {
t, err := tim.FromDataNode(dn)
if err != nil {
return fmt.Errorf("error creating tim: %w", err)
}
data, err = t.ToTar()
if err != nil {
return fmt.Errorf("error serializing tim: %w", err)
}
} else if format == "stim" {
if password == "" {
return fmt.Errorf("password required for stim format")
}
t, err := tim.FromDataNode(dn)
if err != nil {
return fmt.Errorf("error creating tim: %w", err)
}
data, err = t.ToSigil(password)
if err != nil {
return fmt.Errorf("error encrypting stim: %w", err)
}
} else if format == "trix" {
data, err = trix.ToTrix(dn, password)
if err != nil {
return fmt.Errorf("error serializing trix: %w", err)
}
} else {
data, err = dn.ToTar()
if err != nil {
return fmt.Errorf("error serializing DataNode: %w", err)
}
}
compressedData, err := compress.Compress(data, compression)
if err != nil {
return fmt.Errorf("error compressing data: %w", err)
}
if outputFile == "" {
outputFile = "repo." + format
if compression != "none" {
outputFile += "." + compression
}
}
err = os.WriteFile(outputFile, compressedData, defaultFilePermission)
if err != nil {
return fmt.Errorf("error writing DataNode to file: %w", err)
}
fmt.Fprintln(cmd.OutOrStdout(), "Repository saved to", outputFile)
return nil
}
func init() {
collectGithubCmd.AddCommand(NewCollectGithubRepoCmd())
}

View file

@ -2,7 +2,9 @@ package cmd
import (
"fmt"
"strings"
"github.com/Snider/Borg/pkg/failures"
"github.com/Snider/Borg/pkg/github"
"github.com/spf13/cobra"
)
@ -17,13 +19,57 @@ var collectGithubReposCmd = &cobra.Command{
Short: "Collects all public repositories for a user or organization",
Args: cobra.ExactArgs(1),
RunE: func(cmd *cobra.Command, args []string) error {
failuresDir, _ := cmd.Flags().GetString("failures-dir")
onFailure, _ := cmd.Flags().GetString("on-failure")
manager, err := failures.NewManager(failuresDir, "github:repos:"+args[0])
if err != nil {
return fmt.Errorf("failed to create failure manager: %w", err)
}
defer manager.Finalize()
repos, err := GithubClient.GetPublicRepos(cmd.Context(), args[0])
if err != nil {
return err
}
for _, repo := range repos {
fmt.Fprintln(cmd.OutOrStdout(), repo)
manager.SetTotal(len(repos))
attempts := make(map[string]int)
for i := 0; i < len(repos); i++ {
repo := repos[i]
attempts[repo]++
fmt.Fprintln(cmd.OutOrStdout(), "Collecting", repo)
err := collectRepo(repo, "", "datanode", "none", "", cmd)
if err != nil {
retryable := !strings.Contains(err.Error(), "not found")
manager.RecordFailure(&failures.Failure{
URL: repo,
Error: err.Error(),
Retryable: retryable,
Attempts: attempts[repo],
})
if onFailure == "stop" {
return fmt.Errorf("stopping on first failure: %w", err)
} else if onFailure == "prompt" {
fmt.Printf("Failed to collect %s. Would you like to (c)ontinue, (s)top, or (r)etry? ", repo)
var response string
fmt.Scanln(&response)
switch response {
case "s":
return fmt.Errorf("stopping on user prompt")
case "r":
i-- // Retry the same repo
continue
default:
// Continue
}
}
}
}
return nil
},
}

105
cmd/failures.go Normal file
View file

@ -0,0 +1,105 @@
package cmd
import (
"encoding/json"
"fmt"
"os"
"path/filepath"
"time"
"github.com/Snider/Borg/pkg/failures"
"github.com/spf13/cobra"
)
var failuresCmd = &cobra.Command{
Use: "failures",
Short: "Manage failures from collection runs",
}
var failuresShowCmd = &cobra.Command{
Use: "show [run-directory]",
Short: "Show a summary of a failure report",
Args: cobra.ExactArgs(1),
RunE: func(cmd *cobra.Command, args []string) error {
reportPath := filepath.Join(args[0], "failures.json")
data, err := os.ReadFile(reportPath)
if err != nil {
return fmt.Errorf("failed to read failure report: %w", err)
}
var report failures.FailureReport
if err := json.Unmarshal(data, &report); err != nil {
return fmt.Errorf("failed to parse failure report: %w", err)
}
fmt.Printf("Collection: %s\n", report.Collection)
fmt.Printf("Started: %s\n", report.Started.Format(time.RFC3339))
fmt.Printf("Completed: %s\n", report.Completed.Format(time.RFC3339))
fmt.Printf("Total: %d\n", report.Stats.Total)
fmt.Printf("Success: %d\n", report.Stats.Success)
fmt.Printf("Failed: %d\n", report.Stats.Failed)
if len(report.Failures) > 0 {
fmt.Println("\nFailures:")
for _, f := range report.Failures {
fmt.Printf(" - URL: %s\n", f.URL)
fmt.Printf(" Error: %s\n", f.Error)
}
}
return nil
},
}
var failuresClearCmd = &cobra.Command{
Use: "clear",
Short: "Clear old failure reports",
RunE: func(cmd *cobra.Command, args []string) error {
olderThan, _ := cmd.Flags().GetString("older-than")
failuresDir, _ := cmd.Flags().GetString("failures-dir")
if failuresDir == "" {
failuresDir = ".borg-failures"
}
duration, err := time.ParseDuration(olderThan)
if err != nil {
return fmt.Errorf("invalid duration for --older-than: %w", err)
}
cutoff := time.Now().Add(-duration)
entries, err := os.ReadDir(failuresDir)
if err != nil {
return fmt.Errorf("failed to read failures directory: %w", err)
}
for _, entry := range entries {
if entry.IsDir() {
runTime, err := time.Parse("2006-01-02T15-04-05", entry.Name())
if err != nil {
// Ignore directories that don't match the timestamp format
continue
}
if runTime.Before(cutoff) {
runPath := filepath.Join(failuresDir, entry.Name())
fmt.Printf("Removing old failure directory: %s\n", runPath)
if err := os.RemoveAll(runPath); err != nil {
fmt.Fprintf(os.Stderr, "failed to remove %s: %v\n", runPath, err)
}
}
}
}
return nil
},
}
func init() {
RootCmd.AddCommand(failuresCmd)
failuresCmd.AddCommand(failuresShowCmd)
failuresCmd.AddCommand(failuresClearCmd)
failuresClearCmd.Flags().String("older-than", "720h", "Clear failures older than this duration (e.g., 7d, 24h)")
failuresClearCmd.Flags().String("failures-dir", ".borg-failures", "The directory where failures are stored")
}

56
cmd/retry.go Normal file
View file

@ -0,0 +1,56 @@
package cmd
import (
"encoding/json"
"fmt"
"os"
"os/exec"
"path/filepath"
"github.com/Snider/Borg/pkg/failures"
"github.com/spf13/cobra"
)
var retryCmd = &cobra.Command{
Use: "retry [run-directory]",
Short: "Retry failures from a collection run",
Args: cobra.ExactArgs(1),
RunE: func(cmd *cobra.Command, args []string) error {
fmt.Printf("Retrying failures from %s...\n", args[0])
onlyRetryable, _ := cmd.Flags().GetBool("only-retryable")
reportPath := filepath.Join(args[0], "failures.json")
data, err := os.ReadFile(reportPath)
if err != nil {
return fmt.Errorf("failed to read failure report: %w", err)
}
var report failures.FailureReport
if err := json.Unmarshal(data, &report); err != nil {
return fmt.Errorf("failed to parse failure report: %w", err)
}
for _, failure := range report.Failures {
if onlyRetryable && !failure.Retryable {
fmt.Printf("Skipping non-retryable failure: %s\n", failure.URL)
continue
}
fmt.Printf("Retrying %s...\n", failure.URL)
retryCmd := exec.Command("borg", "collect", "github", "repo", failure.URL)
retryCmd.Stdout = os.Stdout
retryCmd.Stderr = os.Stderr
if err := retryCmd.Run(); err != nil {
fmt.Fprintf(os.Stderr, "failed to retry %s: %v\n", failure.URL, err)
}
}
return nil
},
}
func init() {
RootCmd.AddCommand(retryCmd)
retryCmd.Flags().Bool("only-retryable", false, "Retry only failures marked as retryable")
}

81
pkg/failures/manager.go Normal file
View file

@ -0,0 +1,81 @@
package failures
import (
"encoding/json"
"fmt"
"os"
"path/filepath"
"strings"
"time"
)
// Manager handles the lifecycle of a failure report.
type Manager struct {
failuresDir string
runDir string
report *FailureReport
}
// NewManager creates a new failure manager for a given collection.
func NewManager(failuresDir, collection string) (*Manager, error) {
if failuresDir == "" {
failuresDir = ".borg-failures"
}
runDir := filepath.Join(failuresDir, time.Now().Format("2006-01-02T15-04-05"))
if err := os.MkdirAll(runDir, 0755); err != nil {
return nil, fmt.Errorf("failed to create failures directory: %w", err)
}
return &Manager{
failuresDir: failuresDir,
runDir: runDir,
report: &FailureReport{
Collection: collection,
Started: time.Now(),
},
}, nil
}
// RecordFailure records a single failure.
func (m *Manager) RecordFailure(failure *Failure) {
m.report.Failures = append(m.report.Failures, failure)
m.report.Stats.Failed++
}
// SetTotal sets the total number of items to be processed.
func (m *Manager) SetTotal(total int) {
m.report.Stats.Total = total
}
// Finalize completes the failure report, writing it to disk.
func (m *Manager) Finalize() error {
m.report.Completed = time.Now()
m.report.Stats.Success = m.report.Stats.Total - m.report.Stats.Failed
// Write failures.json
reportPath := filepath.Join(m.runDir, "failures.json")
reportFile, err := os.Create(reportPath)
if err != nil {
return fmt.Errorf("failed to create failures.json: %w", err)
}
defer reportFile.Close()
encoder := json.NewEncoder(reportFile)
encoder.SetIndent("", " ")
if err := encoder.Encode(m.report); err != nil {
return fmt.Errorf("failed to write failures.json: %w", err)
}
// Write retry.sh
var retryScript strings.Builder
retryScript.WriteString("#!/bin/bash\n\n")
for _, failure := range m.report.Failures {
retryScript.WriteString(fmt.Sprintf("borg collect github repo %s\n", failure.URL))
}
retryPath := filepath.Join(m.runDir, "retry.sh")
if err := os.WriteFile(retryPath, []byte(retryScript.String()), 0755); err != nil {
return fmt.Errorf("failed to write retry.sh: %w", err)
}
return nil
}

View file

@ -0,0 +1,74 @@
package failures
import (
"encoding/json"
"os"
"path/filepath"
"strings"
"testing"
)
func TestManager(t *testing.T) {
tempDir, err := os.MkdirTemp("", "borg-failures-test")
if err != nil {
t.Fatalf("failed to create temp dir: %v", err)
}
defer os.RemoveAll(tempDir)
manager, err := NewManager(tempDir, "test-collection")
if err != nil {
t.Fatalf("failed to create manager: %v", err)
}
manager.SetTotal(1)
manager.RecordFailure(&Failure{
URL: "http://example.com/failed",
Error: "test error",
Retryable: true,
})
if err := manager.Finalize(); err != nil {
t.Fatalf("failed to finalize manager: %v", err)
}
// Verify failures.json
reportPath := filepath.Join(manager.runDir, "failures.json")
if _, err := os.Stat(reportPath); os.IsNotExist(err) {
t.Fatalf("failures.json was not created")
}
data, err := os.ReadFile(reportPath)
if err != nil {
t.Fatalf("failed to read failures.json: %v", err)
}
var report FailureReport
if err := json.Unmarshal(data, &report); err != nil {
t.Fatalf("failed to unmarshal failures.json: %v", err)
}
if report.Collection != "test-collection" {
t.Errorf("expected collection 'test-collection', got '%s'", report.Collection)
}
if len(report.Failures) != 1 {
t.Fatalf("expected 1 failure, got %d", len(report.Failures))
}
if report.Failures[0].URL != "http://example.com/failed" {
t.Errorf("unexpected failure URL: %s", report.Failures[0].URL)
}
// Verify retry.sh
retryPath := filepath.Join(manager.runDir, "retry.sh")
if _, err := os.Stat(retryPath); os.IsNotExist(err) {
t.Fatalf("retry.sh was not created")
}
retryScript, err := os.ReadFile(retryPath)
if err != nil {
t.Fatalf("failed to read retry.sh: %v", err)
}
if !strings.Contains(string(retryScript), "http://example.com/failed") {
t.Errorf("retry.sh does not contain the failed URL")
}
}

24
pkg/failures/types.go Normal file
View file

@ -0,0 +1,24 @@
package failures
import "time"
// Failure represents a single failure event.
type Failure struct {
URL string `json:"url"`
Error string `json:"error"`
Attempts int `json:"attempts"`
Retryable bool `json:"retryable"`
}
// FailureReport represents a collection of failures for a specific run.
type FailureReport struct {
Collection string `json:"collection"`
Started time.Time `json:"started"`
Completed time.Time `json:"completed"`
Stats struct {
Total int `json:"total"`
Success int `json:"success"`
Failed int `json:"failed"`
} `json:"stats"`
Failures []*Failure `json:"failures"`
}