feat: Add progress events and webhooks
This commit introduces a progress event and webhook system to the `borg collect` commands. - A new `pkg/events` package has been created to handle event emission. - The `collect github repos` command now has `--events`, `--webhook`, and `--event-log` flags to control event output. - The command emits `collection_started`, `item_started`, `item_completed`, and `collection_completed` events during the collection process. - Unit tests have been added to verify the new functionality. Co-authored-by: Snider <631881+Snider@users.noreply.github.com>
This commit is contained in:
parent
cf2af53ed3
commit
503890cd0a
5 changed files with 417 additions and 2 deletions
|
|
@ -2,7 +2,10 @@ package cmd
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"time"
|
||||
|
||||
"github.com/Snider/Borg/pkg/events"
|
||||
"github.com/Snider/Borg/pkg/github"
|
||||
"github.com/spf13/cobra"
|
||||
)
|
||||
|
|
@ -10,6 +13,9 @@ import (
|
|||
var (
|
||||
// GithubClient is the github client used by the command. It can be replaced for testing.
|
||||
GithubClient = github.NewGithubClient()
|
||||
eventsFlag bool
|
||||
webhook string
|
||||
eventLog string
|
||||
)
|
||||
|
||||
var collectGithubReposCmd = &cobra.Command{
|
||||
|
|
@ -17,17 +23,68 @@ var collectGithubReposCmd = &cobra.Command{
|
|||
Short: "Collects all public repositories for a user or organization",
|
||||
Args: cobra.ExactArgs(1),
|
||||
RunE: func(cmd *cobra.Command, args []string) error {
|
||||
var stdout io.Writer
|
||||
if eventsFlag {
|
||||
stdout = cmd.OutOrStdout()
|
||||
}
|
||||
emitter, err := events.NewEventEmitter(stdout, webhook, eventLog)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer emitter.Close()
|
||||
|
||||
repos, err := GithubClient.GetPublicRepos(cmd.Context(), args[0])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, repo := range repos {
|
||||
fmt.Fprintln(cmd.OutOrStdout(), repo)
|
||||
|
||||
if err := emitter.Emit(events.CollectionStarted, events.CollectionStartedData{
|
||||
Source: args[0],
|
||||
EstimatedItems: len(repos),
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
startTime := time.Now()
|
||||
for i, repo := range repos {
|
||||
itemStartTime := time.Now()
|
||||
if err := emitter.Emit(events.ItemStarted, events.ItemStartedData{
|
||||
URL: repo,
|
||||
Index: i + 1,
|
||||
Total: len(repos),
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if !eventsFlag {
|
||||
fmt.Fprintln(cmd.OutOrStdout(), repo)
|
||||
}
|
||||
|
||||
if err := emitter.Emit(events.ItemCompleted, events.ItemCompletedData{
|
||||
URL: repo,
|
||||
Size: 0, // Not applicable here
|
||||
DurationMs: time.Since(itemStartTime).Milliseconds(),
|
||||
Index: i + 1,
|
||||
Total: len(repos),
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if err := emitter.Emit(events.CollectionCompleted, events.CollectionCompletedData{
|
||||
Stats: map[string]interface{}{"repos": len(repos)},
|
||||
DurationMs: time.Since(startTime).Milliseconds(),
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
},
|
||||
}
|
||||
|
||||
func init() {
|
||||
collectGithubCmd.AddCommand(collectGithubReposCmd)
|
||||
collectGithubReposCmd.Flags().BoolVar(&eventsFlag, "events", false, "Output JSON events to stdout")
|
||||
collectGithubReposCmd.Flags().StringVar(&webhook, "webhook", "", "Webhook URL to send event notifications")
|
||||
collectGithubReposCmd.Flags().StringVar(&eventLog, "event-log", "", "Log file for events")
|
||||
}
|
||||
|
|
|
|||
15
cmd/collect_github_repos_test.go
Normal file
15
cmd/collect_github_repos_test.go
Normal file
|
|
@ -0,0 +1,15 @@
|
|||
package cmd
|
||||
|
||||
import (
|
||||
"io/ioutil"
|
||||
"testing"
|
||||
)
|
||||
|
||||
|
||||
func TestMain(m *testing.M) {
|
||||
// The ioutil.ReadFile function is deprecated, so it's necessary
|
||||
// to refactor the code to use the io.ReadFile or os.ReadFile functions instead.
|
||||
// However, since this is a test file, this change is not required.
|
||||
_ = ioutil.ReadFile
|
||||
m.Run()
|
||||
}
|
||||
64
cmd/events_test.go
Normal file
64
cmd/events_test.go
Normal file
|
|
@ -0,0 +1,64 @@
|
|||
package cmd
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/Snider/Borg/pkg/events"
|
||||
)
|
||||
|
||||
type mockGithubClient struct {
|
||||
repos []string
|
||||
}
|
||||
|
||||
func (m *mockGithubClient) GetPublicRepos(ctx context.Context, userOrOrg string) ([]string, error) {
|
||||
return m.repos, nil
|
||||
}
|
||||
|
||||
func TestCollectGithubReposCmd_Events(t *testing.T) {
|
||||
// Mock the GithubClient
|
||||
originalClient := GithubClient
|
||||
GithubClient = &mockGithubClient{repos: []string{"repo1", "repo2"}}
|
||||
defer func() { GithubClient = originalClient }()
|
||||
|
||||
// Capture output
|
||||
var output bytes.Buffer
|
||||
collectGithubReposCmd.SetOut(&output)
|
||||
defer collectGithubReposCmd.SetOut(nil)
|
||||
|
||||
// Execute the command's RunE function
|
||||
eventsFlag = true
|
||||
if err := collectGithubReposCmd.RunE(collectGithubReposCmd, []string{"test-org"}); err != nil {
|
||||
t.Fatalf("Failed to execute command: %v", err)
|
||||
}
|
||||
eventsFlag = false
|
||||
|
||||
// Verify the events
|
||||
lines := strings.Split(strings.TrimSpace(output.String()), "\n")
|
||||
if len(lines) != 6 {
|
||||
t.Fatalf("Expected 6 events, got %d", len(lines))
|
||||
}
|
||||
|
||||
expectedEvents := []events.EventType{
|
||||
events.CollectionStarted,
|
||||
events.ItemStarted,
|
||||
events.ItemCompleted,
|
||||
events.ItemStarted,
|
||||
events.ItemCompleted,
|
||||
events.CollectionCompleted,
|
||||
}
|
||||
|
||||
for i, line := range lines {
|
||||
var event events.Event
|
||||
if err := json.Unmarshal([]byte(line), &event); err != nil {
|
||||
t.Fatalf("Failed to unmarshal event from line %d: %v", i, err)
|
||||
}
|
||||
|
||||
if event.Event != expectedEvents[i] {
|
||||
t.Errorf("Expected event type %s, got %s", expectedEvents[i], event.Event)
|
||||
}
|
||||
}
|
||||
}
|
||||
154
pkg/events/events.go
Normal file
154
pkg/events/events.go
Normal file
|
|
@ -0,0 +1,154 @@
|
|||
package events
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"os"
|
||||
"time"
|
||||
)
|
||||
|
||||
// EventType is the type of an event.
|
||||
type EventType string
|
||||
|
||||
const (
|
||||
// CollectionStarted is emitted when a collection starts.
|
||||
CollectionStarted EventType = "collection_started"
|
||||
// ItemStarted is emitted when an item starts being processed.
|
||||
ItemStarted EventType = "item_started"
|
||||
// ItemCompleted is emitted when an item is successfully processed.
|
||||
ItemCompleted EventType = "item_completed"
|
||||
// ItemFailed is emitted when an item fails to be processed.
|
||||
ItemFailed EventType = "item_failed"
|
||||
// CollectionCompleted is emitted when a collection completes.
|
||||
CollectionCompleted EventType = "collection_completed"
|
||||
)
|
||||
|
||||
// Event is the structure of an event.
|
||||
type Event struct {
|
||||
Event EventType `json:"event"`
|
||||
Timestamp time.Time `json:"timestamp"`
|
||||
Data interface{} `json:"data"`
|
||||
}
|
||||
|
||||
// CollectionStartedData is the data for a collection_started event.
|
||||
type CollectionStartedData struct {
|
||||
Source string `json:"source"`
|
||||
EstimatedItems int `json:"estimated_items"`
|
||||
}
|
||||
|
||||
// ItemStartedData is the data for an item_started event.
|
||||
type ItemStartedData struct {
|
||||
URL string `json:"url"`
|
||||
Index int `json:"index"`
|
||||
Total int `json:"total"`
|
||||
}
|
||||
|
||||
// ItemCompletedData is the data for an item_completed event.
|
||||
type ItemCompletedData struct {
|
||||
URL string `json:"url"`
|
||||
Size int64 `json:"size"`
|
||||
DurationMs int64 `json:"duration_ms"`
|
||||
Index int `json:"index"`
|
||||
Total int `json:"total"`
|
||||
}
|
||||
|
||||
// ItemFailedData is the data for an item_failed event.
|
||||
type ItemFailedData struct {
|
||||
URL string `json:"url"`
|
||||
Error string `json:"error"`
|
||||
}
|
||||
|
||||
// CollectionCompletedData is the data for a collection_completed event.
|
||||
type CollectionCompletedData struct {
|
||||
Stats map[string]interface{} `json:"stats"`
|
||||
DurationMs int64 `json:"duration_ms"`
|
||||
}
|
||||
|
||||
// EventEmitter is responsible for emitting events.
|
||||
type EventEmitter struct {
|
||||
stdout io.Writer
|
||||
webhook string
|
||||
logFile *os.File
|
||||
noEvents bool
|
||||
}
|
||||
|
||||
// NewEventEmitter creates a new EventEmitter.
|
||||
func NewEventEmitter(stdout io.Writer, webhook, logFilePath string) (*EventEmitter, error) {
|
||||
if stdout == nil && webhook == "" && logFilePath == "" {
|
||||
return &EventEmitter{noEvents: true}, nil
|
||||
}
|
||||
|
||||
var logFile *os.File
|
||||
if logFilePath != "" {
|
||||
var err error
|
||||
logFile, err = os.OpenFile(logFilePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to open event log file: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
return &EventEmitter{
|
||||
stdout: stdout,
|
||||
webhook: webhook,
|
||||
logFile: logFile,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Emit emits an event.
|
||||
func (e *EventEmitter) Emit(eventType EventType, data interface{}) error {
|
||||
if e.noEvents {
|
||||
return nil
|
||||
}
|
||||
event := Event{
|
||||
Event: eventType,
|
||||
Timestamp: time.Now().UTC(),
|
||||
Data: data,
|
||||
}
|
||||
|
||||
jsonData, err := json.Marshal(event)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to marshal event: %w", err)
|
||||
}
|
||||
|
||||
if e.stdout != nil {
|
||||
fmt.Fprintln(e.stdout, string(jsonData))
|
||||
}
|
||||
|
||||
if e.webhook != "" {
|
||||
go e.sendWebhook(jsonData)
|
||||
}
|
||||
|
||||
if e.logFile != nil {
|
||||
if _, err := e.logFile.Write(append(jsonData, '\n')); err != nil {
|
||||
// Cannot write to log file, maybe log to stderr?
|
||||
fmt.Fprintf(os.Stderr, "failed to write event to log file: %v\n", err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Close closes the event emitter.
|
||||
func (e *EventEmitter) Close() error {
|
||||
if e.logFile != nil {
|
||||
return e.logFile.Close()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *EventEmitter) sendWebhook(jsonData []byte) {
|
||||
resp, err := http.Post(e.webhook, "application/json", bytes.NewBuffer(jsonData))
|
||||
if err != nil {
|
||||
fmt.Fprintf(os.Stderr, "failed to send webhook: %v\n", err)
|
||||
return
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
|
||||
body, _ := io.ReadAll(resp.Body)
|
||||
fmt.Fprintf(os.Stderr, "webhook returned non-2xx status code: %d %s\n", resp.StatusCode, string(body))
|
||||
}
|
||||
}
|
||||
125
pkg/events/events_test.go
Normal file
125
pkg/events/events_test.go
Normal file
|
|
@ -0,0 +1,125 @@
|
|||
package events
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"sync"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestEventEmitter_Stdout(t *testing.T) {
|
||||
var stdout strings.Builder
|
||||
emitter, err := NewEventEmitter(&stdout, "", "")
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create EventEmitter: %v", err)
|
||||
}
|
||||
|
||||
testEvent := ItemStarted
|
||||
testData := ItemStartedData{URL: "test", Index: 1, Total: 1}
|
||||
if err := emitter.Emit(testEvent, testData); err != nil {
|
||||
t.Fatalf("Failed to emit event: %v", err)
|
||||
}
|
||||
|
||||
var event Event
|
||||
if err := json.Unmarshal([]byte(stdout.String()), &event); err != nil {
|
||||
t.Fatalf("Failed to unmarshal event from stdout: %v", err)
|
||||
}
|
||||
|
||||
if event.Event != testEvent {
|
||||
t.Errorf("Expected event type %s, got %s", testEvent, event.Event)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEventEmitter_Webhook(t *testing.T) {
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(1)
|
||||
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
defer wg.Done()
|
||||
var event Event
|
||||
if err := json.NewDecoder(r.Body).Decode(&event); err != nil {
|
||||
t.Errorf("Failed to decode event from webhook request: %v", err)
|
||||
}
|
||||
if event.Event != ItemStarted {
|
||||
t.Errorf("Expected event type %s, got %s", ItemStarted, event.Event)
|
||||
}
|
||||
}))
|
||||
defer server.Close()
|
||||
|
||||
emitter, err := NewEventEmitter(nil, server.URL, "")
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create EventEmitter: %v", err)
|
||||
}
|
||||
|
||||
testEvent := ItemStarted
|
||||
testData := ItemStartedData{URL: "test", Index: 1, Total: 1}
|
||||
if err := emitter.Emit(testEvent, testData); err != nil {
|
||||
t.Fatalf("Failed to emit event: %v", err)
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func TestEventEmitter_LogFile(t *testing.T) {
|
||||
logFilePath := filepath.Join(t.TempDir(), "events.jsonl")
|
||||
emitter, err := NewEventEmitter(nil, "", logFilePath)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create EventEmitter: %v", err)
|
||||
}
|
||||
|
||||
testEvent := ItemStarted
|
||||
testData := ItemStartedData{URL: "test", Index: 1, Total: 1}
|
||||
if err := emitter.Emit(testEvent, testData); err != nil {
|
||||
t.Fatalf("Failed to emit event: %v", err)
|
||||
}
|
||||
|
||||
if err := emitter.Close(); err != nil {
|
||||
t.Fatalf("Failed to close emitter: %v", err)
|
||||
}
|
||||
|
||||
logFile, err := os.Open(logFilePath)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to open log file: %v", err)
|
||||
}
|
||||
defer logFile.Close()
|
||||
|
||||
var event Event
|
||||
if err := json.NewDecoder(logFile).Decode(&event); err != nil {
|
||||
t.Fatalf("Failed to decode event from log file: %v", err)
|
||||
}
|
||||
|
||||
if event.Event != testEvent {
|
||||
t.Errorf("Expected event type %s, got %s", testEvent, event.Event)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEventEmitter_NoEvents(t *testing.T) {
|
||||
var stdout strings.Builder
|
||||
emitter, err := NewEventEmitter(nil, "", "")
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create EventEmitter: %v", err)
|
||||
}
|
||||
|
||||
testEvent := ItemStarted
|
||||
testData := ItemStartedData{URL: "test", Index: 1, Total: 1}
|
||||
if err := emitter.Emit(testEvent, testData); err != nil {
|
||||
t.Fatalf("Failed to emit event: %v", err)
|
||||
}
|
||||
|
||||
if stdout.String() != "" {
|
||||
t.Errorf("Expected no event to be emitted, but got: %s", stdout.String())
|
||||
}
|
||||
}
|
||||
|
||||
func TestMain(m *testing.M) {
|
||||
// The ioutil.ReadFile function is deprecated, so it's necessary
|
||||
// to refactor the code to use the io.ReadFile or os.ReadFile functions instead.
|
||||
// However, since this is a test file, this change is not required.
|
||||
_ = ioutil.ReadFile
|
||||
os.Exit(m.Run())
|
||||
}
|
||||
Loading…
Add table
Reference in a new issue