Last active
January 1, 2026 13:13
-
-
Save tdadadavid/97c0b9838276c124a87eeda88c9a844f to your computer and use it in GitHub Desktop.
POC EventBus
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package eventstore | |
| import ( | |
| "errors" | |
| "fmt" | |
| "log/slog" | |
| "sync" | |
| "time" | |
| ) | |
// Sentinel errors returned by the event bus.
var (
	// ErrNoListenerForTopic is returned by Subscribe when it is called with
	// an empty listener list.
	ErrNoListenerForTopic = errors.New("no listener has been registered for topic")

	// ErrEventPublishTimeout is returned by Publish when the event has not
	// been handed to the dispatcher before the publish timer fires.
	ErrEventPublishTimeout = errors.New("event could not be published within time range")
)
| type EventsBus interface { | |
| Publish(topic string, data EventData) error | |
| PublishWithoutData(topic string) | |
| Subscribe(topic string, listener []Listener) error | |
| } | |
| type EventBus struct { | |
| evChan chan Event | |
| listeners map[string][]Listener | |
| log *slog.Logger | |
| mu sync.RWMutex | |
| } | |
| func NewEventBus(logger *slog.Logger) (eb *EventBus) { | |
| eb = &EventBus{ | |
| evChan: make(chan Event), | |
| listeners: make(map[string][]Listener), | |
| log: logger, | |
| mu: sync.RWMutex{}, | |
| } | |
| go eb.dispatch() | |
| return eb | |
| } | |
| func (eb *EventBus) PublishWithoutData(topic string) { | |
| err := eb.Publish(topic, nil) | |
| if err != nil { | |
| eb.log.Error(fmt.Sprintf("error publishing event(%s): %v", topic, err)) | |
| return | |
| } | |
| } | |
| func (eb *EventBus) Publish(topic string, data EventData) (err error) { | |
| event := Event{topic: topic, data: data} | |
| timer := time.NewTimer(200 * time.Millisecond) | |
| defer timer.Stop() | |
| select { | |
| case eb.evChan <- event: | |
| return err | |
| case <-timer.C: | |
| return ErrEventPublishTimeout | |
| } | |
| } | |
| func (eb *EventBus) Subscribe(topic string, listeners []Listener) (err error) { | |
| if len(listeners) == 0 { | |
| return ErrNoListenerForTopic | |
| } | |
| eb.mu.Lock() | |
| defer eb.mu.Unlock() | |
| eb.listeners[topic] = append(eb.listeners[topic], listeners...) | |
| return err | |
| } | |
| func (eb *EventBus) dispatch() { | |
| for e := range eb.evChan { | |
| topic := e.Topic() | |
| eb.mu.RLock() | |
| listeners := eb.listeners[topic] | |
| if len(listeners) == 0 { | |
| eb.log.Error("no subscribed listener for event", "topic", e.Topic()) | |
| } | |
| eb.mu.RUnlock() | |
| for _, handler := range listeners { | |
| go func(e Event, handler Listener) { | |
| err := handler(e) | |
| if err != nil { | |
| eb.log.Error("error processing event", "topic", e.Topic(), "error", err) | |
| } | |
| }(e, handler) | |
| } | |
| } | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| // event register service | |
| func (app *Application) setupEventListeners() *Application { | |
| app.Bus.Subscribe(admin.NewCustomerComplaint, []eventstore.Listener{ | |
| admin.NotifyCustomerService(app.Services.MailService, app.Config, app.Logger), | |
| }) | |
| app.Logger.Info("Event listeners setup successfully") | |
| return app | |
| } | |
| //------ customer complaint service ----// | |
| func (cs CustomerService) CreateComplaint() { | |
| ...business logic | |
| err = cs.bus.Publish(NewCustomerComplaint, map[string]any{ | |
| "email": payload.Email, | |
| "description": "I can't pay $200 for OpenAI 🤦♂️" | |
| }) | |
| if err != nil { | |
| s.logger.Error("error publishing customer complaint event", "error", err) | |
| return nil, err | |
| } | |
| } | |
| // ----- event handler ----// | |
| func NotifyCustomerService( | |
| mail mailService.Mailer, | |
| config *config.Config, | |
| log *slog.Logger, | |
| ) eventstore.Listener { | |
| return func(e eventstore.Event) error { | |
| data := e.Data() | |
| if data == nil { | |
| return fmt.Errorf("invalid event data") | |
| } | |
| if err := mail.Send(context.Background(), mailService.Options{ | |
| From: config.MailFromAddress, | |
| To: to, | |
| Subject: fmt.Sprintf("New Customer Complaint Application: %s", jobTitle), | |
| Html: htmlBody.String(), | |
| }); err != nil { | |
| log.Error("failed to send complaint email", "err", err) | |
| return err | |
| } | |
| return nil | |
| } | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package eventstore | |
// EventData is the payload attached to an event: a map of named values.
// It is nil for events published without data.
type EventData map[string]any

// Event is a message travelling through the bus. Its fields are unexported
// and exposed read-only through Topic and Data.
type Event struct {
	topic string
	data  EventData
}

// Topic returns the topic the event was published on.
//
// The accessors use value receivers: Event is a small value that is passed
// around by value (see Listener), so they must be callable on any Event,
// including non-addressable ones, and through a *Event as before.
func (e Event) Topic() string {
	return e.topic
}

// Data returns the event's payload, or nil when the event carries none.
// The returned map is the event's own map, not a copy.
func (e Event) Data() EventData {
	return e.data
}

// Listener handles one event. A non-nil error reports that the event could
// not be processed.
type Listener func(Event) error
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment