# Way faster, more robust, attempt to dedupe authors

from semanticscholar import SemanticScholar
from collections import defaultdict
from tqdm import tqdm
import os
import re
import unicodedata

# Functions for deduplicating author names (imperfect)
def _strip_accents(text: str) -> str:
    return "".join(
        c for c in unicodedata.normalize("NFKD", text)
        if not unicodedata.combining(c)
    )

def _clean_name(name: str) -> str:
    # Normalize accents, remove commas/dots/apostrophes, compress whitespace
    name = _strip_accents(name)
    name = name.replace(",", " ")
    name = re.sub(r"[\.']", " ", name)  # keep hyphens
    name = re.sub(r"\s+", " ", name).strip()
    return name
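
# For example (hypothetical inputs, not from real data):
#   _clean_name("Müller, J.")    -> "Muller J"
#   _clean_name("O'Brien-Smith") -> "O Brien-Smith"   # apostrophes split, hyphens kept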

def _parse_name(name: str):
    """
    Returns (surname_lower, given_token_lower, is_initial).

    given_token is the first non-empty token before the surname.
    is_initial is True if that token is a single character (e.g., 'd' from 'D.').
    """
    if not name:
        return "", "", False
    cleaned = _clean_name(name).lower()
    if not cleaned:
        return "", "", False
    parts = cleaned.split(" ")
    if len(parts) == 1:
        # Mononym
        return parts[0], "", False
    surname = parts[-1]
    given_token = ""
    is_initial = False
    for token in parts[:-1]:
        token = token.strip()
        if not token:
            continue
        given_token = token
        is_initial = len(token) == 1
        break
    return surname, given_token, is_initial
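
# For example (hypothetical inputs):
#   _parse_name("D. Levin")       -> ("levin", "d", True)
#   _parse_name("David I. Levin") -> ("levin", "david", False)
#   _parse_name("Plato")          -> ("plato", "", False)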

def dedupe_author_counts(author_counts: dict[str, int]) -> dict[str, int]:
    """
    Take {raw_name: count} and merge variants.

    Strategy:
      1. Group by (surname, first-letter-of-given-name) -> 'broad cluster'.
      2. Inside each cluster:
         - Identify full given-name tokens (len > 1).
         - If there's exactly one full given-name token:
           merge full + initial-only variants as one person.
         - If there are multiple full given-name tokens:
           keep each full name separate;
           keep initial-only names as a separate ambiguous bucket.
    """
    # broad_key -> list of (raw_name, count, given_token, is_initial)
    clusters: dict[tuple[str, str], list[tuple[str, int, str, bool]]] = defaultdict(list)
    for name, count in author_counts.items():
        surname, given_token, is_initial = _parse_name(name)
        if not surname:
            continue
        first_letter = given_token[0] if given_token else ""
        broad_key = (surname, first_letter)
        clusters[broad_key].append((name, count, given_token, is_initial))

    merged: dict[str, int] = {}
    for (surname, first_letter), records in clusters.items():
        # Collect full given-name tokens (len > 1)
        full_given_tokens = {
            given_token
            for (_, _, given_token, is_initial) in records
            if given_token and not is_initial
        }
        if len(full_given_tokens) <= 1:
            # Simple case: zero or one real given name in this cluster
            # -> treat all variants as one person
            total_by_name = defaultdict(int)
            for raw_name, count, given_token, is_initial in records:
                total_by_name[raw_name] += count
            total = sum(total_by_name.values())
            # Choose representative: most frequent; tie -> longest
            best_name = max(
                total_by_name.items(),
                key=lambda kv: (kv[1], len(kv[0]))
            )[0]
            merged[best_name] = total
        else:
            # Multiple distinct full given names, e.g., 'yu' and 'yang' for Liu
            # -> keep each full-name group separate
            # -> initial-only names become an ambiguous bucket
            full_groups: dict[str, dict[str, int]] = defaultdict(lambda: defaultdict(int))
            initial_bucket: dict[str, int] = defaultdict(int)
            for raw_name, count, given_token, is_initial in records:
                if given_token and not is_initial:
                    # Full given name
                    full_groups[given_token][raw_name] += count
                else:
                    # Initial-only or missing given name
                    initial_bucket[raw_name] += count
            # Add each full-name group as its own author
            for given_token, name_counts in full_groups.items():
                total = sum(name_counts.values())
                best_name = max(
                    name_counts.items(),
                    key=lambda kv: (kv[1], len(kv[0]))
                )[0]
                merged[best_name] = total
            # Keep ambiguous initials as their own aggregated entry (optional)
            if initial_bucket:
                total = sum(initial_bucket.values())
                best_name = max(
                    initial_bucket.items(),
                    key=lambda kv: (kv[1], len(kv[0]))
                )[0]
                merged[best_name] = total
    return merged
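
# A quick sanity check of the strategy above (hypothetical names and counts):
# 'D. Levin' folds into the unique full name in its cluster, while the two
# full given names sharing the ('liu', 'y') cluster stay separate and the
# bare initial becomes its own ambiguous bucket:
#   dedupe_author_counts({"David Levin": 5, "D. Levin": 2,
#                         "Yu Liu": 3, "Yang Liu": 4, "Y. Liu": 1})
#   -> {"David Levin": 7, "Yu Liu": 3, "Yang Liu": 4, "Y. Liu": 1}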

# Read key from env instead of hard-coding (unset is fine: unauthenticated access)
s2_api_key = os.environ.get("SEMANTIC_SCHOLAR_API_KEY")
sch = SemanticScholar(timeout=10, api_key=s2_api_key)

# alec
author_ids = [2242015445, 145151177, 2251097727, 2241538537, 2244829783, 2199254215, 2309006642, 2275054179, 2312381751, 2242589355]

# If author_ids is a single int, wrap it in a list
if isinstance(author_ids, int):
    author_ids = [author_ids]

# Using batch calls (get_authors and get_papers) is _way_ faster and more robust
# Batch fetch all authors in one call
authors = sch.get_authors([str(aid) for aid in author_ids])
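
# (For scale: that is one request for all the profiles above instead of one
# per profile, and the citation pass below needs roughly len(paper_ids)/500
# requests instead of one per paper.)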

papers = []
names = []
for author in authors:
    names.append(author.name)
    if len(author.papers) < author.paperCount:
        print(
            f"Warning: {author.name} has {author.paperCount} papers, "
            f"but only {len(author.papers)} were retrieved"
        )
    papers.extend(author.papers)
names = ", ".join(names)

try:
    # Deduplicate papers by paperId, keep one record per id
    papers_by_id = {paper["paperId"]: paper for paper in papers}
    paper_ids = list(papers_by_id.keys())
    author_counts = defaultdict(int)
    BATCH_SIZE = 500  # API limit for get_papers
    # Process in chunks so we stay within the batch limit
    for i in tqdm(range(0, len(paper_ids), BATCH_SIZE)):
        batch_ids = paper_ids[i:i + BATCH_SIZE]
        # One API call for up to 500 papers instead of 500 calls
        batch_papers = sch.get_papers(paper_ids=batch_ids, fields=["citations", "citations.authors"])
        # batch_papers is a list of Paper objects
        for paper in batch_papers:
            # Citations (with their authors) are populated because we asked
            # for them explicitly via `fields` in get_papers above.
            citations = paper.citations
            # Depending on the library version, each citation may be either:
            # - a dict with "authors", or
            # - an object with .raw_data["authors"].
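            # A citation entry with those fields typically looks like
            # (shape assumed from the S2 Graph API, abridged):
            #   {"paperId": "...", "authors": [{"authorId": "...", "name": "J. Doe"}]}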
            for citation in citations:
                # Try dict-style first, fall back to raw_data
                c = citation
                if not isinstance(c, dict):
                    c = getattr(c, "raw_data", c)
                for author in c.get("authors", []):
                    name = author["name"]
                    author_counts[name] += 1
except Exception as e:
    print(f"Error: {e}")

print("-------------------------------------------------")
# After building author_counts, merge name variants and print by count
deduped_author_counts = dedupe_author_counts(author_counts)
for name, count in sorted(deduped_author_counts.items(), key=lambda x: x[1], reverse=True):
    print(f"{count} | {name}")