diff --git a/compact.go b/compact.go index bcff91a..4f78434 100644 --- a/compact.go +++ b/compact.go @@ -29,6 +29,9 @@ type compactArchiveEntry struct { // Usage example: `result := storeInstance.Compact(store.CompactOptions{Before: time.Now().Add(-30 * 24 * time.Hour), Output: "/tmp/archive", Format: "gzip"})` func (storeInstance *Store) Compact(options CompactOptions) core.Result { + if err := storeInstance.ensureReady("store.Compact"); err != nil { + return core.Result{Value: err, OK: false} + } if err := ensureJournalSchema(storeInstance.database); err != nil { return core.Result{Value: core.E("store.Compact", "ensure journal schema", err), OK: false} } diff --git a/events.go b/events.go index 856bf82..7e1438a 100644 --- a/events.go +++ b/events.go @@ -55,6 +55,12 @@ type changeCallbackRegistration struct { callback func(Event) } +func closedEventChannel() chan Event { + eventChannel := make(chan Event) + close(eventChannel) + return eventChannel +} + // Watch("config") can hold 16 pending events before non-blocking sends start // dropping new ones. const watcherEventBufferCapacity = 16 @@ -62,6 +68,17 @@ const watcherEventBufferCapacity = 16 // Usage example: `events := storeInstance.Watch("config")` // Usage example: `events := storeInstance.Watch("*")` func (storeInstance *Store) Watch(group string) <-chan Event { + if storeInstance == nil { + return closedEventChannel() + } + + storeInstance.closeLock.Lock() + closed := storeInstance.closed + storeInstance.closeLock.Unlock() + if closed { + return closedEventChannel() + } + eventChannel := make(chan Event, watcherEventBufferCapacity) storeInstance.watchersLock.Lock() @@ -76,7 +93,14 @@ func (storeInstance *Store) Watch(group string) <-chan Event { // Usage example: `storeInstance.Unwatch("config", events)` func (storeInstance *Store) Unwatch(group string, events <-chan Event) { - if events == nil { + if storeInstance == nil || events == nil { + return + } + + storeInstance.closeLock.Lock() + closed := storeInstance.closed + storeInstance.closeLock.Unlock() + if closed { return } @@ -117,6 +141,17 @@ func (storeInstance *Store) OnChange(callback func(Event)) func() { return func() {} } + if storeInstance == nil { + return func() {} + } + + storeInstance.closeLock.Lock() + closed := storeInstance.closed + storeInstance.closeLock.Unlock() + if closed { + return func() {} + } + registrationID := atomic.AddUint64(&storeInstance.nextCallbackRegistrationID, 1) callbackRegistration := changeCallbackRegistration{registrationID: registrationID, callback: callback} @@ -147,6 +182,17 @@ func (storeInstance *Store) OnChange(callback func(Event)) func() { // released, so they can register or unregister subscriptions without // deadlocking. func (storeInstance *Store) notify(event Event) { + if storeInstance == nil { + return + } + + storeInstance.closeLock.Lock() + closed := storeInstance.closed + storeInstance.closeLock.Unlock() + if closed { + return + } + storeInstance.watchersLock.RLock() for _, registeredChannel := range storeInstance.watchers["*"] { select { diff --git a/journal.go b/journal.go index cf5e912..36f295d 100644 --- a/journal.go +++ b/journal.go @@ -38,6 +38,9 @@ type journalExecutor interface { // Usage example: `result := storeInstance.CommitToJournal("scroll-session", map[string]any{"like": 4}, map[string]string{"workspace": "scroll-session"})` func (storeInstance *Store) CommitToJournal(measurement string, fields map[string]any, tags map[string]string) core.Result { + if err := storeInstance.ensureReady("store.CommitToJournal"); err != nil { + return core.Result{Value: err, OK: false} + } if measurement == "" { return core.Result{Value: core.E("store.CommitToJournal", "measurement is empty", nil), OK: false} } @@ -86,6 +89,9 @@ func (storeInstance *Store) CommitToJournal(measurement string, fields map[strin // Usage example: `result := storeInstance.QueryJournal(\`from(bucket: "store") |> range(start: -24h)\`)` func (storeInstance *Store) QueryJournal(flux string) core.Result { + if err := storeInstance.ensureReady("store.QueryJournal"); err != nil { + return core.Result{Value: err, OK: false} + } if err := ensureJournalSchema(storeInstance.database); err != nil { return core.Result{Value: core.E("store.QueryJournal", "ensure journal schema", err), OK: false} } diff --git a/scope.go b/scope.go index ab416f2..219ff9c 100644 --- a/scope.go +++ b/scope.go @@ -29,6 +29,19 @@ type ScopedStore struct { MaxGroups int } +func (scopedStore *ScopedStore) storeInstance(operation string) (*Store, error) { + if scopedStore == nil { + return nil, core.E(operation, "scoped store is nil", nil) + } + if scopedStore.store == nil { + return nil, core.E(operation, "underlying store is nil", nil) + } + if err := scopedStore.store.ensureReady(operation); err != nil { + return nil, err + } + return scopedStore.store, nil +} + // Usage example: `scopedStore := store.NewScoped(storeInstance, "tenant-a")` func NewScoped(storeInstance *Store, namespace string) *ScopedStore { if storeInstance == nil { @@ -76,17 +89,28 @@ func (scopedStore *ScopedStore) trimNamespacePrefix(groupName string) string { // Usage example: `scopedStore := store.NewScoped(storeInstance, "tenant-a"); if scopedStore == nil { return }; namespace := scopedStore.Namespace(); fmt.Println(namespace)` func (scopedStore *ScopedStore) Namespace() string { + if scopedStore == nil { + return "" + } return scopedStore.namespace } // Usage example: `colourValue, err := scopedStore.Get("colour")` func (scopedStore *ScopedStore) Get(key string) (string, error) { - return scopedStore.GetFrom(defaultScopedGroupName, key) + storeInstance, err := scopedStore.storeInstance("store.Get") + if err != nil { + return "", err + } + return storeInstance.Get(scopedStore.namespacedGroup(defaultScopedGroupName), key) } // Usage example: `colourValue, err := scopedStore.GetFrom("config", "colour")` func (scopedStore *ScopedStore) GetFrom(group, key string) (string, error) { - return scopedStore.store.Get(scopedStore.namespacedGroup(group), key) + storeInstance, err := scopedStore.storeInstance("store.Get") + if err != nil { + return "", err + } + return storeInstance.Get(scopedStore.namespacedGroup(group), key) } // Usage example: `if err := scopedStore.Set("colour", "blue"); err != nil { return }` @@ -96,60 +120,105 @@ func (scopedStore *ScopedStore) Set(key, value string) error { // Usage example: `if err := scopedStore.SetIn("config", "colour", "blue"); err != nil { return }` func (scopedStore *ScopedStore) SetIn(group, key, value string) error { + storeInstance, err := scopedStore.storeInstance("store.Set") + if err != nil { + return err + } if err := scopedStore.checkQuota("store.ScopedStore.SetIn", group, key); err != nil { return err } - return scopedStore.store.Set(scopedStore.namespacedGroup(group), key, value) + return storeInstance.Set(scopedStore.namespacedGroup(group), key, value) } // Usage example: `if err := scopedStore.SetWithTTL("sessions", "token", "abc123", time.Hour); err != nil { return }` func (scopedStore *ScopedStore) SetWithTTL(group, key, value string, timeToLive time.Duration) error { + storeInstance, err := scopedStore.storeInstance("store.SetWithTTL") + if err != nil { + return err + } if err := scopedStore.checkQuota("store.ScopedStore.SetWithTTL", group, key); err != nil { return err } - return scopedStore.store.SetWithTTL(scopedStore.namespacedGroup(group), key, value, timeToLive) + return storeInstance.SetWithTTL(scopedStore.namespacedGroup(group), key, value, timeToLive) } // Usage example: `if err := scopedStore.Delete("config", "colour"); err != nil { return }` func (scopedStore *ScopedStore) Delete(group, key string) error { - return scopedStore.store.Delete(scopedStore.namespacedGroup(group), key) + storeInstance, err := scopedStore.storeInstance("store.Delete") + if err != nil { + return err + } + return storeInstance.Delete(scopedStore.namespacedGroup(group), key) } // Usage example: `if err := scopedStore.DeleteGroup("cache"); err != nil { return }` func (scopedStore *ScopedStore) DeleteGroup(group string) error { - return scopedStore.store.DeleteGroup(scopedStore.namespacedGroup(group)) + storeInstance, err := scopedStore.storeInstance("store.DeleteGroup") + if err != nil { + return err + } + return storeInstance.DeleteGroup(scopedStore.namespacedGroup(group)) } // Usage example: `colourEntries, err := scopedStore.GetAll("config")` func (scopedStore *ScopedStore) GetAll(group string) (map[string]string, error) { - return scopedStore.store.GetAll(scopedStore.namespacedGroup(group)) + storeInstance, err := scopedStore.storeInstance("store.GetAll") + if err != nil { + return nil, err + } + return storeInstance.GetAll(scopedStore.namespacedGroup(group)) } // Usage example: `for entry, err := range scopedStore.All("config") { if err != nil { break }; fmt.Println(entry.Key, entry.Value) }` func (scopedStore *ScopedStore) All(group string) iter.Seq2[KeyValue, error] { - return scopedStore.store.All(scopedStore.namespacedGroup(group)) + storeInstance, err := scopedStore.storeInstance("store.All") + if err != nil { + return func(yield func(KeyValue, error) bool) { + yield(KeyValue{}, err) + } + } + return storeInstance.All(scopedStore.namespacedGroup(group)) } // Usage example: `for entry, err := range scopedStore.AllSeq("config") { if err != nil { break }; fmt.Println(entry.Key, entry.Value) }` func (scopedStore *ScopedStore) AllSeq(group string) iter.Seq2[KeyValue, error] { - return scopedStore.store.AllSeq(scopedStore.namespacedGroup(group)) + storeInstance, err := scopedStore.storeInstance("store.All") + if err != nil { + return func(yield func(KeyValue, error) bool) { + yield(KeyValue{}, err) + } + } + return storeInstance.AllSeq(scopedStore.namespacedGroup(group)) } // Usage example: `keyCount, err := scopedStore.Count("config")` func (scopedStore *ScopedStore) Count(group string) (int, error) { - return scopedStore.store.Count(scopedStore.namespacedGroup(group)) + storeInstance, err := scopedStore.storeInstance("store.Count") + if err != nil { + return 0, err + } + return storeInstance.Count(scopedStore.namespacedGroup(group)) } // Usage example: `keyCount, err := scopedStore.CountAll("config")` // Usage example: `keyCount, err := scopedStore.CountAll()` func (scopedStore *ScopedStore) CountAll(groupPrefix ...string) (int, error) { - return scopedStore.store.CountAll(scopedStore.namespacedGroup(firstOrEmptyString(groupPrefix))) + storeInstance, err := scopedStore.storeInstance("store.CountAll") + if err != nil { + return 0, err + } + return storeInstance.CountAll(scopedStore.namespacedGroup(firstOrEmptyString(groupPrefix))) } // Usage example: `groupNames, err := scopedStore.Groups("config")` // Usage example: `groupNames, err := scopedStore.Groups()` func (scopedStore *ScopedStore) Groups(groupPrefix ...string) ([]string, error) { - groupNames, err := scopedStore.store.Groups(scopedStore.namespacedGroup(firstOrEmptyString(groupPrefix))) + storeInstance, err := scopedStore.storeInstance("store.Groups") + if err != nil { + return nil, err + } + + groupNames, err := storeInstance.Groups(scopedStore.namespacedGroup(firstOrEmptyString(groupPrefix))) if err != nil { return nil, err } @@ -163,8 +232,13 @@ func (scopedStore *ScopedStore) Groups(groupPrefix ...string) ([]string, error) // Usage example: `for groupName, err := range scopedStore.GroupsSeq() { if err != nil { break }; fmt.Println(groupName) }` func (scopedStore *ScopedStore) GroupsSeq(groupPrefix ...string) iter.Seq2[string, error] { return func(yield func(string, error) bool) { + storeInstance, err := scopedStore.storeInstance("store.GroupsSeq") + if err != nil { + yield("", err) + return + } namespacePrefix := scopedStore.namespacePrefix() - for groupName, err := range scopedStore.store.GroupsSeq(scopedStore.namespacedGroup(firstOrEmptyString(groupPrefix))) { + for groupName, err := range storeInstance.GroupsSeq(scopedStore.namespacedGroup(firstOrEmptyString(groupPrefix))) { if err != nil { if !yield("", err) { return @@ -180,22 +254,38 @@ func (scopedStore *ScopedStore) GroupsSeq(groupPrefix ...string) iter.Seq2[strin // Usage example: `renderedTemplate, err := scopedStore.Render("Hello {{ .name }}", "user")` func (scopedStore *ScopedStore) Render(templateSource, group string) (string, error) { - return scopedStore.store.Render(templateSource, scopedStore.namespacedGroup(group)) + storeInstance, err := scopedStore.storeInstance("store.Render") + if err != nil { + return "", err + } + return storeInstance.Render(templateSource, scopedStore.namespacedGroup(group)) } // Usage example: `parts, err := scopedStore.GetSplit("config", "hosts", ","); if err != nil { return }; for part := range parts { fmt.Println(part) }` func (scopedStore *ScopedStore) GetSplit(group, key, separator string) (iter.Seq[string], error) { - return scopedStore.store.GetSplit(scopedStore.namespacedGroup(group), key, separator) + storeInstance, err := scopedStore.storeInstance("store.GetSplit") + if err != nil { + return nil, err + } + return storeInstance.GetSplit(scopedStore.namespacedGroup(group), key, separator) } // Usage example: `fields, err := scopedStore.GetFields("config", "flags"); if err != nil { return }; for field := range fields { fmt.Println(field) }` func (scopedStore *ScopedStore) GetFields(group, key string) (iter.Seq[string], error) { - return scopedStore.store.GetFields(scopedStore.namespacedGroup(group), key) + storeInstance, err := scopedStore.storeInstance("store.GetFields") + if err != nil { + return nil, err + } + return storeInstance.GetFields(scopedStore.namespacedGroup(group), key) } // Usage example: `removedRows, err := scopedStore.PurgeExpired(); if err != nil { return }; fmt.Println(removedRows)` func (scopedStore *ScopedStore) PurgeExpired() (int64, error) { - removedRows, err := scopedStore.store.purgeExpiredMatchingGroupPrefix(scopedStore.namespacePrefix()) + storeInstance, err := scopedStore.storeInstance("store.PurgeExpired") + if err != nil { + return 0, err + } + removedRows, err := storeInstance.purgeExpiredMatchingGroupPrefix(scopedStore.namespacePrefix()) if err != nil { return 0, core.E("store.ScopedStore.PurgeExpired", "delete expired rows", err) } @@ -207,6 +297,9 @@ func (scopedStore *ScopedStore) PurgeExpired() (int64, error) { // group would exceed the configured limit. Existing keys are treated as // upserts and do not consume quota. func (scopedStore *ScopedStore) checkQuota(operation, group, key string) error { + if scopedStore == nil { + return core.E(operation, "scoped store is nil", nil) + } if scopedStore.MaxKeys == 0 && scopedStore.MaxGroups == 0 { return nil } diff --git a/store.go b/store.go index 5636f77..5746bbd 100644 --- a/store.go +++ b/store.go @@ -66,6 +66,24 @@ type Store struct { nextCallbackRegistrationID uint64 // monotonic ID for callback registrations } +func (storeInstance *Store) ensureReady(operation string) error { + if storeInstance == nil { + return core.E(operation, "store is nil", nil) + } + if storeInstance.database == nil { + return core.E(operation, "store is not initialised", nil) + } + + storeInstance.closeLock.Lock() + closed := storeInstance.closed + storeInstance.closeLock.Unlock() + if closed { + return core.E(operation, "store is closed", nil) + } + + return nil +} + // Usage example: `storeInstance, err := store.New("/tmp/go-store.db", store.WithJournal("http://127.0.0.1:8086", "core", "events"))` func WithJournal(endpointURL, organisation, bucketName string) StoreOption { return func(storeInstance *Store) { @@ -141,6 +159,10 @@ func New(databasePath string, options ...StoreOption) (*Store, error) { // Usage example: `storeInstance, err := store.New(":memory:"); if err != nil { return }; defer storeInstance.Close()` func (storeInstance *Store) Close() error { + if storeInstance == nil { + return nil + } + storeInstance.closeLock.Lock() if storeInstance.closed { storeInstance.closeLock.Unlock() @@ -149,8 +171,13 @@ func (storeInstance *Store) Close() error { storeInstance.closed = true storeInstance.closeLock.Unlock() - storeInstance.cancelPurge() + if storeInstance.cancelPurge != nil { + storeInstance.cancelPurge() + } storeInstance.purgeWaitGroup.Wait() + if storeInstance.database == nil { + return nil + } if err := storeInstance.database.Close(); err != nil { return core.E("store.Close", "database close", err) } @@ -159,6 +186,10 @@ func (storeInstance *Store) Close() error { // Usage example: `colourValue, err := storeInstance.Get("config", "colour")` func (storeInstance *Store) Get(group, key string) (string, error) { + if err := storeInstance.ensureReady("store.Get"); err != nil { + return "", err + } + var value string var expiresAt sql.NullInt64 err := storeInstance.database.QueryRow( @@ -182,6 +213,10 @@ func (storeInstance *Store) Get(group, key string) (string, error) { // Usage example: `if err := storeInstance.Set("config", "colour", "blue"); err != nil { return }` func (storeInstance *Store) Set(group, key, value string) error { + if err := storeInstance.ensureReady("store.Set"); err != nil { + return err + } + _, err := storeInstance.database.Exec( "INSERT INTO "+entriesTableName+" ("+entryGroupColumn+", "+entryKeyColumn+", "+entryValueColumn+", expires_at) VALUES (?, ?, ?, NULL) "+ "ON CONFLICT("+entryGroupColumn+", "+entryKeyColumn+") DO UPDATE SET "+entryValueColumn+" = excluded."+entryValueColumn+", expires_at = NULL", @@ -196,6 +231,10 @@ func (storeInstance *Store) Set(group, key, value string) error { // Usage example: `if err := storeInstance.SetWithTTL("session", "token", "abc123", time.Minute); err != nil { return }` func (storeInstance *Store) SetWithTTL(group, key, value string, timeToLive time.Duration) error { + if err := storeInstance.ensureReady("store.SetWithTTL"); err != nil { + return err + } + expiresAt := time.Now().Add(timeToLive).UnixMilli() _, err := storeInstance.database.Exec( "INSERT INTO "+entriesTableName+" ("+entryGroupColumn+", "+entryKeyColumn+", "+entryValueColumn+", expires_at) VALUES (?, ?, ?, ?) "+ @@ -211,6 +250,10 @@ func (storeInstance *Store) SetWithTTL(group, key, value string, timeToLive time // Usage example: `if err := storeInstance.Delete("config", "colour"); err != nil { return }` func (storeInstance *Store) Delete(group, key string) error { + if err := storeInstance.ensureReady("store.Delete"); err != nil { + return err + } + deleteResult, err := storeInstance.database.Exec("DELETE FROM "+entriesTableName+" WHERE "+entryGroupColumn+" = ? AND "+entryKeyColumn+" = ?", group, key) if err != nil { return core.E("store.Delete", "delete row", err) @@ -227,6 +270,10 @@ func (storeInstance *Store) Delete(group, key string) error { // Usage example: `keyCount, err := storeInstance.Count("config")` func (storeInstance *Store) Count(group string) (int, error) { + if err := storeInstance.ensureReady("store.Count"); err != nil { + return 0, err + } + var count int err := storeInstance.database.QueryRow( "SELECT COUNT(*) FROM "+entriesTableName+" WHERE "+entryGroupColumn+" = ? AND (expires_at IS NULL OR expires_at > ?)", @@ -240,6 +287,10 @@ func (storeInstance *Store) Count(group string) (int, error) { // Usage example: `if err := storeInstance.DeleteGroup("cache"); err != nil { return }` func (storeInstance *Store) DeleteGroup(group string) error { + if err := storeInstance.ensureReady("store.DeleteGroup"); err != nil { + return err + } + deleteResult, err := storeInstance.database.Exec("DELETE FROM "+entriesTableName+" WHERE "+entryGroupColumn+" = ?", group) if err != nil { return core.E("store.DeleteGroup", "delete group", err) @@ -264,6 +315,10 @@ type KeyValue struct { // Usage example: `colourEntries, err := storeInstance.GetAll("config")` func (storeInstance *Store) GetAll(group string) (map[string]string, error) { + if err := storeInstance.ensureReady("store.GetAll"); err != nil { + return nil, err + } + entriesByKey := make(map[string]string) for entry, err := range storeInstance.All(group) { if err != nil { @@ -277,6 +332,11 @@ func (storeInstance *Store) GetAll(group string) (map[string]string, error) { // Usage example: `for entry, err := range storeInstance.AllSeq("config") { if err != nil { break }; fmt.Println(entry.Key, entry.Value) }` func (storeInstance *Store) AllSeq(group string) iter.Seq2[KeyValue, error] { return func(yield func(KeyValue, error) bool) { + if err := storeInstance.ensureReady("store.All"); err != nil { + yield(KeyValue{}, err) + return + } + rows, err := storeInstance.database.Query( "SELECT "+entryKeyColumn+", "+entryValueColumn+" FROM "+entriesTableName+" WHERE "+entryGroupColumn+" = ? AND (expires_at IS NULL OR expires_at > ?) ORDER BY "+entryKeyColumn, group, time.Now().UnixMilli(), @@ -312,6 +372,10 @@ func (storeInstance *Store) All(group string) iter.Seq2[KeyValue, error] { // Usage example: `parts, err := storeInstance.GetSplit("config", "hosts", ","); if err != nil { return }; for part := range parts { fmt.Println(part) }` func (storeInstance *Store) GetSplit(group, key, separator string) (iter.Seq[string], error) { + if err := storeInstance.ensureReady("store.GetSplit"); err != nil { + return nil, err + } + value, err := storeInstance.Get(group, key) if err != nil { return nil, err @@ -321,6 +385,10 @@ func (storeInstance *Store) GetSplit(group, key, separator string) (iter.Seq[str // Usage example: `fields, err := storeInstance.GetFields("config", "flags"); if err != nil { return }; for field := range fields { fmt.Println(field) }` func (storeInstance *Store) GetFields(group, key string) (iter.Seq[string], error) { + if err := storeInstance.ensureReady("store.GetFields"); err != nil { + return nil, err + } + value, err := storeInstance.Get(group, key) if err != nil { return nil, err @@ -330,6 +398,10 @@ func (storeInstance *Store) GetFields(group, key string) (iter.Seq[string], erro // Usage example: `renderedTemplate, err := storeInstance.Render("Hello {{ .name }}", "user")` func (storeInstance *Store) Render(templateSource, group string) (string, error) { + if err := storeInstance.ensureReady("store.Render"); err != nil { + return "", err + } + templateData := make(map[string]string) for entry, err := range storeInstance.All(group) { if err != nil { @@ -351,6 +423,10 @@ func (storeInstance *Store) Render(templateSource, group string) (string, error) // Usage example: `tenantKeyCount, err := storeInstance.CountAll("tenant-a:")` func (storeInstance *Store) CountAll(groupPrefix string) (int, error) { + if err := storeInstance.ensureReady("store.CountAll"); err != nil { + return 0, err + } + var count int var err error if groupPrefix == "" { @@ -373,6 +449,10 @@ func (storeInstance *Store) CountAll(groupPrefix string) (int, error) { // Usage example: `tenantGroupNames, err := storeInstance.Groups("tenant-a:")` // Usage example: `allGroupNames, err := storeInstance.Groups()` func (storeInstance *Store) Groups(groupPrefix ...string) ([]string, error) { + if err := storeInstance.ensureReady("store.Groups"); err != nil { + return nil, err + } + var groupNames []string for groupName, err := range storeInstance.GroupsSeq(groupPrefix...) { if err != nil { @@ -388,6 +468,11 @@ func (storeInstance *Store) Groups(groupPrefix ...string) ([]string, error) { func (storeInstance *Store) GroupsSeq(groupPrefix ...string) iter.Seq2[string, error] { actualGroupPrefix := firstOrEmptyString(groupPrefix) return func(yield func(string, error) bool) { + if err := storeInstance.ensureReady("store.GroupsSeq"); err != nil { + yield("", err) + return + } + var rows *sql.Rows var err error now := time.Now().UnixMilli() @@ -444,6 +529,10 @@ func escapeLike(text string) string { // Usage example: `removed, err := storeInstance.PurgeExpired()` func (storeInstance *Store) PurgeExpired() (int64, error) { + if err := storeInstance.ensureReady("store.PurgeExpired"); err != nil { + return 0, err + } + removedRows, err := storeInstance.purgeExpiredMatchingGroupPrefix("") if err != nil { return 0, core.E("store.PurgeExpired", "delete expired rows", err) @@ -454,6 +543,10 @@ func (storeInstance *Store) PurgeExpired() (int64, error) { // New(":memory:") starts a background goroutine that calls PurgeExpired every // 60 seconds until Close stops the store. func (storeInstance *Store) startBackgroundPurge(purgeContext context.Context) { + if storeInstance == nil { + return + } + storeInstance.purgeWaitGroup.Go(func() { ticker := time.NewTicker(storeInstance.purgeInterval) defer ticker.Stop() @@ -512,6 +605,10 @@ func fieldsValueSeq(value string) iter.Seq[string] { // groupPrefix is empty, otherwise only rows whose group starts with the given // prefix. func (storeInstance *Store) purgeExpiredMatchingGroupPrefix(groupPrefix string) (int64, error) { + if err := storeInstance.ensureReady("store.purgeExpiredMatchingGroupPrefix"); err != nil { + return 0, err + } + var ( deleteResult sql.Result err error diff --git a/workspace.go b/workspace.go index 1160632..84c1a04 100644 --- a/workspace.go +++ b/workspace.go @@ -44,8 +44,39 @@ type Workspace struct { closed bool } +func (workspace *Workspace) ensureReady(operation string) error { + if workspace == nil { + return core.E(operation, "workspace is nil", nil) + } + if workspace.store == nil { + return core.E(operation, "workspace store is nil", nil) + } + if workspace.database == nil { + return core.E(operation, "workspace database is nil", nil) + } + if workspace.filesystem == nil { + return core.E(operation, "workspace filesystem is nil", nil) + } + if err := workspace.store.ensureReady(operation); err != nil { + return err + } + + workspace.closeLock.Lock() + closed := workspace.closed + workspace.closeLock.Unlock() + if closed { + return core.E(operation, "workspace is closed", nil) + } + + return nil +} + // Usage example: `workspace, err := storeInstance.NewWorkspace("scroll-session-2026-03-30")` func (storeInstance *Store) NewWorkspace(name string) (*Workspace, error) { + if err := storeInstance.ensureReady("store.NewWorkspace"); err != nil { + return nil, err + } + validation := core.ValidateName(name) if !validation.OK { return nil, core.E("store.NewWorkspace", "validate workspace name", validation.Value.(error)) @@ -78,6 +109,10 @@ func (storeInstance *Store) NewWorkspace(name string) (*Workspace, error) { // decide whether to commit or discard them. // Usage example: `orphans := storeInstance.RecoverOrphans(".core/state")` func (storeInstance *Store) RecoverOrphans(stateDirectory string) []*Workspace { + if storeInstance == nil { + return nil + } + if stateDirectory == "" { stateDirectory = defaultWorkspaceStateDirectory } @@ -131,6 +166,10 @@ func (storeInstance *Store) RecoverOrphans(stateDirectory string) []*Workspace { } func (storeInstance *Store) cleanUpOrphanedWorkspaces(stateDirectory string) { + if storeInstance == nil { + return + } + for _, orphanWorkspace := range storeInstance.RecoverOrphans(stateDirectory) { _ = orphanWorkspace.Aggregate() orphanWorkspace.Discard() @@ -139,6 +178,10 @@ func (storeInstance *Store) cleanUpOrphanedWorkspaces(stateDirectory string) { // Usage example: `err := workspace.Put("like", map[string]any{"user": "@alice", "post": "video_123"})` func (workspace *Workspace) Put(kind string, data map[string]any) error { + if err := workspace.ensureReady("store.Workspace.Put"); err != nil { + return err + } + if kind == "" { return core.E("store.Workspace.Put", "kind is empty", nil) } @@ -165,6 +208,10 @@ func (workspace *Workspace) Put(kind string, data map[string]any) error { // Usage example: `summary := workspace.Aggregate()` func (workspace *Workspace) Aggregate() map[string]any { + if err := workspace.ensureReady("store.Workspace.Aggregate"); err != nil { + return map[string]any{} + } + fields, err := workspace.aggregateFields() if err != nil { return map[string]any{} @@ -176,6 +223,10 @@ func (workspace *Workspace) Aggregate() map[string]any { // store summary entry for the workspace. // Usage example: `result := workspace.Commit()` func (workspace *Workspace) Commit() core.Result { + if err := workspace.ensureReady("store.Workspace.Commit"); err != nil { + return core.Result{Value: err, OK: false} + } + fields, err := workspace.aggregateFields() if err != nil { return core.Result{Value: core.E("store.Workspace.Commit", "aggregate workspace", err), OK: false} @@ -191,11 +242,18 @@ func (workspace *Workspace) Commit() core.Result { // Usage example: `workspace.Discard()` func (workspace *Workspace) Discard() { + if workspace == nil { + return + } _ = workspace.closeAndDelete() } // Usage example: `result := workspace.Query("SELECT entry_kind, COUNT(*) AS count FROM workspace_entries GROUP BY entry_kind")` func (workspace *Workspace) Query(sqlQuery string) core.Result { + if err := workspace.ensureReady("store.Workspace.Query"); err != nil { + return core.Result{Value: err, OK: false} + } + rows, err := workspace.database.Query(sqlQuery) if err != nil { return core.Result{Value: core.E("store.Workspace.Query", "query workspace", err), OK: false} @@ -210,6 +268,10 @@ func (workspace *Workspace) Query(sqlQuery string) core.Result { } func (workspace *Workspace) aggregateFields() (map[string]any, error) { + if err := workspace.ensureReady("store.Workspace.aggregateFields"); err != nil { + return nil, err + } + rows, err := workspace.database.Query( "SELECT entry_kind, COUNT(*) FROM " + workspaceEntriesTableName + " GROUP BY entry_kind ORDER BY entry_kind", ) @@ -236,6 +298,13 @@ func (workspace *Workspace) aggregateFields() (map[string]any, error) { } func (workspace *Workspace) closeAndDelete() error { + if workspace == nil { + return nil + } + if workspace.database == nil || workspace.filesystem == nil { + return nil + } + workspace.closeLock.Lock() defer workspace.closeLock.Unlock() @@ -256,6 +325,9 @@ func (workspace *Workspace) closeAndDelete() error { } func (storeInstance *Store) commitWorkspaceAggregate(workspaceName string, fields map[string]any) error { + if err := storeInstance.ensureReady("store.Workspace.Commit"); err != nil { + return err + } if err := ensureJournalSchema(storeInstance.database); err != nil { return core.E("store.Workspace.Commit", "ensure journal schema", err) }