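"""Enumerate Twitch VOD CDN hostnames.

Fetches the current top streamers via the Helix API, collects their recent
VODs, requests a playback access token for each VOD over Twitch's GQL
endpoint, and extracts the CDN hostnames from the resulting usher.ttvnw.net
master playlists. Newly seen hostnames are appended to a local domains.txt.
"""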
import os
import queue
import re
import threading
import time
from dataclasses import dataclass
from typing import List, Optional
from urllib.parse import urlparse

import requests

CLIENT_ID = 'YOUR_TWITCH_CLIENT_ID'
CLIENT_SECRET = 'YOUR_TWITCH_CLIENT_SECRET'

TOP_STREAMERS = 100
VODS_PER_STREAMER = 10

# --- Filtering Options ---
FILTER_LANGUAGE = None  # Language code such as 'en', 'es', 'pt', 'fr', 'de', 'ja', 'ko', 'zh'
FILTER_COUNTRY = None   # Country code such as 'US', 'BR', 'MX'. NOTE: the Helix "Get Streams" endpoint documents no country filter, so this is likely ignored by the API.
FILTER_GAME_ID = None   # Specific game ID to filter by
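# Example (illustrative): to scan English-language Just Chatting streams, set
#   FILTER_LANGUAGE = 'en'
#   FILTER_GAME_ID = '509658'  # Just Chatting's Helix game ID
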
@dataclass
class Token:
    Signature: str
    Token: str


@dataclass
class FeedsOption:
    AllowSource: bool
    Player: str
    AllowSpectre: bool
    AllowAudioOnly: bool
    IncludeFramerate: bool


@dataclass
class Options:
    Timeout: int
    AllowSource: bool
    Player: str
    AllowSpectre: bool
    AllowAudioOnly: bool
    IncludeFramerate: bool

# --- Twitch Helix API helpers ---
def get_app_access_token(client_id: str, client_secret: str) -> str:
    url = "https://id.twitch.tv/oauth2/token"
    params = {
        "client_id": client_id,
        "client_secret": client_secret,
        "grant_type": "client_credentials"
    }
    resp = requests.post(url, params=params)
    resp.raise_for_status()
    return resp.json()["access_token"]
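# The client-credentials response is JSON shaped like:
#   {"access_token": "...", "expires_in": ..., "token_type": "bearer"}
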
def get_top_streamers(token: str, client_id: str, limit: int = 1000, language: Optional[str] = None, country: Optional[str] = None, game_id: Optional[str] = None) -> List[str]:
    url = "https://api.twitch.tv/helix/streams"
    headers = {"Authorization": f"Bearer {token}", "Client-ID": client_id}
    user_ids = []
    params = {"first": 100}
    # Add filtering parameters
    if language:
        params["language"] = language
    if country:
        params["country"] = country
    if game_id:
        params["game_id"] = game_id
    cursor = None
    batch_count = 0
    while len(user_ids) < limit:
        batch_count += 1
        if cursor:
            params["after"] = cursor
        resp = requests.get(url, headers=headers, params=params)
        resp.raise_for_status()
        data = resp.json()
        batch_streams = len(data["data"])
        print(f" Batch {batch_count}: {batch_streams} streams (total: {len(user_ids)})")
        for stream in data["data"]:
            user_ids.append(stream["user_id"])
            if len(user_ids) >= limit:
                break
        cursor = data.get("pagination", {}).get("cursor")
        if not cursor:
            break
        time.sleep(2)  # avoid rate limits
    print(f" Total streams collected: {len(user_ids)}")
    return user_ids

def get_vods_for_user(user_id: str, token: str, client_id: str, max_vods: int = 5) -> List[str]:
    url = "https://api.twitch.tv/helix/videos"
    headers = {"Authorization": f"Bearer {token}", "Client-ID": client_id}
    params = {"user_id": user_id, "type": "archive", "first": max_vods}
    resp = requests.get(url, headers=headers, params=params)
    resp.raise_for_status()
    return [vod["id"] for vod in resp.json()["data"]]
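# Helix caps "first" at 100 per request, so no pagination is needed here
# while VODS_PER_STREAMER <= 100.
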
# --- Real TwitchGQL Implementation ---
def retrieve_token_gql(vod_id: int, session: requests.Session) -> Token:
    url = "https://gql.twitch.tv/gql"
    headers = {
        "Client-ID": "kimne78kx3ncx6brgo4mv6wki5h1ko",
        "Content-Type": "application/json",
        "Accept": "application/json",
    }
    payload = [{
        "operationName": "PlaybackAccessToken",
        "extensions": {
            "persistedQuery": {
                "version": 1,
                "sha256Hash": "0828119ded1c13477966434e15800ff57ddacf13ba1911c129dc2200705b0712"
            }
        },
        "variables": {
            "isLive": False,
            "login": "",
            "isVod": True,
            "vodID": str(vod_id),
            "playerType": "embed"
        }
    }]
    resp = session.post(url, headers=headers, json=payload, timeout=10)
    resp.raise_for_status()
    gql_resp = resp.json()[0]
    data = gql_resp.get("data", {}).get("videoPlaybackAccessToken")
    if not data:
        print("[DEBUG] Unexpected GQL response:", gql_resp)
        raise Exception(f"Failed to get videoPlaybackAccessToken for VOD ID {vod_id}")
    return Token(Signature=data["signature"], Token=data["value"])
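# Example (illustrative values): a successful call returns something like
#   Token(Signature="f35ab...", Token='{"authorization": {...}, "vod_id": ...}')
# which usher.ttvnw.net validates via the sig/token query parameters below.
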
# --- API Logic ---
def fetch_feeds(vod_id: str, session: requests.Session, options: FeedsOption) -> List[str]:
    feed_urls = []
    token = retrieve_token_gql(int(vod_id), session)
    feed_options = (
        f"&allow_source={str(options.AllowSource).lower()}"
        f"&player={options.Player}"
        f"&allow_spectre={str(options.AllowSpectre).lower()}"
        f"&allow_audio_only={str(options.AllowAudioOnly).lower()}"
        f"&playlist_include_framerate={str(options.IncludeFramerate).lower()}"
    )
    url = f"https://usher.ttvnw.net/vod/{vod_id}.m3u8?sig={token.Signature}&token={token.Token}{feed_options}"
    # requests.Session has no built-in default timeout, so read back the
    # attribute set on the session by retrieve()/main() and pass it per request
    resp = session.get(url, timeout=getattr(session, "timeout", 10))
    if resp.status_code != 200:
        return feed_urls
    # Non-comment lines of the master playlist are variant playlist URLs
    for line in resp.text.splitlines():
        if not line.startswith('#'):
            feed_urls.append(line)
    return feed_urls
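# Illustrative master-playlist excerpt (hostname is a placeholder):
#   #EXT-X-STREAM-INF:BANDWIDTH=...,RESOLUTION=1920x1080,VIDEO="chunked"
#   https://<edge-host>/<vod-path>/chunked/index-dvr.m3u8
# Only the URL lines survive the '#' filter above.
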
# --- Retriever Logic ---
host_re = re.compile(r"https://([a-z0-9-]+\.[a-z]+\.[a-z]{2,3})/")


def unique_host(hosts: List[str], host: str) -> bool:
    return host not in hosts


def retrieve_hosts(vod_id: str, session: requests.Session, feeds_options: FeedsOption, new_hosts: queue.Queue):
    hosts = []
    max_retries = 3
    delay = 2
    for attempt in range(1, max_retries + 1):
        try:
            feeds_response = fetch_feeds(vod_id, session, feeds_options)
            for f in feeds_response:
                match = host_re.match(f)
                if match and unique_host(hosts, match.group(1)):
                    host = match.group(1)
                    hosts.append(host)
                    # print(f" [DOMAIN FOUND] VOD {vod_id}: {host}")
            break  # Success, exit retry loop
        except Exception as e:
            if attempt == max_retries:
                print(f"[ERROR] VOD {vod_id}: {e}")
            else:
                time.sleep(delay)
    new_hosts.put(hosts)
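# Caveat: one requests.Session is shared across all worker threads here;
# Session is not documented as thread-safe, so a session per thread would be
# the more conservative choice.
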
# --- Main Logic ---
def combine_results(new_hosts: queue.Queue, hosts: queue.Queue, num_tasks: int):
    results = []
    for _ in range(num_tasks):
        host_list = new_hosts.get()
        results.extend(host_list)
    hosts.put(results)


def unique_results(results: List[str]) -> List[str]:
    seen = set()
    deduped = []
    for r in results:
        if r not in seen:
            seen.add(r)
            deduped.append(r)
    return deduped
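# Equivalent order-preserving one-liner (dicts keep insertion order in
# Python 3.7+): list(dict.fromkeys(results))
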
def retrieve(vod_ids: List[str], options: Options) -> List[str]:
    session = requests.Session()
    session.timeout = options.Timeout  # read back in fetch_feeds(); Session has no native default timeout
    feeds_options = FeedsOption(
        AllowSource=options.AllowSource,
        Player=options.Player,
        AllowSpectre=options.AllowSpectre,
        AllowAudioOnly=options.AllowAudioOnly,
        IncludeFramerate=options.IncludeFramerate,
    )
    new_hosts = queue.Queue()
    hosts = queue.Queue()
    threads = []
    for vod in vod_ids:
        t = threading.Thread(target=retrieve_hosts, args=(str(vod), session, feeds_options, new_hosts))
        t.start()
        threads.append(t)
    combiner = threading.Thread(target=combine_results, args=(new_hosts, hosts, len(vod_ids)))
    combiner.start()
    for t in threads:
        t.join()
    combiner.join()
    results = hosts.get()
    return unique_results(results)
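# Illustrative standalone usage (the VOD ID is a placeholder):
#   opts = Options(Timeout=10, AllowSource=True, Player="site",
#                  AllowSpectre=False, AllowAudioOnly=False, IncludeFramerate=False)
#   print(retrieve(["1234567890"], opts))
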
# --- Progress saving functions ---
def save_domains_to_file(domains_file: str, new_hostnames: List[str], existing_lines: List[str]):
    """Save new domains to file, appending to related lines where possible."""
    lines = existing_lines.copy()
    for hostname in new_hostnames:
        # Skip hostnames already present on some line. (Exact hostname match,
        # match on any whitespace-separated token, and plain substring match
        # all reduce to a substring check.)
        found = any(hostname in line for line in lines)
        if not found:
            # Try to append to a line whose hostname is contained in this one
            for i, line in enumerate(lines):
                if extract_hostname(line) in hostname:
                    lines[i] = line + ' ' + hostname
                    found = True
                    break
        if not found:
            # Otherwise, add as a new line
            lines.append(hostname)
    # Write all lines back
    with open(domains_file, "w", encoding="utf-8") as f:
        for line in lines:
            f.write(line + "\n")
    return lines

def load_existing_domains(domains_file: str):
    """Load existing domains from file."""
    existing_domains = set()
    hostname_to_line = {}
    lines = []
    if os.path.exists(domains_file):
        with open(domains_file, "r", encoding="utf-8") as f:
            for line in f:
                orig_line = line.rstrip('\n')
                if not orig_line.strip():
                    continue
                # Split by whitespace to support multiple hostnames per line
                parts = orig_line.split()
                for part in parts:
                    hostname = extract_hostname(part)
                    existing_domains.add(hostname)
                    hostname_to_line[hostname] = orig_line
                lines.append(orig_line)
    return existing_domains, hostname_to_line, lines
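# Expected domains.txt layout (hostnames below are placeholders): one or more
# whitespace-separated hostnames per line, e.g.
#   example-edge-1.example.net example-edge-2.example.net
#   another-host.example.com
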
def extract_hostname(domain):
    # Remove protocol and path; return just the netloc/hostname
    if '://' in domain:
        return urlparse(domain).netloc
    return domain.split('/')[0]
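# e.g. extract_hostname("https://host.example.net/vod/123.m3u8") -> "host.example.net"
#      extract_hostname("host.example.net/path") -> "host.example.net"
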
# --- Main entry for top streamer VOD domain retrieval ---
def main():
    # Read existing domains from local domains.txt
    domains_file = os.path.join(os.path.dirname(__file__), "domains.txt")
    existing_domains, hostname_to_line, lines = load_existing_domains(domains_file)  # hostname_to_line is currently unused

    print("Getting app access token...")
    token = get_app_access_token(CLIENT_ID, CLIENT_SECRET)

    print("Getting top streamers...")
    # Show current filtering settings
    if FILTER_LANGUAGE or FILTER_COUNTRY or FILTER_GAME_ID:
        print("Using filters:")
        if FILTER_LANGUAGE:
            print(f" Language: {FILTER_LANGUAGE}")
        if FILTER_COUNTRY:
            print(f" Country: {FILTER_COUNTRY}")
        if FILTER_GAME_ID:
            print(f" Game ID: {FILTER_GAME_ID}")
    user_ids = get_top_streamers(
        token,
        CLIENT_ID,
        limit=TOP_STREAMERS,
        language=FILTER_LANGUAGE,
        country=FILTER_COUNTRY,
        game_id=FILTER_GAME_ID
    )

    print(f"Got {len(user_ids)} streamers. Getting VODs...")
    vod_ids = []
    for idx, uid in enumerate(user_ids, 1):
        print(f"Fetching VODs for streamer {idx}/{len(user_ids)} (user_id={uid})...")
        try:
            vods = get_vods_for_user(uid, token, CLIENT_ID, max_vods=VODS_PER_STREAMER)
            print(f" Got {len(vods)} VODs for user {uid}.")
            vod_ids.extend(vods)
        except Exception as e:
            print(f"[ERROR] Could not get VODs for user {uid}: {e}")
        time.sleep(0.5)

    print(f"Got {len(vod_ids)} VODs. Retrieving domains...")
    options = Options(
        Timeout=10,
        AllowSource=True,
        Player="site",
        AllowSpectre=False,
        AllowAudioOnly=False,
        IncludeFramerate=False,
    )

    # Process VODs in batches to save progress incrementally
    batch_size = 50  # Process 50 VODs at a time
    total_vods = len(vod_ids)
    all_new_hostnames = []
    for batch_start in range(0, total_vods, batch_size):
        batch_end = min(batch_start + batch_size, total_vods)
        batch_vods = vod_ids[batch_start:batch_end]
        print(f"\nProcessing batch {batch_start//batch_size + 1}/{(total_vods + batch_size - 1)//batch_size} (VODs {batch_start+1}-{batch_end}/{total_vods})...")

        # Process this batch (same pattern as retrieve(), inlined so progress
        # can be saved after every batch)
        session = requests.Session()
        session.timeout = options.Timeout  # read back in fetch_feeds()
        feeds_options = FeedsOption(
            AllowSource=options.AllowSource,
            Player=options.Player,
            AllowSpectre=options.AllowSpectre,
            AllowAudioOnly=options.AllowAudioOnly,
            IncludeFramerate=options.IncludeFramerate,
        )
        new_hosts = queue.Queue()
        hosts = queue.Queue()
        threads = []
        for idx, vod in enumerate(batch_vods, 1):
            vod_idx = batch_start + idx
            print(f" Processing VOD {vod_idx}/{total_vods} (id={vod})...")
            t = threading.Thread(target=retrieve_hosts, args=(str(vod), session, feeds_options, new_hosts))
            t.start()
            threads.append(t)
        combiner = threading.Thread(target=combine_results, args=(new_hosts, hosts, len(batch_vods)))
        combiner.start()
        for t in threads:
            t.join()
        combiner.join()
        batch_results = hosts.get()
        batch_domains = unique_results(batch_results)

        # Normalize new domains to hostnames before comparing and saving
        batch_new_hostnames = [extract_hostname(x) for x in batch_domains if extract_hostname(x) not in existing_domains]
        if batch_new_hostnames:
            print(f"[BATCH RESULT] Found {len(batch_new_hostnames)} new domains in this batch:")
            for d in batch_new_hostnames:
                print(f" [NEW DOMAIN] {d}")
            # Save progress immediately after each batch
            all_new_hostnames.extend(batch_new_hostnames)
            existing_domains.update(batch_new_hostnames)
            lines = save_domains_to_file(domains_file, batch_new_hostnames, lines)
            print(f"[PROGRESS SAVED] Total new domains found so far: {len(all_new_hostnames)}")
        else:
            print("[BATCH RESULT] No new domains found in this batch.")
        # Small delay between batches to avoid overwhelming the API
        time.sleep(1)

    # Final summary
    if all_new_hostnames:
        print(f"\n[FINAL RESULT] Total new VOD domains found: {len(all_new_hostnames)}")
        print("All domains have been saved to domains.txt")
    else:
        print("\n[FINAL RESULT] No new domains found.")


if __name__ == "__main__":
    main()