Created
February 3, 2026 02:33
-
-
Save GMNGeoffrey/77e8115bf11fa91fee953b374f0c8737 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| Claude Code Session Manager - A TUI tool to browse, manage, and resume Claude Code sessions. | |
| """ | |
| import curses | |
| import json | |
| import os | |
| from datetime import datetime | |
| from pathlib import Path | |
| class SessionManager: | |
| def __init__(self): | |
| self.claude_dir = Path.home() / ".claude" / "projects" | |
| self.projects: list[str] = [] | |
| self.sessions: list[tuple[str, dict]] = [] # (project, entry) pairs | |
| self.index_data: dict[str, dict] = {} # project -> parsed sessions-index.json | |
| self.errors: list[str] = [] # Accumulated errors to display to user | |
| def clear_errors(self): | |
| """Clear accumulated errors.""" | |
| self.errors = [] | |
| def add_error(self, msg: str): | |
| """Add an error message.""" | |
| self.errors.append(msg) | |
| def load_projects(self) -> list[str]: | |
| """Find all project directories.""" | |
| if not self.claude_dir.exists(): | |
| return [] | |
| self.projects = [ | |
| d.name | |
| for d in self.claude_dir.iterdir() | |
| if d.is_dir() and (d / "sessions-index.json").exists() | |
| ] | |
| return self.projects | |
| def get_current_project(self) -> str | None: | |
| """Determine current project from cwd.""" | |
| cwd = os.getcwd() | |
| project_name = cwd.replace("/", "-") | |
| if not project_name.startswith("-"): | |
| project_name = "-" + project_name | |
| while "--" in project_name: | |
| project_name = project_name.replace("--", "-") | |
| return project_name if project_name in self.projects else None | |
| def load_sessions(self, project: str | None = None) -> list[tuple[str, dict]]: | |
| """Load sessions from one or all projects.""" | |
| self.sessions = [] | |
| self.index_data = {} | |
| projects_to_load = [project] if project else self.projects | |
| for proj in projects_to_load: | |
| if proj is None: | |
| continue | |
| index_path = self.claude_dir / proj / "sessions-index.json" | |
| if not index_path.exists(): | |
| continue | |
| try: | |
| with open(index_path) as f: | |
| data = json.load(f) | |
| self.index_data[proj] = data | |
| for entry in data.get("entries", []): | |
| self.sessions.append((proj, entry)) | |
| except json.JSONDecodeError as e: | |
| self.add_error(f"Failed to parse {index_path.name}: {e}") | |
| except IOError as e: | |
| self.add_error(f"Failed to read {index_path.name}: {e}") | |
| # Sort by modified date, newest first | |
| self.sessions.sort(key=lambda s: s[1].get("modified", ""), reverse=True) | |
| return self.sessions | |
| def load_conversation(self, session: dict) -> list[dict]: | |
| """Parse JSONL file and extract messages.""" | |
| messages = [] | |
| path = Path(session.get("fullPath", "")) | |
| if not path.exists(): | |
| self.add_error(f"Conversation file missing: {path}") | |
| return messages | |
| try: | |
| with open(path) as f: | |
| for line in f: | |
| line = line.strip() | |
| if not line: | |
| continue | |
| try: | |
| entry = json.loads(line) | |
| entry_type = entry.get("type") | |
| if entry_type in ("user", "assistant"): | |
| content = self._extract_content( | |
| entry.get("message", {}).get("content", "") | |
| ) | |
| messages.append({ | |
| "role": entry_type, | |
| "content": content, | |
| "timestamp": entry.get("timestamp", ""), | |
| }) | |
| except json.JSONDecodeError as e: | |
| self.add_error(f"Malformed JSON line in {path.name}: {e}") | |
| except IOError as e: | |
| self.add_error(f"Failed to read conversation {path.name}: {e}") | |
| return messages | |
| def _extract_content(self, content) -> str: | |
| """Extract text content from message content field.""" | |
| if isinstance(content, str): | |
| return content | |
| if isinstance(content, list): | |
| texts = [] | |
| for item in content: | |
| if isinstance(item, dict) and item.get("type") == "text": | |
| texts.append(item.get("text", "")) | |
| return "\n".join(texts) | |
| return "" | |
| def rename_session(self, project: str, session: dict, new_title: str) -> bool: | |
| """Update session title in memory and write to index file.""" | |
| if project not in self.index_data: | |
| return False | |
| session["customTitle"] = new_title | |
| return self._save_index(project) | |
| def delete_session(self, project: str, session: dict) -> bool: | |
| """Remove session from memory, index file, and delete conversation file.""" | |
| if project not in self.index_data: | |
| return False | |
| session_id = session.get("sessionId") | |
| data = self.index_data[project] | |
| data["entries"] = [e for e in data.get("entries", []) if e.get("sessionId") != session_id] | |
| if not self._save_index(project): | |
| return False | |
| # Remove conversation file | |
| conv_path = Path(session.get("fullPath", "")) | |
| if conv_path.exists(): | |
| conv_path.unlink() | |
| # Remove from in-memory list | |
| self.sessions = [s for s in self.sessions if s[1].get("sessionId") != session_id] | |
| return True | |
| def _save_index(self, project: str) -> bool: | |
| """Write index data back to file.""" | |
| index_path = self.claude_dir / project / "sessions-index.json" | |
| try: | |
| with open(index_path, "w") as f: | |
| json.dump(self.index_data[project], f, indent=2) | |
| return True | |
| except IOError as e: | |
| self.add_error(f"Failed to save {index_path.name}: {e}") | |
| return False | |
| class TUI: | |
| def __init__(self, stdscr, manager: SessionManager): | |
| self.stdscr = stdscr | |
| self.manager = manager | |
| self.selected_idx = 0 | |
| self.scroll_offset = 0 | |
| self.show_all_projects = False | |
| self.current_project = manager.get_current_project() | |
| self.preview_count = 1 # Number of messages to show at each end | |
| self.show_all_messages = False # Toggle to show full conversation | |
| self.messages_cache: dict[str, list[dict]] = {} | |
| self.status_message = "" | |
| self.input_mode = False | |
| self.input_buffer = "" | |
| self.input_prompt = "" | |
| # Colors | |
| curses.start_color() | |
| curses.use_default_colors() | |
| curses.init_pair(1, curses.COLOR_CYAN, -1) # Header | |
| curses.init_pair(2, curses.COLOR_GREEN, -1) # Selected | |
| curses.init_pair(3, curses.COLOR_YELLOW, -1) # User messages | |
| curses.init_pair(4, curses.COLOR_BLUE, -1) # Assistant messages | |
| curses.init_pair(5, curses.COLOR_RED, -1) # Warning/delete | |
| self.reload_sessions() | |
| def reload_sessions(self): | |
| """Reload sessions based on current view mode.""" | |
| self.manager.clear_errors() | |
| if self.show_all_projects: | |
| self.manager.load_sessions() | |
| else: | |
| self.manager.load_sessions(self.current_project) | |
| self.messages_cache.clear() | |
| if self.selected_idx >= len(self.manager.sessions): | |
| self.selected_idx = max(0, len(self.manager.sessions) - 1) | |
| # Surface any errors from loading | |
| if self.manager.errors: | |
| self.status_message = "; ".join(self.manager.errors) | |
| self.manager.clear_errors() | |
| def get_current_session(self) -> tuple[str, dict] | None: | |
| """Get currently selected (project, session) tuple.""" | |
| if 0 <= self.selected_idx < len(self.manager.sessions): | |
| return self.manager.sessions[self.selected_idx] | |
| return None | |
| def get_messages(self, session: dict) -> list[dict]: | |
| """Get messages for session, using cache.""" | |
| session_id = session.get("sessionId", "") | |
| if session_id not in self.messages_cache: | |
| self.messages_cache[session_id] = self.manager.load_conversation(session) | |
| # Surface any errors from loading conversation | |
| if self.manager.errors: | |
| self.status_message = "; ".join(self.manager.errors) | |
| self.manager.clear_errors() | |
| return self.messages_cache[session_id] | |
| def get_session_title(self, session: dict) -> str: | |
| """Get display title for a session.""" | |
| return session.get("customTitle") or session.get("summary") or session.get("firstPrompt") or "No title" | |
| def _get_error_or_default(self, default: str) -> str: | |
| """Get accumulated errors or fall back to default message.""" | |
| if self.manager.errors: | |
| msg = "; ".join(self.manager.errors) | |
| self.manager.clear_errors() | |
| return msg | |
| return default | |
| def format_date(self, iso_date: str) -> str: | |
| """Format ISO date for display.""" | |
| try: | |
| dt = datetime.fromisoformat(iso_date.replace("Z", "+00:00")) | |
| return dt.strftime("%Y-%m-%d %H:%M") | |
| except (ValueError, AttributeError): | |
| return iso_date[:16] if iso_date else "Unknown" | |
| def truncate(self, text: str, max_len: int) -> str: | |
| """Truncate text with ellipsis.""" | |
| text = text.replace("\n", " ").strip() | |
| if len(text) <= max_len: | |
| return text | |
| return text[: max_len - 3] + "..." | |
| def draw(self): | |
| """Main draw function.""" | |
| self.stdscr.clear() | |
| height, width = self.stdscr.getmaxyx() | |
| # Calculate panel heights | |
| header_height = 1 | |
| footer_height = 2 | |
| list_height = min(10, max(5, (height - header_height - footer_height) // 3)) | |
| details_height = 4 | |
| preview_height = height - header_height - footer_height - list_height - details_height - 3 | |
| self.draw_header(0, width) | |
| self.draw_session_list(2, list_height, width) | |
| self.draw_details(2 + list_height + 1, details_height, width) | |
| self.draw_preview(2 + list_height + details_height + 2, preview_height, width) | |
| self.draw_footer(height - 2, width) | |
| if self.input_mode: | |
| self.draw_input(height - 3, width) | |
| self.stdscr.refresh() | |
| def draw_header(self, y: int, width: int): | |
| """Draw header bar.""" | |
| project_display = "All Projects" if self.show_all_projects else (self.current_project or "No Project") | |
| session_count = len(self.manager.sessions) | |
| title = f" Claude Sessions - {project_display} ({session_count} sessions)" | |
| # Right-aligned controls | |
| controls = "[a] Toggle All [q] Quit " | |
| title = title[: width - len(controls) - 1] | |
| self.stdscr.attron(curses.color_pair(1) | curses.A_BOLD) | |
| self.stdscr.addstr(y, 0, title.ljust(width - len(controls))) | |
| self.stdscr.addstr(y, width - len(controls), controls) | |
| self.stdscr.attroff(curses.color_pair(1) | curses.A_BOLD) | |
| # Separator | |
| self.stdscr.addstr(y + 1, 0, "─" * (width - 1)) | |
| def draw_session_list(self, y: int, height: int, width: int): | |
| """Draw scrollable session list.""" | |
| sessions = self.manager.sessions | |
| # Adjust scroll offset | |
| if self.selected_idx < self.scroll_offset: | |
| self.scroll_offset = self.selected_idx | |
| elif self.selected_idx >= self.scroll_offset + height: | |
| self.scroll_offset = self.selected_idx - height + 1 | |
| for i in range(height): | |
| idx = self.scroll_offset + i | |
| if idx >= len(sessions): | |
| break | |
| _project, session = sessions[idx] | |
| is_selected = idx == self.selected_idx | |
| # Format line | |
| date_str = self.format_date(session.get("modified", "")) | |
| msg_count = f"[{session.get('messageCount', 0)} msgs]" | |
| title = self.get_session_title(session) | |
| # Calculate available space for title text | |
| prefix = "> " if is_selected else " " | |
| fixed_len = len(prefix) + len(date_str) + 2 + len(msg_count) + 2 | |
| title_len = width - fixed_len - 2 | |
| title = self.truncate(title, max(10, title_len)) | |
| line = f"{prefix}{date_str} {title.ljust(title_len)} {msg_count}" | |
| if is_selected: | |
| self.stdscr.attron(curses.color_pair(2) | curses.A_BOLD) | |
| self.stdscr.addstr(y + i, 0, line[: width - 1]) | |
| if is_selected: | |
| self.stdscr.attroff(curses.color_pair(2) | curses.A_BOLD) | |
| # Separator | |
| self.stdscr.addstr(y + height, 0, "─" * (width - 1)) | |
| def draw_details(self, y: int, height: int, width: int): | |
| """Draw session details panel.""" | |
| current = self.get_current_session() | |
| if not current: | |
| self.stdscr.addstr(y, 0, "No sessions found") | |
| return | |
| _project, session = current | |
| self.stdscr.attron(curses.A_BOLD) | |
| self.stdscr.addstr(y, 0, "Session Details") | |
| self.stdscr.attroff(curses.A_BOLD) | |
| lines = [ | |
| f"ID: {session.get('sessionId', '')}", | |
| f"Created: {self.format_date(session.get('created', ''))} Modified: {self.format_date(session.get('modified', ''))}", | |
| f"Messages: {session.get('messageCount', 0)} Branch: {session.get('gitBranch') or '(none)'} Project: {session.get('projectPath', '')}", | |
| ] | |
| for i, line in enumerate(lines[:height]): | |
| self.stdscr.addstr(y + 1 + i, 0, self.truncate(line, width - 1)) | |
| # Separator | |
| self.stdscr.addstr(y + height, 0, "─" * (width - 1)) | |
| def draw_preview(self, y: int, height: int, width: int): | |
| """Draw conversation preview panel.""" | |
| current = self.get_current_session() | |
| if not current: | |
| return | |
| _project, session = current | |
| messages = self.get_messages(session) | |
| if not messages: | |
| self.stdscr.addstr(y, 0, "No messages found") | |
| return | |
| # Determine which messages to show | |
| if self.show_all_messages: | |
| to_show = [("msg", msg) for msg in messages] | |
| header_info = "Full" | |
| else: | |
| n = self.preview_count | |
| total = len(messages) | |
| to_show = [] | |
| # First n messages | |
| for msg in messages[:n]: | |
| to_show.append(("first", msg)) | |
| # Ellipsis if there are hidden messages in the middle | |
| if total > n * 2: | |
| to_show.append(("ellipsis", None)) | |
| # Last n messages (avoiding duplicates if lists overlap) | |
| for msg in messages[-n:]: | |
| if msg not in [m for _, m in to_show]: | |
| to_show.append(("last", msg)) | |
| header_info = f"{n}+{n}" if total > n else "All" | |
| # Header with expand/contract hints | |
| header = f"Preview ({header_info}) [+/-] Expand/Contract [f] Full" | |
| self.stdscr.attron(curses.A_BOLD) | |
| self.stdscr.addstr(y, 0, header[: width - 1]) | |
| self.stdscr.attroff(curses.A_BOLD) | |
| line_idx = 1 | |
| for label, msg in to_show: | |
| if line_idx >= height: | |
| break | |
| if label == "ellipsis": | |
| self.stdscr.addstr(y + line_idx, 0, " ... (more messages) ...") | |
| line_idx += 1 | |
| continue | |
| if msg is None: | |
| continue | |
| # Role indicator | |
| if msg.get("role") == "user": | |
| prefix = "> " | |
| color = curses.color_pair(3) | |
| else: | |
| prefix = "< " | |
| color = curses.color_pair(4) | |
| # Show content (may span multiple lines) | |
| content = msg.get("content", "")[:500] # Limit content length | |
| content_lines = content.split("\n") | |
| for content_line in content_lines: | |
| if line_idx >= height: | |
| break | |
| display_line = prefix + content_line if content_lines.index(content_line) == 0 else " " + content_line | |
| self.stdscr.attron(color) | |
| self.stdscr.addstr(y + line_idx, 0, self.truncate(display_line, width - 1)) | |
| self.stdscr.attroff(color) | |
| line_idx += 1 | |
| prefix = " " # Indent continuation lines | |
| line_idx += 1 # Blank line between messages | |
| def draw_footer(self, y: int, width: int): | |
| """Draw footer with keybindings.""" | |
| self.stdscr.addstr(y, 0, "─" * (width - 1)) | |
| if self.status_message: | |
| self.stdscr.attron(curses.color_pair(5)) | |
| self.stdscr.addstr(y + 1, 0, self.status_message[: width - 1]) | |
| self.stdscr.attroff(curses.color_pair(5)) | |
| else: | |
| keys = "[Enter] Continue [r] Rename [d] Delete [↑↓/jk] Navigate [+/-] Expand [f] Full [q] Quit" | |
| self.stdscr.addstr(y + 1, 0, keys[: width - 1]) | |
| def draw_input(self, y: int, width: int): | |
| """Draw input prompt.""" | |
| prompt = f"{self.input_prompt}: {self.input_buffer}" | |
| self.stdscr.attron(curses.A_BOLD) | |
| self.stdscr.addstr(y, 0, prompt[: width - 1]) | |
| self.stdscr.attroff(curses.A_BOLD) | |
| curses.curs_set(1) | |
| def handle_input(self) -> bool: | |
| """Handle keyboard input. Returns False to quit.""" | |
| try: | |
| key = self.stdscr.getch() | |
| except KeyboardInterrupt: | |
| return False | |
| self.status_message = "" | |
| if self.input_mode: | |
| return self.handle_input_mode(key) | |
| if key == ord("q") or key == 27: # q or Escape | |
| return False | |
| elif key == curses.KEY_UP or key == ord("k"): | |
| self.selected_idx = max(0, self.selected_idx - 1) | |
| elif key == curses.KEY_DOWN or key == ord("j"): | |
| self.selected_idx = min(len(self.manager.sessions) - 1, self.selected_idx + 1) | |
| elif key == ord("a"): | |
| self.show_all_projects = not self.show_all_projects | |
| self.reload_sessions() | |
| elif key == ord("+") or key == curses.KEY_RIGHT: | |
| self.show_all_messages = False | |
| self.preview_count += 1 | |
| elif key == ord("-") or key == curses.KEY_LEFT: | |
| self.show_all_messages = False | |
| self.preview_count = max(1, self.preview_count - 1) | |
| elif key == ord("f"): | |
| self.show_all_messages = not self.show_all_messages | |
| elif key == ord("r"): | |
| current = self.get_current_session() | |
| if current: | |
| _project, session = current | |
| self.input_mode = True | |
| self.input_prompt = "New title" | |
| self.input_buffer = self.get_session_title(session) | |
| curses.curs_set(1) | |
| elif key == ord("d"): | |
| current = self.get_current_session() | |
| if current: | |
| self.input_mode = True | |
| self.input_prompt = "Delete? (y/n)" | |
| self.input_buffer = "" | |
| elif key == 10 or key == curses.KEY_ENTER: # Enter | |
| current = self.get_current_session() | |
| if current: | |
| return self.continue_session(current) | |
| return True | |
| def handle_input_mode(self, key: int) -> bool: | |
| """Handle input in text entry mode.""" | |
| if key == 27: # Escape | |
| self.input_mode = False | |
| self.input_buffer = "" | |
| curses.curs_set(0) | |
| elif key == 10 or key == curses.KEY_ENTER: # Enter | |
| self.input_mode = False | |
| curses.curs_set(0) | |
| if self.input_prompt == "New title": | |
| current = self.get_current_session() | |
| if current and self.input_buffer: | |
| project, session = current | |
| if self.manager.rename_session(project, session, self.input_buffer): | |
| self.manager.clear_errors() | |
| self.status_message = "Session renamed" | |
| else: | |
| self.status_message = self._get_error_or_default("Failed to rename session") | |
| elif self.input_prompt == "Delete? (y/n)": | |
| if self.input_buffer.lower() == "y": | |
| current = self.get_current_session() | |
| if current: | |
| project, session = current | |
| if self.manager.delete_session(project, session): | |
| self.manager.clear_errors() | |
| self.status_message = "Session deleted" | |
| else: | |
| self.status_message = self._get_error_or_default("Failed to delete session") | |
| self.input_buffer = "" | |
| elif key == curses.KEY_BACKSPACE or key == 127: | |
| self.input_buffer = self.input_buffer[:-1] | |
| elif 32 <= key <= 126: # Printable characters | |
| self.input_buffer += chr(key) | |
| return True | |
| def continue_session(self, current: tuple[str, dict]) -> bool: | |
| """Launch claude --resume for the session.""" | |
| _project, session = current | |
| curses.endwin() | |
| os.execlp("claude", "claude", "--resume", session.get("sessionId", "")) | |
| return False # Never reached | |
| def run(self): | |
| """Main event loop.""" | |
| curses.curs_set(0) # Hide cursor | |
| self.stdscr.nodelay(False) | |
| self.stdscr.keypad(True) | |
| while True: | |
| self.draw() | |
| if not self.handle_input(): | |
| break | |
def main(stdscr):
    """Entry point run inside ``curses.wrapper``.

    Starts the TUI when at least one project with a session index exists;
    otherwise shows a short notice and waits for a key press.
    """
    session_manager = SessionManager()
    session_manager.load_projects()
    if session_manager.projects:
        TUI(stdscr, session_manager).run()
        return

    notice = (
        "No Claude sessions found in ~/.claude/projects/",
        "Press any key to exit...",
    )
    for row, text in enumerate(notice):
        stdscr.addstr(row, 0, text)
    stdscr.getch()


if __name__ == "__main__":
    # Ctrl-C at any point exits quietly instead of printing a traceback.
    try:
        curses.wrapper(main)
    except KeyboardInterrupt:
        pass
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment