Skip to content

Instantly share code, notes, and snippets.

@Santhin
Created July 14, 2025 07:04
Show Gist options
  • Select an option

  • Save Santhin/44b1b048fe6312d1060922b3941f225c to your computer and use it in GitHub Desktop.

Select an option

Save Santhin/44b1b048fe6312d1060922b3941f225c to your computer and use it in GitHub Desktop.
async def fetch_url_with_pubsub(
    self, url: str, cache_ttl: int = 300, wait_timeout: float = 30.0
) -> Dict:
    """Fetch ``url`` through the Redis cache, coalescing concurrent misses.

    On a cache miss, callers race for a short-lived ``publisher:{url}`` key
    (``SET ... NX EX 60``). The winner fetches the URL, stores the JSON
    result under the cache key and publishes it on the URL's channel. Every
    other caller subscribes to that channel and waits for the published
    result instead of fetching the URL itself.

    Args:
        url: The URL to fetch; also used to derive the cache, channel and
            publisher keys.
        cache_ttl: Expiry, in seconds, passed to ``SETEX`` for the cached
            result.
        wait_timeout: Maximum number of seconds a non-publisher waits for
            the published result before fetching the URL directly.

    Returns:
        The decoded result, taken from the cache, from the publisher's
        message, or from ``self.fetch_url_basic(url)``.

    Raises:
        Whatever ``self.fetch_url_basic`` or the Redis client raise; the
        publisher key is released and the subscription is closed first.
    """
    cache_key = f"{CACHE_KEY_PREFIX}{url}"
    channel_key = f"{PUBSUB_CHANNEL_PREFIX}{url}"
    publisher_key = f"publisher:{url}"

    # Check cache first
    cached_result = await self.redis_client.get(cache_key)
    if cached_result:
        self.cache_stats.increment_hits()
        logger.info(f"Cache HIT for {url}")
        return json.loads(cached_result)

    self.cache_stats.increment_misses()
    logger.info(f"Cache MISS for {url}")

    # Try to become the publisher: NX makes exactly one caller win, and the
    # 60 s expiry frees the key if that caller dies before releasing it.
    is_publisher = await self.redis_client.set(
        publisher_key, "active", ex=60, nx=True
    )
    if is_publisher:
        logger.info(f"Became PUBLISHER for {url}")
        try:
            # Double-check cache: a previous publisher may have filled it
            # between our miss and our acquiring the key.
            cached_result = await self.redis_client.get(cache_key)
            if cached_result:
                self.cache_stats.increment_hits()
                return json.loads(cached_result)

            result = await self.fetch_url_basic(url)
            payload = json.dumps(result)  # serialise once for both writes
            await self.redis_client.setex(cache_key, cache_ttl, payload)
            await self.redis_client.publish(channel_key, payload)
            return result
        finally:
            # NOTE(review): if the fetch outlives the 60 s expiry this may
            # delete a key that another caller has since acquired.
            await self.redis_client.delete(publisher_key)

    logger.info(f"Waiting for PUBLISHER result for {url}")
    pubsub = self.redis_client.pubsub()
    await pubsub.subscribe(channel_key)
    try:
        # The publisher may have stored and published its result between our
        # cache miss and the subscription above; that message would never be
        # delivered to us, so look at the cache once more before blocking.
        cached_result = await self.redis_client.get(cache_key)
        if cached_result:
            logger.info(f"Received published result for {url}")
            return json.loads(cached_result)

        no_message = object()  # distinguishes "stream ended" from any payload

        async def _next_published():
            # Skip subscribe confirmations; return the first real message.
            async for message in pubsub.listen():
                if message["type"] == "message":
                    return json.loads(message["data"])
            return no_message

        # listen() blocks until a message arrives, so the wait has to be
        # bounded from the outside; otherwise a failed publisher would leave
        # this caller waiting forever.
        result = await asyncio.wait_for(_next_published(), timeout=wait_timeout)
        if result is no_message:
            logger.warning(f"Subscription ended without a result for {url}")
            return await self.fetch_url_basic(url)
        logger.info(f"Received published result for {url}")
        return result
    except asyncio.TimeoutError:
        logger.warning(f"Timeout waiting for published result for {url}")
        return await self.fetch_url_basic(url)
    finally:
        await pubsub.unsubscribe(channel_key)
        await pubsub.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment