#!/usr/bin/env python3
"""
dedup.py — Deduplicate files across one or more directories.

Usage:
    python dedup.py [--dry-run] <dir1> [dir2] ...

Duplicate files (same length + same SHA-256) are moved into a DUPLICATE/
subfolder inside their original parent directory. The "best" copy (shortest
filename, then shortest directory path, then alphabetical full path) is kept
in place.
"""

import argparse
import hashlib
import os
import random
import string
import sys
from collections import defaultdict

DUPLICATE_DIR_NAME = "DUPLICATE"
PROGRESS_CHARS = string.ascii_letters + string.digits + string.punctuation
HASH_BUF_SIZE = 1 << 16  # 64 KiB read chunks


def is_hidden(path):
    """Return True if any component of the path starts with a dot."""
    parts = path.split(os.sep)
    return any(part.startswith(".") for part in parts if part)


def inside_duplicate_dir(path):
    """Return True if the path passes through a DUPLICATE directory."""
    parts = path.split(os.sep)
    return DUPLICATE_DIR_NAME in parts
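
# Example: inside_duplicate_dir("/x/DUPLICATE/y.txt") is True, while a path
# like "/x/DUPLICATES/y.txt" is not, because the check matches whole path
# components rather than substrings.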


def sha256_of_file(filepath):
    """Return the SHA-256 hex digest of a file, read in fixed-size chunks."""
    h = hashlib.sha256()
    with open(filepath, "rb") as f:
        while True:
            chunk = f.read(HASH_BUF_SIZE)
            if not chunk:
                break
            h.update(chunk)
    return h.hexdigest()
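
# Note: on Python 3.11+, hashlib.file_digest(f, "sha256") performs the same
# chunked read internally and could replace the loop above.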


def pick_best(paths):
    """
    Return the 'best' path from a list of duplicates.

    Criteria (in order):
      1. Shortest filename (basename).
      2. Shortest directory path (dirname).
      3. Full path alphabetical order.
    """
    def sort_key(p):
        return (len(os.path.basename(p)), len(os.path.dirname(p)), p)

    return min(paths, key=sort_key)
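
# Example: among ["/a/b/x.txt", "/a/long_name.txt", "/a/x.txt"], both "x.txt"
# basenames beat "long_name.txt", and the shorter dirname "/a" then beats
# "/a/b", so "/a/x.txt" is the copy that stays in place.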


def unique_dest(dest, taken=None):
    """
    If *dest* already exists on disk, or is already claimed in *taken* (a
    set of destinations reserved by earlier planned moves), append a numeric
    suffix before the extension until an unused name is found.
    """
    taken = taken if taken is not None else set()
    if not os.path.exists(dest) and dest not in taken:
        return dest
    base, ext = os.path.splitext(dest)
    counter = 1
    while True:
        candidate = f"{base}_{counter}{ext}"
        if not os.path.exists(candidate) and candidate not in taken:
            return candidate
        counter += 1
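
# Example: if dir/photo.jpg is taken, the candidates tried are
# dir/photo_1.jpg, dir/photo_2.jpg, ... until a free name turns up.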


def collect_files(directories):
    """
    Walk the given directories and return a list of (path, size) tuples for
    regular files, skipping hidden files/dirs, symlinks, empty files, and
    anything inside a DUPLICATE directory.
    """
    files = []
    seen_count = 0
    for top in directories:
        for dirpath, dirnames, filenames in os.walk(top, followlinks=False):
            # Prune hidden, DUPLICATE, and symlinked directories in place so
            # os.walk never descends into them.
            dirnames[:] = [
                d for d in dirnames
                if not d.startswith(".")
                and d != DUPLICATE_DIR_NAME
                and not os.path.islink(os.path.join(dirpath, d))
            ]
            for fname in filenames:
                if fname.startswith("."):
                    continue
                full = os.path.join(dirpath, fname)
                if os.path.islink(full):
                    continue
                if inside_duplicate_dir(full):
                    continue
                try:
                    size = os.path.getsize(full)
                except OSError:
                    continue
                if size == 0:
                    continue
                files.append((full, size))
                seen_count += 1
                # Emit a random character every 10 files as a progress pulse.
                if seen_count % 10 == 0:
                    sys.stdout.write(random.choice(PROGRESS_CHARS))
                    sys.stdout.flush()
    return files


def group_by_size(file_list):
    """Group files by size; discard groups with only one member."""
    by_size = defaultdict(list)
    for path, size in file_list:
        by_size[size].append(path)
    return {sz: paths for sz, paths in by_size.items() if len(paths) > 1}
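
# Example: [("a.txt", 100), ("b.txt", 100), ("c.txt", 7)] groups to
# {100: ["a.txt", "b.txt"]}; the lone 7-byte file drops out, so it is
# never hashed.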


def group_by_hash(size_groups):
    """
    Within each size group, compute SHA-256 and group by (size, hash).

    Returns a dict mapping (size, hash) to [paths] for groups with more than
    one member. Progress characters are printed every 10 files hashed.
    """
    hashed_count = 0
    by_hash = defaultdict(list)
    for size, paths in size_groups.items():
        for p in paths:
            try:
                h = sha256_of_file(p)
            except OSError:
                continue
            by_hash[(size, h)].append(p)
            hashed_count += 1
            if hashed_count % 10 == 0:
                sys.stdout.write(random.choice(PROGRESS_CHARS))
                sys.stdout.flush()
    return {k: v for k, v in by_hash.items() if len(v) > 1}
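
# Two files only end up in the same group when both their size and their
# SHA-256 digest agree, so a false positive would require a hash collision.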


def plan_moves(dup_groups):
    """
    For each group of duplicates, pick the best file to keep and plan
    moves for the rest into their respective DUPLICATE directories.

    Returns a list of (src, dest) tuples. Destinations claimed by earlier
    planned moves are tracked so no two moves can share a destination.
    """
    moves = []
    planned = set()
    for (_size, _hash), paths in dup_groups.items():
        best = pick_best(paths)
        for p in paths:
            if p == best:
                continue
            parent = os.path.dirname(p)
            dup_dir = os.path.join(parent, DUPLICATE_DIR_NAME)
            dest = unique_dest(
                os.path.join(dup_dir, os.path.basename(p)), taken=planned
            )
            planned.add(dest)
            moves.append((p, dest))
    return moves
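
# Example: for duplicates ["/pics/a.jpg", "/pics/old/a.jpg"], the shorter
# dirname keeps "/pics/a.jpg" in place and the planned move is
# ("/pics/old/a.jpg", "/pics/old/DUPLICATE/a.jpg").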


def execute_moves(moves, dry_run=False):
    """Perform the planned moves, or just print them when dry_run is set."""
    for src, dest in moves:
        dup_dir = os.path.dirname(dest)
        if dry_run:
            print(f"[DRY RUN] {src} → {dest}")
        else:
            os.makedirs(dup_dir, exist_ok=True)
            os.rename(src, dest)
            print(f"Moved: {src} → {dest}")


def main():
    parser = argparse.ArgumentParser(
        description="Deduplicate files across directories."
    )
    parser.add_argument(
        "directories",
        nargs="+",
        help="One or more directories to scan.",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Show what would be done without moving any files.",
    )
    args = parser.parse_args()

    # Resolve to absolute paths and validate.
    directories = []
    for d in args.directories:
        d = os.path.abspath(d)
        if not os.path.isdir(d):
            print(f"Error: '{d}' is not a directory.", file=sys.stderr)
            sys.exit(1)
        directories.append(d)

    # --- Phase 1: collect files ---
    print("Scanning files …")
    file_list = collect_files(directories)
    print(f"\n{len(file_list)} file(s) found.")

    # --- Phase 2: group by size ---
    size_groups = group_by_size(file_list)
    candidates = sum(len(v) for v in size_groups.values())
    print(f"{candidates} file(s) share a size with at least one other file.")
    if candidates == 0:
        print("Nothing to deduplicate.")
        return

    # --- Phase 3: hash and group ---
    print("Hashing candidates …")
    dup_groups = group_by_hash(size_groups)
    dup_count = sum(len(v) - 1 for v in dup_groups.values())
    print(f"\n{dup_count} duplicate(s) detected across {len(dup_groups)} group(s).")
    if dup_count == 0:
        print("Nothing to deduplicate.")
        return

    # --- Phase 4: plan & execute ---
    moves = plan_moves(dup_groups)
    execute_moves(moves, dry_run=args.dry_run)
    if args.dry_run:
        print(f"\nDry run complete. {len(moves)} file(s) would be moved.")
    else:
        print(f"\nDone. {len(moves)} file(s) moved.")


if __name__ == "__main__":
    main()
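
# Example run (hypothetical directories):
#   python dedup.py --dry-run ~/Photos ~/Downloads
# prints the planned moves without touching any files.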