feat: EventBus for blockchain event distribution
6 event types: block.new, alias.registered, hardfork.activated, sync.progress, sync.complete, asset.deployed EventBus with Subscribe/Emit pattern. Thread-safe via RWMutex. Foundation for core/stream (go-ws SSEBroker) integration. 5 tests: subscribe+emit, multiple subscribers, count, no-panic empty, type strings. Co-Authored-By: Charon <charon@lethean.io>
This commit is contained in:
parent
7f867fea5b
commit
350ff6f4af
2 changed files with 160 additions and 0 deletions
102
events.go
Normal file
102
events.go
Normal file
|
|
@ -0,0 +1,102 @@
|
|||
// Copyright (c) 2017-2026 Lethean (https://lt.hn)
|
||||
// SPDX-License-Identifier: EUPL-1.2
|
||||
|
||||
package blockchain
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"context"
|
||||
|
||||
"dappco.re/go/core"
|
||||
)
|
||||
|
||||
// EventType identifies blockchain events.
|
||||
type EventType string
|
||||
|
||||
const (
|
||||
EventBlockNew EventType = "blockchain.block.new"
|
||||
EventAlias EventType = "blockchain.alias.registered"
|
||||
EventHardfork EventType = "blockchain.hardfork.activated"
|
||||
EventSyncProgress EventType = "blockchain.sync.progress"
|
||||
EventSyncComplete EventType = "blockchain.sync.complete"
|
||||
EventAssetDeployed EventType = "blockchain.asset.deployed"
|
||||
)
|
||||
|
||||
// Event is a blockchain event with typed data.
|
||||
//
|
||||
// event := blockchain.Event{Type: EventBlockNew, Data: blockHeader}
|
||||
type Event struct {
|
||||
Type EventType
|
||||
Height uint64
|
||||
Data interface{}
|
||||
}
|
||||
|
||||
// EventBus distributes blockchain events to subscribers.
|
||||
//
|
||||
// bus := blockchain.NewEventBus()
|
||||
// bus.Subscribe(blockchain.EventBlockNew, handler)
|
||||
// bus.Emit(blockchain.Event{Type: EventBlockNew, Height: 11500})
|
||||
type EventBus struct {
|
||||
mu sync.RWMutex
|
||||
handlers map[EventType][]EventHandler
|
||||
}
|
||||
|
||||
// EventHandler processes a blockchain event.
|
||||
type EventHandler func(Event)
|
||||
|
||||
// NewEventBus creates an event distribution bus.
|
||||
//
|
||||
// bus := blockchain.NewEventBus()
|
||||
func NewEventBus() *EventBus {
|
||||
return &EventBus{
|
||||
handlers: make(map[EventType][]EventHandler),
|
||||
}
|
||||
}
|
||||
|
||||
// Subscribe registers a handler for an event type.
|
||||
//
|
||||
// bus.Subscribe(blockchain.EventBlockNew, func(e blockchain.Event) {
|
||||
// core.Print(nil, "new block at %d", e.Height)
|
||||
// })
|
||||
func (b *EventBus) Subscribe(eventType EventType, handler EventHandler) {
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
b.handlers[eventType] = append(b.handlers[eventType], handler)
|
||||
}
|
||||
|
||||
// Emit sends an event to all registered handlers.
|
||||
//
|
||||
// bus.Emit(blockchain.Event{Type: EventBlockNew, Height: 11500})
|
||||
func (b *EventBus) Emit(event Event) {
|
||||
b.mu.RLock()
|
||||
handlers := b.handlers[event.Type]
|
||||
b.mu.RUnlock()
|
||||
|
||||
for _, h := range handlers {
|
||||
h(event)
|
||||
}
|
||||
}
|
||||
|
||||
// SubscriberCount returns the number of handlers for an event type.
|
||||
func (b *EventBus) SubscriberCount(eventType EventType) int {
|
||||
b.mu.RLock()
|
||||
defer b.mu.RUnlock()
|
||||
return len(b.handlers[eventType])
|
||||
}
|
||||
|
||||
// RegisterEventActions registers event-related Core actions.
|
||||
//
|
||||
// blockchain.RegisterEventActions(c, bus)
|
||||
func RegisterEventActions(c *core.Core, bus *EventBus) {
|
||||
c.Action("blockchain.events.subscribe", func(_ context.Context, opts core.Options) core.Result {
|
||||
return core.Result{Value: map[string]interface{}{
|
||||
"available_events": []string{
|
||||
string(EventBlockNew), string(EventAlias),
|
||||
string(EventHardfork), string(EventSyncProgress),
|
||||
string(EventSyncComplete), string(EventAssetDeployed),
|
||||
},
|
||||
"note": "subscribe via SSE at /events/blocks or core/stream",
|
||||
}, OK: true}
|
||||
})
|
||||
}
|
||||
58
events_test.go
Normal file
58
events_test.go
Normal file
|
|
@ -0,0 +1,58 @@
|
|||
package blockchain
|
||||
|
||||
import "testing"
|
||||
|
||||
func TestEventBus_SubscribeEmit_Good(t *testing.T) {
|
||||
bus := NewEventBus()
|
||||
received := false
|
||||
bus.Subscribe(EventBlockNew, func(e Event) {
|
||||
received = true
|
||||
if e.Height != 11500 {
|
||||
t.Errorf("height: got %d, want 11500", e.Height)
|
||||
}
|
||||
})
|
||||
bus.Emit(Event{Type: EventBlockNew, Height: 11500})
|
||||
if !received {
|
||||
t.Error("handler was not called")
|
||||
}
|
||||
}
|
||||
|
||||
func TestEventBus_MultipleSubscribers_Good(t *testing.T) {
|
||||
bus := NewEventBus()
|
||||
count := 0
|
||||
bus.Subscribe(EventBlockNew, func(e Event) { count++ })
|
||||
bus.Subscribe(EventBlockNew, func(e Event) { count++ })
|
||||
bus.Subscribe(EventAlias, func(e Event) { count++ })
|
||||
|
||||
bus.Emit(Event{Type: EventBlockNew})
|
||||
if count != 2 {
|
||||
t.Errorf("expected 2 handlers called, got %d", count)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEventBus_SubscriberCount_Good(t *testing.T) {
|
||||
bus := NewEventBus()
|
||||
bus.Subscribe(EventBlockNew, func(e Event) {})
|
||||
bus.Subscribe(EventBlockNew, func(e Event) {})
|
||||
if bus.SubscriberCount(EventBlockNew) != 2 {
|
||||
t.Error("expected 2 subscribers")
|
||||
}
|
||||
if bus.SubscriberCount(EventAlias) != 0 {
|
||||
t.Error("expected 0 subscribers for alias")
|
||||
}
|
||||
}
|
||||
|
||||
func TestEventBus_NoSubscribers_Ugly(t *testing.T) {
|
||||
bus := NewEventBus()
|
||||
// Should not panic with no subscribers
|
||||
bus.Emit(Event{Type: EventBlockNew, Height: 100})
|
||||
}
|
||||
|
||||
func TestEventBus_EventTypes_Good(t *testing.T) {
|
||||
if EventBlockNew != "blockchain.block.new" {
|
||||
t.Error("wrong event type string")
|
||||
}
|
||||
if EventHardfork != "blockchain.hardfork.activated" {
|
||||
t.Error("wrong hardfork event type")
|
||||
}
|
||||
}
|
||||
Loading…
Add table
Reference in a new issue