feat(store): add transaction purge helpers
Co-Authored-By: Virgil <virgil@lethean.io>
This commit is contained in:
parent
f30fb8c20b
commit
00650fd51e
4 changed files with 79 additions and 9 deletions
22
scope.go
22
scope.go
|
|
@ -265,7 +265,14 @@ func (scopedStore *ScopedStore) GetFields(group, key string) (iter.Seq[string],
|
|||
|
||||
// Usage example: `removedRows, err := scopedStore.PurgeExpired(); if err != nil { return }; fmt.Println(removedRows)`
|
||||
func (scopedStore *ScopedStore) PurgeExpired() (int64, error) {
|
||||
removedRows, err := scopedStore.storeInstance.purgeExpiredMatchingGroupPrefix(scopedStore.namespacePrefix())
|
||||
if scopedStore == nil {
|
||||
return 0, core.E("store.ScopedStore.PurgeExpired", "scoped store is nil", nil)
|
||||
}
|
||||
if err := scopedStore.storeInstance.ensureReady("store.ScopedStore.PurgeExpired"); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
removedRows, err := purgeExpiredMatchingGroupPrefix(scopedStore.storeInstance.sqliteDatabase, scopedStore.namespacePrefix())
|
||||
if err != nil {
|
||||
return 0, core.E("store.ScopedStore.PurgeExpired", "delete expired rows", err)
|
||||
}
|
||||
|
|
@ -509,6 +516,19 @@ func (scopedStoreTransaction *ScopedStoreTransaction) GetFields(group, key strin
|
|||
return scopedStoreTransaction.storeTransaction.GetFields(scopedStoreTransaction.scopedStore.namespacedGroup(group), key)
|
||||
}
|
||||
|
||||
// Usage example: `removedRows, err := scopedStoreTransaction.PurgeExpired(); if err != nil { return err }; fmt.Println(removedRows)`
|
||||
func (scopedStoreTransaction *ScopedStoreTransaction) PurgeExpired() (int64, error) {
|
||||
if err := scopedStoreTransaction.ensureReady("store.ScopedStoreTransaction.PurgeExpired"); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
removedRows, err := purgeExpiredMatchingGroupPrefix(scopedStoreTransaction.storeTransaction.sqliteTransaction, scopedStoreTransaction.scopedStore.namespacePrefix())
|
||||
if err != nil {
|
||||
return 0, core.E("store.ScopedStoreTransaction.PurgeExpired", "delete expired rows", err)
|
||||
}
|
||||
return removedRows, nil
|
||||
}
|
||||
|
||||
func (scopedStoreTransaction *ScopedStoreTransaction) checkQuota(operation, group, key string) error {
|
||||
if scopedStoreTransaction.scopedStore.MaxKeys == 0 && scopedStoreTransaction.scopedStore.MaxGroups == 0 {
|
||||
return nil
|
||||
|
|
|
|||
12
store.go
12
store.go
|
|
@ -817,7 +817,7 @@ func (storeInstance *Store) PurgeExpired() (int64, error) {
|
|||
return 0, err
|
||||
}
|
||||
|
||||
removedRows, err := storeInstance.purgeExpiredMatchingGroupPrefix("")
|
||||
removedRows, err := purgeExpiredMatchingGroupPrefix(storeInstance.sqliteDatabase, "")
|
||||
if err != nil {
|
||||
return 0, core.E("store.PurgeExpired", "delete expired rows", err)
|
||||
}
|
||||
|
|
@ -896,23 +896,19 @@ func fieldsValueSeq(value string) iter.Seq[string] {
|
|||
// purgeExpiredMatchingGroupPrefix deletes expired rows globally when
|
||||
// groupPrefix is empty, otherwise only rows whose group starts with the given
|
||||
// prefix.
|
||||
func (storeInstance *Store) purgeExpiredMatchingGroupPrefix(groupPrefix string) (int64, error) {
|
||||
if err := storeInstance.ensureReady("store.purgeExpiredMatchingGroupPrefix"); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
func purgeExpiredMatchingGroupPrefix(database schemaDatabase, groupPrefix string) (int64, error) {
|
||||
var (
|
||||
deleteResult sql.Result
|
||||
err error
|
||||
)
|
||||
now := time.Now().UnixMilli()
|
||||
if groupPrefix == "" {
|
||||
deleteResult, err = storeInstance.sqliteDatabase.Exec(
|
||||
deleteResult, err = database.Exec(
|
||||
"DELETE FROM "+entriesTableName+" WHERE expires_at IS NOT NULL AND expires_at <= ?",
|
||||
now,
|
||||
)
|
||||
} else {
|
||||
deleteResult, err = storeInstance.sqliteDatabase.Exec(
|
||||
deleteResult, err = database.Exec(
|
||||
"DELETE FROM "+entriesTableName+" WHERE expires_at IS NOT NULL AND expires_at <= ? AND "+entryGroupColumn+" LIKE ? ESCAPE '^'",
|
||||
now, escapeLike(groupPrefix)+"%",
|
||||
)
|
||||
|
|
|
|||
|
|
@ -481,3 +481,16 @@ func (storeTransaction *StoreTransaction) GetFields(group, key string) (iter.Seq
|
|||
}
|
||||
return fieldsValueSeq(value), nil
|
||||
}
|
||||
|
||||
// Usage example: `removedRows, err := transaction.PurgeExpired(); if err != nil { return err }; fmt.Println(removedRows)`
|
||||
func (storeTransaction *StoreTransaction) PurgeExpired() (int64, error) {
|
||||
if err := storeTransaction.ensureReady("store.Transaction.PurgeExpired"); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
removedRows, err := purgeExpiredMatchingGroupPrefix(storeTransaction.sqliteTransaction, "")
|
||||
if err != nil {
|
||||
return 0, core.E("store.Transaction.PurgeExpired", "delete expired rows", err)
|
||||
}
|
||||
return removedRows, nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -127,6 +127,25 @@ func TestTransaction_Transaction_Good_ReadHelpersSeePendingWrites(t *testing.T)
|
|||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
func TestTransaction_Transaction_Good_PurgeExpired(t *testing.T) {
|
||||
storeInstance, _ := New(":memory:")
|
||||
defer storeInstance.Close()
|
||||
|
||||
require.NoError(t, storeInstance.SetWithTTL("alpha", "ephemeral", "gone", 1*time.Millisecond))
|
||||
time.Sleep(5 * time.Millisecond)
|
||||
|
||||
err := storeInstance.Transaction(func(transaction *StoreTransaction) error {
|
||||
removedRows, err := transaction.PurgeExpired()
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, int64(1), removedRows)
|
||||
return nil
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
_, err = storeInstance.Get("alpha", "ephemeral")
|
||||
assert.ErrorIs(t, err, NotFoundError)
|
||||
}
|
||||
|
||||
func TestTransaction_ScopedStoreTransaction_Good_GetPage(t *testing.T) {
|
||||
storeInstance, _ := New(":memory:")
|
||||
defer storeInstance.Close()
|
||||
|
|
@ -194,6 +213,28 @@ func TestTransaction_ScopedStoreTransaction_Good_CommitsNamespacedWrites(t *test
|
|||
assert.Equal(t, "en-GB", localeValue)
|
||||
}
|
||||
|
||||
func TestTransaction_ScopedStoreTransaction_Good_PurgeExpired(t *testing.T) {
|
||||
storeInstance, _ := New(":memory:")
|
||||
defer storeInstance.Close()
|
||||
|
||||
scopedStore, err := NewScoped(storeInstance, "tenant-a")
|
||||
require.NoError(t, err)
|
||||
|
||||
require.NoError(t, scopedStore.SetWithTTL("session", "token", "abc123", 1*time.Millisecond))
|
||||
time.Sleep(5 * time.Millisecond)
|
||||
|
||||
err = scopedStore.Transaction(func(transaction *ScopedStoreTransaction) error {
|
||||
removedRows, err := transaction.PurgeExpired()
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, int64(1), removedRows)
|
||||
return nil
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
_, err = scopedStore.GetFrom("session", "token")
|
||||
assert.ErrorIs(t, err, NotFoundError)
|
||||
}
|
||||
|
||||
func TestTransaction_ScopedStoreTransaction_Good_QuotaUsesPendingWrites(t *testing.T) {
|
||||
storeInstance, _ := New(":memory:")
|
||||
defer storeInstance.Close()
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue