2026-03-30 20:46:43 +00:00
package store
import (
"database/sql"
"regexp"
"strconv"
"time"
core "dappco.re/go/core"
)
const (
// journalEntriesTableName is the SQL table that journal entries are inserted
// into and selected from; it is concatenated into the statements in this file.
journalEntriesTableName = "journal_entries"
// defaultJournalBucket is the bucket name used when the store's journal
// configuration leaves BucketName empty.
defaultJournalBucket = "store"
)
// createJournalEntriesTableSQL creates the journal table when it does not
// exist yet. fields_json and tags_json hold the JSON-encoded fields and tags
// of an entry, committed_at is written as Unix milliseconds by
// CommitToJournal, and archived_at is nullable (inserts in this file store
// NULL, and queries filter on `archived_at IS NULL`).
const createJournalEntriesTableSQL = ` CREATE TABLE IF NOT EXISTS journal_entries (
entry_id INTEGER PRIMARY KEY AUTOINCREMENT ,
bucket_name TEXT NOT NULL ,
measurement TEXT NOT NULL ,
fields_json TEXT NOT NULL ,
tags_json TEXT NOT NULL ,
committed_at INTEGER NOT NULL ,
archived_at INTEGER
) `
// Patterns for the small Flux subset that QueryJournal understands. They are
// compiled once at package scope. None of them carries leading or trailing
// whitespace, so they match inside text such as `from(bucket: "events")` or
// `r.workspace == "session-a")` where the expression is hugged by punctuation.
var (
	// journalBucketPattern captures NAME from `bucket: "NAME"`.
	journalBucketPattern = regexp.MustCompile(`bucket:\s*"([^"]+)"`)

	// journalMeasurementPatterns capture the measurement compared with `==`,
	// written either as a member access or as an index expression.
	journalMeasurementPatterns = []*regexp.Regexp{
		regexp.MustCompile(`(?:_measurement|measurement)\s*==\s*"([^"]+)"`),
		regexp.MustCompile(`\[\s*"(?:_measurement|measurement)"\s*\]\s*==\s*"([^"]+)"`),
	}

	// journalBucketEqualityPatterns capture the bucket name from row filters
	// such as `r.bucket == "NAME"` or `r["bucket_name"] == "NAME"`.
	journalBucketEqualityPatterns = []*regexp.Regexp{
		regexp.MustCompile(`r\.(?:_bucket|bucket|bucket_name)\s*==\s*"([^"]+)"`),
		regexp.MustCompile(`r\[\s*"(?:_bucket|bucket|bucket_name)"\s*\]\s*==\s*"([^"]+)"`),
	}

	// journalStringEqualityPatterns capture (key, quoted string value) from
	// `r.KEY == "VALUE"` and `r["KEY"] == "VALUE"`.
	journalStringEqualityPatterns = []*regexp.Regexp{
		regexp.MustCompile(`r\.([a-zA-Z0-9_:-]+)\s*==\s*"([^"]+)"`),
		regexp.MustCompile(`r\[\s*"([a-zA-Z0-9_:-]+)"\s*\]\s*==\s*"([^"]+)"`),
	}

	// journalScalarEqualityPatterns capture (key, unquoted scalar) where the
	// scalar is true, false, or a decimal number with an optional sign and
	// fraction.
	journalScalarEqualityPatterns = []*regexp.Regexp{
		regexp.MustCompile(`r\.([a-zA-Z0-9_:-]+)\s*==\s*(true|false|-?[0-9]+(?:\.[0-9]+)?)`),
		regexp.MustCompile(`r\[\s*"([a-zA-Z0-9_:-]+)"\s*\]\s*==\s*(true|false|-?[0-9]+(?:\.[0-9]+)?)`),
	}
)
2026-04-04 17:39:45 +00:00
// journalEqualityFilter is one `r.KEY == VALUE` condition extracted from a
// Flux-style query by journalEqualityFilters.
type journalEqualityFilter struct {
// columnName is the JSON key looked up inside tags_json / fields_json.
columnName string
// filterValue is the value the key is compared against; it is bound as a SQL
// argument, never spliced into the statement text.
filterValue any
// stringCompare is true for quoted string values (compared as TEXT against
// both tags and fields) and false for bool/number values (compared against
// fields only).
stringCompare bool
}
2026-03-30 20:46:43 +00:00
// journalExecutor is the minimal write capability commitJournalEntry needs.
// The method signature matches Exec on database/sql's *sql.DB and *sql.Tx.
type journalExecutor interface {
// Exec runs a statement with the given positional arguments.
Exec ( query string , args ... any ) ( sql . Result , error )
}
// Usage example: `result := storeInstance.CommitToJournal("scroll-session", map[string]any{"like": 4}, map[string]string{"workspace": "scroll-session"})`
2026-04-04 11:57:37 +00:00
// Workspace.Commit uses this same journal write path before it updates the
// summary row in `workspace:NAME`.
2026-03-30 20:46:43 +00:00
func ( storeInstance * Store ) CommitToJournal ( measurement string , fields map [ string ] any , tags map [ string ] string ) core . Result {
2026-04-03 06:31:35 +00:00
if err := storeInstance . ensureReady ( "store.CommitToJournal" ) ; err != nil {
return core . Result { Value : err , OK : false }
}
2026-03-30 20:46:43 +00:00
if measurement == "" {
return core . Result { Value : core . E ( "store.CommitToJournal" , "measurement is empty" , nil ) , OK : false }
}
if fields == nil {
fields = map [ string ] any { }
}
if tags == nil {
tags = map [ string ] string { }
}
2026-04-04 09:04:56 +00:00
if err := ensureJournalSchema ( storeInstance . sqliteDatabase ) ; err != nil {
2026-03-30 20:46:43 +00:00
return core . Result { Value : core . E ( "store.CommitToJournal" , "ensure journal schema" , err ) , OK : false }
}
2026-04-04 11:05:48 +00:00
fieldsJSON , err := marshalJSONText ( fields , "store.CommitToJournal" , "marshal fields" )
2026-03-30 20:46:43 +00:00
if err != nil {
return core . Result { Value : err , OK : false }
}
2026-04-04 11:05:48 +00:00
tagsJSON , err := marshalJSONText ( tags , "store.CommitToJournal" , "marshal tags" )
2026-03-30 20:46:43 +00:00
if err != nil {
return core . Result { Value : err , OK : false }
}
committedAt := time . Now ( ) . UnixMilli ( )
2026-04-04 11:54:03 +00:00
if err := commitJournalEntry (
2026-04-04 09:04:56 +00:00
storeInstance . sqliteDatabase ,
2026-03-30 20:46:43 +00:00
storeInstance . journalBucket ( ) ,
measurement ,
fieldsJSON ,
tagsJSON ,
committedAt ,
) ; err != nil {
return core . Result { Value : core . E ( "store.CommitToJournal" , "insert journal entry" , err ) , OK : false }
}
return core . Result {
Value : map [ string ] any {
"bucket" : storeInstance . journalBucket ( ) ,
"measurement" : measurement ,
2026-04-04 14:15:53 +00:00
"fields" : cloneAnyMap ( fields ) ,
"tags" : cloneStringMap ( tags ) ,
2026-03-30 20:46:43 +00:00
"committed_at" : committedAt ,
} ,
OK : true ,
}
}
2026-04-04 10:40:37 +00:00
// Usage example: `result := storeInstance.QueryJournal(\`from(bucket: "events") |> range(start: -24h) |> filter(fn: (r) => r.workspace == "session-a")\`)`
// Usage example: `result := storeInstance.QueryJournal("SELECT measurement, committed_at FROM journal_entries ORDER BY committed_at")`
2026-03-30 20:46:43 +00:00
func ( storeInstance * Store ) QueryJournal ( flux string ) core . Result {
2026-04-03 06:31:35 +00:00
if err := storeInstance . ensureReady ( "store.QueryJournal" ) ; err != nil {
return core . Result { Value : err , OK : false }
}
2026-04-04 09:04:56 +00:00
if err := ensureJournalSchema ( storeInstance . sqliteDatabase ) ; err != nil {
2026-03-30 20:46:43 +00:00
return core . Result { Value : core . E ( "store.QueryJournal" , "ensure journal schema" , err ) , OK : false }
}
trimmedQuery := core . Trim ( flux )
if trimmedQuery == "" {
return storeInstance . queryJournalRows (
2026-04-04 09:54:03 +00:00
"SELECT bucket_name, measurement, fields_json, tags_json, committed_at, archived_at FROM " + journalEntriesTableName + " WHERE archived_at IS NULL ORDER BY committed_at, entry_id" ,
2026-03-30 20:46:43 +00:00
)
}
2026-04-03 09:09:48 +00:00
if isRawSQLJournalQuery ( trimmedQuery ) {
2026-03-30 20:46:43 +00:00
return storeInstance . queryJournalRows ( trimmedQuery )
}
2026-04-04 10:33:25 +00:00
selectSQL , arguments , err := storeInstance . queryJournalFromFlux ( trimmedQuery )
2026-03-30 20:46:43 +00:00
if err != nil {
return core . Result { Value : err , OK : false }
}
return storeInstance . queryJournalRows ( selectSQL , arguments ... )
}
2026-04-03 09:09:48 +00:00
func isRawSQLJournalQuery ( query string ) bool {
upperQuery := core . Upper ( core . Trim ( query ) )
return core . HasPrefix ( upperQuery , "SELECT" ) ||
core . HasPrefix ( upperQuery , "WITH" ) ||
2026-04-04 08:08:21 +00:00
core . HasPrefix ( upperQuery , "EXPLAIN" ) ||
core . HasPrefix ( upperQuery , "PRAGMA" )
2026-04-03 09:09:48 +00:00
}
2026-03-30 20:46:43 +00:00
func ( storeInstance * Store ) queryJournalRows ( query string , arguments ... any ) core . Result {
2026-04-04 09:04:56 +00:00
rows , err := storeInstance . sqliteDatabase . Query ( query , arguments ... )
2026-03-30 20:46:43 +00:00
if err != nil {
return core . Result { Value : core . E ( "store.QueryJournal" , "query rows" , err ) , OK : false }
}
defer rows . Close ( )
rowMaps , err := queryRowsAsMaps ( rows )
if err != nil {
return core . Result { Value : core . E ( "store.QueryJournal" , "scan rows" , err ) , OK : false }
}
return core . Result { Value : inflateJournalRows ( rowMaps ) , OK : true }
}
2026-04-04 10:33:25 +00:00
func ( storeInstance * Store ) queryJournalFromFlux ( flux string ) ( string , [ ] any , error ) {
2026-04-03 09:13:07 +00:00
queryBuilder := core . NewBuilder ( )
queryBuilder . WriteString ( "SELECT bucket_name, measurement, fields_json, tags_json, committed_at, archived_at FROM " )
queryBuilder . WriteString ( journalEntriesTableName )
queryBuilder . WriteString ( " WHERE archived_at IS NULL" )
2026-03-30 20:46:43 +00:00
2026-04-03 09:13:07 +00:00
var queryArguments [ ] any
2026-03-30 20:46:43 +00:00
if bucket := quotedSubmatch ( journalBucketPattern , flux ) ; bucket != "" {
2026-04-03 09:13:07 +00:00
queryBuilder . WriteString ( " AND bucket_name = ?" )
queryArguments = append ( queryArguments , bucket )
2026-03-30 20:46:43 +00:00
}
2026-04-03 05:53:56 +00:00
if measurement := firstQuotedSubmatch ( journalMeasurementPatterns , flux ) ; measurement != "" {
2026-04-03 09:13:07 +00:00
queryBuilder . WriteString ( " AND measurement = ?" )
queryArguments = append ( queryArguments , measurement )
2026-03-30 20:46:43 +00:00
}
2026-04-03 04:50:26 +00:00
startRange , stopRange := journalRangeBounds ( flux )
if startRange != "" {
2026-04-04 11:05:48 +00:00
startTime , err := parseFluxTime ( core . Trim ( startRange ) )
2026-03-30 20:46:43 +00:00
if err != nil {
return "" , nil , core . E ( "store.QueryJournal" , "parse range" , err )
}
2026-04-03 09:13:07 +00:00
queryBuilder . WriteString ( " AND committed_at >= ?" )
queryArguments = append ( queryArguments , startTime . UnixMilli ( ) )
2026-03-30 20:46:43 +00:00
}
2026-04-03 04:50:26 +00:00
if stopRange != "" {
2026-04-04 11:05:48 +00:00
stopTime , err := parseFluxTime ( core . Trim ( stopRange ) )
2026-04-03 04:50:26 +00:00
if err != nil {
return "" , nil , core . E ( "store.QueryJournal" , "parse range" , err )
}
2026-04-03 09:13:07 +00:00
queryBuilder . WriteString ( " AND committed_at < ?" )
queryArguments = append ( queryArguments , stopTime . UnixMilli ( ) )
2026-04-03 04:50:26 +00:00
}
2026-03-30 20:46:43 +00:00
2026-04-04 08:36:42 +00:00
for _ , pattern := range journalBucketEqualityPatterns {
bucketMatches := pattern . FindAllStringSubmatch ( flux , - 1 )
for _ , match := range bucketMatches {
if len ( match ) < 2 {
continue
}
queryBuilder . WriteString ( " AND bucket_name = ?" )
queryArguments = append ( queryArguments , match [ 1 ] )
}
}
2026-04-04 17:39:45 +00:00
for _ , filter := range journalEqualityFilters ( flux ) {
if filter . stringCompare {
2026-04-03 09:13:07 +00:00
queryBuilder . WriteString ( " AND (CAST(json_extract(tags_json, '$.\"' || ? || '\"') AS TEXT) = ? OR CAST(json_extract(fields_json, '$.\"' || ? || '\"') AS TEXT) = ?)" )
2026-04-04 17:39:45 +00:00
queryArguments = append ( queryArguments , filter . columnName , filter . filterValue , filter . columnName , filter . filterValue )
continue
2026-04-03 08:20:00 +00:00
}
2026-04-04 17:39:45 +00:00
queryBuilder . WriteString ( " AND json_extract(fields_json, '$.\"' || ? || '\"') = ?" )
queryArguments = append ( queryArguments , filter . columnName , filter . filterValue )
2026-04-03 08:20:00 +00:00
}
2026-04-04 09:54:03 +00:00
queryBuilder . WriteString ( " ORDER BY committed_at, entry_id" )
2026-04-03 09:13:07 +00:00
return queryBuilder . String ( ) , queryArguments , nil
2026-03-30 20:46:43 +00:00
}
func ( storeInstance * Store ) journalBucket ( ) string {
2026-04-04 14:19:28 +00:00
if storeInstance . journalConfiguration . BucketName == "" {
2026-03-30 20:46:43 +00:00
return defaultJournalBucket
}
2026-04-04 14:19:28 +00:00
return storeInstance . journalConfiguration . BucketName
2026-03-30 20:46:43 +00:00
}
// ensureJournalSchema creates the journal table and its
// (bucket_name, committed_at) index when they are missing. Both statements
// use IF NOT EXISTS. The first failing statement's error is returned as-is.
func ensureJournalSchema(database schemaDatabase) error {
	statements := [...]string{
		createJournalEntriesTableSQL,
		"CREATE INDEX IF NOT EXISTS journal_entries_bucket_committed_at_idx ON " + journalEntriesTableName + " (bucket_name, committed_at)",
	}
	for _, statement := range statements {
		if _, err := database.Exec(statement); err != nil {
			return err
		}
	}
	return nil
}
2026-04-04 11:54:03 +00:00
func commitJournalEntry (
2026-03-30 20:46:43 +00:00
executor journalExecutor ,
bucket , measurement , fieldsJSON , tagsJSON string ,
committedAt int64 ,
) error {
_ , err := executor . Exec (
"INSERT INTO " + journalEntriesTableName + " (bucket_name, measurement, fields_json, tags_json, committed_at, archived_at) VALUES (?, ?, ?, ?, ?, NULL)" ,
bucket ,
measurement ,
fieldsJSON ,
tagsJSON ,
committedAt ,
)
return err
}
2026-04-04 11:05:48 +00:00
func marshalJSONText ( value any , operation , message string ) ( string , error ) {
2026-03-30 20:46:43 +00:00
result := core . JSONMarshal ( value )
if ! result . OK {
return "" , core . E ( operation , message , result . Value . ( error ) )
}
return string ( result . Value . ( [ ] byte ) ) , nil
}
2026-04-03 04:50:26 +00:00
func journalRangeBounds ( flux string ) ( string , string ) {
2026-04-04 11:05:48 +00:00
rangeIndex := indexOfSubstring ( flux , "range(" )
2026-04-03 04:50:26 +00:00
if rangeIndex < 0 {
return "" , ""
}
contentStart := rangeIndex + len ( "range(" )
depth := 1
contentEnd := - 1
scanRange :
for i := contentStart ; i < len ( flux ) ; i ++ {
switch flux [ i ] {
case '(' :
depth ++
case ')' :
depth --
if depth == 0 {
contentEnd = i
break scanRange
}
}
}
if contentEnd < 0 || contentEnd <= contentStart {
return "" , ""
}
content := flux [ contentStart : contentEnd ]
startPrefix := "start:"
2026-04-04 11:05:48 +00:00
startIndex := indexOfSubstring ( content , startPrefix )
2026-04-03 04:50:26 +00:00
if startIndex < 0 {
return "" , ""
}
startIndex += len ( startPrefix )
start := core . Trim ( content [ startIndex : ] )
stop := ""
2026-04-04 11:05:48 +00:00
if stopIndex := indexOfSubstring ( content , ", stop:" ) ; stopIndex >= 0 {
2026-04-03 04:50:26 +00:00
start = core . Trim ( content [ startIndex : stopIndex ] )
stop = core . Trim ( content [ stopIndex + len ( ", stop:" ) : ] )
2026-04-04 11:05:48 +00:00
} else if stopIndex := indexOfSubstring ( content , ",stop:" ) ; stopIndex >= 0 {
2026-04-03 04:50:26 +00:00
start = core . Trim ( content [ startIndex : stopIndex ] )
stop = core . Trim ( content [ stopIndex + len ( ",stop:" ) : ] )
}
return start , stop
}
2026-04-04 11:05:48 +00:00
// indexOfSubstring returns the byte offset of the first occurrence of
// substring in text, or -1 when it does not occur. An empty substring is
// found at offset 0.
func indexOfSubstring(text, substring string) int {
	needleLength := len(substring)
	if needleLength == 0 {
		return 0
	}
	// The loop bound also covers the needle-longer-than-text case: the body
	// never runs and -1 is returned.
	for offset := 0; offset+needleLength <= len(text); offset++ {
		if text[offset:offset+needleLength] == substring {
			return offset
		}
	}
	return -1
}
2026-04-04 11:05:48 +00:00
func parseFluxTime ( value string ) ( time . Time , error ) {
2026-03-30 21:07:30 +00:00
value = core . Trim ( value )
2026-03-30 20:46:43 +00:00
if value == "" {
2026-04-04 11:05:48 +00:00
return time . Time { } , core . E ( "store.parseFluxTime" , "range value is empty" , nil )
2026-03-30 20:46:43 +00:00
}
2026-04-04 21:29:27 +00:00
value = firstStringOrEmpty ( core . Split ( value , "," ) )
2026-03-30 21:07:30 +00:00
value = core . Trim ( value )
if core . HasPrefix ( value , "time(v:" ) && core . HasSuffix ( value , ")" ) {
value = core . Trim ( core . TrimSuffix ( core . TrimPrefix ( value , "time(v:" ) , ")" ) )
}
if core . HasPrefix ( value , ` " ` ) && core . HasSuffix ( value , ` " ` ) {
value = core . TrimSuffix ( core . TrimPrefix ( value , ` " ` ) , ` " ` )
}
if value == "now()" {
return time . Now ( ) , nil
}
2026-03-30 20:46:43 +00:00
if core . HasSuffix ( value , "d" ) {
days , err := strconv . Atoi ( core . TrimSuffix ( value , "d" ) )
if err != nil {
return time . Time { } , err
}
return time . Now ( ) . Add ( time . Duration ( days ) * 24 * time . Hour ) , nil
}
lookback , err := time . ParseDuration ( value )
2026-03-30 21:07:30 +00:00
if err == nil {
return time . Now ( ) . Add ( lookback ) , nil
}
parsedTime , err := time . Parse ( time . RFC3339Nano , value )
2026-03-30 20:46:43 +00:00
if err != nil {
return time . Time { } , err
}
2026-03-30 21:07:30 +00:00
return parsedTime , nil
2026-03-30 20:46:43 +00:00
}
// quotedSubmatch returns the first capture group of pattern's leftmost match
// in value, or "" when pattern does not match or has no capture group.
func quotedSubmatch(pattern *regexp.Regexp, value string) string {
	if groups := pattern.FindStringSubmatch(value); len(groups) >= 2 {
		return groups[1]
	}
	return ""
}
2026-04-03 05:53:56 +00:00
// firstQuotedSubmatch tries each pattern in order and returns the first
// non-empty first-capture-group result, or "" when none of them yields one.
func firstQuotedSubmatch(patterns []*regexp.Regexp, value string) string {
	for index := range patterns {
		captured := quotedSubmatch(patterns[index], value)
		if captured == "" {
			continue
		}
		return captured
	}
	return ""
}
2026-03-30 20:46:43 +00:00
// regexpSubmatch returns capture group index (0 is the whole match) of
// pattern's leftmost match in value, or "" when there is no match or the
// pattern has fewer groups than index.
func regexpSubmatch(pattern *regexp.Regexp, value string, index int) string {
	if groups := pattern.FindStringSubmatch(value); index < len(groups) {
		return groups[index]
	}
	return ""
}
// queryRowsAsMaps drains rows into one map per row, keyed by column name,
// passing every scanned value through normaliseRowValue. It returns a nil
// slice when there are no rows, and the first error from Columns, Scan or
// the iteration itself. The caller remains responsible for closing rows.
func queryRowsAsMaps(rows *sql.Rows) ([]map[string]any, error) {
	columns, err := rows.Columns()
	if err != nil {
		return nil, err
	}
	width := len(columns)

	var records []map[string]any
	for rows.Next() {
		// Scan needs pointers, so point each destination at its own cell.
		cells := make([]any, width)
		destinations := make([]any, width)
		for position := range cells {
			destinations[position] = &cells[position]
		}
		if scanErr := rows.Scan(destinations...); scanErr != nil {
			return nil, scanErr
		}

		record := make(map[string]any, width)
		for position, cell := range cells {
			record[columns[position]] = normaliseRowValue(cell)
		}
		records = append(records, record)
	}
	if iterationErr := rows.Err(); iterationErr != nil {
		return nil, iterationErr
	}
	return records, nil
}
// inflateJournalRows decodes the JSON text columns of each row in place:
// a string "fields_json" adds a "fields" map and a string "tags_json" adds a
// "tags" map. Rows whose JSON fails to decode are left without the extra
// key. The same slice is returned for convenience.
func inflateJournalRows(rows []map[string]any) []map[string]any {
	for index := range rows {
		record := rows[index]

		if encodedFields, isText := record["fields_json"].(string); isText {
			decodedFields := map[string]any{}
			if core.JSONUnmarshalString(encodedFields, &decodedFields).OK {
				record["fields"] = decodedFields
			}
		}

		if encodedTags, isText := record["tags_json"].(string); isText {
			decodedTags := map[string]string{}
			if core.JSONUnmarshalString(encodedTags, &decodedTags).OK {
				record["tags"] = decodedTags
			}
		}
	}
	return rows
}
// normaliseRowValue converts a []byte value into a string and returns every
// other value unchanged.
func normaliseRowValue(value any) any {
	if raw, isBytes := value.([]byte); isBytes {
		return string(raw)
	}
	return value
}
2026-04-04 14:15:53 +00:00
2026-04-04 17:39:45 +00:00
// journalEqualityFilters collects every `r.KEY == VALUE` / `r["KEY"] == VALUE`
// condition in flux. Quoted-string conditions come first (in pattern order),
// followed by bool/number conditions. Keys naming the measurement or the
// bucket are skipped; those are handled separately from these filters.
func journalEqualityFilters(flux string) []journalEqualityFilter {
	var collected []journalEqualityFilter

	record := func(name string, value any, asString bool) {
		switch name {
		case "_measurement", "measurement", "_bucket", "bucket", "bucket_name":
			return
		}
		collected = append(collected, journalEqualityFilter{
			columnName:    name,
			filterValue:   value,
			stringCompare: asString,
		})
	}

	for _, pattern := range journalStringEqualityPatterns {
		for _, groups := range pattern.FindAllStringSubmatch(flux, -1) {
			if len(groups) >= 3 {
				record(groups[1], groups[2], true)
			}
		}
	}

	for _, pattern := range journalScalarEqualityPatterns {
		for _, groups := range pattern.FindAllStringSubmatch(flux, -1) {
			if len(groups) < 3 {
				continue
			}
			if scalar, parsed := parseJournalScalarValue(groups[2]); parsed {
				record(groups[1], scalar, false)
			}
		}
	}

	return collected
}
// parseJournalScalarValue converts an unquoted literal into a Go value:
// "true"/"false" become bool, base-10 integers become int64, and anything
// else strconv.ParseFloat accepts becomes float64. The second result is
// false when value is none of these.
func parseJournalScalarValue(value string) (any, bool) {
	if value == "true" || value == "false" {
		return value == "true", true
	}
	// Integers are tried first so that "42" stays an int64 rather than 42.0.
	if asInteger, err := strconv.ParseInt(value, 10, 64); err == nil {
		return asInteger, true
	}
	asFloat, err := strconv.ParseFloat(value, 64)
	if err != nil {
		return nil, false
	}
	return asFloat, true
}
2026-04-04 14:15:53 +00:00
// cloneAnyMap returns a shallow copy of input. A nil input yields an empty,
// non-nil map. Values are copied by assignment, so nested maps or slices are
// still shared with the original.
func cloneAnyMap(input map[string]any) map[string]any {
	// len and range are both safe on a nil map, so no separate nil branch is
	// needed to obtain the empty-map result.
	duplicate := make(map[string]any, len(input))
	for name, entry := range input {
		duplicate[name] = entry
	}
	return duplicate
}
// cloneStringMap returns a copy of input. A nil input yields an empty,
// non-nil map.
func cloneStringMap(input map[string]string) map[string]string {
	// len and range are both safe on a nil map, so no separate nil branch is
	// needed to obtain the empty-map result.
	duplicate := make(map[string]string, len(input))
	for name, entry := range input {
		duplicate[name] = entry
	}
	return duplicate
}