Created
April 28, 2024 21:19
-
-
Save thisisignitedoreo/54252da9011904b593a4de44732b5ae1 to your computer and use it in GitHub Desktop.
twitch vod downloading and splitting tool, also with chat downloading abilities.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/bin/env python3 | |
| # note: you will need TwitchDownloaderCLI for it to work. | |
| # also note: dont expect me to change stuff here, i made it fully for myself | |
| # and throwing it here for sharing sake, and code is messy. happy using! | |
| import subprocess | |
| import datetime | |
| import requests | |
| import shutil | |
| import json | |
| import sys | |
| import csv | |
| import os | |
| if not os.path.isdir("vods"): | |
| os.mkdir("vods") | |
| if not os.path.isdir("temp"): | |
| os.mkdir("temp") | |
| def get_size(path): | |
| fmt = subprocess.run(["ffprobe", "-i", path, "-show_format", "-v", "quiet"], capture_output=True).stdout.decode() | |
| for i in fmt.split("\n"): | |
| if i.startswith("duration="): | |
| return float(i[9:]) | |
| return -1 | |
| def isint(string): | |
| try: | |
| int(string) | |
| return True | |
| except ValueError: | |
| return False | |
| def fmt_time(seconds): | |
| s = round(seconds % 60, 2) | |
| m = seconds // 60 % 60 | |
| h = seconds // 60 // 60 | |
| return f"{int(h)//1:0>2}:{int(m):0>2}:{s:0>2}" | |
| def print_help(prg="vodis"): | |
| print(f"usage: {prg} <vod id>") | |
| error = lambda x: print("[!]", x) or sys.exit(1) | |
| def log(*args, end="\n", sep=" "): | |
| print("[i]", *args, end=end, sep=sep) | |
| def parse_args(args): | |
| program = args.pop(0) | |
| if len(args) == 0: | |
| print_help(program) | |
| error("expected vod id") | |
| return args.pop(0) | |
| def norm_date(date): | |
| datetime_object = datetime.datetime.fromisoformat(date) | |
| return datetime_object.strftime("%d.%m.%Y") | |
| def parse_time(secs): | |
| secs = int(secs) | |
| s = secs % 60 | |
| m = secs // 60 % 60 | |
| h = secs // (60**2) | |
| h = int(h) | |
| return f"{h:0>2}:{m:0>2}:{s:0>2}" | |
| def get_info(vodid): | |
| request = requests.post("https://gql.twitch.tv/gql", | |
| headers={"Client-ID": "kimne78kx3ncx6brgo4mv6wki5h1ko"}, | |
| json={"query": "query { video(id: " + str(vodid) + ") {createdAt, title}}"}) | |
| data = json.loads(request.text) | |
| return norm_date(data["data"]["video"]["createdAt"]), data["data"]["video"]["title"] | |
| def run(args, **kwargs): | |
| print("[c] $", " ".join(args)) | |
| return subprocess.run(args, **kwargs) | |
| #def run_in_thread(args, **kwargs): | |
| # print("[c] $", " ".join(args)) | |
| # t = threading.Thread(target=dl, args=(args, **kwargs)) | |
| # return subprocess.run(args, **kwargs) | |
| def download_chat(vodid, path): | |
| run(["./ttvdl", "chatdownload", "-u", vodid, "-o", path, "--temp-path", "temp/"]) | |
| def get_chapters(chat_json): | |
| chat = json.load(open(chat_json, "r")) | |
| chapters = chat["video"]["chapters"] | |
| chapters_real = [] | |
| for i in chapters: | |
| chapters_real.append({"name": i["description"], "start": int(i["startMilliseconds"]//1000), "length": int(i["lengthMilliseconds"]//1000)}) | |
| return chapters_real | |
| if __name__ == "__main__": | |
| vodid = parse_args(sys.argv) | |
| voddate, vodname = get_info(vodid) | |
| log(f"[{voddate}] \"{vodname}\"") | |
| if os.path.isfile(os.path.join("vods", f"{vodid}.mp4")): | |
| log("vod already downloaded, skipping") | |
| else: | |
| log("downloading vod") | |
| call = run(["./ttvdl", "videodownload", "-u", vodid, "-o", f"vods/{vodid}.mp4", "--temp-path", "temp/"]) | |
| print() | |
| if call.returncode != 0: | |
| error("process exited with non-zero exitcode") | |
| log("splitting vod") | |
| video_path = os.path.join("vods", vodid + ".mp4") | |
| directory_path = os.path.join("vods", vodid) | |
| if os.path.isdir(directory_path): | |
| shutil.rmtree(directory_path) | |
| os.mkdir(directory_path) | |
| chunk_size = 1_900_000_000 | |
| orig_size = get_size(video_path) | |
| chunk_cur = 0 | |
| chunks = 1 | |
| chunk_average = -1 | |
| chunk_indicies = [] | |
| while chunk_cur < orig_size: | |
| print(f"[i] #{chunks}, cur: {fmt_time(chunk_cur)}, full: {fmt_time(orig_size)}", end="") | |
| if chunk_average == -1: print() | |
| else: print(f", average chunk size: {fmt_time(chunk_average)}") | |
| call = run(["ffmpeg", "-ss", str(chunk_cur), "-i", video_path, "-fs", str(int(chunk_size)), "-c", "copy", os.path.join(directory_path, f"{chunks}.mp4")], capture_output=True) | |
| if call.returncode != 0: | |
| error("error: exited with non-zero code") | |
| this_chunk_size = get_size(os.path.join(directory_path, f"{chunks}.mp4")) | |
| chunk_indicies.append((chunks, chunk_cur, chunk_cur+this_chunk_size)) | |
| chunk_cur += this_chunk_size | |
| if chunk_average == -1: chunk_average = this_chunk_size | |
| else: chunk_average = (chunk_average + this_chunk_size) / 2 | |
| chunks += 1 | |
| if os.path.isfile(os.path.join("vods", f"{vodid}.json")): | |
| log("chat already downloaded, skipping") | |
| else: | |
| log("downloading chat") | |
| download_chat(vodid, os.path.join("vods", f"{vodid}.json")) | |
| log("done!") | |
| chapters = get_chapters(os.path.join("vods", f"{vodid}.json")) | |
| chapter_sec = 0 | |
| chapter_off = 0 | |
| c = 0 | |
| for k, s, e in chunk_indicies: | |
| print(f"\n[часть №{k}]") | |
| while chapter_sec + chapter_off < e: | |
| chapter = chapters[c] if 0 <= c < len(chapters) else None | |
| if chapter is None: break | |
| if chapter_sec + chapter["length"] > e: | |
| print(f"{parse_time((chapter_sec + chapter_off) - s)} - {parse_time(e - s)} ~ {chapter['name']}") | |
| chapter_off += e - (chapter_sec + chapter_off) | |
| else: | |
| print(f"{parse_time((chapter_sec + chapter_off) - s)} - {parse_time((chapter_sec + chapter['length']) - s)} ~ {chapter['name']}") | |
| chapter_sec += chapter['length'] | |
| chapter_off = 0 | |
| c += 1 | |
| rm = lambda x: os.remove(x) if os.path.isfile(x) else None | |
| rm("COPYRIGHT.txt") | |
| rm("THIRD-PARTY-LICENSES.txt") | |
| # nvm this, use this to not write that every time i upload a vod to telegram lol | |
| print(f"\n[{voddate}] {vodname}\nв комментариях 👀") | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment