go-session/video.go
Snider 7f2fd424cf
All checks were successful
Security Scan / security (pull_request) Successful in 12s
Test / test (pull_request) Successful in 1m18s
refactor(module): migrate module paths from forge.lthn.ai to dappco.re
Update module path to dappco.re/go/core/session, dependency to
dappco.re/go/core/log v0.1.0, all Go import paths, and documentation
references. Remove duplicate test functions found during migration.

Co-Authored-By: Virgil <virgil@lethean.io>
2026-03-22 01:37:11 +00:00

130 lines
3 KiB
Go

// SPDX-Licence-Identifier: EUPL-1.2
package session
import (
"fmt"
"os"
"os/exec"
"strings"
coreerr "dappco.re/go/core/log"
)
// RenderMP4 generates an MP4 video from session events using VHS (charmbracelet).
func RenderMP4(sess *Session, outputPath string) error {
if _, err := exec.LookPath("vhs"); err != nil {
return coreerr.E("RenderMP4", "vhs not installed (go install github.com/charmbracelet/vhs@latest)", nil)
}
tape := generateTape(sess, outputPath)
tmpFile, err := os.CreateTemp("", "session-*.tape")
if err != nil {
return coreerr.E("RenderMP4", "create tape", err)
}
defer os.Remove(tmpFile.Name())
if _, err := tmpFile.WriteString(tape); err != nil {
tmpFile.Close()
return coreerr.E("RenderMP4", "write tape", err)
}
tmpFile.Close()
cmd := exec.Command("vhs", tmpFile.Name())
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
if err := cmd.Run(); err != nil {
return coreerr.E("RenderMP4", "vhs render", err)
}
return nil
}
func generateTape(sess *Session, outputPath string) string {
var b strings.Builder
b.WriteString(fmt.Sprintf("Output %s\n", outputPath))
b.WriteString("Set FontSize 16\n")
b.WriteString("Set Width 1400\n")
b.WriteString("Set Height 800\n")
b.WriteString("Set TypingSpeed 30ms\n")
b.WriteString("Set Theme \"Catppuccin Mocha\"\n")
b.WriteString("Set Shell bash\n")
b.WriteString("\n")
// Title frame
id := sess.ID
if len(id) > 8 {
id = id[:8]
}
b.WriteString(fmt.Sprintf("Type \"# Session %s | %s\"\n",
id, sess.StartTime.Format("2006-01-02 15:04")))
b.WriteString("Enter\n")
b.WriteString("Sleep 2s\n")
b.WriteString("\n")
for _, evt := range sess.Events {
if evt.Type != "tool_use" {
continue
}
switch evt.Tool {
case "Bash":
cmd := extractCommand(evt.Input)
if cmd == "" {
continue
}
// Show the command
b.WriteString(fmt.Sprintf("Type %q\n", "$ "+cmd))
b.WriteString("Enter\n")
// Show abbreviated output
output := evt.Output
if len(output) > 200 {
output = output[:200] + "..."
}
if output != "" {
for line := range strings.SplitSeq(output, "\n") {
if line == "" {
continue
}
b.WriteString(fmt.Sprintf("Type %q\n", line))
b.WriteString("Enter\n")
}
}
// Status indicator
if !evt.Success {
b.WriteString("Type \"# ✗ FAILED\"\n")
} else {
b.WriteString("Type \"# ✓ OK\"\n")
}
b.WriteString("Enter\n")
b.WriteString("Sleep 1s\n")
b.WriteString("\n")
case "Read", "Edit", "Write":
b.WriteString(fmt.Sprintf("Type %q\n",
fmt.Sprintf("# %s: %s", evt.Tool, truncate(evt.Input, 80))))
b.WriteString("Enter\n")
b.WriteString("Sleep 500ms\n")
case "Task":
b.WriteString(fmt.Sprintf("Type %q\n",
fmt.Sprintf("# Agent: %s", truncate(evt.Input, 80))))
b.WriteString("Enter\n")
b.WriteString("Sleep 1s\n")
}
}
b.WriteString("Sleep 3s\n")
return b.String()
}
func extractCommand(input string) string {
// Remove description suffix (after " # ")
if idx := strings.Index(input, " # "); idx > 0 {
return input[:idx]
}
return input
}