From 74084f37b91c62adbc57c0d9bb0a3bda6995e439 Mon Sep 17 00:00:00 2001 From: Snider Date: Sat, 25 Apr 2026 15:10:46 +0100 Subject: [PATCH] =?UTF-8?q?fix(session):=20AX-6=20sweep=20on=20parser.go?= =?UTF-8?q?=20(#398)=20=E2=80=94=20bufio/maps/path=20=E2=86=92=20core?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Removed bufio (replaced with local streaming line reader capped at 8 MiB), maps (replaced with explicit for range), and path (replaced with core.CleanPath / core.JoinPath / core.PathBase). Preserves transcript line handling in parseFromReader; ListSessionsSeq now uses the same line cap (maxScannerBuffer) instead of its previous 1 MiB scanner buffer. Co-authored-by: Codex Closes tasks.lthn.sh/view.php?id=398 --- parser.go | 124 ++++++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 92 insertions(+), 32 deletions(-) diff --git a/parser.go b/parser.go index 2693766..811526b 100644 --- a/parser.go +++ b/parser.go @@ -2,12 +2,9 @@ package session import ( - "bufio" // Note: intrinsic — streaming JSONL scan with an explicit bounded buffer; no core equivalent - "io" // Note: intrinsic — Reader and ReadCloser contracts for transcript streams and hostFS handles; no core equivalent + "io" // Note: intrinsic — Reader, ReadCloser, and EOF contracts for transcript streams and hostFS handles; no core equivalent "io/fs" // Note: intrinsic — fs.FileInfo metadata returned from hostFS.Stat; no core equivalent "iter" // Note: intrinsic — public lazy sequence API for sessions and events; no core equivalent - "maps" // Note: intrinsic — maps.Keys exposes JSON fallback key sets for deterministic output; no core equivalent - "path" // Note: intrinsic — slash-separated transcript path joining and base-name extraction; no core equivalent "slices" // Note: intrinsic — iterator collection, sorted keys, and session ordering; no core equivalent "syscall" // Note: intrinsic — Stat_t.Mode lstat bits used to reject symlinked transcript files; no core equivalent "time" // Note: intrinsic — RFC3339 transcript timestamps and session age calculations; no core equivalent @@ 
-153,7 +150,7 @@ func ListSessions(projectsDir string) ([]Session, error) { // } func ListSessionsSeq(projectsDir string) iter.Seq[Session] { return func(yield func(Session) bool) { - matches := core.PathGlob(path.Join(projectsDir, "*.jsonl")) + matches := core.PathGlob(transcriptPath(projectsDir, "*.jsonl")) var sessions []Session for _, filePath := range matches { @@ -161,7 +158,7 @@ func ListSessionsSeq(projectsDir string) iter.Seq[Session] { continue } - base := path.Base(filePath) + base := core.PathBase(filePath) id := core.TrimSuffix(base, ".jsonl") infoResult := hostFS.Stat(filePath) @@ -188,22 +185,21 @@ func ListSessionsSeq(projectsDir string) iter.Seq[Session] { continue } - scanner := bufio.NewScanner(f) - scanner.Buffer(make([]byte, 1024*1024), 1024*1024) var firstTS, lastTS string - for scanner.Scan() { + scanTranscriptLines(f, maxScannerBuffer, func(line []byte) bool { var entry rawEntry - if !core.JSONUnmarshal(scanner.Bytes(), &entry).OK { - continue + if !core.JSONUnmarshal(line, &entry).OK { + return true } if entry.Timestamp == "" { - continue + return true } if firstTS == "" { firstTS = entry.Timestamp } lastTS = entry.Timestamp - } + return true + }) f.Close() if firstTS != "" { @@ -241,7 +237,7 @@ func ListSessionsSeq(projectsDir string) iter.Seq[Session] { // Example: // deleted, err := session.PruneSessions("/tmp/projects", 24*time.Hour) func PruneSessions(projectsDir string, maxAge time.Duration) (int, error) { - matches := core.PathGlob(path.Join(projectsDir, "*.jsonl")) + matches := core.PathGlob(transcriptPath(projectsDir, "*.jsonl")) var deleted int now := time.Now() @@ -286,7 +282,7 @@ func FetchSession(projectsDir, id string) (*Session, *ParseStats, error) { return nil, nil, core.E("FetchSession", "invalid session id", nil) } - filePath := path.Join(projectsDir, id+".jsonl") + filePath := transcriptPath(projectsDir, id+".jsonl") if !isSafeProjectFile(filePath) { return nil, nil, core.E("FetchSession", "invalid session path", nil) } 
@@ -309,7 +305,7 @@ func ParseTranscript(filePath string) (*Session, *ParseStats, error) { } defer f.Close() - base := path.Base(filePath) + base := core.PathBase(filePath) id := core.TrimSuffix(base, ".jsonl") sess, stats, err := parseFromReader(f, id) @@ -337,8 +333,8 @@ func ParseTranscriptReader(r io.Reader, id string) (*Session, *ParseStats, error } // parseFromReader is the shared implementation for both file-based and -// reader-based parsing. It scans line-by-line using bufio.Scanner with -// an 8 MiB buffer, gracefully skipping malformed lines. +// reader-based parsing. It scans line-by-line with an 8 MiB buffer, +// gracefully skipping malformed lines. func parseFromReader(r io.Reader, id string) (*Session, *ParseStats, error) { sess := &Session{ ID: id, @@ -354,20 +350,17 @@ func parseFromReader(r io.Reader, id string) (*Session, *ParseStats, error) { } pendingTools := make(map[string]toolUse) - scanner := bufio.NewScanner(r) - scanner.Buffer(make([]byte, 64*1024), maxScannerBuffer) - var lineNum int var lastRaw string var lastLineFailed bool - for scanner.Scan() { + scanErr := scanTranscriptLines(r, maxScannerBuffer, func(line []byte) bool { lineNum++ stats.TotalLines++ - raw := scanner.Text() + raw := string(line) if core.Trim(raw) == "" { - continue + return true } lastRaw = raw @@ -383,13 +376,13 @@ func parseFromReader(r io.Reader, id string) (*Session, *ParseStats, error) { stats.Warnings = append(stats.Warnings, core.Sprintf("line %d: skipped (bad JSON): %s", lineNum, preview)) lastLineFailed = true - continue + return true } ts, err := time.Parse(time.RFC3339Nano, entry.Timestamp) if err != nil { stats.Warnings = append(stats.Warnings, core.Sprintf("line %d: bad timestamp %q: %v", lineNum, entry.Timestamp, err)) - continue + return true } if sess.StartTime.IsZero() && !ts.IsZero() { @@ -404,7 +397,7 @@ func parseFromReader(r io.Reader, id string) (*Session, *ParseStats, error) { var msg rawMessage if !core.JSONUnmarshal(entry.Message, &msg).OK { 
stats.Warnings = append(stats.Warnings, core.Sprintf("line %d: failed to unmarshal assistant message", lineNum)) - continue + return true } for i, raw := range msg.Content { var block contentBlock @@ -445,7 +438,7 @@ func parseFromReader(r io.Reader, id string) (*Session, *ParseStats, error) { var msg rawMessage if !core.JSONUnmarshal(entry.Message, &msg).OK { stats.Warnings = append(stats.Warnings, core.Sprintf("line %d: failed to unmarshal user message", lineNum)) - continue + return true } for i, raw := range msg.Content { var block contentBlock @@ -487,15 +480,15 @@ func parseFromReader(r io.Reader, id string) (*Session, *ParseStats, error) { } } } - } + return true + }) // Detect truncated final line. if lastLineFailed && lastRaw != "" { stats.Warnings = append(stats.Warnings, "truncated final line") } - // Check for scanner buffer errors. - if scanErr := scanner.Err(); scanErr != nil { + if scanErr != nil { return nil, stats, scanErr } @@ -569,7 +562,11 @@ func extractToolInput(toolName string, raw rawJSON) string { // Fallback: show raw JSON keys var m map[string]any if core.JSONUnmarshal(raw, &m).OK { - parts := slices.Sorted(maps.Keys(m)) + parts := make([]string, 0, len(m)) + for key := range m { + parts = append(parts, key) + } + slices.Sort(parts) return core.Join(", ", parts...) } @@ -605,6 +602,69 @@ func truncate(s string, max int) string { return s[:max] + "..." } +func scanTranscriptLines(r io.Reader, maxLineSize int, handle func([]byte) bool) error { + if maxLineSize <= 0 { + maxLineSize = maxScannerBuffer + } + + readBuffer := make([]byte, 64*1024) + line := make([]byte, 0, 64*1024) + + for { + n, readErr := r.Read(readBuffer) + if n > 0 { + chunk := readBuffer[:n] + start := 0 + for i, b := range chunk { + if b != '\n' { + continue + } + if len(line)+i-start > maxLineSize { + return core.E("scanTranscriptLines", core.Sprintf("line exceeds %d bytes", maxLineSize), nil) + } + line = append(line, chunk[start:i]...) 
+ if !handle(trimLineBreak(line)) { + return nil + } + line = line[:0] + start = i + 1 + } + if start < len(chunk) { + if len(line)+len(chunk)-start > maxLineSize { + return core.E("scanTranscriptLines", core.Sprintf("line exceeds %d bytes", maxLineSize), nil) + } + line = append(line, chunk[start:]...) + } + } + + if readErr == io.EOF { + if len(line) > 0 { + if !handle(trimLineBreak(line)) { + return nil + } + } + return nil + } + if readErr != nil { + return readErr + } + } +} + +func trimLineBreak(line []byte) []byte { + if len(line) > 0 && line[len(line)-1] == '\r' { + return line[:len(line)-1] + } + return line +} + +func transcriptPath(projectsDir, name string) string { + if projectsDir == "" { + return core.CleanPath(name, "/") + } + return core.CleanPath(core.JoinPath(projectsDir, name), "/") +} + func isSymlink(filePath string) bool { var st syscall.Stat_t if err := syscall.Lstat(filePath, &st); err != nil {