When a leader session exits while a follower has pending events, the follower's attempt to become the new leader stalls indefinitely. The cause is that the Effect Worker mechanism fails to detect worker termination and propagate it to pending streams.
Impact: Pending events remain stuck and `GetLeaderSyncState` times out after 1s.
Root Cause: Effect Worker streams don't error out when the underlying worker terminates, so the SharedWorker blocks new requests while the old ones hang.
When the leader session exits while a follower has pending events, the follower's attempt to become the new leader stalls. The `GetLeaderSyncState` request times out, leaving the follower unable to sync its pending events.
```sh
CI=1 LIVESTORE_SYNC_LEADER_EXIT_REPRO=1 direnv exec . bunx vitest run \
  tests/integration/src/tests/adapter-web/adapter-web.test.ts \
  --testNamePattern "leader exit"
```

Expected:
- Follower acquires the lock
- Follower updates its message port to become the new leader
- Follower syncs its pending events through the new leader
- `GetLeaderSyncState` returns successfully
- Pending count drops to 0
Actual:
- Follower acquires the lock ✓
- Follower tries to update message port ✓
- STALL: `GetLeaderSyncState` request never returns
- Timeout after 1s: `TimeoutException: Operation timed out after '1s'`
- Pending count stays at 5
Relevant architecture:

- SharedWorker (`make-shared-worker.ts`):
  - Singleton worker shared across all tabs
  - Maintains `leaderWorkerContextSubRef` - a subscription ref holding the current leader worker
  - Routes requests to the current leader via `forwardRequest` / `forwardRequestStream`
  - Has `resetCurrentWorkerCtx` to tear down the previous leader when a new one takes over
- Client Session (`persisted-adapter.ts`):
  - Each tab has a client session
  - Uses `WebLock` for leader election (first tab gets the lock = leader)
  - When becoming leader, calls `UpdateMessagePort` to register with the SharedWorker
  - Uses the `waitForSharedWorkerInitialized` deferred to gate requests
- Leader Worker (`make-leader-worker.ts`):
  - Dedicated worker per tab that only runs while the tab is the leader
  - Handles sync operations, storage, etc.
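To make the routing concrete, here is a rough sketch of that state. It is a simplification, not the actual implementation: `LeaderWorkerContext` and `makeRouter` are made-up names, the types are stubbed, and the real code builds @effect/platform worker pools inside a scope per leader.

```ts
import { Effect, Stream, SubscriptionRef } from 'effect'

// Simplified stand-in for the pool connected to the current leader's dedicated worker.
interface LeaderWorkerContext {
  readonly worker: { readonly execute: (req: unknown) => Stream.Stream<unknown> }
  readonly shutdown: Effect.Effect<void>
}

const makeRouter = Effect.gen(function* () {
  // Holds the current leader's worker pool (undefined between leaders).
  const leaderWorkerContextSubRef = yield* SubscriptionRef.make<LeaderWorkerContext | undefined>(undefined)

  // Forward a streaming request to whichever leader is current.
  // (The real code waits until a leader context is present; simplified here.)
  const forwardRequestStream = (req: unknown) =>
    Stream.unwrap(
      Effect.gen(function* () {
        const ctx = yield* SubscriptionRef.get(leaderWorkerContextSubRef)
        if (ctx === undefined) return Stream.empty
        return ctx.worker.execute(req)
      }),
    )

  // Tear down the previous leader's context and install the new one
  // (what UpdateMessagePort triggers via resetCurrentWorkerCtx).
  const resetCurrentWorkerCtx = (next: LeaderWorkerContext | undefined) =>
    Effect.gen(function* () {
      const prev = yield* SubscriptionRef.get(leaderWorkerContextSubRef)
      if (prev !== undefined) yield* prev.shutdown
      yield* SubscriptionRef.set(leaderWorkerContextSubRef, next)
    })

  return { forwardRequestStream, resetCurrentWorkerCtx }
})
```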
Leader handoff flow:

- Leader (tab A) holds the lock
- Follower (tab B) waits on `WebLock.waitForDeferredLock`
- Leader exits (shutdown or tab close)
- Lock is released
- Follower acquires the lock → enters `runLocked`
- Follower creates a new dedicated worker + MessageChannel
- Follower calls `UpdateMessagePort` on the SharedWorker
- The SharedWorker's `UpdateMessagePort` handler:
  a. Calls `resetCurrentWorkerCtx` to close the previous worker scope
  b. Creates a new worker pool from the new port
  c. Sets `leaderWorkerContextSubRef` to the new worker
- Follower resolves `waitForSharedWorkerInitialized`
- Follower's pending requests can now flow through the SharedWorker
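Spelled out with raw browser APIs, the takeover sequence looks roughly like this. It is only an illustration of the steps above: the real adapter goes through `WebLock`, Effect scopes, and typed `WorkerSchema` messages, and the lock name, message shapes, and worker file below are invented.

```ts
// Sketch of the takeover sequence (illustrative message shapes and file names).
const takeOverAsLeader = async (sharedWorker: SharedWorker) => {
  await navigator.locks.request('livestore-leader-lock', async () => {
    // Follower acquired the lock: spawn a fresh dedicated leader worker for this tab
    const leaderWorker = new Worker(new URL('./leader-worker.js', import.meta.url), { type: 'module' })

    // Connect the leader worker and the SharedWorker via a MessageChannel
    const channel = new MessageChannel()
    leaderWorker.postMessage({ _tag: 'ConnectLeaderPort' }, [channel.port1])

    // Ask the SharedWorker to route all further requests through the new port
    // (this is the UpdateMessagePort step that stalls in the bug)
    sharedWorker.port.postMessage({ _tag: 'UpdateMessagePort' }, [channel.port2])

    // Hold the lock for as long as this tab remains the leader
    await new Promise<never>(() => {})
  })
}
```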
Observed in the SharedWorker logs:

- 4 x `forwardRequestStream:start` → streams initiated
- 3 x `forwardRequestStream:shutdown` → only 3 completed
- 1 x `updateMessagePort:start` (Tab A's only) → Tab B's never reaches the handler

Tab B logs `TMP shared worker update port start` from the window (client side), then nothing - the message never reaches the SharedWorker handler.
```
Tab B Client                    SharedWorker                          Tab A Worker
    |                                 |                                 |
    |--- PullStream ----------------->|                                 |
    |                                 |--- worker.execute(PullStream) ->|
    |                                 |      (waiting for events...)    |
    |                                 |                                 X (terminated)
    |                                 |     (stream stuck, no error)    |
    |--- UpdateMessagePort ---------->|                                 |
    |      (blocked? queued?)         |                                 |
```
The issue is NOT in `resetCurrentWorkerCtx` or `Scope.close`. It occurs earlier:

- Tab B's `Worker.makePoolSerialized` creates a pool connected to the SharedWorker
- Tab B's `PullStream` request is issued via `runInWorkerStream` (creating an active stream fiber in the client)
- When Tab A (the leader) exits, the SharedWorker's worker pool to Tab A's dedicated worker terminates
- The SharedWorker's `forwardRequestStream` for Tab B's `PullStream` is stuck:
  - It called `worker.execute(req)`, where `worker` is the pool to Tab A's dedicated worker
  - That worker is now terminated, but the Effect Worker mechanism doesn't propagate the termination as an error
  - The stream just hangs indefinitely
- When Tab B acquires the lock and tries to call `UpdateMessagePort`:
  - Tab B's client-side worker pool has in-flight requests (the stuck `PullStream`)
  - The Effect Worker serialization blocks new requests while old ones are pending

The core issue: the Effect Worker mechanism doesn't detect worker termination and propagate it to pending streams/requests.
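The blocking described above can be pictured with a toy model: treat the pool's willingness to accept a request as a single permit (an assumption for illustration, not a description of the pool's internals). A hung request then starves the critical one, which matches the observed failure shape.

```ts
import { Effect } from 'effect'

// Toy model: one permit stands in for the pool's capacity to accept a request.
const program = Effect.gen(function* () {
  const poolCapacity = yield* Effect.makeSemaphore(1)

  // The stuck PullStream: holds its permit and never finishes,
  // because the worker on the other end was terminated silently.
  const stuckPullStream = poolCapacity.withPermits(1)(Effect.never)

  // The critical UpdateMessagePort request queues behind it forever.
  const updateMessagePort = poolCapacity.withPermits(1)(
    Effect.log('UpdateMessagePort reached the SharedWorker handler'),
  )

  yield* Effect.fork(stuckPullStream)
  yield* Effect.sleep('10 millis') // let the stuck request grab the permit first
  // Without the 1s bound this would hang forever - mirroring the observed stall.
  yield* updateMessagePort.pipe(Effect.timeout('1 second'))
})

// Running this logs nothing from updateMessagePort and fails with a TimeoutException.
Effect.runPromise(program).catch(console.error)
```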
The test (`client session sync pending sticks after leader exit (explicit)`) does:
- Opens two tabs (page1/A, page2/B) with boot delay of 80ms for B
- Tab A becomes leader first, Tab B waits for lock
- Both tabs create 5 todos (commits) each
- Tab B (follower) syncs 5 pending events through Tab A's leader
- Test sends shutdown message to leader (Tab A) only
- Tab B acquires lock and tries to become new leader
- STALL: Tab B's `UpdateMessagePort` never completes
- Tab B's `GetLeaderSyncState` request times out after 1s
- Tab B's pending events (5) remain stuck

Key parameters:

- `baseQuery`: `barrier=1&commitCount=5&timeoutMs=8000&disableFastPath=1&manualShutdown=1`
- Tab A: `sessionId=a&clientId=A&bootDelayMs=0`
- Tab B: `sessionId=b&clientId=B&bootDelayMs=80`
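For concreteness, this is how the key parameters above combine into the two tab URLs; the `tabUrl` helper is hypothetical and not part of the test harness.

```ts
// Hypothetical helper combining the test's key parameters into per-tab URLs.
const baseQuery = 'barrier=1&commitCount=5&timeoutMs=8000&disableFastPath=1&manualShutdown=1'

const tabUrl = (tab: { sessionId: string; clientId: string; bootDelayMs: number }) => {
  const params = new URLSearchParams(baseQuery)
  params.set('sessionId', tab.sessionId)
  params.set('clientId', tab.clientId)
  params.set('bootDelayMs', String(tab.bootDelayMs))
  return `/?${params.toString()}`
}

const tabA = tabUrl({ sessionId: 'a', clientId: 'A', bootDelayMs: 0 }) // Tab A: leader
const tabB = tabUrl({ sessionId: 'b', clientId: 'B', bootDelayMs: 80 }) // Tab B: follower, boots 80ms later
```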
Possible fixes:

Option 1: Fix worker termination detection in @effect/platform

The root issue is that @effect/platform's Worker module doesn't detect when the underlying worker terminates. When Tab A's dedicated worker terminates, pending requests/streams to that worker should error out. This would require changes to @effect/platform or a wrapper layer.
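One possible shape for such a wrapper layer is sketched below. None of this is existing @effect/platform API: `wrapPool`, `WorkerTerminated`, and the way termination gets signalled are assumptions; the point is only that a failed `Deferred` turns the silent hang into a visible error.

```ts
import { Deferred, Effect, Stream } from 'effect'

// Illustrative error type for a terminated worker.
class WorkerTerminated extends Error {
  readonly _tag = 'WorkerTerminated'
}

// Wrap a pool so every forwarded stream errors out once termination is signalled.
const wrapPool = <Req, A>(pool: { execute: (req: Req) => Stream.Stream<A> }) =>
  Effect.gen(function* () {
    const terminated = yield* Deferred.make<never, WorkerTerminated>()

    // Forwarded streams now fail with WorkerTerminated instead of stalling.
    const execute = (req: Req): Stream.Stream<A, WorkerTerminated> =>
      pool.execute(req).pipe(Stream.interruptWhen(Deferred.await(terminated)))

    // Call this from whatever detects the leader exit (e.g. the lock holder
    // disappearing) to release every pending request against the old pool.
    const markTerminated = Deferred.fail(terminated, new WorkerTerminated())

    return { execute, markTerminated } as const
  })
```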
Option 2: Add a timeout/heartbeat to forwarded worker streams

Add a timeout or interrupt mechanism to detect when the underlying worker stops responding:

```ts
const forwardRequestStream = <TReq>(req: TReq) =>
  Effect.gen(function* () {
    // ...existing code...
    const stream = worker.execute(req)
    // Add a heartbeat/timeout mechanism
    return Stream.merge(stream, scopeShutdownStream, { haltStrategy: 'either' }).pipe(
      Stream.timeoutFail({
        duration: Duration.seconds(5),
        onTimeout: () => new UnknownError({ cause: 'Worker stream timed out' }),
      }),
    )
  })
```

However, this doesn't solve the root cause and would slow down legitimate operations.
Option 3: Interrupt pending streams before UpdateMessagePort

Before calling `UpdateMessagePort`, cancel any pending streams:

```ts
// In persisted-adapter.ts, before calling UpdateMessagePort
yield* interruptPendingStreams()
yield* sharedWorker.executeEffect(new WorkerSchema.SharedWorkerUpdateMessagePort(...))
```

This requires tracking active stream fibers and interrupting them.
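A minimal sketch of that tracking, assuming a hypothetical `makePendingStreamTracker` helper (none of these names exist in LiveStore; effect's `FiberSet` offers similar bookkeeping and additionally removes fibers once they complete):

```ts
import { Effect, Fiber, Ref, Stream } from 'effect'

const makePendingStreamTracker = Effect.gen(function* () {
  const pending = yield* Ref.make(new Set<Fiber.RuntimeFiber<void, unknown>>())

  // Fork a worker stream into a tracked fiber and remember it.
  // (A production version would also drop fibers from the set when they finish.)
  const runTracked = <A, E>(stream: Stream.Stream<A, E>) =>
    Effect.gen(function* () {
      const fiber = yield* Effect.fork(Stream.runDrain(stream))
      yield* Ref.update(pending, (set) => new Set(set).add(fiber))
      return fiber
    })

  // Interrupt everything still in flight - call this right before sending
  // UpdateMessagePort so the stuck PullStream cannot block it.
  const interruptPendingStreams = Effect.gen(function* () {
    const fibers = yield* Ref.getAndSet(pending, new Set())
    yield* Fiber.interruptAll(fibers)
  })

  return { runTracked, interruptPendingStreams }
})
```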
Option 4: Use a dedicated control channel for UpdateMessagePort

The `UpdateMessagePort` call is critical and should not be blocked by other operations. Use a separate worker pool or messaging channel for it:

```ts
// Create a dedicated channel for control messages
const controlWorker = yield* Worker.makeSerialized<ControlMessages>({ ... })
yield* controlWorker.executeEffect(new UpdateMessagePort(...))
```

Option 5: Terminate forwarded streams on leaderWorkerContextSubRef changes

Instead of waiting on a scope finalizer (which can deadlock), use the `leaderWorkerContextSubRef` changes:
```ts
const forwardRequestStream = <TReq>(req: TReq) =>
  Effect.gen(function* () {
    const context = yield* SubscriptionRef.waitUntil(leaderWorkerContextSubRef, isNotUndefined)
    const { worker, scope } = context
    // Use the subscription to detect when the context changes (new leader takes over)
    const contextChangeStream = leaderWorkerContextSubRef.changes.pipe(
      Stream.filter((ctx) => ctx !== context), // Context changed
      Stream.take(1),
      Stream.drain,
    )
    return Stream.merge(worker.execute(req), contextChangeStream, { haltStrategy: 'either' })
  })
```

Option 5 is the cleanest because:
- It uses existing state (`leaderWorkerContextSubRef`) to detect leader changes
- No circular dependency (doesn't wait for scope close)
- Streams terminate immediately when a new leader takes over
- No timeout heuristics needed
Relevant files:

- `packages/@livestore/adapter-web/src/web-worker/shared-worker/make-shared-worker.ts`
- `packages/@livestore/adapter-web/src/web-worker/leader-worker/make-leader-worker.ts`
- `packages/@livestore/adapter-web/src/web-worker/client-session/persisted-adapter.ts`
- `packages/@livestore/common/src/sync/ClientSessionSyncProcessor.ts`
- `packages/@livestore/common/src/leader-thread/LeaderSyncProcessor.ts`
Next steps:

- Implement Option 5 in `make-shared-worker.ts`
- Verify the fix works with the existing repro test
- Ensure no regressions in other adapter-web tests