From 10319050181879f9f68de7ad262e6c9a693e1034 Mon Sep 17 00:00:00 2001 From: Claude Date: Fri, 20 Feb 2026 14:34:43 +0000 Subject: [PATCH] feat(parser): add robustness for truncated JSONL and malformed lines Phase 1: graceful error recovery, streaming parse, skip malformed lines. - Add ParseTranscriptReader for io.Reader-based streaming (pipes, buffers) - Refactor to shared parseFromReader, eliminating code duplication - Bump scanner buffer to 8 MiB for very large tool outputs - 18 new tests: streaming, custom MCP tools, binary garbage, edge cases - Coverage: 93.0% (up from 90.9%), 104 total tests Co-Authored-By: Virgil Co-Authored-By: Claude Opus 4.6 --- FINDINGS.md | 21 +++ parser.go | 41 ++++-- parser_test.go | 341 +++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 395 insertions(+), 8 deletions(-) diff --git a/FINDINGS.md b/FINDINGS.md index 0b08a64..3019485 100644 --- a/FINDINGS.md +++ b/FINDINGS.md @@ -38,3 +38,24 @@ Extracted from `forge.lthn.ai/core/go` `pkg/session/` on 19 Feb 2026. - `extractResultContent()` handles string, `[]interface{}`, and `map[string]interface{}` content types. All three paths plus nil are tested. - `ListSessions` falls back to file mod time when no valid timestamps are found in a JSONL file. - `go vet ./...` was clean from the start — no fixes needed. + +## 2026-02-20: Phase 1 Streaming & Edge Cases + +### New API surface + +- `ParseTranscriptReader(r io.Reader, id string) (*Session, *ParseStats, error)` — streaming parse from any `io.Reader` (pipes, network, in-memory buffers). No file on disc required. +- `parseFromReader(r io.Reader, id string)` — internal shared implementation, eliminating code duplication between `ParseTranscript` and `ParseTranscriptReader`. +- `maxScannerBuffer` constant: 8 MiB (bumped from 4 MiB), handles very large tool outputs. 
+ +### Tests added (18 new, 104 total) + +- **Streaming io.Reader (6 tests):** strings.Reader, bytes.Buffer, empty reader, large lines (128KB), malformed lines with stats, orphaned tool calls. +- **Custom MCP tools (3 tests):** `mcp__forge__create_issue` with flat input, nested JSON input, empty input object. +- **Edge case recovery (5 tests):** binary garbage, null bytes, 5MB lines, malformed message JSON, malformed content blocks. +- **Additional truncated JSONL (3 tests):** missing closing brace, mid-key truncation, all-bad-lines file. +- **ListSessions truncated files (1 test):** partially truncated file still extracts valid timestamps. + +### Coverage + +- 93.0% statement coverage (up from 90.9%). +- `ParseTranscript`, `ParseTranscriptReader`, `extractToolInput`, `extractResultContent`, `truncate` all at 100%. diff --git a/parser.go b/parser.go index 7500523..ecf5cf6 100644 --- a/parser.go +++ b/parser.go @@ -4,6 +4,7 @@ import ( "bufio" "encoding/json" "fmt" + "io" "os" "path/filepath" "sort" @@ -11,6 +12,10 @@ import ( "time" ) +// maxScannerBuffer is the maximum line length the scanner will accept. +// Set to 8 MiB for very large tool outputs; a longer line fails the whole parse with a scanner error. +const maxScannerBuffer = 8 * 1024 * 1024 + // Event represents a single action in a session timeline. type Event struct { Timestamp time.Time @@ -172,6 +177,7 @@ func ListSessions(projectsDir string) ([]Session, error) { } // ParseTranscript reads a JSONL session file and returns structured events. +// Malformed or truncated lines are skipped; diagnostics are reported in ParseStats. 
func ParseTranscript(path string) (*Session, *ParseStats, error) { f, err := os.Open(path) if err != nil { @@ -180,14 +186,33 @@ func ParseTranscript(path string) (*Session, *ParseStats, error) { defer f.Close() base := filepath.Base(path) + id := strings.TrimSuffix(base, ".jsonl") + + sess, stats, err := parseFromReader(f, id) + if sess != nil { + sess.Path = path + } + return sess, stats, err +} + +// ParseTranscriptReader parses a JSONL session from an io.Reader, enabling +// streaming parse without needing a file on disc. The id parameter sets +// the session ID (since there is no file name to derive it from). +func ParseTranscriptReader(r io.Reader, id string) (*Session, *ParseStats, error) { + return parseFromReader(r, id) +} + +// parseFromReader is the shared implementation for both file-based and +// reader-based parsing. It scans line-by-line using bufio.Scanner with +// an 8 MiB buffer, gracefully skipping malformed lines. +func parseFromReader(r io.Reader, id string) (*Session, *ParseStats, error) { sess := &Session{ - ID: strings.TrimSuffix(base, ".jsonl"), - Path: path, + ID: id, } stats := &ParseStats{} - // Collect tool_use entries keyed by ID + // Collect tool_use entries keyed by ID. type toolUse struct { timestamp time.Time tool string @@ -195,8 +220,8 @@ func ParseTranscript(path string) (*Session, *ParseStats, error) { } pendingTools := make(map[string]toolUse) - scanner := bufio.NewScanner(f) - scanner.Buffer(make([]byte, 4*1024*1024), 4*1024*1024) + scanner := bufio.NewScanner(r) + scanner.Buffer(make([]byte, maxScannerBuffer), maxScannerBuffer) var lineNum int var lastRaw string @@ -314,17 +339,17 @@ func ParseTranscript(path string) (*Session, *ParseStats, error) { } } - // Detect truncated final line + // Detect truncated final line. if lastLineFailed && lastRaw != "" { stats.Warnings = append(stats.Warnings, "truncated final line") } - // Check for scanner buffer errors + // Check for scanner buffer errors. 
if scanErr := scanner.Err(); scanErr != nil { return nil, stats, scanErr } - // Track orphaned tool calls (tool_use with no matching result) + // Track orphaned tool calls (tool_use with no matching result). stats.OrphanedToolCalls = len(pendingTools) if stats.OrphanedToolCalls > 0 { for id := range pendingTools { diff --git a/parser_test.go b/parser_test.go index fbc9268..4e32376 100644 --- a/parser_test.go +++ b/parser_test.go @@ -2,6 +2,7 @@ package session import ( + "bytes" "encoding/json" "fmt" "os" @@ -952,3 +953,343 @@ func TestParseStats_WarningPreviewTruncated_Good(t *testing.T) { "warning preview should be truncated for long lines") assert.Contains(t, stats.Warnings[0], "line 1:") } + +// --- ParseTranscriptReader (streaming) tests --- + +func TestParseTranscriptReader_MinimalValid_Good(t *testing.T) { + // Parse directly from an in-memory reader. + data := strings.Join([]string{ + userTextEntry(ts(0), "hello"), + assistantTextEntry(ts(1), "world"), + }, "\n") + "\n" + reader := strings.NewReader(data) + + sess, stats, err := ParseTranscriptReader(reader, "stream-session") + require.NoError(t, err) + require.NotNil(t, sess) + require.NotNil(t, stats) + + assert.Equal(t, "stream-session", sess.ID) + assert.Empty(t, sess.Path, "reader-based parse should have empty path") + assert.Len(t, sess.Events, 2) + assert.Equal(t, "hello", sess.Events[0].Input) + assert.Equal(t, "world", sess.Events[1].Input) + assert.Equal(t, 2, stats.TotalLines) + assert.Equal(t, 0, stats.SkippedLines) +} + +func TestParseTranscriptReader_BytesBuffer_Good(t *testing.T) { + // Parse from a bytes.Buffer (common streaming use case). 
+ data := strings.Join([]string{ + toolUseEntry(ts(0), "Bash", "tu-buf-1", map[string]interface{}{ + "command": "echo ok", + "description": "test", + }), + toolResultEntry(ts(1), "tu-buf-1", "ok", false), + }, "\n") + "\n" + buf := bytes.NewBufferString(data) + + sess, _, err := ParseTranscriptReader(buf, "buf-session") + require.NoError(t, err) + require.Len(t, sess.Events, 1) + assert.Equal(t, "Bash", sess.Events[0].Tool) + assert.True(t, sess.Events[0].Success) +} + +func TestParseTranscriptReader_EmptyReader_Good(t *testing.T) { + reader := strings.NewReader("") + + sess, stats, err := ParseTranscriptReader(reader, "empty") + require.NoError(t, err) + require.NotNil(t, sess) + assert.Empty(t, sess.Events) + assert.Equal(t, 0, stats.TotalLines) +} + +func TestParseTranscriptReader_LargeLines_Good(t *testing.T) { + // Verify the scanner handles very long lines (> 64KB). + longText := strings.Repeat("x", 128*1024) // 128KB of text + data := userTextEntry(ts(0), longText) + "\n" + reader := strings.NewReader(data) + + sess, _, err := ParseTranscriptReader(reader, "big-session") + require.NoError(t, err) + require.Len(t, sess.Events, 1) + // Input gets truncated to 500 chars by the truncate function. + assert.Len(t, sess.Events[0].Input, 503) // 500 + "..." +} + +func TestParseTranscriptReader_MalformedWithStats_Good(t *testing.T) { + // Malformed lines in a reader should still produce correct stats. + data := strings.Join([]string{ + `{bad json`, + userTextEntry(ts(0), "valid"), + `also bad`, + }, "\n") + "\n" + reader := strings.NewReader(data) + + sess, stats, err := ParseTranscriptReader(reader, "mixed") + require.NoError(t, err) + assert.Len(t, sess.Events, 1) + assert.Equal(t, 3, stats.TotalLines) + assert.Equal(t, 2, stats.SkippedLines) +} + +func TestParseTranscriptReader_OrphanedTools_Good(t *testing.T) { + // Tool calls without results should be tracked in stats. 
+ data := strings.Join([]string{ + toolUseEntry(ts(0), "Bash", "orphan-r1", map[string]interface{}{ + "command": "ls", + }), + assistantTextEntry(ts(1), "No result arrived"), + }, "\n") + "\n" + reader := strings.NewReader(data) + + _, stats, err := ParseTranscriptReader(reader, "orphan-reader") + require.NoError(t, err) + assert.Equal(t, 1, stats.OrphanedToolCalls) +} + +// --- Custom MCP tool tests --- + +func TestParseTranscript_CustomMCPTool_Good(t *testing.T) { + // A tool_use with a non-standard MCP tool name (e.g. mcp__server__tool). + dir := t.TempDir() + lines := []string{ + toolUseEntry(ts(0), "mcp__forge__create_issue", "tu-mcp-1", map[string]interface{}{ + "title": "bug report", + "body": "something broke", + "repo": "core/go", + }), + toolResultEntry(ts(1), "tu-mcp-1", "Issue #42 created", false), + } + path := writeJSONL(t, dir, "mcp_tool.jsonl", lines...) + + sess, _, err := ParseTranscript(path) + require.NoError(t, err) + + var toolEvents []Event + for _, e := range sess.Events { + if e.Type == "tool_use" { + toolEvents = append(toolEvents, e) + } + } + + require.Len(t, toolEvents, 1) + assert.Equal(t, "mcp__forge__create_issue", toolEvents[0].Tool) + // Fallback should show sorted keys. + assert.Contains(t, toolEvents[0].Input, "body") + assert.Contains(t, toolEvents[0].Input, "repo") + assert.Contains(t, toolEvents[0].Input, "title") + assert.True(t, toolEvents[0].Success) +} + +func TestParseTranscript_CustomMCPToolNestedInput_Good(t *testing.T) { + // MCP tool with nested JSON input — should show top-level keys. + dir := t.TempDir() + lines := []string{ + toolUseEntry(ts(0), "mcp__db__query", "tu-nested-1", map[string]interface{}{ + "query": "SELECT *", + "params": map[string]interface{}{"limit": 10, "offset": 0}, + }), + toolResultEntry(ts(1), "tu-nested-1", "3 rows returned", false), + } + path := writeJSONL(t, dir, "mcp_nested.jsonl", lines...) 
+ + sess, _, err := ParseTranscript(path) + require.NoError(t, err) + + var toolEvents []Event + for _, e := range sess.Events { + if e.Type == "tool_use" { + toolEvents = append(toolEvents, e) + } + } + + require.Len(t, toolEvents, 1) + assert.Contains(t, toolEvents[0].Input, "params") + assert.Contains(t, toolEvents[0].Input, "query") +} + +func TestParseTranscript_UnknownToolEmptyInput_Good(t *testing.T) { + // A tool_use with an empty input object. + dir := t.TempDir() + lines := []string{ + toolUseEntry(ts(0), "SomeTool", "tu-empty-1", map[string]interface{}{}), + toolResultEntry(ts(1), "tu-empty-1", "done", false), + } + path := writeJSONL(t, dir, "empty_input.jsonl", lines...) + + sess, _, err := ParseTranscript(path) + require.NoError(t, err) + + var toolEvents []Event + for _, e := range sess.Events { + if e.Type == "tool_use" { + toolEvents = append(toolEvents, e) + } + } + + require.Len(t, toolEvents, 1) + // Empty object should produce empty string from fallback. + assert.Equal(t, "", toolEvents[0].Input) +} + +// --- Edge case error recovery tests --- + +func TestParseTranscript_BinaryGarbage_Ugly(t *testing.T) { + // Binary garbage interspersed with valid lines — must not panic. + dir := t.TempDir() + garbage := string([]byte{0x00, 0x01, 0x02, 0xff, 0xfe, 0xfd}) + lines := []string{ + garbage, + userTextEntry(ts(0), "survived"), + garbage + garbage, + } + path := writeJSONL(t, dir, "binary.jsonl", lines...) + + sess, stats, err := ParseTranscript(path) + require.NoError(t, err) + + // Only the valid line should produce an event. + var userEvents []Event + for _, e := range sess.Events { + if e.Type == "user" { + userEvents = append(userEvents, e) + } + } + require.Len(t, userEvents, 1) + assert.Equal(t, "survived", userEvents[0].Input) + assert.Equal(t, 2, stats.SkippedLines) +} + +func TestParseTranscript_NullBytes_Ugly(t *testing.T) { + // Lines with embedded null bytes. 
+ dir := t.TempDir() + lines := []string{ + `{"type":"user","timestamp":"` + ts(0) + `","sessionId":"n","message":` + string([]byte{0x00}) + `}`, + userTextEntry(ts(1), "ok"), + } + path := writeJSONL(t, dir, "null_bytes.jsonl", lines...) + + sess, _, err := ParseTranscript(path) + require.NoError(t, err) + assert.Len(t, sess.Events, 1) +} + +func TestParseTranscript_VeryLongLine_Ugly(t *testing.T) { + // A single line that exceeds the default bufio.Scanner buffer. + // The parser should handle this without error thanks to the enlarged buffer. + dir := t.TempDir() + huge := strings.Repeat("a", 5*1024*1024) // 5MB text + path := writeJSONL(t, dir, "huge_line.jsonl", + userTextEntry(ts(0), huge), + ) + + sess, _, err := ParseTranscript(path) + require.NoError(t, err) + require.Len(t, sess.Events, 1) +} + +func TestParseTranscript_MalformedMessageJSON_Bad(t *testing.T) { + // Valid outer JSON but the message field is not valid message structure. + dir := t.TempDir() + lines := []string{ + `{"type":"assistant","timestamp":"` + ts(0) + `","sessionId":"b","message":"not an object"}`, + userTextEntry(ts(1), "ok"), + } + path := writeJSONL(t, dir, "bad_msg.jsonl", lines...) + + sess, _, err := ParseTranscript(path) + require.NoError(t, err) + // First line's message is a string, not object — should be skipped. + assert.Len(t, sess.Events, 1) + assert.Equal(t, "ok", sess.Events[0].Input) +} + +func TestParseTranscript_MalformedContentBlock_Bad(t *testing.T) { + // Valid message structure but content blocks are malformed. + dir := t.TempDir() + lines := []string{ + `{"type":"assistant","timestamp":"` + ts(0) + `","sessionId":"c","message":{"role":"assistant","content":["not a block object"]}}`, + userTextEntry(ts(1), "still ok"), + } + path := writeJSONL(t, dir, "bad_block.jsonl", lines...) 
+ + sess, _, err := ParseTranscript(path) + require.NoError(t, err) + assert.Len(t, sess.Events, 1) + assert.Equal(t, "still ok", sess.Events[0].Input) +} + +func TestParseTranscript_TruncatedMissingBrace_Good(t *testing.T) { + // Final line is missing its closing brace — should be skipped gracefully. + dir := t.TempDir() + lines := []string{ + userTextEntry(ts(0), "valid"), + assistantTextEntry(ts(1), "also valid"), + `{"type":"user","timestamp":"` + ts(2) + `","sessionId":"t","message":{"role":"user","content":[{"type":"text","text":"truncated"`, + } + path := writeJSONL(t, dir, "trunc_brace.jsonl", lines...) + + sess, _, err := ParseTranscript(path) + require.NoError(t, err) + // Only the two complete lines should produce events. + assert.Len(t, sess.Events, 2) + assert.Equal(t, "valid", sess.Events[0].Input) + assert.Equal(t, "also valid", sess.Events[1].Input) +} + +func TestParseTranscript_TruncatedMidKey_Good(t *testing.T) { + // Line truncated in the middle of a JSON key. + dir := t.TempDir() + lines := []string{ + userTextEntry(ts(0), "first"), + `{"type":"assis`, + } + path := writeJSONL(t, dir, "trunc_midkey.jsonl", lines...) + + sess, _, err := ParseTranscript(path) + require.NoError(t, err) + assert.Len(t, sess.Events, 1) + assert.Equal(t, "first", sess.Events[0].Input) +} + +func TestParseTranscript_AllBadLines_Good(t *testing.T) { + // Every line is truncated/malformed — result should be empty, no error. + dir := t.TempDir() + lines := []string{ + `{"type":"user","timestamp`, + `{"broken`, + `not even json`, + } + path := writeJSONL(t, dir, "all_bad.jsonl", lines...) 
+ + sess, stats, err := ParseTranscript(path) + require.NoError(t, err) + assert.Empty(t, sess.Events) + assert.True(t, sess.StartTime.IsZero()) + assert.Equal(t, 3, stats.SkippedLines) +} + +// --- ListSessions with truncated files --- + +func TestListSessions_TruncatedFile_Good(t *testing.T) { + dir := t.TempDir() + // A .jsonl file where some lines are truncated — ListSessions should + // still extract timestamps from valid lines. + lines := []string{ + userTextEntry(ts(0), "start"), + `{"type":"assistant","truncated`, + userTextEntry(ts(5), "end"), + } + writeJSONL(t, dir, "partial.jsonl", lines...) + + sessions, err := ListSessions(dir) + require.NoError(t, err) + require.Len(t, sessions, 1) + assert.False(t, sessions[0].StartTime.IsZero()) + assert.False(t, sessions[0].EndTime.IsZero()) + // End time should reflect the last valid timestamp. + assert.True(t, sessions[0].EndTime.After(sessions[0].StartTime)) +}