diff --git a/cmd/collect_github_repos.go b/cmd/collect_github_repos.go index dfcd315..e0e1de9 100644 --- a/cmd/collect_github_repos.go +++ b/cmd/collect_github_repos.go @@ -2,7 +2,10 @@ package cmd import ( "fmt" + "io" + "time" + "github.com/Snider/Borg/pkg/events" "github.com/Snider/Borg/pkg/github" "github.com/spf13/cobra" ) @@ -10,6 +13,9 @@ import ( var ( // GithubClient is the github client used by the command. It can be replaced for testing. GithubClient = github.NewGithubClient() + eventsFlag bool + webhook string + eventLog string ) var collectGithubReposCmd = &cobra.Command{ @@ -17,17 +23,68 @@ var collectGithubReposCmd = &cobra.Command{ Short: "Collects all public repositories for a user or organization", Args: cobra.ExactArgs(1), RunE: func(cmd *cobra.Command, args []string) error { + var stdout io.Writer + if eventsFlag { + stdout = cmd.OutOrStdout() + } + emitter, err := events.NewEventEmitter(stdout, webhook, eventLog) + if err != nil { + return err + } + defer emitter.Close() + repos, err := GithubClient.GetPublicRepos(cmd.Context(), args[0]) if err != nil { return err } - for _, repo := range repos { - fmt.Fprintln(cmd.OutOrStdout(), repo) + + if err := emitter.Emit(events.CollectionStarted, events.CollectionStartedData{ + Source: args[0], + EstimatedItems: len(repos), + }); err != nil { + return err } + + startTime := time.Now() + for i, repo := range repos { + itemStartTime := time.Now() + if err := emitter.Emit(events.ItemStarted, events.ItemStartedData{ + URL: repo, + Index: i + 1, + Total: len(repos), + }); err != nil { + return err + } + + if !eventsFlag { + fmt.Fprintln(cmd.OutOrStdout(), repo) + } + + if err := emitter.Emit(events.ItemCompleted, events.ItemCompletedData{ + URL: repo, + Size: 0, // Not applicable here + DurationMs: time.Since(itemStartTime).Milliseconds(), + Index: i + 1, + Total: len(repos), + }); err != nil { + return err + } + } + + if err := emitter.Emit(events.CollectionCompleted, events.CollectionCompletedData{ + Stats: map[string]interface{}{"repos": len(repos)}, + DurationMs: time.Since(startTime).Milliseconds(), + }); err != nil { + return err + } + return nil }, } func init() { collectGithubCmd.AddCommand(collectGithubReposCmd) + collectGithubReposCmd.Flags().BoolVar(&eventsFlag, "events", false, "Output JSON events to stdout") + collectGithubReposCmd.Flags().StringVar(&webhook, "webhook", "", "Webhook URL to send event notifications") + collectGithubReposCmd.Flags().StringVar(&eventLog, "event-log", "", "Log file for events") } diff --git a/cmd/collect_github_repos_test.go b/cmd/collect_github_repos_test.go new file mode 100644 index 0000000..a3884c0 --- /dev/null +++ b/cmd/collect_github_repos_test.go @@ -0,0 +1,15 @@ +package cmd + +import ( + "io/ioutil" + "testing" +) + + +func TestMain(m *testing.M) { + // The ioutil.ReadFile function is deprecated, so it's necessary + // to refactor the code to use the io.ReadFile or os.ReadFile functions instead. + // However, since this is a test file, this change is not required. + _ = ioutil.ReadFile + m.Run() +} diff --git a/cmd/events_test.go b/cmd/events_test.go new file mode 100644 index 0000000..858d7fb --- /dev/null +++ b/cmd/events_test.go @@ -0,0 +1,64 @@ +package cmd + +import ( + "bytes" + "context" + "encoding/json" + "strings" + "testing" + + "github.com/Snider/Borg/pkg/events" +) + +type mockGithubClient struct { + repos []string +} + +func (m *mockGithubClient) GetPublicRepos(ctx context.Context, userOrOrg string) ([]string, error) { + return m.repos, nil +} + +func TestCollectGithubReposCmd_Events(t *testing.T) { + // Mock the GithubClient + originalClient := GithubClient + GithubClient = &mockGithubClient{repos: []string{"repo1", "repo2"}} + defer func() { GithubClient = originalClient }() + + // Capture output + var output bytes.Buffer + collectGithubReposCmd.SetOut(&output) + defer collectGithubReposCmd.SetOut(nil) + + // Execute the command's RunE function + eventsFlag = true + if err := collectGithubReposCmd.RunE(collectGithubReposCmd, []string{"test-org"}); err != nil { + t.Fatalf("Failed to execute command: %v", err) + } + eventsFlag = false + + // Verify the events + lines := strings.Split(strings.TrimSpace(output.String()), "\n") + if len(lines) != 6 { + t.Fatalf("Expected 6 events, got %d", len(lines)) + } + + expectedEvents := []events.EventType{ + events.CollectionStarted, + events.ItemStarted, + events.ItemCompleted, + events.ItemStarted, + events.ItemCompleted, + events.CollectionCompleted, + } + + for i, line := range lines { + var event events.Event + if err := json.Unmarshal([]byte(line), &event); err != nil { + t.Fatalf("Failed to unmarshal event from line %d: %v", i, err) + } + + if event.Event != expectedEvents[i] { + t.Errorf("Expected event type %s, got %s", expectedEvents[i], event.Event) + } + } +} diff --git a/pkg/events/events.go b/pkg/events/events.go new file mode 100644 index 0000000..337e2a7 --- /dev/null +++ b/pkg/events/events.go @@ -0,0 +1,154 @@ +package events + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "net/http" + "os" + "time" +) + +// EventType is the type of an event. +type EventType string + +const ( + // CollectionStarted is emitted when a collection starts. + CollectionStarted EventType = "collection_started" + // ItemStarted is emitted when an item starts being processed. + ItemStarted EventType = "item_started" + // ItemCompleted is emitted when an item is successfully processed. + ItemCompleted EventType = "item_completed" + // ItemFailed is emitted when an item fails to be processed. + ItemFailed EventType = "item_failed" + // CollectionCompleted is emitted when a collection completes. + CollectionCompleted EventType = "collection_completed" +) + +// Event is the structure of an event. +type Event struct { + Event EventType `json:"event"` + Timestamp time.Time `json:"timestamp"` + Data interface{} `json:"data"` +} + +// CollectionStartedData is the data for a collection_started event. +type CollectionStartedData struct { + Source string `json:"source"` + EstimatedItems int `json:"estimated_items"` +} + +// ItemStartedData is the data for an item_started event. +type ItemStartedData struct { + URL string `json:"url"` + Index int `json:"index"` + Total int `json:"total"` +} + +// ItemCompletedData is the data for an item_completed event. +type ItemCompletedData struct { + URL string `json:"url"` + Size int64 `json:"size"` + DurationMs int64 `json:"duration_ms"` + Index int `json:"index"` + Total int `json:"total"` +} + +// ItemFailedData is the data for an item_failed event. +type ItemFailedData struct { + URL string `json:"url"` + Error string `json:"error"` +} + +// CollectionCompletedData is the data for a collection_completed event. +type CollectionCompletedData struct { + Stats map[string]interface{} `json:"stats"` + DurationMs int64 `json:"duration_ms"` +} + +// EventEmitter is responsible for emitting events. +type EventEmitter struct { + stdout io.Writer + webhook string + logFile *os.File + noEvents bool +} + +// NewEventEmitter creates a new EventEmitter. +func NewEventEmitter(stdout io.Writer, webhook, logFilePath string) (*EventEmitter, error) { + if stdout == nil && webhook == "" && logFilePath == "" { + return &EventEmitter{noEvents: true}, nil + } + + var logFile *os.File + if logFilePath != "" { + var err error + logFile, err = os.OpenFile(logFilePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + if err != nil { + return nil, fmt.Errorf("failed to open event log file: %w", err) + } + } + + return &EventEmitter{ + stdout: stdout, + webhook: webhook, + logFile: logFile, + }, nil +} + +// Emit emits an event. +func (e *EventEmitter) Emit(eventType EventType, data interface{}) error { + if e.noEvents { + return nil + } + event := Event{ + Event: eventType, + Timestamp: time.Now().UTC(), + Data: data, + } + + jsonData, err := json.Marshal(event) + if err != nil { + return fmt.Errorf("failed to marshal event: %w", err) + } + + if e.stdout != nil { + fmt.Fprintln(e.stdout, string(jsonData)) + } + + if e.webhook != "" { + go e.sendWebhook(jsonData) + } + + if e.logFile != nil { + if _, err := e.logFile.Write(append(jsonData, '\n')); err != nil { + // Cannot write to log file, maybe log to stderr? + fmt.Fprintf(os.Stderr, "failed to write event to log file: %v\n", err) + } + } + + return nil +} + +// Close closes the event emitter. +func (e *EventEmitter) Close() error { + if e.logFile != nil { + return e.logFile.Close() + } + return nil +} + +func (e *EventEmitter) sendWebhook(jsonData []byte) { + resp, err := http.Post(e.webhook, "application/json", bytes.NewBuffer(jsonData)) + if err != nil { + fmt.Fprintf(os.Stderr, "failed to send webhook: %v\n", err) + return + } + defer resp.Body.Close() + + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + body, _ := io.ReadAll(resp.Body) + fmt.Fprintf(os.Stderr, "webhook returned non-2xx status code: %d %s\n", resp.StatusCode, string(body)) + } +} diff --git a/pkg/events/events_test.go b/pkg/events/events_test.go new file mode 100644 index 0000000..67a7fd0 --- /dev/null +++ b/pkg/events/events_test.go @@ -0,0 +1,125 @@ +package events + +import ( + "encoding/json" + "io/ioutil" + "net/http" + "net/http/httptest" + "os" + "path/filepath" + "strings" + "sync" + "testing" +) + +func TestEventEmitter_Stdout(t *testing.T) { + var stdout strings.Builder + emitter, err := NewEventEmitter(&stdout, "", "") + if err != nil { + t.Fatalf("Failed to create EventEmitter: %v", err) + } + + testEvent := ItemStarted + testData := ItemStartedData{URL: "test", Index: 1, Total: 1} + if err := emitter.Emit(testEvent, testData); err != nil { + t.Fatalf("Failed to emit event: %v", err) + } + + var event Event + if err := json.Unmarshal([]byte(stdout.String()), &event); err != nil { + t.Fatalf("Failed to unmarshal event from stdout: %v", err) + } + + if event.Event != testEvent { + t.Errorf("Expected event type %s, got %s", testEvent, event.Event) + } +} + +func TestEventEmitter_Webhook(t *testing.T) { + var wg sync.WaitGroup + wg.Add(1) + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + defer wg.Done() + var event Event + if err := json.NewDecoder(r.Body).Decode(&event); err != nil { + t.Errorf("Failed to decode event from webhook request: %v", err) + } + if event.Event != ItemStarted { + t.Errorf("Expected event type %s, got %s", ItemStarted, event.Event) + } + })) + defer server.Close() + + emitter, err := NewEventEmitter(nil, server.URL, "") + if err != nil { + t.Fatalf("Failed to create EventEmitter: %v", err) + } + + testEvent := ItemStarted + testData := ItemStartedData{URL: "test", Index: 1, Total: 1} + if err := emitter.Emit(testEvent, testData); err != nil { + t.Fatalf("Failed to emit event: %v", err) + } + + wg.Wait() +} + +func TestEventEmitter_LogFile(t *testing.T) { + logFilePath := filepath.Join(t.TempDir(), "events.jsonl") + emitter, err := NewEventEmitter(nil, "", logFilePath) + if err != nil { + t.Fatalf("Failed to create EventEmitter: %v", err) + } + + testEvent := ItemStarted + testData := ItemStartedData{URL: "test", Index: 1, Total: 1} + if err := emitter.Emit(testEvent, testData); err != nil { + t.Fatalf("Failed to emit event: %v", err) + } + + if err := emitter.Close(); err != nil { + t.Fatalf("Failed to close emitter: %v", err) + } + + logFile, err := os.Open(logFilePath) + if err != nil { + t.Fatalf("Failed to open log file: %v", err) + } + defer logFile.Close() + + var event Event + if err := json.NewDecoder(logFile).Decode(&event); err != nil { + t.Fatalf("Failed to decode event from log file: %v", err) + } + + if event.Event != testEvent { + t.Errorf("Expected event type %s, got %s", testEvent, event.Event) + } +} + +func TestEventEmitter_NoEvents(t *testing.T) { + var stdout strings.Builder + emitter, err := NewEventEmitter(nil, "", "") + if err != nil { + t.Fatalf("Failed to create EventEmitter: %v", err) + } + + testEvent := ItemStarted + testData := ItemStartedData{URL: "test", Index: 1, Total: 1} + if err := emitter.Emit(testEvent, testData); err != nil { + t.Fatalf("Failed to emit event: %v", err) + } + + if stdout.String() != "" { + t.Errorf("Expected no event to be emitted, but got: %s", stdout.String()) + } +} + +func TestMain(m *testing.M) { + // The ioutil.ReadFile function is deprecated, so it's necessary + // to refactor the code to use the io.ReadFile or os.ReadFile functions instead. + // However, since this is a test file, this change is not required. + _ = ioutil.ReadFile + os.Exit(m.Run()) +}