Created
February 8, 2026 12:42
-
-
Save danharper/627a45e7b6bbcb402d5d8311e22d2c6b to your computer and use it in GitHub Desktop.
claude code's ext2reader it wrote to extract files from an image
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3
"""
ext2reader.py - Pure Python ext2 filesystem reader for raw disk images.

Reads files and directories from an ext2 partition inside a raw disk image
by parsing the binary structures directly. No ext2 drivers or FUSE needed.

Usage:
    python3 ext2reader.py <image> ls <path>
    python3 ext2reader.py <image> cat <path>
    python3 ext2reader.py <image> extract <fs_path> <local_path>
    python3 ext2reader.py <image> extract-tree <fs_path> <local_dir>
    python3 ext2reader.py <image> info

Examples:
    python3 ext2reader.py disk.raw ls /
    python3 ext2reader.py disk.raw cat /etc/hostname
    python3 ext2reader.py disk.raw extract /etc/passwd ./passwd
    python3 ext2reader.py disk.raw extract-tree /var/www ./www-extracted
    python3 ext2reader.py disk.raw info

The script auto-detects the first Linux partition (type 0x83) from the MBR.

NOTE FROM HUMAN: This assumes you've already converted the .ova to .raw, e.g.
- tar -xf ./disk.ova # outputs a vmdk
- qemu-img convert -O raw ./disk-disk1.vmdk ./disk.raw
"""
| import struct | |
| import sys | |
| import os | |
class Ext2Reader:
    """Reads an ext2 filesystem from a raw disk image.

    The reader works on any seekable binary file object and parses the
    superblock, block group descriptors, inodes and directory blocks
    directly.  Only the classic block-pointer layout (12 direct pointers plus
    single/double/triple indirect blocks) is understood: inodes mapped with
    ext4 extents are rejected with ``ValueError`` instead of being misread,
    and 64-bit (64-byte) group descriptors are not handled.
    """

    # Inode type masks
    S_IFMT = 0xF000
    S_IFDIR = 0x4000
    S_IFREG = 0x8000
    S_IFLNK = 0xA000

    # Directory entry file types
    FT_NAMES = {1: "file", 2: "dir", 3: "chr", 4: "blk",
                5: "fifo", 6: "sock", 7: "link"}

    # Inode mode type bits -> directory entry file type.  Used when the
    # directory entry carries no type byte (filesystems without "filetype").
    _MODE_TO_FT = {0x8000: 1, 0x4000: 2, 0x2000: 3, 0x6000: 4,
                   0x1000: 5, 0xC000: 6, 0xA000: 7}

    # Superblock feature bits
    _COMPAT_HAS_JOURNAL = 0x0004   # in s_feature_compat
    _INCOMPAT_EXTENTS = 0x0040     # in s_feature_incompat
    # Per-inode flag (i_flags): block map is an ext4 extent tree
    _EXTENTS_FL = 0x00080000

    def __init__(self, f, partition_offset=None):
        """Open the filesystem found in *f*.

        Args:
            f: Seekable binary file object for the raw image.
            partition_offset: Byte offset of the ext2 partition inside the
                image.  When ``None`` the offset is auto-detected.

        Raises:
            ValueError: If no partition or no valid superblock is found.
        """
        self.f = f
        if partition_offset is not None:
            self.part_offset = partition_offset
        else:
            self.part_offset = self._find_linux_partition()
        self._read_superblock()

    def _find_linux_partition(self):
        """Return the byte offset of the filesystem inside the image.

        The four primary MBR slots are scanned for the first non-empty Linux
        (0x83) partition.  If none is present but an ext2 superblock magic
        sits at byte 1080, the image is taken to be a bare filesystem and
        offset 0 is returned.

        Raises:
            ValueError: If neither a Linux partition nor a bare filesystem
                is found.
        """
        self.f.seek(0)
        mbr = self.f.read(512)
        if len(mbr) >= 512:
            for slot in range(4):
                base = 446 + slot * 16
                ptype = mbr[base + 4]
                lba_start, num_sectors = struct.unpack_from("<II", mbr, base + 8)
                if ptype == 0x83 and num_sectors > 0:
                    return lba_start * 512
        # No partition table entry matched: maybe the image is the
        # filesystem itself (superblock at 1024, magic at +56, little-endian).
        self.f.seek(1024 + 56)
        if self.f.read(2) == b"\x53\xef":
            return 0
        raise ValueError("No Linux partition (type 0x83) found in MBR")

    def _read_superblock(self):
        """Read and validate the ext2 superblock.

        Raises:
            ValueError: If the superblock is truncated, has the wrong magic,
                or contains geometry values that cannot be used.
        """
        self.f.seek(self.part_offset + 1024)
        sb = self.f.read(1024)
        if len(sb) < 1024:
            raise ValueError("Image is too short to contain an ext2 superblock")
        magic = struct.unpack_from("<H", sb, 56)[0]
        if magic != 0xEF53:
            raise ValueError(f"Bad ext2 magic: 0x{magic:04x} (expected 0xEF53)")
        self.inodes_count = struct.unpack_from("<I", sb, 0)[0]
        self.blocks_count = struct.unpack_from("<I", sb, 4)[0]
        self.log_block_size = struct.unpack_from("<I", sb, 24)[0]
        if self.log_block_size > 6:
            # ext2 block sizes range from 1 KiB to 64 KiB
            raise ValueError(f"Unsupported log block size: {self.log_block_size}")
        self.block_size = 1024 << self.log_block_size
        self.inodes_per_group = struct.unpack_from("<I", sb, 40)[0]
        if self.inodes_per_group == 0:
            raise ValueError("Corrupt superblock: inodes_per_group is 0")
        self.rev_level = struct.unpack_from("<I", sb, 76)[0]
        # s_inode_size is a 16-bit field; the next 16 bits are
        # s_block_group_nr and must not be folded into the inode size.
        self.inode_size = struct.unpack_from("<H", sb, 88)[0]
        self.feature_compat = struct.unpack_from("<I", sb, 92)[0]
        self.feature_incompat = struct.unpack_from("<I", sb, 96)[0]
        # For revision 0 filesystems, inode size is always 128
        if self.rev_level == 0:
            self.inode_size = 128
        if self.inode_size < 128:
            raise ValueError(f"Corrupt superblock: inode size {self.inode_size}")

    def _read_block(self, block_num):
        """Read a single block; block 0 denotes a hole and reads as zeros."""
        if block_num == 0:
            return b"\x00" * self.block_size
        self.f.seek(self.part_offset + block_num * self.block_size)
        return self.f.read(self.block_size)

    def _read_pointer_block(self, block_num):
        """Return the block pointers stored in an indirect block as a tuple.

        Raises:
            ValueError: If the block cannot be read in full.
        """
        raw = self._read_block(block_num)
        if len(raw) < self.block_size:
            raise ValueError(f"Indirect block {block_num} lies outside the image")
        return struct.unpack(f"<{self.block_size // 4}I", raw)

    def _read_inode(self, ino_num):
        """Read a raw inode by its number (1-indexed).

        Raises:
            ValueError: If the inode number is out of range or the on-disk
                structures needed to locate it are truncated.
        """
        if not 1 <= ino_num <= self.inodes_count:
            raise ValueError(f"Invalid inode number: {ino_num}")
        bg, idx = divmod(ino_num - 1, self.inodes_per_group)
        # Block group descriptors are 32 bytes each; the table starts in the
        # block after the superblock: block 1 for block_size >= 2048, or
        # block 2 for block_size == 1024.
        gdt_block = 1 if self.block_size >= 2048 else 2
        self.f.seek(self.part_offset + gdt_block * self.block_size + bg * 32)
        gd = self.f.read(32)
        if len(gd) < 32:
            raise ValueError(f"Group descriptor {bg} lies outside the image")
        inode_table_block = struct.unpack_from("<I", gd, 8)[0]
        offset = (self.part_offset
                  + inode_table_block * self.block_size
                  + idx * self.inode_size)
        self.f.seek(offset)
        raw = self.f.read(self.inode_size)
        if len(raw) < 128:
            raise ValueError(f"Inode {ino_num} lies outside the image")
        return raw

    def _parse_inode(self, raw):
        """Parse a raw inode into a dict of useful fields.

        Besides the original keys, the dict carries ``sectors`` (i_blocks,
        in 512-byte units), ``flags`` (i_flags) and ``file_acl``
        (i_file_acl), which are needed to recognise fast symlinks and
        extent-mapped inodes.

        Raises:
            ValueError: If *raw* is shorter than the 128-byte base inode.
        """
        if len(raw) < 128:
            raise ValueError("Truncated inode")
        mode = struct.unpack_from("<H", raw, 0)[0]
        size, atime, ctime, mtime = struct.unpack_from("<4I", raw, 4)
        links = struct.unpack_from("<H", raw, 26)[0]
        sectors, flags = struct.unpack_from("<II", raw, 28)
        blocks = list(struct.unpack_from("<15I", raw, 40))
        file_acl, size_high = struct.unpack_from("<II", raw, 104)
        fmt = mode & self.S_IFMT
        if fmt == self.S_IFREG:
            # For regular files offset 108 holds the upper 32 bits of the size
            size |= size_high << 32
        return {
            "mode": mode,
            "size": size,
            "atime": atime,
            "ctime": ctime,
            "mtime": mtime,
            "links": links,
            "blocks": blocks,  # 0-11: direct, 12: indirect, 13: double, 14: triple
            "sectors": sectors,
            "flags": flags,
            "file_acl": file_acl,
            "raw": raw,
            "is_dir": fmt == self.S_IFDIR,
            "is_file": fmt == self.S_IFREG,
            "is_symlink": fmt == self.S_IFLNK,
        }

    def _collect_indirect(self, block_num, depth, wanted, result):
        """Append the data blocks below an indirect block to *result*.

        *depth* is 1 for a single-indirect block, 2 for double, 3 for triple.
        Collection stops once *result* holds *wanted* entries.  A zero
        pointer is a hole covering the whole subtree and yields zeros.
        """
        ptrs_per_block = self.block_size // 4
        if block_num == 0:
            span = ptrs_per_block ** depth
            result.extend([0] * min(span, wanted - len(result)))
            return
        for bn in self._read_pointer_block(block_num):
            if len(result) >= wanted:
                return
            if depth == 1:
                result.append(bn)
            else:
                self._collect_indirect(bn, depth - 1, wanted, result)

    def _get_data_blocks(self, info):
        """Resolve the data block numbers that back an inode's contents.

        Follows direct, indirect, double-indirect and triple-indirect
        pointers and returns exactly ``ceil(size / block_size)`` entries.
        Holes in sparse files appear as block number 0 (which
        ``_read_block`` turns into zeros) instead of ending the list early.

        Raises:
            ValueError: If the inode is extent-mapped (ext4) or an indirect
                block cannot be read.
        """
        if info.get("flags", 0) & self._EXTENTS_FL:
            raise ValueError("inode uses ext4 extents, which this reader "
                             "does not support")
        bs = self.block_size
        wanted = (info["size"] + bs - 1) // bs
        result = []
        # 12 direct blocks (indices 0-11)
        for bn in info["blocks"][:12]:
            if len(result) >= wanted:
                return result
            result.append(bn)
        # Indices 12, 13, 14: single, double and triple indirect
        for depth, root in enumerate(info["blocks"][12:15], start=1):
            if len(result) >= wanted:
                break
            self._collect_indirect(root, depth, wanted, result)
        return result

    def _is_fast_symlink(self, info):
        """Return True if a symlink's target is stored inside the inode.

        A fast symlink owns no data blocks, so its sector count is zero once
        a possible extended-attribute block is discounted.  The block
        pointer area cannot be used for this test because it holds the
        target text itself.
        """
        ea_sectors = self.block_size // 512 if info["file_acl"] else 0
        return info["sectors"] - ea_sectors == 0

    def read_file(self, info):
        """Read the full contents of a file or symlink target.

        Args:
            info: Parsed inode dict from ``_parse_inode``.

        Returns:
            The contents as ``bytes``, truncated to the inode's size.
        """
        size = info["size"]
        # Fast symlinks: target stored inline in the block pointers area
        # when the symlink is short enough (< 60 bytes) and owns no blocks
        if info["is_symlink"] and size < 60 and self._is_fast_symlink(info):
            return bytes(info["raw"][40:40 + size])
        chunks = [self._read_block(bn) for bn in self._get_data_blocks(info)]
        return b"".join(chunks)[:size]

    def read_dir(self, info):
        """Parse directory entries from a directory inode.

        Returns:
            List of ``(inode_number, file_type, name)`` tuples.  Unused
            entries (inode 0) are skipped.  Parsing stops at the first entry
            whose record length is too small to be valid.
        """
        data = self.read_file(info)
        end = min(len(data), info["size"])
        entries = []
        offset = 0
        while offset + 8 <= end:
            d_inode, d_rec_len, d_name_len, d_file_type = struct.unpack_from(
                "<IHBB", data, offset)
            if d_rec_len < 8:
                # A record cannot be shorter than its own header; treat as
                # corruption instead of walking misaligned data.
                break
            if d_inode != 0:
                raw_name = data[offset + 8:offset + 8 + d_name_len]
                entries.append((d_inode, d_file_type,
                                raw_name.decode("utf-8", errors="replace")))
            offset += d_rec_len
        return entries

    def resolve_path(self, path):
        """Resolve a filesystem path to an inode number. Returns None if not found.

        Symlinks are not followed.
        """
        current_ino = 2  # root inode is always 2
        for part in (p for p in path.split("/") if p):
            info = self._parse_inode(self._read_inode(current_ino))
            if not info["is_dir"]:
                return None
            for ino, _ftype, name in self.read_dir(info):
                if name == part:
                    current_ino = ino
                    break
            else:
                return None
        return current_ino

    def stat(self, path):
        """Get parsed inode info for a path. Returns None if not found."""
        ino = self.resolve_path(path)
        if ino is None:
            return None
        return self._parse_inode(self._read_inode(ino))

    def ls(self, path="/"):
        """List directory contents with type and size info.

        Prints one line per entry.  Problems reading an individual child
        inode only suppress the extra detail for that entry.
        """
        ino = self.resolve_path(path)
        if ino is None:
            print(f"Path not found: {path}")
            return
        info = self._parse_inode(self._read_inode(ino))
        if not info["is_dir"]:
            print(f"{path}: not a directory (mode=0o{info['mode']:o})")
            return
        read_errors = (ValueError, OSError, struct.error)
        for child_ino, ftype, name in self.read_dir(info):
            if name in (".", ".."):
                continue
            child_info = None
            if ftype in (1, 7) or ftype not in self.FT_NAMES:
                try:
                    child_info = self._parse_inode(self._read_inode(child_ino))
                except read_errors:
                    child_info = None
            if ftype not in self.FT_NAMES and child_info is not None:
                # No usable type byte in the entry: derive it from the mode
                ftype = self._MODE_TO_FT.get(
                    child_info["mode"] & self.S_IFMT, ftype)
            tname = self.FT_NAMES.get(ftype, f"?{ftype}")
            extra = ""
            if child_info is not None:
                if ftype == 7:  # symlink
                    try:
                        target = self.read_file(child_info).decode(
                            "utf-8", errors="replace")
                        extra = f" -> {target}"
                    except read_errors:
                        pass
                elif ftype == 1:  # regular file
                    extra = f" ({child_info['size']:,} bytes)"
            print(f" {tname:5s} {name}{extra}")

    def cat(self, path):
        """Print file contents to stdout.

        For a symlink the target is printed instead.  Exits with status 1
        if the path is missing or is a directory.
        """
        ino = self.resolve_path(path)
        if ino is None:
            print(f"Path not found: {path}", file=sys.stderr)
            sys.exit(1)
        info = self._parse_inode(self._read_inode(ino))
        if info["is_symlink"]:
            target = self.read_file(info).decode("utf-8", errors="replace")
            print(f"symlink -> {target}")
            return
        if info["is_dir"]:
            print(f"{path}: is a directory", file=sys.stderr)
            sys.exit(1)
        sys.stdout.buffer.write(self.read_file(info))

    def extract(self, fs_path, local_path):
        """Extract a single file to a local path.

        Exits with status 1 if the path is missing or is a directory.
        """
        ino = self.resolve_path(fs_path)
        if ino is None:
            print(f"Path not found: {fs_path}", file=sys.stderr)
            sys.exit(1)
        info = self._parse_inode(self._read_inode(ino))
        if info["is_dir"]:
            # Same policy as cat(): raw directory blocks are not file data
            print(f"{fs_path}: is a directory", file=sys.stderr)
            sys.exit(1)
        data = self.read_file(info)
        os.makedirs(os.path.dirname(local_path) or ".", exist_ok=True)
        with open(local_path, "wb") as out:
            out.write(data)
        print(f"Extracted {fs_path} ({len(data):,} bytes) -> {local_path}")

    def extract_tree(self, fs_path, local_dir):
        """Recursively extract a directory tree.

        If *fs_path* is not a directory it is extracted as a single file
        into *local_dir*.
        """
        ino = self.resolve_path(fs_path)
        if ino is None:
            print(f"Path not found: {fs_path}", file=sys.stderr)
            sys.exit(1)
        info = self._parse_inode(self._read_inode(ino))
        if not info["is_dir"]:
            # Single file
            self.extract(fs_path, os.path.join(local_dir, os.path.basename(fs_path)))
            return
        self._extract_tree_recursive(ino, fs_path, local_dir)

    @staticmethod
    def _is_safe_name(name):
        """Return True if *name* is usable as a single local path component.

        Names come from the (untrusted) image; anything empty or containing
        a NUL or a path separator could escape the extraction directory.
        """
        if not name or "\x00" in name:
            return False
        separators = {"/", os.sep}
        if os.altsep:
            separators.add(os.altsep)
        return not any(sep in name for sep in separators)

    def _extract_tree_recursive(self, ino_num, fs_path, local_path, _seen=None):
        """Internal recursive tree extraction.

        Args:
            ino_num: Inode number of the directory to extract.
            fs_path: Path of that directory inside the image (for messages).
            local_path: Local directory to populate.
            _seen: Internal set of directory inodes already visited, used to
                stop on directory loops in corrupt images.

        Errors on individual entries are reported to stderr and do not stop
        the extraction of their siblings.
        """
        info = self._parse_inode(self._read_inode(ino_num))
        if not info["is_dir"]:
            return
        seen = set() if _seen is None else _seen
        if ino_num in seen:
            print(f" ERROR {fs_path}: directory loop detected, skipped",
                  file=sys.stderr)
            return
        seen.add(ino_num)
        os.makedirs(local_path, exist_ok=True)
        for child_ino, _ftype, name in self.read_dir(info):
            if name in (".", ".."):
                continue
            child_fs = fs_path.rstrip("/") + "/" + name
            if not self._is_safe_name(name):
                print(f" ERROR {child_fs}: unsafe file name, skipped",
                      file=sys.stderr)
                continue
            child_local = os.path.join(local_path, name)
            try:
                child_info = self._parse_inode(self._read_inode(child_ino))
                if child_info["is_symlink"]:
                    target = self.read_file(child_info).decode("utf-8", errors="replace")
                    try:
                        os.symlink(target, child_local)
                    except OSError:
                        # Symlinks unavailable (or name taken): keep the
                        # target in a side file instead.
                        with open(child_local + ".symlink", "w") as out:
                            out.write(target)
                    print(f" link {child_fs} -> {target}")
                elif os.path.islink(child_local):
                    # Never write through a link created from image data
                    raise ValueError("refusing to write through an existing symlink")
                elif child_info["is_dir"]:
                    self._extract_tree_recursive(child_ino, child_fs, child_local, seen)
                elif child_info["is_file"]:
                    data = self.read_file(child_info)
                    with open(child_local, "wb") as out:
                        out.write(data)
                    print(f" file {child_fs} ({len(data):,} bytes)")
            except Exception as e:
                print(f" ERROR {child_fs}: {e}", file=sys.stderr)

    def info(self):
        """Print filesystem metadata."""
        print(f"Partition offset: {self.part_offset} bytes (sector {self.part_offset // 512})")
        print(f"Block size: {self.block_size}")
        print(f"Inode size: {self.inode_size}")
        print(f"Total inodes: {self.inodes_count:,}")
        print(f"Total blocks: {self.blocks_count:,}")
        print(f"Inodes per group: {self.inodes_per_group:,}")
        print(f"Revision: {self.rev_level}")
        # The journal flag lives in the *compat* feature word; extents are
        # an *incompat* feature.
        has_journal = bool(self.feature_compat & self._COMPAT_HAS_JOURNAL)
        has_extents = bool(self.feature_incompat & self._INCOMPAT_EXTENTS)
        if has_extents:
            fs_type = "ext4"
        elif has_journal:
            fs_type = "ext3"
        else:
            fs_type = "ext2"
        print(f"Filesystem type: {fs_type}")
        print(f"Volume size: {self.blocks_count * self.block_size / (1024**3):.1f} GiB")
def main():
    """Command-line entry point.

    Usage: ``ext2reader.py <image> <command> [args...]`` where the command
    is one of ``info``, ``ls``, ``cat``, ``extract`` or ``extract-tree``.
    Arguments are validated before the image is opened.  Any problem with
    the image (unreadable file, no partition, bad superblock, corrupt
    structures) is reported on stderr and the process exits with status 1.
    """
    if len(sys.argv) < 3:
        # __doc__ is None when docstrings are stripped (python -OO)
        print((__doc__ or "").strip())
        sys.exit(1)
    image_path = sys.argv[1]
    command = sys.argv[2]
    args = sys.argv[3:]

    # command -> (minimum number of extra arguments, usage line)
    required = {
        "info": (0, ""),
        "ls": (0, ""),
        "cat": (1, "Usage: ext2reader.py <image> cat <path>"),
        "extract": (2, "Usage: ext2reader.py <image> extract <fs_path> <local_path>"),
        "extract-tree": (2, "Usage: ext2reader.py <image> extract-tree <fs_path> <local_dir>"),
    }
    if command not in required:
        print(f"Unknown command: {command}", file=sys.stderr)
        print("Commands: info, ls, cat, extract, extract-tree", file=sys.stderr)
        sys.exit(1)
    min_args, usage = required[command]
    if len(args) < min_args:
        print(usage, file=sys.stderr)
        sys.exit(1)

    try:
        with open(image_path, "rb") as f:
            reader = Ext2Reader(f)
            if command == "info":
                reader.info()
            elif command == "ls":
                reader.ls(args[0] if args else "/")
            elif command == "cat":
                reader.cat(args[0])
            elif command == "extract":
                reader.extract(args[0], args[1])
            else:  # extract-tree
                reader.extract_tree(args[0], args[1])
    except (OSError, ValueError, struct.error) as exc:
        # Bad or unreadable image: give a message rather than a traceback
        print(f"Error: {exc}", file=sys.stderr)
        sys.exit(1)


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment