515 lines
15 KiB
Go
515 lines
15 KiB
Go
package store
|
|
|
|
import (
|
|
"database/sql"
|
|
"regexp"
|
|
"strconv"
|
|
"time"
|
|
|
|
core "dappco.re/go/core"
|
|
)
|
|
|
|
const (
	// journalEntriesTableName is the SQLite table backing the journal.
	journalEntriesTableName = "journal_entries"
	// defaultJournalBucket is used when the store configuration does not
	// name a bucket (see Store.journalBucket).
	defaultJournalBucket = "store"
)
|
|
|
|
// createJournalEntriesTableSQL creates the append-only journal table.
// committed_at holds unix-millisecond timestamps; archived_at is NULL while
// an entry is live (reads filter on `archived_at IS NULL`).
const createJournalEntriesTableSQL = `CREATE TABLE IF NOT EXISTS journal_entries (
	entry_id INTEGER PRIMARY KEY AUTOINCREMENT,
	bucket_name TEXT NOT NULL,
	measurement TEXT NOT NULL,
	fields_json TEXT NOT NULL,
	tags_json TEXT NOT NULL,
	committed_at INTEGER NOT NULL,
	archived_at INTEGER
)`
|
|
|
|
var (
	// journalBucketPattern captures the bucket name from a Flux
	// `from(bucket: "...")` clause.
	journalBucketPattern = regexp.MustCompile(`bucket:\s*"([^"]+)"`)

	// journalMeasurementPatterns capture measurement equality in both dot
	// (r._measurement == "m") and bracket (r["measurement"] == "m") notation.
	journalMeasurementPatterns = []*regexp.Regexp{
		regexp.MustCompile(`(?:_measurement|measurement)\s*==\s*"([^"]+)"`),
		regexp.MustCompile(`\[\s*"(?:_measurement|measurement)"\s*\]\s*==\s*"([^"]+)"`),
	}

	// journalBucketEqualityPatterns capture bucket equality expressed inside
	// a filter predicate rather than in the from() clause.
	journalBucketEqualityPatterns = []*regexp.Regexp{
		regexp.MustCompile(`r\.(?:_bucket|bucket|bucket_name)\s*==\s*"([^"]+)"`),
		regexp.MustCompile(`r\[\s*"(?:_bucket|bucket|bucket_name)"\s*\]\s*==\s*"([^"]+)"`),
	}

	// journalStringEqualityPatterns capture `r.key == "value"` filters;
	// group 1 is the key, group 2 the quoted string value.
	journalStringEqualityPatterns = []*regexp.Regexp{
		regexp.MustCompile(`r\.([a-zA-Z0-9_:-]+)\s*==\s*"([^"]+)"`),
		regexp.MustCompile(`r\[\s*"([a-zA-Z0-9_:-]+)"\s*\]\s*==\s*"([^"]+)"`),
	}

	// journalScalarEqualityPatterns capture `r.key == <bool|number>` filters;
	// group 1 is the key, group 2 the unquoted literal.
	journalScalarEqualityPatterns = []*regexp.Regexp{
		regexp.MustCompile(`r\.([a-zA-Z0-9_:-]+)\s*==\s*(true|false|-?[0-9]+(?:\.[0-9]+)?)`),
		regexp.MustCompile(`r\[\s*"([a-zA-Z0-9_:-]+)"\s*\]\s*==\s*(true|false|-?[0-9]+(?:\.[0-9]+)?)`),
	}
)
|
|
|
|
// journalEqualityFilter is one `r.<key> == <value>` predicate parsed out of
// a Flux filter expression.
type journalEqualityFilter struct {
	// columnName is the tag/field key referenced by the predicate.
	columnName string
	// filterValue is the compared value: string, bool, int64, or float64.
	filterValue any
	// stringCompare selects the text comparison path, which checks both
	// tags_json and fields_json; scalar comparisons check fields_json only.
	stringCompare bool
}
|
|
|
|
// journalExecutor is the minimal statement-execution surface needed to
// insert journal rows; *sql.DB and *sql.Tx both satisfy it.
type journalExecutor interface {
	Exec(query string, args ...any) (sql.Result, error)
}
|
|
|
|
// Usage example: `result := storeInstance.CommitToJournal("scroll-session", map[string]any{"like": 4}, map[string]string{"workspace": "scroll-session"})`
// Workspace.Commit uses this same journal write path before it updates the
// summary row in `workspace:NAME`.
//
// CommitToJournal appends a single entry to the journal table under the
// store's configured bucket (defaultJournalBucket when unset). A nil fields
// or tags map is treated as empty; measurement must be non-empty. On success
// the Result value echoes the written entry: bucket, measurement, copies of
// fields/tags, and the unix-millisecond commit timestamp.
func (storeInstance *Store) CommitToJournal(measurement string, fields map[string]any, tags map[string]string) core.Result {
	if err := storeInstance.ensureReady("store.CommitToJournal"); err != nil {
		return core.Result{Value: err, OK: false}
	}
	if measurement == "" {
		return core.Result{Value: core.E("store.CommitToJournal", "measurement is empty", nil), OK: false}
	}
	// Normalise nil maps so JSON marshalling and the echoed result see `{}`.
	if fields == nil {
		fields = map[string]any{}
	}
	if tags == nil {
		tags = map[string]string{}
	}
	// Lazily create the table/index; repeated calls are harmless
	// (CREATE ... IF NOT EXISTS).
	if err := ensureJournalSchema(storeInstance.sqliteDatabase); err != nil {
		return core.Result{Value: core.E("store.CommitToJournal", "ensure journal schema", err), OK: false}
	}

	fieldsJSON, err := marshalJSONText(fields, "store.CommitToJournal", "marshal fields")
	if err != nil {
		return core.Result{Value: err, OK: false}
	}
	tagsJSON, err := marshalJSONText(tags, "store.CommitToJournal", "marshal tags")
	if err != nil {
		return core.Result{Value: err, OK: false}
	}

	committedAt := time.Now().UnixMilli()
	if err := commitJournalEntry(
		storeInstance.sqliteDatabase,
		storeInstance.journalBucket(),
		measurement,
		fieldsJSON,
		tagsJSON,
		committedAt,
	); err != nil {
		return core.Result{Value: core.E("store.CommitToJournal", "insert journal entry", err), OK: false}
	}

	// Return copies of fields/tags so callers cannot mutate what was written.
	return core.Result{
		Value: map[string]any{
			"bucket":       storeInstance.journalBucket(),
			"measurement":  measurement,
			"fields":       cloneAnyMap(fields),
			"tags":         cloneStringMap(tags),
			"committed_at": committedAt,
		},
		OK: true,
	}
}
|
|
|
|
// Usage example: `result := storeInstance.QueryJournal(\`from(bucket: "events") |> range(start: -24h) |> filter(fn: (r) => r.workspace == "session-a")\`)`
// Usage example: `result := storeInstance.QueryJournal("SELECT measurement, committed_at FROM journal_entries ORDER BY committed_at")`
//
// QueryJournal reads journal entries. Three input forms are accepted: an
// empty string returns every non-archived entry in commit order; a raw SQL
// statement (SELECT/WITH/EXPLAIN/PRAGMA prefix) runs verbatim; anything else
// is treated as a Flux pipeline and translated to SQL by queryJournalFromFlux.
func (storeInstance *Store) QueryJournal(flux string) core.Result {
	if err := storeInstance.ensureReady("store.QueryJournal"); err != nil {
		return core.Result{Value: err, OK: false}
	}
	// Create the table/index if missing so a fresh store queries cleanly.
	if err := ensureJournalSchema(storeInstance.sqliteDatabase); err != nil {
		return core.Result{Value: core.E("store.QueryJournal", "ensure journal schema", err), OK: false}
	}

	trimmedQuery := core.Trim(flux)
	if trimmedQuery == "" {
		return storeInstance.queryJournalRows(
			"SELECT bucket_name, measurement, fields_json, tags_json, committed_at, archived_at FROM " + journalEntriesTableName + " WHERE archived_at IS NULL ORDER BY committed_at, entry_id",
		)
	}
	if isRawSQLJournalQuery(trimmedQuery) {
		return storeInstance.queryJournalRows(trimmedQuery)
	}

	selectSQL, arguments, err := storeInstance.queryJournalFromFlux(trimmedQuery)
	if err != nil {
		return core.Result{Value: err, OK: false}
	}
	return storeInstance.queryJournalRows(selectSQL, arguments...)
}
|
|
|
|
func isRawSQLJournalQuery(query string) bool {
|
|
upperQuery := core.Upper(core.Trim(query))
|
|
return core.HasPrefix(upperQuery, "SELECT") ||
|
|
core.HasPrefix(upperQuery, "WITH") ||
|
|
core.HasPrefix(upperQuery, "EXPLAIN") ||
|
|
core.HasPrefix(upperQuery, "PRAGMA")
|
|
}
|
|
|
|
func (storeInstance *Store) queryJournalRows(query string, arguments ...any) core.Result {
|
|
rows, err := storeInstance.sqliteDatabase.Query(query, arguments...)
|
|
if err != nil {
|
|
return core.Result{Value: core.E("store.QueryJournal", "query rows", err), OK: false}
|
|
}
|
|
defer rows.Close()
|
|
|
|
rowMaps, err := queryRowsAsMaps(rows)
|
|
if err != nil {
|
|
return core.Result{Value: core.E("store.QueryJournal", "scan rows", err), OK: false}
|
|
}
|
|
return core.Result{Value: inflateJournalRows(rowMaps), OK: true}
|
|
}
|
|
|
|
// queryJournalFromFlux translates a small subset of Flux into a SELECT over
// the journal table plus bound arguments: bucket (from `from(bucket: ...)`
// or an r.bucket equality), measurement equality, `range(start:, stop:)`
// time bounds, and tag/field equality filters. Archived entries are always
// excluded. An error is returned only when a range bound fails to parse.
func (storeInstance *Store) queryJournalFromFlux(flux string) (string, []any, error) {
	queryBuilder := core.NewBuilder()
	queryBuilder.WriteString("SELECT bucket_name, measurement, fields_json, tags_json, committed_at, archived_at FROM ")
	queryBuilder.WriteString(journalEntriesTableName)
	queryBuilder.WriteString(" WHERE archived_at IS NULL")

	var queryArguments []any
	if bucket := quotedSubmatch(journalBucketPattern, flux); bucket != "" {
		queryBuilder.WriteString(" AND bucket_name = ?")
		queryArguments = append(queryArguments, bucket)
	}
	if measurement := firstQuotedSubmatch(journalMeasurementPatterns, flux); measurement != "" {
		queryBuilder.WriteString(" AND measurement = ?")
		queryArguments = append(queryArguments, measurement)
	}

	// Time bounds: start is inclusive, stop exclusive (unix milliseconds).
	startRange, stopRange := journalRangeBounds(flux)
	if startRange != "" {
		startTime, err := parseFluxTime(core.Trim(startRange))
		if err != nil {
			return "", nil, core.E("store.QueryJournal", "parse range", err)
		}
		queryBuilder.WriteString(" AND committed_at >= ?")
		queryArguments = append(queryArguments, startTime.UnixMilli())
	}
	if stopRange != "" {
		stopTime, err := parseFluxTime(core.Trim(stopRange))
		if err != nil {
			return "", nil, core.E("store.QueryJournal", "parse range", err)
		}
		queryBuilder.WriteString(" AND committed_at < ?")
		queryArguments = append(queryArguments, stopTime.UnixMilli())
	}

	// Bucket equality written inside filter(fn: ...) rather than from(...).
	// NOTE(review): this can add a second `bucket_name = ?` clause when the
	// query also had `from(bucket: ...)` — redundant but harmless (ANDed).
	for _, pattern := range journalBucketEqualityPatterns {
		bucketMatches := pattern.FindAllStringSubmatch(flux, -1)
		for _, match := range bucketMatches {
			if len(match) < 2 {
				continue
			}
			queryBuilder.WriteString(" AND bucket_name = ?")
			queryArguments = append(queryArguments, match[1])
		}
	}

	// Tag/field equality filters parsed from the Flux predicate.
	for _, filter := range journalEqualityFilters(flux) {
		if filter.stringCompare {
			// A string value may live in tags or fields; compare both blobs.
			queryBuilder.WriteString(" AND (CAST(json_extract(tags_json, '$.\"' || ? || '\"') AS TEXT) = ? OR CAST(json_extract(fields_json, '$.\"' || ? || '\"') AS TEXT) = ?)")
			queryArguments = append(queryArguments, filter.columnName, filter.filterValue, filter.columnName, filter.filterValue)
			continue
		}

		// Scalar (bool/number) values are only looked up in fields.
		queryBuilder.WriteString(" AND json_extract(fields_json, '$.\"' || ? || '\"') = ?")
		queryArguments = append(queryArguments, filter.columnName, filter.filterValue)
	}

	queryBuilder.WriteString(" ORDER BY committed_at, entry_id")
	return queryBuilder.String(), queryArguments, nil
}
|
|
|
|
func (storeInstance *Store) journalBucket() string {
|
|
if storeInstance.journalConfiguration.BucketName == "" {
|
|
return defaultJournalBucket
|
|
}
|
|
return storeInstance.journalConfiguration.BucketName
|
|
}
|
|
|
|
func ensureJournalSchema(database schemaDatabase) error {
|
|
if _, err := database.Exec(createJournalEntriesTableSQL); err != nil {
|
|
return err
|
|
}
|
|
if _, err := database.Exec(
|
|
"CREATE INDEX IF NOT EXISTS journal_entries_bucket_committed_at_idx ON " + journalEntriesTableName + " (bucket_name, committed_at)",
|
|
); err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func commitJournalEntry(
|
|
executor journalExecutor,
|
|
bucket, measurement, fieldsJSON, tagsJSON string,
|
|
committedAt int64,
|
|
) error {
|
|
_, err := executor.Exec(
|
|
"INSERT INTO "+journalEntriesTableName+" (bucket_name, measurement, fields_json, tags_json, committed_at, archived_at) VALUES (?, ?, ?, ?, ?, NULL)",
|
|
bucket,
|
|
measurement,
|
|
fieldsJSON,
|
|
tagsJSON,
|
|
committedAt,
|
|
)
|
|
return err
|
|
}
|
|
|
|
func marshalJSONText(value any, operation, message string) (string, error) {
|
|
result := core.JSONMarshal(value)
|
|
if !result.OK {
|
|
return "", core.E(operation, message, result.Value.(error))
|
|
}
|
|
return string(result.Value.([]byte)), nil
|
|
}
|
|
|
|
// journalRangeBounds extracts the raw `start:` and `stop:` expressions from
// the first `range(...)` call in flux. The returned strings are trimmed but
// otherwise uninterpreted (parseFluxTime handles that); either may be ""
// when absent or when no well-formed range call is found.
func journalRangeBounds(flux string) (string, string) {
	rangeIndex := indexOfSubstring(flux, "range(")
	if rangeIndex < 0 {
		return "", ""
	}
	contentStart := rangeIndex + len("range(")
	// Scan for the matching close paren, tracking nesting depth so nested
	// calls such as time(v: ...) inside range(...) do not end the scan early.
	depth := 1
	contentEnd := -1
scanRange:
	for i := contentStart; i < len(flux); i++ {
		switch flux[i] {
		case '(':
			depth++
		case ')':
			depth--
			if depth == 0 {
				contentEnd = i
				break scanRange
			}
		}
	}
	// Unbalanced parens or an empty argument list: no bounds.
	if contentEnd < 0 || contentEnd <= contentStart {
		return "", ""
	}

	content := flux[contentStart:contentEnd]
	startPrefix := "start:"
	startIndex := indexOfSubstring(content, startPrefix)
	if startIndex < 0 {
		return "", ""
	}
	startIndex += len(startPrefix)
	start := core.Trim(content[startIndex:])
	stop := ""
	// Only the two literal spellings ", stop:" and ",stop:" are recognised;
	// when found, start is re-sliced to end at the separator.
	if stopIndex := indexOfSubstring(content, ", stop:"); stopIndex >= 0 {
		start = core.Trim(content[startIndex:stopIndex])
		stop = core.Trim(content[stopIndex+len(", stop:"):])
	} else if stopIndex := indexOfSubstring(content, ",stop:"); stopIndex >= 0 {
		start = core.Trim(content[startIndex:stopIndex])
		stop = core.Trim(content[stopIndex+len(",stop:"):])
	}
	return start, stop
}
|
|
|
|
// indexOfSubstring returns the byte index of the first occurrence of
// substring in text, or -1 if it is absent. An empty substring matches at
// index 0. The previous hand-rolled O(len(text)*len(substring)) scan is
// replaced by strings.Index, which has identical semantics for all inputs.
func indexOfSubstring(text, substring string) int {
	return strings.Index(text, substring)
}
|
|
|
|
// parseFluxTime converts one Flux range bound into a time.Time. Accepted
// forms, tried in this order: `now()`, day offsets such as "-7d" (relative
// to now), Go durations such as "-24h" (relative to now), and absolute
// RFC3339(Nano) timestamps. `time(v: ...)` wrappers and surrounding quotes
// are stripped first, and anything after a comma is discarded.
func parseFluxTime(value string) (time.Time, error) {
	value = core.Trim(value)
	if value == "" {
		return time.Time{}, core.E("store.parseFluxTime", "range value is empty", nil)
	}
	// Keep only the first comma-separated token (drops trailing arguments
	// that may have ridden along from the range(...) parameter list).
	value = firstStringOrEmpty(core.Split(value, ","))
	value = core.Trim(value)
	if core.HasPrefix(value, "time(v:") && core.HasSuffix(value, ")") {
		value = core.Trim(core.TrimSuffix(core.TrimPrefix(value, "time(v:"), ")"))
	}
	if core.HasPrefix(value, `"`) && core.HasSuffix(value, `"`) {
		value = core.TrimSuffix(core.TrimPrefix(value, `"`), `"`)
	}
	if value == "now()" {
		return time.Now(), nil
	}
	// "d" (days) is a Flux duration unit that time.ParseDuration rejects,
	// so it is handled before the generic duration parse. Note compound
	// forms like "-1d12h" are NOT supported and will error here.
	if core.HasSuffix(value, "d") {
		days, err := strconv.Atoi(core.TrimSuffix(value, "d"))
		if err != nil {
			return time.Time{}, err
		}
		return time.Now().Add(time.Duration(days) * 24 * time.Hour), nil
	}
	lookback, err := time.ParseDuration(value)
	if err == nil {
		return time.Now().Add(lookback), nil
	}
	parsedTime, err := time.Parse(time.RFC3339Nano, value)
	if err != nil {
		return time.Time{}, err
	}
	return parsedTime, nil
}
|
|
|
|
func quotedSubmatch(pattern *regexp.Regexp, value string) string {
|
|
match := pattern.FindStringSubmatch(value)
|
|
if len(match) < 2 {
|
|
return ""
|
|
}
|
|
return match[1]
|
|
}
|
|
|
|
func firstQuotedSubmatch(patterns []*regexp.Regexp, value string) string {
|
|
for _, pattern := range patterns {
|
|
if match := quotedSubmatch(pattern, value); match != "" {
|
|
return match
|
|
}
|
|
}
|
|
return ""
|
|
}
|
|
|
|
func regexpSubmatch(pattern *regexp.Regexp, value string, index int) string {
|
|
match := pattern.FindStringSubmatch(value)
|
|
if len(match) <= index {
|
|
return ""
|
|
}
|
|
return match[index]
|
|
}
|
|
|
|
func queryRowsAsMaps(rows *sql.Rows) ([]map[string]any, error) {
|
|
columnNames, err := rows.Columns()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var result []map[string]any
|
|
for rows.Next() {
|
|
rawValues := make([]any, len(columnNames))
|
|
scanTargets := make([]any, len(columnNames))
|
|
for i := range rawValues {
|
|
scanTargets[i] = &rawValues[i]
|
|
}
|
|
if err := rows.Scan(scanTargets...); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
row := make(map[string]any, len(columnNames))
|
|
for i, columnName := range columnNames {
|
|
row[columnName] = normaliseRowValue(rawValues[i])
|
|
}
|
|
result = append(result, row)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return result, nil
|
|
}
|
|
|
|
func inflateJournalRows(rows []map[string]any) []map[string]any {
|
|
for _, row := range rows {
|
|
if fieldsJSON, ok := row["fields_json"].(string); ok {
|
|
fields := make(map[string]any)
|
|
result := core.JSONUnmarshalString(fieldsJSON, &fields)
|
|
if result.OK {
|
|
row["fields"] = fields
|
|
}
|
|
}
|
|
if tagsJSON, ok := row["tags_json"].(string); ok {
|
|
tags := make(map[string]string)
|
|
result := core.JSONUnmarshalString(tagsJSON, &tags)
|
|
if result.OK {
|
|
row["tags"] = tags
|
|
}
|
|
}
|
|
}
|
|
return rows
|
|
}
|
|
|
|
// normaliseRowValue converts []byte cells (how SQLite drivers return TEXT)
// to string; every other value passes through unchanged.
func normaliseRowValue(value any) any {
	if raw, ok := value.([]byte); ok {
		return string(raw)
	}
	return value
}
|
|
|
|
func journalEqualityFilters(flux string) []journalEqualityFilter {
|
|
var filters []journalEqualityFilter
|
|
appendFilter := func(columnName string, filterValue any, stringCompare bool) {
|
|
if columnName == "_measurement" || columnName == "measurement" || columnName == "_bucket" || columnName == "bucket" || columnName == "bucket_name" {
|
|
return
|
|
}
|
|
filters = append(filters, journalEqualityFilter{
|
|
columnName: columnName,
|
|
filterValue: filterValue,
|
|
stringCompare: stringCompare,
|
|
})
|
|
}
|
|
|
|
for _, pattern := range journalStringEqualityPatterns {
|
|
matches := pattern.FindAllStringSubmatch(flux, -1)
|
|
for _, match := range matches {
|
|
if len(match) < 3 {
|
|
continue
|
|
}
|
|
appendFilter(match[1], match[2], true)
|
|
}
|
|
}
|
|
|
|
for _, pattern := range journalScalarEqualityPatterns {
|
|
matches := pattern.FindAllStringSubmatch(flux, -1)
|
|
for _, match := range matches {
|
|
if len(match) < 3 {
|
|
continue
|
|
}
|
|
filterValue, ok := parseJournalScalarValue(match[2])
|
|
if !ok {
|
|
continue
|
|
}
|
|
appendFilter(match[1], filterValue, false)
|
|
}
|
|
}
|
|
|
|
return filters
|
|
}
|
|
|
|
// parseJournalScalarValue interprets a Flux scalar literal: "true"/"false"
// become bool, integer literals become int64, and anything parseable as a
// float becomes float64. The second return is false for unrecognised input.
func parseJournalScalarValue(value string) (any, bool) {
	if value == "true" || value == "false" {
		return value == "true", true
	}
	if integer, err := strconv.ParseInt(value, 10, 64); err == nil {
		return integer, true
	}
	if floating, err := strconv.ParseFloat(value, 64); err == nil {
		return floating, true
	}
	return nil, false
}
|
|
|
|
// cloneAnyMap returns a shallow copy of input. A nil input yields an empty
// (non-nil) map, so the result is always safe to mutate. make + range handle
// the nil case without a separate branch.
func cloneAnyMap(input map[string]any) map[string]any {
	duplicate := make(map[string]any, len(input))
	for key, value := range input {
		duplicate[key] = value
	}
	return duplicate
}
|
|
|
|
// cloneStringMap returns a shallow copy of input. A nil input yields an
// empty (non-nil) map, so the result is always safe to mutate. make + range
// handle the nil case without a separate branch.
func cloneStringMap(input map[string]string) map[string]string {
	duplicate := make(map[string]string, len(input))
	for key, value := range input {
		duplicate[key] = value
	}
	return duplicate
}
|