Last active
February 7, 2026 19:55
-
-
Save wlinds/e5a3e830eeea7f5472f81bf075382ed6 to your computer and use it in GitHub Desktop.
SoundCloud Watcher for OpenClaw - Track your SoundCloud account
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| SoundCloud Watcher - cron script for tracking your SoundCloud account | |
| and getting notified about new releases from artists you care about. | |
| Features: | |
| - Follower change detection (new/lost followers by name) | |
| - Per-track engagement monitoring (who liked, repost counts) | |
| - New release detection from a curated artist list | |
| - Dormant artist throttling (skip inactive artists to save API calls) | |
| - Rate limit backoff (exponential backoff on 429s) | |
| - A single cron entry runs everything | |
| Setup: | |
| 1. Create config file (see PATHS below) with your SoundCloud API credentials | |
| 2. Run: python3 soundcloud_cron.py add <artist_username> - for each artist | |
| 3. Run: python3 soundcloud_cron.py check - to verify everything works | |
| 4. Add to cron: 0 */6 * * * python3 /path/to/soundcloud_cron.py cron | |
| Config file format (one KEY=VALUE per line): | |
| SOUNDCLOUD_CLIENT_ID=your_client_id | |
| SOUNDCLOUD_CLIENT_SECRET=your_client_secret | |
| MY_USERNAME=your_soundcloud_username | |
| """ | |
| import json | |
| import sys | |
| import time | |
| import urllib.request | |
| import urllib.error | |
| import urllib.parse | |
| from datetime import datetime, timezone | |
| from pathlib import Path | |
| from typing import Dict, List, Optional | |
# =============================================================================
# CONFIGURATION — edit these to match your setup
# =============================================================================

# URL slug of the SoundCloud account to watch (soundcloud.com/<slug>).
MY_USERNAME = "your_username"

# Directory holding the credentials file and the JSON state files.
CONFIG_DIR = Path.home().joinpath(".config", "soundcloud-watcher")
CONFIG_FILE = CONFIG_DIR.joinpath("credentials.env")
ACCOUNT_DATA = CONFIG_DIR.joinpath("account.json")  # state for your own account
ARTISTS_DATA = CONFIG_DIR.joinpath("artists.json")  # state for tracked artists
BACKOFF_FILE = CONFIG_DIR.joinpath("backoff.json")  # rate limit state (auto-managed)

# --- Notification settings ---
# True: deliver notifications through the gateway (see send_notification()).
# False: print to stdout only (handy for testing or piping to other tools).
NOTIFICATIONS_ENABLED = False

# Gateway settings; only needed when NOTIFICATIONS_ENABLED is True.
GATEWAY_CONFIG = CONFIG_DIR.joinpath("gateway.json")
GATEWAY_SESSION_KEY = "default"
GATEWAY_PORT_DEFAULT = 8080          # used when the gateway config has no port
GATEWAY_ENDPOINT = "/tools/invoke"   # gateway API endpoint path
GATEWAY_TOOL_NAME = "sessions_send"  # tool name placed in the gateway payload

# =============================================================================
# TUNING — adjust these to balance API usage vs responsiveness
# =============================================================================

# Number of YOUR most recent tracks monitored for likes/reposts.
MY_TRACKS_LIMIT = 10

# Number of recent tracks fetched per artist when looking for new releases.
ARTIST_TRACKS_LIMIT = 5

# Number of tracks fetched when an artist is first added (seeds known tracks).
ARTIST_ADD_LIMIT = 50

# An artist with no upload for more than this many days counts as "dormant".
DORMANT_DAYS = 90

# Dormant artists are checked once per this many days instead of every run.
DORMANT_CHECK_INTERVAL_DAYS = 7

# Upper bound on stored track IDs per artist (older ones are pruned).
MAX_KNOWN_TRACKS = 50

# Upper bound on likers fetched per track.
MAX_LIKERS_PER_TRACK = 50

# Page size for follower pagination (SoundCloud max is 200).
FOLLOWERS_PAGE_SIZE = 200

# --- Rate limit backoff ---
BACKOFF_BASE_SECONDS = 5 * 60      # 5 minutes: base delay after a 429
BACKOFF_MAX_SECONDS = 2 * 60 * 60  # 2 hours: backoff ceiling

# --- Timeouts ---
API_TIMEOUT = 15      # seconds per API request
GATEWAY_TIMEOUT = 60  # seconds for gateway notification delivery

# =============================================================================
# INTERNALS — you probably don't need to change anything below
# =============================================================================
API_BASE = "https://api.soundcloud.com"
class Config:
    """Credentials and account settings read from ``CONFIG_FILE``.

    The file holds one ``KEY=VALUE`` pair per line; whitespace around the key
    and the value is ignored and lines starting with ``#`` are comments.
    Recognised keys: ``SOUNDCLOUD_CLIENT_ID``, ``SOUNDCLOUD_CLIENT_SECRET``,
    ``SOUNDCLOUD_ACCESS_TOKEN`` and ``MY_USERNAME``.
    """

    _TOKEN_KEY = "SOUNDCLOUD_ACCESS_TOKEN"

    def __init__(self):
        self.client_id = ""
        self.client_secret = ""
        self.access_token = ""
        # Module-level default; a non-empty MY_USERNAME in the file overrides it.
        self.my_username = MY_USERNAME

    @staticmethod
    def _split_setting(line: str) -> Optional[tuple[str, str]]:
        """Return the stripped ``(key, value)`` of a setting line, else None."""
        if "=" not in line or line.lstrip().startswith("#"):
            return None
        key, value = line.split("=", 1)
        return key.strip(), value.strip()

    @classmethod
    def load(cls) -> "Config":
        """Build a Config from ``CONFIG_FILE``.

        Returns:
            A Config with defaults for every key that is absent. A missing
            file yields an all-default Config.
        """
        cfg = cls()
        if not CONFIG_FILE.exists():
            return cfg
        attr_by_key = {
            "SOUNDCLOUD_CLIENT_ID": "client_id",
            "SOUNDCLOUD_CLIENT_SECRET": "client_secret",
            cls._TOKEN_KEY: "access_token",
            "MY_USERNAME": "my_username",
        }
        for line in CONFIG_FILE.read_text(encoding="utf-8").splitlines():
            setting = cls._split_setting(line)
            if setting is None:
                continue
            key, value = setting
            attr = attr_by_key.get(key)
            if attr is None:
                continue
            if attr == "my_username" and not value:
                # An empty "MY_USERNAME=" must not blank out the default.
                continue
            setattr(cfg, attr, value)
        return cfg

    def save_token(self, token: str):
        """Store ``token`` on this object and persist it to ``CONFIG_FILE``.

        All other lines of the file are kept as they are. The token line is
        recognised with the same parsing rules as ``load`` (so
        ``SOUNDCLOUD_ACCESS_TOKEN = x`` is replaced, not shadowed by an
        appended duplicate), and repeated token lines collapse into one.
        """
        self.access_token = token
        CONFIG_FILE.parent.mkdir(parents=True, exist_ok=True)
        if CONFIG_FILE.exists():
            lines = CONFIG_FILE.read_text(encoding="utf-8").splitlines()
        else:
            lines = []
        token_line = f"{self._TOKEN_KEY}={token}"
        new_lines = []
        found = False
        for line in lines:
            setting = self._split_setting(line)
            if setting is not None and setting[0] == self._TOKEN_KEY:
                if not found:
                    new_lines.append(token_line)
                found = True
            else:
                new_lines.append(line)
        if not found:
            new_lines.append(token_line)
        CONFIG_FILE.write_text("\n".join(new_lines) + "\n", encoding="utf-8")
| def _utcnow() -> datetime: | |
| """Timezone-aware UTC now.""" | |
| return datetime.now(timezone.utc) | |
| def _parse_timestamp(ts: str) -> Optional[datetime]: | |
| """Parse SoundCloud timestamps into timezone-aware UTC datetimes. | |
| SoundCloud returns dates like: "2026/01/22 16:22:27 +0000" | |
| We normalize to ISO 8601 before parsing. | |
| """ | |
| if not ts: | |
| return None | |
| try: | |
| # "2026/01/22 16:22:27 +0000" -> "2026-01-22T16:22:27+00:00" | |
| cleaned = ts.replace("/", "-").replace(" ", "T", 1) | |
| cleaned = cleaned.replace(" +0000", "+00:00").replace("Z", "+00:00") | |
| dt = datetime.fromisoformat(cleaned) | |
| if dt.tzinfo is None: | |
| dt = dt.replace(tzinfo=timezone.utc) | |
| return dt | |
| except Exception as e: | |
| print(f"Warning: Could not parse timestamp '{ts}': {e}") | |
| return None | |
class SoundCloudAPI:
    """Minimal SoundCloud API client with token refresh and refresh backoff.

    ``get`` performs authenticated GET requests; on a 401 it refreshes the
    token once and retries. A token refresh that is answered with 429 is
    recorded in ``BACKOFF_FILE``, and further refreshes are refused until an
    exponentially growing wait (``BACKOFF_BASE_SECONDS`` doubling per
    consecutive failure, capped at ``BACKOFF_MAX_SECONDS``) has elapsed.
    """

    # Cap on the exponent so a corrupt fail_count cannot build a huge power.
    _MAX_BACKOFF_EXPONENT = 16

    def __init__(self, config: Config):
        self.config = config
        self.calls = 0  # Running count of get() requests made by this instance

    @staticmethod
    def _user_summary(user: Dict) -> Dict[str, str]:
        """Reduce an API user object to ``{"username", "display_name"}``.

        Missing, null or empty fields fall back to the next candidate so the
        returned values are always non-empty strings.
        """
        return {
            "username": user.get("permalink") or user.get("username") or "unknown",
            "display_name": user.get("full_name") or user.get("username") or "unknown",
        }

    def _check_backoff(self) -> Optional[int]:
        """Return the seconds left in the backoff window, or None if clear."""
        if not BACKOFF_FILE.exists():
            return None
        try:
            data = json.loads(BACKOFF_FILE.read_text())
            elapsed = time.time() - data.get("last_fail", 0)
            # fail_count is 1 after the first 429, so the first wait is
            # exactly BACKOFF_BASE_SECONDS and doubles from there.
            exponent = max(int(data.get("fail_count", 0)) - 1, 0)
            exponent = min(exponent, self._MAX_BACKOFF_EXPONENT)
            backoff = min(BACKOFF_BASE_SECONDS * (2 ** exponent), BACKOFF_MAX_SECONDS)
        except (OSError, ValueError, TypeError, AttributeError):
            # Unreadable or corrupt state is treated as "no backoff".
            return None
        if elapsed < backoff:
            return int(backoff - elapsed)
        return None

    def _set_backoff(self):
        """Record a rate limit failure for exponential backoff (best effort)."""
        try:
            data = json.loads(BACKOFF_FILE.read_text()) if BACKOFF_FILE.exists() else {}
        except (OSError, ValueError):
            data = {}
        if not isinstance(data, dict):
            data = {}
        previous = data.get("fail_count", 0)
        data["fail_count"] = (previous if isinstance(previous, int) else 0) + 1
        data["last_fail"] = time.time()
        try:
            BACKOFF_FILE.parent.mkdir(parents=True, exist_ok=True)
            BACKOFF_FILE.write_text(json.dumps(data))
        except OSError as e:
            print(f"Warning: could not save backoff state: {e}")

    def _clear_backoff(self):
        """Clear backoff state after a successful token refresh."""
        try:
            BACKOFF_FILE.unlink(missing_ok=True)
        except OSError as e:
            print(f"Warning: could not clear backoff state: {e}")

    def refresh_token(self) -> bool:
        """Refresh the OAuth token using the client credentials grant.

        Returns:
            True when a new token was obtained and saved; False when
            credentials are missing, a backoff window is active, or the
            request failed.
        """
        if not self.config.client_id or not self.config.client_secret:
            return False
        remaining = self._check_backoff()
        if remaining:
            print(f"Token refresh in backoff ({remaining}s remaining)")
            return False
        try:
            data = urllib.parse.urlencode({
                "grant_type": "client_credentials",
                "client_id": self.config.client_id,
                "client_secret": self.config.client_secret
            }).encode()
            req = urllib.request.Request(
                f"{API_BASE}/oauth2/token",
                data=data,
                headers={"Content-Type": "application/x-www-form-urlencoded"}
            )
            with urllib.request.urlopen(req, timeout=API_TIMEOUT) as resp:
                result = json.loads(resp.read().decode())
            self.config.save_token(result["access_token"])
            self._clear_backoff()
            print("Token refreshed")
            return True
        except urllib.error.HTTPError as e:
            if e.code == 429:
                self._set_backoff()
                print("Token refresh rate limited (429)")
            else:
                print(f"Token refresh failed: {e}")
            return False
        except Exception as e:
            print(f"Token refresh failed: {e}")
            return False

    def get(self, url: str, params: Optional[Dict] = None, retry: bool = True) -> Optional[Dict]:
        """Make an authenticated GET request.

        Args:
            url: Relative path (``/users/...``), bare path, or full URL.
            params: Query parameters appended to the URL.
            retry: Whether a 401 may trigger one token refresh and retry.

        Returns:
            The decoded JSON body, or None on any error.
        """
        self.calls += 1
        if url.startswith("/"):
            full_url = f"{API_BASE}{url}"
        elif url.startswith("http"):
            full_url = url
        else:
            full_url = f"{API_BASE}/{url}"
        if params:
            separator = "&" if "?" in full_url else "?"
            query = urllib.parse.urlencode(params)
            full_url = f"{full_url}{separator}{query}"
        headers = {}
        if self.config.access_token:
            headers["Authorization"] = f"OAuth {self.config.access_token}"
        try:
            req = urllib.request.Request(full_url, headers=headers)
            with urllib.request.urlopen(req, timeout=API_TIMEOUT) as resp:
                return json.loads(resp.read().decode())
        except urllib.error.HTTPError as e:
            if e.code == 401 and retry:
                if self.refresh_token():
                    return self.get(url, params, retry=False)
            print(f"API error {e.code}: {full_url}")
            return None
        except Exception as e:
            print(f"API error: {e}")
            return None

    def resolve(self, username: str) -> Optional[Dict]:
        """Resolve a username to the object returned by ``/resolve``."""
        return self.get("/resolve", {"url": f"https://soundcloud.com/{username}"})

    def get_user(self, user_id: int) -> Optional[Dict]:
        """Get a user profile by ID."""
        return self.get(f"/users/{user_id}")

    def get_tracks(self, user_id: int, limit: int = 20) -> List[Dict]:
        """Get a user's tracks.

        Returns:
            The list of track objects; an empty list when the request failed
            or the response did not contain a list.
        """
        data = self.get(f"/users/{user_id}/tracks", {"limit": limit, "linked_partitioning": 1})
        collection = data.get("collection") if isinstance(data, dict) else data
        return collection if isinstance(collection, list) else []

    def get_track_likers(self, track_id: int, limit: int = MAX_LIKERS_PER_TRACK) -> Dict[str, Dict]:
        """Get users who liked a track.

        Returns:
            ``{user_id (str): {"username", "display_name"}}`` for at most
            ``limit`` likers; empty when the request failed.
        """
        data = self.get(f"/tracks/{track_id}/favoriters", {"limit": limit, "linked_partitioning": 1})
        collection = data.get("collection") if isinstance(data, dict) else data
        if not isinstance(collection, list):
            return {}
        return {
            str(u["id"]): self._user_summary(u)
            for u in collection
            if isinstance(u, dict) and "id" in u
        }

    def get_followers_paginated(self, user_id: int) -> Dict[str, Dict]:
        """Paginate through all followers of ``user_id``.

        Expensive — one request per ``FOLLOWERS_PAGE_SIZE`` followers.

        Returns:
            ``{user_id (str): {"username", "display_name"}}`` for every
            follower. If any page fails, an empty dict is returned instead
            of a partial list, so callers never mistake a truncated result
            for followers having left.
        """
        followers: Dict[str, Dict] = {}
        next_url = f"/users/{user_id}/followers"
        params: Optional[Dict] = {"limit": FOLLOWERS_PAGE_SIZE, "linked_partitioning": 1}
        visited = set()  # guards against a next_href that loops back
        while next_url and next_url not in visited:
            visited.add(next_url)
            data = self.get(next_url, params)
            if not isinstance(data, dict):
                print("Failed to fetch a followers page, discarding partial list")
                return {}
            for f in data.get("collection") or []:
                if isinstance(f, dict) and "id" in f:
                    followers[str(f["id"])] = self._user_summary(f)
            # next_href is a full URL that already carries the cursor params.
            next_url = data.get("next_href")
            params = None
        return followers
class AccountWatcher:
    """Monitors your account: follower changes and per-track engagement.

    State is loaded from and saved to ``ACCOUNT_DATA`` as JSON.
    """

    def __init__(self, api: SoundCloudAPI, config: Config):
        self.api = api
        self.config = config
        self.data = self._load()

    def _load(self) -> Dict:
        """Read stored state, filling in defaults for any missing key."""
        default = {
            "my_account": None,
            "my_followers": {},
            "follower_count": 0,
            "track_stats": [],
            "last_check": None
        }
        if ACCOUNT_DATA.exists():
            try:
                with ACCOUNT_DATA.open() as fh:
                    data = json.load(fh)
                if isinstance(data, dict):
                    for key, value in default.items():
                        data.setdefault(key, value)
                    return data
            except (OSError, ValueError) as e:
                print(f"Warning: could not read {ACCOUNT_DATA}: {e}")
        return default

    def _save(self):
        """Stamp ``last_check`` and write the state file."""
        ACCOUNT_DATA.parent.mkdir(parents=True, exist_ok=True)
        self.data["last_check"] = _utcnow().isoformat()
        with ACCOUNT_DATA.open("w") as fh:
            json.dump(self.data, fh, indent=2)

    @staticmethod
    def _display_name(user: Dict) -> str:
        """Best printable name for a stored follower/liker entry."""
        return user.get("display_name") or user.get("username") or "unknown"

    @staticmethod
    def _summarize(names: List[str]) -> str:
        """Join up to three names and append "+N more" for the remainder."""
        shown = ", ".join(names[:3])
        if len(names) > 3:
            shown += f" +{len(names)-3} more"
        return shown

    def check(self) -> List[str]:
        """Run the account check.

        API calls on a quiet day: 2 (profile + tracks)
        API calls on follower change: 2 + ceil(followers/200) + tracks_with_new_likes

        Returns:
            Human-readable notification strings (possibly empty). State is
            not saved when the account cannot be resolved or the profile
            cannot be fetched, so the previous state is preserved.
        """
        notifications: List[str] = []
        # Resolve account on first run
        if not self.data["my_account"]:
            user = self.api.resolve(self.config.my_username)
            if not user or "id" not in user:
                return ["Failed to resolve SoundCloud user"]
            self.data["my_account"] = {
                "user_id": user["id"],
                "username": user.get("permalink", self.config.my_username)
            }
        user_id = self.data["my_account"]["user_id"]

        # Fetch profile to check follower count (1 API call)
        profile = self.api.get_user(user_id)
        if not profile:
            print("Failed to fetch profile, skipping account check")
            return notifications  # Don't save — preserve previous state

        notifications.extend(self._check_followers(user_id, profile))
        notifications.extend(self._check_tracks(user_id))
        self._save()
        return notifications

    def _check_followers(self, user_id: int, profile: Dict) -> List[str]:
        """Diff the follower list, but only when the follower count moved."""
        notifications: List[str] = []
        current_count = profile.get("followers_count", 0) or 0
        stored_count = self.data.get("follower_count", 0) or 0
        stored_followers = self.data.get("my_followers") or {}

        # Only paginate the full follower list if the count actually changed
        # (or the list was never stored).
        if current_count == stored_count and stored_followers:
            print(f"Follower count unchanged ({current_count}), skipping pagination")
            return notifications

        print(f"Follower count changed: {stored_count} -> {current_count}, fetching list...")
        current_followers = self.api.get_followers_paginated(user_id)
        if not current_followers and stored_count > 0:
            # Empty response with known followers = API hiccup, don't wipe data
            print("API returned empty followers, skipping comparison")
            return notifications

        # Skip diff on first run (everything would show as "new")
        if stored_followers:
            new = [self._display_name(f) for uid, f in current_followers.items()
                   if uid not in stored_followers]
            lost = [self._display_name(f) for uid, f in stored_followers.items()
                    if uid not in current_followers]
            if new:
                notifications.append(
                    f"New follower{'s' if len(new) > 1 else ''}: **{self._summarize(new)}**")
            if lost:
                notifications.append(
                    f"Lost follower{'s' if len(lost) > 1 else ''}: {self._summarize(lost)}")
        self.data["my_followers"] = current_followers
        self.data["follower_count"] = current_count
        return notifications

    def _check_tracks(self, user_id: int) -> List[str]:
        """Refresh per-track stats and report new likes, unlikes and reposts."""
        notifications: List[str] = []
        # Play/like/repost counts are included in the response (1 API call)
        tracks = self.api.get_tracks(user_id, limit=MY_TRACKS_LIMIT)
        if not tracks or not isinstance(tracks, list):
            # API failure — don't overwrite good data with empty data
            print("Failed to fetch tracks, keeping previous stats")
            return notifications

        prev_stats = {
            s["track_id"]: s
            for s in self.data.get("track_stats", [])
            if isinstance(s, dict) and "track_id" in s
        }
        new_stats = []
        for t in tracks:
            if not isinstance(t, dict) or "id" not in t:
                continue
            stats = {
                "track_id": t["id"],
                "title": t.get("title", "Unknown"),
                "plays": t.get("playback_count", 0) or 0,
                "likes": t.get("likes_count", 0) or t.get("favoritings_count", 0) or 0,
                "reposts": t.get("reposts_count", 0) or 0,
                "likers": {}
            }
            prev = prev_stats.get(stats["track_id"])
            if prev:
                notifications.extend(self._diff_track(stats, prev))
            else:
                # First time seeing this track — seed likers without notifying
                stats["likers"] = self.api.get_track_likers(stats["track_id"])
            new_stats.append(stats)
        self.data["track_stats"] = new_stats
        return notifications

    def _diff_track(self, stats: Dict, prev: Dict) -> List[str]:
        """Compare one track against its stored stats; fills ``stats["likers"]``."""
        notifications: List[str] = []
        title = stats["title"]
        current_likes = stats["likes"]
        prev_likes = prev.get("likes", 0)
        prev_likers = prev.get("likers") or {}

        # Only fetch liker list if like count changed (or never seeded)
        if current_likes != prev_likes or (current_likes > 0 and not prev_likers):
            current_likers = self.api.get_track_likers(stats["track_id"])
            if current_likes > 0 and not current_likers:
                # The track has likes but no likers came back: treat it as a
                # failed fetch. Keep the old likers and the old like count so
                # the next run sees the change again and retries.
                stats["likers"] = prev_likers
                stats["likes"] = prev_likes
            else:
                stats["likers"] = current_likers
                # Same like count but nothing stored means an earlier seed
                # failed; fill in silently instead of announcing everyone.
                reseeding = not prev_likers and current_likes == prev_likes
                if not reseeding:
                    new_liker_names = [
                        self._display_name(u)
                        for uid, u in current_likers.items()
                        if uid not in prev_likers
                    ]
                    unliker_names = [
                        self._display_name(u)
                        for uid, u in prev_likers.items()
                        if uid not in current_likers
                    ]
                    if new_liker_names:
                        notifications.append(
                            f"**{self._summarize(new_liker_names)}** liked '{title}'")
                    if unliker_names:
                        notifications.append(
                            f"{self._summarize(unliker_names)} unliked '{title}'")
        else:
            # No change — carry forward previous liker data
            stats["likers"] = prev_likers

        new_reposts = stats["reposts"] - (prev.get("reposts", 0) or 0)
        if new_reposts > 0:
            notifications.append(
                f"'{title}' got {new_reposts} repost{'s' if new_reposts > 1 else ''}!")
        return notifications
class ArtistTracker:
    """Tracks a curated list of artists and detects new releases.

    State is loaded from and saved to ``ARTISTS_DATA`` as JSON, keyed by
    lower-cased username under ``"artists"``.
    """

    def __init__(self, api: SoundCloudAPI):
        self.api = api
        self.data = self._load()

    def _load(self) -> Dict:
        """Read stored state; fall back to an empty artist list."""
        if ARTISTS_DATA.exists():
            try:
                with ARTISTS_DATA.open() as fh:
                    data = json.load(fh)
                if isinstance(data, dict) and isinstance(data.get("artists"), dict):
                    return data
            except (OSError, ValueError) as e:
                print(f"Warning: could not read {ARTISTS_DATA}: {e}")
        return {"artists": {}, "updated_at": None}

    def _save(self):
        """Stamp ``updated_at`` and write the state file."""
        ARTISTS_DATA.parent.mkdir(parents=True, exist_ok=True)
        self.data["updated_at"] = _utcnow().isoformat()
        with ARTISTS_DATA.open("w") as fh:
            json.dump(self.data, fh, indent=2)

    def _is_dormant(self, artist: Dict) -> bool:
        """An artist is dormant if their last upload was more than DORMANT_DAYS ago."""
        dt = _parse_timestamp(artist.get("last_upload"))
        if not dt:
            return False  # Unknown upload date = assume active (safer to check)
        return (_utcnow() - dt).days > DORMANT_DAYS

    def _should_skip(self, artist: Dict) -> bool:
        """Active artists: check every run. Dormant: every DORMANT_CHECK_INTERVAL_DAYS."""
        if not self._is_dormant(artist):
            return False
        dt = _parse_timestamp(artist.get("last_checked"))
        if not dt:
            return False  # Never checked = don't skip
        return (_utcnow() - dt).days < DORMANT_CHECK_INTERVAL_DAYS

    @staticmethod
    def _note_upload(artist: Dict, created_at: Optional[str]) -> None:
        """Advance ``artist["last_upload"]`` only if ``created_at`` is newer."""
        candidate = _parse_timestamp(created_at)
        if candidate is None:
            return
        current = _parse_timestamp(artist.get("last_upload"))
        if current is None or candidate > current:
            artist["last_upload"] = created_at

    def check_releases(self) -> List[Dict]:
        """Check all tracked artists for new releases.

        API calls: 1 per active artist, 1 per dormant artist due for check,
        0 for skipped.

        Returns:
            One dict per new track with keys ``artist``, ``title``, ``url``,
            ``duration`` (``M:SS``) and ``genre``.
        """
        notifications = []
        checked = 0
        skipped = 0
        for username, artist in self.data["artists"].items():
            if self._should_skip(artist):
                skipped += 1
                continue
            checked += 1
            user_id = artist.get("user_id")
            if not user_id:
                continue
            tracks = self.api.get_tracks(user_id, limit=ARTIST_TRACKS_LIMIT)
            if not tracks or not isinstance(tracks, list):
                # Nothing came back (most likely an API failure). Leave
                # last_checked alone so a dormant artist is retried next run
                # rather than deferred for a full interval.
                continue
            artist["last_checked"] = _utcnow().isoformat()
            ids = artist.setdefault("known_track_ids", [])
            known_ids = set(ids)
            for track in tracks:
                if not isinstance(track, dict) or "id" not in track:
                    continue
                if track["id"] in known_ids:
                    continue
                duration_sec = (track.get("duration", 0) or 0) // 1000
                notifications.append({
                    "artist": artist.get("display_name", username),
                    "title": track.get("title", "Unknown"),
                    "url": track.get("permalink_url", ""),
                    "duration": f"{duration_sec // 60}:{duration_sec % 60:02d}",
                    "genre": track.get("genre")
                })
                ids.append(track["id"])
                known_ids.add(track["id"])
                # Several new tracks may arrive in one run; keep the newest
                # date instead of whichever happened to be processed last.
                self._note_upload(artist, track.get("created_at"))
            # Prune old track IDs to prevent unbounded growth
            if len(ids) > MAX_KNOWN_TRACKS:
                artist["known_track_ids"] = ids[-MAX_KNOWN_TRACKS:]
        dormant_count = sum(1 for a in self.data["artists"].values() if self._is_dormant(a))
        print(f"Checked {checked} artists, skipped {skipped} dormant, {dormant_count} total dormant")
        self._save()
        return notifications

    def add(self, username: str) -> str:
        """Add an artist to tracking and seed their known tracks.

        Seeding the known track IDs prevents existing tracks from being
        reported as new releases on the next check.

        Returns:
            A human-readable result message.
        """
        user = self.api.resolve(username)
        if not user or "id" not in user:
            return f"Could not find user: {username}"
        tracks = [
            t for t in self.api.get_tracks(user["id"], limit=ARTIST_ADD_LIMIT)
            if isinstance(t, dict) and "id" in t
        ]
        total_plays = sum(t.get("playback_count", 0) or 0 for t in tracks)
        genres: Dict[str, int] = {}
        for t in tracks:
            g = (t.get("genre") or "").lower().strip()
            if g:
                genres[g] = genres.get(g, 0) + 1
        top_genres = [g for g, _ in sorted(genres.items(), key=lambda x: -x[1])[:3]]
        # Plain string comparison, as before.
        dates = [t["created_at"] for t in tracks if t.get("created_at")]
        last_upload = max(dates) if dates else None
        followers = user.get("followers_count", 0) or 0
        display_name = user.get("full_name") or user.get("username") or username
        now = _utcnow().isoformat()
        self.data["artists"][username.lower()] = {
            "username": user.get("permalink", username),
            "display_name": display_name,
            "user_id": user["id"],
            "permalink_url": user.get("permalink_url", f"https://soundcloud.com/{username}"),
            "followers": followers,
            "track_count": user.get("track_count", 0),
            "total_plays": total_plays,
            "genres": top_genres,
            "last_upload": last_upload,
            "known_track_ids": [t["id"] for t in tracks][-MAX_KNOWN_TRACKS:],
            "added_at": now,
            "last_updated": now
        }
        self._save()
        return f"Added {display_name} ({followers:,} followers, {len(tracks)} tracks)"

    def remove(self, username: str) -> str:
        """Remove an artist from tracking (matched case-insensitively).

        Returns:
            A human-readable result message.
        """
        key = username.lower()
        artists = self.data["artists"]
        # Find the key first, then delete outside the iteration.
        match = next(
            (k for k, artist in artists.items()
             if k == key or (artist.get("username") or "").lower() == key),
            None,
        )
        if match is None:
            return f"Artist '{username}' not found"
        removed = artists.pop(match)
        self._save()
        return f"Removed {removed.get('display_name', match)}"

    def list(self):
        """Print all tracked artists sorted by follower count."""
        artists = sorted(
            self.data["artists"].values(),
            key=lambda x: x.get("followers", 0) or 0,
            reverse=True,
        )
        print(f"\n=== Tracked Artists ({len(artists)}) ===\n")
        for a in artists:
            status = " [DORMANT]" if self._is_dormant(a) else ""
            print(f"{a.get('display_name', 'unknown')} (@{a.get('username', 'unknown')}){status}")
            print(f" {(a.get('followers', 0) or 0):,} followers | {a.get('track_count', 0)} tracks")
            if a.get("last_upload"):
                print(f" Last upload: {a['last_upload'][:10]}")
            print()
| # ============================================================================= | |
| # NOTIFICATION DELIVERY | |
| # ============================================================================= | |
def send_notification(message: str) -> bool:
    """Send a notification via the local gateway.

    To use a different notification system (email, Discord webhook, Pushover,
    etc.), replace the body of this function with your preferred delivery
    mechanism.

    Args:
        message: Text placed in the gateway payload.

    Returns:
        True when the gateway answered with HTTP 200; False when
        notifications are disabled, the gateway config cannot be read, or
        the request failed.
    """
    if not NOTIFICATIONS_ENABLED:
        return False
    try:
        # Context manager so the config file handle is always closed.
        with GATEWAY_CONFIG.open() as fh:
            cfg = json.load(fh)
        token = cfg["gateway"]["auth"]["token"]
        port = cfg["gateway"].get("port", GATEWAY_PORT_DEFAULT)
    except (OSError, ValueError, KeyError, TypeError, AttributeError) as e:
        print(f"Gateway config error: {e}")
        return False
    payload = json.dumps({
        "tool": GATEWAY_TOOL_NAME,
        "args": {"sessionKey": GATEWAY_SESSION_KEY, "message": message}
    }).encode()
    try:
        req = urllib.request.Request(
            f"http://127.0.0.1:{port}{GATEWAY_ENDPOINT}",
            data=payload,
            headers={"Content-Type": "application/json", "Authorization": f"Bearer {token}"},
            method="POST"
        )
        with urllib.request.urlopen(req, timeout=GATEWAY_TIMEOUT) as resp:
            return resp.status == 200
    except Exception as e:
        # Delivery is best effort: report the failure, never crash the run.
        print(f"Notification failed: {e}")
        return False
| # ============================================================================= | |
| # COMMANDS | |
| # ============================================================================= | |
def run_full_check(api: SoundCloudAPI, config: Config) -> tuple[List[str], List[Dict]]:
    """Run the account check, then the artist release check.

    Both watchers are constructed before either check runs.

    Returns:
        ``(account_notifs, releases)``.
    """
    account_watcher = AccountWatcher(api, config)
    artist_tracker = ArtistTracker(api)
    account_notifs = account_watcher.check()
    releases = artist_tracker.check_releases()
    return account_notifs, releases
def _print_usage():
    """Print the command overview."""
    print("SoundCloud Watcher")
    print()
    print("Commands:")
    print(" status Show current tracking status")
    print(" check Run full check with verbose output")
    print(" cron Silent mode — only sends notifications on updates")
    print(" add <user> Add artist(s) to track")
    print(" remove <user> Remove artist from tracking")
    print(" list List all tracked artists")


def main():
    """Command-line entry point; dispatches on ``sys.argv[1]``.

    Usage help and argument validation happen before the credentials are
    loaded, so they work without a config file and without network access.
    """
    if len(sys.argv) < 2:
        _print_usage()
        return
    cmd = sys.argv[1]
    usernames = sys.argv[2:]
    if cmd in ("add", "remove") and not usernames:
        print(f"Missing username for '{cmd}'")
        print("Run without arguments for usage info.")
        return

    config = Config.load()
    if not config.client_id:
        print(f"Error: No SOUNDCLOUD_CLIENT_ID found in {CONFIG_FILE}")
        print(f"\nCreate the config file with:")
        print(f" mkdir -p {CONFIG_FILE.parent}")
        print(f" cat > {CONFIG_FILE} << 'EOF'")
        print(f" SOUNDCLOUD_CLIENT_ID=your_id")
        print(f" SOUNDCLOUD_CLIENT_SECRET=your_secret")
        print(f" MY_USERNAME={MY_USERNAME}")
        print(f" EOF")
        return
    api = SoundCloudAPI(config)
    if not config.access_token:
        if not api.refresh_token():
            print("Failed to get access token")
            return

    if cmd == "status":
        account = AccountWatcher(api, config)
        tracker = ArtistTracker(api)
        print("=== SoundCloud Watcher Status ===\n")
        print(f"Config: {CONFIG_FILE}")
        print(f"Token: ...{config.access_token[-8:]}" if config.access_token else "Token: None")
        if account.data.get("my_account"):
            print(f"Account: @{account.data['my_account']['username']}")
            print(f"Followers: {account.data.get('follower_count', len(account.data.get('my_followers', {})))}")
        total = len(tracker.data.get("artists", {}))
        dormant = sum(1 for a in tracker.data.get("artists", {}).values() if tracker._is_dormant(a))
        print(f"Tracked artists: {total} ({total - dormant} active, {dormant} dormant)")
        print(f"Notifications: {'gateway' if NOTIFICATIONS_ENABLED else 'stdout only'}")
        print(f"Last check: {account.data.get('last_check', 'Never')}")
    elif cmd == "check":
        print(f"[{_utcnow().isoformat()}] Running full check...\n")
        account_notifs, releases = run_full_check(api, config)
        print("--- Account ---")
        for n in account_notifs:
            print(f" {n}")
        if not account_notifs:
            print(" No updates")
        print("\n--- Artist Releases ---")
        for r in releases:
            print(f" {r['artist']}: {r['title']}")
        if not releases:
            print(" No new releases")
        print(f"\nAPI calls: {api.calls}")
    elif cmd == "cron":
        account_notifs, releases = run_full_check(api, config)
        all_notifications = []
        if account_notifs:
            all_notifications.append("**Account:**")
            all_notifications.extend(f"- {n}" for n in account_notifs)
            all_notifications.append("")
        if releases:
            all_notifications.append("**New Releases:**")
            for r in releases:
                all_notifications.extend([f"- **{r['artist']}** dropped: {r['title']}", f" {r['url']}"])
            all_notifications.append("")
        if all_notifications:
            message = "SoundCloud updates:\n\n" + "\n".join(all_notifications)
            if NOTIFICATIONS_ENABLED:
                send_notification(message)
            else:
                print(message)
        print(f"API calls: {api.calls}")
    elif cmd == "add":
        tracker = ArtistTracker(api)
        for username in usernames:
            print(tracker.add(username))
    elif cmd == "remove":
        # Accept several usernames, mirroring "add".
        tracker = ArtistTracker(api)
        for username in usernames:
            print(tracker.remove(username))
    elif cmd == "list":
        tracker = ArtistTracker(api)
        tracker.list()
    else:
        print(f"Unknown command: {cmd}")
        print("Run without arguments for usage info.")


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment