Skip to content

Instantly share code, notes, and snippets.

@moll-dev
Created September 24, 2024 20:43
Show Gist options
  • Select an option

  • Save moll-dev/2b767da57758957f7119dc45abdb78fb to your computer and use it in GitHub Desktop.

Select an option

Save moll-dev/2b767da57758957f7119dc45abdb78fb to your computer and use it in GitHub Desktop.
Jetstream Notes

jetstream/main:Jetstream

  1. Entire server is wrapped in an Application from the CLI library.
  2. main.go/Jetstream is the main function.
  3. Fetches ws-url from configuration (this is for the firehose)
  4. Sets up NewServer (see Server in server.go)
  5. Sets up a single firehose consumer, with a callback reference to the Jetstream server handler Emit. Also provides the server with a reference to the consumer (circular reference?)
  6. Creates a new schedule via (bsky/parallel) with a call back for c.HandleStreamEvent
  7. Sets up a goroutine to handle the cursor saving (ever 5 seconds via ticker).
    • Includes listeners for shutdownCursorManager
  8. Sets up a liveness checker, shuts down the app if the firehose is no longer pushing new events.
    • livenessKill is a key channel to control and stop the application.
  9. Sets up metrics (promethus) serving endpoint.
  10. Sets up /subscribe endpoint -> s.HandleSubscribe
  11. Sets up jetServer (HTTP server)
  12. Sets up echo server.
  13. Sets up metrics server.
  14. Checks if a cursor has been set, or if one is being provided to override. Else it's live.
  15. Attempts to dial the firehose websocket once
  16. eventsKill <- channel for closing application.
  17. Sets up event loop to handle stream.

jetstream/server:HandleSubscribe

  1. Takes an echo.Context from github.com/labstack/echo/v4
  2. Parses context request, builds a subscriber via parameters.
  3. Calls s.AddSubscriber with the upgraded websocket, ip, and other params.
  4. Defer call to s.RemoveSubscriber
  5. Replays traffic via goroutine if cursor isn't live.
  6. Fork -> handle any read messages
  7. Fork -> handle any write messages, sent from the subscriber

jetstream/server:AddSubscriber

  1. Acquire the server lock
  2. Build out all the wanted collections and DIDs
  3. Create a Subscriber with all the config given.
  4. Inc connected subscribers prom gauge.
  5. Unlock lk

jetstream/server:Emit

*Called by consumer

  1. Gets server read lock
  2. Increment perf counters (eventsEmitted)
  3. Emit to each subscriber concurrently, dropping slow subscribers.
    • Calls emitToSubscriber

server.go:Subscriber

type Subscriber struct {
	// Upgraded WS connection
	ws               *websocket.Conn
	realIP           string
	seq              int64
	// Channel for 
	buf              chan *[]byte
	id               int64
	cLk              sync.Mutex
	cursor           *int64
	compress         bool
	deliveredCounter prometheus.Counter
	bytesCounter     prometheus.Counter
	// wantedCollections is nil if the subscriber wants all collections
	wantedCollections *WantedCollections
	wantedDids        map[string]struct{}
	rl                *rate.Limiter
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment