Last active
February 10, 2026 18:16
-
-
Save malfet/958905a42edb6f85cb22899ffc45c83e to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| Fetch PyTorch outside collaborators and infer company affiliation from commit emails. | |
| Requires: `gh` CLI authenticated with appropriate permissions. | |
| Usage: python fetch_collaborator_affiliations.py [--repo pytorch/pytorch] [--max-commits 50] [--no-cache] | |
| Caches results in pytorch_collab_emails.json to avoid re-fetching known collaborators. | |
| """ | |
| import argparse | |
| import json | |
| import os | |
| import subprocess | |
| import sys | |
| from collections import Counter | |
# JSON file (relative to the working directory) holding previously fetched
# per-collaborator results.
CACHE_FILE = "pytorch_collab_emails.json"

# Email domains that are ignored when inferring an affiliation, because they
# do not identify an organization.
GENERIC_EMAIL_DOMAINS = {
    # Consumer mail providers.
    "aol.com",
    "gmail.com",
    "hotmail.com",
    "icloud.com",
    "live.com",
    "mail.com",
    "outlook.com",
    "protonmail.com",
    "yahoo.com",
    # GitHub-issued no-reply commit addresses.
    "users.noreply.github.com",
}
| def gh_api(endpoint: str, paginate: bool = False) -> list | dict: | |
| cmd = ["gh", "api", endpoint, "--header", "Accept: application/vnd.github+json"] | |
| if paginate: | |
| cmd.append("--paginate") | |
| result = subprocess.run(cmd, capture_output=True, text=True) | |
| if result.returncode != 0: | |
| print(f"Error calling gh api {endpoint}: {result.stderr}", file=sys.stderr) | |
| return [] | |
| text = result.stdout.strip() | |
| if not text: | |
| return [] | |
| if paginate: | |
| text = text.replace("]\n[", ",").replace("][", ",") | |
| return json.loads(text) | |
def get_outside_collaborators(repo: str) -> list[dict]:
    """Return the outside collaborators of *repo* via a paginated ``gh_api`` call.

    Args:
        repo: Repository in ``owner/name`` form.

    Returns:
        Whatever ``gh_api`` returns for the collaborators endpoint (an empty
        list when the call fails).
    """
    query = "affiliation=outside&per_page=100"
    return gh_api(f"/repos/{repo}/collaborators?{query}", paginate=True)
def get_permission(collab: dict) -> str:
    """Map a collaborator record to a single permission label.

    The ``permissions`` flags are checked from most to least privileged and
    the first truthy one wins. If none is set, the lower-cased ``role_name``
    is used, and ``"unknown"`` is the final fallback.

    Args:
        collab: Collaborator record; only ``permissions`` and ``role_name``
            are read.

    Returns:
        One of ``"admin"``, ``"maintain"``, ``"write"``, ``"triage"``,
        ``"read"``, a lower-cased role name, or ``"unknown"``.
    """
    flags = collab.get("permissions", {})
    # (flag in the record, label reported), highest privilege first.
    ranked = (
        ("admin", "admin"),
        ("maintain", "maintain"),
        ("push", "write"),
        ("triage", "triage"),
        ("pull", "read"),
    )
    for flag, label in ranked:
        if flags.get(flag):
            return label

    role_name = collab.get("role_name", "")
    return role_name.lower() if role_name else "unknown"
def get_commit_emails(repo: str, author: str, max_commits: int) -> list[str]:
    """Collect author/committer emails from up to *max_commits* commits.

    Commits are requested from the repository's commits endpoint filtered by
    ``author``. Pages of at most 100 commits are fetched one at a time until
    *max_commits* commits have been inspected or the results run out, so
    values above 100 are honoured instead of being silently capped.

    Args:
        repo: Repository in ``owner/name`` form.
        author: Value for the ``author`` query parameter.
        max_commits: Maximum number of commits to inspect; values ``<= 0``
            inspect nothing.

    Returns:
        Every non-empty author and committer email found, in commit order and
        with duplicates kept. Addresses containing ``"noreply"`` are skipped.
    """
    if max_commits <= 0:
        # Nothing to inspect; also avoids sending an invalid per_page value.
        return []

    # per_page must stay constant across requests so page offsets line up.
    per_page = min(max_commits, 100)
    emails = []
    inspected = 0
    page = 1
    while inspected < max_commits:
        endpoint = (
            f"/repos/{repo}/commits?author={author}"
            f"&per_page={per_page}&page={page}"
        )
        commits = gh_api(endpoint)
        if not isinstance(commits, list) or not commits:
            break
        for commit in commits[: max_commits - inspected]:
            commit_data = commit.get("commit") or {}
            for field in ("author", "committer"):
                # Tolerate a missing or null person object.
                email = (commit_data.get(field) or {}).get("email") or ""
                if email and "noreply" not in email:
                    emails.append(email)
        inspected += len(commits)
        if len(commits) < per_page:
            # A short page means there are no further results.
            break
        page += 1
    return emails
def infer_affiliation(emails: list[str]) -> str:
    """Guess an affiliation from the most frequent non-generic email domain.

    Args:
        emails: Email addresses; duplicates count towards the frequency.

    Returns:
        ``"Unknown"`` when *emails* is empty, ``"Unknown (personal email
        only)"`` when no address has a usable domain, otherwise the
        lower-cased domain that occurs most often.
    """
    if not emails:
        return "Unknown"

    tally = Counter()
    for address in emails:
        pieces = address.split("@")
        if len(pieces) != 2:
            # Only addresses with exactly one "@" are considered.
            continue
        host = pieces[1].lower()
        if host in GENERIC_EMAIL_DOMAINS:
            continue
        tally[host] += 1

    if not tally:
        return "Unknown (personal email only)"
    return tally.most_common(1)[0][0]
def get_profile_company(username: str) -> str:
    """Return the ``company`` field of a user's profile, lightly normalised.

    Args:
        username: Login whose ``/users/<login>`` record is fetched.

    Returns:
        The company string with surrounding whitespace and leading ``@``
        characters removed, or ``""`` when the lookup does not yield a dict
        or the field is empty.
    """
    profile = gh_api(f"/users/{username}")
    if not isinstance(profile, dict):
        return ""
    raw_company = profile.get("company") or ""
    return raw_company.strip().lstrip("@")
def load_cache() -> dict[str, dict]:
    """Load cached collaborator entries from ``CACHE_FILE``.

    Both on-disk layouts are accepted: the old format (a list of entries,
    each carrying a ``username`` key) and the new format (a dict keyed by
    username).

    Returns:
        Mapping of username to cached entry. An empty dict is returned when
        the file does not exist, cannot be read, is not valid JSON, or holds
        an unexpected top-level type; unreadable files are reported on stderr
        rather than aborting the run.
    """
    try:
        with open(CACHE_FILE, encoding="utf-8") as f:
            data = json.load(f)
    except FileNotFoundError:
        # No cache yet: start fresh.
        return {}
    except (OSError, ValueError) as exc:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError, e.g.
        # a cache truncated by an interrupted earlier run.
        print(
            f"Warning: ignoring unreadable cache {CACHE_FILE}: {exc}",
            file=sys.stderr,
        )
        return {}

    # Support both list (old format) and dict (new format)
    if isinstance(data, list):
        return {
            entry["username"]: entry
            for entry in data
            if isinstance(entry, dict) and "username" in entry
        }
    if isinstance(data, dict):
        return data
    return {}
def save_cache(cache: dict[str, dict]) -> None:
    """Write *cache* to ``CACHE_FILE`` as indented JSON.

    The data is first written to a sibling ``<CACHE_FILE>.tmp`` file and then
    moved over the real cache with ``os.replace``, so an interruption while
    writing cannot leave a truncated cache behind.

    Args:
        cache: Mapping of username to cached entry.
    """
    tmp_path = f"{CACHE_FILE}.tmp"
    with open(tmp_path, "w", encoding="utf-8") as f:
        json.dump(cache, f, indent=2)
    os.replace(tmp_path, CACHE_FILE)
def print_header():
    """Print the column titles of the report followed by a 130-dash rule."""
    titles = (
        f"{'Username':<25} {'Permission':<12} {'Profile Company':<25} "
        f"{'Email Domain':<30} Emails"
    )
    rule = "-" * 130
    print(titles)
    print(rule)
def print_row(r: dict):
    """Print one report line for a collaborator entry.

    Args:
        r: Entry with ``username``, ``permission``, ``profile_company``,
            ``email_affiliation`` and ``emails`` keys. At most three emails
            are shown; any remainder is summarised as ``(+N more)``. An empty
            profile company is rendered as ``-``.
    """
    addresses = r["emails"]
    preview = ", ".join(addresses[:3])
    hidden = len(addresses) - 3
    if hidden > 0:
        preview += f" (+{hidden} more)"

    username, permission, company, affiliation = (
        r["username"],
        r["permission"],
        r["profile_company"] or "-",
        r["email_affiliation"],
    )
    print(
        f"{username:<25} {permission:<12} "
        f"{company:<25} "
        f"{affiliation:<30} {preview}"
    )
def main():
    """Report affiliations for every outside collaborator of a repository.

    Parses ``--repo``, ``--max-commits`` and ``--no-cache``, lists the outside
    collaborators, and prints one row per collaborator. Collaborators already
    in the cache are reused with only their permission refreshed from live
    data; the rest are looked up and cached as soon as each one is fetched so
    an interrupted run can resume. Cached users that are no longer outside
    collaborators are dropped, and the cache is written once more at the end
    so permission refreshes and removals are always persisted.

    Exits with status 1 when no outside collaborators are returned.
    """
    parser = argparse.ArgumentParser(
        description="Fetch outside collaborator affiliations from commit emails"
    )
    parser.add_argument("--repo", default="pytorch/pytorch")
    parser.add_argument(
        "--max-commits",
        type=int,
        default=50,
        help="Max commits to inspect per collaborator",
    )
    parser.add_argument(
        "--no-cache",
        action="store_true",
        help="Ignore cache and re-fetch everything",
    )
    args = parser.parse_args()

    cache = {} if args.no_cache else load_cache()

    print(f"Fetching outside collaborators for {args.repo}...")
    collaborators = get_outside_collaborators(args.repo)
    if not collaborators:
        print("No outside collaborators found (or insufficient permissions).")
        print(
            "Note: You need admin/org-owner access to list outside collaborators.",
            file=sys.stderr,
        )
        sys.exit(1)

    total = len(collaborators)
    print(f"Found {total} outside collaborators.")
    cached_count = sum(1 for c in collaborators if c["login"] in cache)
    if cached_count:
        print(f" {cached_count} cached, {total - cached_count} to fetch.")
    print()
    print_header()

    for position, collab in enumerate(collaborators, start=1):
        username = collab["login"]
        permission = get_permission(collab)

        if username in cache:
            entry = cache[username]
            entry["permission"] = permission  # always update permission from live data
            print_row(entry)
            continue

        # Progress goes to stderr so stdout carries only the report.
        print(
            f" [{position}/{total}] Fetching {username}...",
            end="",
            file=sys.stderr,
        )
        profile_company = get_profile_company(username)
        emails = get_commit_emails(args.repo, username, args.max_commits)
        entry = {
            "username": username,
            "permission": permission,
            "profile_company": profile_company,
            "email_affiliation": infer_affiliation(emails),
            "emails": sorted(set(emails)),
        }
        cache[username] = entry
        # Save after every fetch so progress survives an interrupted run.
        save_cache(cache)
        print(" done", file=sys.stderr)
        print_row(entry)

    # Remove collaborators no longer in the outside list
    current_usernames = {c["login"] for c in collaborators}
    removed = set(cache) - current_usernames
    for username in removed:
        del cache[username]

    # Always write the final state: refreshed permissions of cached entries
    # would otherwise be lost when nothing was fetched or removed.
    save_cache(cache)
    if removed:
        print(f"\nRemoved {len(removed)} stale entries from cache: {', '.join(sorted(removed))}")
    print(f"\nCache saved to {CACHE_FILE}")


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment