Skip to content

Instantly share code, notes, and snippets.

@wlinds
Last active February 7, 2026 19:55
Show Gist options
  • Select an option

  • Save wlinds/e5a3e830eeea7f5472f81bf075382ed6 to your computer and use it in GitHub Desktop.

Select an option

Save wlinds/e5a3e830eeea7f5472f81bf075382ed6 to your computer and use it in GitHub Desktop.
SoundCloud Watcher for OpenClaw - Track your SoundCloud account
#!/usr/bin/env python3
"""
SoundCloud Watcher - cron script for tracking your SoundCloud account
and getting notified about new releases from artists you care about.
Features:
- Follower change detection (new/lost followers by name)
- Track engagement tracking (who liked, repost counts)
- New release detection from a curated artist list
- Dormant artist throttling (skip inactive artists to save API calls)
- Rate limit backoff (exponential backoff on 429s)
- Single entry runs everything
Setup:
1. Create config file (see PATHS below) with your SoundCloud API credentials
2. Run: python3 soundcloud_cron.py add <artist_username> - for each artist
3. Run: python3 soundcloud_cron.py check - to verify everything works
4. Add to cron: 0 */6 * * * python3 /path/to/soundcloud_cron.py cron
Config file format (one KEY=VALUE per line):
SOUNDCLOUD_CLIENT_ID=your_client_id
SOUNDCLOUD_CLIENT_SECRET=your_client_secret
MY_USERNAME=your_soundcloud_username
"""
import json
import sys
import time
import urllib.request
import urllib.error
import urllib.parse
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Optional
# =============================================================================
# CONFIGURATION — edit these to match your setup
# =============================================================================
# Your SoundCloud username (the URL slug, e.g. soundcloud.com/THIS_PART)
MY_USERNAME = "your_username"
# Where to store config and data files
CONFIG_DIR = Path.home() / ".config" / "soundcloud-watcher"
CONFIG_FILE = CONFIG_DIR / "credentials.env"
ACCOUNT_DATA = CONFIG_DIR / "account.json" # Your account tracking data
ARTISTS_DATA = CONFIG_DIR / "artists.json" # Tracked artist data
BACKOFF_FILE = CONFIG_DIR / "backoff.json" # Rate limit state (auto-managed)
# --- Notification settings ---
# Set to True to send notifications via a gateway (see send_notification())
# Set to False to just print to stdout (good for testing or piping to other tools)
NOTIFICATIONS_ENABLED = False
# Gateway config file (only needed if NOTIFICATIONS_ENABLED = True)
GATEWAY_CONFIG = CONFIG_DIR / "gateway.json"
GATEWAY_SESSION_KEY = "default"
GATEWAY_PORT_DEFAULT = 8080 # Fallback if not specified in gateway config
GATEWAY_ENDPOINT = "/tools/invoke" # Gateway API endpoint path
GATEWAY_TOOL_NAME = "sessions_send" # Tool name for the gateway payload
# =============================================================================
# TUNING — adjust these to balance API usage vs responsiveness
# =============================================================================
# How many of YOUR recent tracks to monitor for likes/reposts
MY_TRACKS_LIMIT = 10
# How many recent tracks to fetch per artist when checking for new releases
ARTIST_TRACKS_LIMIT = 5
# How many tracks to fetch when first adding an artist (seeds known tracks)
ARTIST_ADD_LIMIT = 50
# Artists who haven't uploaded in this many days are considered "dormant"
DORMANT_DAYS = 90
# Dormant artists are only checked every N days instead of every run
DORMANT_CHECK_INTERVAL_DAYS = 7
# Max stored track IDs per artist (older ones pruned to save disk/memory)
MAX_KNOWN_TRACKS = 50
# Max likers to fetch per track
MAX_LIKERS_PER_TRACK = 50
# Followers pagination page size (SoundCloud max is 200)
FOLLOWERS_PAGE_SIZE = 200
# --- Rate limit backoff ---
BACKOFF_BASE_SECONDS = 300 # 5 min initial backoff after a 429
BACKOFF_MAX_SECONDS = 7200 # 2 hour ceiling
# --- Timeouts ---
API_TIMEOUT = 15 # Seconds per API request
GATEWAY_TIMEOUT = 60 # Seconds for gateway notification delivery
# =============================================================================
# INTERNALS — you probably don't need to change anything below
# =============================================================================
API_BASE = "https://api.soundcloud.com"
class Config:
"""Loads credentials from the config file and manages token persistence."""
def __init__(self):
self.client_id = ""
self.client_secret = ""
self.access_token = ""
self.my_username = MY_USERNAME
@classmethod
def load(cls) -> "Config":
cfg = cls()
if CONFIG_FILE.exists():
for line in CONFIG_FILE.read_text().splitlines():
if "=" in line and not line.startswith("#"):
k, v = line.split("=", 1)
k, v = k.strip(), v.strip()
if k == "SOUNDCLOUD_CLIENT_ID":
cfg.client_id = v
elif k == "SOUNDCLOUD_CLIENT_SECRET":
cfg.client_secret = v
elif k == "SOUNDCLOUD_ACCESS_TOKEN":
cfg.access_token = v
elif k == "MY_USERNAME":
cfg.my_username = v
return cfg
def save_token(self, token: str):
"""Write new access token back to config file."""
self.access_token = token
CONFIG_FILE.parent.mkdir(parents=True, exist_ok=True)
lines = CONFIG_FILE.read_text().splitlines() if CONFIG_FILE.exists() else []
new_lines = []
found = False
for line in lines:
if line.startswith("SOUNDCLOUD_ACCESS_TOKEN="):
new_lines.append(f"SOUNDCLOUD_ACCESS_TOKEN={token}")
found = True
else:
new_lines.append(line)
if not found:
new_lines.append(f"SOUNDCLOUD_ACCESS_TOKEN={token}")
CONFIG_FILE.write_text("\n".join(new_lines) + "\n")
def _utcnow() -> datetime:
"""Timezone-aware UTC now."""
return datetime.now(timezone.utc)
def _parse_timestamp(ts: str) -> Optional[datetime]:
"""Parse SoundCloud timestamps into timezone-aware UTC datetimes.
SoundCloud returns dates like: "2026/01/22 16:22:27 +0000"
We normalize to ISO 8601 before parsing.
"""
if not ts:
return None
try:
# "2026/01/22 16:22:27 +0000" -> "2026-01-22T16:22:27+00:00"
cleaned = ts.replace("/", "-").replace(" ", "T", 1)
cleaned = cleaned.replace(" +0000", "+00:00").replace("Z", "+00:00")
dt = datetime.fromisoformat(cleaned)
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
return dt
except Exception as e:
print(f"Warning: Could not parse timestamp '{ts}': {e}")
return None
class SoundCloudAPI:
"""Minimal SoundCloud API client with automatic token refresh and rate limit backoff."""
def __init__(self, config: Config):
self.config = config
self.calls = 0 # Running count of API calls this session
def _check_backoff(self) -> Optional[int]:
"""Returns seconds remaining in backoff period, or None if clear."""
if not BACKOFF_FILE.exists():
return None
try:
data = json.loads(BACKOFF_FILE.read_text())
elapsed = time.time() - data.get("last_fail", 0)
backoff = min(BACKOFF_BASE_SECONDS * (2 ** data.get("fail_count", 0)), BACKOFF_MAX_SECONDS)
if elapsed < backoff:
return int(backoff - elapsed)
except Exception:
pass
return None
def _set_backoff(self):
"""Record a rate limit failure for exponential backoff."""
try:
data = json.loads(BACKOFF_FILE.read_text()) if BACKOFF_FILE.exists() else {}
data["fail_count"] = data.get("fail_count", 0) + 1
data["last_fail"] = time.time()
BACKOFF_FILE.parent.mkdir(parents=True, exist_ok=True)
BACKOFF_FILE.write_text(json.dumps(data))
except Exception:
pass
def _clear_backoff(self):
"""Clear backoff state after a successful token refresh."""
if BACKOFF_FILE.exists():
BACKOFF_FILE.unlink()
def refresh_token(self) -> bool:
"""Refresh OAuth token using client credentials grant."""
if not self.config.client_id or not self.config.client_secret:
return False
remaining = self._check_backoff()
if remaining:
print(f"Token refresh in backoff ({remaining}s remaining)")
return False
try:
data = urllib.parse.urlencode({
"grant_type": "client_credentials",
"client_id": self.config.client_id,
"client_secret": self.config.client_secret
}).encode()
req = urllib.request.Request(
f"{API_BASE}/oauth2/token",
data=data,
headers={"Content-Type": "application/x-www-form-urlencoded"}
)
with urllib.request.urlopen(req, timeout=API_TIMEOUT) as resp:
result = json.loads(resp.read().decode())
self.config.save_token(result["access_token"])
self._clear_backoff()
print("Token refreshed")
return True
except urllib.error.HTTPError as e:
if e.code == 429:
self._set_backoff()
print("Token refresh rate limited (429)")
else:
print(f"Token refresh failed: {e}")
return False
except Exception as e:
print(f"Token refresh failed: {e}")
return False
def get(self, url: str, params: Dict = None, retry: bool = True) -> Optional[Dict]:
"""Make authenticated GET request. Accepts relative (/users/...) or full URLs."""
self.calls += 1
if url.startswith("/"):
full_url = f"{API_BASE}{url}"
elif url.startswith("http"):
full_url = url
else:
full_url = f"{API_BASE}/{url}"
if params:
separator = "&" if "?" in full_url else "?"
query = urllib.parse.urlencode(params)
full_url = f"{full_url}{separator}{query}"
headers = {}
if self.config.access_token:
headers["Authorization"] = f"OAuth {self.config.access_token}"
try:
req = urllib.request.Request(full_url, headers=headers)
with urllib.request.urlopen(req, timeout=API_TIMEOUT) as resp:
return json.loads(resp.read().decode())
except urllib.error.HTTPError as e:
if e.code == 401 and retry:
if self.refresh_token():
return self.get(url, params, retry=False)
print(f"API error {e.code}: {full_url}")
return None
except Exception as e:
print(f"API error: {e}")
return None
def resolve(self, username: str) -> Optional[Dict]:
"""Resolve a username to a full user profile object."""
return self.get("/resolve", {"url": f"https://soundcloud.com/{username}"})
def get_user(self, user_id: int) -> Optional[Dict]:
"""Get user profile by ID (includes followers_count)."""
return self.get(f"/users/{user_id}")
def get_tracks(self, user_id: int, limit: int = 20) -> List[Dict]:
"""Get a user's tracks. Response includes play/like/repost counts per track."""
data = self.get(f"/users/{user_id}/tracks", {"limit": limit, "linked_partitioning": 1})
if not data:
return []
return data.get("collection", data) if isinstance(data, dict) else data
def get_track_likers(self, track_id: int, limit: int = MAX_LIKERS_PER_TRACK) -> Dict[str, Dict]:
"""Get users who liked a specific track. Returns {user_id: {username, display_name}}."""
data = self.get(f"/tracks/{track_id}/favoriters", {"limit": limit, "linked_partitioning": 1})
if not data:
return {}
likers = {}
collection = data.get("collection", data) if isinstance(data, dict) else data
if isinstance(collection, list):
for u in collection:
if isinstance(u, dict) and "id" in u:
likers[str(u["id"])] = {
"username": u.get("permalink", u.get("username", "unknown")),
"display_name": u.get("full_name", u.get("username", "unknown"))
}
return likers
def get_followers_paginated(self, user_id: int) -> Dict[str, Dict]:
"""Paginate through all followers. Expensive — only call when follower count changes."""
followers = {}
next_url = f"/users/{user_id}/followers"
params = {"limit": FOLLOWERS_PAGE_SIZE, "linked_partitioning": 1}
while next_url:
data = self.get(next_url, params)
if not data:
break
for f in data.get("collection", []):
if isinstance(f, dict) and "id" in f:
followers[str(f["id"])] = {
"username": f.get("permalink", f.get("username", "unknown")),
"display_name": f.get("full_name", f.get("username", "unknown"))
}
next_href = data.get("next_href")
if next_href and next_href != next_url:
next_url = next_href # Full URL with cursor params included
params = None
else:
break
return followers
class AccountWatcher:
"""Monitors your account: follower changes and per-track engagement."""
def __init__(self, api: SoundCloudAPI, config: Config):
self.api = api
self.config = config
self.data = self._load()
def _load(self) -> Dict:
default = {
"my_account": None,
"my_followers": {},
"follower_count": 0,
"track_stats": [],
"last_check": None
}
if ACCOUNT_DATA.exists():
try:
data = json.load(ACCOUNT_DATA.open())
for k in default:
if k not in data:
data[k] = default[k]
return data
except Exception:
pass
return default
def _save(self):
ACCOUNT_DATA.parent.mkdir(parents=True, exist_ok=True)
self.data["last_check"] = _utcnow().isoformat()
json.dump(self.data, ACCOUNT_DATA.open("w"), indent=2)
def check(self) -> List[str]:
"""Run account check. Returns list of human-readable notification strings.
API calls on a quiet day: 2 (profile + tracks)
API calls on follower change: 2 + ceil(followers/200) + tracks_with_new_likes
"""
notifications = []
# Resolve account on first run
if not self.data["my_account"]:
user = self.api.resolve(self.config.my_username)
if not user:
return ["Failed to resolve SoundCloud user"]
self.data["my_account"] = {
"user_id": user["id"],
"username": user.get("permalink", self.config.my_username)
}
user_id = self.data["my_account"]["user_id"]
# Fetch profile to check follower count (1 API call)
profile = self.api.get_user(user_id)
if not profile:
print("Failed to fetch profile, skipping account check")
return notifications # Don't save — preserve previous state
current_count = profile.get("followers_count", 0)
stored_count = self.data.get("follower_count", 0)
# Only paginate full follower list if the count actually changed
if current_count != stored_count or not self.data["my_followers"]:
print(f"Follower count changed: {stored_count} -> {current_count}, fetching list...")
current_followers = self.api.get_followers_paginated(user_id)
if not current_followers and stored_count > 0:
# Empty response with known followers = API hiccup, don't wipe data
print("API returned empty followers, skipping comparison")
else:
stored_followers = self.data.get("my_followers", {})
# Skip diff on first run (everything would show as "new")
if stored_followers:
new = [f["display_name"] for uid, f in current_followers.items() if uid not in stored_followers]
lost = [f["display_name"] for uid, f in stored_followers.items() if uid not in current_followers]
if new:
names = ", ".join(new[:3]) + (f" +{len(new)-3} more" if len(new) > 3 else "")
notifications.append(f"New follower{'s' if len(new) > 1 else ''}: **{names}**")
if lost:
names = ", ".join(lost[:3])
notifications.append(f"Lost follower{'s' if len(lost) > 1 else ''}: {names}")
self.data["my_followers"] = current_followers
self.data["follower_count"] = current_count
else:
print(f"Follower count unchanged ({current_count}), skipping pagination")
# Fetch my tracks — play/like/repost counts included in response (1 API call)
tracks = self.api.get_tracks(user_id, limit=MY_TRACKS_LIMIT)
if tracks:
prev_stats = {s["track_id"]: s for s in self.data.get("track_stats", [])}
new_stats = []
for t in tracks:
track_id = t["id"]
title = t.get("title", "Unknown")
current_likes = t.get("likes_count", 0) or t.get("favoritings_count", 0) or 0
current_reposts = t.get("reposts_count", 0) or 0
stats = {
"track_id": track_id,
"title": title,
"plays": t.get("playback_count", 0) or 0,
"likes": current_likes,
"reposts": current_reposts,
"likers": {}
}
prev = prev_stats.get(track_id)
if prev:
prev_likes = prev.get("likes", 0)
prev_likers = prev.get("likers", {})
# Only fetch liker list if like count changed (or never seeded)
needs_liker_fetch = current_likes != prev_likes or (current_likes > 0 and not prev_likers)
if needs_liker_fetch:
current_likers = self.api.get_track_likers(track_id)
stats["likers"] = current_likers
new_liker_names = [
u["display_name"] or u["username"]
for uid, u in current_likers.items()
if uid not in prev_likers
]
unliker_names = [
u["display_name"] or u["username"]
for uid, u in prev_likers.items()
if uid not in current_likers
]
if new_liker_names:
names = ", ".join(new_liker_names[:3])
if len(new_liker_names) > 3:
names += f" +{len(new_liker_names)-3} more"
notifications.append(f"**{names}** liked '{title}'")
if unliker_names:
names = ", ".join(unliker_names[:3])
notifications.append(f"{names} unliked '{title}'")
else:
# No change — carry forward previous liker data
stats["likers"] = prev_likers
new_reposts = current_reposts - prev.get("reposts", 0)
if new_reposts > 0:
notifications.append(f"'{title}' got {new_reposts} repost{'s' if new_reposts > 1 else ''}!")
else:
# First time seeing this track — seed likers without notifying
stats["likers"] = self.api.get_track_likers(track_id)
new_stats.append(stats)
self.data["track_stats"] = new_stats
else:
# API failure — don't overwrite good data with empty data
print("Failed to fetch tracks, keeping previous stats")
self._save()
return notifications
class ArtistTracker:
"""Tracks a curated list of artists and detects new releases."""
def __init__(self, api: SoundCloudAPI):
self.api = api
self.data = self._load()
def _load(self) -> Dict:
if ARTISTS_DATA.exists():
try:
return json.load(ARTISTS_DATA.open())
except Exception:
pass
return {"artists": {}, "updated_at": None}
def _save(self):
ARTISTS_DATA.parent.mkdir(parents=True, exist_ok=True)
self.data["updated_at"] = _utcnow().isoformat()
json.dump(self.data, ARTISTS_DATA.open("w"), indent=2)
def _is_dormant(self, artist: Dict) -> bool:
"""An artist is dormant if their last upload was more than DORMANT_DAYS ago."""
dt = _parse_timestamp(artist.get("last_upload"))
if not dt:
return False # Unknown upload date = assume active (safer to check)
return (_utcnow() - dt).days > DORMANT_DAYS
def _should_skip(self, artist: Dict) -> bool:
"""Active artists: check every run. Dormant: every DORMANT_CHECK_INTERVAL_DAYS."""
if not self._is_dormant(artist):
return False
dt = _parse_timestamp(artist.get("last_checked"))
if not dt:
return False # Never checked = don't skip
return (_utcnow() - dt).days < DORMANT_CHECK_INTERVAL_DAYS
def check_releases(self) -> List[Dict]:
"""Check all tracked artists for new releases.
API calls: 1 per active artist, 1 per dormant artist due for check, 0 for skipped.
"""
notifications = []
checked = 0
skipped = 0
for username, artist in self.data["artists"].items():
if self._should_skip(artist):
skipped += 1
continue
checked += 1
user_id = artist.get("user_id")
if not user_id:
continue
tracks = self.api.get_tracks(user_id, limit=ARTIST_TRACKS_LIMIT)
known_ids = set(artist.get("known_track_ids", []))
self.data["artists"][username]["last_checked"] = _utcnow().isoformat()
for track in tracks:
if track["id"] not in known_ids:
duration_sec = (track.get("duration", 0) or 0) // 1000
notifications.append({
"artist": artist.get("display_name", username),
"title": track.get("title", "Unknown"),
"url": track.get("permalink_url", ""),
"duration": f"{duration_sec // 60}:{duration_sec % 60:02d}",
"genre": track.get("genre")
})
if "known_track_ids" not in self.data["artists"][username]:
self.data["artists"][username]["known_track_ids"] = []
self.data["artists"][username]["known_track_ids"].append(track["id"])
if track.get("created_at"):
self.data["artists"][username]["last_upload"] = track["created_at"]
# Prune old track IDs to prevent unbounded growth
ids = self.data["artists"][username].get("known_track_ids", [])
if len(ids) > MAX_KNOWN_TRACKS:
self.data["artists"][username]["known_track_ids"] = ids[-MAX_KNOWN_TRACKS:]
dormant_count = sum(1 for a in self.data["artists"].values() if self._is_dormant(a))
print(f"Checked {checked} artists, skipped {skipped} dormant, {dormant_count} total dormant")
self._save()
return notifications
def add(self, username: str) -> str:
"""Add an artist to tracking. Seeds known tracks to avoid false notifications."""
user = self.api.resolve(username)
if not user:
return f"Could not find user: {username}"
tracks = self.api.get_tracks(user["id"], limit=ARTIST_ADD_LIMIT)
total_plays = sum(t.get("playback_count", 0) or 0 for t in tracks)
genres = {}
for t in tracks:
g = (t.get("genre") or "").lower().strip()
if g:
genres[g] = genres.get(g, 0) + 1
top_genres = [g for g, _ in sorted(genres.items(), key=lambda x: -x[1])[:3]]
last_upload = None
if tracks:
dates = [t.get("created_at") for t in tracks if t.get("created_at")]
if dates:
last_upload = max(dates)
followers = user.get("followers_count", 0) or 0
self.data["artists"][username.lower()] = {
"username": user.get("permalink", username),
"display_name": user.get("full_name") or user.get("username", username),
"user_id": user["id"],
"permalink_url": user.get("permalink_url", f"https://soundcloud.com/{username}"),
"followers": followers,
"track_count": user.get("track_count", 0),
"total_plays": total_plays,
"genres": top_genres,
"last_upload": last_upload,
"known_track_ids": [t["id"] for t in tracks][-MAX_KNOWN_TRACKS:],
"added_at": _utcnow().isoformat(),
"last_updated": _utcnow().isoformat()
}
self._save()
return f"Added {user.get('full_name', username)} ({followers:,} followers, {len(tracks)} tracks)"
def remove(self, username: str) -> str:
"""Remove an artist from tracking."""
key = username.lower()
for k, artist in self.data["artists"].items():
if k == key or artist.get("username", "").lower() == key:
name = artist.get("display_name", k)
del self.data["artists"][k]
self._save()
return f"Removed {name}"
return f"Artist '{username}' not found"
def list(self):
"""Print all tracked artists sorted by follower count."""
artists = sorted(self.data["artists"].values(), key=lambda x: x.get("followers", 0), reverse=True)
print(f"\n=== Tracked Artists ({len(artists)}) ===\n")
for a in artists:
dormant = self._is_dormant(a)
status = " [DORMANT]" if dormant else ""
print(f"{a['display_name']} (@{a['username']}){status}")
print(f" {a.get('followers', 0):,} followers | {a.get('track_count', 0)} tracks")
if a.get("last_upload"):
print(f" Last upload: {a['last_upload'][:10]}")
print()
# =============================================================================
# NOTIFICATION DELIVERY
# =============================================================================
def send_notification(message: str) -> bool:
"""Send notification via gateway.
To use a different notification system (email, Discord webhook, Pushover, etc.),
replace the body of this function with your preferred delivery mechanism.
"""
if not NOTIFICATIONS_ENABLED:
return False
try:
cfg = json.load(GATEWAY_CONFIG.open())
token = cfg["gateway"]["auth"]["token"]
port = cfg["gateway"].get("port", GATEWAY_PORT_DEFAULT)
except Exception as e:
print(f"Gateway config error: {e}")
return False
payload = json.dumps({
"tool": GATEWAY_TOOL_NAME,
"args": {"sessionKey": GATEWAY_SESSION_KEY, "message": message}
}).encode()
try:
req = urllib.request.Request(
f"http://127.0.0.1:{port}{GATEWAY_ENDPOINT}",
data=payload,
headers={"Content-Type": "application/json", "Authorization": f"Bearer {token}"},
method="POST"
)
with urllib.request.urlopen(req, timeout=GATEWAY_TIMEOUT) as resp:
return resp.status == 200
except Exception as e:
print(f"Notification failed: {e}")
return False
# =============================================================================
# COMMANDS
# =============================================================================
def run_full_check(api: SoundCloudAPI, config: Config) -> tuple[List[str], List[Dict]]:
"""Run both account and artist checks. Returns (account_notifs, releases)."""
account = AccountWatcher(api, config)
tracker = ArtistTracker(api)
return account.check(), tracker.check_releases()
def main():
config = Config.load()
if not config.client_id:
print(f"Error: No SOUNDCLOUD_CLIENT_ID found in {CONFIG_FILE}")
print(f"\nCreate the config file with:")
print(f" mkdir -p {CONFIG_FILE.parent}")
print(f" cat > {CONFIG_FILE} << 'EOF'")
print(f" SOUNDCLOUD_CLIENT_ID=your_id")
print(f" SOUNDCLOUD_CLIENT_SECRET=your_secret")
print(f" MY_USERNAME={MY_USERNAME}")
print(f" EOF")
return
api = SoundCloudAPI(config)
if not config.access_token:
if not api.refresh_token():
print("Failed to get access token")
return
if len(sys.argv) < 2:
print("SoundCloud Watcher")
print()
print("Commands:")
print(" status Show current tracking status")
print(" check Run full check with verbose output")
print(" cron Silent mode — only sends notifications on updates")
print(" add <user> Add artist(s) to track")
print(" remove <user> Remove artist from tracking")
print(" list List all tracked artists")
return
cmd = sys.argv[1]
if cmd == "status":
account = AccountWatcher(api, config)
tracker = ArtistTracker(api)
print("=== SoundCloud Watcher Status ===\n")
print(f"Config: {CONFIG_FILE}")
print(f"Token: ...{config.access_token[-8:]}" if config.access_token else "Token: None")
if account.data.get("my_account"):
print(f"Account: @{account.data['my_account']['username']}")
print(f"Followers: {account.data.get('follower_count', len(account.data.get('my_followers', {})))}")
total = len(tracker.data.get("artists", {}))
dormant = sum(1 for a in tracker.data.get("artists", {}).values() if tracker._is_dormant(a))
print(f"Tracked artists: {total} ({total - dormant} active, {dormant} dormant)")
print(f"Notifications: {'gateway' if NOTIFICATIONS_ENABLED else 'stdout only'}")
print(f"Last check: {account.data.get('last_check', 'Never')}")
elif cmd == "check":
print(f"[{_utcnow().isoformat()}] Running full check...\n")
account_notifs, releases = run_full_check(api, config)
print("--- Account ---")
for n in account_notifs:
print(f" {n}")
if not account_notifs:
print(" No updates")
print("\n--- Artist Releases ---")
for r in releases:
print(f" {r['artist']}: {r['title']}")
if not releases:
print(" No new releases")
print(f"\nAPI calls: {api.calls}")
elif cmd == "cron":
account_notifs, releases = run_full_check(api, config)
all_notifications = []
if account_notifs:
all_notifications.append("**Account:**")
all_notifications.extend(f"- {n}" for n in account_notifs)
all_notifications.append("")
if releases:
all_notifications.append("**New Releases:**")
for r in releases:
all_notifications.extend([f"- **{r['artist']}** dropped: {r['title']}", f" {r['url']}"])
all_notifications.append("")
if all_notifications:
message = "SoundCloud updates:\n\n" + "\n".join(all_notifications)
if NOTIFICATIONS_ENABLED:
send_notification(message)
else:
print(message)
print(f"API calls: {api.calls}")
elif cmd == "add" and len(sys.argv) > 2:
tracker = ArtistTracker(api)
for username in sys.argv[2:]:
print(tracker.add(username))
elif cmd == "remove" and len(sys.argv) > 2:
tracker = ArtistTracker(api)
print(tracker.remove(sys.argv[2]))
elif cmd == "list":
tracker = ArtistTracker(api)
tracker.list()
else:
print(f"Unknown command: {cmd}")
print("Run without arguments for usage info.")
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment