Skip to content

Instantly share code, notes, and snippets.

@Konfekt
Created December 28, 2025 07:52
Show Gist options
  • Select an option

  • Save Konfekt/12157b236ae83037de35b144488a71e3 to your computer and use it in GitHub Desktop.

Select an option

Save Konfekt/12157b236ae83037de35b144488a71e3 to your computer and use it in GitHub Desktop.
Import Android Aegis authenticator (encrypted) JSON export to pass; superseded by pass-import but left as stand-alone tool here
#!/usr/bin/env -S uv run --script
# /// script
# requires-python = ">=3.10"
# dependencies = [
# "cryptography>=42",
# ]
# [tool.uv]
# exclude-newer = "2025-12-26 00:00"
# ///
# Aegis to pass migration tool:
# - Reads Aegis JSON export (optionally encrypted)
# - Decrypts using password and scrypt parameters
# - Extracts OTP entries
# - Converts to otpauth:// URIs
# - Inserts into pass under otp/ prefix
#
# https://github.com/beemdevelopment/Aegis/blob/master/docs/decrypt.py
from __future__ import annotations
import argparse
import base64
import getpass
import json
import re
import subprocess
import sys
from dataclasses import dataclass
from pathlib import Path
from typing import Any
from urllib.parse import quote, urlencode
import hashlib
from cryptography.exceptions import InvalidTag
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
def _norm_space(s: str) -> str:
return re.sub(r"\s+", " ", s).strip()
def _safe_pass_name(s: str) -> str:
    """Return *s* as a single, safe path component for a pass entry name.

    Whitespace runs (newlines and tabs included) are collapsed to one space
    by ``_norm_space``, and forward/backward slashes become underscores so the
    value cannot add directory levels.  An empty result, and the special
    components ``.`` and ``..`` (which would make the entry path point at the
    current or parent directory instead of a new entry), become "unnamed".
    """
    # _norm_space already turns \n, \r and \t into plain spaces, so no
    # separate replacement of those characters is needed afterwards.
    cleaned = _norm_space(s).replace("/", "_").replace("\\", "_")
    if cleaned in {"", ".", ".."}:
        return "unnamed"
    return cleaned
def _as_int(value: Any, default: int, *, field: str, context: str) -> int:
if value is None or value == "":
return default
try:
return int(value)
except Exception as ex:
raise ValueError(f"Invalid {field}={value!r} in {context}.") from ex
def _norm_algo(s: str) -> str:
s = re.sub(r"[^A-Za-z0-9]", "", s or "").upper()
return s or "SHA1"
def _label_for_uri(issuer: str, name: str) -> str:
    """Build the otpauth label: "issuer:name", or whichever part is non-empty.

    Both parts are whitespace-normalised first; "unnamed" is returned when
    neither has any content.
    """
    parts = [_norm_space(issuer), _norm_space(name)]
    present = [part for part in parts if part]
    if not present:
        return "unnamed"
    return ":".join(present)
def _hex_bytes(s: Any, *, field: str, context: str) -> bytes:
if not isinstance(s, str) or not s:
raise ValueError(f"Missing {field} in {context}.")
try:
return bytes.fromhex(s)
except Exception as ex:
raise ValueError(f"Invalid hex {field} in {context}.") from ex
def _b64_bytes(s: Any, *, field: str, context: str) -> bytes:
if not isinstance(s, str) or not s:
raise ValueError(f"Missing {field} in {context}.")
try:
return base64.b64decode(s, validate=True)
except Exception as ex:
raise ValueError(f"Invalid base64 {field} in {context}.") from ex
def _candidate_password_bytes(password: str) -> list[bytes]:
"""
Try to mirror Aegis behavior around the historical CryptoUtils.toBytesOld() bug.
Aegis current behavior uses UTF-8 bytes with exact length.
Older behavior used ByteBuffer.array() which included extra trailing NUL bytes.
Exact ByteBuffer capacity is JVM-internal, so try a few deterministic paddings.
"""
raw = password.encode("utf-8")
caps = []
caps.append(len(raw))
caps.append(max(len(raw), len(password) * 3))
caps.append(max(len(raw), len(password) * 4))
uniq_caps: list[int] = []
for c in caps:
if c not in uniq_caps:
uniq_caps.append(c)
res: list[bytes] = []
for cap in uniq_caps:
if cap <= len(raw):
res.append(raw)
else:
res.append(raw + (b"\x00" * (cap - len(raw))))
return res
def _scrypt_required_bytes(n: int, r: int) -> int:
if n <= 1 or r <= 0:
return 0
return 128 * r * n
def _auto_scrypt_maxmem(n: int, r: int, p: int) -> int:
    """Choose a ``maxmem`` value for scrypt from the cost parameters.

    The result is twice the estimated requirement plus 8 MiB of headroom, and
    never less than 64 MiB.  *p* is accepted but not used in the estimate.
    """
    mib = 1024 * 1024
    estimate = 2 * _scrypt_required_bytes(n, r) + 8 * mib
    minimum = 64 * mib
    return estimate if estimate > minimum else minimum
def _derive_scrypt_key(
*,
password_bytes: bytes,
salt: bytes,
n: int,
r: int,
p: int,
dklen: int = 32,
maxmem: int | None = None,
) -> bytes:
"""
Match Aegis SCrypt parameters.
Use hashlib.scrypt and raise OpenSSL maxmem enough to avoid "memory limit exceeded".
"""
if maxmem is None:
maxmem = _auto_scrypt_maxmem(n, r, p)
return hashlib.scrypt(password_bytes, salt=salt, n=n, r=r, p=p, dklen=dklen, maxmem=maxmem)
@dataclass(frozen=True)
class _GcmParams:
nonce: bytes
tag: bytes
@staticmethod
def from_json(obj: Any, *, context: str) -> "_GcmParams":
if not isinstance(obj, dict):
raise ValueError(f"Missing GCM params object in {context}.")
nonce = _hex_bytes(obj.get("nonce"), field="nonce", context=context)
tag = _hex_bytes(obj.get("tag"), field="tag", context=context)
return _GcmParams(nonce=nonce, tag=tag)
def _aesgcm_decrypt(*, key: bytes, params: _GcmParams, ciphertext_no_tag: bytes, aad: bytes | None = None) -> bytes:
    """Decrypt AES-GCM data whose tag is held separately from the ciphertext.

    The tag from *params* is appended to *ciphertext_no_tag* and the result is
    passed to ``AESGCM.decrypt`` together with the nonce and optional *aad*.
    """
    cipher = AESGCM(key)
    sealed = ciphertext_no_tag + params.tag
    return cipher.decrypt(params.nonce, sealed, aad)
def _is_encrypted_aegis_export(doc: Any) -> bool:
if not isinstance(doc, dict):
return False
db = doc.get("db")
header = doc.get("header")
return isinstance(db, str) and isinstance(header, dict)
def _decrypt_aegis_export(doc: dict[str, Any], password: str, *, scrypt_maxmem: int | None) -> dict[str, Any]:
header = doc.get("header")
if not isinstance(header, dict):
raise ValueError("Missing header in encrypted Aegis export.")
slots = header.get("slots")
if not isinstance(slots, list) or not slots:
raise ValueError("Missing header.slots in encrypted Aegis export.")
db_params = _GcmParams.from_json(header.get("params"), context="header.params")
db_ciphertext = _b64_bytes(doc.get("db"), field="db", context="top-level db")
password_slots = [s for s in slots if isinstance(s, dict) and int(s.get("type", -1)) == 1]
if not password_slots:
raise ValueError("No password slots found in header.slots.")
last_error: Exception | None = None
for i, slot in enumerate(password_slots):
slot_ctx = f"header.slots[{i}]"
scrypt_n = _as_int(slot.get("n"), 0, field="n", context=slot_ctx)
scrypt_r = _as_int(slot.get("r"), 0, field="r", context=slot_ctx)
scrypt_p = _as_int(slot.get("p"), 0, field="p", context=slot_ctx)
salt = _hex_bytes(slot.get("salt"), field="salt", context=slot_ctx)
enc_master_key = _hex_bytes(slot.get("key"), field="key", context=slot_ctx)
key_params = _GcmParams.from_json(slot.get("key_params"), context=f"{slot_ctx}.key_params")
repaired = bool(slot.get("repaired", False))
pw_candidates = [password.encode("utf-8")] if repaired else _candidate_password_bytes(password)
slot_auto_maxmem = _auto_scrypt_maxmem(scrypt_n, scrypt_r, scrypt_p)
slot_maxmem = slot_auto_maxmem if scrypt_maxmem is None else max(scrypt_maxmem, slot_auto_maxmem)
for pw_bytes in pw_candidates:
try:
derived = _derive_scrypt_key(
password_bytes=pw_bytes,
salt=salt,
n=scrypt_n,
r=scrypt_r,
p=scrypt_p,
dklen=32,
maxmem=slot_maxmem,
)
master_key_bytes = _aesgcm_decrypt(
key=derived,
params=key_params,
ciphertext_no_tag=enc_master_key,
aad=None,
)
vault_bytes = _aesgcm_decrypt(
key=master_key_bytes,
params=db_params,
ciphertext_no_tag=db_ciphertext,
aad=None,
)
vault_obj = json.loads(vault_bytes.decode("utf-8"))
if not isinstance(vault_obj, dict) or "entries" not in vault_obj:
raise ValueError("Decrypted payload does not look like an Aegis vault JSON object.")
return vault_obj
except ValueError as ex:
msg = str(ex).lower()
if "memory limit exceeded" in msg:
raise ValueError(
f"scrypt maxmem too low for n={scrypt_n}, r={scrypt_r}, p={scrypt_p}, "
f"selected_maxmem={slot_maxmem} bytes, "
f"estimated_required~{_scrypt_required_bytes(scrypt_n, scrypt_r)} bytes."
) from ex
last_error = ex
continue
except InvalidTag as ex:
last_error = ex
continue
except Exception as ex:
last_error = ex
continue
raise ValueError("Failed to decrypt Aegis export with provided password.") from last_error
def _build_otpauth_uri(
    *,
    aegis_type: str,
    issuer: str,
    name: str,
    secret: str,
    algo: str,
    digits: int,
    period: int,
    counter: int | None,
) -> tuple[str, str]:
    """Assemble an otpauth:// URI and return it with the URI type used.

    Any Aegis type other than "totp"/"hotp" is emitted as "totp".  TOTP URIs
    carry ``period``; HOTP URIs carry ``counter``, and a missing counter
    raises ``ValueError``.  ``issuer`` is added as a query parameter only when
    it is non-empty.
    """
    requested = (aegis_type or "totp").lower()
    uri_type = "hotp" if requested == "hotp" else "totp"
    label = _label_for_uri(issuer, name)
    encoded_label = quote(label, safe=":@-._~")
    params: dict[str, str] = {
        "secret": secret,
        "algorithm": _norm_algo(algo),
        "digits": str(digits),
    }
    if issuer:
        params["issuer"] = issuer
    if uri_type == "hotp":
        if counter is None:
            raise ValueError(f"Missing HOTP counter for label {label!r}.")
        params["counter"] = str(counter)
    else:
        params["period"] = str(period)
    # quote (not quote_plus) with no safe characters percent-encodes every
    # reserved character in the query values, spaces included.
    query = urlencode(params, doseq=False, safe="", quote_via=quote)
    return f"otpauth://{uri_type}/{encoded_label}?{query}", uri_type
def _extract_entries_and_groups(doc: Any) -> tuple[list[dict[str, Any]], dict[str, str]]:
if not isinstance(doc, dict):
return ([], {})
db = doc["db"] if isinstance(doc.get("db"), dict) else doc
entries = db.get("entries")
groups = db.get("groups")
if not isinstance(entries, list):
return ([], {})
group_map: dict[str, str] = {}
if isinstance(groups, list):
for g in groups:
if isinstance(g, dict):
gid = str(g.get("uuid") or "")
name = str(g.get("name") or "")
if gid and name:
group_map[gid] = name
return ([e for e in entries if isinstance(e, dict)], group_map)
def _pass_insert(pass_cmd: str, entry: str, content: str, force: bool) -> None:
    """Store *content* as pass entry *entry* via ``<pass_cmd> insert -m``.

    *content* is fed on standard input; ``-f`` is added when *force* is true.
    A non-zero exit status raises ``subprocess.CalledProcessError``.
    """
    flags = ["-m", "-f"] if force else ["-m"]
    argv = [pass_cmd, "insert", *flags, entry]
    subprocess.run(argv, input=content, text=True, check=True)
def _read_password_from_args(args: argparse.Namespace) -> str | None:
if args.password is not None:
return args.password
if args.password_file is not None:
return Path(args.password_file).read_text(encoding="utf-8").rstrip("\n")
return None
def _obtain_password_for_encrypted_export(args: argparse.Namespace) -> str:
    """Get the export password from options, piped stdin, or a prompt.

    Order: --password / --password-file; then, if stdin exists and is not a
    terminal, its first line; otherwise an interactive getpass prompt.
    Raises ``ValueError`` when the password obtained from stdin or the prompt
    is empty, or when the prompt hits end-of-file.
    """
    supplied = _read_password_from_args(args)
    if supplied is not None:
        return supplied
    stdin = sys.stdin
    piped = stdin is not None and not stdin.isatty()
    if piped:
        line = stdin.readline().rstrip("\n")
        if not line:
            raise ValueError("Empty password read from stdin for encrypted export.")
        return line
    try:
        typed = getpass.getpass("Aegis export password: ")
    except EOFError as ex:
        raise ValueError("No password available for encrypted export.") from ex
    if typed:
        return typed
    raise ValueError("Empty password entered for encrypted export.")
def _parse_scrypt_maxmem(args: argparse.Namespace) -> int | None:
if args.scrypt_maxmem is not None and args.scrypt_maxmem_mib is not None:
raise ValueError("Specify only one of --scrypt-maxmem or --scrypt-maxmem-mib.")
if args.scrypt_maxmem_mib is not None:
return int(args.scrypt_maxmem_mib) * 1024 * 1024
if args.scrypt_maxmem is not None:
return int(args.scrypt_maxmem)
return None
def main() -> int:
    """Command-line entry point: migrate an Aegis export into pass.

    Reads the export named on the command line, decrypts it when it has the
    shape of an encrypted export, converts each entry that has a secret into
    an otpauth:// URI and inserts it into pass under ``<prefix>/`` (with
    --dry-run the planned entry names are printed instead).

    Returns:
        0 on success; 1 when an entry could not be converted or a pass
        invocation failed; 2 for option, input or decryption errors, or when
        the export contains no entries.
    """
    ap = argparse.ArgumentParser(
        description="Migrate Aegis JSON export into pass under otp/ entries, storing otpauth:// URIs, with optional decryption."
    )
    ap.add_argument("aegis_json", help="Aegis JSON export path.")
    ap.add_argument("--prefix", default="otp", help="Pass prefix directory.")
    ap.add_argument("--pass-cmd", default="pass", help="pass executable name or path.")
    ap.add_argument("--force", action="store_true", help="Overwrite existing pass entries.")
    ap.add_argument("--dry-run", action="store_true", help="Print planned pass entry names without writing.")
    ap.add_argument("--use-groups", action="store_true", help="Create otp/<group>/<issuer>:<name> when groups exist.")
    ap.add_argument("--kv-metadata", action="store_true", help="Append key=value metadata lines after the otpauth:// URI.")
    ap.add_argument("--password", default=None, help="Password to decrypt encrypted Aegis exports.")
    ap.add_argument("--password-file", default=None, help="Read password from file to decrypt encrypted Aegis exports.")
    ap.add_argument("--scrypt-maxmem", type=int, default=None, help="Override scrypt maxmem in bytes for OpenSSL-backed hashlib.scrypt.")
    ap.add_argument("--scrypt-maxmem-mib", type=int, default=None, help="Override scrypt maxmem in MiB for OpenSSL-backed hashlib.scrypt.")
    args = ap.parse_args()

    try:
        scrypt_maxmem = _parse_scrypt_maxmem(args)
    except Exception as ex:
        print(str(ex), file=sys.stderr)
        return 2

    try:
        doc = json.loads(Path(args.aegis_json).read_text(encoding="utf-8"))
    except (OSError, ValueError) as ex:
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
        print(f"Cannot read Aegis export {args.aegis_json!r}: {ex}", file=sys.stderr)
        return 2

    if _is_encrypted_aegis_export(doc):
        try:
            password = _obtain_password_for_encrypted_export(args)
        except Exception as ex:
            print(str(ex), file=sys.stderr)
            return 2
        try:
            doc = _decrypt_aegis_export(doc, password, scrypt_maxmem=scrypt_maxmem)
        except ValueError as ex:
            # Wrong password or malformed export: report it like the other
            # input errors instead of dumping a traceback.
            print(str(ex), file=sys.stderr)
            return 2

    entries, group_map = _extract_entries_and_groups(doc)
    if not entries:
        print("No entries found.", file=sys.stderr)
        return 2

    used: set[str] = set()
    migrated = 0
    failed = 0
    for e in entries:
        info = e.get("info")
        if not isinstance(info, dict):
            info = {}
        aegis_type = str(e.get("type") or "totp").lower()
        issuer = _norm_space(str(e.get("issuer") or ""))
        name = _norm_space(str(e.get("name") or e.get("label") or ""))
        uuid = str(e.get("uuid") or "")
        base_label = f"{issuer}:{name}" if issuer and name else (issuer or name or "unnamed")
        base_label = _safe_pass_name(base_label)

        # Check for a secret before reserving a name, so an entry that is
        # never written cannot force a suffix onto a later entry.
        secret = info.get("secret") or e.get("secret")
        if not secret:
            print(f"Skipping {base_label!r}: entry has no secret.", file=sys.stderr)
            continue
        secret = re.sub(r"\s+", "", str(secret)).upper()

        label = base_label
        if label in used:
            suffix = uuid[:8] if uuid else str(len(used) + 1)
            label = f"{base_label}__{suffix}"
            # The suffixed name may itself be taken (e.g. shared uuid prefix).
            bump = 2
            while label in used:
                label = f"{base_label}__{suffix}_{bump}"
                bump += 1

        group_path = ""
        if args.use_groups and isinstance(e.get("groups"), list) and e["groups"]:
            # Only the first group of an entry is used for the directory.
            gid = str(e["groups"][0] or "")
            gname = _safe_pass_name(group_map.get(gid, gid)) if gid else ""
            if gname:
                group_path = f"{gname}/"

        try:
            algo = str(info.get("algo") or info.get("algorithm") or "SHA1")
            digits = _as_int(info.get("digits"), 6, field="digits", context=label)
            period = _as_int(info.get("period") or info.get("step"), 30, field="period", context=label)
            counter_raw = info.get("counter")
            counter = None if counter_raw is None else _as_int(counter_raw, 0, field="counter", context=label)
            uri, uri_type = _build_otpauth_uri(
                aegis_type=aegis_type,
                issuer=issuer,
                name=name,
                secret=secret,
                algo=algo,
                digits=digits,
                period=period,
                counter=counter,
            )
        except ValueError as ex:
            # One malformed entry should not abort the rest of the migration.
            print(f"Skipping {label!r}: {ex}", file=sys.stderr)
            failed += 1
            continue
        used.add(label)

        if aegis_type not in {"totp", "hotp"}:
            print(
                f"Warning: {label!r} has Aegis type {aegis_type!r}; it is stored as a "
                f"{uri_type} URI and the generated codes may not match Aegis.",
                file=sys.stderr,
            )

        entry_name = f"{args.prefix}/{group_path}{label}"
        if args.dry_run:
            print(entry_name)
            migrated += 1
            continue

        content_lines: list[str] = [uri]
        if args.kv_metadata:
            if aegis_type not in {"totp", "hotp"}:
                content_lines.append(f"aegis_type={aegis_type}")
            content_lines.append(f"type={uri_type}")
            content_lines.append(f"issuer={issuer}" if issuer else "issuer=")
            content_lines.append(f"name={name}" if name else "name=")
            content_lines.append(f"uuid={uuid}" if uuid else "uuid=")
        content = "\n".join(content_lines) + "\n"
        try:
            _pass_insert(args.pass_cmd, entry_name, content, force=args.force)
        except (OSError, subprocess.CalledProcessError) as ex:
            # Missing executable or a failing pass/gpg run: later inserts
            # would fail the same way, so stop here.
            print(f"pass insert failed for {entry_name!r}: {ex}", file=sys.stderr)
            return 1
        migrated += 1

    print(f"Migrated {migrated} entries into pass prefix '{args.prefix}/'.", file=sys.stderr)
    if failed:
        print(f"{failed} entries could not be converted; see messages above.", file=sys.stderr)
        return 1
    return 0
# Run the migration when executed as a script; main()'s return value becomes
# the process exit status.
if __name__ == "__main__":
    sys.exit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment