From e5a0f66e08afc6c19dfb870945efd219cf334072 Mon Sep 17 00:00:00 2001 From: Snider Date: Wed, 15 Apr 2026 11:17:37 +0100 Subject: [PATCH] Emit TTL purge events --- scope.go | 36 ++++++++++++++++++++++++++-- store.go | 65 ++++++++++++++++++++++++++++++++++++++++++++++---- transaction.go | 18 +++++++++++++- 3 files changed, 111 insertions(+), 8 deletions(-) diff --git a/scope.go b/scope.go index 78cab1e..71780a6 100644 --- a/scope.go +++ b/scope.go @@ -386,10 +386,26 @@ func (scopedStore *ScopedStore) PurgeExpired() (int64, error) { return 0, err } - removedRows, err := purgeExpiredMatchingGroupPrefix(scopedStore.store.sqliteDatabase, scopedStore.namespacePrefix()) + cutoffUnixMilli := time.Now().UnixMilli() + expiredEntries, err := listExpiredEntriesMatchingGroupPrefix(scopedStore.store.sqliteDatabase, scopedStore.namespacePrefix(), cutoffUnixMilli) + if err != nil { + return 0, core.E("store.ScopedStore.PurgeExpired", "list expired rows", err) + } + + removedRows, err := purgeExpiredMatchingGroupPrefix(scopedStore.store.sqliteDatabase, scopedStore.namespacePrefix(), cutoffUnixMilli) if err != nil { return 0, core.E("store.ScopedStore.PurgeExpired", "delete expired rows", err) } + if removedRows > 0 { + for _, expiredEntry := range expiredEntries { + scopedStore.store.notify(Event{ + Type: EventDelete, + Group: expiredEntry.group, + Key: expiredEntry.key, + Timestamp: time.Now(), + }) + } + } return removedRows, nil } @@ -792,10 +808,26 @@ func (scopedStoreTransaction *ScopedStoreTransaction) PurgeExpired() (int64, err return 0, err } - removedRows, err := purgeExpiredMatchingGroupPrefix(scopedStoreTransaction.storeTransaction.sqliteTransaction, scopedStoreTransaction.scopedStore.namespacePrefix()) + cutoffUnixMilli := time.Now().UnixMilli() + expiredEntries, err := listExpiredEntriesMatchingGroupPrefix(scopedStoreTransaction.storeTransaction.sqliteTransaction, scopedStoreTransaction.scopedStore.namespacePrefix(), cutoffUnixMilli) + if err != nil { + return 0, core.E("store.ScopedStoreTransaction.PurgeExpired", "list expired rows", err) + } + + removedRows, err := purgeExpiredMatchingGroupPrefix(scopedStoreTransaction.storeTransaction.sqliteTransaction, scopedStoreTransaction.scopedStore.namespacePrefix(), cutoffUnixMilli) if err != nil { return 0, core.E("store.ScopedStoreTransaction.PurgeExpired", "delete expired rows", err) } + if removedRows > 0 { + for _, expiredEntry := range expiredEntries { + scopedStoreTransaction.storeTransaction.recordEvent(Event{ + Type: EventDelete, + Group: expiredEntry.group, + Key: expiredEntry.key, + Timestamp: time.Now(), + }) + } + } return removedRows, nil } diff --git a/store.go b/store.go index cd49ea3..8e24ff3 100644 --- a/store.go +++ b/store.go @@ -959,10 +959,26 @@ func (storeInstance *Store) PurgeExpired() (int64, error) { return 0, err } - removedRows, err := purgeExpiredMatchingGroupPrefix(storeInstance.sqliteDatabase, "") + cutoffUnixMilli := time.Now().UnixMilli() + expiredEntries, err := listExpiredEntriesMatchingGroupPrefix(storeInstance.sqliteDatabase, "", cutoffUnixMilli) + if err != nil { + return 0, core.E("store.PurgeExpired", "list expired rows", err) + } + + removedRows, err := purgeExpiredMatchingGroupPrefix(storeInstance.sqliteDatabase, "", cutoffUnixMilli) if err != nil { return 0, core.E("store.PurgeExpired", "delete expired rows", err) } + if removedRows > 0 { + for _, expiredEntry := range expiredEntries { + storeInstance.notify(Event{ + Type: EventDelete, + Group: expiredEntry.group, + Key: expiredEntry.key, + Timestamp: time.Now(), + }) + } + } return removedRows, nil } @@ -1035,24 +1051,63 @@ func fieldsValueSeq(value string) iter.Seq[string] { } } +type expiredEntryRef struct { + group string + key string +} + +func listExpiredEntriesMatchingGroupPrefix(database schemaDatabase, groupPrefix string, cutoffUnixMilli int64) ([]expiredEntryRef, error) { + var ( + rows *sql.Rows + err error + ) + if groupPrefix == "" { + rows, err = database.Query( + "SELECT "+entryGroupColumn+", "+entryKeyColumn+" FROM "+entriesTableName+" WHERE expires_at IS NOT NULL AND expires_at <= ? ORDER BY "+entryGroupColumn+", "+entryKeyColumn, + cutoffUnixMilli, + ) + } else { + rows, err = database.Query( + "SELECT "+entryGroupColumn+", "+entryKeyColumn+" FROM "+entriesTableName+" WHERE expires_at IS NOT NULL AND expires_at <= ? AND "+entryGroupColumn+" LIKE ? ESCAPE '^' ORDER BY "+entryGroupColumn+", "+entryKeyColumn, + cutoffUnixMilli, escapeLike(groupPrefix)+"%", + ) + } + if err != nil { + return nil, err + } + defer rows.Close() + + expiredEntries := make([]expiredEntryRef, 0) + for rows.Next() { + var expiredEntry expiredEntryRef + if err := rows.Scan(&expiredEntry.group, &expiredEntry.key); err != nil { + return nil, err + } + expiredEntries = append(expiredEntries, expiredEntry) + } + if err := rows.Err(); err != nil { + return nil, err + } + return expiredEntries, nil +} + // purgeExpiredMatchingGroupPrefix deletes expired rows globally when // groupPrefix is empty, otherwise only rows whose group starts with the given // prefix. -func purgeExpiredMatchingGroupPrefix(database schemaDatabase, groupPrefix string) (int64, error) { +func purgeExpiredMatchingGroupPrefix(database schemaDatabase, groupPrefix string, cutoffUnixMilli int64) (int64, error) { var ( deleteResult sql.Result err error ) - now := time.Now().UnixMilli() if groupPrefix == "" { deleteResult, err = database.Exec( "DELETE FROM "+entriesTableName+" WHERE expires_at IS NOT NULL AND expires_at <= ?", - now, + cutoffUnixMilli, ) } else { deleteResult, err = database.Exec( "DELETE FROM "+entriesTableName+" WHERE expires_at IS NOT NULL AND expires_at <= ? AND "+entryGroupColumn+" LIKE ? ESCAPE '^'", - now, escapeLike(groupPrefix)+"%", + cutoffUnixMilli, escapeLike(groupPrefix)+"%", ) } if err != nil { diff --git a/transaction.go b/transaction.go index 50213a6..b7b0953 100644 --- a/transaction.go +++ b/transaction.go @@ -511,9 +511,25 @@ func (storeTransaction *StoreTransaction) PurgeExpired() (int64, error) { return 0, err } - removedRows, err := purgeExpiredMatchingGroupPrefix(storeTransaction.sqliteTransaction, "") + cutoffUnixMilli := time.Now().UnixMilli() + expiredEntries, err := listExpiredEntriesMatchingGroupPrefix(storeTransaction.sqliteTransaction, "", cutoffUnixMilli) + if err != nil { + return 0, core.E("store.Transaction.PurgeExpired", "list expired rows", err) + } + + removedRows, err := purgeExpiredMatchingGroupPrefix(storeTransaction.sqliteTransaction, "", cutoffUnixMilli) if err != nil { return 0, core.E("store.Transaction.PurgeExpired", "delete expired rows", err) } + if removedRows > 0 { + for _, expiredEntry := range expiredEntries { + storeTransaction.recordEvent(Event{ + Type: EventDelete, + Group: expiredEntry.group, + Key: expiredEntry.key, + Timestamp: time.Now(), + }) + } + } return removedRows, nil }