package store

import (
	"database/sql"
	"regexp"
	"strconv"
	"time"

	core "dappco.re/go/core"
)

const (
	// journalEntriesTableName is the SQLite table that holds journal entries.
	journalEntriesTableName = "journal_entries"
	// defaultJournalBucket is the bucket name used when the store's journal
	// configuration leaves BucketName empty.
	defaultJournalBucket = "store"
)

// createJournalEntriesTableSQL creates the journal table when it is missing.
// fields_json and tags_json hold JSON-encoded text, committed_at is written
// as Unix milliseconds by CommitToJournal, and archived_at is NULL for
// entries that have not been archived.
const createJournalEntriesTableSQL = `CREATE TABLE IF NOT EXISTS journal_entries ( entry_id INTEGER PRIMARY KEY AUTOINCREMENT, bucket_name TEXT NOT NULL, measurement TEXT NOT NULL, fields_json TEXT NOT NULL, tags_json TEXT NOT NULL, committed_at INTEGER NOT NULL, archived_at INTEGER )`

var (
	// journalBucketPattern captures NAME from a Flux `bucket: "NAME"` argument.
	journalBucketPattern = regexp.MustCompile(`bucket:\s*"([^"]+)"`)
	// journalMeasurementPatterns capture the quoted value compared against
	// `_measurement` or `measurement`, in dotted or bracketed form.
	journalMeasurementPatterns = []*regexp.Regexp{
		regexp.MustCompile(`(?:_measurement|measurement)\s*==\s*"([^"]+)"`),
		regexp.MustCompile(`\[\s*"(?:_measurement|measurement)"\s*\]\s*==\s*"([^"]+)"`),
	}
	// journalBucketEqualityPatterns capture the quoted value compared against
	// `r._bucket`, `r.bucket` or `r.bucket_name`, in dotted or bracketed form.
	journalBucketEqualityPatterns = []*regexp.Regexp{
		regexp.MustCompile(`r\.(?:_bucket|bucket|bucket_name)\s*==\s*"([^"]+)"`),
		regexp.MustCompile(`r\[\s*"(?:_bucket|bucket|bucket_name)"\s*\]\s*==\s*"([^"]+)"`),
	}
	// journalStringEqualityPatterns capture (name, quoted value) pairs from
	// `r.NAME == "VALUE"` and `r["NAME"] == "VALUE"` comparisons.
	journalStringEqualityPatterns = []*regexp.Regexp{
		regexp.MustCompile(`r\.([a-zA-Z0-9_:-]+)\s*==\s*"([^"]+)"`),
		regexp.MustCompile(`r\[\s*"([a-zA-Z0-9_:-]+)"\s*\]\s*==\s*"([^"]+)"`),
	}
	// journalScalarEqualityPatterns capture (name, literal) pairs where the
	// literal is an unquoted `true`, `false`, integer or decimal number.
	journalScalarEqualityPatterns = []*regexp.Regexp{
		regexp.MustCompile(`r\.([a-zA-Z0-9_:-]+)\s*==\s*(true|false|-?[0-9]+(?:\.[0-9]+)?)`),
		regexp.MustCompile(`r\[\s*"([a-zA-Z0-9_:-]+)"\s*\]\s*==\s*(true|false|-?[0-9]+(?:\.[0-9]+)?)`),
	}
)

// journalEqualityFilter is one `r.NAME == VALUE` comparison extracted from a
// Flux query by journalEqualityFilters.
type journalEqualityFilter struct {
	// columnName is the JSON key looked up inside tags_json / fields_json.
	columnName string
	// filterValue is the right-hand side: a string for quoted values, or a
	// bool, int64 or float64 for scalar literals.
	filterValue any
	// stringCompare is true when the value came from a quoted-string pattern.
	stringCompare bool
}

// journalExecutor is the Exec method that commitJournalEntry needs from its
// database handle.
type journalExecutor interface {
	Exec(query string, args ...any) (sql.Result, error)
}

// Usage example: `result := storeInstance.CommitToJournal("scroll-session", map[string]any{"like": 4}, map[string]string{"workspace": "scroll-session"})`
// Workspace.Commit uses this same journal write path before it updates the
// summary row in `workspace:NAME`.
func (storeInstance *Store) CommitToJournal(measurement string, fields map[string]any, tags map[string]string) core.Result { if err := storeInstance.ensureReady("store.CommitToJournal"); err != nil { return core.Result{Value: err, OK: false} } if measurement == "" { return core.Result{Value: core.E("store.CommitToJournal", "measurement is empty", nil), OK: false} } if fields == nil { fields = map[string]any{} } if tags == nil { tags = map[string]string{} } if err := ensureJournalSchema(storeInstance.sqliteDatabase); err != nil { return core.Result{Value: core.E("store.CommitToJournal", "ensure journal schema", err), OK: false} } fieldsJSON, err := marshalJSONText(fields, "store.CommitToJournal", "marshal fields") if err != nil { return core.Result{Value: err, OK: false} } tagsJSON, err := marshalJSONText(tags, "store.CommitToJournal", "marshal tags") if err != nil { return core.Result{Value: err, OK: false} } committedAt := time.Now().UnixMilli() if err := commitJournalEntry( storeInstance.sqliteDatabase, storeInstance.journalBucket(), measurement, fieldsJSON, tagsJSON, committedAt, ); err != nil { return core.Result{Value: core.E("store.CommitToJournal", "insert journal entry", err), OK: false} } return core.Result{ Value: map[string]any{ "bucket": storeInstance.journalBucket(), "measurement": measurement, "fields": cloneAnyMap(fields), "tags": cloneStringMap(tags), "committed_at": committedAt, }, OK: true, } } // Usage example: `result := storeInstance.QueryJournal(\`from(bucket: "events") |> range(start: -24h) |> filter(fn: (r) => r.workspace == "session-a")\`)` // Usage example: `result := storeInstance.QueryJournal("SELECT measurement, committed_at FROM journal_entries ORDER BY committed_at")` func (storeInstance *Store) QueryJournal(flux string) core.Result { if err := storeInstance.ensureReady("store.QueryJournal"); err != nil { return core.Result{Value: err, OK: false} } if err := ensureJournalSchema(storeInstance.sqliteDatabase); err != nil { return 
core.Result{Value: core.E("store.QueryJournal", "ensure journal schema", err), OK: false} } trimmedQuery := core.Trim(flux) if trimmedQuery == "" { return storeInstance.queryJournalRows( "SELECT bucket_name, measurement, fields_json, tags_json, committed_at, archived_at FROM " + journalEntriesTableName + " WHERE archived_at IS NULL ORDER BY committed_at, entry_id", ) } if isRawSQLJournalQuery(trimmedQuery) { return storeInstance.queryJournalRows(trimmedQuery) } selectSQL, arguments, err := storeInstance.queryJournalFromFlux(trimmedQuery) if err != nil { return core.Result{Value: err, OK: false} } return storeInstance.queryJournalRows(selectSQL, arguments...) } func isRawSQLJournalQuery(query string) bool { upperQuery := core.Upper(core.Trim(query)) return core.HasPrefix(upperQuery, "SELECT") || core.HasPrefix(upperQuery, "WITH") || core.HasPrefix(upperQuery, "EXPLAIN") || core.HasPrefix(upperQuery, "PRAGMA") } func (storeInstance *Store) queryJournalRows(query string, arguments ...any) core.Result { rows, err := storeInstance.sqliteDatabase.Query(query, arguments...) 
if err != nil { return core.Result{Value: core.E("store.QueryJournal", "query rows", err), OK: false} } defer rows.Close() rowMaps, err := queryRowsAsMaps(rows) if err != nil { return core.Result{Value: core.E("store.QueryJournal", "scan rows", err), OK: false} } return core.Result{Value: inflateJournalRows(rowMaps), OK: true} } func (storeInstance *Store) queryJournalFromFlux(flux string) (string, []any, error) { queryBuilder := core.NewBuilder() queryBuilder.WriteString("SELECT bucket_name, measurement, fields_json, tags_json, committed_at, archived_at FROM ") queryBuilder.WriteString(journalEntriesTableName) queryBuilder.WriteString(" WHERE archived_at IS NULL") var queryArguments []any if bucket := quotedSubmatch(journalBucketPattern, flux); bucket != "" { queryBuilder.WriteString(" AND bucket_name = ?") queryArguments = append(queryArguments, bucket) } if measurement := firstQuotedSubmatch(journalMeasurementPatterns, flux); measurement != "" { queryBuilder.WriteString(" AND measurement = ?") queryArguments = append(queryArguments, measurement) } startRange, stopRange := journalRangeBounds(flux) if startRange != "" { startTime, err := parseFluxTime(core.Trim(startRange)) if err != nil { return "", nil, core.E("store.QueryJournal", "parse range", err) } queryBuilder.WriteString(" AND committed_at >= ?") queryArguments = append(queryArguments, startTime.UnixMilli()) } if stopRange != "" { stopTime, err := parseFluxTime(core.Trim(stopRange)) if err != nil { return "", nil, core.E("store.QueryJournal", "parse range", err) } queryBuilder.WriteString(" AND committed_at < ?") queryArguments = append(queryArguments, stopTime.UnixMilli()) } for _, pattern := range journalBucketEqualityPatterns { bucketMatches := pattern.FindAllStringSubmatch(flux, -1) for _, match := range bucketMatches { if len(match) < 2 { continue } queryBuilder.WriteString(" AND bucket_name = ?") queryArguments = append(queryArguments, match[1]) } } for _, filter := range 
journalEqualityFilters(flux) { if filter.stringCompare { queryBuilder.WriteString(" AND (CAST(json_extract(tags_json, '$.\"' || ? || '\"') AS TEXT) = ? OR CAST(json_extract(fields_json, '$.\"' || ? || '\"') AS TEXT) = ?)") queryArguments = append(queryArguments, filter.columnName, filter.filterValue, filter.columnName, filter.filterValue) continue } queryBuilder.WriteString(" AND json_extract(fields_json, '$.\"' || ? || '\"') = ?") queryArguments = append(queryArguments, filter.columnName, filter.filterValue) } queryBuilder.WriteString(" ORDER BY committed_at, entry_id") return queryBuilder.String(), queryArguments, nil } func (storeInstance *Store) journalBucket() string { if storeInstance.journalConfiguration.BucketName == "" { return defaultJournalBucket } return storeInstance.journalConfiguration.BucketName } func ensureJournalSchema(database schemaDatabase) error { if _, err := database.Exec(createJournalEntriesTableSQL); err != nil { return err } if _, err := database.Exec( "CREATE INDEX IF NOT EXISTS journal_entries_bucket_committed_at_idx ON " + journalEntriesTableName + " (bucket_name, committed_at)", ); err != nil { return err } return nil } func commitJournalEntry( executor journalExecutor, bucket, measurement, fieldsJSON, tagsJSON string, committedAt int64, ) error { _, err := executor.Exec( "INSERT INTO "+journalEntriesTableName+" (bucket_name, measurement, fields_json, tags_json, committed_at, archived_at) VALUES (?, ?, ?, ?, ?, NULL)", bucket, measurement, fieldsJSON, tagsJSON, committedAt, ) return err } func marshalJSONText(value any, operation, message string) (string, error) { result := core.JSONMarshal(value) if !result.OK { return "", core.E(operation, message, result.Value.(error)) } return string(result.Value.([]byte)), nil } func journalRangeBounds(flux string) (string, string) { rangeIndex := indexOfSubstring(flux, "range(") if rangeIndex < 0 { return "", "" } contentStart := rangeIndex + len("range(") depth := 1 contentEnd := -1 scanRange: 
for i := contentStart; i < len(flux); i++ { switch flux[i] { case '(': depth++ case ')': depth-- if depth == 0 { contentEnd = i break scanRange } } } if contentEnd < 0 || contentEnd <= contentStart { return "", "" } content := flux[contentStart:contentEnd] startPrefix := "start:" startIndex := indexOfSubstring(content, startPrefix) if startIndex < 0 { return "", "" } startIndex += len(startPrefix) start := core.Trim(content[startIndex:]) stop := "" if stopIndex := indexOfSubstring(content, ", stop:"); stopIndex >= 0 { start = core.Trim(content[startIndex:stopIndex]) stop = core.Trim(content[stopIndex+len(", stop:"):]) } else if stopIndex := indexOfSubstring(content, ",stop:"); stopIndex >= 0 { start = core.Trim(content[startIndex:stopIndex]) stop = core.Trim(content[stopIndex+len(",stop:"):]) } return start, stop } func indexOfSubstring(text, substring string) int { if substring == "" { return 0 } if len(substring) > len(text) { return -1 } for i := 0; i <= len(text)-len(substring); i++ { if text[i:i+len(substring)] == substring { return i } } return -1 } func parseFluxTime(value string) (time.Time, error) { value = core.Trim(value) if value == "" { return time.Time{}, core.E("store.parseFluxTime", "range value is empty", nil) } value = firstStringOrEmpty(core.Split(value, ",")) value = core.Trim(value) if core.HasPrefix(value, "time(v:") && core.HasSuffix(value, ")") { value = core.Trim(core.TrimSuffix(core.TrimPrefix(value, "time(v:"), ")")) } if core.HasPrefix(value, `"`) && core.HasSuffix(value, `"`) { value = core.TrimSuffix(core.TrimPrefix(value, `"`), `"`) } if value == "now()" { return time.Now(), nil } if core.HasSuffix(value, "d") { days, err := strconv.Atoi(core.TrimSuffix(value, "d")) if err != nil { return time.Time{}, err } return time.Now().Add(time.Duration(days) * 24 * time.Hour), nil } lookback, err := time.ParseDuration(value) if err == nil { return time.Now().Add(lookback), nil } parsedTime, err := time.Parse(time.RFC3339Nano, value) if err != 
nil { return time.Time{}, err } return parsedTime, nil } func quotedSubmatch(pattern *regexp.Regexp, value string) string { match := pattern.FindStringSubmatch(value) if len(match) < 2 { return "" } return match[1] } func firstQuotedSubmatch(patterns []*regexp.Regexp, value string) string { for _, pattern := range patterns { if match := quotedSubmatch(pattern, value); match != "" { return match } } return "" } func regexpSubmatch(pattern *regexp.Regexp, value string, index int) string { match := pattern.FindStringSubmatch(value) if len(match) <= index { return "" } return match[index] } func queryRowsAsMaps(rows *sql.Rows) ([]map[string]any, error) { columnNames, err := rows.Columns() if err != nil { return nil, err } var result []map[string]any for rows.Next() { rawValues := make([]any, len(columnNames)) scanTargets := make([]any, len(columnNames)) for i := range rawValues { scanTargets[i] = &rawValues[i] } if err := rows.Scan(scanTargets...); err != nil { return nil, err } row := make(map[string]any, len(columnNames)) for i, columnName := range columnNames { row[columnName] = normaliseRowValue(rawValues[i]) } result = append(result, row) } if err := rows.Err(); err != nil { return nil, err } return result, nil } func inflateJournalRows(rows []map[string]any) []map[string]any { for _, row := range rows { if fieldsJSON, ok := row["fields_json"].(string); ok { fields := make(map[string]any) result := core.JSONUnmarshalString(fieldsJSON, &fields) if result.OK { row["fields"] = fields } } if tagsJSON, ok := row["tags_json"].(string); ok { tags := make(map[string]string) result := core.JSONUnmarshalString(tagsJSON, &tags) if result.OK { row["tags"] = tags } } } return rows } func normaliseRowValue(value any) any { switch typedValue := value.(type) { case []byte: return string(typedValue) default: return typedValue } } func journalEqualityFilters(flux string) []journalEqualityFilter { var filters []journalEqualityFilter appendFilter := func(columnName string, 
filterValue any, stringCompare bool) { if columnName == "_measurement" || columnName == "measurement" || columnName == "_bucket" || columnName == "bucket" || columnName == "bucket_name" { return } filters = append(filters, journalEqualityFilter{ columnName: columnName, filterValue: filterValue, stringCompare: stringCompare, }) } for _, pattern := range journalStringEqualityPatterns { matches := pattern.FindAllStringSubmatch(flux, -1) for _, match := range matches { if len(match) < 3 { continue } appendFilter(match[1], match[2], true) } } for _, pattern := range journalScalarEqualityPatterns { matches := pattern.FindAllStringSubmatch(flux, -1) for _, match := range matches { if len(match) < 3 { continue } filterValue, ok := parseJournalScalarValue(match[2]) if !ok { continue } appendFilter(match[1], filterValue, false) } } return filters } func parseJournalScalarValue(value string) (any, bool) { switch value { case "true": return true, true case "false": return false, true } if integerValue, err := strconv.ParseInt(value, 10, 64); err == nil { return integerValue, true } if floatValue, err := strconv.ParseFloat(value, 64); err == nil { return floatValue, true } return nil, false } func cloneAnyMap(input map[string]any) map[string]any { if input == nil { return map[string]any{} } cloned := make(map[string]any, len(input)) for key, value := range input { cloned[key] = value } return cloned } func cloneStringMap(input map[string]string) map[string]string { if input == nil { return map[string]string{} } cloned := make(map[string]string, len(input)) for key, value := range input { cloned[key] = value } return cloned }