feat(jobrunner): add journal replay query
Some checks failed
Security Scan / security (push) Failing after 10s
Test / test (push) Successful in 5m2s

Co-Authored-By: Virgil <virgil@lethean.io>
This commit is contained in:
Virgil 2026-04-01 06:14:28 +00:00
parent b2bbc11746
commit 2f4d2e5811
3 changed files with 338 additions and 4 deletions

View file

@ -130,9 +130,9 @@ The Forgejo SDK v2 and Gitea SDK do not accept `context.Context`. All Forgejo/Gi
None of the HTTP-dependent collectors (`bitcointalk.go`, `github.go`, `market.go`, `papers.go`) implement retry on transient failures. A single HTTP error causes the collector to return an error and increment the `Errors` count in the result. The `Excavator` continues to the next collector. For long-running collection runs, transient network errors cause silent data gaps. None of the HTTP-dependent collectors (`bitcointalk.go`, `github.go`, `market.go`, `papers.go`) implement retry on transient failures. A single HTTP error causes the collector to return an error and increment the `Errors` count in the result. The `Excavator` continues to the next collector. For long-running collection runs, transient network errors cause silent data gaps.
**Journal replay — no public API** **Journal replay**
The journal can be replayed by scanning the JSONL files directly, but there is no exported `Query` or `Filter` function. Replay filtering patterns exist only in tests. A future phase should export a query interface. The journal now exposes `Journal.Query(...)` for replay and filtering over the JSONL archive. It supports repo, action, and time-range filters while preserving the date-partitioned storage layout used by `Append(...)`.
**git.Service framework integration** **git.Service framework integration**

View file

@ -3,17 +3,25 @@
package jobrunner package jobrunner
import ( import (
"bufio"
filepath "dappco.re/go/core/scm/internal/ax/filepathx" filepath "dappco.re/go/core/scm/internal/ax/filepathx"
json "dappco.re/go/core/scm/internal/ax/jsonx" json "dappco.re/go/core/scm/internal/ax/jsonx"
os "dappco.re/go/core/scm/internal/ax/osx" os "dappco.re/go/core/scm/internal/ax/osx"
strings "dappco.re/go/core/scm/internal/ax/stringsx" strings "dappco.re/go/core/scm/internal/ax/stringsx"
"regexp" "regexp"
"sort"
"sync" "sync"
"time"
coreio "dappco.re/go/core/io" coreio "dappco.re/go/core/io"
coreerr "dappco.re/go/core/log" coreerr "dappco.re/go/core/log"
) )
const (
journalDateLayout = "2006-01-02"
journalTimestampLayout = "2006-01-02T15:04:05Z"
)
// validPathComponent matches safe repo owner/name characters (alphanumeric, hyphen, underscore, dot). // validPathComponent matches safe repo owner/name characters (alphanumeric, hyphen, underscore, dot).
var validPathComponent = regexp.MustCompile(`^[a-zA-Z0-9][a-zA-Z0-9._-]*$`) var validPathComponent = regexp.MustCompile(`^[a-zA-Z0-9][a-zA-Z0-9._-]*$`)
@ -47,6 +55,16 @@ type ResultSnapshot struct {
DurationMs int64 `json:"duration_ms"` DurationMs int64 `json:"duration_ms"`
} }
// JournalQueryOptions filters replay results from the journal.
type JournalQueryOptions struct {
RepoOwner string
RepoName string
RepoFullName string
Action string
Since time.Time
Until time.Time
}
// Journal writes ActionResult entries to date-partitioned JSONL files. // Journal writes ActionResult entries to date-partitioned JSONL files.
type Journal struct { type Journal struct {
baseDir string baseDir string
@ -103,7 +121,7 @@ func (j *Journal) Append(signal *PipelineSignal, result *ActionResult) error {
} }
entry := JournalEntry{ entry := JournalEntry{
Timestamp: result.Timestamp.UTC().Format("2006-01-02T15:04:05Z"), Timestamp: result.Timestamp.UTC().Format(journalTimestampLayout),
Epic: signal.EpicNumber, Epic: signal.EpicNumber,
Child: signal.ChildNumber, Child: signal.ChildNumber,
PR: signal.PRNumber, PR: signal.PRNumber,
@ -141,7 +159,7 @@ func (j *Journal) Append(signal *PipelineSignal, result *ActionResult) error {
return coreerr.E("jobrunner.Journal.Append", "invalid repo name", err) return coreerr.E("jobrunner.Journal.Append", "invalid repo name", err)
} }
date := result.Timestamp.UTC().Format("2006-01-02") date := result.Timestamp.UTC().Format(journalDateLayout)
dir := filepath.Join(j.baseDir, owner, repo) dir := filepath.Join(j.baseDir, owner, repo)
// Resolve to absolute path and verify it stays within baseDir. // Resolve to absolute path and verify it stays within baseDir.
@ -174,3 +192,201 @@ func (j *Journal) Append(signal *PipelineSignal, result *ActionResult) error {
_, err = f.Write(data) _, err = f.Write(data)
return err return err
} }
type journalQueryHit struct {
entry JournalEntry
timestamp time.Time
repo string
}
// Query replays journal entries from the archive and applies the requested filters.
// Usage: Query(...)
func (j *Journal) Query(opts JournalQueryOptions) ([]JournalEntry, error) {
if j == nil {
return nil, coreerr.E("jobrunner.Journal.Query", "journal is required", nil)
}
ownerFilter, repoFilter, err := normaliseJournalQueryRepo(opts)
if err != nil {
return nil, coreerr.E("jobrunner.Journal.Query", "normalise repo filter", err)
}
hits, err := j.collectQueryHits(opts, ownerFilter, repoFilter)
if err != nil {
return nil, err
}
sort.SliceStable(hits, func(i, k int) bool {
if hits[i].timestamp.Equal(hits[k].timestamp) {
if hits[i].repo == hits[k].repo {
if hits[i].entry.Action == hits[k].entry.Action {
if hits[i].entry.Epic == hits[k].entry.Epic {
if hits[i].entry.Child == hits[k].entry.Child {
if hits[i].entry.PR == hits[k].entry.PR {
return hits[i].entry.Cycle < hits[k].entry.Cycle
}
return hits[i].entry.PR < hits[k].entry.PR
}
return hits[i].entry.Child < hits[k].entry.Child
}
return hits[i].entry.Epic < hits[k].entry.Epic
}
return hits[i].entry.Action < hits[k].entry.Action
}
return hits[i].repo < hits[k].repo
}
return hits[i].timestamp.Before(hits[k].timestamp)
})
entries := make([]JournalEntry, len(hits))
for i, hit := range hits {
entries[i] = hit.entry
}
return entries, nil
}
func (j *Journal) collectQueryHits(opts JournalQueryOptions, ownerFilter, repoFilter string) ([]journalQueryHit, error) {
entries, err := os.ReadDir(j.baseDir)
if err != nil {
if os.IsNotExist(err) {
return nil, nil
}
return nil, coreerr.E("jobrunner.Journal.Query", "read journal base directory", err)
}
sort.Slice(entries, func(i, k int) bool { return entries[i].Name() < entries[k].Name() })
var hits []journalQueryHit
for _, ownerEntry := range entries {
if !ownerEntry.IsDir() {
continue
}
owner := ownerEntry.Name()
if ownerFilter != "" && owner != ownerFilter {
continue
}
repoPath := filepath.Join(j.baseDir, owner)
repos, err := os.ReadDir(repoPath)
if err != nil {
return nil, coreerr.E("jobrunner.Journal.Query", "read owner directory", err)
}
sort.Slice(repos, func(i, k int) bool { return repos[i].Name() < repos[k].Name() })
for _, repoEntry := range repos {
if !repoEntry.IsDir() {
continue
}
repo := repoEntry.Name()
if repoFilter != "" && repo != repoFilter {
continue
}
repoDir := filepath.Join(repoPath, repo)
files, err := os.ReadDir(repoDir)
if err != nil {
return nil, coreerr.E("jobrunner.Journal.Query", "read repo directory", err)
}
sort.Slice(files, func(i, k int) bool { return files[i].Name() < files[k].Name() })
for _, fileEntry := range files {
if fileEntry.IsDir() || !strings.HasSuffix(fileEntry.Name(), ".jsonl") {
continue
}
path := filepath.Join(repoDir, fileEntry.Name())
fileHits, err := j.readQueryFile(path, opts, owner+"/"+repo)
if err != nil {
return nil, err
}
hits = append(hits, fileHits...)
}
}
}
return hits, nil
}
func (j *Journal) readQueryFile(path string, opts JournalQueryOptions, repo string) ([]journalQueryHit, error) {
data, err := os.ReadFile(path)
if err != nil {
return nil, coreerr.E("jobrunner.Journal.Query", "read journal file", err)
}
scanner := bufio.NewScanner(strings.NewReader(string(data)))
var hits []journalQueryHit
for scanner.Scan() {
var entry JournalEntry
if err := json.Unmarshal(scanner.Bytes(), &entry); err != nil {
return nil, coreerr.E("jobrunner.Journal.Query", "decode journal entry", err)
}
ts, err := time.Parse(journalTimestampLayout, entry.Timestamp)
if err != nil {
return nil, coreerr.E("jobrunner.Journal.Query", "parse journal timestamp", err)
}
if !journalEntryMatches(opts, entry, ts) {
continue
}
hits = append(hits, journalQueryHit{
entry: entry,
timestamp: ts,
repo: repo,
})
}
if err := scanner.Err(); err != nil {
return nil, coreerr.E("jobrunner.Journal.Query", "scan journal file", err)
}
return hits, nil
}
func normaliseJournalQueryRepo(opts JournalQueryOptions) (string, string, error) {
owner := opts.RepoOwner
repo := opts.RepoName
if opts.RepoFullName != "" {
parts := strings.SplitN(opts.RepoFullName, "/", 2)
if len(parts) != 2 || parts[0] == "" || parts[1] == "" {
return "", "", coreerr.E("jobrunner.normaliseJournalQueryRepo", "repo full name must be owner/repo", nil)
}
if owner != "" && owner != parts[0] {
return "", "", coreerr.E("jobrunner.normaliseJournalQueryRepo", "repo owner does not match repo full name", nil)
}
if repo != "" && repo != parts[1] {
return "", "", coreerr.E("jobrunner.normaliseJournalQueryRepo", "repo name does not match repo full name", nil)
}
owner = parts[0]
repo = parts[1]
}
if owner != "" {
clean, err := sanitizePathComponent(owner)
if err != nil {
return "", "", err
}
owner = clean
}
if repo != "" {
clean, err := sanitizePathComponent(repo)
if err != nil {
return "", "", err
}
repo = clean
}
return owner, repo, nil
}
func journalEntryMatches(opts JournalQueryOptions, entry JournalEntry, ts time.Time) bool {
if opts.Action != "" && entry.Action != opts.Action {
return false
}
if !opts.Since.IsZero() && ts.Before(opts.Since) {
return false
}
if !opts.Until.IsZero() && ts.After(opts.Until) {
return false
}
return true
}

View file

@ -263,3 +263,121 @@ func TestJournal_Append_Bad_NilResult_Good(t *testing.T) {
require.Error(t, err) require.Error(t, err)
assert.Contains(t, err.Error(), "result is required") assert.Contains(t, err.Error(), "result is required")
} }
func TestJournal_Query_Good(t *testing.T) {
dir := t.TempDir()
j, err := NewJournal(dir)
require.NoError(t, err)
base := time.Date(2026, 2, 5, 14, 30, 0, 0, time.UTC)
firstSignal := &PipelineSignal{
EpicNumber: 10,
ChildNumber: 3,
PRNumber: 55,
RepoOwner: "host-uk",
RepoName: "core-tenant",
PRState: "OPEN",
}
firstResult := &ActionResult{
Action: "merge",
RepoOwner: "host-uk",
RepoName: "core-tenant",
Timestamp: base,
Duration: 10 * time.Second,
}
require.NoError(t, j.Append(firstSignal, firstResult))
secondSignal := &PipelineSignal{
EpicNumber: 10,
ChildNumber: 4,
PRNumber: 56,
RepoOwner: "host-uk",
RepoName: "core-tenant",
PRState: "OPEN",
}
secondResult := &ActionResult{
Action: "comment",
RepoOwner: "host-uk",
RepoName: "core-tenant",
Timestamp: base.Add(1 * time.Hour),
Duration: 5 * time.Second,
}
require.NoError(t, j.Append(secondSignal, secondResult))
thirdSignal := &PipelineSignal{
EpicNumber: 11,
ChildNumber: 1,
PRNumber: 57,
RepoOwner: "host-uk",
RepoName: "core-api",
PRState: "MERGED",
}
thirdResult := &ActionResult{
Action: "merge",
RepoOwner: "host-uk",
RepoName: "core-api",
Timestamp: base.Add(2 * time.Hour),
Duration: 15 * time.Second,
}
require.NoError(t, j.Append(thirdSignal, thirdResult))
all, err := j.Query(JournalQueryOptions{})
require.NoError(t, err)
require.Len(t, all, 3)
assert.Equal(t, "merge", all[0].Action)
assert.Equal(t, "comment", all[1].Action)
assert.Equal(t, "merge", all[2].Action)
byAction, err := j.Query(JournalQueryOptions{Action: "merge"})
require.NoError(t, err)
require.Len(t, byAction, 2)
assert.Equal(t, "host-uk/core-tenant", byAction[0].Repo)
assert.Equal(t, "host-uk/core-api", byAction[1].Repo)
byRepo, err := j.Query(JournalQueryOptions{RepoFullName: "host-uk/core-tenant"})
require.NoError(t, err)
require.Len(t, byRepo, 2)
assert.Equal(t, 10, byRepo[0].Epic)
assert.Equal(t, 10, byRepo[1].Epic)
byRange, err := j.Query(JournalQueryOptions{
Since: base.Add(30 * time.Minute),
Until: base.Add(90 * time.Minute),
})
require.NoError(t, err)
require.Len(t, byRange, 1)
assert.Equal(t, "comment", byRange[0].Action)
combined, err := j.Query(JournalQueryOptions{
RepoFullName: "host-uk/core-tenant",
Action: "comment",
Since: base.Add(30 * time.Minute),
})
require.NoError(t, err)
require.Len(t, combined, 1)
assert.Equal(t, 4, combined[0].Child)
}
func TestJournal_Query_Good_Empty_Good(t *testing.T) {
dir := t.TempDir()
j, err := NewJournal(dir)
require.NoError(t, err)
entries, err := j.Query(JournalQueryOptions{})
require.NoError(t, err)
assert.Empty(t, entries)
}
func TestJournal_Query_Bad_InvalidRepoFullName_Good(t *testing.T) {
dir := t.TempDir()
j, err := NewJournal(dir)
require.NoError(t, err)
_, err = j.Query(JournalQueryOptions{RepoFullName: "broken"})
require.Error(t, err)
assert.Contains(t, err.Error(), "repo full name")
}