#!/usr/bin/env python3
"""
YouTube migration script: hello@roelvanderven.com -> roelven1@gmail.com

What it does (via the official YouTube Data API v3):

- Export from SOURCE account:
    - Subscriptions (subscriptions.list)
    - Non-system playlists (playlists.list)
      + their video IDs (playlistItems.list)
    - Liked videos (the "likes" playlist via playlistItems.list)

- Import into TARGET account:
    - Subscribe to channels (subscriptions.insert)
    - Re-create playlists (playlists.insert)
    - Add playlist items (playlistItems.insert)
    - Re-like videos (videos.rate)

NOT possible via the official API (so this script cannot do it):

- Watch Later playlist: cannot list or insert items.
- Watch history playlist: cannot list or modify reliably.
"""
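# Usage (two phases; main() below currently runs only the import phase):
#   1) Export: while authorized as the SOURCE account, call export_all()
#      (e.g. temporarily from main()); it writes yt_migration_data.json.
#   2) Import: run the script as-is and pick the TARGET account in the
#      OAuth browser prompt; import_all() reads the JSON and replays it.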
import json
import os
import time
from typing import Dict, List, Any

from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from googleapiclient.errors import HttpError

# A single broad scope is easiest here.
SCOPES = ["https://www.googleapis.com/auth/youtube"]

CLIENT_SECRET_FILE = "client_secret.json"
EXPORT_FILE = "yt_migration_data.json"

# Soft cap on *write* operations per run to avoid draining quota in one go.
MAX_WRITES_PER_RUN = 150  # subscriptions + likes + playlist inserts
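
# Quota arithmetic, for reference: in the YouTube Data API v3, write calls
# (subscriptions.insert, playlists.insert, playlistItems.insert, videos.rate)
# each cost 50 quota units, and projects get 10,000 units/day by default.
# So 150 writes = 150 * 50 = 7,500 units, leaving headroom for list calls.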


def get_service(account_label: str):
    """
    Get an authenticated YouTube API client for a given logical account label.

    On first run for each label, this opens a browser where you pick the
    Google account. The resulting OAuth token is cached in token_<label>.json.
    """
    token_file = f"token_{account_label}.json"
    creds = None

    if os.path.exists(token_file):
        creds = Credentials.from_authorized_user_file(token_file, SCOPES)

    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            creds.refresh(Request())
        else:
            if not os.path.exists(CLIENT_SECRET_FILE):
                raise RuntimeError(f"Missing {CLIENT_SECRET_FILE}")
            flow = InstalledAppFlow.from_client_secrets_file(CLIENT_SECRET_FILE, SCOPES)
            creds = flow.run_local_server(port=0, prompt="consent")
        with open(token_file, "w") as f:
            f.write(creds.to_json())

    return build("youtube", "v3", credentials=creds)
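
# Example: one cached token per label lets both accounts coexist:
#   yt_source = get_service("source")  # caches token_source.json
#   yt_target = get_service("target")  # caches token_target.json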


def get_related_playlists_ids(yt) -> Dict[str, str]:
    """
    Return related playlist IDs from the channel resource:
    likes, favorites, uploads (watchHistory and watchLater may or may not
    be present).

    See: channels.list (contentDetails.relatedPlaylists).
    """
    resp = yt.channels().list(
        part="contentDetails",
        mine=True,
        maxResults=1,
    ).execute()

    items = resp.get("items", [])
    if not items:
        raise RuntimeError("No channel found for authenticated user")

    return items[0]["contentDetails"].get("relatedPlaylists", {})
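
# Illustrative return value (IDs shortened; likes playlist IDs start with
# "LL", uploads with "UU"):
#   {"likes": "LLxxxxxxxx", "uploads": "UUxxxxxxxx"}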


def export_subscriptions(yt) -> List[str]:
    """
    Export the list of subscribed channel IDs from the source account.

    Uses subscriptions.list with mine=true.
    """
    print("Exporting subscriptions from source...")
    channel_ids = []
    page_token = None

    while True:
        resp = yt.subscriptions().list(
            part="snippet",
            mine=True,
            maxResults=50,
            pageToken=page_token,
        ).execute()

        for item in resp.get("items", []):
            resource = item["snippet"].get("resourceId", {})
            if resource.get("kind") == "youtube#channel":
                cid = resource.get("channelId")
                if cid:
                    channel_ids.append(cid)

        page_token = resp.get("nextPageToken")
        if not page_token:
            break

    print(f"  Found {len(channel_ids)} subscriptions")
    return channel_ids
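
# Illustrative shape of one subscriptions.list item as consumed above
# (fields trimmed):
#   {"snippet": {"resourceId": {"kind": "youtube#channel",
#                               "channelId": "UCxxxxxxxx"}}}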


def export_playlists_and_items(yt, system_playlist_ids: List[str]) -> List[Dict[str, Any]]:
    """
    Export user-created playlists (excluding system playlists like
    uploads/likes/watch later) plus the ordered video IDs in each.

    Uses playlists.list and playlistItems.list.
    """
    print("Exporting playlists from source...")
    playlists = []
    page_token = None

    system_ids = set(system_playlist_ids)

    while True:
        resp = yt.playlists().list(
            part="snippet,contentDetails",
            mine=True,
            maxResults=50,
            pageToken=page_token,
        ).execute()

        for pl in resp.get("items", []):
            pl_id = pl["id"]
            if pl_id in system_ids:
                # Skip uploads, likes, favorites, etc.
                continue

            snippet = pl["snippet"]
            # privacyStatus lives in the playlist's status part, which we did
            # not request here to keep the call cheap; default to 'private'
            # on import to be conservative.
            privacy = "private"

            playlists.append(
                {
                    "id": pl_id,
                    "title": snippet.get("title"),
                    "description": snippet.get("description"),
                    "privacyStatus": privacy,
                    "item_video_ids": [],  # filled below
                }
            )

        page_token = resp.get("nextPageToken")
        if not page_token:
            break

    print(f"  Found {len(playlists)} non-system playlists")

    # Fetch items for each playlist.
    for pl in playlists:
        vid_ids = []
        page_token = None
        while True:
            resp = yt.playlistItems().list(
                part="contentDetails",
                playlistId=pl["id"],
                maxResults=50,
                pageToken=page_token,
            ).execute()

            for it in resp.get("items", []):
                vid_id = it["contentDetails"].get("videoId")
                if vid_id:
                    vid_ids.append(vid_id)

            page_token = resp.get("nextPageToken")
            if not page_token:
                break

        print(f"  Playlist '{pl['title']}' has {len(vid_ids)} items")
        pl["item_video_ids"] = vid_ids

    return playlists


def export_likes(yt, likes_playlist_id: str) -> List[str]:
    """
    Export liked video IDs from the "likes" playlist.

    Per the official docs: read contentDetails.relatedPlaylists.likes from
    the channel resource, then retrieve that playlist via playlistItems.list.
    """
    if not likes_playlist_id:
        print("No likes playlist ID found; skipping likes export.")
        return []

    print("Exporting liked videos from source...")
    video_ids = []
    page_token = None

    while True:
        try:
            resp = yt.playlistItems().list(
                part="contentDetails",
                playlistId=likes_playlist_id,
                maxResults=50,
                pageToken=page_token,
            ).execute()
        except HttpError as e:
            print(f"  Error listing likes playlist: {e}")
            break

        for it in resp.get("items", []):
            vid_id = it["contentDetails"].get("videoId")
            if vid_id:
                video_ids.append(vid_id)

        page_token = resp.get("nextPageToken")
        if not page_token:
            break

    print(f"  Found {len(video_ids)} liked videos")
    return video_ids


def export_all():
    """
    Run the export phase: authenticate as SOURCE and write data to EXPORT_FILE.
    """
    yt_source = get_service("source")

    related = get_related_playlists_ids(yt_source)
    likes_playlist_id = related.get("likes")
    # uploads, favorites, watchHistory, watchLater may appear; we treat all
    # of them as 'system' playlists.
    system_playlist_ids = list(related.values())

    subscriptions = export_subscriptions(yt_source)
    playlists = export_playlists_and_items(yt_source, system_playlist_ids)
    likes = export_likes(yt_source, likes_playlist_id)

    export_data = {
        "subscriptions": subscriptions,
        "playlists": playlists,
        "likes": likes,
        "meta": {
            "export_time": int(time.time()),
            "source_account": "hello@roelvanderven.com",
        },
    }

    with open(EXPORT_FILE, "w", encoding="utf-8") as f:
        json.dump(export_data, f, indent=2)

    print(f"\nExport complete. Written to {EXPORT_FILE}\n")
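
# Illustrative shape of yt_migration_data.json (values shortened):
#   {"subscriptions": ["UCxxxxxxxx", ...],
#    "playlists": [{"id": "PLxxxxxxxx", "title": "...", "description": "...",
#                   "privacyStatus": "private", "item_video_ids": ["..."]}],
#    "likes": ["xxxxxxxxxxx", ...],
#    "meta": {"export_time": 1700000000,
#             "source_account": "hello@roelvanderven.com"}}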


def import_subscriptions(yt, channel_ids: List[str], write_budget: Dict[str, int]):
    """
    Subscribe the target account to the given channel IDs.

    Uses subscriptions.insert (50 quota units per call).
    """
    print("Importing subscriptions to target...")
    count = 0
    for cid in channel_ids:
        if write_budget["remaining"] <= 0:
            print("  Write budget exhausted, stopping subscription import.")
            break
        body = {
            "snippet": {
                "resourceId": {
                    "kind": "youtube#channel",
                    "channelId": cid,
                }
            }
        }
        try:
            yt.subscriptions().insert(part="snippet", body=body).execute()
            count += 1
            write_budget["remaining"] -= 1
            # Cheap throttle.
            time.sleep(0.1)
        except HttpError as e:
            # Duplicate subscriptions etc. are not fatal.
            print(f"  Failed subscription to {cid}: {e}")
    print(f"  Created {count} subscriptions on target")


def get_existing_playlists_map(yt):
    """Return {title: playlistId} for the target account's playlists."""
    existing = {}
    page_token = None
    while True:
        resp = yt.playlists().list(
            part="snippet",
            mine=True,
            maxResults=50,
            pageToken=page_token,
        ).execute()
        for pl in resp.get("items", []):
            title = pl["snippet"]["title"]
            existing[title] = pl["id"]
        page_token = resp.get("nextPageToken")
        if not page_token:
            break
    return existing


def import_playlists_and_items(yt, playlists, write_budget):
    """
    Re-create exported playlists on the target account and fill them with
    the exported video IDs, reusing any target playlist with the same title.

    Note: items are appended without checking for duplicates, so re-running
    against a reused playlist can add the same videos again.
    """
    print("Importing playlists to target...")
    created = 0
    created_items = 0

    # Load current playlists so we don't recreate ones that already exist.
    existing = get_existing_playlists_map(yt)

    for pl in playlists:
        if write_budget["remaining"] <= 0:
            print("  Write budget exhausted, stopping playlist import.")
            break

        title = pl["title"]
        desc = pl.get("description") or ""
        privacy = pl.get("privacyStatus", "private")

        # If a playlist with the same title already exists, reuse it.
        if title in existing:
            new_pl_id = existing[title]
            print(f"  Reusing existing playlist '{title}' with id {new_pl_id}")
        else:
            body = {
                "snippet": {
                    "title": title,
                    "description": f"{desc}\n\n[Imported from hello@roelvanderven.com]",
                },
                "status": {"privacyStatus": privacy},
            }

            try:
                resp = yt.playlists().insert(part="snippet,status", body=body).execute()
            except HttpError as e:
                print(f"  Failed to create playlist '{title}': {e}")
                continue

            new_pl_id = resp["id"]
            existing[title] = new_pl_id
            created += 1
            write_budget["remaining"] -= 1
            print(f"  Created playlist '{title}' with id {new_pl_id}")

        # Add items.
        for vid in pl.get("item_video_ids", []):
            if write_budget["remaining"] <= 0:
                print("  Write budget exhausted while adding playlist items.")
                break
            body_item = {
                "snippet": {
                    "playlistId": new_pl_id,
                    "resourceId": {"kind": "youtube#video", "videoId": vid},
                }
            }
            try:
                yt.playlistItems().insert(part="snippet", body=body_item).execute()
                created_items += 1
                write_budget["remaining"] -= 1
                time.sleep(0.1)
            except HttpError as e:
                msg = str(e)
                if "quotaExceeded" in msg:
                    print("  Quota exceeded while adding playlist items. Stopping import.")
                    write_budget["remaining"] = 0
                    return
                print(f"  Failed to add video {vid} to playlist '{title}': {e}")

    print(f"  Created {created} playlists and {created_items} playlist items on target")
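

# Optional helper (a minimal sketch, not wired into the functions above):
# transient 5xx responses from the API can be retried with exponential
# backoff, whereas quotaExceeded keeps failing until the daily quota resets,
# so it is not worth retrying here.
def execute_with_backoff(request, max_tries: int = 4):
    """Execute an API request, retrying server-side (5xx) errors with backoff."""
    for attempt in range(max_tries):
        try:
            return request.execute()
        except HttpError as e:
            # Give up on client errors (4xx, incl. quotaExceeded) and on the
            # final attempt; otherwise sleep 1s, 2s, 4s, ... and retry.
            if e.resp.status < 500 or attempt == max_tries - 1:
                raise
            time.sleep(2 ** attempt)
# Usage (hypothetical):
#   execute_with_backoff(yt.videos().rate(id=vid, rating="like"))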


def import_likes(yt, video_ids: List[str], write_budget: Dict[str, int]):
    """
    Re-like videos on the target account with videos.rate.

    Each videos.rate call costs 50 quota units.
    """
    print("Importing likes to target...")
    count = 0
    for vid in video_ids:
        if write_budget["remaining"] <= 0:
            print("  Write budget exhausted, stopping likes import.")
            break
        try:
            yt.videos().rate(id=vid, rating="like").execute()
            count += 1
            write_budget["remaining"] -= 1
            time.sleep(0.1)
        except HttpError as e:
            print(f"  Failed to like video {vid}: {e}")
    print(f"  Set like on {count} videos on target")
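
# Note (assumption): the likes playlist lists videos newest-first, so
# re-liking in list order inverts the original like order on the target;
# pass reversed(video_ids) to import_likes if preserving order matters.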


def import_all():
    if not os.path.exists(EXPORT_FILE):
        raise RuntimeError(f"{EXPORT_FILE} not found. Run the export phase first.")

    with open(EXPORT_FILE, "r", encoding="utf-8") as f:
        data = json.load(f)

    yt_target = get_service("target")

    write_budget = {"remaining": MAX_WRITES_PER_RUN}

    # Subscriptions were already imported on an earlier run, so this call is
    # disabled; uncomment to run it again.
    # subs = data.get("subscriptions", [])
    # import_subscriptions(yt_target, subs, write_budget)

    playlists = data.get("playlists", [])
    import_playlists_and_items(yt_target, playlists, write_budget)

    likes = data.get("likes", [])
    import_likes(yt_target, likes, write_budget)

    print(f"\nImport complete. Remaining write budget this run: {write_budget['remaining']}\n")


def main():
    # Export already done, just import.
    print("=== IMPORT PHASE (target: roelven1@gmail.com) ===")
    print("A browser window will open; choose the TARGET account.")
    import_all()


if __name__ == "__main__":
    main()