Created
July 14, 2025 07:04
-
-
Save Santhin/44b1b048fe6312d1060922b3941f225c to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
async def fetch_url_with_pubsub(
    self, url: str, cache_ttl: int = 300, wait_timeout: float = 30.0
) -> Dict:
    """Fetch a URL, using a Redis lock plus pub/sub to limit duplicate fetches.

    The first caller to set the ``publisher:{url}`` key (``SET NX``) fetches
    the URL, caches the JSON-encoded result and publishes it on the URL's
    channel. Callers that fail to set the key subscribe to that channel and
    wait for the published result instead of fetching themselves.

    Args:
        url: The URL to fetch; also used to build the cache, lock and
            channel keys.
        cache_ttl: Expiry, in seconds, passed to ``setex`` for the cached
            result.
        wait_timeout: Maximum number of seconds a waiting caller blocks for
            the published result before fetching the URL directly.

    Returns:
        The result of ``fetch_url_basic`` for a fresh fetch, or the
        JSON-decoded value read from the cache or received over pub/sub.
    """
    cache_key = f"{CACHE_KEY_PREFIX}{url}"
    channel_key = f"{PUBSUB_CHANNEL_PREFIX}{url}"
    publisher_key = f"publisher:{url}"

    # Check cache first
    cached_result = await self.redis_client.get(cache_key)
    if cached_result:
        self.cache_stats.increment_hits()
        logger.info(f"Cache HIT for {url}")
        return json.loads(cached_result)

    self.cache_stats.increment_misses()
    logger.info(f"Cache MISS for {url}")

    # Try to become the publisher: only one caller can set the key with NX.
    is_publisher = await self.redis_client.set(
        publisher_key, "active", ex=60, nx=True
    )

    if is_publisher:
        logger.info(f"Became PUBLISHER for {url}")
        try:
            # Double-check cache: another publisher may have filled it
            # between our first lookup and acquiring the key.
            cached_result = await self.redis_client.get(cache_key)
            if cached_result:
                self.cache_stats.increment_hits()
                return json.loads(cached_result)

            # Fetch from source
            result = await self.fetch_url_basic(url)
            payload = json.dumps(result)

            # Cache before publishing so that a waiter which subscribes too
            # late to receive the message can still find the value in cache.
            await self.redis_client.setex(cache_key, cache_ttl, payload)
            await self.redis_client.publish(channel_key, payload)
            return result
        finally:
            # NOTE(review): this delete is unconditional; if the fetch
            # outlives the 60 s expiry it may remove a key that a newer
            # publisher has since set.
            await self.redis_client.delete(publisher_key)

    logger.info(f"Waiting for PUBLISHER result for {url}")
    pubsub = self.redis_client.pubsub()

    async def _next_payload():
        # Skip subscribe confirmations and other non-data frames; return
        # the raw data of the first real message, or None if the listener
        # ends without delivering one.
        async for message in pubsub.listen():
            if message["type"] == "message":
                return message["data"]
        return None

    payload = None
    try:
        await pubsub.subscribe(channel_key)

        # The publisher may have cached and published before we subscribed,
        # in which case no message will ever arrive. Re-check the cache now
        # that the subscription is in place.
        cached_result = await self.redis_client.get(cache_key)
        if cached_result:
            return json.loads(cached_result)

        # listen() has no timeout of its own, so bound the wait explicitly.
        payload = await asyncio.wait_for(_next_payload(), timeout=wait_timeout)
    except asyncio.TimeoutError:
        logger.warning(f"Timeout waiting for published result for {url}")
    finally:
        # Always release the pubsub connection, even if unsubscribe fails.
        try:
            await pubsub.unsubscribe(channel_key)
        finally:
            await pubsub.close()

    if payload is None:
        # Timed out, or the listener ended without a message: fetch directly
        # rather than returning nothing.
        return await self.fetch_url_basic(url)

    logger.info(f"Received published result for {url}")
    return json.loads(payload)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment