Using events to drive Klarhet

One decision I made early on while building Klarhet was to make the system event-driven. It turned out to be a good call: I've been able to add new features without changing much of the existing logic.

My setup is simple because my use case is simple. An in-memory event bus is initialized at startup, and services register their subscriptions with it.

At its core, it's a struct with two methods:

type EventBus struct {
    mu       sync.RWMutex
    handlers map[string][]func(Event)
}

func (b *EventBus) Subscribe(topic string, fn func(Event)) {
    b.mu.Lock()
    defer b.mu.Unlock()
    b.handlers[topic] = append(b.handlers[topic], fn)
}

func (b *EventBus) Publish(topic string, payload any) {
    data, err := json.Marshal(payload)
    if err != nil {
        log.Printf("event: failed to marshal payload for %s: %v", topic, err)
        return
    }

    evt := Event{
        Topic:     topic,
        Payload:   data,
        CreatedAt: time.Now(),
    }

    b.mu.RLock()
    defer b.mu.RUnlock()
    for _, fn := range b.handlers[topic] {
        fn(evt)
    }
}
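
The snippet above doesn't show the Event type or how the bus gets created. Here is a minimal, runnable sketch of the missing pieces. It assumes Event carries exactly the three fields that Publish sets, with the payload kept as raw JSON. NewEventBus is a hypothetical constructor name; it is there because appending to a nil handlers map would panic in Subscribe. The topic string is also made up for the example.

```go
package main

import (
    "encoding/json"
    "fmt"
    "log"
    "sync"
    "time"
)

// Event mirrors the fields Publish fills in. The exact definition is an
// assumption; the payload type in particular could just as well be []byte.
type Event struct {
    Topic     string
    Payload   json.RawMessage
    CreatedAt time.Time
}

type EventBus struct {
    mu       sync.RWMutex
    handlers map[string][]func(Event)
}

// NewEventBus is a hypothetical constructor. It initializes the map so
// that Subscribe never writes to a nil map.
func NewEventBus() *EventBus {
    return &EventBus{handlers: make(map[string][]func(Event))}
}

func (b *EventBus) Subscribe(topic string, fn func(Event)) {
    b.mu.Lock()
    defer b.mu.Unlock()
    b.handlers[topic] = append(b.handlers[topic], fn)
}

func (b *EventBus) Publish(topic string, payload any) {
    data, err := json.Marshal(payload)
    if err != nil {
        log.Printf("event: failed to marshal payload for %s: %v", topic, err)
        return
    }

    evt := Event{Topic: topic, Payload: data, CreatedAt: time.Now()}

    b.mu.RLock()
    defer b.mu.RUnlock()
    for _, fn := range b.handlers[topic] {
        fn(evt) // runs synchronously on the publisher's goroutine
    }
}

func main() {
    bus := NewEventBus()
    bus.Subscribe("ticket.created", func(e Event) {
        fmt.Printf("%s: %s\n", e.Topic, e.Payload)
    })
    bus.Publish("ticket.created", map[string]int{"id": 42})
    // prints: ticket.created: {"id":42}
}
```

Two things follow from this shape. Handlers run one after another on the caller's goroutine, so a slow handler delays whoever published the event. And because Publish holds the read lock while handlers run, a handler must not call Subscribe on the same bus, or it will deadlock waiting for the write lock.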

An example service that listens to events is the search indexer. It subscribes to three topics, so the search index stays up to date without any indexing code in the HTTP handlers.

func (i *SearchIndexer) Subscribe(bus *domain.EventBus) {
    bus.Subscribe(domain.EventTicketCreated, i.onTicketChanged)
    bus.Subscribe(domain.EventTicketUpdated, i.onTicketChanged)
    bus.Subscribe(domain.EventCommentAdded, i.onCommentAdded)
}
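
Since the bus hands every handler the payload as JSON, each handler has to decode it before doing any work. Here is a rough sketch of what a handler like onTicketChanged might look like. Everything beyond the method name is an assumption: the ticketPayload struct and its fields are hypothetical, the Event type is a stand-in for the one in the domain package, and a plain map plays the role of the real search index.

```go
package main

import (
    "encoding/json"
    "fmt"
    "log"
    "sync"
    "time"
)

// Event is a stand-in for the domain package's event type (assumed shape).
type Event struct {
    Topic     string
    Payload   json.RawMessage
    CreatedAt time.Time
}

// ticketPayload is a hypothetical payload shape for ticket events.
type ticketPayload struct {
    ID    int    `json:"id"`
    Title string `json:"title"`
}

// SearchIndexer keeps ticket titles in a map as a placeholder for a
// real search index.
type SearchIndexer struct {
    mu     sync.Mutex
    titles map[int]string
}

func NewSearchIndexer() *SearchIndexer {
    return &SearchIndexer{titles: make(map[int]string)}
}

// onTicketChanged decodes the payload and upserts the ticket, so the
// same handler serves both the created and the updated topic.
func (i *SearchIndexer) onTicketChanged(evt Event) {
    var t ticketPayload
    if err := json.Unmarshal(evt.Payload, &t); err != nil {
        log.Printf("search: failed to decode %s payload: %v", evt.Topic, err)
        return
    }
    i.mu.Lock()
    defer i.mu.Unlock()
    i.titles[t.ID] = t.Title
}

// Title looks up an indexed ticket title by ID.
func (i *SearchIndexer) Title(id int) (string, bool) {
    i.mu.Lock()
    defer i.mu.Unlock()
    title, ok := i.titles[id]
    return title, ok
}

func main() {
    idx := NewSearchIndexer()
    idx.onTicketChanged(Event{
        Topic:     "ticket.created",
        Payload:   json.RawMessage(`{"id":1,"title":"Login fails"}`),
        CreatedAt: time.Now(),
    })
    title, ok := idx.Title(1)
    fmt.Println(title, ok) // prints: Login fails true
}
```

Treating create and update as the same upsert is what lets one handler cover two topics, which matches the subscriptions above.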

I'm handling mentions in a similar way, but that service is even more fun because it also talks to the SSE hub, which pushes notifications to the browser. More on that later!