feat(store): add declarative store config
Co-Authored-By: Virgil <virgil@lethean.io>
This commit is contained in:
parent
f9a7e542bf
commit
aad8dded6b
4 changed files with 103 additions and 16 deletions
14
doc.go
14
doc.go
|
|
@ -11,6 +11,20 @@
|
|||
// }
|
||||
// defer storeInstance.Close()
|
||||
//
|
||||
// configuredStore, err := store.NewConfigured(store.StoreConfig{
|
||||
// DatabasePath: ":memory:",
|
||||
// Journal: store.JournalConfiguration{
|
||||
// EndpointURL: "http://127.0.0.1:8086",
|
||||
// Organisation: "core",
|
||||
// BucketName: "events",
|
||||
// },
|
||||
// PurgeInterval: 20 * time.Millisecond,
|
||||
// })
|
||||
// if err != nil {
|
||||
// return
|
||||
// }
|
||||
// defer configuredStore.Close()
|
||||
//
|
||||
// if err := storeInstance.Set("config", "colour", "blue"); err != nil {
|
||||
// return
|
||||
// }
|
||||
|
|
|
|||
|
|
@ -7,6 +7,8 @@ description: Group-namespaced SQLite key-value store with TTL expiry, namespace
|
|||
|
||||
`go-store` is a group-namespaced key-value store backed by SQLite. It provides persistent or in-memory storage with optional TTL expiry, namespace isolation for multi-tenant use, quota enforcement, and a reactive event system for observing mutations.
|
||||
|
||||
For declarative setup, `store.NewConfigured(store.StoreConfig{...})` takes a single config struct instead of functional options.
|
||||
|
||||
The package has a single runtime dependency -- a pure-Go SQLite driver (`modernc.org/sqlite`). No CGO is required. It compiles and runs on all platforms that Go supports.
|
||||
|
||||
**Module path:** `dappco.re/go/core/store`
|
||||
|
|
|
|||
77
store.go
77
store.go
|
|
@ -38,6 +38,16 @@ type journalConfiguration struct {
|
|||
bucketName string
|
||||
}
|
||||
|
||||
// Usage example: `config := store.StoreConfig{DatabasePath: ":memory:", PurgeInterval: 30 * time.Second}`
|
||||
type StoreConfig struct {
|
||||
// Usage example: `config := store.StoreConfig{DatabasePath: "/tmp/go-store.db"}`
|
||||
DatabasePath string
|
||||
// Usage example: `config := store.StoreConfig{Journal: store.JournalConfiguration{EndpointURL: "http://127.0.0.1:8086", Organisation: "core", BucketName: "events"}}`
|
||||
Journal JournalConfiguration
|
||||
// Usage example: `config := store.StoreConfig{PurgeInterval: 30 * time.Second}`
|
||||
PurgeInterval time.Duration
|
||||
}
|
||||
|
||||
// Usage example: `config := storeInstance.JournalConfiguration(); fmt.Println(config.EndpointURL, config.Organisation, config.BucketName)`
|
||||
type JournalConfiguration struct {
|
||||
// Usage example: `config := store.JournalConfiguration{EndpointURL: "http://127.0.0.1:8086"}`
|
||||
|
|
@ -51,6 +61,7 @@ type JournalConfiguration struct {
|
|||
// Usage example: `storeInstance, err := store.New(":memory:")`
|
||||
type Store struct {
|
||||
database *sql.DB
|
||||
purgeContext context.Context
|
||||
cancelPurge context.CancelFunc
|
||||
purgeWaitGroup sync.WaitGroup
|
||||
purgeInterval time.Duration // interval between background purge cycles
|
||||
|
|
@ -116,11 +127,49 @@ func WithPurgeInterval(interval time.Duration) StoreOption {
|
|||
}
|
||||
}
|
||||
|
||||
// Usage example: `storeInstance, err := store.NewConfigured(store.StoreConfig{DatabasePath: ":memory:", Journal: store.JournalConfiguration{EndpointURL: "http://127.0.0.1:8086", Organisation: "core", BucketName: "events"}, PurgeInterval: 20 * time.Millisecond})`
|
||||
func NewConfigured(config StoreConfig) (*Store, error) {
|
||||
storeInstance, err := openStore("store.NewConfigured", config.DatabasePath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if config.Journal != (JournalConfiguration{}) {
|
||||
storeInstance.journalConfiguration = journalConfiguration{
|
||||
endpointURL: config.Journal.EndpointURL,
|
||||
organisation: config.Journal.Organisation,
|
||||
bucketName: config.Journal.BucketName,
|
||||
}
|
||||
}
|
||||
if config.PurgeInterval > 0 {
|
||||
storeInstance.purgeInterval = config.PurgeInterval
|
||||
}
|
||||
|
||||
storeInstance.startBackgroundPurge()
|
||||
storeInstance.cleanUpOrphanedWorkspaces(defaultWorkspaceStateDirectory)
|
||||
return storeInstance, nil
|
||||
}
|
||||
|
||||
// Usage example: `storeInstance, err := store.New("/tmp/go-store.db", store.WithJournal("http://127.0.0.1:8086", "core", "events"))`
|
||||
func New(databasePath string, options ...StoreOption) (*Store, error) {
|
||||
storeInstance, err := openStore("store.New", databasePath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for _, option := range options {
|
||||
if option != nil {
|
||||
option(storeInstance)
|
||||
}
|
||||
}
|
||||
storeInstance.startBackgroundPurge()
|
||||
storeInstance.cleanUpOrphanedWorkspaces(defaultWorkspaceStateDirectory)
|
||||
return storeInstance, nil
|
||||
}
|
||||
|
||||
func openStore(operation, databasePath string) (*Store, error) {
|
||||
sqliteDatabase, err := sql.Open("sqlite", databasePath)
|
||||
if err != nil {
|
||||
return nil, core.E("store.New", "open database", err)
|
||||
return nil, core.E(operation, "open database", err)
|
||||
}
|
||||
// Serialise all access through a single connection. SQLite only supports
|
||||
// one writer at a time; using a pool causes SQLITE_BUSY under contention
|
||||
|
|
@ -129,32 +178,25 @@ func New(databasePath string, options ...StoreOption) (*Store, error) {
|
|||
sqliteDatabase.SetMaxOpenConns(1)
|
||||
if _, err := sqliteDatabase.Exec("PRAGMA journal_mode=WAL"); err != nil {
|
||||
sqliteDatabase.Close()
|
||||
return nil, core.E("store.New", "set WAL journal mode", err)
|
||||
return nil, core.E(operation, "set WAL journal mode", err)
|
||||
}
|
||||
if _, err := sqliteDatabase.Exec("PRAGMA busy_timeout=5000"); err != nil {
|
||||
sqliteDatabase.Close()
|
||||
return nil, core.E("store.New", "set busy timeout", err)
|
||||
return nil, core.E(operation, "set busy timeout", err)
|
||||
}
|
||||
if err := ensureSchema(sqliteDatabase); err != nil {
|
||||
sqliteDatabase.Close()
|
||||
return nil, core.E("store.New", "ensure schema", err)
|
||||
return nil, core.E(operation, "ensure schema", err)
|
||||
}
|
||||
|
||||
purgeContext, cancel := context.WithCancel(context.Background())
|
||||
storeInstance := &Store{
|
||||
return &Store{
|
||||
database: sqliteDatabase,
|
||||
purgeContext: purgeContext,
|
||||
cancelPurge: cancel,
|
||||
purgeInterval: 60 * time.Second,
|
||||
watchers: make(map[string][]chan Event),
|
||||
}
|
||||
for _, option := range options {
|
||||
if option != nil {
|
||||
option(storeInstance)
|
||||
}
|
||||
}
|
||||
storeInstance.startBackgroundPurge(purgeContext)
|
||||
storeInstance.cleanUpOrphanedWorkspaces(defaultWorkspaceStateDirectory)
|
||||
return storeInstance, nil
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Usage example: `storeInstance, err := store.New(":memory:"); if err != nil { return }; defer storeInstance.Close()`
|
||||
|
|
@ -542,17 +584,20 @@ func (storeInstance *Store) PurgeExpired() (int64, error) {
|
|||
|
||||
// New(":memory:") starts a background goroutine that calls PurgeExpired every
|
||||
// 60 seconds until Close stops the store.
|
||||
func (storeInstance *Store) startBackgroundPurge(purgeContext context.Context) {
|
||||
func (storeInstance *Store) startBackgroundPurge() {
|
||||
if storeInstance == nil {
|
||||
return
|
||||
}
|
||||
if storeInstance.purgeContext == nil {
|
||||
return
|
||||
}
|
||||
|
||||
storeInstance.purgeWaitGroup.Go(func() {
|
||||
ticker := time.NewTicker(storeInstance.purgeInterval)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-purgeContext.Done():
|
||||
case <-storeInstance.purgeContext.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
if _, err := storeInstance.PurgeExpired(); err != nil {
|
||||
|
|
|
|||
|
|
@ -121,6 +121,32 @@ func TestStore_JournalConfiguration_Good(t *testing.T) {
|
|||
}, config)
|
||||
}
|
||||
|
||||
func TestStore_NewConfigured_Good(t *testing.T) {
|
||||
storeInstance, err := NewConfigured(StoreConfig{
|
||||
DatabasePath: ":memory:",
|
||||
Journal: JournalConfiguration{
|
||||
EndpointURL: "http://127.0.0.1:8086",
|
||||
Organisation: "core",
|
||||
BucketName: "events",
|
||||
},
|
||||
PurgeInterval: 20 * time.Millisecond,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
defer storeInstance.Close()
|
||||
|
||||
assert.Equal(t, JournalConfiguration{
|
||||
EndpointURL: "http://127.0.0.1:8086",
|
||||
Organisation: "core",
|
||||
BucketName: "events",
|
||||
}, storeInstance.JournalConfiguration())
|
||||
assert.Equal(t, 20*time.Millisecond, storeInstance.purgeInterval)
|
||||
|
||||
require.NoError(t, storeInstance.Set("g", "k", "v"))
|
||||
value, err := storeInstance.Get("g", "k")
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, "v", value)
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Set / Get — core CRUD
|
||||
// ---------------------------------------------------------------------------
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue