GitHub Actions workflow run logs retriever (debug failed jobs)
#!/usr/bin/env python3
"""
Script to fetch GitHub Actions workflow run logs
The script fetches the latest failed jobs, then creates a log file with error summary and job run log
Usage:
python fetch_github_actions_logs.py [--branch BRANCH] [--run-id RUN_ID] [--all-failed] [--per-page N]
"""
# SET UP
# Get a GitHub fine-grained personal access token and grant it access to the relevant repos with read permission for Actions
# https://github.com/settings/personal-access-tokens
GITHUB_TOKEN = "put_your_github_pat_here"
OWNER = "your-github-user-name"
REPO = "your-repo-name"
LOG_DIR = "github_action_logs"
import urllib.request
import urllib.parse
import json
import sys
import argparse
import os
import shutil
import re
from pathlib import Path
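
# Optional override (a minimal sketch, not part of the original gist): read the PAT and repo
# details from environment variables when they are set, so the token doesn't have to live in
# this file. The names GH_PAT, GH_OWNER and GH_REPO are illustrative placeholders.
GITHUB_TOKEN = os.environ.get("GH_PAT", GITHUB_TOKEN)
OWNER = os.environ.get("GH_OWNER", OWNER)
REPO = os.environ.get("GH_REPO", REPO)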


def make_request(url, params=None, follow_redirects=True, is_redirect=False):
    """Make a GitHub API request

    Args:
        url: The URL to request
        params: Query parameters
        follow_redirects: Whether to follow redirects
        is_redirect: True if this is a redirect follow (don't send the PAT to non-GitHub URLs)
    """
    if params:
        url += "?" + urllib.parse.urlencode(params)
    req = urllib.request.Request(url)

    # Only send the Authorization header to the GitHub API, not to redirects (Azure Blob Storage).
    # Azure redirect URLs already contain SAS tokens for authentication.
    is_github_url = 'api.github.com' in url or 'github.com' in url
    if not is_redirect or is_github_url:
        req.add_header("Authorization", f"Bearer {GITHUB_TOKEN}")
    req.add_header("Accept", "application/vnd.github.v3+json")
    req.add_header("User-Agent", "Python-GitHub-Actions-Log-Fetcher")

    try:
        if follow_redirects and not is_redirect:
            # Create an opener that doesn't follow redirects automatically, so the redirect
            # target can be re-requested without the Authorization header
            class NoRedirectHandler(urllib.request.HTTPRedirectHandler):
                def redirect_request(self, req, fp, code, msg, headers, newurl):
                    # Store redirect info and return None to prevent the automatic redirect
                    self.redirect_code = code
                    self.redirect_url = newurl
                    return None

            handler = NoRedirectHandler()
            opener = urllib.request.build_opener(handler)
            try:
                # If this succeeds, no redirect occurred
                use_response = opener.open(req)
            except urllib.error.HTTPError as e:
                # Check if this is a redirect (3xx status)
                if e.code in (301, 302, 303, 307, 308):
                    redirect_url = e.headers.get('Location') or handler.redirect_url
                    if redirect_url:
                        # Follow the redirect without sending the PAT (the Azure URL has a SAS token)
                        return make_request(redirect_url, follow_redirects=False, is_redirect=True)
                # Not a followable redirect: re-raise so the outer handler can report it
                raise
        else:
            # For non-redirects, or when already following a redirect, use the normal opener
            use_response = urllib.request.urlopen(req)

        with use_response:
            content_type = use_response.headers.get_content_type()
            if content_type and 'application/json' in content_type:
                return json.loads(use_response.read().decode('utf-8'))
            else:
                # For logs, the response is plain text (may be gzipped)
                content = use_response.read()
                # Try to decompress if gzipped
                if use_response.headers.get('Content-Encoding') == 'gzip':
                    import gzip
                    content = gzip.decompress(content)
                # Remove the BOM if present
                if content.startswith(b'\xef\xbb\xbf'):
                    content = content[3:]
                # Decode with error handling for encoding issues
                return content.decode('utf-8', errors='replace')
    except urllib.error.HTTPError as e:
        # Provide more detailed error information
        error_body = ""
        if e.fp:
            try:
                error_body = e.read().decode('utf-8', errors='replace')
            except Exception:
                error_body = "Could not read error response"
        if e.code in (401, 403):
            # For 401/403 errors, provide helpful guidance
            error_msg = f"{e.reason}"
            if error_body:
                try:
                    error_json = json.loads(error_body)
                    if 'message' in error_json:
                        error_msg += f": {error_json['message']}"
                except Exception:
                    # Safely truncate the error body, removing any problematic characters
                    safe_body = error_body[:200].encode('ascii', errors='replace').decode('ascii')
                    error_msg += f": {safe_body}"
        else:
            # Safely encode the error message for the Windows console
            safe_body = error_body[:200].encode('ascii', errors='replace').decode('ascii')
            error_msg = f"{e.reason}. Response: {safe_body}"
        raise urllib.error.HTTPError(e.url, e.code, error_msg, e.headers, e.fp)


def get_workflow_runs(branch=None, per_page=5):
    """Get workflow runs for the repository"""
    url = f"https://api.github.com/repos/{OWNER}/{REPO}/actions/runs"
    params = {"per_page": per_page}
    if branch:
        params["branch"] = branch
    return make_request(url, params)


def get_workflow_run(run_id):
    """Get details of a specific workflow run"""
    url = f"https://api.github.com/repos/{OWNER}/{REPO}/actions/runs/{run_id}"
    return make_request(url)


def get_workflow_jobs(run_id):
    """Get jobs for a workflow run"""
    url = f"https://api.github.com/repos/{OWNER}/{REPO}/actions/runs/{run_id}/jobs"
    return make_request(url)


def get_job_logs(job_id):
    """Get logs for a specific job

    Note: This endpoint requires 'Actions: Read' permission on the repository.
    The logs endpoint may return a redirect to the actual log file location.
    For fine-grained PATs: repository permission "Actions" with at least "Read" access
    For classic PATs: 'repo' scope or 'actions:read' scope
    """
    url = f"https://api.github.com/repos/{OWNER}/{REPO}/actions/jobs/{job_id}/logs"
    try:
        # The logs endpoint returns the raw log content (may be gzipped)
        return make_request(url, follow_redirects=True)
    except urllib.error.HTTPError as e:
        # Log the actual error for debugging
        error_detail = str(e)
        if e.code == 401:
            print(f" [DEBUG] 401 Unauthorized - {error_detail}")
            return None  # Authentication issue - PAT may need 'Actions: Read' permission
        elif e.code == 403:
            print(f" [DEBUG] 403 Forbidden - {error_detail}")
            return None  # Forbidden - PAT may not have sufficient permissions
        elif e.code == 404:
            print(f" [DEBUG] 404 Not Found - Logs may have expired")
            return None  # Logs may have expired (logs are retained for 90 days)
        else:
            print(f" [DEBUG] HTTP {e.code} - {error_detail}")
            raise


def cleanup_log_directory():
    """Clean up existing logs in the log directory"""
    log_path = Path(LOG_DIR)
    if log_path.exists():
        # Remove all files in the directory
        for file in log_path.iterdir():
            if file.is_file():
                try:
                    file.unlink()
                except Exception as e:
                    print(f" [WARN] Could not delete {file.name}: {e}")
    else:
        # Create directory if it doesn't exist
        log_path.mkdir(parents=True, exist_ok=True)


def analyze_log_errors(logs):
    """Analyze a GitHub Actions log to extract error information

    Args:
        logs: The log content as a string

    Returns:
        A summary string with error information
    """
    lines = logs.split('\n')
    summary_lines = []
    summary_lines.append("=" * 80)
    summary_lines.append("ERROR SUMMARY - Issues Found in This Workflow Run")
    summary_lines.append("=" * 80)
    summary_lines.append("")

    current_step = None
    step_stack = []  # Track nested step groups
    failed_steps = []
    error_messages = []
    in_error_section = False
    error_context = []
    exit_codes = []

    for i, line in enumerate(lines):
        # Detect step groups (can be nested)
        if '##[group]' in line:
            step_match = re.search(r'##\[group\](.+)', line)
            if step_match:
                step_name = step_match.group(1).strip()
                step_stack.append(step_name)
                current_step = step_name

        # Detect end of step groups
        if '##[endgroup]' in line:
            if step_stack:
                step_stack.pop()
            current_step = step_stack[-1] if step_stack else None

        # Detect step commands
        if '##[command]' in line:
            cmd_match = re.search(r'##\[command\](.+)', line)
            if cmd_match:
                cmd = cmd_match.group(1).strip()
                # Use the command as step context if no group is active
                if not current_step:
                    current_step = f"Command: {cmd[:50]}"

        # Detect errors
        if '##[error]' in line:
            in_error_section = True
            error_text = line.replace('##[error]', '').strip()
            if error_text:
                error_messages.append(error_text)
                if current_step:
                    step_name = current_step
                    # Check if the step is already in failed_steps
                    step_found = False
                    for step in failed_steps:
                        if step['name'] == step_name:
                            step_found = True
                            step['errors'].append(error_text)
                            break
                    if not step_found:
                        failed_steps.append({'name': step_name, 'errors': [error_text]})
                error_context = [error_text]
        # Collect error context (lines after ##[error])
        elif in_error_section:
            # Skip workflow commands and timestamp-prefixed lines when collecting context
            if line.strip() and not (line.startswith('##') or re.match(r'\d{4}-\d{2}-\d{2}T', line)):
                error_context.append(line.strip())
                if len(error_context) > 8:  # Limit context
                    in_error_section = False
            elif '##[group]' in line or '##[endgroup]' in line:
                in_error_section = False

        # Detect exit codes and failures
        if 'Process completed with exit code' in line:
            exit_code_match = re.search(r'exit code (\d+)', line)
            if exit_code_match:
                code = exit_code_match.group(1)
                exit_codes.append((code, current_step))
                if code != '0':
                    if current_step:
                        step_name = current_step
                        step_found = False
                        for step in failed_steps:
                            if step['name'] == step_name:
                                step_found = True
                                break
                        if not step_found:
                            failed_steps.append({'name': step_name, 'errors': [f"Process exited with code {code}"]})

        # Detect tracebacks
        if 'Traceback (most recent call last):' in line:
            if current_step:
                step_name = current_step
                step_found = False
                for step in failed_steps:
                    if step['name'] == step_name:
                        step_found = True
                        break
                if not step_found:
                    failed_steps.append({'name': step_name, 'errors': []})
            # Collect the traceback (usually 10-20 lines)
            tb_lines = [line]
            for j in range(i + 1, min(i + 25, len(lines))):
                tb_lines.append(lines[j])
                # Stop at the exception message
                if re.match(r'^\w+Error:', lines[j]) or re.match(r'^\w+Exception:', lines[j]):
                    break
                # Stop if we hit a new step or command
                if '##[' in lines[j] and 'Traceback' not in lines[j]:
                    break
            if tb_lines:
                tb_text = '\n'.join(tb_lines[:20])
                error_messages.append(tb_text)
                # Add to the step's errors if we have a current step
                if current_step:
                    for step in failed_steps:
                        if step['name'] == current_step:
                            step['errors'].append(tb_text[:500])  # First 500 chars
                            break

        # Detect test failures
        if re.search(r'(FAILED|ERROR|FAIL)', line) and ('test' in line.lower() or 'pytest' in line.lower()):
            if current_step:
                step_name = current_step
                step_found = False
                for step in failed_steps:
                    if step['name'] == step_name:
                        step_found = True
                        if line.strip() not in step['errors']:
                            step['errors'].append(line.strip()[:200])
                        break
                if not step_found:
                    failed_steps.append({'name': step_name, 'errors': [line.strip()[:200]]})

    # Build summary
    if failed_steps:
        summary_lines.append("FAILED STEPS:")
        summary_lines.append("-" * 80)
        for step in failed_steps:
            summary_lines.append(f" • {step['name']}")
            if step['errors']:
                # Show the first error for each step
                first_error = step['errors'][0]
                error_preview = first_error.replace('\n', ' ').strip()[:250]
                summary_lines.append(f" Error: {error_preview}")
                if len(step['errors']) > 1:
                    summary_lines.append(f" ({len(step['errors'])} more error(s) - see full log)")
        summary_lines.append("")

    if error_messages:
        summary_lines.append("KEY ERROR MESSAGES:")
        summary_lines.append("-" * 80)
        for i, error in enumerate(error_messages[:5], 1):  # Top 5 errors
            error_clean = error.replace('\n', ' ').strip()[:300]
            if len(error) > 300:
                error_clean += "..."
            summary_lines.append(f" {i}. {error_clean}")
        summary_lines.append("")

    # Look for common failure patterns
    failure_patterns = {
        r'ImportError|cannot import': 'Import error detected',
        r'AttributeError|has no attribute': 'Attribute error detected',
        r'TypeError|unsupported operand': 'Type error detected',
        r'ValueError|invalid value': 'Value error detected',
        r'ConnectionError|Connection refused|Connection timeout': 'Connection error detected',
        r'TimeoutError|timed out': 'Timeout error detected',
        r'AssertionError|assert.*failed': 'Test assertion failed',
        r'HTTPError|HTTP \d+': 'HTTP error detected',
        r'Database error|database.*error|psql.*error': 'Database error detected',
        r'Test.*failed|pytest.*FAILED': 'Test failure detected',
    }
    detected_patterns = []
    logs_lower = logs.lower()
    for pattern, description in failure_patterns.items():
        if re.search(pattern, logs_lower, re.IGNORECASE):
            detected_patterns.append(description)
    if detected_patterns:
        summary_lines.append("DETECTED ISSUE TYPES:")
        summary_lines.append("-" * 80)
        for pattern in detected_patterns:
            summary_lines.append(f" • {pattern}")
        summary_lines.append("")

    if exit_codes:
        non_zero = [ec for ec in exit_codes if ec[0] != '0']
        if non_zero:
            summary_lines.append("NON-ZERO EXIT CODES:")
            summary_lines.append("-" * 80)
            for code, step in non_zero[:5]:  # Show first 5
                step_name = step if step else "Unknown step"
                summary_lines.append(f" • Exit code {code} in: {step_name}")
            summary_lines.append("")

    if not failed_steps and not error_messages and not exit_codes:
        summary_lines.append("No explicit errors detected in log format.")
        summary_lines.append("Check the full log below for details.")
        summary_lines.append("")

    summary_lines.append("=" * 80)
    summary_lines.append("Full log follows below:")
    summary_lines.append("=" * 80)
    summary_lines.append("")
    return '\n'.join(summary_lines)


def fetch_and_save_logs(job_id, job_name, run_id=None, show_preview=True):
    """Fetch logs for a job and save them to a file

    Args:
        job_id: The job ID
        job_name: The job name
        run_id: The workflow run ID (used to build the direct log URL)
        show_preview: Whether to show a log preview
    """
    print(f"\n Fetching logs for failed job {job_id}...")
    try:
        logs = get_job_logs(job_id)
        if logs is None:
            print(f" [WARN] Could not fetch logs. Possible reasons:")
            print(f" - PAT needs 'Actions: Read' repository permission (fine-grained PAT)")
            print(f" - PAT needs 'actions:read' scope (classic PAT)")
            print(f" - Logs may have expired (retained for 90 days)")
            print(f" - Repository access may be restricted")
            if run_id:
                print(f" [INFO] View logs directly at: https://github.com/{OWNER}/{REPO}/actions/runs/{run_id}/job/{job_id}")
            else:
                print(f" [INFO] View logs at: https://github.com/{OWNER}/{REPO}/actions")
            print(f" [INFO] Verify PAT permissions at: https://github.com/settings/tokens")
            return

        # Ensure the log directory exists
        log_path = Path(LOG_DIR)
        log_path.mkdir(parents=True, exist_ok=True)

        # Analyze the logs to extract error information
        error_summary = analyze_log_errors(logs)

        # Save the logs to a file with the error summary at the top (replaces the file if it exists)
        safe_name = job_name.replace(' ', '_').replace('/', '_').replace('\\', '_')
        log_file = log_path / f"job_{job_id}_{safe_name}.txt"
        with open(log_file, 'w', encoding='utf-8', errors='replace') as f:
            # Write the error summary first
            f.write(error_summary)
            # Then write the full log
            f.write(logs)
        print(f" [OK] Logs saved to: {log_file}")
        print(f" [INFO] Error summary added at the top of the log file")

        if show_preview:
            print(f" Last 50 lines of logs:")
            print(" " + "-" * 76)
            # Handle encoding issues on the Windows console
            for line in logs.split('\n')[-50:]:
                if line.strip():
                    try:
                        print(f" {line}")
                    except UnicodeEncodeError:
                        # Fallback: encode with error handling for the Windows console
                        safe_line = line.encode('ascii', errors='replace').decode('ascii')
                        print(f" {safe_line}")
    except Exception as e:
        # Safely print the error message (avoid encoding issues)
        error_msg = str(e).encode('ascii', errors='replace').decode('ascii')
        print(f" [ERROR] Error fetching logs: {error_msg}")


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='Fetch GitHub Actions workflow run logs')
    parser.add_argument('--branch', default='release/1.33.0', help='Branch to check (default: release/1.33.0)')
    parser.add_argument('--run-id', help='Specific workflow run ID to fetch')
    parser.add_argument('--all-failed', action='store_true', help='Fetch all failed runs (not just latest push)')
    parser.add_argument('--per-page', type=int, default=3, help='Number of runs to fetch per page (default: 3, since each push has 3 workflows)')
    parser.add_argument('--no-preview', action='store_true', help='Skip showing log preview')
    parser.add_argument('--keep-old-logs', action='store_true', help='Keep existing logs (do not clean up)')
    args = parser.parse_args()

    branch = args.branch
    run_id = args.run_id
    # Default to 3 since each push triggers 3 workflows (e2e, backend, ci)
    # If all-failed is specified, fetch more to get historical failures
    per_page = args.per_page if not args.all_failed else max(args.per_page, 10)

    # Clean up old logs at the start of each run (unless --keep-old-logs is specified)
    if not args.keep_old_logs:
        print("Cleaning up old logs...")
        cleanup_log_directory()
        print("Ready to fetch new logs.\n")

    print(f"Fetching workflow runs for branch: {branch}")
    print("=" * 80)

    # Get recent workflow runs
    runs = get_workflow_runs(branch=branch, per_page=per_page)
    print(f"\nFound {runs['total_count']} total runs")
    print(f"Showing {len(runs['workflow_runs'])} runs:\n")

    failed_runs = []
    for run in runs['workflow_runs']:
        status_icon = "[FAIL]" if run['conclusion'] == 'failure' else "[PASS]" if run['conclusion'] == 'success' else "[RUN]"
        print(f"{status_icon} Run #{run['id']}: {run['name']} - {run['conclusion']} ({run['status']})")
        print(f" Branch: {run['head_branch']}, Commit: {run['head_sha'][:7]}")
        print(f" URL: {run['html_url']}")
        print()
        if run['conclusion'] == 'failure':
            failed_runs.append(run)

    # Process specific run or all failed runs
    runs_to_process = []
    if run_id:
        runs_to_process = [r for r in runs['workflow_runs'] if str(r['id']) == str(run_id)]
        if not runs_to_process:
            # Try to fetch the run even if not in the list
            try:
                run_details = get_workflow_run(run_id)
                runs_to_process = [run_details]
            except Exception as e:
                print(f"\n[ERROR] Could not fetch run {run_id}: {e}")
                sys.exit(1)
    elif args.all_failed:
        runs_to_process = failed_runs
    else:
        # Default: process all failed runs from the latest push
        # Each push triggers 3 workflows (e2e, backend, ci), so get all failed runs from the latest commit
        if failed_runs:
            # Group by commit SHA to get all workflows from the same push
            latest_commit = failed_runs[0]['head_sha']
            runs_to_process = [run for run in failed_runs if run['head_sha'] == latest_commit]
            if len(runs_to_process) > 0:
                print(f"\n[INFO] Found {len(runs_to_process)} failed workflow(s) from latest push (commit: {latest_commit[:7]})")

    # Process each run
    for run in runs_to_process:
        print(f"\n{'=' * 80}")
        print(f"Fetching details for run ID: {run['id']}")
        print("=" * 80)
        if 'name' not in run:
            run = get_workflow_run(run['id'])
        print(f"\nWorkflow: {run['name']}")
        print(f"Status: {run['status']}")
        print(f"Conclusion: {run['conclusion']}")
        print(f"Branch: {run['head_branch']}")
        print(f"Commit: {run['head_sha']}")
        print(f"URL: {run['html_url']}")

        # Get jobs for this run
        jobs = get_workflow_jobs(run['id'])
        print(f"\n{'=' * 80}")
        print(f"Jobs in this run ({len(jobs['jobs'])}):")
        print("=" * 80)
        for job in jobs['jobs']:
            status_icon = "[FAIL]" if job['conclusion'] == 'failure' else "[PASS]" if job['conclusion'] == 'success' else "[RUN]"
            print(f"\n{status_icon} Job #{job['id']}: {job['name']}")
            print(f" Status: {job['status']}, Conclusion: {job['conclusion']}")
            print(f" Started: {job['started_at']}")
            print(f" Completed: {job['completed_at']}")
            print(f" URL: {job['html_url']}")

            # Get logs for failed jobs
            if job['conclusion'] == 'failure':
                fetch_and_save_logs(job['id'], job['name'], run_id=run['id'], show_preview=not args.no_preview)
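
# Example (illustrative): running
#   python fetch_github_actions_logs.py --branch main --all-failed
# writes one file per failed job into github_action_logs/, named job_<job_id>_<job_name>.txt,
# with the generated error summary at the top and the raw job log below it.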