Skip to content

Instantly share code, notes, and snippets.

@thisisignitedoreo
Created April 28, 2024 21:19
Show Gist options
  • Select an option

  • Save thisisignitedoreo/54252da9011904b593a4de44732b5ae1 to your computer and use it in GitHub Desktop.

Select an option

Save thisisignitedoreo/54252da9011904b593a4de44732b5ae1 to your computer and use it in GitHub Desktop.
twitch vod downloading and splitting tool, also with chat downloading abilities.
#!/bin/env python3
# note: you will need TwitchDownloaderCLI for it to work.
# also note: dont expect me to change stuff here, i made it fully for myself
# and throwing it here for sharing sake, and code is messy. happy using!
import subprocess
import datetime
import requests
import shutil
import json
import sys
import csv
import os
if not os.path.isdir("vods"):
os.mkdir("vods")
if not os.path.isdir("temp"):
os.mkdir("temp")
def get_size(path):
fmt = subprocess.run(["ffprobe", "-i", path, "-show_format", "-v", "quiet"], capture_output=True).stdout.decode()
for i in fmt.split("\n"):
if i.startswith("duration="):
return float(i[9:])
return -1
def isint(string):
try:
int(string)
return True
except ValueError:
return False
def fmt_time(seconds):
s = round(seconds % 60, 2)
m = seconds // 60 % 60
h = seconds // 60 // 60
return f"{int(h)//1:0>2}:{int(m):0>2}:{s:0>2}"
def print_help(prg="vodis"):
print(f"usage: {prg} <vod id>")
error = lambda x: print("[!]", x) or sys.exit(1)
def log(*args, end="\n", sep=" "):
print("[i]", *args, end=end, sep=sep)
def parse_args(args):
program = args.pop(0)
if len(args) == 0:
print_help(program)
error("expected vod id")
return args.pop(0)
def norm_date(date):
datetime_object = datetime.datetime.fromisoformat(date)
return datetime_object.strftime("%d.%m.%Y")
def parse_time(secs):
secs = int(secs)
s = secs % 60
m = secs // 60 % 60
h = secs // (60**2)
h = int(h)
return f"{h:0>2}:{m:0>2}:{s:0>2}"
def get_info(vodid):
request = requests.post("https://gql.twitch.tv/gql",
headers={"Client-ID": "kimne78kx3ncx6brgo4mv6wki5h1ko"},
json={"query": "query { video(id: " + str(vodid) + ") {createdAt, title}}"})
data = json.loads(request.text)
return norm_date(data["data"]["video"]["createdAt"]), data["data"]["video"]["title"]
def run(args, **kwargs):
print("[c] $", " ".join(args))
return subprocess.run(args, **kwargs)
#def run_in_thread(args, **kwargs):
# print("[c] $", " ".join(args))
# t = threading.Thread(target=dl, args=(args, **kwargs))
# return subprocess.run(args, **kwargs)
def download_chat(vodid, path):
run(["./ttvdl", "chatdownload", "-u", vodid, "-o", path, "--temp-path", "temp/"])
def get_chapters(chat_json):
chat = json.load(open(chat_json, "r"))
chapters = chat["video"]["chapters"]
chapters_real = []
for i in chapters:
chapters_real.append({"name": i["description"], "start": int(i["startMilliseconds"]//1000), "length": int(i["lengthMilliseconds"]//1000)})
return chapters_real
if __name__ == "__main__":
vodid = parse_args(sys.argv)
voddate, vodname = get_info(vodid)
log(f"[{voddate}] \"{vodname}\"")
if os.path.isfile(os.path.join("vods", f"{vodid}.mp4")):
log("vod already downloaded, skipping")
else:
log("downloading vod")
call = run(["./ttvdl", "videodownload", "-u", vodid, "-o", f"vods/{vodid}.mp4", "--temp-path", "temp/"])
print()
if call.returncode != 0:
error("process exited with non-zero exitcode")
log("splitting vod")
video_path = os.path.join("vods", vodid + ".mp4")
directory_path = os.path.join("vods", vodid)
if os.path.isdir(directory_path):
shutil.rmtree(directory_path)
os.mkdir(directory_path)
chunk_size = 1_900_000_000
orig_size = get_size(video_path)
chunk_cur = 0
chunks = 1
chunk_average = -1
chunk_indicies = []
while chunk_cur < orig_size:
print(f"[i] #{chunks}, cur: {fmt_time(chunk_cur)}, full: {fmt_time(orig_size)}", end="")
if chunk_average == -1: print()
else: print(f", average chunk size: {fmt_time(chunk_average)}")
call = run(["ffmpeg", "-ss", str(chunk_cur), "-i", video_path, "-fs", str(int(chunk_size)), "-c", "copy", os.path.join(directory_path, f"{chunks}.mp4")], capture_output=True)
if call.returncode != 0:
error("error: exited with non-zero code")
this_chunk_size = get_size(os.path.join(directory_path, f"{chunks}.mp4"))
chunk_indicies.append((chunks, chunk_cur, chunk_cur+this_chunk_size))
chunk_cur += this_chunk_size
if chunk_average == -1: chunk_average = this_chunk_size
else: chunk_average = (chunk_average + this_chunk_size) / 2
chunks += 1
if os.path.isfile(os.path.join("vods", f"{vodid}.json")):
log("chat already downloaded, skipping")
else:
log("downloading chat")
download_chat(vodid, os.path.join("vods", f"{vodid}.json"))
log("done!")
chapters = get_chapters(os.path.join("vods", f"{vodid}.json"))
chapter_sec = 0
chapter_off = 0
c = 0
for k, s, e in chunk_indicies:
print(f"\n[часть №{k}]")
while chapter_sec + chapter_off < e:
chapter = chapters[c] if 0 <= c < len(chapters) else None
if chapter is None: break
if chapter_sec + chapter["length"] > e:
print(f"{parse_time((chapter_sec + chapter_off) - s)} - {parse_time(e - s)} ~ {chapter['name']}")
chapter_off += e - (chapter_sec + chapter_off)
else:
print(f"{parse_time((chapter_sec + chapter_off) - s)} - {parse_time((chapter_sec + chapter['length']) - s)} ~ {chapter['name']}")
chapter_sec += chapter['length']
chapter_off = 0
c += 1
rm = lambda x: os.remove(x) if os.path.isfile(x) else None
rm("COPYRIGHT.txt")
rm("THIRD-PARTY-LICENSES.txt")
# nvm this, use this to not write that every time i upload a vod to telegram lol
print(f"\n[{voddate}] {vodname}\nв комментариях 👀")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment