Skip to content

Instantly share code, notes, and snippets.

@bx33661
Created December 9, 2025 01:25
Show Gist options
  • Select an option

  • Save bx33661/1640f3e39ab1a81c423a59ecc67b0cf1 to your computer and use it in GitHub Desktop.

Select an option

Save bx33661/1640f3e39ab1a81c423a59ecc67b0cf1 to your computer and use it in GitHub Desktop.
关于CodeQL的脚本分享
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Convert CodeQL SARIF to:
{
"dataFlowPath": [
{ "threadFlows": [ { "steps": [ ... ] } ] },
...
]
}
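- Only path-problem results are processed (those containing codeFlows/threadFlows/locations)
- At most 10 paths are exported by default (adjust with --max-results)
- All threadFlows are exported by default (use --threadflow-index to pick a single one; -1 means all)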
"""
import argparse
import json
import os
import sys
from urllib.parse import urlparse, unquote
def to_path(uri: str, make_relative_to: str | None = None) -> str:
"""Normalize SARIF artifactLocation.uri -> filesystem-like path."""
if not uri:
return ""
parsed = urlparse(uri)
if parsed.scheme in ("file", ""):
# strip "file://" and URL-decode
path = unquote(parsed.path or uri)
# windows drive like /C:/...
if os.name == "nt" and path.startswith("/") and len(path) > 3 and path[2] == ":":
path = path[1:]
else:
# keep as-is but URL-decode; some SARIF may store relative paths without scheme
path = unquote(uri)
# Normalize separators
path = path.replace("\\", "/")
if make_relative_to:
try:
path = os.path.relpath(path, start=make_relative_to).replace("\\", "/")
except Exception:
pass
return path
def get_region(loc: dict | None) -> dict:
    """Flatten a SARIF ``location`` object into the fields used for one step.

    Missing or ``None`` sub-objects are treated as empty, so the function
    never raises on sparse locations (including ``loc`` itself being ``None``).

    Args:
        loc: A SARIF location object, possibly ``None`` or empty.

    Returns:
        A dict with ``startLine``, ``startColumn`` and ``endColumn`` (each
        ``0`` when absent), ``file`` (the artifact URI passed through
        ``to_path``) and ``description`` (the location's message text, ``""``
        when absent).
    """
    # Normalize once so every lookup below is safe; the original guarded only
    # the physicalLocation lookup and crashed on ``loc.get("message")``.
    loc = loc or {}
    phys = loc.get("physicalLocation") or {}
    region = phys.get("region") or {}
    artifact = phys.get("artifactLocation") or {}
    message = loc.get("message") or {}
    return {
        "startLine": region.get("startLine") or 0,
        "startColumn": region.get("startColumn") or 0,
        "endColumn": region.get("endColumn") or 0,
        "file": to_path(artifact.get("uri", "")),
        "description": message.get("text", "") or "",
    }
def extract_threadflow_steps(threadflow: dict) -> list[dict]:
    """Turn a SARIF threadFlow into a list of numbered step dicts.

    Each entry of ``threadflow["locations"]`` becomes one step whose
    ``location`` carries the fields returned by ``get_region`` plus a
    ``nodeType``: the first entry is ``"Source"``, the last entry is
    ``"Sink"``, and everything in between is ``"Intermediate"``.  A
    threadFlow with a single entry therefore yields one ``"Source"`` step.

    Args:
        threadflow: A SARIF threadFlow object; ``None`` or a missing/empty
            ``locations`` list yields an empty result.

    Returns:
        A list of ``{"stepNumber": int, "location": {...}}`` dicts, numbered
        from 1 in input order.
    """
    entries = (threadflow or {}).get("locations", []) or []
    last_position = len(entries) - 1

    def classify(position: int) -> str:
        # The first position wins over the last, so a lone step is a Source.
        if position == 0:
            return "Source"
        if position == last_position:
            return "Sink"
        return "Intermediate"

    copied_fields = ("file", "startLine", "startColumn", "endColumn", "description")
    result = []
    for number, entry in enumerate(entries, start=1):
        info = get_region(entry.get("location", {}))
        location = {field: info[field] for field in copied_fields}
        location["nodeType"] = classify(number - 1)
        result.append({"stepNumber": number, "location": location})
    return result
def sarif_to_paths(
sarif: dict,
max_results: int,
threadflow_index: int,
rule_filter: str | None,
make_relative_to: str | None,
):
out = {"dataFlowPath": []}
runs = sarif.get("runs", []) or []
count = 0
for run in runs:
results = run.get("results", []) or []
if rule_filter:
results = [r for r in results if rule_filter in (r.get("ruleId") or "")]
for res in results:
# Gather ALL threadFlows from ALL codeFlows (some SARIF has multiple)
tflows = []
code_flows = res.get("codeFlows", []) or []
for cf in code_flows:
tflows.extend(cf.get("threadFlows", []) or [])
# Fallback: some producers may put threadFlows directly on result
if not tflows:
tflows = res.get("threadFlows", []) or []
if not tflows:
continue
# Select which threadFlows to export
if threadflow_index >= 0:
tflows = [tflows[min(threadflow_index, len(tflows) - 1)]]
for tf in tflows:
if count >= max_results:
return out
steps = extract_threadflow_steps(tf)
if make_relative_to:
for s in steps:
s["location"]["file"] = to_path(
s["location"]["file"], make_relative_to=make_relative_to
)
out["dataFlowPath"].append({"threadFlows": [{"steps": steps}]})
count += 1
return out
def main() -> None:
    """Command-line entry point: convert a SARIF file to the path JSON.

    Parses the command-line arguments, loads the SARIF input, converts it
    with ``sarif_to_paths`` and writes the result as indented UTF-8 JSON.
    On a read, parse or write failure a message is printed to stderr and the
    process exits with status 1.
    """
    p = argparse.ArgumentParser(description="Convert CodeQL SARIF to required path JSON.")
    p.add_argument("sarif", help="Input SARIF file (from `codeql database analyze --format=sarif*`).")
    p.add_argument("-o", "--out", default="result.json", help="Output JSON file. Default: result.json")
    p.add_argument(
        "--max-results",
        type=int,
        default=10,
        # %(default)s keeps the help text in sync with the real default; the
        # previous hard-coded text advertised 3 while the default was 10.
        help="Max number of paths to export (across all results). Default: %(default)s",
    )
    p.add_argument(
        "--threadflow-index",
        type=int,
        default=-1,
        help="Which threadFlow to pick per result (0-based). Use -1 to export ALL threadFlows. Default: -1",
    )
    p.add_argument("--rule-filter", help="Only include results whose ruleId contains this substring.")
    p.add_argument("--relative-to", help="Make file paths relative to this directory.")
    args = p.parse_args()

    try:
        with open(args.sarif, "r", encoding="utf-8") as f:
            sarif = json.load(f)
    except Exception as e:
        print(f"Failed to read SARIF: {e}", file=sys.stderr)
        sys.exit(1)

    # Valid JSON is not necessarily a SARIF object; reject e.g. a top-level
    # list here instead of failing later with an AttributeError traceback.
    if not isinstance(sarif, dict):
        print("Failed to read SARIF: top-level JSON value is not an object", file=sys.stderr)
        sys.exit(1)

    data = sarif_to_paths(
        sarif,
        # Values below 1 are raised to 1 so at least one path may be exported.
        max_results=max(1, args.max_results),
        threadflow_index=args.threadflow_index,
        rule_filter=args.rule_filter,
        make_relative_to=args.relative_to,
    )

    try:
        with open(args.out, "w", encoding="utf-8") as f:
            json.dump(data, f, ensure_ascii=False, indent=2)
    except Exception as e:
        print(f"Failed to write output JSON: {e}", file=sys.stderr)
        sys.exit(1)

    print(f"Wrote {args.out} with {len(data['dataFlowPath'])} path(s).")


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment