From 00650fd51ef37be3dedbf9add250473f63f4c72d Mon Sep 17 00:00:00 2001 From: Virgil Date: Sat, 4 Apr 2026 18:20:52 +0000 Subject: [PATCH] feat(store): add transaction purge helpers Co-Authored-By: Virgil --- scope.go | 22 +++++++++++++++++++++- store.go | 12 ++++-------- transaction.go | 13 +++++++++++++ transaction_test.go | 41 +++++++++++++++++++++++++++++++++++++++++ 4 files changed, 79 insertions(+), 9 deletions(-) diff --git a/scope.go b/scope.go index 7ff710c..b718c4f 100644 --- a/scope.go +++ b/scope.go @@ -265,7 +265,14 @@ func (scopedStore *ScopedStore) GetFields(group, key string) (iter.Seq[string], // Usage example: `removedRows, err := scopedStore.PurgeExpired(); if err != nil { return }; fmt.Println(removedRows)` func (scopedStore *ScopedStore) PurgeExpired() (int64, error) { - removedRows, err := scopedStore.storeInstance.purgeExpiredMatchingGroupPrefix(scopedStore.namespacePrefix()) + if scopedStore == nil { + return 0, core.E("store.ScopedStore.PurgeExpired", "scoped store is nil", nil) + } + if err := scopedStore.storeInstance.ensureReady("store.ScopedStore.PurgeExpired"); err != nil { + return 0, err + } + + removedRows, err := purgeExpiredMatchingGroupPrefix(scopedStore.storeInstance.sqliteDatabase, scopedStore.namespacePrefix()) if err != nil { return 0, core.E("store.ScopedStore.PurgeExpired", "delete expired rows", err) } @@ -509,6 +516,19 @@ func (scopedStoreTransaction *ScopedStoreTransaction) GetFields(group, key strin return scopedStoreTransaction.storeTransaction.GetFields(scopedStoreTransaction.scopedStore.namespacedGroup(group), key) } +// Usage example: `removedRows, err := scopedStoreTransaction.PurgeExpired(); if err != nil { return err }; fmt.Println(removedRows)` +func (scopedStoreTransaction *ScopedStoreTransaction) PurgeExpired() (int64, error) { + if err := scopedStoreTransaction.ensureReady("store.ScopedStoreTransaction.PurgeExpired"); err != nil { + return 0, err + } + + removedRows, err := purgeExpiredMatchingGroupPrefix(scopedStoreTransaction.storeTransaction.sqliteTransaction, scopedStoreTransaction.scopedStore.namespacePrefix()) + if err != nil { + return 0, core.E("store.ScopedStoreTransaction.PurgeExpired", "delete expired rows", err) + } + return removedRows, nil +} + func (scopedStoreTransaction *ScopedStoreTransaction) checkQuota(operation, group, key string) error { if scopedStoreTransaction.scopedStore.MaxKeys == 0 && scopedStoreTransaction.scopedStore.MaxGroups == 0 { return nil diff --git a/store.go b/store.go index d121469..8e26bfb 100644 --- a/store.go +++ b/store.go @@ -817,7 +817,7 @@ func (storeInstance *Store) PurgeExpired() (int64, error) { return 0, err } - removedRows, err := storeInstance.purgeExpiredMatchingGroupPrefix("") + removedRows, err := purgeExpiredMatchingGroupPrefix(storeInstance.sqliteDatabase, "") if err != nil { return 0, core.E("store.PurgeExpired", "delete expired rows", err) } @@ -896,23 +896,19 @@ func fieldsValueSeq(value string) iter.Seq[string] { // purgeExpiredMatchingGroupPrefix deletes expired rows globally when // groupPrefix is empty, otherwise only rows whose group starts with the given // prefix. -func (storeInstance *Store) purgeExpiredMatchingGroupPrefix(groupPrefix string) (int64, error) { - if err := storeInstance.ensureReady("store.purgeExpiredMatchingGroupPrefix"); err != nil { - return 0, err - } - +func purgeExpiredMatchingGroupPrefix(database schemaDatabase, groupPrefix string) (int64, error) { var ( deleteResult sql.Result err error ) now := time.Now().UnixMilli() if groupPrefix == "" { - deleteResult, err = storeInstance.sqliteDatabase.Exec( + deleteResult, err = database.Exec( "DELETE FROM "+entriesTableName+" WHERE expires_at IS NOT NULL AND expires_at <= ?", now, ) } else { - deleteResult, err = storeInstance.sqliteDatabase.Exec( + deleteResult, err = database.Exec( "DELETE FROM "+entriesTableName+" WHERE expires_at IS NOT NULL AND expires_at <= ? AND "+entryGroupColumn+" LIKE ? ESCAPE '^'", now, escapeLike(groupPrefix)+"%", ) diff --git a/transaction.go b/transaction.go index 203e07e..285dde7 100644 --- a/transaction.go +++ b/transaction.go @@ -481,3 +481,16 @@ func (storeTransaction *StoreTransaction) GetFields(group, key string) (iter.Seq } return fieldsValueSeq(value), nil } + +// Usage example: `removedRows, err := transaction.PurgeExpired(); if err != nil { return err }; fmt.Println(removedRows)` +func (storeTransaction *StoreTransaction) PurgeExpired() (int64, error) { + if err := storeTransaction.ensureReady("store.Transaction.PurgeExpired"); err != nil { + return 0, err + } + + removedRows, err := purgeExpiredMatchingGroupPrefix(storeTransaction.sqliteTransaction, "") + if err != nil { + return 0, core.E("store.Transaction.PurgeExpired", "delete expired rows", err) + } + return removedRows, nil +} diff --git a/transaction_test.go b/transaction_test.go index fb5e04d..e3d2ae9 100644 --- a/transaction_test.go +++ b/transaction_test.go @@ -127,6 +127,25 @@ func TestTransaction_Transaction_Good_ReadHelpersSeePendingWrites(t *testing.T) require.NoError(t, err) } +func TestTransaction_Transaction_Good_PurgeExpired(t *testing.T) { + storeInstance, _ := New(":memory:") + defer storeInstance.Close() + + require.NoError(t, storeInstance.SetWithTTL("alpha", "ephemeral", "gone", 1*time.Millisecond)) + time.Sleep(5 * time.Millisecond) + + err := storeInstance.Transaction(func(transaction *StoreTransaction) error { + removedRows, err := transaction.PurgeExpired() + require.NoError(t, err) + assert.Equal(t, int64(1), removedRows) + return nil + }) + require.NoError(t, err) + + _, err = storeInstance.Get("alpha", "ephemeral") + assert.ErrorIs(t, err, NotFoundError) +} + func TestTransaction_ScopedStoreTransaction_Good_GetPage(t *testing.T) { storeInstance, _ := New(":memory:") defer storeInstance.Close() @@ -194,6 +213,28 @@ func TestTransaction_ScopedStoreTransaction_Good_CommitsNamespacedWrites(t *test assert.Equal(t, "en-GB", localeValue) } +func TestTransaction_ScopedStoreTransaction_Good_PurgeExpired(t *testing.T) { + storeInstance, _ := New(":memory:") + defer storeInstance.Close() + + scopedStore, err := NewScoped(storeInstance, "tenant-a") + require.NoError(t, err) + + require.NoError(t, scopedStore.SetWithTTL("session", "token", "abc123", 1*time.Millisecond)) + time.Sleep(5 * time.Millisecond) + + err = scopedStore.Transaction(func(transaction *ScopedStoreTransaction) error { + removedRows, err := transaction.PurgeExpired() + require.NoError(t, err) + assert.Equal(t, int64(1), removedRows) + return nil + }) + require.NoError(t, err) + + _, err = scopedStore.GetFrom("session", "token") + assert.ErrorIs(t, err, NotFoundError) +} + func TestTransaction_ScopedStoreTransaction_Good_QuotaUsesPendingWrites(t *testing.T) { storeInstance, _ := New(":memory:") defer storeInstance.Close()