diff --git a/events.go b/events.go new file mode 100644 index 0000000..cc8e185 --- /dev/null +++ b/events.go @@ -0,0 +1,102 @@ +// Copyright (c) 2017-2026 Lethean (https://lt.hn) +// SPDX-License-Identifier: EUPL-1.2 + +package blockchain + +import ( + "sync" + + "context" + + "dappco.re/go/core" +) + +// EventType identifies blockchain events. +type EventType string + +const ( + EventBlockNew EventType = "blockchain.block.new" + EventAlias EventType = "blockchain.alias.registered" + EventHardfork EventType = "blockchain.hardfork.activated" + EventSyncProgress EventType = "blockchain.sync.progress" + EventSyncComplete EventType = "blockchain.sync.complete" + EventAssetDeployed EventType = "blockchain.asset.deployed" +) + +// Event is a blockchain event with typed data. +// +// event := blockchain.Event{Type: EventBlockNew, Data: blockHeader} +type Event struct { + Type EventType + Height uint64 + Data interface{} +} + +// EventBus distributes blockchain events to subscribers. +// +// bus := blockchain.NewEventBus() +// bus.Subscribe(blockchain.EventBlockNew, handler) +// bus.Emit(blockchain.Event{Type: EventBlockNew, Height: 11500}) +type EventBus struct { + mu sync.RWMutex + handlers map[EventType][]EventHandler +} + +// EventHandler processes a blockchain event. +type EventHandler func(Event) + +// NewEventBus creates an event distribution bus. +// +// bus := blockchain.NewEventBus() +func NewEventBus() *EventBus { + return &EventBus{ + handlers: make(map[EventType][]EventHandler), + } +} + +// Subscribe registers a handler for an event type. +// +// bus.Subscribe(blockchain.EventBlockNew, func(e blockchain.Event) { +// core.Print(nil, "new block at %d", e.Height) +// }) +func (b *EventBus) Subscribe(eventType EventType, handler EventHandler) { + b.mu.Lock() + defer b.mu.Unlock() + b.handlers[eventType] = append(b.handlers[eventType], handler) +} + +// Emit sends an event to all registered handlers. +// +// bus.Emit(blockchain.Event{Type: EventBlockNew, Height: 11500}) +func (b *EventBus) Emit(event Event) { + b.mu.RLock() + handlers := b.handlers[event.Type] + b.mu.RUnlock() + + for _, h := range handlers { + h(event) + } +} + +// SubscriberCount returns the number of handlers for an event type. +func (b *EventBus) SubscriberCount(eventType EventType) int { + b.mu.RLock() + defer b.mu.RUnlock() + return len(b.handlers[eventType]) +} + +// RegisterEventActions registers event-related Core actions. +// +// blockchain.RegisterEventActions(c, bus) +func RegisterEventActions(c *core.Core, bus *EventBus) { + c.Action("blockchain.events.subscribe", func(_ context.Context, opts core.Options) core.Result { + return core.Result{Value: map[string]interface{}{ + "available_events": []string{ + string(EventBlockNew), string(EventAlias), + string(EventHardfork), string(EventSyncProgress), + string(EventSyncComplete), string(EventAssetDeployed), + }, + "note": "subscribe via SSE at /events/blocks or core/stream", + }, OK: true} + }) +} diff --git a/events_test.go b/events_test.go new file mode 100644 index 0000000..d6cc707 --- /dev/null +++ b/events_test.go @@ -0,0 +1,58 @@ +package blockchain + +import "testing" + +func TestEventBus_SubscribeEmit_Good(t *testing.T) { + bus := NewEventBus() + received := false + bus.Subscribe(EventBlockNew, func(e Event) { + received = true + if e.Height != 11500 { + t.Errorf("height: got %d, want 11500", e.Height) + } + }) + bus.Emit(Event{Type: EventBlockNew, Height: 11500}) + if !received { + t.Error("handler was not called") + } +} + +func TestEventBus_MultipleSubscribers_Good(t *testing.T) { + bus := NewEventBus() + count := 0 + bus.Subscribe(EventBlockNew, func(e Event) { count++ }) + bus.Subscribe(EventBlockNew, func(e Event) { count++ }) + bus.Subscribe(EventAlias, func(e Event) { count++ }) + + bus.Emit(Event{Type: EventBlockNew}) + if count != 2 { + t.Errorf("expected 2 handlers called, got %d", count) + } +} + +func TestEventBus_SubscriberCount_Good(t *testing.T) { + bus := NewEventBus() + bus.Subscribe(EventBlockNew, func(e Event) {}) + bus.Subscribe(EventBlockNew, func(e Event) {}) + if bus.SubscriberCount(EventBlockNew) != 2 { + t.Error("expected 2 subscribers") + } + if bus.SubscriberCount(EventAlias) != 0 { + t.Error("expected 0 subscribers for alias") + } +} + +func TestEventBus_NoSubscribers_Ugly(t *testing.T) { + bus := NewEventBus() + // Should not panic with no subscribers + bus.Emit(Event{Type: EventBlockNew, Height: 100}) +} + +func TestEventBus_EventTypes_Good(t *testing.T) { + if EventBlockNew != "blockchain.block.new" { + t.Error("wrong event type string") + } + if EventHardfork != "blockchain.hardfork.activated" { + t.Error("wrong hardfork event type") + } +}