fix(session): AX-6 sweep on parser.go (#398) — bufio/maps/path → core
Removed bufio (replaced with local streaming line reader capped at 8 MiB), maps (replaced with explicit for range), and path (replaced with core.CleanPath / core.JoinPath). Preserves transcript line handling. Co-authored-by: Codex <noreply@openai.com> Closes tasks.lthn.sh/view.php?id=398
This commit is contained in:
parent
e22f44c2c7
commit
74084f37b9
1 changed file with 92 additions and 32 deletions
124
parser.go
124
parser.go
|
|
@ -2,12 +2,9 @@
|
|||
package session
|
||||
|
||||
import (
|
||||
"bufio" // Note: intrinsic — streaming JSONL scan with an explicit bounded buffer; no core equivalent
|
||||
"io" // Note: intrinsic — Reader and ReadCloser contracts for transcript streams and hostFS handles; no core equivalent
|
||||
"io" // Note: intrinsic — Reader, ReadCloser, and EOF contracts for transcript streams and hostFS handles; no core equivalent
|
||||
"io/fs" // Note: intrinsic — fs.FileInfo metadata returned from hostFS.Stat; no core equivalent
|
||||
"iter" // Note: intrinsic — public lazy sequence API for sessions and events; no core equivalent
|
||||
"maps" // Note: intrinsic — maps.Keys exposes JSON fallback key sets for deterministic output; no core equivalent
|
||||
"path" // Note: intrinsic — slash-separated transcript path joining and base-name extraction; no core equivalent
|
||||
"slices" // Note: intrinsic — iterator collection, sorted keys, and session ordering; no core equivalent
|
||||
"syscall" // Note: intrinsic — Stat_t.Mode lstat bits used to reject symlinked transcript files; no core equivalent
|
||||
"time" // Note: intrinsic — RFC3339 transcript timestamps and session age calculations; no core equivalent
|
||||
|
|
@ -153,7 +150,7 @@ func ListSessions(projectsDir string) ([]Session, error) {
|
|||
// }
|
||||
func ListSessionsSeq(projectsDir string) iter.Seq[Session] {
|
||||
return func(yield func(Session) bool) {
|
||||
matches := core.PathGlob(path.Join(projectsDir, "*.jsonl"))
|
||||
matches := core.PathGlob(transcriptPath(projectsDir, "*.jsonl"))
|
||||
|
||||
var sessions []Session
|
||||
for _, filePath := range matches {
|
||||
|
|
@ -161,7 +158,7 @@ func ListSessionsSeq(projectsDir string) iter.Seq[Session] {
|
|||
continue
|
||||
}
|
||||
|
||||
base := path.Base(filePath)
|
||||
base := core.PathBase(filePath)
|
||||
id := core.TrimSuffix(base, ".jsonl")
|
||||
|
||||
infoResult := hostFS.Stat(filePath)
|
||||
|
|
@ -188,22 +185,21 @@ func ListSessionsSeq(projectsDir string) iter.Seq[Session] {
|
|||
continue
|
||||
}
|
||||
|
||||
scanner := bufio.NewScanner(f)
|
||||
scanner.Buffer(make([]byte, 1024*1024), 1024*1024)
|
||||
var firstTS, lastTS string
|
||||
for scanner.Scan() {
|
||||
scanTranscriptLines(f, maxScannerBuffer, func(line []byte) bool {
|
||||
var entry rawEntry
|
||||
if !core.JSONUnmarshal(scanner.Bytes(), &entry).OK {
|
||||
continue
|
||||
if !core.JSONUnmarshal(line, &entry).OK {
|
||||
return true
|
||||
}
|
||||
if entry.Timestamp == "" {
|
||||
continue
|
||||
return true
|
||||
}
|
||||
if firstTS == "" {
|
||||
firstTS = entry.Timestamp
|
||||
}
|
||||
lastTS = entry.Timestamp
|
||||
}
|
||||
return true
|
||||
})
|
||||
f.Close()
|
||||
|
||||
if firstTS != "" {
|
||||
|
|
@ -241,7 +237,7 @@ func ListSessionsSeq(projectsDir string) iter.Seq[Session] {
|
|||
// Example:
|
||||
// deleted, err := session.PruneSessions("/tmp/projects", 24*time.Hour)
|
||||
func PruneSessions(projectsDir string, maxAge time.Duration) (int, error) {
|
||||
matches := core.PathGlob(path.Join(projectsDir, "*.jsonl"))
|
||||
matches := core.PathGlob(transcriptPath(projectsDir, "*.jsonl"))
|
||||
|
||||
var deleted int
|
||||
now := time.Now()
|
||||
|
|
@ -286,7 +282,7 @@ func FetchSession(projectsDir, id string) (*Session, *ParseStats, error) {
|
|||
return nil, nil, core.E("FetchSession", "invalid session id", nil)
|
||||
}
|
||||
|
||||
filePath := path.Join(projectsDir, id+".jsonl")
|
||||
filePath := transcriptPath(projectsDir, id+".jsonl")
|
||||
if !isSafeProjectFile(filePath) {
|
||||
return nil, nil, core.E("FetchSession", "invalid session path", nil)
|
||||
}
|
||||
|
|
@ -309,7 +305,7 @@ func ParseTranscript(filePath string) (*Session, *ParseStats, error) {
|
|||
}
|
||||
defer f.Close()
|
||||
|
||||
base := path.Base(filePath)
|
||||
base := core.PathBase(filePath)
|
||||
id := core.TrimSuffix(base, ".jsonl")
|
||||
|
||||
sess, stats, err := parseFromReader(f, id)
|
||||
|
|
@ -337,8 +333,8 @@ func ParseTranscriptReader(r io.Reader, id string) (*Session, *ParseStats, error
|
|||
}
|
||||
|
||||
// parseFromReader is the shared implementation for both file-based and
|
||||
// reader-based parsing. It scans line-by-line using bufio.Scanner with
|
||||
// an 8 MiB buffer, gracefully skipping malformed lines.
|
||||
// reader-based parsing. It scans line-by-line with an 8 MiB buffer,
|
||||
// gracefully skipping malformed lines.
|
||||
func parseFromReader(r io.Reader, id string) (*Session, *ParseStats, error) {
|
||||
sess := &Session{
|
||||
ID: id,
|
||||
|
|
@ -354,20 +350,17 @@ func parseFromReader(r io.Reader, id string) (*Session, *ParseStats, error) {
|
|||
}
|
||||
pendingTools := make(map[string]toolUse)
|
||||
|
||||
scanner := bufio.NewScanner(r)
|
||||
scanner.Buffer(make([]byte, 64*1024), maxScannerBuffer)
|
||||
|
||||
var lineNum int
|
||||
var lastRaw string
|
||||
var lastLineFailed bool
|
||||
|
||||
for scanner.Scan() {
|
||||
scanErr := scanTranscriptLines(r, maxScannerBuffer, func(line []byte) bool {
|
||||
lineNum++
|
||||
stats.TotalLines++
|
||||
|
||||
raw := scanner.Text()
|
||||
raw := string(line)
|
||||
if core.Trim(raw) == "" {
|
||||
continue
|
||||
return true
|
||||
}
|
||||
|
||||
lastRaw = raw
|
||||
|
|
@ -383,13 +376,13 @@ func parseFromReader(r io.Reader, id string) (*Session, *ParseStats, error) {
|
|||
stats.Warnings = append(stats.Warnings,
|
||||
core.Sprintf("line %d: skipped (bad JSON): %s", lineNum, preview))
|
||||
lastLineFailed = true
|
||||
continue
|
||||
return true
|
||||
}
|
||||
|
||||
ts, err := time.Parse(time.RFC3339Nano, entry.Timestamp)
|
||||
if err != nil {
|
||||
stats.Warnings = append(stats.Warnings, core.Sprintf("line %d: bad timestamp %q: %v", lineNum, entry.Timestamp, err))
|
||||
continue
|
||||
return true
|
||||
}
|
||||
|
||||
if sess.StartTime.IsZero() && !ts.IsZero() {
|
||||
|
|
@ -404,7 +397,7 @@ func parseFromReader(r io.Reader, id string) (*Session, *ParseStats, error) {
|
|||
var msg rawMessage
|
||||
if !core.JSONUnmarshal(entry.Message, &msg).OK {
|
||||
stats.Warnings = append(stats.Warnings, core.Sprintf("line %d: failed to unmarshal assistant message", lineNum))
|
||||
continue
|
||||
return true
|
||||
}
|
||||
for i, raw := range msg.Content {
|
||||
var block contentBlock
|
||||
|
|
@ -445,7 +438,7 @@ func parseFromReader(r io.Reader, id string) (*Session, *ParseStats, error) {
|
|||
var msg rawMessage
|
||||
if !core.JSONUnmarshal(entry.Message, &msg).OK {
|
||||
stats.Warnings = append(stats.Warnings, core.Sprintf("line %d: failed to unmarshal user message", lineNum))
|
||||
continue
|
||||
return true
|
||||
}
|
||||
for i, raw := range msg.Content {
|
||||
var block contentBlock
|
||||
|
|
@ -487,15 +480,15 @@ func parseFromReader(r io.Reader, id string) (*Session, *ParseStats, error) {
|
|||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return true
|
||||
})
|
||||
|
||||
// Detect truncated final line.
|
||||
if lastLineFailed && lastRaw != "" {
|
||||
stats.Warnings = append(stats.Warnings, "truncated final line")
|
||||
}
|
||||
|
||||
// Check for scanner buffer errors.
|
||||
if scanErr := scanner.Err(); scanErr != nil {
|
||||
if scanErr != nil {
|
||||
return nil, stats, scanErr
|
||||
}
|
||||
|
||||
|
|
@ -569,7 +562,11 @@ func extractToolInput(toolName string, raw rawJSON) string {
|
|||
// Fallback: show raw JSON keys
|
||||
var m map[string]any
|
||||
if core.JSONUnmarshal(raw, &m).OK {
|
||||
parts := slices.Sorted(maps.Keys(m))
|
||||
parts := make([]string, 0, len(m))
|
||||
for key := range m {
|
||||
parts = append(parts, key)
|
||||
}
|
||||
slices.Sort(parts)
|
||||
return core.Join(", ", parts...)
|
||||
}
|
||||
|
||||
|
|
@ -605,6 +602,69 @@ func truncate(s string, max int) string {
|
|||
return s[:max] + "..."
|
||||
}
|
||||
|
||||
func scanTranscriptLines(r io.Reader, maxLineSize int, handle func([]byte) bool) error {
|
||||
if maxLineSize <= 0 {
|
||||
maxLineSize = maxScannerBuffer
|
||||
}
|
||||
|
||||
readBuffer := make([]byte, 64*1024)
|
||||
line := make([]byte, 0, 64*1024)
|
||||
|
||||
for {
|
||||
n, readErr := r.Read(readBuffer)
|
||||
if n > 0 {
|
||||
chunk := readBuffer[:n]
|
||||
start := 0
|
||||
for i, b := range chunk {
|
||||
if b != '\n' {
|
||||
continue
|
||||
}
|
||||
if len(line)+i-start > maxLineSize {
|
||||
return core.E("scanTranscriptLines", core.Sprintf("line exceeds %d bytes", maxLineSize), nil)
|
||||
}
|
||||
line = append(line, chunk[start:i]...)
|
||||
if !handle(trimLineBreak(line)) {
|
||||
return nil
|
||||
}
|
||||
line = line[:0]
|
||||
start = i + 1
|
||||
}
|
||||
if start < len(chunk) {
|
||||
if len(line)+len(chunk)-start > maxLineSize {
|
||||
return core.E("scanTranscriptLines", core.Sprintf("line exceeds %d bytes", maxLineSize), nil)
|
||||
}
|
||||
line = append(line, chunk[start:]...)
|
||||
}
|
||||
}
|
||||
|
||||
if readErr == io.EOF {
|
||||
if len(line) > 0 {
|
||||
if !handle(trimLineBreak(line)) {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
if readErr != nil {
|
||||
return readErr
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// trimLineBreak strips a single trailing carriage return from line,
// normalizing CRLF-terminated transcript lines (the '\n' itself is
// consumed by the caller). The input slice is returned unchanged when no
// trailing '\r' is present.
func trimLineBreak(line []byte) []byte {
	if n := len(line); n > 0 && line[n-1] == '\r' {
		return line[:n-1]
	}
	return line
}
|
||||
|
||||
func transcriptPath(projectsDir, name string) string {
|
||||
if projectsDir == "" {
|
||||
return core.CleanPath(name, "/")
|
||||
}
|
||||
return core.CleanPath(core.JoinPath(projectsDir, name), "/")
|
||||
}
|
||||
|
||||
func isSymlink(filePath string) bool {
|
||||
var st syscall.Stat_t
|
||||
if err := syscall.Lstat(filePath, &st); err != nil {
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue