Created
December 28, 2025 07:52
-
-
Save Konfekt/12157b236ae83037de35b144488a71e3 to your computer and use it in GitHub Desktop.
Import Android Aegis authenticator (encrypted) JSON export to pass; superseded by pass-import but left as stand-alone tool here
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env -S uv run --script | |
| # /// script | |
| # requires-python = ">=3.10" | |
| # dependencies = [ | |
| # "cryptography>=42", | |
| # ] | |
| # [tool.uv] | |
| # exclude-newer = "2025-12-26 00:00" | |
| # /// | |
| # Aegis to pass migration tool: | |
| # - Reads Aegis JSON export (optionally encrypted) | |
| # - Decrypts using password and scrypt parameters | |
| # - Extracts OTP entries | |
| # - Converts to otpauth:// URIs | |
| # - Inserts into pass under otp/ prefix | |
| # | |
| # https://github.com/beemdevelopment/Aegis/blob/master/docs/decrypt.py | |
| from __future__ import annotations | |
| import argparse | |
| import base64 | |
| import getpass | |
| import json | |
| import re | |
| import subprocess | |
| import sys | |
| from dataclasses import dataclass | |
| from pathlib import Path | |
| from typing import Any | |
| from urllib.parse import quote, urlencode | |
| import hashlib | |
| from cryptography.exceptions import InvalidTag | |
| from cryptography.hazmat.primitives.ciphers.aead import AESGCM | |
| def _norm_space(s: str) -> str: | |
| return re.sub(r"\s+", " ", s).strip() | |
def _safe_pass_name(s: str) -> str:
    """Turn an arbitrary label into a single path component usable as a pass entry name.

    Whitespace is normalised first, then both kinds of path separator become
    underscores and any newline, carriage return or tab becomes a space.
    An empty result is replaced by ``"unnamed"``.
    """
    substitutions = str.maketrans(
        {
            "/": "_",
            "\\": "_",
            "\n": " ",
            "\r": " ",
            "\t": " ",
        }
    )
    cleaned = _norm_space(s).translate(substitutions)
    if cleaned:
        return cleaned
    return "unnamed"
| def _as_int(value: Any, default: int, *, field: str, context: str) -> int: | |
| if value is None or value == "": | |
| return default | |
| try: | |
| return int(value) | |
| except Exception as ex: | |
| raise ValueError(f"Invalid {field}={value!r} in {context}.") from ex | |
| def _norm_algo(s: str) -> str: | |
| s = re.sub(r"[^A-Za-z0-9]", "", s or "").upper() | |
| return s or "SHA1" | |
def _label_for_uri(issuer: str, name: str) -> str:
    """Build the (still unencoded) otpauth label from issuer and account name.

    Both parts are whitespace-normalised.  With both present the label is
    ``issuer:name``; otherwise whichever part is non-empty is used, and
    ``"unnamed"`` when neither is.
    """
    clean_issuer = _norm_space(issuer)
    clean_name = _norm_space(name)
    if not (clean_issuer and clean_name):
        return clean_issuer or clean_name or "unnamed"
    return f"{clean_issuer}:{clean_name}"
| def _hex_bytes(s: Any, *, field: str, context: str) -> bytes: | |
| if not isinstance(s, str) or not s: | |
| raise ValueError(f"Missing {field} in {context}.") | |
| try: | |
| return bytes.fromhex(s) | |
| except Exception as ex: | |
| raise ValueError(f"Invalid hex {field} in {context}.") from ex | |
| def _b64_bytes(s: Any, *, field: str, context: str) -> bytes: | |
| if not isinstance(s, str) or not s: | |
| raise ValueError(f"Missing {field} in {context}.") | |
| try: | |
| return base64.b64decode(s, validate=True) | |
| except Exception as ex: | |
| raise ValueError(f"Invalid base64 {field} in {context}.") from ex | |
| def _candidate_password_bytes(password: str) -> list[bytes]: | |
| """ | |
| Try to mirror Aegis behavior around the historical CryptoUtils.toBytesOld() bug. | |
| Aegis current behavior uses UTF-8 bytes with exact length. | |
| Older behavior used ByteBuffer.array() which included extra trailing NUL bytes. | |
| Exact ByteBuffer capacity is JVM-internal, so try a few deterministic paddings. | |
| """ | |
| raw = password.encode("utf-8") | |
| caps = [] | |
| caps.append(len(raw)) | |
| caps.append(max(len(raw), len(password) * 3)) | |
| caps.append(max(len(raw), len(password) * 4)) | |
| uniq_caps: list[int] = [] | |
| for c in caps: | |
| if c not in uniq_caps: | |
| uniq_caps.append(c) | |
| res: list[bytes] = [] | |
| for cap in uniq_caps: | |
| if cap <= len(raw): | |
| res.append(raw) | |
| else: | |
| res.append(raw + (b"\x00" * (cap - len(raw)))) | |
| return res | |
| def _scrypt_required_bytes(n: int, r: int) -> int: | |
| if n <= 1 or r <= 0: | |
| return 0 | |
| return 128 * r * n | |
| def _auto_scrypt_maxmem(n: int, r: int, p: int) -> int: | |
| base = _scrypt_required_bytes(n, r) | |
| overhead = 8 * 1024 * 1024 | |
| floor = 64 * 1024 * 1024 | |
| return max(floor, base * 2 + overhead) | |
| def _derive_scrypt_key( | |
| *, | |
| password_bytes: bytes, | |
| salt: bytes, | |
| n: int, | |
| r: int, | |
| p: int, | |
| dklen: int = 32, | |
| maxmem: int | None = None, | |
| ) -> bytes: | |
| """ | |
| Match Aegis SCrypt parameters. | |
| Use hashlib.scrypt and raise OpenSSL maxmem enough to avoid "memory limit exceeded". | |
| """ | |
| if maxmem is None: | |
| maxmem = _auto_scrypt_maxmem(n, r, p) | |
| return hashlib.scrypt(password_bytes, salt=salt, n=n, r=r, p=p, dklen=dklen, maxmem=maxmem) | |
| @dataclass(frozen=True) | |
| class _GcmParams: | |
| nonce: bytes | |
| tag: bytes | |
| @staticmethod | |
| def from_json(obj: Any, *, context: str) -> "_GcmParams": | |
| if not isinstance(obj, dict): | |
| raise ValueError(f"Missing GCM params object in {context}.") | |
| nonce = _hex_bytes(obj.get("nonce"), field="nonce", context=context) | |
| tag = _hex_bytes(obj.get("tag"), field="tag", context=context) | |
| return _GcmParams(nonce=nonce, tag=tag) | |
def _aesgcm_decrypt(*, key: bytes, params: _GcmParams, ciphertext_no_tag: bytes, aad: bytes | None = None) -> bytes:
    """Decrypt AES-GCM data whose authentication tag is stored separately.

    The tag from *params* is appended to the ciphertext before it is handed
    to ``AESGCM.decrypt`` together with the nonce and optional *aad*.
    """
    sealed = ciphertext_no_tag + params.tag
    cipher = AESGCM(key)
    return cipher.decrypt(params.nonce, sealed, aad)
| def _is_encrypted_aegis_export(doc: Any) -> bool: | |
| if not isinstance(doc, dict): | |
| return False | |
| db = doc.get("db") | |
| header = doc.get("header") | |
| return isinstance(db, str) and isinstance(header, dict) | |
| def _decrypt_aegis_export(doc: dict[str, Any], password: str, *, scrypt_maxmem: int | None) -> dict[str, Any]: | |
| header = doc.get("header") | |
| if not isinstance(header, dict): | |
| raise ValueError("Missing header in encrypted Aegis export.") | |
| slots = header.get("slots") | |
| if not isinstance(slots, list) or not slots: | |
| raise ValueError("Missing header.slots in encrypted Aegis export.") | |
| db_params = _GcmParams.from_json(header.get("params"), context="header.params") | |
| db_ciphertext = _b64_bytes(doc.get("db"), field="db", context="top-level db") | |
| password_slots = [s for s in slots if isinstance(s, dict) and int(s.get("type", -1)) == 1] | |
| if not password_slots: | |
| raise ValueError("No password slots found in header.slots.") | |
| last_error: Exception | None = None | |
| for i, slot in enumerate(password_slots): | |
| slot_ctx = f"header.slots[{i}]" | |
| scrypt_n = _as_int(slot.get("n"), 0, field="n", context=slot_ctx) | |
| scrypt_r = _as_int(slot.get("r"), 0, field="r", context=slot_ctx) | |
| scrypt_p = _as_int(slot.get("p"), 0, field="p", context=slot_ctx) | |
| salt = _hex_bytes(slot.get("salt"), field="salt", context=slot_ctx) | |
| enc_master_key = _hex_bytes(slot.get("key"), field="key", context=slot_ctx) | |
| key_params = _GcmParams.from_json(slot.get("key_params"), context=f"{slot_ctx}.key_params") | |
| repaired = bool(slot.get("repaired", False)) | |
| pw_candidates = [password.encode("utf-8")] if repaired else _candidate_password_bytes(password) | |
| slot_auto_maxmem = _auto_scrypt_maxmem(scrypt_n, scrypt_r, scrypt_p) | |
| slot_maxmem = slot_auto_maxmem if scrypt_maxmem is None else max(scrypt_maxmem, slot_auto_maxmem) | |
| for pw_bytes in pw_candidates: | |
| try: | |
| derived = _derive_scrypt_key( | |
| password_bytes=pw_bytes, | |
| salt=salt, | |
| n=scrypt_n, | |
| r=scrypt_r, | |
| p=scrypt_p, | |
| dklen=32, | |
| maxmem=slot_maxmem, | |
| ) | |
| master_key_bytes = _aesgcm_decrypt( | |
| key=derived, | |
| params=key_params, | |
| ciphertext_no_tag=enc_master_key, | |
| aad=None, | |
| ) | |
| vault_bytes = _aesgcm_decrypt( | |
| key=master_key_bytes, | |
| params=db_params, | |
| ciphertext_no_tag=db_ciphertext, | |
| aad=None, | |
| ) | |
| vault_obj = json.loads(vault_bytes.decode("utf-8")) | |
| if not isinstance(vault_obj, dict) or "entries" not in vault_obj: | |
| raise ValueError("Decrypted payload does not look like an Aegis vault JSON object.") | |
| return vault_obj | |
| except ValueError as ex: | |
| msg = str(ex).lower() | |
| if "memory limit exceeded" in msg: | |
| raise ValueError( | |
| f"scrypt maxmem too low for n={scrypt_n}, r={scrypt_r}, p={scrypt_p}, " | |
| f"selected_maxmem={slot_maxmem} bytes, " | |
| f"estimated_required~{_scrypt_required_bytes(scrypt_n, scrypt_r)} bytes." | |
| ) from ex | |
| last_error = ex | |
| continue | |
| except InvalidTag as ex: | |
| last_error = ex | |
| continue | |
| except Exception as ex: | |
| last_error = ex | |
| continue | |
| raise ValueError("Failed to decrypt Aegis export with provided password.") from last_error | |
def _build_otpauth_uri(
    *,
    aegis_type: str,
    issuer: str,
    name: str,
    secret: str,
    algo: str,
    digits: int,
    period: int,
    counter: int | None,
) -> tuple[str, str]:
    """Assemble an ``otpauth://`` URI for one entry.

    Any type other than ``totp`` or ``hotp`` (case-insensitive) is emitted
    as ``totp``.  The query carries ``secret``, ``algorithm`` and ``digits``,
    then ``issuer`` when non-empty, then ``period`` for TOTP or ``counter``
    for HOTP.

    Returns:
        ``(uri, uri_type)`` where ``uri_type`` is ``"totp"`` or ``"hotp"``.

    Raises:
        ValueError: If the entry is HOTP and *counter* is ``None``.
    """
    requested = (aegis_type or "totp").lower()
    uri_type = requested if requested in {"totp", "hotp"} else "totp"
    label = _label_for_uri(issuer, name)

    fields: dict[str, str] = {
        "secret": secret,
        "algorithm": _norm_algo(algo),
        "digits": str(digits),
    }
    if issuer:
        fields["issuer"] = issuer
    if uri_type == "hotp":
        if counter is None:
            raise ValueError(f"Missing HOTP counter for label {label!r}.")
        fields["counter"] = str(counter)
    else:
        fields["period"] = str(period)

    encoded_label = quote(label, safe=":@-._~")
    # quote (not quote_plus) so that spaces become %20 rather than "+".
    encoded_query = urlencode(fields, doseq=False, safe="", quote_via=quote)
    return f"otpauth://{uri_type}/{encoded_label}?{encoded_query}", uri_type
| def _extract_entries_and_groups(doc: Any) -> tuple[list[dict[str, Any]], dict[str, str]]: | |
| if not isinstance(doc, dict): | |
| return ([], {}) | |
| db = doc["db"] if isinstance(doc.get("db"), dict) else doc | |
| entries = db.get("entries") | |
| groups = db.get("groups") | |
| if not isinstance(entries, list): | |
| return ([], {}) | |
| group_map: dict[str, str] = {} | |
| if isinstance(groups, list): | |
| for g in groups: | |
| if isinstance(g, dict): | |
| gid = str(g.get("uuid") or "") | |
| name = str(g.get("name") or "") | |
| if gid and name: | |
| group_map[gid] = name | |
| return ([e for e in entries if isinstance(e, dict)], group_map) | |
def _pass_insert(pass_cmd: str, entry: str, content: str, force: bool) -> None:
    """Store *content* as pass entry *entry* via ``<pass_cmd> insert -m``.

    The content is fed to the command on standard input.  ``-f`` is added
    when *force* is true.

    Raises:
        subprocess.CalledProcessError: If the command exits with a non-zero status.
    """
    overwrite_flag = ["-f"] if force else []
    argv = [pass_cmd, "insert", "-m", *overwrite_flag, entry]
    subprocess.run(argv, input=content, text=True, check=True)
| def _read_password_from_args(args: argparse.Namespace) -> str | None: | |
| if args.password is not None: | |
| return args.password | |
| if args.password_file is not None: | |
| return Path(args.password_file).read_text(encoding="utf-8").rstrip("\n") | |
| return None | |
def _obtain_password_for_encrypted_export(args: argparse.Namespace) -> str:
    """Get the decryption password from the arguments, piped stdin, or a prompt.

    Order of preference:
      1. ``--password`` / ``--password-file``;
      2. the first line of standard input when it is not a terminal;
      3. an interactive ``getpass`` prompt.

    Raises:
        ValueError: If stdin or the prompt yields an empty password, or the
            prompt hits end-of-file.
    """
    preset = _read_password_from_args(args)
    if preset is not None:
        return preset

    stream = sys.stdin
    if stream is not None and not stream.isatty():
        first_line = stream.readline().rstrip("\n")
        if not first_line:
            raise ValueError("Empty password read from stdin for encrypted export.")
        return first_line

    try:
        typed = getpass.getpass("Aegis export password: ")
    except EOFError as ex:
        raise ValueError("No password available for encrypted export.") from ex
    if typed:
        return typed
    raise ValueError("Empty password entered for encrypted export.")
| def _parse_scrypt_maxmem(args: argparse.Namespace) -> int | None: | |
| if args.scrypt_maxmem is not None and args.scrypt_maxmem_mib is not None: | |
| raise ValueError("Specify only one of --scrypt-maxmem or --scrypt-maxmem-mib.") | |
| if args.scrypt_maxmem_mib is not None: | |
| return int(args.scrypt_maxmem_mib) * 1024 * 1024 | |
| if args.scrypt_maxmem is not None: | |
| return int(args.scrypt_maxmem) | |
| return None | |
def main() -> int:
    """Command-line entry point: migrate an Aegis JSON export into pass.

    The export is read (and decrypted when it is encrypted), every entry
    with a secret is converted to an ``otpauth://`` URI, and each URI is
    inserted into pass under ``<prefix>/[<group>/]<label>``.  With
    ``--dry-run`` only the planned entry names are printed.

    Problems are reported on stderr instead of as tracebacks.

    Returns:
        0 on success; 1 if invoking ``pass`` failed; 2 for bad arguments,
        an unreadable or undecryptable export, an invalid entry, or an
        export without entries.
    """
    ap = argparse.ArgumentParser(
        description="Migrate Aegis JSON export into pass under otp/ entries, storing otpauth:// URIs, with optional decryption."
    )
    ap.add_argument("aegis_json", help="Aegis JSON export path.")
    ap.add_argument("--prefix", default="otp", help="Pass prefix directory.")
    ap.add_argument("--pass-cmd", default="pass", help="pass executable name or path.")
    ap.add_argument("--force", action="store_true", help="Overwrite existing pass entries.")
    ap.add_argument("--dry-run", action="store_true", help="Print planned pass entry names without writing.")
    ap.add_argument("--use-groups", action="store_true", help="Create otp/<group>/<issuer>:<name> when groups exist.")
    ap.add_argument("--kv-metadata", action="store_true", help="Append key=value metadata lines after the otpauth:// URI.")
    ap.add_argument("--password", default=None, help="Password to decrypt encrypted Aegis exports.")
    ap.add_argument("--password-file", default=None, help="Read password from file to decrypt encrypted Aegis exports.")
    ap.add_argument("--scrypt-maxmem", type=int, default=None, help="Override scrypt maxmem in bytes for OpenSSL-backed hashlib.scrypt.")
    ap.add_argument("--scrypt-maxmem-mib", type=int, default=None, help="Override scrypt maxmem in MiB for OpenSSL-backed hashlib.scrypt.")
    args = ap.parse_args()

    try:
        scrypt_maxmem = _parse_scrypt_maxmem(args)
    except Exception as ex:
        print(str(ex), file=sys.stderr)
        return 2

    # ValueError covers both invalid JSON and undecodable (non-UTF-8) input.
    try:
        doc = json.loads(Path(args.aegis_json).read_text(encoding="utf-8"))
    except (OSError, ValueError) as ex:
        print(f"Cannot read Aegis export {args.aegis_json!r}: {ex}", file=sys.stderr)
        return 2

    if _is_encrypted_aegis_export(doc):
        try:
            password = _obtain_password_for_encrypted_export(args)
        except Exception as ex:
            print(str(ex), file=sys.stderr)
            return 2
        try:
            doc = _decrypt_aegis_export(doc, password, scrypt_maxmem=scrypt_maxmem)
        except ValueError as ex:
            # Typically a wrong password or a malformed export.
            print(str(ex), file=sys.stderr)
            return 2

    entries, group_map = _extract_entries_and_groups(doc)
    if not entries:
        print("No entries found.", file=sys.stderr)
        return 2

    # Avoid "otp//label" when the prefix is given with a trailing slash.
    prefix = args.prefix.rstrip("/")

    used: set[str] = set()
    migrated = 0
    skipped = 0
    for e in entries:
        info = e.get("info")
        if not isinstance(info, dict):
            info = {}
        aegis_type = str(e.get("type") or "totp").lower()
        issuer = _norm_space(str(e.get("issuer") or ""))
        name = _norm_space(str(e.get("name") or e.get("label") or ""))
        uuid = str(e.get("uuid") or "")
        base_label = f"{issuer}:{name}" if issuer and name else (issuer or name or "unnamed")
        base_label = _safe_pass_name(base_label)

        # Check the secret before reserving a label, so that skipped entries
        # do not force a suffix onto later entries of the same name.
        secret = info.get("secret") or e.get("secret")
        if not secret:
            print(f"Skipping {base_label!r}: entry has no secret.", file=sys.stderr)
            skipped += 1
            continue
        secret = re.sub(r"\s+", "", str(secret)).upper()

        label = base_label
        if label in used:
            suffix = uuid[:8] if uuid else str(len(used) + 1)
            # The suffix comes from the export too, so sanitise the combined name.
            label = _safe_pass_name(f"{base_label}__{suffix}")
            bump = 2
            while label in used:
                # The suffixed name can itself be taken; keep going until unique.
                label = _safe_pass_name(f"{base_label}__{suffix}_{bump}")
                bump += 1
        used.add(label)

        group_path = ""
        if args.use_groups and isinstance(e.get("groups"), list) and e["groups"]:
            gid = str(e["groups"][0] or "")
            gname = _safe_pass_name(group_map.get(gid, gid)) if gid else ""
            if gname:
                group_path = f"{gname}/"

        if aegis_type not in {"totp", "hotp"}:
            print(
                f"Warning: {label!r} has Aegis type {aegis_type!r}; it is stored as a plain "
                "totp URI, so generated codes may differ from Aegis.",
                file=sys.stderr,
            )

        try:
            algo = str(info.get("algo") or info.get("algorithm") or "SHA1")
            digits = _as_int(info.get("digits"), 6, field="digits", context=label)
            period = _as_int(info.get("period") or info.get("step"), 30, field="period", context=label)
            counter_raw = info.get("counter")
            counter = None if counter_raw is None else _as_int(counter_raw, 0, field="counter", context=label)
            uri, uri_type = _build_otpauth_uri(
                aegis_type=aegis_type,
                issuer=issuer,
                name=name,
                secret=secret,
                algo=algo,
                digits=digits,
                period=period,
                counter=counter,
            )
        except ValueError as ex:
            print(str(ex), file=sys.stderr)
            return 2

        relative_name = f"{group_path}{label}"
        entry_name = f"{prefix}/{relative_name}" if prefix else relative_name
        if args.dry_run:
            print(entry_name)
            migrated += 1
            continue

        content_lines: list[str] = [uri]
        if args.kv_metadata:
            if aegis_type not in {"totp", "hotp"}:
                content_lines.append(f"aegis_type={aegis_type}")
            content_lines.append(f"type={uri_type}")
            content_lines.append(f"issuer={issuer}")
            content_lines.append(f"name={name}")
            content_lines.append(f"uuid={uuid}")
        content = "\n".join(content_lines) + "\n"

        try:
            _pass_insert(args.pass_cmd, entry_name, content, force=args.force)
        except (OSError, subprocess.CalledProcessError) as ex:
            # OSError: the pass executable could not be started;
            # CalledProcessError: it ran but exited with a failure status.
            print(f"pass insert failed for {entry_name!r}: {ex}", file=sys.stderr)
            return 1
        migrated += 1

    if skipped:
        print(f"Skipped {skipped} entries without a secret.", file=sys.stderr)
    if args.dry_run:
        print(f"Dry run: {migrated} entries would be migrated into pass prefix '{args.prefix}/'.", file=sys.stderr)
    else:
        print(f"Migrated {migrated} entries into pass prefix '{args.prefix}/'.", file=sys.stderr)
    return 0
# When run as a script, execute main() and use its return value as the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment