- Entire server is wrapped in an Application from the CLI library.
- `main.go/Jetstream` is the main function.
  - Fetches `ws-url` from configuration (this is for the firehose).
  - Sets up `NewServer` (see `Server` in `server.go`).
  - Sets up a single firehose consumer, with a callback reference to the Jetstream server handler `Emit`. Also provides the server with a reference to the consumer (circular reference?).
  - Creates a new scheduler via (bsky/parallel) with a callback for `c.HandleStreamEvent`.
  - Sets up a goroutine to handle the cursor saving (every 5 seconds via ticker).
    - Includes listeners for `shutdownCursorManager`.
  - Sets up a liveness checker, which shuts down the app if the firehose is no longer pushing new events.
    - `livenessKill` is a key channel to control and stop the application.
  - Sets up the metrics (Prometheus) serving endpoint.
  - Sets up the `/subscribe` endpoint -> `s.HandleSubscribe`.
  - Sets up `jetServer` (HTTP server).
  - Sets up the echo server.
  - Sets up the metrics server.
  - Checks if a cursor has been set, or if one is being provided to override. Else it's live.
  - Attempts to dial the firehose websocket once.
  - `eventsKill` <- channel for closing application.
  - Sets up the event loop to handle the stream.
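The cursor-saving goroutine described above can be sketched as a ticker loop that also listens on a shutdown channel. This is a minimal illustration, not the Jetstream implementation: `cursorStore`, `runCursorManager`, and `demoFinalFlush` are hypothetical names, the final save on shutdown is an assumption, and the demo uses a short interval where the notes mention 5 seconds.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// cursorStore is a hypothetical stand-in for wherever the cursor is persisted.
type cursorStore struct {
	saved atomic.Int64
}

// Save records the cursor; a real store would write to disk or a database.
func (s *cursorStore) Save(cursor int64) {
	s.saved.Store(cursor)
}

// runCursorManager saves the current cursor on every tick. When
// shutdownCursorManager is closed it performs one final save (an assumption
// of this sketch) and then closes done.
func runCursorManager(current *atomic.Int64, store *cursorStore, interval time.Duration,
	shutdownCursorManager <-chan struct{}, done chan<- struct{}) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	defer close(done)
	for {
		select {
		case <-ticker.C:
			store.Save(current.Load())
		case <-shutdownCursorManager:
			store.Save(current.Load()) // final flush before exiting
			return
		}
	}
}

// demoFinalFlush starts the manager, advances the cursor, shuts the manager
// down, and returns the value that ended up in the store.
func demoFinalFlush(last int64) int64 {
	var current atomic.Int64
	store := &cursorStore{}
	shutdown := make(chan struct{})
	done := make(chan struct{})

	go runCursorManager(&current, store, 10*time.Millisecond, shutdown, done)

	current.Store(last)
	close(shutdown)
	<-done // wait until the final save has completed
	return store.saved.Load()
}

func main() {
	fmt.Println(demoFinalFlush(42))
}
```

Closing the shutdown channel rather than sending on it means any number of listeners can observe the signal, which fits the "listeners" wording in the notes.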
- `s.HandleSubscribe` (handler for the `/subscribe` endpoint):
  - Takes an `echo.Context` from github.com/labstack/echo/v4.
  - Parses the context request and builds a subscriber from its parameters.
  - Calls `s.AddSubscriber` with the upgraded websocket, IP, and other params.
  - Defers a call to `s.RemoveSubscriber`.
  - Replays traffic via a goroutine if the cursor isn't live.
  - Fork -> handle any read messages.
  - Fork -> handle any write messages, sent from the subscriber.
- `s.AddSubscriber`:
  - Acquires the server lock.
  - Builds out all the wanted collections and DIDs.
  - Creates a `Subscriber` with all the config given.
  - Increments the connected-subscribers Prometheus gauge.
  - Unlocks `lk`.
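The lock, build, register, unlock sequence above might look roughly like the following. It is a simplified sketch under stated assumptions: the `Server` fields, `NewServer`, and `toSet` are hypothetical, the wanted collections are modeled as a plain set rather than a `*WantedCollections`, and an integer stands in for the Prometheus gauge.

```go
package main

import (
	"fmt"
	"sync"
)

// Subscriber is a trimmed-down, hypothetical version of the real struct.
type Subscriber struct {
	id                int64
	realIP            string
	buf               chan *[]byte
	wantedCollections map[string]struct{} // nil means "all collections"
	wantedDids        map[string]struct{} // nil means "all DIDs" (assumption)
}

// Server holds the subscriber registry guarded by lk.
type Server struct {
	lk            sync.RWMutex
	nextSubID     int64
	subscribers   map[int64]*Subscriber
	connectedSubs int // stand-in for the connected-subscribers gauge
}

func NewServer() *Server {
	return &Server{subscribers: make(map[int64]*Subscriber)}
}

// toSet converts a slice into a set; an empty slice yields nil ("no filter").
func toSet(items []string) map[string]struct{} {
	if len(items) == 0 {
		return nil
	}
	set := make(map[string]struct{}, len(items))
	for _, item := range items {
		set[item] = struct{}{}
	}
	return set
}

// AddSubscriber registers a subscriber while holding the write lock.
func (s *Server) AddSubscriber(realIP string, collections, dids []string) *Subscriber {
	s.lk.Lock()
	defer s.lk.Unlock()

	s.nextSubID++
	sub := &Subscriber{
		id:                s.nextSubID,
		realIP:            realIP,
		buf:               make(chan *[]byte, 16),
		wantedCollections: toSet(collections),
		wantedDids:        toSet(dids),
	}
	s.subscribers[sub.id] = sub
	s.connectedSubs++
	return sub
}

// RemoveSubscriber is the counterpart a handler would defer.
func (s *Server) RemoveSubscriber(id int64) {
	s.lk.Lock()
	defer s.lk.Unlock()
	if _, ok := s.subscribers[id]; ok {
		delete(s.subscribers, id)
		s.connectedSubs--
	}
}

func main() {
	s := NewServer()
	sub := s.AddSubscriber("203.0.113.7", []string{"app.bsky.feed.post"}, nil)
	defer s.RemoveSubscriber(sub.id)
	fmt.Println(sub.id, s.connectedSubs)
}
```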
- `Emit` (called by the consumer):
  - Gets the server read lock.
  - Increments perf counters (`eventsEmitted`).
  - Emits to each subscriber concurrently, dropping slow subscribers.
    - Calls `emitToSubscriber`.
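One common way to fan out under a read lock while dropping slow subscribers is a non-blocking channel send. The sketch below assumes that approach; the notes do not say how Jetstream detects a slow subscriber, so treating a full buffer as "slow" is an assumption, as are the `server` and `subscriber` types and the `newDemoServer` helper.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

type subscriber struct {
	id  int64
	buf chan *[]byte
}

type server struct {
	lk            sync.RWMutex
	subscribers   map[int64]*subscriber
	eventsEmitted atomic.Int64 // stand-in for a perf counter
}

// emitToSubscriber attempts a non-blocking send and reports whether it worked.
func emitToSubscriber(sub *subscriber, msg *[]byte) bool {
	select {
	case sub.buf <- msg:
		return true
	default: // buffer full or nobody reading: treat the subscriber as slow
		return false
	}
}

// Emit delivers msg to every subscriber concurrently under the read lock,
// then removes the slow ones under the write lock. It returns the dropped IDs.
func (s *server) Emit(msg []byte) []int64 {
	s.lk.RLock()
	s.eventsEmitted.Add(1)

	var (
		wg   sync.WaitGroup
		mu   sync.Mutex
		slow []int64
	)
	for _, sub := range s.subscribers {
		wg.Add(1)
		go func(sub *subscriber) {
			defer wg.Done()
			if !emitToSubscriber(sub, &msg) {
				mu.Lock()
				slow = append(slow, sub.id)
				mu.Unlock()
			}
		}(sub)
	}
	wg.Wait()
	s.lk.RUnlock()

	if len(slow) > 0 {
		s.lk.Lock()
		for _, id := range slow {
			delete(s.subscribers, id)
		}
		s.lk.Unlock()
	}
	return slow
}

// newDemoServer builds a server with one buffered (fast) subscriber and one
// unbuffered subscriber that nobody reads from (slow).
func newDemoServer() *server {
	return &server{subscribers: map[int64]*subscriber{
		1: {id: 1, buf: make(chan *[]byte, 2)},
		2: {id: 2, buf: make(chan *[]byte)},
	}}
}

func main() {
	s := newDemoServer()
	dropped := s.Emit([]byte("hello"))
	fmt.Println("dropped:", dropped, "remaining:", len(s.subscribers))
}
```

Removal happens only after the read lock is released, because upgrading from a read lock to a write lock in place would deadlock with `sync.RWMutex`.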
```go
type Subscriber struct {
	// Upgraded WS connection
	ws     *websocket.Conn
	realIP string
	seq    int64
	// Channel for
	buf              chan *[]byte
	id               int64
	cLk              sync.Mutex
	cursor           *int64
	compress         bool
	deliveredCounter prometheus.Counter
	bytesCounter     prometheus.Counter
	// wantedCollections is nil if the subscriber wants all collections
	wantedCollections *WantedCollections
	wantedDids        map[string]struct{}
	rl                *rate.Limiter
}
```
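To illustrate how the `wantedCollections` and `wantedDids` fields could gate delivery, here is a small hypothetical filter. The `filter` type and `wants` method are not from the source, the collection filter is modeled as a plain set instead of `*WantedCollections`, and treating an empty DID set as "all DIDs" is an assumption.

```go
package main

import "fmt"

// filter is a hypothetical, simplified view of a subscriber's wanted sets.
type filter struct {
	wantedCollections map[string]struct{} // nil: the subscriber wants all collections
	wantedDids        map[string]struct{} // empty: all DIDs (assumption of this sketch)
}

// wants reports whether an event for the given collection and DID passes.
func (f *filter) wants(collection, did string) bool {
	if f.wantedCollections != nil {
		if _, ok := f.wantedCollections[collection]; !ok {
			return false
		}
	}
	if len(f.wantedDids) > 0 {
		if _, ok := f.wantedDids[did]; !ok {
			return false
		}
	}
	return true
}

func main() {
	postsOnly := &filter{
		wantedCollections: map[string]struct{}{"app.bsky.feed.post": {}},
	}
	fmt.Println(postsOnly.wants("app.bsky.feed.post", "did:plc:example"))
	fmt.Println(postsOnly.wants("app.bsky.feed.like", "did:plc:example"))
}
```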