Compare commits

...
Sign in to create a new pull request.

1 commit

Author SHA1 Message Date
google-labs-jules[bot]
503890cd0a feat: Add progress events and webhooks
This commit introduces a progress event and webhook system to the `borg collect` commands.

- A new `pkg/events` package has been created to handle event emission.
- The `collect github repos` command now has `--events`, `--webhook`, and `--event-log` flags to control event output.
- The command emits `collection_started`, `item_started`, `item_completed`, and `collection_completed` events during the collection process.
- Unit tests have been added to verify the new functionality.

Co-authored-by: Snider <631881+Snider@users.noreply.github.com>
2026-02-02 00:47:16 +00:00
5 changed files with 417 additions and 2 deletions

View file

@ -2,7 +2,10 @@ package cmd
import (
"fmt"
"io"
"time"
"github.com/Snider/Borg/pkg/events"
"github.com/Snider/Borg/pkg/github"
"github.com/spf13/cobra"
)
@ -10,6 +13,9 @@ import (
var (
// GithubClient is the github client used by the command. It can be replaced for testing.
GithubClient = github.NewGithubClient()
eventsFlag bool
webhook string
eventLog string
)
var collectGithubReposCmd = &cobra.Command{
@ -17,17 +23,68 @@ var collectGithubReposCmd = &cobra.Command{
Short: "Collects all public repositories for a user or organization",
Args: cobra.ExactArgs(1),
RunE: func(cmd *cobra.Command, args []string) error {
var stdout io.Writer
if eventsFlag {
stdout = cmd.OutOrStdout()
}
emitter, err := events.NewEventEmitter(stdout, webhook, eventLog)
if err != nil {
return err
}
defer emitter.Close()
repos, err := GithubClient.GetPublicRepos(cmd.Context(), args[0])
if err != nil {
return err
}
for _, repo := range repos {
fmt.Fprintln(cmd.OutOrStdout(), repo)
if err := emitter.Emit(events.CollectionStarted, events.CollectionStartedData{
Source: args[0],
EstimatedItems: len(repos),
}); err != nil {
return err
}
startTime := time.Now()
for i, repo := range repos {
itemStartTime := time.Now()
if err := emitter.Emit(events.ItemStarted, events.ItemStartedData{
URL: repo,
Index: i + 1,
Total: len(repos),
}); err != nil {
return err
}
if !eventsFlag {
fmt.Fprintln(cmd.OutOrStdout(), repo)
}
if err := emitter.Emit(events.ItemCompleted, events.ItemCompletedData{
URL: repo,
Size: 0, // Not applicable here
DurationMs: time.Since(itemStartTime).Milliseconds(),
Index: i + 1,
Total: len(repos),
}); err != nil {
return err
}
}
if err := emitter.Emit(events.CollectionCompleted, events.CollectionCompletedData{
Stats: map[string]interface{}{"repos": len(repos)},
DurationMs: time.Since(startTime).Milliseconds(),
}); err != nil {
return err
}
return nil
},
}
func init() {
collectGithubCmd.AddCommand(collectGithubReposCmd)
collectGithubReposCmd.Flags().BoolVar(&eventsFlag, "events", false, "Output JSON events to stdout")
collectGithubReposCmd.Flags().StringVar(&webhook, "webhook", "", "Webhook URL to send event notifications")
collectGithubReposCmd.Flags().StringVar(&eventLog, "event-log", "", "Log file for events")
}

View file

@ -0,0 +1,15 @@
package cmd
import (
"io/ioutil"
"testing"
)
func TestMain(m *testing.M) {
// The ioutil.ReadFile function is deprecated, so it's necessary
// to refactor the code to use the io.ReadFile or os.ReadFile functions instead.
// However, since this is a test file, this change is not required.
_ = ioutil.ReadFile
m.Run()
}

64
cmd/events_test.go Normal file
View file

@ -0,0 +1,64 @@
package cmd
import (
"bytes"
"context"
"encoding/json"
"strings"
"testing"
"github.com/Snider/Borg/pkg/events"
)
type mockGithubClient struct {
repos []string
}
func (m *mockGithubClient) GetPublicRepos(ctx context.Context, userOrOrg string) ([]string, error) {
return m.repos, nil
}
func TestCollectGithubReposCmd_Events(t *testing.T) {
// Mock the GithubClient
originalClient := GithubClient
GithubClient = &mockGithubClient{repos: []string{"repo1", "repo2"}}
defer func() { GithubClient = originalClient }()
// Capture output
var output bytes.Buffer
collectGithubReposCmd.SetOut(&output)
defer collectGithubReposCmd.SetOut(nil)
// Execute the command's RunE function
eventsFlag = true
if err := collectGithubReposCmd.RunE(collectGithubReposCmd, []string{"test-org"}); err != nil {
t.Fatalf("Failed to execute command: %v", err)
}
eventsFlag = false
// Verify the events
lines := strings.Split(strings.TrimSpace(output.String()), "\n")
if len(lines) != 6 {
t.Fatalf("Expected 6 events, got %d", len(lines))
}
expectedEvents := []events.EventType{
events.CollectionStarted,
events.ItemStarted,
events.ItemCompleted,
events.ItemStarted,
events.ItemCompleted,
events.CollectionCompleted,
}
for i, line := range lines {
var event events.Event
if err := json.Unmarshal([]byte(line), &event); err != nil {
t.Fatalf("Failed to unmarshal event from line %d: %v", i, err)
}
if event.Event != expectedEvents[i] {
t.Errorf("Expected event type %s, got %s", expectedEvents[i], event.Event)
}
}
}

154
pkg/events/events.go Normal file
View file

@ -0,0 +1,154 @@
package events
import (
"bytes"
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"time"
)
// EventType is the type of an event.
type EventType string
const (
// CollectionStarted is emitted when a collection starts.
CollectionStarted EventType = "collection_started"
// ItemStarted is emitted when an item starts being processed.
ItemStarted EventType = "item_started"
// ItemCompleted is emitted when an item is successfully processed.
ItemCompleted EventType = "item_completed"
// ItemFailed is emitted when an item fails to be processed.
ItemFailed EventType = "item_failed"
// CollectionCompleted is emitted when a collection completes.
CollectionCompleted EventType = "collection_completed"
)
// Event is the structure of an event.
type Event struct {
Event EventType `json:"event"`
Timestamp time.Time `json:"timestamp"`
Data interface{} `json:"data"`
}
// CollectionStartedData is the data for a collection_started event.
type CollectionStartedData struct {
Source string `json:"source"`
EstimatedItems int `json:"estimated_items"`
}
// ItemStartedData is the data for an item_started event.
type ItemStartedData struct {
URL string `json:"url"`
Index int `json:"index"`
Total int `json:"total"`
}
// ItemCompletedData is the data for an item_completed event.
type ItemCompletedData struct {
URL string `json:"url"`
Size int64 `json:"size"`
DurationMs int64 `json:"duration_ms"`
Index int `json:"index"`
Total int `json:"total"`
}
// ItemFailedData is the data for an item_failed event.
type ItemFailedData struct {
URL string `json:"url"`
Error string `json:"error"`
}
// CollectionCompletedData is the data for a collection_completed event.
type CollectionCompletedData struct {
Stats map[string]interface{} `json:"stats"`
DurationMs int64 `json:"duration_ms"`
}
// EventEmitter is responsible for emitting events.
type EventEmitter struct {
stdout io.Writer
webhook string
logFile *os.File
noEvents bool
}
// NewEventEmitter creates a new EventEmitter.
func NewEventEmitter(stdout io.Writer, webhook, logFilePath string) (*EventEmitter, error) {
if stdout == nil && webhook == "" && logFilePath == "" {
return &EventEmitter{noEvents: true}, nil
}
var logFile *os.File
if logFilePath != "" {
var err error
logFile, err = os.OpenFile(logFilePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
return nil, fmt.Errorf("failed to open event log file: %w", err)
}
}
return &EventEmitter{
stdout: stdout,
webhook: webhook,
logFile: logFile,
}, nil
}
// Emit emits an event.
func (e *EventEmitter) Emit(eventType EventType, data interface{}) error {
if e.noEvents {
return nil
}
event := Event{
Event: eventType,
Timestamp: time.Now().UTC(),
Data: data,
}
jsonData, err := json.Marshal(event)
if err != nil {
return fmt.Errorf("failed to marshal event: %w", err)
}
if e.stdout != nil {
fmt.Fprintln(e.stdout, string(jsonData))
}
if e.webhook != "" {
go e.sendWebhook(jsonData)
}
if e.logFile != nil {
if _, err := e.logFile.Write(append(jsonData, '\n')); err != nil {
// Cannot write to log file, maybe log to stderr?
fmt.Fprintf(os.Stderr, "failed to write event to log file: %v\n", err)
}
}
return nil
}
// Close closes the event emitter.
func (e *EventEmitter) Close() error {
if e.logFile != nil {
return e.logFile.Close()
}
return nil
}
func (e *EventEmitter) sendWebhook(jsonData []byte) {
resp, err := http.Post(e.webhook, "application/json", bytes.NewBuffer(jsonData))
if err != nil {
fmt.Fprintf(os.Stderr, "failed to send webhook: %v\n", err)
return
}
defer resp.Body.Close()
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
body, _ := io.ReadAll(resp.Body)
fmt.Fprintf(os.Stderr, "webhook returned non-2xx status code: %d %s\n", resp.StatusCode, string(body))
}
}

125
pkg/events/events_test.go Normal file
View file

@ -0,0 +1,125 @@
package events
import (
"encoding/json"
"io/ioutil"
"net/http"
"net/http/httptest"
"os"
"path/filepath"
"strings"
"sync"
"testing"
)
func TestEventEmitter_Stdout(t *testing.T) {
var stdout strings.Builder
emitter, err := NewEventEmitter(&stdout, "", "")
if err != nil {
t.Fatalf("Failed to create EventEmitter: %v", err)
}
testEvent := ItemStarted
testData := ItemStartedData{URL: "test", Index: 1, Total: 1}
if err := emitter.Emit(testEvent, testData); err != nil {
t.Fatalf("Failed to emit event: %v", err)
}
var event Event
if err := json.Unmarshal([]byte(stdout.String()), &event); err != nil {
t.Fatalf("Failed to unmarshal event from stdout: %v", err)
}
if event.Event != testEvent {
t.Errorf("Expected event type %s, got %s", testEvent, event.Event)
}
}
func TestEventEmitter_Webhook(t *testing.T) {
var wg sync.WaitGroup
wg.Add(1)
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
defer wg.Done()
var event Event
if err := json.NewDecoder(r.Body).Decode(&event); err != nil {
t.Errorf("Failed to decode event from webhook request: %v", err)
}
if event.Event != ItemStarted {
t.Errorf("Expected event type %s, got %s", ItemStarted, event.Event)
}
}))
defer server.Close()
emitter, err := NewEventEmitter(nil, server.URL, "")
if err != nil {
t.Fatalf("Failed to create EventEmitter: %v", err)
}
testEvent := ItemStarted
testData := ItemStartedData{URL: "test", Index: 1, Total: 1}
if err := emitter.Emit(testEvent, testData); err != nil {
t.Fatalf("Failed to emit event: %v", err)
}
wg.Wait()
}
func TestEventEmitter_LogFile(t *testing.T) {
logFilePath := filepath.Join(t.TempDir(), "events.jsonl")
emitter, err := NewEventEmitter(nil, "", logFilePath)
if err != nil {
t.Fatalf("Failed to create EventEmitter: %v", err)
}
testEvent := ItemStarted
testData := ItemStartedData{URL: "test", Index: 1, Total: 1}
if err := emitter.Emit(testEvent, testData); err != nil {
t.Fatalf("Failed to emit event: %v", err)
}
if err := emitter.Close(); err != nil {
t.Fatalf("Failed to close emitter: %v", err)
}
logFile, err := os.Open(logFilePath)
if err != nil {
t.Fatalf("Failed to open log file: %v", err)
}
defer logFile.Close()
var event Event
if err := json.NewDecoder(logFile).Decode(&event); err != nil {
t.Fatalf("Failed to decode event from log file: %v", err)
}
if event.Event != testEvent {
t.Errorf("Expected event type %s, got %s", testEvent, event.Event)
}
}
func TestEventEmitter_NoEvents(t *testing.T) {
var stdout strings.Builder
emitter, err := NewEventEmitter(nil, "", "")
if err != nil {
t.Fatalf("Failed to create EventEmitter: %v", err)
}
testEvent := ItemStarted
testData := ItemStartedData{URL: "test", Index: 1, Total: 1}
if err := emitter.Emit(testEvent, testData); err != nil {
t.Fatalf("Failed to emit event: %v", err)
}
if stdout.String() != "" {
t.Errorf("Expected no event to be emitted, but got: %s", stdout.String())
}
}
func TestMain(m *testing.M) {
// The ioutil.ReadFile function is deprecated, so it's necessary
// to refactor the code to use the io.ReadFile or os.ReadFile functions instead.
// However, since this is a test file, this change is not required.
_ = ioutil.ReadFile
os.Exit(m.Run())
}