feat(parser): add robustness for truncated JSONL and malformed lines
Phase 1: graceful error recovery, streaming parse, skip malformed lines. - Add ParseTranscriptReader for io.Reader-based streaming (pipes, buffers) - Refactor to shared parseFromReader, eliminating code duplication - Bump scanner buffer to 8 MiB for very large tool outputs - 18 new tests: streaming, custom MCP tools, binary garbage, edge cases - Coverage: 93.0% (up from 90.9%), 104 total tests Co-Authored-By: Virgil <virgil@lethean.io> Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
8e9162631e
commit
1031905018
3 changed files with 395 additions and 8 deletions
21
FINDINGS.md
21
FINDINGS.md
|
|
@ -38,3 +38,24 @@ Extracted from `forge.lthn.ai/core/go` `pkg/session/` on 19 Feb 2026.
|
|||
- `extractResultContent()` handles string, `[]interface{}`, and `map[string]interface{}` content types. All three paths plus nil are tested.
|
||||
- `ListSessions` falls back to file mod time when no valid timestamps are found in a JSONL file.
|
||||
- `go vet ./...` was clean from the start — no fixes needed.
|
||||
|
||||
## 2026-02-20: Phase 1 Streaming & Edge Cases
|
||||
|
||||
### New API surface
|
||||
|
||||
- `ParseTranscriptReader(r io.Reader, id string) (*Session, *ParseStats, error)` — streaming parse from any `io.Reader` (pipes, network, in-memory buffers). No file on disc required.
|
||||
- `parseFromReader(r io.Reader, id string)` — internal shared implementation, eliminating code duplication between `ParseTranscript` and `ParseTranscriptReader`.
|
||||
- `maxScannerBuffer` constant: 8 MiB (bumped from 4 MiB), handles very large tool outputs.
|
||||
|
||||
### Tests added (18 new, 104 total)
|
||||
|
||||
- **Streaming io.Reader (6 tests):** strings.Reader, bytes.Buffer, empty reader, large lines (128KB), malformed lines with stats, orphaned tool calls.
|
||||
- **Custom MCP tools (3 tests):** `mcp__forge__create_issue` with flat input, nested JSON input, empty input object.
|
||||
- **Edge case recovery (5 tests):** binary garbage, null bytes, 5MB lines, malformed message JSON, malformed content blocks.
|
||||
- **Additional truncated JSONL (3 tests):** missing closing brace, mid-key truncation, all-bad-lines file.
|
||||
- **ListSessions truncated files (1 test):** partially truncated file still extracts valid timestamps.
|
||||
|
||||
### Coverage
|
||||
|
||||
- 93.0% statement coverage (up from 90.9%).
|
||||
- `ParseTranscript`, `ParseTranscriptReader`, `extractToolInput`, `extractResultContent`, `truncate` all at 100%.
|
||||
|
|
|
|||
41
parser.go
41
parser.go
|
|
@ -4,6 +4,7 @@ import (
|
|||
"bufio"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sort"
|
||||
|
|
@ -11,6 +12,10 @@ import (
|
|||
"time"
|
||||
)
|
||||
|
||||
// maxScannerBuffer is the maximum line length the scanner will accept.
|
||||
// Set to 8 MiB to handle very large tool outputs without truncation.
|
||||
const maxScannerBuffer = 8 * 1024 * 1024
|
||||
|
||||
// Event represents a single action in a session timeline.
|
||||
type Event struct {
|
||||
Timestamp time.Time
|
||||
|
|
@ -172,6 +177,7 @@ func ListSessions(projectsDir string) ([]Session, error) {
|
|||
}
|
||||
|
||||
// ParseTranscript reads a JSONL session file and returns structured events.
|
||||
// Malformed or truncated lines are skipped; diagnostics are reported in ParseStats.
|
||||
func ParseTranscript(path string) (*Session, *ParseStats, error) {
|
||||
f, err := os.Open(path)
|
||||
if err != nil {
|
||||
|
|
@ -180,14 +186,33 @@ func ParseTranscript(path string) (*Session, *ParseStats, error) {
|
|||
defer f.Close()
|
||||
|
||||
base := filepath.Base(path)
|
||||
id := strings.TrimSuffix(base, ".jsonl")
|
||||
|
||||
sess, stats, err := parseFromReader(f, id)
|
||||
if sess != nil {
|
||||
sess.Path = path
|
||||
}
|
||||
return sess, stats, err
|
||||
}
|
||||
|
||||
// ParseTranscriptReader parses a JSONL session from an io.Reader, enabling
|
||||
// streaming parse without needing a file on disc. The id parameter sets
|
||||
// the session ID (since there is no file name to derive it from).
|
||||
func ParseTranscriptReader(r io.Reader, id string) (*Session, *ParseStats, error) {
|
||||
return parseFromReader(r, id)
|
||||
}
|
||||
|
||||
// parseFromReader is the shared implementation for both file-based and
|
||||
// reader-based parsing. It scans line-by-line using bufio.Scanner with
|
||||
// an 8 MiB buffer, gracefully skipping malformed lines.
|
||||
func parseFromReader(r io.Reader, id string) (*Session, *ParseStats, error) {
|
||||
sess := &Session{
|
||||
ID: strings.TrimSuffix(base, ".jsonl"),
|
||||
Path: path,
|
||||
ID: id,
|
||||
}
|
||||
|
||||
stats := &ParseStats{}
|
||||
|
||||
// Collect tool_use entries keyed by ID
|
||||
// Collect tool_use entries keyed by ID.
|
||||
type toolUse struct {
|
||||
timestamp time.Time
|
||||
tool string
|
||||
|
|
@ -195,8 +220,8 @@ func ParseTranscript(path string) (*Session, *ParseStats, error) {
|
|||
}
|
||||
pendingTools := make(map[string]toolUse)
|
||||
|
||||
scanner := bufio.NewScanner(f)
|
||||
scanner.Buffer(make([]byte, 4*1024*1024), 4*1024*1024)
|
||||
scanner := bufio.NewScanner(r)
|
||||
scanner.Buffer(make([]byte, maxScannerBuffer), maxScannerBuffer)
|
||||
|
||||
var lineNum int
|
||||
var lastRaw string
|
||||
|
|
@ -314,17 +339,17 @@ func ParseTranscript(path string) (*Session, *ParseStats, error) {
|
|||
}
|
||||
}
|
||||
|
||||
// Detect truncated final line
|
||||
// Detect truncated final line.
|
||||
if lastLineFailed && lastRaw != "" {
|
||||
stats.Warnings = append(stats.Warnings, "truncated final line")
|
||||
}
|
||||
|
||||
// Check for scanner buffer errors
|
||||
// Check for scanner buffer errors.
|
||||
if scanErr := scanner.Err(); scanErr != nil {
|
||||
return nil, stats, scanErr
|
||||
}
|
||||
|
||||
// Track orphaned tool calls (tool_use with no matching result)
|
||||
// Track orphaned tool calls (tool_use with no matching result).
|
||||
stats.OrphanedToolCalls = len(pendingTools)
|
||||
if stats.OrphanedToolCalls > 0 {
|
||||
for id := range pendingTools {
|
||||
|
|
|
|||
341
parser_test.go
341
parser_test.go
|
|
@ -2,6 +2,7 @@
|
|||
package session
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
|
|
@ -952,3 +953,343 @@ func TestParseStats_WarningPreviewTruncated_Good(t *testing.T) {
|
|||
"warning preview should be truncated for long lines")
|
||||
assert.Contains(t, stats.Warnings[0], "line 1:")
|
||||
}
|
||||
|
||||
// --- ParseTranscriptReader (streaming) tests ---
|
||||
|
||||
func TestParseTranscriptReader_MinimalValid_Good(t *testing.T) {
|
||||
// Parse directly from an in-memory reader.
|
||||
data := strings.Join([]string{
|
||||
userTextEntry(ts(0), "hello"),
|
||||
assistantTextEntry(ts(1), "world"),
|
||||
}, "\n") + "\n"
|
||||
reader := strings.NewReader(data)
|
||||
|
||||
sess, stats, err := ParseTranscriptReader(reader, "stream-session")
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, sess)
|
||||
require.NotNil(t, stats)
|
||||
|
||||
assert.Equal(t, "stream-session", sess.ID)
|
||||
assert.Empty(t, sess.Path, "reader-based parse should have empty path")
|
||||
assert.Len(t, sess.Events, 2)
|
||||
assert.Equal(t, "hello", sess.Events[0].Input)
|
||||
assert.Equal(t, "world", sess.Events[1].Input)
|
||||
assert.Equal(t, 2, stats.TotalLines)
|
||||
assert.Equal(t, 0, stats.SkippedLines)
|
||||
}
|
||||
|
||||
func TestParseTranscriptReader_BytesBuffer_Good(t *testing.T) {
|
||||
// Parse from a bytes.Buffer (common streaming use case).
|
||||
data := strings.Join([]string{
|
||||
toolUseEntry(ts(0), "Bash", "tu-buf-1", map[string]interface{}{
|
||||
"command": "echo ok",
|
||||
"description": "test",
|
||||
}),
|
||||
toolResultEntry(ts(1), "tu-buf-1", "ok", false),
|
||||
}, "\n") + "\n"
|
||||
buf := bytes.NewBufferString(data)
|
||||
|
||||
sess, _, err := ParseTranscriptReader(buf, "buf-session")
|
||||
require.NoError(t, err)
|
||||
require.Len(t, sess.Events, 1)
|
||||
assert.Equal(t, "Bash", sess.Events[0].Tool)
|
||||
assert.True(t, sess.Events[0].Success)
|
||||
}
|
||||
|
||||
func TestParseTranscriptReader_EmptyReader_Good(t *testing.T) {
|
||||
reader := strings.NewReader("")
|
||||
|
||||
sess, stats, err := ParseTranscriptReader(reader, "empty")
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, sess)
|
||||
assert.Empty(t, sess.Events)
|
||||
assert.Equal(t, 0, stats.TotalLines)
|
||||
}
|
||||
|
||||
func TestParseTranscriptReader_LargeLines_Good(t *testing.T) {
|
||||
// Verify the scanner handles very long lines (> 64KB).
|
||||
longText := strings.Repeat("x", 128*1024) // 128KB of text
|
||||
data := userTextEntry(ts(0), longText) + "\n"
|
||||
reader := strings.NewReader(data)
|
||||
|
||||
sess, _, err := ParseTranscriptReader(reader, "big-session")
|
||||
require.NoError(t, err)
|
||||
require.Len(t, sess.Events, 1)
|
||||
// Input gets truncated to 500 chars by the truncate function.
|
||||
assert.Len(t, sess.Events[0].Input, 503) // 500 + "..."
|
||||
}
|
||||
|
||||
func TestParseTranscriptReader_MalformedWithStats_Good(t *testing.T) {
|
||||
// Malformed lines in a reader should still produce correct stats.
|
||||
data := strings.Join([]string{
|
||||
`{bad json`,
|
||||
userTextEntry(ts(0), "valid"),
|
||||
`also bad`,
|
||||
}, "\n") + "\n"
|
||||
reader := strings.NewReader(data)
|
||||
|
||||
sess, stats, err := ParseTranscriptReader(reader, "mixed")
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, sess.Events, 1)
|
||||
assert.Equal(t, 3, stats.TotalLines)
|
||||
assert.Equal(t, 2, stats.SkippedLines)
|
||||
}
|
||||
|
||||
func TestParseTranscriptReader_OrphanedTools_Good(t *testing.T) {
|
||||
// Tool calls without results should be tracked in stats.
|
||||
data := strings.Join([]string{
|
||||
toolUseEntry(ts(0), "Bash", "orphan-r1", map[string]interface{}{
|
||||
"command": "ls",
|
||||
}),
|
||||
assistantTextEntry(ts(1), "No result arrived"),
|
||||
}, "\n") + "\n"
|
||||
reader := strings.NewReader(data)
|
||||
|
||||
_, stats, err := ParseTranscriptReader(reader, "orphan-reader")
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 1, stats.OrphanedToolCalls)
|
||||
}
|
||||
|
||||
// --- Custom MCP tool tests ---
|
||||
|
||||
func TestParseTranscript_CustomMCPTool_Good(t *testing.T) {
|
||||
// A tool_use with a non-standard MCP tool name (e.g. mcp__server__tool).
|
||||
dir := t.TempDir()
|
||||
lines := []string{
|
||||
toolUseEntry(ts(0), "mcp__forge__create_issue", "tu-mcp-1", map[string]interface{}{
|
||||
"title": "bug report",
|
||||
"body": "something broke",
|
||||
"repo": "core/go",
|
||||
}),
|
||||
toolResultEntry(ts(1), "tu-mcp-1", "Issue #42 created", false),
|
||||
}
|
||||
path := writeJSONL(t, dir, "mcp_tool.jsonl", lines...)
|
||||
|
||||
sess, _, err := ParseTranscript(path)
|
||||
require.NoError(t, err)
|
||||
|
||||
var toolEvents []Event
|
||||
for _, e := range sess.Events {
|
||||
if e.Type == "tool_use" {
|
||||
toolEvents = append(toolEvents, e)
|
||||
}
|
||||
}
|
||||
|
||||
require.Len(t, toolEvents, 1)
|
||||
assert.Equal(t, "mcp__forge__create_issue", toolEvents[0].Tool)
|
||||
// Fallback should show sorted keys.
|
||||
assert.Contains(t, toolEvents[0].Input, "body")
|
||||
assert.Contains(t, toolEvents[0].Input, "repo")
|
||||
assert.Contains(t, toolEvents[0].Input, "title")
|
||||
assert.True(t, toolEvents[0].Success)
|
||||
}
|
||||
|
||||
func TestParseTranscript_CustomMCPToolNestedInput_Good(t *testing.T) {
|
||||
// MCP tool with nested JSON input — should show top-level keys.
|
||||
dir := t.TempDir()
|
||||
lines := []string{
|
||||
toolUseEntry(ts(0), "mcp__db__query", "tu-nested-1", map[string]interface{}{
|
||||
"query": "SELECT *",
|
||||
"params": map[string]interface{}{"limit": 10, "offset": 0},
|
||||
}),
|
||||
toolResultEntry(ts(1), "tu-nested-1", "3 rows returned", false),
|
||||
}
|
||||
path := writeJSONL(t, dir, "mcp_nested.jsonl", lines...)
|
||||
|
||||
sess, _, err := ParseTranscript(path)
|
||||
require.NoError(t, err)
|
||||
|
||||
var toolEvents []Event
|
||||
for _, e := range sess.Events {
|
||||
if e.Type == "tool_use" {
|
||||
toolEvents = append(toolEvents, e)
|
||||
}
|
||||
}
|
||||
|
||||
require.Len(t, toolEvents, 1)
|
||||
assert.Contains(t, toolEvents[0].Input, "params")
|
||||
assert.Contains(t, toolEvents[0].Input, "query")
|
||||
}
|
||||
|
||||
func TestParseTranscript_UnknownToolEmptyInput_Good(t *testing.T) {
|
||||
// A tool_use with an empty input object.
|
||||
dir := t.TempDir()
|
||||
lines := []string{
|
||||
toolUseEntry(ts(0), "SomeTool", "tu-empty-1", map[string]interface{}{}),
|
||||
toolResultEntry(ts(1), "tu-empty-1", "done", false),
|
||||
}
|
||||
path := writeJSONL(t, dir, "empty_input.jsonl", lines...)
|
||||
|
||||
sess, _, err := ParseTranscript(path)
|
||||
require.NoError(t, err)
|
||||
|
||||
var toolEvents []Event
|
||||
for _, e := range sess.Events {
|
||||
if e.Type == "tool_use" {
|
||||
toolEvents = append(toolEvents, e)
|
||||
}
|
||||
}
|
||||
|
||||
require.Len(t, toolEvents, 1)
|
||||
// Empty object should produce empty string from fallback.
|
||||
assert.Equal(t, "", toolEvents[0].Input)
|
||||
}
|
||||
|
||||
// --- Edge case error recovery tests ---
|
||||
|
||||
func TestParseTranscript_BinaryGarbage_Ugly(t *testing.T) {
|
||||
// Binary garbage interspersed with valid lines — must not panic.
|
||||
dir := t.TempDir()
|
||||
garbage := string([]byte{0x00, 0x01, 0x02, 0xff, 0xfe, 0xfd})
|
||||
lines := []string{
|
||||
garbage,
|
||||
userTextEntry(ts(0), "survived"),
|
||||
garbage + garbage,
|
||||
}
|
||||
path := writeJSONL(t, dir, "binary.jsonl", lines...)
|
||||
|
||||
sess, stats, err := ParseTranscript(path)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Only the valid line should produce an event.
|
||||
var userEvents []Event
|
||||
for _, e := range sess.Events {
|
||||
if e.Type == "user" {
|
||||
userEvents = append(userEvents, e)
|
||||
}
|
||||
}
|
||||
require.Len(t, userEvents, 1)
|
||||
assert.Equal(t, "survived", userEvents[0].Input)
|
||||
assert.Equal(t, 2, stats.SkippedLines)
|
||||
}
|
||||
|
||||
func TestParseTranscript_NullBytes_Ugly(t *testing.T) {
|
||||
// Lines with embedded null bytes.
|
||||
dir := t.TempDir()
|
||||
lines := []string{
|
||||
`{"type":"user","timestamp":"` + ts(0) + `","sessionId":"n","message":` + string([]byte{0x00}) + `}`,
|
||||
userTextEntry(ts(1), "ok"),
|
||||
}
|
||||
path := writeJSONL(t, dir, "null_bytes.jsonl", lines...)
|
||||
|
||||
sess, _, err := ParseTranscript(path)
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, sess.Events, 1)
|
||||
}
|
||||
|
||||
func TestParseTranscript_VeryLongLine_Ugly(t *testing.T) {
|
||||
// A single line that exceeds the default bufio.Scanner buffer.
|
||||
// The parser should handle this without error thanks to the enlarged buffer.
|
||||
dir := t.TempDir()
|
||||
huge := strings.Repeat("a", 5*1024*1024) // 5MB text
|
||||
path := writeJSONL(t, dir, "huge_line.jsonl",
|
||||
userTextEntry(ts(0), huge),
|
||||
)
|
||||
|
||||
sess, _, err := ParseTranscript(path)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, sess.Events, 1)
|
||||
}
|
||||
|
||||
func TestParseTranscript_MalformedMessageJSON_Bad(t *testing.T) {
|
||||
// Valid outer JSON but the message field is not valid message structure.
|
||||
dir := t.TempDir()
|
||||
lines := []string{
|
||||
`{"type":"assistant","timestamp":"` + ts(0) + `","sessionId":"b","message":"not an object"}`,
|
||||
userTextEntry(ts(1), "ok"),
|
||||
}
|
||||
path := writeJSONL(t, dir, "bad_msg.jsonl", lines...)
|
||||
|
||||
sess, _, err := ParseTranscript(path)
|
||||
require.NoError(t, err)
|
||||
// First line's message is a string, not object — should be skipped.
|
||||
assert.Len(t, sess.Events, 1)
|
||||
assert.Equal(t, "ok", sess.Events[0].Input)
|
||||
}
|
||||
|
||||
func TestParseTranscript_MalformedContentBlock_Bad(t *testing.T) {
|
||||
// Valid message structure but content blocks are malformed.
|
||||
dir := t.TempDir()
|
||||
lines := []string{
|
||||
`{"type":"assistant","timestamp":"` + ts(0) + `","sessionId":"c","message":{"role":"assistant","content":["not a block object"]}}`,
|
||||
userTextEntry(ts(1), "still ok"),
|
||||
}
|
||||
path := writeJSONL(t, dir, "bad_block.jsonl", lines...)
|
||||
|
||||
sess, _, err := ParseTranscript(path)
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, sess.Events, 1)
|
||||
assert.Equal(t, "still ok", sess.Events[0].Input)
|
||||
}
|
||||
|
||||
func TestParseTranscript_TruncatedMissingBrace_Good(t *testing.T) {
|
||||
// Final line is missing its closing brace — should be skipped gracefully.
|
||||
dir := t.TempDir()
|
||||
lines := []string{
|
||||
userTextEntry(ts(0), "valid"),
|
||||
assistantTextEntry(ts(1), "also valid"),
|
||||
`{"type":"user","timestamp":"` + ts(2) + `","sessionId":"t","message":{"role":"user","content":[{"type":"text","text":"truncated"`,
|
||||
}
|
||||
path := writeJSONL(t, dir, "trunc_brace.jsonl", lines...)
|
||||
|
||||
sess, _, err := ParseTranscript(path)
|
||||
require.NoError(t, err)
|
||||
// Only the two complete lines should produce events.
|
||||
assert.Len(t, sess.Events, 2)
|
||||
assert.Equal(t, "valid", sess.Events[0].Input)
|
||||
assert.Equal(t, "also valid", sess.Events[1].Input)
|
||||
}
|
||||
|
||||
func TestParseTranscript_TruncatedMidKey_Good(t *testing.T) {
|
||||
// Line truncated in the middle of a JSON key.
|
||||
dir := t.TempDir()
|
||||
lines := []string{
|
||||
userTextEntry(ts(0), "first"),
|
||||
`{"type":"assis`,
|
||||
}
|
||||
path := writeJSONL(t, dir, "trunc_midkey.jsonl", lines...)
|
||||
|
||||
sess, _, err := ParseTranscript(path)
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, sess.Events, 1)
|
||||
assert.Equal(t, "first", sess.Events[0].Input)
|
||||
}
|
||||
|
||||
func TestParseTranscript_AllBadLines_Good(t *testing.T) {
|
||||
// Every line is truncated/malformed — result should be empty, no error.
|
||||
dir := t.TempDir()
|
||||
lines := []string{
|
||||
`{"type":"user","timestamp`,
|
||||
`{"broken`,
|
||||
`not even json`,
|
||||
}
|
||||
path := writeJSONL(t, dir, "all_bad.jsonl", lines...)
|
||||
|
||||
sess, stats, err := ParseTranscript(path)
|
||||
require.NoError(t, err)
|
||||
assert.Empty(t, sess.Events)
|
||||
assert.True(t, sess.StartTime.IsZero())
|
||||
assert.Equal(t, 3, stats.SkippedLines)
|
||||
}
|
||||
|
||||
// --- ListSessions with truncated files ---
|
||||
|
||||
func TestListSessions_TruncatedFile_Good(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
// A .jsonl file where some lines are truncated — ListSessions should
|
||||
// still extract timestamps from valid lines.
|
||||
lines := []string{
|
||||
userTextEntry(ts(0), "start"),
|
||||
`{"type":"assistant","truncated`,
|
||||
userTextEntry(ts(5), "end"),
|
||||
}
|
||||
writeJSONL(t, dir, "partial.jsonl", lines...)
|
||||
|
||||
sessions, err := ListSessions(dir)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, sessions, 1)
|
||||
assert.False(t, sessions[0].StartTime.IsZero())
|
||||
assert.False(t, sessions[0].EndTime.IsZero())
|
||||
// End time should reflect the last valid timestamp.
|
||||
assert.True(t, sessions[0].EndTime.After(sessions[0].StartTime))
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue