from semanticscholar import SemanticScholar
from collections import defaultdict
from tqdm import tqdm
import os
import builtins
import re
import unicodedata
# Functions for deduplicating author names (imperfect)
def _strip_accents(text: str) -> str:
    return "".join(
        c for c in unicodedata.normalize("NFKD", text)
        if not unicodedata.combining(c)
    )

def _clean_name(name: str) -> str:
    # Normalize accents, remove commas/dots/apostrophes, compress whitespace
    name = _strip_accents(name)
    name = name.replace(",", " ")
    name = re.sub(r"[\.']", " ", name)  # keep hyphens
    name = re.sub(r"\s+", " ", name).strip()
    return name
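# Illustrative check of the cleaning step (hypothetical name, safe to delete):
# accents are folded and punctuation other than hyphens becomes whitespace.
assert _clean_name("Sören J. Müller") == "Soren J Muller"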
def _parse_name(name: str):
    """
    Returns (surname_lower, given_token_lower, is_initial).
    given_token is the first non-empty token before the surname.
    is_initial is True if that token is a single character (e.g., 'd' from 'D.').
    """
    if not name:
        return "", "", False
    cleaned = _clean_name(name).lower()
    if not cleaned:
        return "", "", False
    parts = cleaned.split(" ")
    if len(parts) == 1:
        # Mononym
        return parts[0], "", False
    surname = parts[-1]
    given_token = ""
    is_initial = False
    for token in parts[:-1]:
        token = token.strip()
        if not token:
            continue
        given_token = token
        is_initial = len(token) == 1
        break
    return surname, given_token, is_initial
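# Illustrative checks of the parser (hypothetical names, safe to delete):
# a bare initial is flagged so it can later be merged with a full given name.
assert _parse_name("Daniele Panozzo") == ("panozzo", "daniele", False)
assert _parse_name("D. Panozzo") == ("panozzo", "d", True)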
def dedupe_author_counts(author_counts: dict[str, int]) -> dict[str, int]:
    """
    Take {raw_name: count} and merge variants.

    Strategy:
    1. Group by (surname, first-letter-of-given-name) -> 'broad cluster'.
    2. Inside each cluster:
       - Identify full given-name tokens (len > 1).
       - If there's exactly one full given-name token:
         merge full + initial-only variants as one person.
       - If there are multiple full given-name tokens:
         keep each full name separate;
         keep initial-only names as a separate ambiguous bucket.
    """
    # broad_key -> list of (raw_name, count, given_token, is_initial)
    clusters: dict[tuple[str, str], list[tuple[str, int, str, bool]]] = defaultdict(list)
    for name, count in author_counts.items():
        surname, given_token, is_initial = _parse_name(name)
        if not surname:
            continue
        first_letter = given_token[0] if given_token else ""
        broad_key = (surname, first_letter)
        clusters[broad_key].append((name, count, given_token, is_initial))

    merged: dict[str, int] = {}
    for (surname, first_letter), records in clusters.items():
        # Collect full given-name tokens (len > 1)
        full_given_tokens = {
            given_token
            for (_, _, given_token, is_initial) in records
            if given_token and not is_initial
        }
        if len(full_given_tokens) <= 1:
            # Simple case: zero or one real given name in this cluster
            # -> treat all variants as one person
            total_by_name = defaultdict(int)
            for raw_name, count, given_token, is_initial in records:
                total_by_name[raw_name] += count
            total = sum(total_by_name.values())
            # Choose representative: most frequent; tie -> longest
            best_name = max(
                total_by_name.items(),
                key=lambda kv: (kv[1], len(kv[0]))
            )[0]
            merged[best_name] = total
        else:
            # Multiple distinct full given names, e.g., 'yu' and 'yang' for Liu
            # -> keep each full-name group separate
            # -> initial-only names become an ambiguous bucket
            full_groups: dict[str, dict[str, int]] = defaultdict(lambda: defaultdict(int))
            initial_bucket: dict[str, int] = defaultdict(int)
            for raw_name, count, given_token, is_initial in records:
                if given_token and not is_initial:
                    # Full given name
                    full_groups[given_token][raw_name] += count
                else:
                    # Initial-only or missing given name
                    initial_bucket[raw_name] += count
            # Add each full-name group as its own author
            for given_token, name_counts in full_groups.items():
                total = sum(name_counts.values())
                best_name = max(
                    name_counts.items(),
                    key=lambda kv: (kv[1], len(kv[0]))
                )[0]
                merged[best_name] = total
            # Keep ambiguous initials as their own aggregated entry (optional)
            if initial_bucket:
                total = sum(initial_bucket.values())
                best_name = max(
                    initial_bucket.items(),
                    key=lambda kv: (kv[1], len(kv[0]))
                )[0]
                merged[best_name] = total
    return merged
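# Minimal sanity check of the merging strategy (hypothetical names and counts,
# not API data; safe to delete). One full given name per cluster -> variants
# merge; multiple full given names -> kept apart, bare initials stay ambiguous.
assert dedupe_author_counts({"Alec Jacobson": 3, "A. Jacobson": 2}) == {"Alec Jacobson": 5}
assert dedupe_author_counts({"Yang Liu": 4, "Yu Liu": 1, "Y. Liu": 2}) == {
    "Yang Liu": 4, "Yu Liu": 1, "Y. Liu": 2,
}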
# Read key from env instead of hard-coding
#s2_api_key = os.environ.get("SEMANTIC_SCHOLAR_API_KEY")
sch = SemanticScholar(timeout=10)

# alec
author_ids = [2242015445, 145151177, 2251097727, 2241538537, 2244829783, 2199254215, 2309006642, 2275054179, 2312381751, 2242589355]

# If author_ids is a single int, wrap it in a list
if isinstance(author_ids, int):
    author_ids = [author_ids]

# Using batch calls (get_authors and get_papers) is _way_ faster and more robust.
# Batch fetch all authors in one call
authors = sch.get_authors([str(aid) for aid in author_ids])

papers = []
names = []
for author in authors:
    names.append(author.name)
    if len(author.papers) < author.paperCount:
        print(
            f"Warning: {author.name} has {author.paperCount} papers, "
            f"but only {len(author.papers)} were retrieved"
        )
    papers.extend(author.papers)
names = ", ".join(names)

# Deduplicate papers by paperId, keep one record per id
papers_by_id = {paper["paperId"]: paper for paper in papers}
paper_ids = list(papers_by_id.keys())

author_counts = defaultdict(int)
BATCH_SIZE = 500  # API limit for get_papers

try:
    # Process in chunks so we stay within the batch limit
    for i in tqdm(range(0, len(paper_ids), BATCH_SIZE)):
        batch_ids = paper_ids[i:i + BATCH_SIZE]
        # One API call for up to 500 papers instead of 500 calls
        batch_papers = sch.get_papers(paper_ids=batch_ids, fields=["citations", "citations.authors"])
        # batch_papers is a list of Paper objects
        for paper in batch_papers:
            # Citation records (with their authors) are requested explicitly
            # via the fields argument above.
            citations = paper.citations
            # Depending on the library version, each citation may be either:
            # - a dict with "authors", or
            # - an object with .raw_data["authors"].
            for citation in citations:
                # Try dict-style first, fall back to raw_data
                c = citation
                if not isinstance(c, dict):
                    c = getattr(c, "raw_data", c)
                for author in c.get("authors", []):
                    name = author["name"]
                    author_counts[name] += 1
except Exception as e:
    print(f"Error: {e}")

print("-------------------------------------------------")
# After building author_counts, merge likely duplicate name variants
deduped_author_counts = dedupe_author_counts(author_counts)
for name, count in builtins.sorted(deduped_author_counts.items(), key=lambda x: x[1], reverse=True):
    print(f"{count} | {name}")