|
#!/usr/bin/env python |
|
#******************************************************************************* |
|
# Filter for encrypting/decrypting YAML file values with ansible-vault. |
|
# From: https://gist.github.com/utoddl/66baa4154618ba1fc8ec8127483e7e89 |
|
# |
|
# Differences from original: |
|
# - Warnings added for when encrypted data will not decrypt accurately. |
|
# - Additional `ansible-vault` parameters added as script parameters. |
|
# - ANSIBLE_VAULT_IDENTITY is not managed in this script. `ansible-vault` will |
|
# look for its environment variables. |
|
# https://docs.ansible.com/projects/ansible/latest/vault_guide/vault_using_encrypted_content.html |
|
#******************************************************************************* |
|
# Help text for the command line parser. The text before the first blank line
# is used as the argparse description and everything after it as the epilog,
# so the first paragraph must stay a single block with no blank line inside it.
ABOUT = '''\
Filter for encrypting/decrypting YAML file values with ansible-vault.

Takes input from stdin, encrypts unencrypted values, decrypts encrypted values,
and writes the YAML result to stdout with comments in the input file preserved
in the output. This simplifies maintenance of encrypted values in Ansible vars
files and eliminates the need to encrypt entire files.

Notes:
- `ansible-vault` will prompt for the encrypt/decrypt password if neither
  '--vault-id' nor '--vault-password-file' are provided and the password can't
  be determined from the config file or vault environment variables.
- Non-string values will always be decrypted as strings since Ansible does not
  reparse the input following decryption.
- Some string values in the unencrypted input will not be restored accurately
  when the output is passed through this filter for decryption.
  - Additional spaces between names and values, such as "name:    value", will
    be reduced to a single space.
  - Quoted strings will not be restored with quotes.
  - Embedded control characters in double-quoted strings will not be preserved.
  - Folded block scalar strings (>) will be restored without newlines.

'''
|
|
|
import argparse, io, re, pathlib, subprocess, sys, textwrap |
|
import ruamel.yaml |
|
from inspect import getmembers |
|
|
|
#******************************************************************************* |
|
# ansible_vault |
|
# |
|
# data = Object containing scalar value to encrypt/decrypt. |
|
# command = "encrypt" or "decrypt" |
|
# extra_args = Additional `ansible-vault` command arguments. |
|
# |
|
# Runs `ansible-vault` and returns the encrypted/decrypted `data` scalar value. |
|
# Passes parameters common to encrypt/decrypt functions and adds `extra_args`. |
|
# Prints error messages and aborts if command fails. |
|
#******************************************************************************* |
|
def ansible_vault(data, command, extra_args=None):
    """Run `ansible-vault` on one scalar value and return the command output.

    Args:
        data: Object containing the scalar value to convert. Its `str()` form
            is sent to the command's stdin encoded as UTF-8.
        command: The `ansible-vault` sub-command, "encrypt" or "decrypt".
        extra_args: Optional list of additional `ansible-vault` arguments,
            appended after the options common to both sub-commands.

    Returns:
        The command's stdout decoded as UTF-8.

    Exits the script with status 1, after printing the command line and the
    reason to stderr, if the command fails or cannot be started.
    """
    pd("{}: >> ({})".format(command, data))

    cmd_args = ["ansible-vault", command]

    for vault_id in args.vault_id:
        cmd_args += ["--vault-id", vault_id]

    if args.vault_password_file:
        # The option is parsed into a pathlib.Path; convert it to text so the
        # " ".join() calls below do not raise TypeError.
        cmd_args += ["--vault-password-file", str(args.vault_password_file)]

    if extra_args:
        cmd_args += list(extra_args)

    pd("cmd: {}".format(" ".join(cmd_args)))

    try:
        rv = subprocess.run(cmd_args, input=str(data).encode("utf-8"),
                            capture_output=True, check=True)
    except subprocess.CalledProcessError as cpe:
        eprint("[RUN]: {}\n{}".format(
            " ".join(str(arg) for arg in cpe.cmd),
            cpe.stderr.decode("utf-8", errors="replace")))
        sys.exit(1)
    except FileNotFoundError as err:
        # The `ansible-vault` executable could not be found.
        eprint("[RUN]: {}\n{}".format(" ".join(cmd_args), err))
        sys.exit(1)

    text = str(rv.stdout, encoding="utf-8")
    pd("{}: << ({})".format(command, text))
    return text
|
|
|
|
|
#******************************************************************************* |
|
# ansible_vault_decrypt_string(data, name)
|
# |
|
# Returns the vault-decrypted `data` value. Decrypted string is reprocessed as a |
|
# literal block scalar string (|) if it includes line breaks. |
|
#******************************************************************************* |
|
def ansible_vault_decrypt_string(data, name):
    """Return the vault-decrypted value of `data`.

    Args:
        data: The encrypted scalar; it is passed to `ansible-vault decrypt`.
        name: Path of the value within the document. Not used here; accepted
            so the signature matches `ansible_vault_encrypt_string`.

    Returns:
        A plain `str` when the decrypted text is a single line without a
        trailing newline. Otherwise a `LiteralScalarString`, so the value is
        written out as a literal block scalar (|).
    """
    text = ansible_vault(data, 'decrypt')

    if len(text.splitlines()) > 1 or text.endswith("\n"):
        # Build the literal scalar directly rather than generating YAML text
        # and re-parsing it. Re-parsing fails when the first line begins with
        # whitespace (it is taken as the block indentation) and alters text
        # containing line separators other than "\n".
        # NOTE(review): this relies on the ruamel.yaml emitter choosing the
        # chomping/indentation indicators from the text when dumping - confirm
        # against the installed ruamel.yaml version.
        return ruamel.yaml.scalarstring.LiteralScalarString(text)

    return text
|
|
|
#******************************************************************************* |
|
# ansible_vault_encrypt_string(data, name)
|
# |
|
# Returns a `TaggedScalar` object containing the vault-encrypted `data` value. |
|
#******************************************************************************* |
|
def ansible_vault_encrypt_string(data, name):
    """Encrypt `data` with ansible-vault and wrap the result for YAML output.

    Warnings are issued (through `warn`) for values that will not come back
    identical after a later decryption: non-string values, quoted strings and
    folded block scalars.

    Args:
        data: The unencrypted scalar value.
        name: Path of the value within the document, used in warning text.

    Returns:
        A `TaggedScalar` tagged `!vault` in literal block style (|) holding
        the encrypted text.
    """
    scalarstring = ruamel.yaml.scalarstring

    if not isinstance(data, str):
        warn(f"{name}: Non-string value will decrypt as a string.")

    is_quoted = (isinstance(data, scalarstring.ScalarString)
                 and data.style in ('"', "'"))
    if is_quoted:
        warn(f"{name}: Quoted string will not be restored with quotes.")

    if isinstance(data, scalarstring.FoldedScalarString):
        warn(f"{name}: Folded (>) block scalar string will not be accurately restored.")

    # Only pass --encrypt-vault-id through when it was given on the command line.
    extra_args = []
    if args.encrypt_vault_id:
        extra_args = ['--encrypt-vault-id', args.encrypt_vault_id]

    encrypted = ansible_vault(data, 'encrypt', extra_args)
    return ruamel.yaml.comments.TaggedScalar(encrypted, tag="!vault", style="|")
|
|
|
#******************************************************************************* |
|
# attrs(obj) |
|
# |
|
# Returns list of `obj` attributes for debug printing. Callables (functions) and |
|
# attributes with names like "__name__" are excluded. |
|
#******************************************************************************* |
|
def attrs(obj):
    """Describe `obj` for debug printing.

    Returns a newline-joined string: the object's type on the first line,
    followed by one "name: value" line per member reported by `getmembers`.
    Members named like "__name__" and callable members are left out.
    """
    report = ["{}".format(type(obj))]
    for name, value in getmembers(obj):
        if re.match('^__.+__$', name) or callable(value):
            continue
        report.append("{}: {}".format(name, value))
    return "\n".join(report)
|
|
|
#******************************************************************************* |
|
# eprint() |
|
# |
|
# Prints a message to stderr. |
|
#******************************************************************************* |
|
def eprint(*args, **kwargs):
    """Print to stderr; accepts the same arguments as `print` except `file`."""
    print(*args, **kwargs, file=sys.stderr)
|
|
|
#******************************************************************************* |
|
# pd(msg, delta) |
|
# |
|
# Prints a debug message to stderr. `delta` increases/decreases the indent width. |
|
# `args.verbose` determines how much detail to print. |
|
#******************************************************************************* |
|
def pd(msg, delta=0):
    """Print a debug message to stderr, one output line per line of `msg`.

    The current indent is kept in the function attribute `pd.prefix`. A
    positive `delta` adds one space to it before printing; a negative `delta`
    removes one space after printing. The message is printed only while
    `args.verbose` is greater than the indent width, so deeper nesting needs
    more `-v` flags to be shown.
    """
    indent = pd.prefix
    if delta > 0:
        indent = " " + indent
        pd.prefix = indent

    if len(indent) < args.verbose:
        for line in msg.splitlines():
            eprint("{}{}".format(indent, line))

    if delta < 0 and indent:
        pd.prefix = indent[1:]


# Start with no indentation.
pd.prefix = ""
|
|
|
#******************************************************************************* |
|
# walk_tree(data) |
|
# |
|
# Iterate down through `data` and encrypt/decrypt scalar values. |
|
#******************************************************************************* |
|
def walk_tree(data, name=None):
    """Walk down through `data` and encrypt/decrypt every scalar value.

    Mappings and sequences are updated in place, each entry being replaced by
    the result of the recursive call. A scalar tagged `!vault` is decrypted;
    any other scalar is encrypted.

    Args:
        data: A dict, a list, or a scalar value from the loaded YAML.
        name: Path of `data` within the document ("a.b[0]" style), used only
            in warning messages. None for the document root.

    Returns:
        `data` itself for a dict or list, otherwise the converted scalar.
    """
    pd(">>>walk_tree:({})\n::{}".format(type(data), data), 1)

    if isinstance(data, dict):
        for key in data:
            pd("walk_tree: key={}, type={}, value={}".format(key, type(data[key]), data[key]))
            # Compare against None so a falsy parent name (such as the
            # integer key 0) is still included in the child's path.
            child = key if name is None else f"{name}.{key}"
            data[key] = walk_tree(data[key], child)

    elif isinstance(data, list):
        # A top-level sequence has no parent name; use an empty prefix so its
        # items are reported as "[0]" rather than "None[0]".
        parent = "" if name is None else name
        for idx, item in enumerate(data):
            pd("walk_tree: idx={}, type={}, value={}".format(idx, type(item), item))
            data[idx] = walk_tree(item, f"{parent}[{idx}]")

    else:
        pd("\n=== before ===\n" + attrs(data) + "\n--------------\n")

        if (isinstance(data, ruamel.yaml.comments.TaggedScalar) and
                getattr(data, "_yaml_tag").value == "!vault"):
            data = ansible_vault_decrypt_string(data, name)
        else:
            data = ansible_vault_encrypt_string(data, name)

        pd("\n=== after ===\n" + attrs(data) + "\n--------------\n")

    pd("<<<walk_tree", -1)
    return data
|
|
|
|
|
#******************************************************************************* |
|
# warn(msg) |
|
# |
|
# Prints warning message to stderr if not running quietly. |
|
#******************************************************************************* |
|
def warn(msg):
    """Print `msg` to stderr as a "[WARNING]" line unless `args.quiet` is set."""
    if args.quiet:
        return
    eprint(f"[WARNING]: {msg}")
|
|
|
|
|
#******************************************************************************* |
|
# YAML(typ, width) |
|
# |
|
# Returns preconfigured `ruamel.yaml` object. |
|
#******************************************************************************* |
|
def YAML(typ=None, width=None):
    """Return a `ruamel.yaml.YAML` object preconfigured for this script.

    Args:
        typ: Passed through to `ruamel.yaml.YAML(typ=...)`.
        width: Value assigned to the object's `width` setting.
    """
    configured = ruamel.yaml.YAML(typ=typ)
    settings = (
        ("explicit_start", False),
        ("default_flow_style", False),
        ("preserve_quotes", True),
        ("width", width),
    )
    for option, value in settings:
        setattr(configured, option, value)
    return configured
|
|
|
#******************************************************************************* |
|
# Main Line |
|
#******************************************************************************* |
|
# This script uses subprocess.run(capture_output=...) (Python 3.7) and
# assignment expressions (Python 3.8), so 3.8 is the real minimum version.
if sys.version_info[:2] < (3, 8):
    raise SystemExit(
        "ERROR: {} requires Python version 3.8 or later. Current version: {}".format(
            sys.argv[0], ".".join(str(part) for part in sys.version_info[:3])
        )
    )
|
|
|
#*-------------------------*
#* Parse the command line. *
#*-------------------------*
def _password_file(value):
    """Return `value` as a Path if it is an existing file, else report a usage error."""
    if (candidate := pathlib.Path(value)).is_file():
        return candidate
    return parser.error("The file %s does not exist!" % value)


# The first paragraph of ABOUT is the description; the rest is the epilog.
description, epilog = ABOUT.split("\n\n", 1)

parser = argparse.ArgumentParser(
    formatter_class=argparse.RawDescriptionHelpFormatter,
    description=description,
    epilog=epilog,
)
parser.add_argument(
    "-w", "--width",
    type=int,
    default=80,
    help="Maximum output line width, default=%(default)i.",
)
parser.add_argument(
    "-q", "--quiet",
    action='store_true',
    help="Do not print warning messages.",
)
parser.add_argument(
    "-v", "--verbose",
    default=0,
    action="count",
    help="Print more debug messages. Adding multiple -v increases the verbosity.",
)
parser.add_argument(
    "--vault-id",
    action='append',
    default=[],
    help="The vault identity to use. This argument may be specified multiple times.",
)
parser.add_argument(
    "--encrypt-vault-id",
    help="The vault id used to encrypt (required if more than one vault-id is provided).",
)
parser.add_argument(
    "--vault-password-file",
    type=_password_file,
    help="Vault password input file.",
)

args = parser.parse_args()
pd("parameters: {}".format(args))
|
|
|
#*--------------------------*
#* Read in YAML from stdin. *
#*--------------------------*
lines = sys.stdin.read()

# Uncomment to use the debugger.
# https://stackoverflow.com/a/34687825/2245849
#
# sys.stdin = open("/dev/tty")
# breakpoint()

#*---------------------------------------*
#* Encrypt/decrypt YAML input to stdout. *
#*---------------------------------------*
yaml = YAML(width=args.width)
data = yaml.load(lines)

if data is None:
    # The loader returned no document value (e.g. empty input). Passing None
    # to walk_tree would encrypt the literal string "None", so echo the input
    # back unchanged instead.
    sys.stdout.write(lines)
else:
    yaml.indent(mapping=2, sequence=4, offset=2)
    yaml.dump(walk_tree(data), sys.stdout)