Skip to content

Instantly share code, notes, and snippets.

@tdadadavid
Last active January 1, 2026 13:13
Show Gist options
  • Select an option

  • Save tdadadavid/97c0b9838276c124a87eeda88c9a844f to your computer and use it in GitHub Desktop.

Select an option

Save tdadadavid/97c0b9838276c124a87eeda88c9a844f to your computer and use it in GitHub Desktop.
POC EventBus
package eventstore
import (
"errors"
"fmt"
"log/slog"
"sync"
"time"
)
var (
ErrNoListenerForTopic = errors.New("no listener has been registered for topic")
ErrEventPublishTimeout = errors.New("event could not be published within time range")
)
type EventsBus interface {
Publish(topic string, data EventData) error
PublishWithoutData(topic string)
Subscribe(topic string, listener []Listener) error
}
type EventBus struct {
evChan chan Event
listeners map[string][]Listener
log *slog.Logger
mu sync.RWMutex
}
func NewEventBus(logger *slog.Logger) (eb *EventBus) {
eb = &EventBus{
evChan: make(chan Event),
listeners: make(map[string][]Listener),
log: logger,
mu: sync.RWMutex{},
}
go eb.dispatch()
return eb
}
func (eb *EventBus) PublishWithoutData(topic string) {
err := eb.Publish(topic, nil)
if err != nil {
eb.log.Error(fmt.Sprintf("error publishing event(%s): %v", topic, err))
return
}
}
func (eb *EventBus) Publish(topic string, data EventData) (err error) {
event := Event{topic: topic, data: data}
timer := time.NewTimer(200 * time.Millisecond)
defer timer.Stop()
select {
case eb.evChan <- event:
return err
case <-timer.C:
return ErrEventPublishTimeout
}
}
func (eb *EventBus) Subscribe(topic string, listeners []Listener) (err error) {
if len(listeners) == 0 {
return ErrNoListenerForTopic
}
eb.mu.Lock()
defer eb.mu.Unlock()
eb.listeners[topic] = append(eb.listeners[topic], listeners...)
return err
}
func (eb *EventBus) dispatch() {
for e := range eb.evChan {
topic := e.Topic()
eb.mu.RLock()
listeners := eb.listeners[topic]
if len(listeners) == 0 {
eb.log.Error("no subscribed listener for event", "topic", e.Topic())
}
eb.mu.RUnlock()
for _, handler := range listeners {
go func(e Event, handler Listener) {
err := handler(e)
if err != nil {
eb.log.Error("error processing event", "topic", e.Topic(), "error", err)
}
}(e, handler)
}
}
}
// event register service
func (app *Application) setupEventListeners() *Application {
app.Bus.Subscribe(admin.NewCustomerComplaint, []eventstore.Listener{
admin.NotifyCustomerService(app.Services.MailService, app.Config, app.Logger),
})
app.Logger.Info("Event listeners setup successfully")
return app
}
//------ customer complaint service ----//
func (cs CustomerService) CreateComplaint() {
...business logic
err = cs.bus.Publish(NewCustomerComplaint, map[string]any{
"email": payload.Email,
"description": "I can't pay $200 for OpenAI 🤦‍♂️"
})
if err != nil {
s.logger.Error("error publishing customer complaint event", "error", err)
return nil, err
}
}
// ----- event handler ----//
func NotifyCustomerService(
mail mailService.Mailer,
config *config.Config,
log *slog.Logger,
) eventstore.Listener {
return func(e eventstore.Event) error {
data := e.Data()
if data == nil {
return fmt.Errorf("invalid event data")
}
if err := mail.Send(context.Background(), mailService.Options{
From: config.MailFromAddress,
To: to,
Subject: fmt.Sprintf("New Customer Complaint Application: %s", jobTitle),
Html: htmlBody.String(),
}); err != nil {
log.Error("failed to send complaint email", "err", err)
return err
}
return nil
}
}
package eventstore
type EventData map[string]any
type Event struct {
topic string
data EventData
}
func (e *Event) Topic() string {
return e.topic
}
func (e *Event) Data() EventData {
return e.data
}
type Listener func(Event) error
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment