Created
December 9, 2025 01:25
-
-
Save bx33661/1640f3e39ab1a81c423a59ecc67b0cf1 to your computer and use it in GitHub Desktop.
关于CodeQL的脚本分享
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Convert CodeQL SARIF to:
{
  "dataFlowPath": [
    { "threadFlows": [ { "steps": [ ... ] } ] },
    ...
  ]
}
- Only processes path-problem results (those containing codeFlows/threadFlows/locations)
- Exports at most 3 paths by default (adjustable via --max-results)
  NOTE(review): the argparse default in main() is actually 10, not 3 - confirm intended default
- Exports ALL threadFlows by default (use --threadflow-index to pick a single one; -1 means all)
"""
| import argparse | |
| import json | |
| import os | |
| import sys | |
| from urllib.parse import urlparse, unquote | |
| def to_path(uri: str, make_relative_to: str | None = None) -> str: | |
| """Normalize SARIF artifactLocation.uri -> filesystem-like path.""" | |
| if not uri: | |
| return "" | |
| parsed = urlparse(uri) | |
| if parsed.scheme in ("file", ""): | |
| # strip "file://" and URL-decode | |
| path = unquote(parsed.path or uri) | |
| # windows drive like /C:/... | |
| if os.name == "nt" and path.startswith("/") and len(path) > 3 and path[2] == ":": | |
| path = path[1:] | |
| else: | |
| # keep as-is but URL-decode; some SARIF may store relative paths without scheme | |
| path = unquote(uri) | |
| # Normalize separators | |
| path = path.replace("\\", "/") | |
| if make_relative_to: | |
| try: | |
| path = os.path.relpath(path, start=make_relative_to).replace("\\", "/") | |
| except Exception: | |
| pass | |
| return path | |
def get_region(loc: dict) -> dict:
    """Flatten one SARIF location into a simple region dict.

    Returns a dict with ``startLine``/``startColumn``/``endColumn`` (0 when
    absent or null), the normalized ``file`` path, and the step's message
    text as ``description`` ("" when absent).

    Fixed: the original guarded ``loc`` being None when reading
    ``physicalLocation`` but then called ``loc.get("message", ...)``
    unguarded, so ``get_region(None)`` raised AttributeError.
    """
    loc = loc or {}  # normalize once so every lookup below is safe
    phys = loc.get("physicalLocation", {}) or {}
    region = phys.get("region", {}) or {}
    # Producers may omit "message" entirely or set it to null.
    msg = (loc.get("message", {}) or {}).get("text", "") or ""
    return dict(
        startLine=region.get("startLine") or 0,
        startColumn=region.get("startColumn") or 0,
        endColumn=region.get("endColumn") or 0,
        file=to_path((phys.get("artifactLocation") or {}).get("uri", "")),
        description=msg,
    )
def extract_threadflow_steps(threadflow: dict) -> list[dict]:
    """Turn one SARIF threadFlow into an ordered list of step dicts.

    Steps are numbered from 1. The first location is tagged "Source", the
    last "Sink", and everything in between "Intermediate" (a single-location
    flow is tagged "Source").
    """
    locations = (threadflow or {}).get("locations", []) or []
    last = len(locations) - 1
    steps = []
    for i, entry in enumerate(locations):
        region = get_region(entry.get("location", {}))
        if i == 0:
            kind = "Source"
        elif i == last:
            kind = "Sink"
        else:
            kind = "Intermediate"
        steps.append(
            {
                "stepNumber": i + 1,
                "location": {
                    "file": region["file"],
                    "startLine": region["startLine"],
                    "startColumn": region["startColumn"],
                    "endColumn": region["endColumn"],
                    "description": region["description"],
                    "nodeType": kind,
                },
            }
        )
    return steps
def sarif_to_paths(
    sarif: dict,
    max_results: int,
    threadflow_index: int,
    rule_filter: str | None,
    make_relative_to: str | None,
):
    """Collect up to *max_results* dataflow paths from a parsed SARIF dict.

    Walks every run's results (optionally filtered by *rule_filter* as a
    ruleId substring), gathers threadFlows from all codeFlows (falling back
    to threadFlows attached directly to the result), and emits one
    ``{"threadFlows": [{"steps": [...]}]}`` entry per exported threadFlow.
    """
    out = {"dataFlowPath": []}
    exported = 0
    for run in sarif.get("runs", []) or []:
        results = run.get("results", []) or []
        if rule_filter:
            results = [r for r in results if rule_filter in (r.get("ruleId") or "")]
        for result in results:
            # Merge threadFlows from every codeFlow (some SARIF has several).
            tflows = []
            for flow in result.get("codeFlows", []) or []:
                tflows.extend(flow.get("threadFlows", []) or [])
            if not tflows:
                # Fallback: some producers attach threadFlows to the result.
                tflows = result.get("threadFlows", []) or []
            if not tflows:
                continue
            if threadflow_index >= 0:
                # Export a single flow; out-of-range indexes clamp to the last.
                tflows = [tflows[min(threadflow_index, len(tflows) - 1)]]
            for flow in tflows:
                if exported >= max_results:
                    return out
                steps = extract_threadflow_steps(flow)
                if make_relative_to:
                    for step in steps:
                        location = step["location"]
                        location["file"] = to_path(
                            location["file"], make_relative_to=make_relative_to
                        )
                out["dataFlowPath"].append({"threadFlows": [{"steps": steps}]})
                exported += 1
    return out
def main():
    """CLI entry point: parse arguments, read the SARIF file, write path JSON.

    Exits with status 1 (after printing to stderr) if the input cannot be
    read/parsed or the output cannot be written.
    """
    p = argparse.ArgumentParser(description="Convert CodeQL SARIF to required path JSON.")
    p.add_argument("sarif", help="Input SARIF file (from `codeql database analyze --format=sarif*`).")
    p.add_argument("-o", "--out", default="result.json", help="Output JSON file. Default: result.json")
    # Fixed: the help text previously claimed "Default: 3" while the actual
    # argparse default was (and remains) 10.
    p.add_argument("--max-results", type=int, default=10, help="Max number of paths to export (across all results). Default: 10")
    p.add_argument(
        "--threadflow-index",
        type=int,
        default=-1,
        help="Which threadFlow to pick per result (0-based). Use -1 to export ALL threadFlows. Default: -1",
    )
    p.add_argument("--rule-filter", help="Only include results whose ruleId contains this substring.")
    p.add_argument("--relative-to", help="Make file paths relative to this directory.")
    args = p.parse_args()

    try:
        with open(args.sarif, "r", encoding="utf-8") as f:
            sarif = json.load(f)
    except Exception as e:
        print(f"Failed to read SARIF: {e}", file=sys.stderr)
        sys.exit(1)

    data = sarif_to_paths(
        sarif,
        # Clamp to at least one path so a nonsensical --max-results still works.
        max_results=max(1, args.max_results),
        threadflow_index=args.threadflow_index,
        rule_filter=args.rule_filter,
        make_relative_to=args.relative_to,
    )

    try:
        with open(args.out, "w", encoding="utf-8") as f:
            json.dump(data, f, ensure_ascii=False, indent=2)
    except Exception as e:
        print(f"Failed to write output JSON: {e}", file=sys.stderr)
        sys.exit(1)

    print(f"Wrote {args.out} with {len(data['dataFlowPath'])} path(s).")


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment