Last active
December 11, 2025 10:10
-
-
Save wataash/90ef3addd757718f1753c6d93ca55a14 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # SPDX-FileCopyrightText: Copyright (c) 2025 Wataru Ashihara <wataash0607@gmail.com> | |
| # SPDX-License-Identifier: Apache-2.0 | |
| """ | |
| Create a Spotify playlist from a TSV list of tracks (mostly generated by Codex). Example usage: | |
| builtin echo $token | python spotify_genlist.py spotify_playlist --access_token_file=- --playlist_name="list_$(date +%Y%m%d%H%M%S)" --tracks_file=tracks.tsv | |
| """ | |
| import argparse | |
| import asyncio | |
| import base64 | |
| import collections | |
| import dataclasses | |
| import datetime | |
| import difflib | |
| import enum | |
| import fcntl | |
| import fileinput | |
| import functools | |
| import hashlib | |
| import inspect | |
| import io | |
| import itertools | |
| import json | |
| import logging | |
| import os | |
| import pathlib | |
| import pty | |
| import queue | |
| import random | |
| import re | |
| import select | |
| import selectors | |
| import signal | |
| import shlex | |
| import shutil | |
| import subprocess | |
| import sys | |
| import tempfile | |
| import textwrap | |
| import threading | |
| import time | |
| import tty | |
| import types | |
| import typing as t | |
| import requests | |
class MyFormatter(logging.Formatter):
    """Log formatter that wraps each record in an ANSI color chosen by level.

    Output shape: ``[<L> <HH:MM:SS> <file>:<line> <func>()] <message>`` where
    ``<L>`` is the first letter of the level name and the ``<func>()`` part is
    omitted for module-level code.
    """

    # (minimum level, ANSI color) pairs, highest threshold first. Using
    # thresholds instead of an exact-match dict means custom levels
    # (e.g. 25) pick the color of the nearest standard level below them
    # instead of raising KeyError.
    _LEVEL_COLORS = (
        (logging.ERROR, '\x1b[31m'),  # ERROR and CRITICAL: red
        (logging.WARNING, '\x1b[33m'),  # WARNING: yellow
        (logging.INFO, '\x1b[34m'),  # INFO: blue
    )
    _FALLBACK_COLOR = '\x1b[37m'  # DEBUG and anything lower: white

    def format(self, record: logging.LogRecord) -> str:
        """Return ``record`` rendered as a single colorized line."""
        color = next(
            (ansi for threshold, ansi in self._LEVEL_COLORS if record.levelno >= threshold),
            self._FALLBACK_COLOR,
        )
        fn = '' if record.funcName == '<module>' else f' {record.funcName}()'
        fmt = f'{color}[%(levelname)1.1s %(asctime)s %(filename)s:%(lineno)d{fn}] %(message)s\x1b[m'
        # The format string depends on the record, so a delegate formatter is
        # built per call.
        return logging.Formatter(fmt=fmt, datefmt='%T').format(record)
# Module-wide logger: everything from DEBUG upwards goes through a stream
# handler (stderr by default) that renders records with MyFormatter.
logger_handler = logging.StreamHandler()
logger_handler.setFormatter(MyFormatter())
logger = logging.getLogger(__name__)
logger.addHandler(logger_handler)
logger.setLevel(logging.DEBUG)
class ArgumentDefaultsRawTextHelpFormatter(
    argparse.ArgumentDefaultsHelpFormatter,
    argparse.RawTextHelpFormatter,
):
    """argparse help formatter combining default-value display with raw (unwrapped) help text."""
def main(argv: t.Sequence[str] | None = None) -> int:
    """Parse the command line and dispatch to the selected subcommand.

    Args:
        argv: Argument strings to parse. ``None`` (the default) makes argparse
            read ``sys.argv[1:]``, which is what the script entry point uses;
            passing a list allows the CLI to be driven programmatically.

    Returns:
        Whatever the subcommand handler returns (used as the exit status).
    """
    parser = argparse.ArgumentParser(formatter_class=ArgumentDefaultsRawTextHelpFormatter)
    subparsers = parser.add_subparsers(dest='subcommand_name', required=True)

    subparser = subparsers.add_parser(
        'spotify_playlist',
        formatter_class=ArgumentDefaultsRawTextHelpFormatter,
        help='Spotifyのプレイリストを曲リストから作成する',
    )
    subparser.set_defaults(func=cmd_spotify_playlist)
    subparser.add_argument('--playlist_name', required=True, help='作成するプレイリスト名')
    subparser.add_argument(
        '--tracks_file',
        required=True,
        type=argparse.FileType('r'),
        help='曲リストTSVのパス; - の場合はstdinから読む; ヘッダ名: name/song/song_name/title/曲/曲名, アーティスト列 artist/artist_name/band/band_name/singer/singer_name/アーティスト/アーティスト名',
    )
    # Exactly one token source must be given: inline or via file.
    group = subparser.add_mutually_exclusive_group(required=True)
    group.add_argument('--access_token', help='Spotifyのアクセストークン (https://developer.spotify.com/ `const token = \'THIS_TOKEN\'`)')
    group.add_argument('--access_token_file', type=argparse.FileType('r'), help='アクセストークンを記したファイルパス')
    subparser.add_argument('--public', action='store_true', help='プレイリストを公開にする; 未指定なら非公開')

    args = parser.parse_args(argv)
    return args.func(args)
def cmd_spotify_playlist(args: argparse.Namespace) -> int:
    """Run the ``spotify_playlist`` subcommand.

    Steps, in order: load the track list, obtain the access token, look up
    the current user, create the playlist, search a URI for every track, and
    add the resolved URIs to the new playlist.

    Args:
        args: Parsed CLI namespace; reads ``tracks_file``, ``playlist_name``
            and ``public`` here, and is passed on to ``load_access_token``.

    Returns:
        Always ``0``.
    """
    track_list = load_tracks(args.tracks_file)
    logger.info('loaded %d tracks', len(track_list))

    token = load_access_token(args)
    owner_id = get_current_user_id(token)
    logger.info('current user id: %s', owner_id)

    new_playlist_id = create_playlist(token, owner_id, args.playlist_name, args.public)
    logger.info('playlist created %s', new_playlist_id)

    resolved_uris = resolve_track_uris(token, track_list)
    logger.info('resolved %d track uris', len(resolved_uris))

    add_tracks_to_playlist(token, new_playlist_id, resolved_uris)
    logger.info('playlist created: https://open.spotify.com/playlist/%s', new_playlist_id)
    return 0
def load_access_token(args: argparse.Namespace) -> str:
    """Return the access token given on the command line.

    Args:
        args: Parsed CLI namespace. ``args.access_token_file`` (an open text
            file), when set, takes precedence over ``args.access_token``.

    Returns:
        The token with surrounding whitespace removed.

    Raises:
        ValueError: If neither source yields a non-empty token.
    """
    if args.access_token_file:
        # ``with`` closes the handle even when read() raises.
        with args.access_token_file as token_file:
            token = token_file.read().strip()
    else:
        token = (args.access_token or '').strip()
    # Explicit check instead of ``assert`` so it still runs under ``python -O``
    # and also rejects an empty token file.
    if not token:
        raise ValueError('access_token is required')
    return token
| def load_tracks(file_obj: io.TextIOBase) -> list[tuple[str, str]]: | |
| """ | |
| Parse TSV: | |
| # without header (song_name, artist_name order) | |
| Purple Haze\tJimi Hendrix | |
| Spain\tChick Corea | |
| # with header | |
| song_name\tartist_name | |
| Purple Haze\tJimi Hendrix | |
| Spain\tChick Corea | |
| # with header | |
| band\ttitle | |
| Jimi Hendrix\tPurple Haze | |
| Chick Corea\tSpain | |
| """ | |
| def normalize_header_name(name: str) -> str | None: | |
| token = name.strip().lower() | |
| title_aliases = {'name', 'song', 'song_name', 'title', '曲', '曲名'} | |
| artist_aliases = {'artist', 'artist_name', 'band', 'band_name', 'singer', 'singer_name', 'アーティスト', 'アーティスト名'} | |
| if token in title_aliases: | |
| return 'title' | |
| if token in artist_aliases: | |
| return 'artist' | |
| return None | |
| data = file_obj.read() | |
| file_obj.close() | |
| lines = [line.strip() for line in data.splitlines() if line.strip()] | |
| assert lines, 'no track entries' | |
| title_index = 0 | |
| artist_index = 1 | |
| start_index = 0 | |
| header = lines[0].split('\t') | |
| if len(header) == 2: | |
| normalized = [normalize_header_name(col) for col in header] | |
| if normalized == ['title', 'artist']: | |
| logger.debug('header detected title/artist %s', header) | |
| start_index = 1 | |
| elif normalized == ['artist', 'title']: | |
| logger.debug('header detected artist/title %s; swapping columns', header) | |
| start_index = 1 | |
| title_index, artist_index = 1, 0 | |
| if start_index == 0: | |
| logger.warning('TSV header missing; treating as song_name\\tartist_name') | |
| rows = [] | |
| for line in lines[start_index:]: | |
| parts = line.split('\t') | |
| assert len(parts) == 2, f'invalid row format: {line}' | |
| title = parts[title_index] | |
| artist = parts[artist_index] | |
| song_name = title | |
| artist_name = artist | |
| logger.debug('parsed row song_name=%s artist_name=%s', song_name, artist_name) | |
| rows.append((title, artist)) | |
| assert rows, 'no tracks after parsing' | |
| return rows | |
def get_current_user_id(access_token: str) -> str:
    """Fetch ``https://api.spotify.com/v1/me`` and return the ``id`` field of the response.

    Asserts that the response contains an ``id`` key.
    """
    profile = spotify_request('GET', 'https://api.spotify.com/v1/me', access_token)
    assert 'id' in profile, f'user id missing {profile}'
    user_id = profile['id']
    return user_id
def create_playlist(access_token: str, user_id: str, playlist_name: str, is_public: bool) -> str:
    """Create a playlist for ``user_id`` and return the new playlist's id.

    POSTs the name, visibility flag and a fixed description to the user's
    playlists endpoint, expecting HTTP 201, and asserts that the response
    contains an ``id`` key.
    """
    endpoint = f'https://api.spotify.com/v1/users/{user_id}/playlists'
    body = {
        'name': playlist_name,
        'public': is_public,
        'description': 'Generated via spotify_genlist.py',
    }
    created = spotify_request('POST', endpoint, access_token, json=body, expected_status=201)
    assert 'id' in created, f'playlist id missing {created}'
    return created['id']
def resolve_track_uris(access_token: str, tracks: list[tuple[str, str]]) -> list[str | None]:
    """Search a URI for every ``(title, artist)`` pair, preserving order.

    The result has one entry per input track; an entry is ``None`` when
    ``search_track_uri`` found no match for that track.
    """
    return [search_track_uri(access_token, title, artist) for title, artist in tracks]
def search_track_uri(access_token: str, title: str, artist: str) -> str | None:
    """Return the URI of the first search hit for a track, or ``None``.

    Three queries are tried in order, stopping at the first one that returns
    any items: a ``track:``/``artist:`` field query, the plain
    ``"<title> <artist>"`` text, and finally the title alone.
    """
    attempts = (
        f'track:{title} artist:{artist}',
        f'{title} {artist}',
        title,
    )
    for attempt in attempts:
        response = spotify_request(
            'GET',
            'https://api.spotify.com/v1/search',
            access_token,
            params={'q': attempt, 'type': 'track', 'limit': 5},
        )
        hits = response.get('tracks', {}).get('items', [])
        if not hits:
            logger.debug('no hit for query %s', attempt)
            continue
        # Only the top-ranked item is used.
        found = hits[0]['uri']
        logger.debug('hit track %s / %s via query %s -> %s', title, artist, attempt, found)
        return found
    logger.info('no track match %s %s', title, artist)
    return None
def add_tracks_to_playlist(access_token: str, playlist_id: str, uris: list[str | None]) -> None:
    """Add the non-``None`` URIs to the playlist in batches of 100.

    ``None`` entries (unresolved tracks) are dropped and counted in the log.
    Each batch is POSTed to the playlist's tracks endpoint, expecting
    HTTP 201.
    """
    resolved = list(filter(lambda uri: uri is not None, uris))
    skipped = len(uris) - len(resolved)
    logger.info('adding %d tracks (skipped %d unresolved)', len(resolved), skipped)

    endpoint = f'https://api.spotify.com/v1/playlists/{playlist_id}/tracks'
    batch_size = 100
    offset = 0
    while offset < len(resolved):
        batch = resolved[offset:offset + batch_size]
        # Log positions are 1-based and inclusive.
        logger.info('adding tracks %d-%d', offset + 1, offset + len(batch))
        spotify_request('POST', endpoint, access_token, json={'uris': batch}, expected_status=201)
        offset += batch_size
def spotify_request(method: str, url: str, access_token: str, expected_status: int = 200, **kwargs: t.Any) -> dict[str, t.Any]:
    """Send one bearer-authenticated HTTP request and return the decoded JSON body.

    Args:
        method: HTTP method name passed to ``requests.request``.
        url: Full request URL.
        access_token: Token sent as ``Authorization: Bearer <token>``.
        expected_status: The only status code treated as success.
        **kwargs: Forwarded to ``requests.request`` (e.g. ``params``,
            ``json``, ``headers``). ``timeout`` defaults to 30 seconds when
            not supplied.

    Returns:
        The response body parsed as JSON.

    Raises:
        requests.HTTPError: If the response status differs from
            ``expected_status``; the response is attached to the exception.
    """
    # Copy so a caller-supplied headers dict is never mutated.
    headers = dict(kwargs.pop('headers', None) or {})
    headers['Authorization'] = f'Bearer {access_token}'
    # setdefault lets callers override the timeout instead of hitting a
    # duplicate-keyword TypeError.
    kwargs.setdefault('timeout', 30)
    resp = requests.request(method, url, headers=headers, **kwargs)
    # Explicit check rather than ``assert`` so failures are still detected
    # under ``python -O``.
    if resp.status_code != expected_status:
        raise requests.HTTPError(f'spotify request failed {resp.status_code} {resp.text}', response=resp)
    return resp.json()
# Script entry point: run the CLI and use main()'s return value as the exit status.
if __name__ == '__main__':
    raise SystemExit(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment