2026-03-30 00:54:20 +00:00
|
|
|
// SPDX-License-Identifier: EUPL-1.2
|
2026-03-29 23:59:48 +00:00
|
|
|
|
2026-02-21 19:27:35 +00:00
|
|
|
package jobrunner
|
|
|
|
|
|
|
|
|
|
import (
|
2026-04-01 06:14:28 +00:00
|
|
|
"bufio"
|
2026-03-29 23:59:48 +00:00
|
|
|
filepath "dappco.re/go/core/scm/internal/ax/filepathx"
|
|
|
|
|
json "dappco.re/go/core/scm/internal/ax/jsonx"
|
|
|
|
|
os "dappco.re/go/core/scm/internal/ax/osx"
|
|
|
|
|
strings "dappco.re/go/core/scm/internal/ax/stringsx"
|
2026-02-21 19:27:35 +00:00
|
|
|
"regexp"
|
2026-04-01 06:14:28 +00:00
|
|
|
"sort"
|
2026-02-21 19:27:35 +00:00
|
|
|
"sync"
|
2026-04-01 06:14:28 +00:00
|
|
|
"time"
|
2026-03-16 18:27:41 +00:00
|
|
|
|
2026-03-21 23:54:23 +00:00
|
|
|
coreio "dappco.re/go/core/io"
|
2026-03-29 23:59:48 +00:00
|
|
|
coreerr "dappco.re/go/core/log"
|
2026-02-21 19:27:35 +00:00
|
|
|
)
|
|
|
|
|
|
2026-04-01 06:14:28 +00:00
|
|
|
// Time layouts used when writing and replaying journal entries.
const (
	// journalDateLayout is the calendar-day layout (YYYY-MM-DD) used to name
	// the per-day journal file.
	journalDateLayout = "2006-01-02"
	// journalTimestampLayout is the layout of the "ts" field of an entry. It
	// has one-second resolution and ends in a literal "Z", so it must only be
	// applied to times that have already been converted to UTC.
	journalTimestampLayout = "2006-01-02T15:04:05Z"
)
|
|
|
|
|
|
2026-02-21 19:27:35 +00:00
|
|
|
// validPathComponent matches a safe repo owner or repo name: the first
// character must be alphanumeric and the remainder may contain only
// alphanumerics, dots, underscores and hyphens. Names starting with a dot
// (including "." and "..") therefore never match.
var validPathComponent = regexp.MustCompile(`^[a-zA-Z0-9][a-zA-Z0-9._-]*$`)
|
|
|
|
|
|
|
|
|
|
// JournalEntry is a single line in the JSONL audit log.
|
|
|
|
|
type JournalEntry struct {
|
|
|
|
|
Timestamp string `json:"ts"`
|
|
|
|
|
Epic int `json:"epic"`
|
|
|
|
|
Child int `json:"child"`
|
|
|
|
|
PR int `json:"pr"`
|
|
|
|
|
Repo string `json:"repo"`
|
|
|
|
|
Action string `json:"action"`
|
|
|
|
|
Signals SignalSnapshot `json:"signals"`
|
|
|
|
|
Result ResultSnapshot `json:"result"`
|
|
|
|
|
Cycle int `json:"cycle"`
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// SignalSnapshot captures the structural state of a PR at the time of action.
// Every field is always serialised (none is marked omitempty), so each journal
// line carries the complete snapshot.
type SignalSnapshot struct {
	PRState         string `json:"pr_state"`
	IsDraft         bool   `json:"is_draft"`
	CheckStatus     string `json:"check_status"`
	Mergeable       string `json:"mergeable"`
	ThreadsTotal    int    `json:"threads_total"`
	ThreadsResolved int    `json:"threads_resolved"`
}
|
|
|
|
|
|
|
|
|
|
// ResultSnapshot captures the outcome of an action. Error is omitted from the
// JSON when empty; DurationMs is the action duration in whole milliseconds.
type ResultSnapshot struct {
	Success    bool   `json:"success"`
	Error      string `json:"error,omitempty"`
	DurationMs int64  `json:"duration_ms"`
}
|
|
|
|
|
|
2026-04-01 06:14:28 +00:00
|
|
|
// JournalQueryOptions filters replay results from the journal.
//
// The zero value applies no filtering. RepoOwner and RepoName restrict the
// owner and repository directories that are read; RepoFullName is the combined
// "owner/repo" form and must agree with RepoOwner / RepoName when those are
// also set. Action must equal the entry's action exactly. Since and Until are
// inclusive bounds on the entry timestamp, and a zero time leaves that side
// unbounded.
type JournalQueryOptions struct {
	RepoOwner    string
	RepoName     string
	RepoFullName string
	Action       string
	Since        time.Time
	Until        time.Time
}
|
|
|
|
|
|
2026-02-21 19:27:35 +00:00
|
|
|
// Journal writes ActionResult entries to date-partitioned JSONL files laid out
// as <baseDir>/<owner>/<repo>/<YYYY-MM-DD>.jsonl.
//
// mu serialises directory creation and file appends performed by Append. A
// Journal must not be copied after first use because it contains a mutex.
type Journal struct {
	baseDir string
	mu      sync.Mutex
}
|
|
|
|
|
|
|
|
|
|
// NewJournal creates a new Journal rooted at baseDir.
|
2026-03-30 14:11:15 +00:00
|
|
|
// Usage: NewJournal(...)
|
2026-02-21 19:27:35 +00:00
|
|
|
func NewJournal(baseDir string) (*Journal, error) {
|
|
|
|
|
if baseDir == "" {
|
2026-03-16 20:37:25 +00:00
|
|
|
return nil, coreerr.E("jobrunner.NewJournal", "base directory is required", nil)
|
2026-02-21 19:27:35 +00:00
|
|
|
}
|
|
|
|
|
return &Journal{baseDir: baseDir}, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// sanitizePathComponent validates a single path component (owner or repo name)
|
|
|
|
|
// to prevent path traversal attacks. It rejects "..", empty strings, paths
|
|
|
|
|
// containing separators, and any value outside the safe character set.
|
|
|
|
|
func sanitizePathComponent(name string) (string, error) {
|
|
|
|
|
// Reject empty or whitespace-only values.
|
|
|
|
|
if name == "" || strings.TrimSpace(name) == "" {
|
2026-03-16 20:37:25 +00:00
|
|
|
return "", coreerr.E("jobrunner.sanitizePathComponent", "invalid path component: "+name, nil)
|
2026-02-21 19:27:35 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Reject inputs containing path separators (directory traversal attempt).
|
|
|
|
|
if strings.ContainsAny(name, `/\`) {
|
2026-03-16 20:37:25 +00:00
|
|
|
return "", coreerr.E("jobrunner.sanitizePathComponent", "path component contains directory separator: "+name, nil)
|
2026-02-21 19:27:35 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Use filepath.Clean to normalize (e.g., collapse redundant dots).
|
|
|
|
|
clean := filepath.Clean(name)
|
|
|
|
|
|
|
|
|
|
// Reject traversal components.
|
|
|
|
|
if clean == "." || clean == ".." {
|
2026-03-16 20:37:25 +00:00
|
|
|
return "", coreerr.E("jobrunner.sanitizePathComponent", "invalid path component: "+name, nil)
|
2026-02-21 19:27:35 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Validate against the safe character set.
|
|
|
|
|
if !validPathComponent.MatchString(clean) {
|
2026-03-16 20:37:25 +00:00
|
|
|
return "", coreerr.E("jobrunner.sanitizePathComponent", "path component contains invalid characters: "+name, nil)
|
2026-02-21 19:27:35 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return clean, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Append writes a journal entry for the given signal and result.
|
2026-03-30 14:11:15 +00:00
|
|
|
// Usage: Append(...)
|
2026-02-21 19:27:35 +00:00
|
|
|
func (j *Journal) Append(signal *PipelineSignal, result *ActionResult) error {
|
|
|
|
|
if signal == nil {
|
2026-03-16 20:37:25 +00:00
|
|
|
return coreerr.E("jobrunner.Journal.Append", "signal is required", nil)
|
2026-02-21 19:27:35 +00:00
|
|
|
}
|
|
|
|
|
if result == nil {
|
2026-03-16 20:37:25 +00:00
|
|
|
return coreerr.E("jobrunner.Journal.Append", "result is required", nil)
|
2026-02-21 19:27:35 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
entry := JournalEntry{
|
2026-04-01 06:14:28 +00:00
|
|
|
Timestamp: result.Timestamp.UTC().Format(journalTimestampLayout),
|
2026-02-21 19:27:35 +00:00
|
|
|
Epic: signal.EpicNumber,
|
|
|
|
|
Child: signal.ChildNumber,
|
|
|
|
|
PR: signal.PRNumber,
|
|
|
|
|
Repo: signal.RepoFullName(),
|
|
|
|
|
Action: result.Action,
|
|
|
|
|
Signals: SignalSnapshot{
|
|
|
|
|
PRState: signal.PRState,
|
|
|
|
|
IsDraft: signal.IsDraft,
|
|
|
|
|
CheckStatus: signal.CheckStatus,
|
|
|
|
|
Mergeable: signal.Mergeable,
|
|
|
|
|
ThreadsTotal: signal.ThreadsTotal,
|
|
|
|
|
ThreadsResolved: signal.ThreadsResolved,
|
|
|
|
|
},
|
|
|
|
|
Result: ResultSnapshot{
|
|
|
|
|
Success: result.Success,
|
|
|
|
|
Error: result.Error,
|
|
|
|
|
DurationMs: result.Duration.Milliseconds(),
|
|
|
|
|
},
|
|
|
|
|
Cycle: result.Cycle,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
data, err := json.Marshal(entry)
|
|
|
|
|
if err != nil {
|
2026-03-16 20:37:25 +00:00
|
|
|
return coreerr.E("jobrunner.Journal.Append", "marshal journal entry", err)
|
2026-02-21 19:27:35 +00:00
|
|
|
}
|
|
|
|
|
data = append(data, '\n')
|
|
|
|
|
|
|
|
|
|
// Sanitize path components to prevent path traversal (CVE: issue #46).
|
|
|
|
|
owner, err := sanitizePathComponent(signal.RepoOwner)
|
|
|
|
|
if err != nil {
|
2026-03-16 20:37:25 +00:00
|
|
|
return coreerr.E("jobrunner.Journal.Append", "invalid repo owner", err)
|
2026-02-21 19:27:35 +00:00
|
|
|
}
|
|
|
|
|
repo, err := sanitizePathComponent(signal.RepoName)
|
|
|
|
|
if err != nil {
|
2026-03-16 20:37:25 +00:00
|
|
|
return coreerr.E("jobrunner.Journal.Append", "invalid repo name", err)
|
2026-02-21 19:27:35 +00:00
|
|
|
}
|
|
|
|
|
|
2026-04-01 06:14:28 +00:00
|
|
|
date := result.Timestamp.UTC().Format(journalDateLayout)
|
2026-02-21 19:27:35 +00:00
|
|
|
dir := filepath.Join(j.baseDir, owner, repo)
|
|
|
|
|
|
|
|
|
|
// Resolve to absolute path and verify it stays within baseDir.
|
|
|
|
|
absBase, err := filepath.Abs(j.baseDir)
|
|
|
|
|
if err != nil {
|
2026-03-16 20:37:25 +00:00
|
|
|
return coreerr.E("jobrunner.Journal.Append", "resolve base directory", err)
|
2026-02-21 19:27:35 +00:00
|
|
|
}
|
|
|
|
|
absDir, err := filepath.Abs(dir)
|
|
|
|
|
if err != nil {
|
2026-03-16 20:37:25 +00:00
|
|
|
return coreerr.E("jobrunner.Journal.Append", "resolve journal directory", err)
|
2026-02-21 19:27:35 +00:00
|
|
|
}
|
|
|
|
|
if !strings.HasPrefix(absDir, absBase+string(filepath.Separator)) {
|
2026-03-16 20:37:25 +00:00
|
|
|
return coreerr.E("jobrunner.Journal.Append", "journal path escapes base directory", nil)
|
2026-02-21 19:27:35 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
j.mu.Lock()
|
|
|
|
|
defer j.mu.Unlock()
|
|
|
|
|
|
2026-03-16 18:27:41 +00:00
|
|
|
if err := coreio.Local.EnsureDir(dir); err != nil {
|
2026-03-16 20:37:25 +00:00
|
|
|
return coreerr.E("jobrunner.Journal.Append", "create journal directory", err)
|
2026-02-21 19:27:35 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
path := filepath.Join(dir, date+".jsonl")
|
|
|
|
|
f, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o644)
|
|
|
|
|
if err != nil {
|
2026-03-16 20:37:25 +00:00
|
|
|
return coreerr.E("jobrunner.Journal.Append", "open journal file", err)
|
2026-02-21 19:27:35 +00:00
|
|
|
}
|
|
|
|
|
defer func() { _ = f.Close() }()
|
|
|
|
|
|
|
|
|
|
_, err = f.Write(data)
|
|
|
|
|
return err
|
|
|
|
|
}
|
2026-04-01 06:14:28 +00:00
|
|
|
|
|
|
|
|
type journalQueryHit struct {
|
|
|
|
|
entry JournalEntry
|
|
|
|
|
timestamp time.Time
|
|
|
|
|
repo string
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Query replays journal entries from the archive and applies the requested filters.
|
|
|
|
|
// Usage: Query(...)
|
|
|
|
|
func (j *Journal) Query(opts JournalQueryOptions) ([]JournalEntry, error) {
|
|
|
|
|
if j == nil {
|
|
|
|
|
return nil, coreerr.E("jobrunner.Journal.Query", "journal is required", nil)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
ownerFilter, repoFilter, err := normaliseJournalQueryRepo(opts)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, coreerr.E("jobrunner.Journal.Query", "normalise repo filter", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
hits, err := j.collectQueryHits(opts, ownerFilter, repoFilter)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
sort.SliceStable(hits, func(i, k int) bool {
|
|
|
|
|
if hits[i].timestamp.Equal(hits[k].timestamp) {
|
|
|
|
|
if hits[i].repo == hits[k].repo {
|
|
|
|
|
if hits[i].entry.Action == hits[k].entry.Action {
|
|
|
|
|
if hits[i].entry.Epic == hits[k].entry.Epic {
|
|
|
|
|
if hits[i].entry.Child == hits[k].entry.Child {
|
|
|
|
|
if hits[i].entry.PR == hits[k].entry.PR {
|
|
|
|
|
return hits[i].entry.Cycle < hits[k].entry.Cycle
|
|
|
|
|
}
|
|
|
|
|
return hits[i].entry.PR < hits[k].entry.PR
|
|
|
|
|
}
|
|
|
|
|
return hits[i].entry.Child < hits[k].entry.Child
|
|
|
|
|
}
|
|
|
|
|
return hits[i].entry.Epic < hits[k].entry.Epic
|
|
|
|
|
}
|
|
|
|
|
return hits[i].entry.Action < hits[k].entry.Action
|
|
|
|
|
}
|
|
|
|
|
return hits[i].repo < hits[k].repo
|
|
|
|
|
}
|
|
|
|
|
return hits[i].timestamp.Before(hits[k].timestamp)
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
entries := make([]JournalEntry, len(hits))
|
|
|
|
|
for i, hit := range hits {
|
|
|
|
|
entries[i] = hit.entry
|
|
|
|
|
}
|
|
|
|
|
return entries, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (j *Journal) collectQueryHits(opts JournalQueryOptions, ownerFilter, repoFilter string) ([]journalQueryHit, error) {
|
|
|
|
|
entries, err := os.ReadDir(j.baseDir)
|
|
|
|
|
if err != nil {
|
|
|
|
|
if os.IsNotExist(err) {
|
|
|
|
|
return nil, nil
|
|
|
|
|
}
|
|
|
|
|
return nil, coreerr.E("jobrunner.Journal.Query", "read journal base directory", err)
|
|
|
|
|
}
|
|
|
|
|
sort.Slice(entries, func(i, k int) bool { return entries[i].Name() < entries[k].Name() })
|
|
|
|
|
|
|
|
|
|
var hits []journalQueryHit
|
|
|
|
|
for _, ownerEntry := range entries {
|
|
|
|
|
if !ownerEntry.IsDir() {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
owner := ownerEntry.Name()
|
|
|
|
|
if ownerFilter != "" && owner != ownerFilter {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
repoPath := filepath.Join(j.baseDir, owner)
|
|
|
|
|
repos, err := os.ReadDir(repoPath)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, coreerr.E("jobrunner.Journal.Query", "read owner directory", err)
|
|
|
|
|
}
|
|
|
|
|
sort.Slice(repos, func(i, k int) bool { return repos[i].Name() < repos[k].Name() })
|
|
|
|
|
|
|
|
|
|
for _, repoEntry := range repos {
|
|
|
|
|
if !repoEntry.IsDir() {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
repo := repoEntry.Name()
|
|
|
|
|
if repoFilter != "" && repo != repoFilter {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
repoDir := filepath.Join(repoPath, repo)
|
|
|
|
|
files, err := os.ReadDir(repoDir)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, coreerr.E("jobrunner.Journal.Query", "read repo directory", err)
|
|
|
|
|
}
|
|
|
|
|
sort.Slice(files, func(i, k int) bool { return files[i].Name() < files[k].Name() })
|
|
|
|
|
|
|
|
|
|
for _, fileEntry := range files {
|
|
|
|
|
if fileEntry.IsDir() || !strings.HasSuffix(fileEntry.Name(), ".jsonl") {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
path := filepath.Join(repoDir, fileEntry.Name())
|
|
|
|
|
fileHits, err := j.readQueryFile(path, opts, owner+"/"+repo)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
hits = append(hits, fileHits...)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return hits, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (j *Journal) readQueryFile(path string, opts JournalQueryOptions, repo string) ([]journalQueryHit, error) {
|
|
|
|
|
data, err := os.ReadFile(path)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, coreerr.E("jobrunner.Journal.Query", "read journal file", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
scanner := bufio.NewScanner(strings.NewReader(string(data)))
|
|
|
|
|
var hits []journalQueryHit
|
|
|
|
|
for scanner.Scan() {
|
|
|
|
|
var entry JournalEntry
|
|
|
|
|
if err := json.Unmarshal(scanner.Bytes(), &entry); err != nil {
|
|
|
|
|
return nil, coreerr.E("jobrunner.Journal.Query", "decode journal entry", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
ts, err := time.Parse(journalTimestampLayout, entry.Timestamp)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, coreerr.E("jobrunner.Journal.Query", "parse journal timestamp", err)
|
|
|
|
|
}
|
|
|
|
|
if !journalEntryMatches(opts, entry, ts) {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
hits = append(hits, journalQueryHit{
|
|
|
|
|
entry: entry,
|
|
|
|
|
timestamp: ts,
|
|
|
|
|
repo: repo,
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
if err := scanner.Err(); err != nil {
|
|
|
|
|
return nil, coreerr.E("jobrunner.Journal.Query", "scan journal file", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return hits, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func normaliseJournalQueryRepo(opts JournalQueryOptions) (string, string, error) {
|
|
|
|
|
owner := opts.RepoOwner
|
|
|
|
|
repo := opts.RepoName
|
|
|
|
|
|
|
|
|
|
if opts.RepoFullName != "" {
|
|
|
|
|
parts := strings.SplitN(opts.RepoFullName, "/", 2)
|
|
|
|
|
if len(parts) != 2 || parts[0] == "" || parts[1] == "" {
|
|
|
|
|
return "", "", coreerr.E("jobrunner.normaliseJournalQueryRepo", "repo full name must be owner/repo", nil)
|
|
|
|
|
}
|
|
|
|
|
if owner != "" && owner != parts[0] {
|
|
|
|
|
return "", "", coreerr.E("jobrunner.normaliseJournalQueryRepo", "repo owner does not match repo full name", nil)
|
|
|
|
|
}
|
|
|
|
|
if repo != "" && repo != parts[1] {
|
|
|
|
|
return "", "", coreerr.E("jobrunner.normaliseJournalQueryRepo", "repo name does not match repo full name", nil)
|
|
|
|
|
}
|
|
|
|
|
owner = parts[0]
|
|
|
|
|
repo = parts[1]
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if owner != "" {
|
|
|
|
|
clean, err := sanitizePathComponent(owner)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return "", "", err
|
|
|
|
|
}
|
|
|
|
|
owner = clean
|
|
|
|
|
}
|
|
|
|
|
if repo != "" {
|
|
|
|
|
clean, err := sanitizePathComponent(repo)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return "", "", err
|
|
|
|
|
}
|
|
|
|
|
repo = clean
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return owner, repo, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func journalEntryMatches(opts JournalQueryOptions, entry JournalEntry, ts time.Time) bool {
|
|
|
|
|
if opts.Action != "" && entry.Action != opts.Action {
|
|
|
|
|
return false
|
|
|
|
|
}
|
|
|
|
|
if !opts.Since.IsZero() && ts.Before(opts.Since) {
|
|
|
|
|
return false
|
|
|
|
|
}
|
|
|
|
|
if !opts.Until.IsZero() && ts.After(opts.Until) {
|
|
|
|
|
return false
|
|
|
|
|
}
|
|
|
|
|
return true
|
|
|
|
|
}
|