GitHub Actions workflow run logs retriever (debug failed jobs)
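For example, once GITHUB_TOKEN, OWNER and REPO are filled in at the top of the script, typical invocations might look like the following (the flag names match the argparse options defined in the file; the branch and run ID are placeholders):

python fetch_github_actions_logs.py
python fetch_github_actions_logs.py --branch main --per-page 10
python fetch_github_actions_logs.py --run-id <RUN_ID> --no-preview
python fetch_github_actions_logs.py --all-failed --keep-old-logs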
#!/usr/bin/env python3
"""
Script to fetch GitHub Actions workflow run logs.
The script fetches the latest failed jobs, then creates a log file with an error summary and the job run log.
Usage:
    python fetch_github_actions_logs.py [--branch BRANCH] [--run-id RUN_ID] [--all-failed] [--per-page N]
"""

# SET UP
# Get a GitHub fine-grained personal access token, grant it access to the relevant repos,
# and give it at least read access to Actions:
# https://github.com/settings/personal-access-tokens
GITHUB_TOKEN = "put_your_github_pat_here"
OWNER = "your-github-user-name"
REPO = "your-repo-name"
LOG_DIR = "github_action_logs"
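
# Illustrative example of a filled-in configuration (placeholder values only, not real
# credentials; fine-grained PATs start with the "github_pat_" prefix):
#   GITHUB_TOKEN = "github_pat_XXXXXXXXXXXXXXXX"
#   OWNER = "octocat"
#   REPO = "hello-world"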

import urllib.request
import urllib.parse
import urllib.error
import json
import sys
import argparse
import os
import shutil
import re
from pathlib import Path


def make_request(url, params=None, follow_redirects=True, is_redirect=False):
    """Make a GitHub API request.

    Args:
        url: The URL to request
        params: Query parameters
        follow_redirects: Whether to follow redirects
        is_redirect: True if this is a redirect follow (don't send the PAT to non-GitHub URLs)
    """
    if params:
        url += "?" + urllib.parse.urlencode(params)

    req = urllib.request.Request(url)
    # Only send the Authorization header to GitHub, not to redirect targets (Azure Blob Storage).
    # Azure redirect URLs already contain SAS tokens for authentication.
    is_github_url = 'api.github.com' in url or 'github.com' in url
    if not is_redirect or is_github_url:
        req.add_header("Authorization", f"Bearer {GITHUB_TOKEN}")
    req.add_header("Accept", "application/vnd.github.v3+json")
    req.add_header("User-Agent", "Python-GitHub-Actions-Log-Fetcher")

    try:
        if follow_redirects and not is_redirect:
            # Use an opener that doesn't follow redirects automatically, so the redirect
            # target can be re-requested without the Authorization header.
            class NoRedirectHandler(urllib.request.HTTPRedirectHandler):
                def redirect_request(self, req, fp, code, msg, headers, newurl):
                    # Store redirect info and return None to prevent the automatic redirect
                    self.redirect_code = code
                    self.redirect_url = newurl
                    return None

            handler = NoRedirectHandler()
            opener = urllib.request.build_opener(handler)
            try:
                response = opener.open(req)
                # If we get here, no redirect occurred
                use_response = response
            except urllib.error.HTTPError as e:
                # Check if this is a redirect (3xx status)
                if e.code in (301, 302, 303, 307, 308):
                    redirect_url = e.headers.get('Location') or getattr(handler, 'redirect_url', None)
                    if redirect_url:
                        # Follow the redirect without sending the PAT (the Azure URL has a SAS token)
                        return make_request(redirect_url, follow_redirects=False, is_redirect=True)
                # Not a redirect (or no Location header): re-raise so the outer handler can report it
                raise
        else:
            # For non-redirects, or when already following a redirect, use the normal opener
            use_response = urllib.request.urlopen(req)

        with use_response:
            content_type = use_response.headers.get_content_type()
            if content_type and 'application/json' in content_type:
                return json.loads(use_response.read().decode('utf-8'))
            else:
                # For logs, the response is plain text (may be gzipped)
                content = use_response.read()
                # Try to decompress if gzipped
                if use_response.headers.get('Content-Encoding') == 'gzip':
                    import gzip
                    content = gzip.decompress(content)
                # Decode with error handling for encoding issues; remove the BOM if present
                if content.startswith(b'\xef\xbb\xbf'):
                    content = content[3:]
                return content.decode('utf-8', errors='replace')
    except urllib.error.HTTPError as e:
        # Provide more detailed error information
        error_body = ""
        if e.fp:
            try:
                error_body = e.read().decode('utf-8', errors='replace')
            except Exception:
                error_body = "Could not read error response"
        # For 401/403 errors, provide helpful guidance
        if e.code in (401, 403):
            error_msg = f"{e.reason}"
            if error_body:
                try:
                    error_json = json.loads(error_body)
                    if 'message' in error_json:
                        error_msg += f": {error_json['message']}"
                except Exception:
                    # Safely truncate the error body, removing any problematic characters
                    safe_body = error_body[:200].encode('ascii', errors='replace').decode('ascii')
                    error_msg += f": {safe_body}"
        else:
            # Safely encode the error message for the Windows console
            safe_body = error_body[:200].encode('ascii', errors='replace').decode('ascii')
            error_msg = f"{e.reason}. Response: {safe_body}"
        raise urllib.error.HTTPError(e.url, e.code, error_msg, e.headers, e.fp)

def get_workflow_runs(branch=None, per_page=5):
    """Get workflow runs for the repository"""
    url = f"https://api.github.com/repos/{OWNER}/{REPO}/actions/runs"
    params = {"per_page": per_page}
    if branch:
        params["branch"] = branch
    return make_request(url, params)


def get_workflow_run(run_id):
    """Get details of a specific workflow run"""
    url = f"https://api.github.com/repos/{OWNER}/{REPO}/actions/runs/{run_id}"
    return make_request(url)


def get_workflow_jobs(run_id):
    """Get jobs for a workflow run"""
    url = f"https://api.github.com/repos/{OWNER}/{REPO}/actions/runs/{run_id}/jobs"
    return make_request(url)

def get_job_logs(job_id):
    """Get logs for a specific job.

    Note: This endpoint requires 'Actions: Read' permission on the repository.
    The logs endpoint may return a redirect to the actual log file location.
    For fine-grained PATs: repository permission "Actions" with at least "Read" access.
    For classic PATs: 'repo' scope or 'actions:read' scope.
    """
    url = f"https://api.github.com/repos/{OWNER}/{REPO}/actions/jobs/{job_id}/logs"
    try:
        # The logs endpoint returns the raw log content (may be gzipped)
        return make_request(url, follow_redirects=True)
    except urllib.error.HTTPError as e:
        # Log the actual error for debugging
        error_detail = str(e)
        if e.code == 401:
            print(f" [DEBUG] 401 Unauthorized - {error_detail}")
            return None  # Authentication issue - PAT may need 'Actions: Read' permission
        elif e.code == 403:
            print(f" [DEBUG] 403 Forbidden - {error_detail}")
            return None  # Forbidden - PAT may not have sufficient permissions
        elif e.code == 404:
            print(" [DEBUG] 404 Not Found - Logs may have expired")
            return None  # Logs may have expired (logs are retained for 90 days)
        else:
            print(f" [DEBUG] HTTP {e.code} - {error_detail}")
            raise

def cleanup_log_directory():
    """Clean up existing logs in the log directory"""
    log_path = Path(LOG_DIR)
    if log_path.exists():
        # Remove all files in the directory
        for file in log_path.iterdir():
            if file.is_file():
                try:
                    file.unlink()
                except Exception as e:
                    print(f" [WARN] Could not delete {file.name}: {e}")
    else:
        # Create the directory if it doesn't exist
        log_path.mkdir(parents=True, exist_ok=True)

def analyze_log_errors(logs):
    """Analyze a GitHub Actions log to extract error information.

    Args:
        logs: The log content as a string

    Returns:
        A summary string with error information
    """
    lines = logs.split('\n')
    summary_lines = []
    summary_lines.append("=" * 80)
    summary_lines.append("ERROR SUMMARY - Issues Found in This Workflow Run")
    summary_lines.append("=" * 80)
    summary_lines.append("")

    current_step = None
    step_stack = []  # Track nested step groups
    failed_steps = []
    error_messages = []
    in_error_section = False
    error_context = []
    exit_codes = []

    for i, line in enumerate(lines):
        # Detect step groups (can be nested)
        if '##[group]' in line:
            step_match = re.search(r'##\[group\](.+)', line)
            if step_match:
                step_name = step_match.group(1).strip()
                step_stack.append(step_name)
                current_step = step_name

        # Detect end of step groups
        if '##[endgroup]' in line:
            if step_stack:
                step_stack.pop()
            current_step = step_stack[-1] if step_stack else None

        # Detect step commands
        if '##[command]' in line:
            cmd_match = re.search(r'##\[command\](.+)', line)
            if cmd_match:
                cmd = cmd_match.group(1).strip()
                # Use command as step context if no group is active
                if not current_step:
                    current_step = f"Command: {cmd[:50]}"

        # Detect errors
        if '##[error]' in line:
            in_error_section = True
            error_text = line.replace('##[error]', '').strip()
            if error_text:
                error_messages.append(error_text)
                if current_step:
                    step_name = current_step
                    # Check if step already in failed_steps
                    step_found = False
                    for step in failed_steps:
                        if step['name'] == step_name:
                            step_found = True
                            step['errors'].append(error_text)
                            break
                    if not step_found:
                        failed_steps.append({'name': step_name, 'errors': [error_text]})
            error_context = [error_text]
        # Collect error context (lines after ##[error])
        elif in_error_section:
            if line.strip() and not (line.startswith('##') or line.startswith('2025-')):
                error_context.append(line.strip())
                if len(error_context) > 8:  # Limit context
                    in_error_section = False
            elif '##[group]' in line or '##[endgroup]' in line:
                in_error_section = False

        # Detect exit codes and failures
        if 'Process completed with exit code' in line:
            exit_code_match = re.search(r'exit code (\d+)', line)
            if exit_code_match:
                code = exit_code_match.group(1)
                exit_codes.append((code, current_step))
                if code != '0':
                    if current_step:
                        step_name = current_step
                        step_found = False
                        for step in failed_steps:
                            if step['name'] == step_name:
                                step_found = True
                                break
                        if not step_found:
                            failed_steps.append({'name': step_name, 'errors': [f"Process exited with code {code}"]})

        # Detect tracebacks
        if 'Traceback (most recent call last):' in line:
            if current_step:
                step_name = current_step
                step_found = False
                for step in failed_steps:
                    if step['name'] == step_name:
                        step_found = True
                        break
                if not step_found:
                    failed_steps.append({'name': step_name, 'errors': []})
            # Collect traceback (usually 10-20 lines)
            tb_lines = [line]
            for j in range(i + 1, min(i + 25, len(lines))):
                tb_lines.append(lines[j])
                # Stop at the exception message
                if re.match(r'^\w+Error:', lines[j]) or re.match(r'^\w+Exception:', lines[j]):
                    break
                # Stop if we hit a new step or command
                if '##[' in lines[j] and 'Traceback' not in lines[j]:
                    break
            if tb_lines:
                tb_text = '\n'.join(tb_lines[:20])
                error_messages.append(tb_text)
                # Add to step errors if we have a current step
                if current_step:
                    for step in failed_steps:
                        if step['name'] == current_step:
                            step['errors'].append(tb_text[:500])  # First 500 chars
                            break

        # Detect test failures
        if re.search(r'(FAILED|ERROR|FAIL)', line) and ('test' in line.lower() or 'pytest' in line.lower()):
            if current_step:
                step_name = current_step
                step_found = False
                for step in failed_steps:
                    if step['name'] == step_name:
                        step_found = True
                        if line.strip() not in step['errors']:
                            step['errors'].append(line.strip()[:200])
                        break
                if not step_found:
                    failed_steps.append({'name': step_name, 'errors': [line.strip()[:200]]})

    # Build summary
    if failed_steps:
        summary_lines.append("FAILED STEPS:")
        summary_lines.append("-" * 80)
        for step in failed_steps:
            summary_lines.append(f" • {step['name']}")
            if step['errors']:
                # Show first error for each step
                first_error = step['errors'][0]
                error_preview = first_error.replace('\n', ' ').strip()[:250]
                summary_lines.append(f"   Error: {error_preview}")
                if len(step['errors']) > 1:
                    summary_lines.append(f"   ({len(step['errors'])} more error(s) - see full log)")
        summary_lines.append("")

    if error_messages:
        summary_lines.append("KEY ERROR MESSAGES:")
        summary_lines.append("-" * 80)
        for i, error in enumerate(error_messages[:5], 1):  # Top 5 errors
            error_clean = error.replace('\n', ' ').strip()[:300]
            if len(error) > 300:
                error_clean += "..."
            summary_lines.append(f" {i}. {error_clean}")
        summary_lines.append("")

    # Look for common failure patterns
    failure_patterns = {
        r'ImportError|cannot import': 'Import error detected',
        r'AttributeError|has no attribute': 'Attribute error detected',
        r'TypeError|unsupported operand': 'Type error detected',
        r'ValueError|invalid value': 'Value error detected',
        r'ConnectionError|Connection refused|Connection timeout': 'Connection error detected',
        r'TimeoutError|timed out': 'Timeout error detected',
        r'AssertionError|assert.*failed': 'Test assertion failed',
        r'HTTPError|HTTP \d+': 'HTTP error detected',
        r'Database error|database.*error|psql.*error': 'Database error detected',
        r'Test.*failed|pytest.*FAILED': 'Test failure detected',
    }
    detected_patterns = []
    logs_lower = logs.lower()
    for pattern, description in failure_patterns.items():
        if re.search(pattern, logs_lower, re.IGNORECASE):
            detected_patterns.append(description)
    if detected_patterns:
        summary_lines.append("DETECTED ISSUE TYPES:")
        summary_lines.append("-" * 80)
        for pattern in detected_patterns:
            summary_lines.append(f" • {pattern}")
        summary_lines.append("")

    if exit_codes:
        non_zero = [ec for ec in exit_codes if ec[0] != '0']
        if non_zero:
            summary_lines.append("NON-ZERO EXIT CODES:")
            summary_lines.append("-" * 80)
            for code, step in non_zero[:5]:  # Show first 5
                step_name = step if step else "Unknown step"
                summary_lines.append(f" • Exit code {code} in: {step_name}")
            summary_lines.append("")

    if not failed_steps and not error_messages and not exit_codes:
        summary_lines.append("No explicit errors detected in log format.")
        summary_lines.append("Check the full log below for details.")
        summary_lines.append("")

    summary_lines.append("=" * 80)
    summary_lines.append("Full log follows below:")
    summary_lines.append("=" * 80)
    summary_lines.append("")
    return '\n'.join(summary_lines)

def fetch_and_save_logs(job_id, job_name, run_id=None, show_preview=True):
    """Fetch logs for a job and save them to a file.

    Args:
        job_id: The job ID
        job_name: The job name
        run_id: The workflow run ID (required for generating the correct log URL)
        show_preview: Whether to show a log preview
    """
    print(f"\n Fetching logs for failed job {job_id}...")
    try:
        logs = get_job_logs(job_id)
        if logs is None:
            print(" [WARN] Could not fetch logs. Possible reasons:")
            print(" - PAT needs 'Actions: Read' repository permission (fine-grained PAT)")
            print(" - PAT needs 'actions:read' scope (classic PAT)")
            print(" - Logs may have expired (retained for 90 days)")
            print(" - Repository access may be restricted")
            if run_id:
                print(f" [INFO] View logs directly at: https://github.com/{OWNER}/{REPO}/actions/runs/{run_id}/job/{job_id}")
            else:
                print(f" [INFO] View logs at: https://github.com/{OWNER}/{REPO}/actions")
            print(" [INFO] Verify PAT permissions at: https://github.com/settings/tokens")
            return

        # Ensure the log directory exists
        log_path = Path(LOG_DIR)
        log_path.mkdir(parents=True, exist_ok=True)

        # Analyze the logs to extract error information
        error_summary = analyze_log_errors(logs)

        # Save the logs to a file with the error summary at the top (replaces the file if it exists)
        safe_name = job_name.replace(' ', '_').replace('/', '_').replace('\\', '_')
        log_file = log_path / f"job_{job_id}_{safe_name}.txt"
        with open(log_file, 'w', encoding='utf-8', errors='replace') as f:
            # Write the error summary first
            f.write(error_summary)
            # Then write the full log
            f.write(logs)

        print(f" [OK] Logs saved to: {log_file}")
        print(" [INFO] Error summary added at the top of the log file")

        if show_preview:
            print(" Last 50 lines of logs:")
            print(" " + "-" * 76)
            # Handle encoding issues on the Windows console
            for line in logs.split('\n')[-50:]:
                if line.strip():
                    try:
                        print(f" {line}")
                    except UnicodeEncodeError:
                        # Fallback: encode with error handling for the Windows console
                        safe_line = line.encode('ascii', errors='replace').decode('ascii')
                        print(f" {safe_line}")
    except Exception as e:
        # Safely print the error message (avoid encoding issues)
        error_msg = str(e).encode('ascii', errors='replace').decode('ascii')
        print(f" [ERROR] Error fetching logs: {error_msg}")

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='Fetch GitHub Actions workflow run logs')
    parser.add_argument('--branch', default='release/1.33.0', help='Branch to check (default: release/1.33.0)')
    parser.add_argument('--run-id', help='Specific workflow run ID to fetch')
    parser.add_argument('--all-failed', action='store_true', help='Fetch all failed runs (not just latest push)')
    parser.add_argument('--per-page', type=int, default=3, help='Number of runs to fetch per page (default: 3, since each push has 3 workflows)')
    parser.add_argument('--no-preview', action='store_true', help='Skip showing log preview')
    parser.add_argument('--keep-old-logs', action='store_true', help='Keep existing logs (do not clean up)')
    args = parser.parse_args()

    branch = args.branch
    run_id = args.run_id
    # Default to 3 since each push triggers 3 workflows (e2e, backend, ci).
    # If --all-failed is specified, fetch more to get historical failures.
    per_page = args.per_page if not args.all_failed else max(args.per_page, 10)

    # Clean up old logs at the start of each run (unless --keep-old-logs is specified)
    if not args.keep_old_logs:
        print("Cleaning up old logs...")
        cleanup_log_directory()
        print("Ready to fetch new logs.\n")

    print(f"Fetching workflow runs for branch: {branch}")
    print("=" * 80)

    # Get recent workflow runs
    runs = get_workflow_runs(branch=branch, per_page=per_page)
    print(f"\nFound {runs['total_count']} total runs")
    print(f"Showing {len(runs['workflow_runs'])} runs:\n")

    failed_runs = []
    for run in runs['workflow_runs']:
        status_icon = "[FAIL]" if run['conclusion'] == 'failure' else "[PASS]" if run['conclusion'] == 'success' else "[RUN]"
        print(f"{status_icon} Run #{run['id']}: {run['name']} - {run['conclusion']} ({run['status']})")
        print(f" Branch: {run['head_branch']}, Commit: {run['head_sha'][:7]}")
        print(f" URL: {run['html_url']}")
        print()
        if run['conclusion'] == 'failure':
            failed_runs.append(run)

    # Process specific run or all failed runs
    runs_to_process = []
    if run_id:
        runs_to_process = [r for r in runs['workflow_runs'] if str(r['id']) == str(run_id)]
        if not runs_to_process:
            # Try to fetch the run even if not in the list
            try:
                run_details = get_workflow_run(run_id)
                runs_to_process = [run_details]
            except Exception as e:
                print(f"\n[ERROR] Could not fetch run {run_id}: {e}")
                sys.exit(1)
    elif args.all_failed:
        runs_to_process = failed_runs
    else:
        # Default: process all failed runs from the latest push.
        # Each push triggers 3 workflows (e2e, backend, ci), so get all failed runs from the latest commit.
        if failed_runs:
            # Group by commit SHA to get all workflows from the same push
            latest_commit = failed_runs[0]['head_sha']
            runs_to_process = [run for run in failed_runs if run['head_sha'] == latest_commit]
            if len(runs_to_process) > 0:
                print(f"\n[INFO] Found {len(runs_to_process)} failed workflow(s) from latest push (commit: {latest_commit[:7]})")

    # Process each run
    for run in runs_to_process:
        print(f"\n{'=' * 80}")
        print(f"Fetching details for run ID: {run['id']}")
        print("=" * 80)
        if 'name' not in run:
            run = get_workflow_run(run['id'])
        print(f"\nWorkflow: {run['name']}")
        print(f"Status: {run['status']}")
        print(f"Conclusion: {run['conclusion']}")
        print(f"Branch: {run['head_branch']}")
        print(f"Commit: {run['head_sha']}")
        print(f"URL: {run['html_url']}")

        # Get jobs for this run
        jobs = get_workflow_jobs(run['id'])
        print(f"\n{'=' * 80}")
        print(f"Jobs in this run ({len(jobs['jobs'])}):")
        print("=" * 80)
        for job in jobs['jobs']:
            status_icon = "[FAIL]" if job['conclusion'] == 'failure' else "[PASS]" if job['conclusion'] == 'success' else "[RUN]"
            print(f"\n{status_icon} Job #{job['id']}: {job['name']}")
            print(f" Status: {job['status']}, Conclusion: {job['conclusion']}")
            print(f" Started: {job['started_at']}")
            print(f" Completed: {job['completed_at']}")
            print(f" URL: {job['html_url']}")

            # Get logs for failed jobs
            if job['conclusion'] == 'failure':
                fetch_and_save_logs(job['id'], job['name'], run_id=run['id'], show_preview=not args.no_preview)
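
# The helper functions above can also be reused from another script. A minimal,
# illustrative sketch (get_workflow_runs, get_workflow_jobs and fetch_and_save_logs
# are the functions defined in this file; the branch name is a placeholder):
#
#   runs = get_workflow_runs(branch="main", per_page=5)
#   for run in runs["workflow_runs"]:
#       if run["conclusion"] == "failure":
#           for job in get_workflow_jobs(run["id"])["jobs"]:
#               if job["conclusion"] == "failure":
#                   fetch_and_save_logs(job["id"], job["name"], run_id=run["id"], show_preview=False)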