fix(store): tighten scoped purge and delete events
Co-Authored-By: Virgil <virgil@lethean.io>
This commit is contained in:
parent
4f257cee6f
commit
134853e6df
5 changed files with 149 additions and 11 deletions
|
|
@ -172,6 +172,22 @@ func TestEvents_Watch_Good_DeleteEvent(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestEvents_Watch_Good_DeleteMissingKeyDoesNotEmitEvent(t *testing.T) {
|
||||
storeInstance, _ := New(":memory:")
|
||||
defer storeInstance.Close()
|
||||
|
||||
watcher := storeInstance.Watch("*", "*")
|
||||
defer storeInstance.Unwatch(watcher)
|
||||
|
||||
require.NoError(t, storeInstance.Delete("g", "missing"))
|
||||
|
||||
select {
|
||||
case event := <-watcher.Events:
|
||||
t.Fatalf("unexpected event for missing key delete: %+v", event)
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// DeleteGroup triggers event
|
||||
// ---------------------------------------------------------------------------
|
||||
|
|
@ -202,6 +218,22 @@ func TestEvents_Watch_Good_DeleteGroupEvent(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestEvents_Watch_Good_DeleteMissingGroupDoesNotEmitEvent(t *testing.T) {
|
||||
storeInstance, _ := New(":memory:")
|
||||
defer storeInstance.Close()
|
||||
|
||||
watcher := storeInstance.Watch("*", "*")
|
||||
defer storeInstance.Unwatch(watcher)
|
||||
|
||||
require.NoError(t, storeInstance.DeleteGroup("missing"))
|
||||
|
||||
select {
|
||||
case event := <-watcher.Events:
|
||||
t.Fatalf("unexpected event for missing group delete: %+v", event)
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// OnChange — callback fires on mutations
|
||||
// ---------------------------------------------------------------------------
|
||||
|
|
|
|||
6
go.sum
6
go.sum
|
|
@ -5,6 +5,7 @@ github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1
|
|||
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
|
||||
github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
|
||||
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
|
||||
github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e h1:ijClszYn+mADRFY17kjQEVQ1XRhq2/JR1M3sGqeJoxs=
|
||||
github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e/go.mod h1:boTsfXsheKC2y+lKOCMpSfarhxDeIzfZG1jqGcPl3cA=
|
||||
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
|
||||
|
|
@ -25,15 +26,20 @@ github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94
|
|||
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
|
||||
github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ=
|
||||
github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc=
|
||||
github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA=
|
||||
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
|
||||
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
|
||||
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
|
||||
golang.org/x/exp v0.0.0-20251023183803-a4bb9ffd2546/go.mod h1:j/pmGrbnkbPtQfxEe5D0VQhZC6qKbfKifgD0oM7sR70=
|
||||
golang.org/x/mod v0.34.0 h1:xIHgNUUnW6sYkcM5Jleh05DvLOtwc6RitGHbDk4akRI=
|
||||
golang.org/x/mod v0.34.0/go.mod h1:ykgH52iCZe79kzLLMhyCUzhMci+nQj+0XkbXpNYtVjY=
|
||||
golang.org/x/net v0.52.0/go.mod h1:R1MAz7uMZxVMualyPXb+VaqGSa3LIaUqk0eEt3w36Sw=
|
||||
golang.org/x/sync v0.20.0 h1:e0PTpb7pjO8GAtTs2dQ6jYa5BWYlMuX047Dco/pItO4=
|
||||
golang.org/x/sync v0.20.0/go.mod h1:9xrNwdLfx4jkKbNva9FpL6vEN7evnE43NNNJQ2LF3+0=
|
||||
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.42.0 h1:omrd2nAlyT5ESRdCLYdm3+fMfNFE/+Rf4bDIQImRJeo=
|
||||
golang.org/x/sys v0.42.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw=
|
||||
golang.org/x/telemetry v0.0.0-20260311193753-579e4da9a98c/go.mod h1:TpUTTEp9frx7rTdLpC9gFG9kdI7zVLFTFFlqaH2Cncw=
|
||||
golang.org/x/tools v0.43.0 h1:12BdW9CeB3Z+J/I/wj34VMl8X+fEXBxVR90JeMX5E7s=
|
||||
golang.org/x/tools v0.43.0/go.mod h1:uHkMso649BX2cZK6+RpuIPXS3ho2hZo4FVwfoy1vIk0=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
|
|
|
|||
13
scope.go
13
scope.go
|
|
@ -48,6 +48,13 @@ func NewScopedWithQuota(storeInstance *Store, namespace string, quota QuotaConfi
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if quota.MaxKeys < 0 || quota.MaxGroups < 0 {
|
||||
return nil, core.E(
|
||||
"store.NewScopedWithQuota",
|
||||
core.Sprintf("quota values must be zero or positive; got MaxKeys=%d MaxGroups=%d", quota.MaxKeys, quota.MaxGroups),
|
||||
nil,
|
||||
)
|
||||
}
|
||||
scopedStore.quota = quota
|
||||
return scopedStore, nil
|
||||
}
|
||||
|
|
@ -168,7 +175,11 @@ func (scopedStore *ScopedStore) GetFields(group, key string) (iter.Seq[string],
|
|||
|
||||
// Usage example: `removedRows, err := scopedStore.PurgeExpired(); if err != nil { return }; fmt.Println(removedRows)`
|
||||
func (scopedStore *ScopedStore) PurgeExpired() (int64, error) {
|
||||
return scopedStore.storeInstance.PurgeExpired()
|
||||
removedRows, err := scopedStore.storeInstance.purgeExpiredMatchingGroupPrefix(scopedStore.namespacePrefix())
|
||||
if err != nil {
|
||||
return 0, core.E("store.ScopedStore.PurgeExpired", "delete expired rows", err)
|
||||
}
|
||||
return removedRows, nil
|
||||
}
|
||||
|
||||
// checkQuota("store.ScopedStore.Set", "config", "colour") returns nil when the
|
||||
|
|
|
|||
|
|
@ -76,6 +76,24 @@ func TestScope_NewScopedWithQuota_Bad_NilStore(t *testing.T) {
|
|||
assert.Contains(t, err.Error(), "store instance is nil")
|
||||
}
|
||||
|
||||
func TestScope_NewScopedWithQuota_Bad_NegativeMaxKeys(t *testing.T) {
|
||||
storeInstance, _ := New(":memory:")
|
||||
defer storeInstance.Close()
|
||||
|
||||
_, err := NewScopedWithQuota(storeInstance, "tenant-a", QuotaConfig{MaxKeys: -1})
|
||||
require.Error(t, err)
|
||||
assert.Contains(t, err.Error(), "zero or positive")
|
||||
}
|
||||
|
||||
func TestScope_NewScopedWithQuota_Bad_NegativeMaxGroups(t *testing.T) {
|
||||
storeInstance, _ := New(":memory:")
|
||||
defer storeInstance.Close()
|
||||
|
||||
_, err := NewScopedWithQuota(storeInstance, "tenant-a", QuotaConfig{MaxGroups: -1})
|
||||
require.Error(t, err)
|
||||
assert.Contains(t, err.Error(), "zero or positive")
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// ScopedStore — basic CRUD
|
||||
// ---------------------------------------------------------------------------
|
||||
|
|
@ -380,6 +398,28 @@ func TestScope_ScopedStore_Good_PurgeExpired(t *testing.T) {
|
|||
assert.True(t, core.Is(err, NotFoundError))
|
||||
}
|
||||
|
||||
func TestScope_ScopedStore_Good_PurgeExpired_NamespaceLocal(t *testing.T) {
|
||||
storeInstance, _ := New(":memory:")
|
||||
defer storeInstance.Close()
|
||||
|
||||
alphaStore, _ := NewScoped(storeInstance, "tenant-a")
|
||||
betaStore, _ := NewScoped(storeInstance, "tenant-b")
|
||||
|
||||
require.NoError(t, alphaStore.SetWithTTL("session", "alpha-token", "alpha", 1*time.Millisecond))
|
||||
require.NoError(t, betaStore.SetWithTTL("session", "beta-token", "beta", 1*time.Millisecond))
|
||||
time.Sleep(5 * time.Millisecond)
|
||||
|
||||
assert.Equal(t, 1, rawEntryCount(t, storeInstance, "tenant-a:session"))
|
||||
assert.Equal(t, 1, rawEntryCount(t, storeInstance, "tenant-b:session"))
|
||||
|
||||
removedRows, err := alphaStore.PurgeExpired()
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, int64(1), removedRows)
|
||||
|
||||
assert.Equal(t, 0, rawEntryCount(t, storeInstance, "tenant-a:session"))
|
||||
assert.Equal(t, 1, rawEntryCount(t, storeInstance, "tenant-b:session"))
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Quota enforcement — MaxKeys
|
||||
// ---------------------------------------------------------------------------
|
||||
|
|
@ -814,3 +854,15 @@ func TestScope_Groups_Bad_ClosedStore(t *testing.T) {
|
|||
func keyName(i int) string {
|
||||
return "key-" + string(rune('a'+i%26))
|
||||
}
|
||||
|
||||
func rawEntryCount(t *testing.T, storeInstance *Store, group string) int {
|
||||
t.Helper()
|
||||
|
||||
var count int
|
||||
err := storeInstance.database.QueryRow(
|
||||
"SELECT COUNT(*) FROM "+entriesTableName+" WHERE "+entryGroupColumn+" = ?",
|
||||
group,
|
||||
).Scan(&count)
|
||||
require.NoError(t, err)
|
||||
return count
|
||||
}
|
||||
|
|
|
|||
57
store.go
57
store.go
|
|
@ -141,11 +141,17 @@ func (storeInstance *Store) SetWithTTL(group, key, value string, timeToLive time
|
|||
|
||||
// Usage example: `if err := storeInstance.Delete("config", "colour"); err != nil { return }`
|
||||
func (storeInstance *Store) Delete(group, key string) error {
|
||||
_, err := storeInstance.database.Exec("DELETE FROM "+entriesTableName+" WHERE "+entryGroupColumn+" = ? AND "+entryKeyColumn+" = ?", group, key)
|
||||
deleteResult, err := storeInstance.database.Exec("DELETE FROM "+entriesTableName+" WHERE "+entryGroupColumn+" = ? AND "+entryKeyColumn+" = ?", group, key)
|
||||
if err != nil {
|
||||
return core.E("store.Delete", "delete row", err)
|
||||
}
|
||||
storeInstance.notify(Event{Type: EventDelete, Group: group, Key: key, Timestamp: time.Now()})
|
||||
deletedRows, rowsAffectedError := deleteResult.RowsAffected()
|
||||
if rowsAffectedError != nil {
|
||||
return core.E("store.Delete", "count deleted rows", rowsAffectedError)
|
||||
}
|
||||
if deletedRows > 0 {
|
||||
storeInstance.notify(Event{Type: EventDelete, Group: group, Key: key, Timestamp: time.Now()})
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
@ -164,11 +170,17 @@ func (storeInstance *Store) Count(group string) (int, error) {
|
|||
|
||||
// Usage example: `if err := storeInstance.DeleteGroup("cache"); err != nil { return }`
|
||||
func (storeInstance *Store) DeleteGroup(group string) error {
|
||||
_, err := storeInstance.database.Exec("DELETE FROM "+entriesTableName+" WHERE "+entryGroupColumn+" = ?", group)
|
||||
deleteResult, err := storeInstance.database.Exec("DELETE FROM "+entriesTableName+" WHERE "+entryGroupColumn+" = ?", group)
|
||||
if err != nil {
|
||||
return core.E("store.DeleteGroup", "delete group", err)
|
||||
}
|
||||
storeInstance.notify(Event{Type: EventDeleteGroup, Group: group, Timestamp: time.Now()})
|
||||
deletedRows, rowsAffectedError := deleteResult.RowsAffected()
|
||||
if rowsAffectedError != nil {
|
||||
return core.E("store.DeleteGroup", "count deleted rows", rowsAffectedError)
|
||||
}
|
||||
if deletedRows > 0 {
|
||||
storeInstance.notify(Event{Type: EventDeleteGroup, Group: group, Timestamp: time.Now()})
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
@ -348,15 +360,10 @@ func escapeLike(text string) string {
|
|||
|
||||
// Usage example: `removed, err := storeInstance.PurgeExpired()`
|
||||
func (storeInstance *Store) PurgeExpired() (int64, error) {
|
||||
deleteResult, err := storeInstance.database.Exec("DELETE FROM "+entriesTableName+" WHERE expires_at IS NOT NULL AND expires_at <= ?",
|
||||
time.Now().UnixMilli())
|
||||
removedRows, err := storeInstance.purgeExpiredMatchingGroupPrefix("")
|
||||
if err != nil {
|
||||
return 0, core.E("store.PurgeExpired", "delete expired rows", err)
|
||||
}
|
||||
removedRows, rowsAffectedErr := deleteResult.RowsAffected()
|
||||
if rowsAffectedErr != nil {
|
||||
return 0, core.E("store.PurgeExpired", "count deleted rows", rowsAffectedErr)
|
||||
}
|
||||
return removedRows, nil
|
||||
}
|
||||
|
||||
|
|
@ -417,6 +424,36 @@ func fieldsValueSeq(value string) iter.Seq[string] {
|
|||
}
|
||||
}
|
||||
|
||||
// purgeExpiredMatchingGroupPrefix deletes expired rows globally when
|
||||
// groupPrefix is empty, otherwise only rows whose group starts with the given
|
||||
// prefix.
|
||||
func (storeInstance *Store) purgeExpiredMatchingGroupPrefix(groupPrefix string) (int64, error) {
|
||||
var (
|
||||
deleteResult sql.Result
|
||||
err error
|
||||
)
|
||||
now := time.Now().UnixMilli()
|
||||
if groupPrefix == "" {
|
||||
deleteResult, err = storeInstance.database.Exec(
|
||||
"DELETE FROM "+entriesTableName+" WHERE expires_at IS NOT NULL AND expires_at <= ?",
|
||||
now,
|
||||
)
|
||||
} else {
|
||||
deleteResult, err = storeInstance.database.Exec(
|
||||
"DELETE FROM "+entriesTableName+" WHERE expires_at IS NOT NULL AND expires_at <= ? AND "+entryGroupColumn+" LIKE ? ESCAPE '^'",
|
||||
now, escapeLike(groupPrefix)+"%",
|
||||
)
|
||||
}
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
removedRows, rowsAffectedErr := deleteResult.RowsAffected()
|
||||
if rowsAffectedErr != nil {
|
||||
return 0, rowsAffectedErr
|
||||
}
|
||||
return removedRows, nil
|
||||
}
|
||||
|
||||
type schemaDatabase interface {
|
||||
Exec(query string, args ...any) (sql.Result, error)
|
||||
QueryRow(query string, args ...any) *sql.Row
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue