Skip to content

Instantly share code, notes, and snippets.

@Infinitay
Forked from MacielG1/Twitch Domains
Created December 8, 2025 07:09
Show Gist options
  • Select an option

  • Save Infinitay/9ae1b2f156d508ae44e7a6bdfabd7b91 to your computer and use it in GitHub Desktop.

Select an option

Save Infinitay/9ae1b2f156d508ae44e7a6bdfabd7b91 to your computer and use it in GitHub Desktop.
import requests
import threading
import queue
import re
from dataclasses import dataclass
from typing import List, Optional
import time
import os
from urllib.parse import urlparse
# Twitch app credentials — replace with your own from the dev.twitch.tv console.
CLIENT_ID = 'YOUR_TWITCH_CLIENT_ID'
CLIENT_SECRET = 'YOUR_TWITCH_CLIENT_SECRET'
# How many top live channels to sample, and how many archive VODs to check per channel.
TOP_STREAMERS = 100
VODS_PER_STREAMER = 10
# --- Filtering Options (None disables the filter) ---
FILTER_LANGUAGE = None # Set to language code like 'en', 'es', 'pt', 'fr', 'de', 'ja', 'ko', 'zh', etc.
FILTER_COUNTRY = None # Set to country code like 'US', 'BR', 'MX', 'CA', 'GB', 'DE', 'FR', etc.
FILTER_GAME_ID = None # Set to specific game ID if you want to filter by game
@dataclass
class Token:
    """Playback access token pair returned by the Twitch GQL endpoint."""
    Signature: str
    Token: str
@dataclass
class FeedsOption:
    """Flags forwarded as query parameters to the usher playlist endpoint."""
    AllowSource: bool
    Player: str
    AllowSpectre: bool
    AllowAudioOnly: bool
    IncludeFramerate: bool
@dataclass
class Options:
    """Top-level retrieval settings: a request timeout plus the playlist flags."""
    Timeout: int
    AllowSource: bool
    Player: str
    AllowSpectre: bool
    AllowAudioOnly: bool
    IncludeFramerate: bool
# --- Twitch Helix API helpers ---
def get_app_access_token(client_id: str, client_secret: str) -> str:
    """Fetch an app access token via the OAuth2 client-credentials flow.

    Returns the bearer token string.
    Raises requests.HTTPError when Twitch rejects the credentials.
    """
    url = "https://id.twitch.tv/oauth2/token"
    params = {
        "client_id": client_id,
        "client_secret": client_secret,
        "grant_type": "client_credentials",
    }
    # Explicit timeout: without one a stalled connection hangs the script forever.
    resp = requests.post(url, params=params, timeout=10)
    resp.raise_for_status()
    return resp.json()["access_token"]
def get_top_streamers(token: str, client_id: str, limit: int = 1000, language: Optional[str] = None, country: Optional[str] = None, game_id: Optional[str] = None) -> List[str]:
    """Page through Helix /streams (viewer-sorted) and collect broadcaster user IDs.

    Stops after `limit` IDs or when pagination runs out.
    Raises requests.HTTPError on a failed API call.
    """
    url = "https://api.twitch.tv/helix/streams"
    headers = {"Authorization": f"Bearer {token}", "Client-ID": client_id}
    user_ids = []
    params = {"first": 100}  # Helix maximum page size
    # Add filtering parameters
    if language:
        params["language"] = language
    if country:
        # NOTE(review): the Helix "Get Streams" endpoint documents no `country`
        # query parameter — Twitch likely ignores this; verify before relying on it.
        params["country"] = country
    if game_id:
        params["game_id"] = game_id
    cursor = None
    batch_count = 0
    while len(user_ids) < limit:
        batch_count += 1
        if cursor:
            params["after"] = cursor
        # Explicit timeout so a stalled connection cannot hang the loop.
        resp = requests.get(url, headers=headers, params=params, timeout=10)
        resp.raise_for_status()
        data = resp.json()
        batch_streams = len(data["data"])
        print(f" Batch {batch_count}: {batch_streams} streams (total: {len(user_ids)})")
        if batch_streams == 0:
            # Defensive: an empty page that still carried a cursor would
            # otherwise spin forever without making progress.
            break
        for stream in data["data"]:
            user_ids.append(stream["user_id"])
            if len(user_ids) >= limit:
                break
        cursor = data.get("pagination", {}).get("cursor")
        if not cursor:
            break
        time.sleep(2)  # avoid rate limits
    print(f" Total streams collected: {len(user_ids)}")
    return user_ids
def get_vods_for_user(user_id: str, token: str, client_id: str, max_vods: int = 5) -> List[str]:
    """Return up to `max_vods` archive (past-broadcast) VOD IDs for a user.

    Raises requests.HTTPError on a failed API call.
    """
    url = "https://api.twitch.tv/helix/videos"
    headers = {"Authorization": f"Bearer {token}", "Client-ID": client_id}
    params = {"user_id": user_id, "type": "archive", "first": max_vods}
    # Explicit timeout: without one a stalled connection blocks indefinitely.
    resp = requests.get(url, headers=headers, params=params, timeout=10)
    resp.raise_for_status()
    return [vod["id"] for vod in resp.json()["data"]]
# --- Real TwitchGQL Implementation ---
def retrieve_token_gql(vod_id: int, session: requests.Session) -> Token:
    """Request a VOD playback access token (signature + value) from Twitch GQL.

    Uses Twitch's public web-player client ID; raises on HTTP failure or
    when the response carries no videoPlaybackAccessToken.
    """
    endpoint = "https://gql.twitch.tv/gql"
    request_headers = {
        "Client-ID": "kimne78kx3ncx6brgo4mv6wki5h1ko",
        "Content-Type": "application/json",
        "Accept": "application/json",
    }
    # Persisted-query form of the PlaybackAccessToken operation.
    body = [
        {
            "operationName": "PlaybackAccessToken",
            "extensions": {
                "persistedQuery": {
                    "version": 1,
                    "sha256Hash": "0828119ded1c13477966434e15800ff57ddacf13ba1911c129dc2200705b0712",
                }
            },
            "variables": {
                "isLive": False,
                "login": "",
                "isVod": True,
                "vodID": str(vod_id),
                "playerType": "embed",
            },
        }
    ]
    response = session.post(endpoint, headers=request_headers, json=body, timeout=10)
    response.raise_for_status()
    first_result = response.json()[0]
    token_data = first_result.get("data", {}).get("videoPlaybackAccessToken")
    if not token_data:
        print("[DEBUG] Unexpected GQL response:", first_result)
        raise Exception("Failed to get videoPlaybackAccessToken for VOD ID {}".format(vod_id))
    return Token(Signature=token_data["signature"], Token=token_data["value"])
# --- API Logic ---
def fetch_feeds(vod_id: str, session: requests.Session, options: FeedsOption) -> List[str]:
    """Fetch the usher .m3u8 master playlist for a VOD and return its variant URLs.

    Non-comment playlist lines (the variant stream URLs) are returned;
    an empty list is returned when the playlist request is not HTTP 200.
    Raises whatever retrieve_token_gql raises when the token fetch fails.
    """
    feed_urls: List[str] = []
    token = retrieve_token_gql(int(vod_id), session)
    feed_options = (
        f"&allow_source={str(options.AllowSource).lower()}"
        f"&player={options.Player}"
        f"&allow_spectre={str(options.AllowSpectre).lower()}"
        f"&allow_audio_only={str(options.AllowAudioOnly).lower()}"
        f"&playlist_include_framerate={str(options.IncludeFramerate).lower()}"
    )
    url = f"https://usher.ttvnw.net/vod/{vod_id}.m3u8?sig={token.Signature}&token={token.Token}{feed_options}"
    # Explicit timeout: requests.Session has no ambient timeout attribute
    # (assigning session.timeout elsewhere is a no-op), so without this the
    # request could block forever.
    resp = session.get(url, timeout=10)
    if resp.status_code != 200:
        return feed_urls
    # Keep only the non-comment lines — these are the variant stream URLs.
    for line in resp.text.splitlines():
        if not line.startswith('#'):
            feed_urls.append(line)
    return feed_urls
# --- Retriever Logic ---
host_re = re.compile(r"https://([a-z0-9-]+\.[a-z]+\.[a-z]{2,3})/")
def unique_host(hosts: List[str], host: str) -> bool:
    """Return True when *host* has not been collected into *hosts* yet."""
    return all(seen != host for seen in hosts)
def retrieve_hosts(vod_id: str, session: requests.Session, feeds_options: FeedsOption, new_hosts: queue.Queue):
    """Worker: fetch one VOD's playlist, extract its unique CDN hostnames, enqueue them.

    Retries the fetch up to three times with a short pause between attempts.
    Always puts a (possibly empty) list on new_hosts so the combiner's
    expected-task count stays balanced.
    """
    found = []
    attempts_left = 3
    retry_delay = 2
    while attempts_left:
        attempts_left -= 1
        try:
            for feed_url in fetch_feeds(vod_id, session, feeds_options):
                m = host_re.match(feed_url)
                if m is None:
                    continue
                candidate = m.group(1)
                if unique_host(found, candidate):
                    found.append(candidate)
            break  # success — stop retrying
        except Exception as exc:
            if attempts_left == 0:
                # Out of retries: report and fall through with whatever we have.
                print(f"[ERROR] VOD {vod_id}: {exc}")
            else:
                time.sleep(retry_delay)
    new_hosts.put(found)
# --- Main Logic ---
def combine_results(new_hosts: queue.Queue, hosts: queue.Queue, num_tasks: int):
    """Drain exactly num_tasks host lists from new_hosts and publish one flat list.

    Blocks until every worker has contributed, then puts the merged list on hosts.
    """
    merged = []
    for _ in range(num_tasks):
        merged.extend(new_hosts.get())
    hosts.put(merged)
def unique_results(results: List[str]) -> List[str]:
    """Return *results* with duplicates removed, preserving first-seen order.

    Replaces a hand-rolled seen-set loop whose accumulator shadowed the
    function's own name; dict preserves insertion order (Python 3.7+), so
    dict.fromkeys is an order-stable dedupe.
    """
    return list(dict.fromkeys(results))
def retrieve(vod_ids: List[str], options: Options) -> List[str]:
    """Spawn one worker thread per VOD, gather CDN hostnames, return them deduped.

    A combiner thread drains the per-worker queue so the final hosts queue
    holds a single flattened list once every worker has finished.
    """
    session = requests.Session()
    # NOTE(review): requests.Session has no `timeout` attribute — this
    # assignment is a no-op; only per-request timeout arguments apply.
    session.timeout = options.Timeout
    feeds_options = FeedsOption(
        AllowSource=options.AllowSource,
        Player=options.Player,
        AllowSpectre=options.AllowSpectre,
        AllowAudioOnly=options.AllowAudioOnly,
        IncludeFramerate=options.IncludeFramerate,
    )
    new_hosts = queue.Queue()
    hosts = queue.Queue()
    threads = []
    # One thread per VOD — unbounded fan-out; acceptable for modest lists.
    for vod in vod_ids:
        t = threading.Thread(target=retrieve_hosts, args=(str(vod), session, feeds_options, new_hosts))
        t.start()
        threads.append(t)
    combiner = threading.Thread(target=combine_results, args=(new_hosts, hosts, len(vod_ids)))
    combiner.start()
    for t in threads:
        t.join()
    combiner.join()
    results = hosts.get()
    return unique_results(results)
# --- Progress saving functions ---
def save_domains_to_file(domains_file: str, new_hostnames: List[str], existing_lines: List[str]):
    """Save new domains to file, updating existing lines if needed.

    Rewrites domains_file in full and returns the updated list of lines.
    NOTE(review): the three duplicate checks in the first loop overlap — the
    plain substring test would catch most of what the first two catch; the
    exact intended dedupe semantics should be confirmed with the author.
    """
    lines = existing_lines.copy()
    for hostname in new_hostnames:
        found = False
        # Try to find a line with the same base domain
        for i, line in enumerate(lines):
            if extract_hostname(line) == hostname:
                found = True
                break
            # If the hostname is not in the line but the line contains a full URL with the same hostname
            if hostname in [extract_hostname(part) for part in line.split()]:
                found = True
                break
            # If the line contains a full URL with the same hostname, append
            if hostname in line:
                found = True
                break
        if not found:
            # Try to append to a line with the same full URL
            for i, line in enumerate(lines):
                if extract_hostname(line) in hostname:
                    lines[i] = line + ' ' + hostname
                    found = True
                    break
        if not found:
            # Otherwise, add as a new line
            lines.append(hostname)
    # Write all lines back
    with open(domains_file, "w", encoding="utf-8") as f:
        for line in lines:
            f.write(line + "\n")
    return lines
def load_existing_domains(domains_file: str):
    """Read domains_file (if it exists) and index every hostname it contains.

    Returns a 3-tuple: (set of hostnames, hostname -> original line map,
    list of non-empty raw lines). A missing file yields three empty containers.
    """
    known_hosts = set()
    host_to_line = {}
    raw_lines = []
    if not os.path.exists(domains_file):
        return known_hosts, host_to_line, raw_lines
    with open(domains_file, "r", encoding="utf-8") as f:
        for raw in f:
            stripped = raw.rstrip('\n')
            if not stripped.strip():
                continue
            # A single line may carry several whitespace-separated hostnames.
            for piece in stripped.split():
                host = extract_hostname(piece)
                known_hosts.add(host)
                host_to_line[host] = stripped
            raw_lines.append(stripped)
    return known_hosts, host_to_line, raw_lines
# --- Main entry for top streamer VOD domain retrieval ---
def extract_hostname(domain):
    """Return just the host portion of *domain*, dropping any scheme and path."""
    if '://' in domain:
        # Full URL: let urlparse isolate the network location.
        return urlparse(domain).netloc
    # Bare host, possibly followed by a path: keep everything before the first '/'.
    return domain.partition('/')[0]
def main():
    """End-to-end run: top live streams -> their archive VODs -> CDN hostnames -> domains.txt.

    Processes VODs in batches of 50 so progress is written to disk after
    every batch rather than only at the end.
    """
    # Read existing domains from local domains.txt
    domains_file = os.path.join(os.path.dirname(__file__), "domains.txt")
    existing_domains, hostname_to_line, lines = load_existing_domains(domains_file)
    print("Getting app access token...")
    token = get_app_access_token(CLIENT_ID, CLIENT_SECRET)
    print("Getting top streamers...")
    # Show current filtering settings
    if FILTER_LANGUAGE or FILTER_COUNTRY or FILTER_GAME_ID:
        print("Using filters:")
        if FILTER_LANGUAGE:
            print(f" Language: {FILTER_LANGUAGE}")
        if FILTER_COUNTRY:
            print(f" Country: {FILTER_COUNTRY}")
        if FILTER_GAME_ID:
            print(f" Game ID: {FILTER_GAME_ID}")
    user_ids = get_top_streamers(
        token,
        CLIENT_ID,
        limit=TOP_STREAMERS,
        language=FILTER_LANGUAGE,
        country=FILTER_COUNTRY,
        game_id=FILTER_GAME_ID
    )
    print(f"Got {len(user_ids)} streamers. Getting VODs...")
    vod_ids = []
    # Collect archive VOD IDs per streamer; a failed streamer is logged and skipped.
    for idx, uid in enumerate(user_ids, 1):
        print(f"Fetching VODs for streamer {idx}/{len(user_ids)} (user_id={uid})...")
        try:
            vods = get_vods_for_user(uid, token, CLIENT_ID, max_vods=VODS_PER_STREAMER)
            print(f" Got {len(vods)} VODs for user {uid}.")
            vod_ids.extend(vods)
        except Exception as e:
            print(f"[ERROR] Could not get VODs for user {uid}: {e}")
        time.sleep(0.5)
    print(f"Got {len(vod_ids)} VODs. Retrieving domains...")
    options = Options(
        Timeout=10,
        AllowSource=True,
        Player="site",
        AllowSpectre=False,
        AllowAudioOnly=False,
        IncludeFramerate=False,
    )
    # Process VODs in batches to save progress incrementally
    batch_size = 50 # Process 50 VODs at a time
    total_vods = len(vod_ids)
    all_new_hostnames = []
    for batch_start in range(0, total_vods, batch_size):
        batch_end = min(batch_start + batch_size, total_vods)
        batch_vods = vod_ids[batch_start:batch_end]
        print(f"\nProcessing batch {batch_start//batch_size + 1}/{(total_vods + batch_size - 1)//batch_size} (VODs {batch_start+1}-{batch_end}/{total_vods})...")
        # Process this batch
        session = requests.Session()
        # NOTE(review): requests.Session has no `timeout` attribute — this
        # assignment is a no-op; per-request timeouts are what actually apply.
        session.timeout = options.Timeout
        feeds_options = FeedsOption(
            AllowSource=options.AllowSource,
            Player=options.Player,
            AllowSpectre=options.AllowSpectre,
            AllowAudioOnly=options.AllowAudioOnly,
            IncludeFramerate=options.IncludeFramerate,
        )
        new_hosts = queue.Queue()
        hosts = queue.Queue()
        threads = []
        # One worker thread per VOD in the batch; combiner drains their results.
        for idx, vod in enumerate(batch_vods, 1):
            vod_idx = batch_start + idx
            print(f" Processing VOD {vod_idx}/{total_vods} (id={vod})...")
            t = threading.Thread(target=retrieve_hosts, args=(str(vod), session, feeds_options, new_hosts))
            t.start()
            threads.append(t)
        combiner = threading.Thread(target=combine_results, args=(new_hosts, hosts, len(batch_vods)))
        combiner.start()
        for t in threads:
            t.join()
        combiner.join()
        batch_results = hosts.get()
        batch_domains = unique_results(batch_results)
        # Normalize new domains to hostnames before comparing and saving
        batch_new_hostnames = [extract_hostname(x) for x in batch_domains if extract_hostname(x) not in existing_domains]
        if batch_new_hostnames:
            print(f"[BATCH RESULT] Found {len(batch_new_hostnames)} new domains in this batch:")
            for d in batch_new_hostnames:
                print(f" [NEW DOMAIN] {d}")
            # Save progress immediately after each batch
            all_new_hostnames.extend(batch_new_hostnames)
            existing_domains.update(batch_new_hostnames)
            lines = save_domains_to_file(domains_file, batch_new_hostnames, lines)
            print(f"[PROGRESS SAVED] Total new domains found so far: {len(all_new_hostnames)}")
        else:
            print(f"[BATCH RESULT] No new domains found in this batch.")
        # Small delay between batches to avoid overwhelming the API
        time.sleep(1)
    # Final summary
    if all_new_hostnames:
        print(f"\n[FINAL RESULT] Total new VOD domains found: {len(all_new_hostnames)}")
        print("All domains have been saved to domains.txt")
    else:
        print("\n[FINAL RESULT] No new domains found.")
# Script entry point — run only when executed directly, not on import.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment