Created
January 3, 2026 14:13
-
-
Save aliakseis/6e1c1a979990181ba6ff684bb450e1e0 to your computer and use it in GitHub Desktop.
Async download demo
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import asyncio | |
| import yt_dlp | |
| import http.client | |
| import pprint | |
# Keep a reference to the unpatched method so the tracing wrapper can delegate to it.
_original_request = http.client.HTTPSConnection.request


def patched_request(self, method, url, body=None, headers=None, *, encode_chunked=False):
    """Print the details of an outgoing HTTPS request, then send it.

    Replacement for ``http.client.HTTPSConnection.request``: the method, URL,
    headers and (when truthy) the body are printed to stdout, after which the
    call is delegated to the saved original implementation.

    Args:
        self: The connection object the request is issued on.
        method: HTTP method name.
        url: Request target.
        body: Optional request body; printed only when truthy.
        headers: Optional mapping of request headers. ``None`` (the default)
            means no extra headers; a fresh empty dict is used for that call.
        encode_chunked: Forwarded unchanged to the original method.

    Returns:
        Whatever the original ``request`` method returns.
    """
    # A literal ``{}`` default would be one dict object shared by every call;
    # build a new one per call so callers can never observe shared state.
    if headers is None:
        headers = {}

    print("------ HTTPS Request Tracing ------")
    print("Method:", method)
    print("URL:", url)
    print("Headers:")
    pprint.pprint(headers)
    if body:
        print("Body:", body)
    print("-----------------------------------")

    # Hand the request over to the original implementation.
    return _original_request(self, method, url, body, headers, encode_chunked=encode_chunked)


# Install the patch so every HTTPSConnection uses the tracing wrapper.
http.client.HTTPSConnection.request = patched_request
| import requests | |
| from requests.adapters import HTTPAdapter | |
class LoggingAdapter(HTTPAdapter):
    """HTTPAdapter subclass that prints every request before sending it."""

    def send(self, request, **kwargs):
        """Print the URL, method and headers of ``request``, then send it.

        All keyword arguments are forwarded unchanged to the parent class's
        ``send``, and its result is returned.
        """
        print("----- Intercepted HTTPS Request -----")
        print(f"URL: {request.url}")
        print(f"Method: {request.method}")
        print("Headers:")
        for header_name, header_value in request.headers.items():
            print(f" {header_name}: {header_value}")
        print("--------------------------------------")
        response = super().send(request, **kwargs)
        return response


# Build a session and register the logging adapter for the "https://" prefix.
session = requests.Session()
session.mount("https://", LoggingAdapter())
| import curl_cffi | |
| import sys | |
# Create a curl handle through curl_cffi. The handle is kept in the
# module-level name ``curl``.
curl = curl_cffi.Curl()
# Enable verbose mode so that libcurl outputs debugging information.
curl.setopt(curl_cffi.CurlOpt.VERBOSE, 1)
# Optionally set a debug callback as well. It is left disabled here: no
# ``debug_callback`` is visible in this part of the file, and the installed
# curl_cffi version must support the option.
# curl.setopt(curl_cffi.CurlOpt.DEBUGFUNCTION, debug_callback)
| import pprint | |
# Keep a reference to the unpatched method so the wrapper can delegate to it.
_original_urlopen = yt_dlp.YoutubeDL.urlopen


def _patched_urlopen(self, req):
    """Print what is known about ``req``, then open it via the original ``urlopen``.

    Args:
        self: The ``YoutubeDL`` instance the call was made on.
        req: The request to open. A plain URL string is reported by its URL;
            an object with a ``__dict__`` is dumped with ``vars``; anything
            else is reported through its ``url``, ``method``, ``headers`` and
            ``data`` attributes (``None`` when absent).

    Returns:
        Whatever the original ``urlopen`` returns.
    """
    print("=== Intercepted Request Details ===")
    # Introspection is best-effort: a failure while printing must never stop
    # the request itself from being issued.
    try:
        if isinstance(req, str):
            # A bare URL string has neither ``__dict__`` nor ``url``/``headers``
            # attributes; without this branch every field would print as None.
            pprint.pprint({'url': req, 'method': None, 'headers': None, 'data': None})
        elif hasattr(req, '__dict__'):
            pprint.pprint(vars(req))
        else:
            # Fall back to the attributes a request object is expected to have.
            pprint.pprint({
                field: getattr(req, field, None)
                for field in ('url', 'method', 'headers', 'data')
            })
    except Exception as e:
        print("Error retrieving details:", e)
    print("===================================")
    return _original_urlopen(self, req)


# Substitute the original with the patched version.
yt_dlp.YoutubeDL.urlopen = _patched_urlopen

# Testing: create a YoutubeDL instance. From here on, _patched_urlopen is used
# whenever urlopen is called on a YoutubeDL object.
ydl = yt_dlp.YoutubeDL()
| def _human_readable(n): | |
| if n is None: | |
| return "unknown" | |
| units = ["B", "KB", "MB", "GB", "TB"] | |
| i = 0 | |
| while n >= 1024 and i < len(units)-1: | |
| n /= 1024.0 | |
| i += 1 | |
| return f"{n:.2f} {units[i]}" | |
def progress_hook(d):
    """Print a one-line status message for a download progress event.

    This function is registered under ``progress_hooks`` in the options
    passed to ``yt_dlp.YoutubeDL``.

    Args:
        d: Progress dictionary. Keys read here, all optional: ``status``,
            ``downloaded_bytes``, ``total_bytes``, ``total_bytes_estimate``,
            ``speed``, ``eta``, ``filename``, ``info_dict`` and ``error``.
            Statuses other than ``downloading``, ``finished`` and ``error``
            are ignored.
    """
    status = d.get('status')
    if status == 'downloading':
        # A missing or None byte count is treated as nothing downloaded yet.
        downloaded = d.get('downloaded_bytes') or 0
        # Prefer the exact size; fall back to the estimate when it is absent.
        total = d.get('total_bytes') or d.get('total_bytes_estimate')
        speed = d.get('speed')  # bytes/sec or None
        eta = d.get('eta')  # seconds or None

        downloaded_str = _human_readable(downloaded)
        total_str = _human_readable(total)
        speed_str = f"{_human_readable(speed)}/s" if speed else "unknown/s"
        eta_str = f"{int(eta)}s" if eta is not None else "unknown"

        if total:
            # A percentage is only meaningful when the total size is known.
            percent = downloaded / total * 100
            print(f"Downloading: {percent:6.2f}% - {downloaded_str} of {total_str} at {speed_str} ETA {eta_str}")
        else:
            print(f"Downloading: {downloaded_str} of {total_str} at {speed_str} ETA {eta_str}")
    elif status == 'finished':
        # ``info_dict`` may be absent or None, and its title may be None too;
        # fall back step by step instead of failing or printing "None".
        info = d.get('info_dict') or {}
        filename = d.get('filename') or info.get('title') or 'unknown'
        print(f"Finished downloading: {filename}")
    elif status == 'error':
        print("Download error:", d.get('error', 'unknown error'))
async def download_video(url):
    """Download ``url`` with yt-dlp from within a coroutine.

    A ``YoutubeDL`` object is created with a fixed set of options (format
    ``best``, output named after title and extension, ``progress_hook`` as
    progress hook, verbose output) and its ``download`` method is awaited
    through ``asyncio.to_thread``.

    Args:
        url: The single URL handed to ``YoutubeDL.download``.
    """
    options = {
        'format': 'best',
        'outtmpl': '%(title)s.%(ext)s',
        'progress_hooks': [progress_hook],
        'verbose': True,  # Enable verbose output
    }
    with yt_dlp.YoutubeDL(options) as downloader:
        targets = [url]
        await asyncio.to_thread(downloader.download, targets)
async def main():
    """Download the currently selected sample video."""
    # Other sample URLs kept for experimentation:
    #   https://www.youtube.com/watch?v=_PgFqojF168
    #   https://www.tiktok.com/@rock_pop80/video/7436058771991874848
    #   https://www.tiktok.com/@rock_pop80/video/7460969601812237590
    selected_url = 'https://www.youtube.com/watch?v=XJHS5YR8p2o'
    await download_video(selected_url)


# Run the main coroutine to completion.
asyncio.run(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment