From 2f4d2e58110dfdf209cf5e87305f4318a11c1eee Mon Sep 17 00:00:00 2001 From: Virgil Date: Wed, 1 Apr 2026 06:14:28 +0000 Subject: [PATCH] feat(jobrunner): add journal replay query Co-Authored-By: Virgil --- docs/history.md | 4 +- jobrunner/journal.go | 220 +++++++++++++++++++++++++++++++++++++- jobrunner/journal_test.go | 118 ++++++++++++++++++++ 3 files changed, 338 insertions(+), 4 deletions(-) diff --git a/docs/history.md b/docs/history.md index c019d93..13f64cd 100644 --- a/docs/history.md +++ b/docs/history.md @@ -130,9 +130,9 @@ The Forgejo SDK v2 and Gitea SDK do not accept `context.Context`. All Forgejo/Gi None of the HTTP-dependent collectors (`bitcointalk.go`, `github.go`, `market.go`, `papers.go`) implement retry on transient failures. A single HTTP error causes the collector to return an error and increment the `Errors` count in the result. The `Excavator` continues to the next collector. For long-running collection runs, transient network errors cause silent data gaps. -**Journal replay — no public API** +**Journal replay** -The journal can be replayed by scanning the JSONL files directly, but there is no exported `Query` or `Filter` function. Replay filtering patterns exist only in tests. A future phase should export a query interface. +The journal now exposes `Journal.Query(...)` for replay and filtering over the JSONL archive. It supports repo, action, and time-range filters while preserving the date-partitioned storage layout used by `Append(...)`. **git.Service framework integration** diff --git a/jobrunner/journal.go b/jobrunner/journal.go index 596d154..93af709 100644 --- a/jobrunner/journal.go +++ b/jobrunner/journal.go @@ -3,17 +3,25 @@ package jobrunner import ( + "bufio" filepath "dappco.re/go/core/scm/internal/ax/filepathx" json "dappco.re/go/core/scm/internal/ax/jsonx" os "dappco.re/go/core/scm/internal/ax/osx" strings "dappco.re/go/core/scm/internal/ax/stringsx" "regexp" + "sort" "sync" + "time" coreio "dappco.re/go/core/io" coreerr "dappco.re/go/core/log" ) +const ( + journalDateLayout = "2006-01-02" + journalTimestampLayout = "2006-01-02T15:04:05Z" +) + // validPathComponent matches safe repo owner/name characters (alphanumeric, hyphen, underscore, dot). var validPathComponent = regexp.MustCompile(`^[a-zA-Z0-9][a-zA-Z0-9._-]*$`) @@ -47,6 +55,16 @@ type ResultSnapshot struct { DurationMs int64 `json:"duration_ms"` } +// JournalQueryOptions filters replay results from the journal. +type JournalQueryOptions struct { + RepoOwner string + RepoName string + RepoFullName string + Action string + Since time.Time + Until time.Time +} + // Journal writes ActionResult entries to date-partitioned JSONL files. type Journal struct { baseDir string @@ -103,7 +121,7 @@ func (j *Journal) Append(signal *PipelineSignal, result *ActionResult) error { } entry := JournalEntry{ - Timestamp: result.Timestamp.UTC().Format("2006-01-02T15:04:05Z"), + Timestamp: result.Timestamp.UTC().Format(journalTimestampLayout), Epic: signal.EpicNumber, Child: signal.ChildNumber, PR: signal.PRNumber, @@ -141,7 +159,7 @@ func (j *Journal) Append(signal *PipelineSignal, result *ActionResult) error { return coreerr.E("jobrunner.Journal.Append", "invalid repo name", err) } - date := result.Timestamp.UTC().Format("2006-01-02") + date := result.Timestamp.UTC().Format(journalDateLayout) dir := filepath.Join(j.baseDir, owner, repo) // Resolve to absolute path and verify it stays within baseDir. @@ -174,3 +192,201 @@ func (j *Journal) Append(signal *PipelineSignal, result *ActionResult) error { _, err = f.Write(data) return err } + +type journalQueryHit struct { + entry JournalEntry + timestamp time.Time + repo string +} + +// Query replays journal entries from the archive and applies the requested filters. +// Usage: Query(...) +func (j *Journal) Query(opts JournalQueryOptions) ([]JournalEntry, error) { + if j == nil { + return nil, coreerr.E("jobrunner.Journal.Query", "journal is required", nil) + } + + ownerFilter, repoFilter, err := normaliseJournalQueryRepo(opts) + if err != nil { + return nil, coreerr.E("jobrunner.Journal.Query", "normalise repo filter", err) + } + + hits, err := j.collectQueryHits(opts, ownerFilter, repoFilter) + if err != nil { + return nil, err + } + + sort.SliceStable(hits, func(i, k int) bool { + if hits[i].timestamp.Equal(hits[k].timestamp) { + if hits[i].repo == hits[k].repo { + if hits[i].entry.Action == hits[k].entry.Action { + if hits[i].entry.Epic == hits[k].entry.Epic { + if hits[i].entry.Child == hits[k].entry.Child { + if hits[i].entry.PR == hits[k].entry.PR { + return hits[i].entry.Cycle < hits[k].entry.Cycle + } + return hits[i].entry.PR < hits[k].entry.PR + } + return hits[i].entry.Child < hits[k].entry.Child + } + return hits[i].entry.Epic < hits[k].entry.Epic + } + return hits[i].entry.Action < hits[k].entry.Action + } + return hits[i].repo < hits[k].repo + } + return hits[i].timestamp.Before(hits[k].timestamp) + }) + + entries := make([]JournalEntry, len(hits)) + for i, hit := range hits { + entries[i] = hit.entry + } + return entries, nil +} + +func (j *Journal) collectQueryHits(opts JournalQueryOptions, ownerFilter, repoFilter string) ([]journalQueryHit, error) { + entries, err := os.ReadDir(j.baseDir) + if err != nil { + if os.IsNotExist(err) { + return nil, nil + } + return nil, coreerr.E("jobrunner.Journal.Query", "read journal base directory", err) + } + sort.Slice(entries, func(i, k int) bool { return entries[i].Name() < entries[k].Name() }) + + var hits []journalQueryHit + for _, ownerEntry := range entries { + if !ownerEntry.IsDir() { + continue + } + owner := ownerEntry.Name() + if ownerFilter != "" && owner != ownerFilter { + continue + } + + repoPath := filepath.Join(j.baseDir, owner) + repos, err := os.ReadDir(repoPath) + if err != nil { + return nil, coreerr.E("jobrunner.Journal.Query", "read owner directory", err) + } + sort.Slice(repos, func(i, k int) bool { return repos[i].Name() < repos[k].Name() }) + + for _, repoEntry := range repos { + if !repoEntry.IsDir() { + continue + } + repo := repoEntry.Name() + if repoFilter != "" && repo != repoFilter { + continue + } + + repoDir := filepath.Join(repoPath, repo) + files, err := os.ReadDir(repoDir) + if err != nil { + return nil, coreerr.E("jobrunner.Journal.Query", "read repo directory", err) + } + sort.Slice(files, func(i, k int) bool { return files[i].Name() < files[k].Name() }) + + for _, fileEntry := range files { + if fileEntry.IsDir() || !strings.HasSuffix(fileEntry.Name(), ".jsonl") { + continue + } + + path := filepath.Join(repoDir, fileEntry.Name()) + fileHits, err := j.readQueryFile(path, opts, owner+"/"+repo) + if err != nil { + return nil, err + } + hits = append(hits, fileHits...) + } + } + } + + return hits, nil +} + +func (j *Journal) readQueryFile(path string, opts JournalQueryOptions, repo string) ([]journalQueryHit, error) { + data, err := os.ReadFile(path) + if err != nil { + return nil, coreerr.E("jobrunner.Journal.Query", "read journal file", err) + } + + scanner := bufio.NewScanner(strings.NewReader(string(data))) + var hits []journalQueryHit + for scanner.Scan() { + var entry JournalEntry + if err := json.Unmarshal(scanner.Bytes(), &entry); err != nil { + return nil, coreerr.E("jobrunner.Journal.Query", "decode journal entry", err) + } + + ts, err := time.Parse(journalTimestampLayout, entry.Timestamp) + if err != nil { + return nil, coreerr.E("jobrunner.Journal.Query", "parse journal timestamp", err) + } + if !journalEntryMatches(opts, entry, ts) { + continue + } + + hits = append(hits, journalQueryHit{ + entry: entry, + timestamp: ts, + repo: repo, + }) + } + if err := scanner.Err(); err != nil { + return nil, coreerr.E("jobrunner.Journal.Query", "scan journal file", err) + } + + return hits, nil +} + +func normaliseJournalQueryRepo(opts JournalQueryOptions) (string, string, error) { + owner := opts.RepoOwner + repo := opts.RepoName + + if opts.RepoFullName != "" { + parts := strings.SplitN(opts.RepoFullName, "/", 2) + if len(parts) != 2 || parts[0] == "" || parts[1] == "" { + return "", "", coreerr.E("jobrunner.normaliseJournalQueryRepo", "repo full name must be owner/repo", nil) + } + if owner != "" && owner != parts[0] { + return "", "", coreerr.E("jobrunner.normaliseJournalQueryRepo", "repo owner does not match repo full name", nil) + } + if repo != "" && repo != parts[1] { + return "", "", coreerr.E("jobrunner.normaliseJournalQueryRepo", "repo name does not match repo full name", nil) + } + owner = parts[0] + repo = parts[1] + } + + if owner != "" { + clean, err := sanitizePathComponent(owner) + if err != nil { + return "", "", err + } + owner = clean + } + if repo != "" { + clean, err := sanitizePathComponent(repo) + if err != nil { + return "", "", err + } + repo = clean + } + + return owner, repo, nil +} + +func journalEntryMatches(opts JournalQueryOptions, entry JournalEntry, ts time.Time) bool { + if opts.Action != "" && entry.Action != opts.Action { + return false + } + if !opts.Since.IsZero() && ts.Before(opts.Since) { + return false + } + if !opts.Until.IsZero() && ts.After(opts.Until) { + return false + } + return true +} diff --git a/jobrunner/journal_test.go b/jobrunner/journal_test.go index ff928a7..eb3764e 100644 --- a/jobrunner/journal_test.go +++ b/jobrunner/journal_test.go @@ -263,3 +263,121 @@ func TestJournal_Append_Bad_NilResult_Good(t *testing.T) { require.Error(t, err) assert.Contains(t, err.Error(), "result is required") } + +func TestJournal_Query_Good(t *testing.T) { + dir := t.TempDir() + + j, err := NewJournal(dir) + require.NoError(t, err) + + base := time.Date(2026, 2, 5, 14, 30, 0, 0, time.UTC) + + firstSignal := &PipelineSignal{ + EpicNumber: 10, + ChildNumber: 3, + PRNumber: 55, + RepoOwner: "host-uk", + RepoName: "core-tenant", + PRState: "OPEN", + } + firstResult := &ActionResult{ + Action: "merge", + RepoOwner: "host-uk", + RepoName: "core-tenant", + Timestamp: base, + Duration: 10 * time.Second, + } + require.NoError(t, j.Append(firstSignal, firstResult)) + + secondSignal := &PipelineSignal{ + EpicNumber: 10, + ChildNumber: 4, + PRNumber: 56, + RepoOwner: "host-uk", + RepoName: "core-tenant", + PRState: "OPEN", + } + secondResult := &ActionResult{ + Action: "comment", + RepoOwner: "host-uk", + RepoName: "core-tenant", + Timestamp: base.Add(1 * time.Hour), + Duration: 5 * time.Second, + } + require.NoError(t, j.Append(secondSignal, secondResult)) + + thirdSignal := &PipelineSignal{ + EpicNumber: 11, + ChildNumber: 1, + PRNumber: 57, + RepoOwner: "host-uk", + RepoName: "core-api", + PRState: "MERGED", + } + thirdResult := &ActionResult{ + Action: "merge", + RepoOwner: "host-uk", + RepoName: "core-api", + Timestamp: base.Add(2 * time.Hour), + Duration: 15 * time.Second, + } + require.NoError(t, j.Append(thirdSignal, thirdResult)) + + all, err := j.Query(JournalQueryOptions{}) + require.NoError(t, err) + require.Len(t, all, 3) + assert.Equal(t, "merge", all[0].Action) + assert.Equal(t, "comment", all[1].Action) + assert.Equal(t, "merge", all[2].Action) + + byAction, err := j.Query(JournalQueryOptions{Action: "merge"}) + require.NoError(t, err) + require.Len(t, byAction, 2) + assert.Equal(t, "host-uk/core-tenant", byAction[0].Repo) + assert.Equal(t, "host-uk/core-api", byAction[1].Repo) + + byRepo, err := j.Query(JournalQueryOptions{RepoFullName: "host-uk/core-tenant"}) + require.NoError(t, err) + require.Len(t, byRepo, 2) + assert.Equal(t, 10, byRepo[0].Epic) + assert.Equal(t, 10, byRepo[1].Epic) + + byRange, err := j.Query(JournalQueryOptions{ + Since: base.Add(30 * time.Minute), + Until: base.Add(90 * time.Minute), + }) + require.NoError(t, err) + require.Len(t, byRange, 1) + assert.Equal(t, "comment", byRange[0].Action) + + combined, err := j.Query(JournalQueryOptions{ + RepoFullName: "host-uk/core-tenant", + Action: "comment", + Since: base.Add(30 * time.Minute), + }) + require.NoError(t, err) + require.Len(t, combined, 1) + assert.Equal(t, 4, combined[0].Child) +} + +func TestJournal_Query_Good_Empty_Good(t *testing.T) { + dir := t.TempDir() + + j, err := NewJournal(dir) + require.NoError(t, err) + + entries, err := j.Query(JournalQueryOptions{}) + require.NoError(t, err) + assert.Empty(t, entries) +} + +func TestJournal_Query_Bad_InvalidRepoFullName_Good(t *testing.T) { + dir := t.TempDir() + + j, err := NewJournal(dir) + require.NoError(t, err) + + _, err = j.Query(JournalQueryOptions{RepoFullName: "broken"}) + require.Error(t, err) + assert.Contains(t, err.Error(), "repo full name") +}