#!/usr/bin/env python3
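"""Complete NPTEL pipeline: scrape → intersect → analyze.

The script scrapes the public NPTEL course catalogue, keeps only the courses
listed in an approved-courses Excel sheet, fetches run-wise statistics for each
remaining course from the NPTEL stats API, and ranks the results by average
maximum mark.

Example invocation (the script and data file names here are illustrative only):

    python nptel_pipeline.py --approved-xlsx approved.xlsx --output-json results.json --csv results.csv
"""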
|
|
|
import argparse
import csv
import json
import time
from statistics import mean
from urllib.parse import urljoin, urlparse

import pandas as pd
import requests
from bs4 import BeautifulSoup

# ---------------- CONFIG ----------------

BASE_URL = "https://nptel.ac.in"
COURSES_URL = f"{BASE_URL}/courses"
# Per-course statistics endpoint; the course id is substituted into the "{}".
STATS_API = "https://nptel.ac.in/api/stats/{}"

HEADERS = {
    "accept": "*/*",
    "user-agent": "Mozilla/5.0 (X11; Linux x86_64)",
    "referer": "https://nptel.ac.in/",
}

MONTHS = {
    "Jan": 1, "Feb": 2, "Mar": 3, "Apr": 4,
    "May": 5, "Jun": 6, "Jul": 7, "Aug": 8,
    "Sep": 9, "Oct": 10, "Nov": 11, "Dec": 12,
}

# ---------------- UTIL ----------------

def extract_course_id(course_url: str) -> str:
    """Return the last path segment of a course URL, used as the course id."""
    return urlparse(course_url).path.rstrip("/").split("/")[-1]


def normalize(text: str) -> str:
    """Lowercase and collapse whitespace so course names compare cleanly."""
    return " ".join(text.lower().split())

# ---------------- STEP 1: SCRAPE ----------------

def scrape_nptel_courses():
    """Scrape the NPTEL course listing and return a list of {"name", "url"} dicts."""
    print("Scraping all NPTEL courses…")
    response = requests.get(COURSES_URL, timeout=20)
    response.raise_for_status()

    soup = BeautifulSoup(response.text, "html.parser")
    courses = []

    # Course links are anchors wrapping a <div class="name"> that holds the title.
    for a_tag in soup.find_all("a", href=True):
        name_div = a_tag.find("div", class_="name")
        if not name_div:
            continue

        courses.append({
            "name": name_div.get_text(strip=True),
            "url": urljoin(BASE_URL, a_tag["href"]),
        })

    print(f"Found {len(courses)} total courses")
    return courses

# ---------------- STEP 2: INTERSECT ----------------

def extract_approved_course_names(xlsx_path: str) -> set[str]:
    """Collect every normalized string value found anywhere in the Excel sheet."""
    df = pd.read_excel(xlsx_path)
    approved = set()

    for col in df.columns:
        for val in df[col].dropna():
            if isinstance(val, str):
                approved.add(normalize(val))

    return approved


def intersect_courses(all_courses, approved_xlsx):
    """Keep only the scraped courses whose names appear in the approved list."""
    print("Intersecting with approved courses…")
    approved_names = extract_approved_course_names(approved_xlsx)
    approved_courses = []

    for course in all_courses:
        name_norm = normalize(course["name"])
        # Scraped names may carry a "NOC:" prefix that the approved sheet lacks;
        # compare on the part after the prefix when it is present.
        if "noc:" in name_norm:
            name_norm = name_norm.split("noc:", 1)[1].strip()

        if name_norm in approved_names:
            approved_courses.append(course)

    print(f"Approved courses found: {len(approved_courses)}")
    return approved_courses

# ---------------- STEP 3: ANALYSIS ----------------

def fetch_course_stats(course_id: str) -> dict | None:
    """Fetch stats for one course; return the first data record, or None on any failure."""
    try:
        r = requests.get(STATS_API.format(course_id), headers=HEADERS, timeout=10)
        r.raise_for_status()
        payload = r.json()
        return payload["data"][0] if payload.get("data") else None
    except Exception:
        return None
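
# fetch_course_stats() and the helpers below assume the stats payload is
# roughly shaped as follows (inferred from how this script reads it, not a
# documented contract of the NPTEL stats API):
#
#   {"data": [{"run_wise_stats": [{"max_mark": <number>, "Timeline": "<Mon>-<Mon> ...", ...}, ...], ...}]}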
|
|
|
|
|
def timeline_at_least_n_months(timeline: str, min_months: int) -> bool:
    """Return True if a run timeline such as "Jan-Apr 2024" spans at least min_months."""
    try:
        months_part = timeline.split()[0]  # e.g. "Jan-Apr"
        start, end = months_part.split("-")
        s, e = MONTHS[start], MONTHS[end]

        # Handle ranges that wrap around the year boundary (e.g. Nov-Feb).
        duration = e - s + 1 if e >= s else (12 - s + 1) + e
        return duration >= min_months
    except Exception:
        return False
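
# Illustrative check of the wrap-around arithmetic above (not executed by the
# pipeline): "Jan-Apr" gives 4 - 1 + 1 = 4 months; "Nov-Feb" wraps and gives
# (12 - 11 + 1) + 2 = 4 months.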
|
|
|
|
|
def all_runs_valid(runs, min_mark, min_months):
    """Return True only if every run meets both the mark and the duration threshold."""
    if not runs:
        return False

    for run in runs:
        try:
            if int(run.get("max_mark", 0)) < min_mark:
                return False
            if not timeline_at_least_n_months(run.get("Timeline", ""), min_months):
                return False
        except Exception:
            return False

    return True

def analyze_courses(
    courses,
    min_mark,
    min_months,
    batch_size,
    sleep_time,
):
    """Fetch stats for each course in batches and keep those whose every run qualifies."""
    print("Analyzing approved courses…")
    results = []
    total = len(courses)

    for i in range(0, total, batch_size):
        batch = courses[i:i + batch_size]
        print(f"\nBatch {i//batch_size + 1}: {i+1}-{min(i+batch_size, total)}")

        for course in batch:
            cid = extract_course_id(course["url"])
            print(f" → {course['name']} ({cid})")

            stats = fetch_course_stats(cid)
            if not stats:
                continue

            runs = stats.get("run_wise_stats", [])
            if not all_runs_valid(runs, min_mark, min_months):
                continue

            marks = [int(r["max_mark"]) for r in runs if "max_mark" in r]

            results.append({
                "name": course["name"],
                "course_id": cid,
                "url": course["url"],
                "runs": len(marks),
                "max_marks": marks,
                "average_max_mark": round(mean(marks), 2),
                "min_max_mark": min(marks),
            })

        # Pause between batches to stay gentle on the stats API.
        if i + batch_size < total:
            time.sleep(sleep_time)

    # Best average first; ties broken by worst run, then by number of runs.
    results.sort(
        key=lambda x: (x["average_max_mark"], x["min_max_mark"], x["runs"]),
        reverse=True,
    )

    return results

# ---------------- OUTPUT ----------------

def write_csv(data, path):
    """Write results to CSV, flattening the max_marks list into a comma-joined string."""
    with open(path, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(
            f,
            fieldnames=[
                "name", "course_id", "url", "runs",
                "average_max_mark", "min_max_mark", "max_marks",
            ],
        )
        writer.writeheader()

        for row in data:
            row = dict(row)
            row["max_marks"] = ",".join(map(str, row["max_marks"]))
            writer.writerow(row)

# ---------------- CLI ----------------

def main():
    parser = argparse.ArgumentParser(
        description="Complete NPTEL pipeline: scrape → intersect → analyze"
    )

    parser.add_argument("--approved-xlsx", required=True, help="Approved courses Excel file")
    parser.add_argument("--output-json", required=True, help="Final output JSON")
    parser.add_argument("--csv", help="Optional CSV output")

    parser.add_argument("--min-mark", type=int, default=90,
                        help="Minimum max_mark every run must reach")
    parser.add_argument("--min-months", type=int, default=3,
                        help="Minimum run duration in months")
    parser.add_argument("--batch-size", type=int, default=25,
                        help="Courses per batch of stats requests")
    parser.add_argument("--sleep", type=int, default=5,
                        help="Seconds to sleep between batches")

    args = parser.parse_args()

    all_courses = scrape_nptel_courses()
    approved_courses = intersect_courses(all_courses, args.approved_xlsx)

    results = analyze_courses(
        approved_courses,
        args.min_mark,
        args.min_months,
        args.batch_size,
        args.sleep,
    )

    with open(args.output_json, "w", encoding="utf-8") as f:
        json.dump(results, f, indent=2, ensure_ascii=False)

    print(f"\nSaved final JSON → {args.output_json}")

    if args.csv:
        write_csv(results, args.csv)
        print(f"Saved CSV → {args.csv}")


if __name__ == "__main__":
    main()