Skip to content

Instantly share code, notes, and snippets.

@sthames42
Forked from utoddl/av-filter
Last active January 5, 2026 19:57
Show Gist options
  • Select an option

  • Save sthames42/b05f3f1cd13f18a136c12f0a76802091 to your computer and use it in GitHub Desktop.

Select an option

Save sthames42/b05f3f1cd13f18a136c12f0a76802091 to your computer and use it in GitHub Desktop.
Filter for "ansible-vault"ing YAML Data Values

Ansible Vault Filter

av-filter encrypts and decrypts YAML file values with ansible-vault and was created by Todd Lewis to encrypt only string values. I did some refactoring to provide encryption of all values and preservation of quotes and control characters in strings.

Unfortunately, Ansible does not reparse decrypted YAML values by default, so every decrypted value is a string. Such a value can be converted to a non-string type with a filter, for example: my_value: "{{ vault.value | int }}". Quoted strings, however, decrypt with their surrounding quotes and embedded control characters still included, and there is currently no filter to convert them.

The changes I made have been replaced with warnings as to how non-string values will be handled by Ansible. The previous version is tagged reparse, in this repo, and could still be useful if Ansible is changed such that YAML values can be reparsed following decryption.

#!/usr/bin/env python
#*******************************************************************************
# Filter for encrypting/decrypting YAML file values with ansible-vault.
# From: https://gist.github.com/utoddl/66baa4154618ba1fc8ec8127483e7e89
#
# Differences from original:
# - Warnings added for when encrypted data will not decrypt accurately.
# - Additional `ansible-vault` parameters added as script parameters.
# - ANSIBLE_VAULT_IDENTITY is not managed in this script. `ansible-vault` will
# look for its environment variables.
# https://docs.ansible.com/projects/ansible/latest/vault_guide/vault_using_encrypted_content.html
#*******************************************************************************
# Help text for the command line parser. The main line splits this string at
# the first blank line ("\n\n"): the part before it is handed to argparse as the
# `description` and the remainder as the `epilog`.
ABOUT = '''\
Filter for encrypting/decrypting YAML file values with ansible-vault.
Takes input from stdin, encrypts unencrypted values, decrypts encrypted values,
and writes the YAML result to stdout with comments in the input file preserved
in the output. This simplifies maintenance of encrypted values in Ansible vars
files and eliminates the need to encrypt entire files.
Notes:
- `ansible-vault` will prompt for the encrypt/decrypt password if neither
'--vault-id' nor '--vault-password-file' are provided and the password can't
be determined from the config file or vault environment variables.
- Non-string values will always be decrypted as strings since Ansible does not
reparse the input following decryption.
- Some string values in the unencrypted input will not be restored accurately
when the output is passed through this filter for decryption.
- Additional spaces between names and values, such as "name: value", will
be reduced to a single space.
- Quoted strings will not be restored with quotes.
- Embedded control characters in double-quoted strings will not be preserved.
- Folded block scalar strings (>) will be restored without newlines.
'''
import argparse, io, re, pathlib, subprocess, sys, textwrap
import ruamel.yaml
from inspect import getmembers
#*******************************************************************************
# ansible_vault
#
#   data       = Object containing scalar value to encrypt/decrypt.
#   command    = "encrypt" or "decrypt"
#   extra_args = Additional `ansible-vault` command arguments (optional).
#
# Runs `ansible-vault <command>`, feeding `str(data)` to it on stdin as UTF-8,
# and returns the command's stdout decoded as UTF-8 text. Every `--vault-id`
# given on the command line and the `--vault-password-file`, if any, are passed
# through ahead of `extra_args`.
# Prints an error message to stderr and exits with status 1 if the command
# cannot be started or returns a non-zero exit status.
#*******************************************************************************
def ansible_vault(data, command, extra_args=None):
    pd("{}: >> ({})".format(command, data))
    cmd_args = ["ansible-vault", command]
    for vault_id in args.vault_id or []:
        cmd_args += ['--vault-id', vault_id]
    if args.vault_password_file:
        # The option is parsed into a pathlib.Path; it must be converted to a
        # string or the " ".join() calls below raise TypeError.
        cmd_args += ['--vault-password-file', str(args.vault_password_file)]
    cmd_args += [str(arg) for arg in (extra_args or [])]
    pd("cmd: {}".format(" ".join(cmd_args)))
    try:
        rv = subprocess.run(cmd_args, input=str(data).encode("utf-8"),
                            capture_output=True, check=True)
    except subprocess.CalledProcessError as cpe:
        eprint("[RUN]: {}\n{}".format(" ".join(cpe.cmd), cpe.stderr.decode()))
        sys.exit(1)
    except OSError as ose:
        # E.g. `ansible-vault` is not installed or is not executable.
        eprint("[RUN]: {}\n{}".format(" ".join(cmd_args), ose))
        sys.exit(1)
    text = rv.stdout.decode("utf-8")
    pd("{}: << ({})".format(command, text))
    return text
#*******************************************************************************
# ansible_vault_decrypt_string(data, name)
#
#   data = Vault-tagged scalar to decrypt.
#   name = Dotted path of the value in the document (not used here).
#
# Returns the vault-decrypted `data` value. When the decrypted text spans more
# than one line, or ends with a newline, it is wrapped in a literal block scalar
# (|) and loaded back through YAML so the result is a block scalar object. The
# chomping indicator is picked from the trailing newlines:
#   two or more -> "|+", exactly one -> "|", none -> "|-".
#*******************************************************************************
def ansible_vault_decrypt_string(data, name):
    plain = ansible_vault(data, 'decrypt')
    pieces = plain.splitlines(True)
    # Single-line text without a trailing newline is returned untouched.
    if len(pieces) <= 1 and not plain.endswith("\n"):
        return plain
    if plain.endswith("\n\n"):
        header = "|+"
    elif plain.endswith("\n"):
        header = "|"
    else:
        header = "|-"
    # Indent every line by one space so it parses as the block scalar's body.
    document = header + "\n" + "".join(" " + piece for piece in pieces)
    return YAML().load(document)
#*******************************************************************************
# ansible_vault_encrypt_string(data, name)
#
#   data = Scalar value to encrypt.
#   name = Dotted path of the value in the document, used in warnings.
#
# Returns a `TaggedScalar` object, tagged "!vault" and styled as a literal block
# scalar (|), containing the vault-encrypted `data` value. Warns about values
# that will not come back unchanged from a later decryption: non-strings, quoted
# strings, and folded (>) block scalars.
#*******************************************************************************
def ansible_vault_encrypt_string(data, name):
    scalarstring = ruamel.yaml.scalarstring
    if not isinstance(data, str):
        warn(f"{name}: Non-string value will decrypt as a string.")
    is_quoted = (isinstance(data, scalarstring.ScalarString)
                 and data.style in ('"', "'"))
    if is_quoted:
        warn(f"{name}: Quoted string will not be restored with quotes.")
    if isinstance(data, scalarstring.FoldedScalarString):
        warn(f"{name}: Folded (>) block scalar string will not be accurately restored.")
    # Only name the encrypting vault id when one was given on the command line.
    vault_args = []
    if args.encrypt_vault_id:
        vault_args = ['--encrypt-vault-id', args.encrypt_vault_id]
    cipher = ansible_vault(data, 'encrypt', vault_args)
    return ruamel.yaml.comments.TaggedScalar(cipher, tag="!vault", style="|")
#*******************************************************************************
# attrs(obj)
#
# Returns a multi-line string for debug printing: the type of `obj` on the first
# line followed by one "name: value" line per attribute. Callables (functions)
# and attributes with names like "__name__" are left out.
#*******************************************************************************
def attrs(obj):
    dunder = re.compile('^__.+__$')
    described = []
    for name, value in getmembers(obj):
        if dunder.match(name) or callable(value):
            continue
        described.append("{}: {}".format(name, value))
    return "\n".join(["{}".format(type(obj)), *described])
#*******************************************************************************
# eprint()
#
# Works like print() but writes to stderr. Positional and keyword arguments are
# forwarded to print() unchanged.
#*******************************************************************************
def eprint(*values, **options):
    stream = sys.stderr
    print(*values, file=stream, **options)
#*******************************************************************************
# pd(msg, delta)
#
#   msg   = Debug message; each of its lines is printed separately.
#   delta = Positive to indent one more level before printing, negative to
#           outdent one level after printing, zero to leave the indent alone.
#
# Prints a debug message to stderr, prefixed with the current indent, which is
# kept in `pd.prefix`. The message is printed only while `args.verbose` is
# greater than the indent width, so deeper levels need more -v flags.
#*******************************************************************************
def pd(msg, delta=0):
    if delta > 0:
        pd.prefix = " " + pd.prefix
    indent = pd.prefix
    if len(indent) < args.verbose:
        for text in msg.splitlines():
            eprint(indent + text)
    if delta < 0 and indent:
        pd.prefix = indent[1:]
pd.prefix = ""
#*******************************************************************************
# walk_tree(data, name)
#
#   data = Mapping, sequence, or scalar loaded from the YAML input.
#   name = Path of `data` within the document ("a.b[0]"), used in warnings;
#          None for the document root.
#
# Iterates down through `data` and encrypts/decrypts scalar values. Mappings and
# sequences are updated in place; a scalar tagged "!vault" is decrypted and any
# other scalar is encrypted. Returns the (possibly replaced) `data`.
#*******************************************************************************
def walk_tree(data, name=None):
    pd(">>>walk_tree:({})\n::{}".format(type(data), data), 1)
    if isinstance(data, dict):
        for key in data:
            pd("walk_tree: key={}, type={}, value={}".format(key, type(data[key]), data[key]))
            # Compare against None so a falsy parent name (e.g. key 0) is kept.
            child = key if name is None else f"{name}.{key}"
            data[key] = walk_tree(data[key], child)
    elif isinstance(data, list):
        # Items of a root-level sequence are named "[0]" rather than "None[0]".
        parent = "" if name is None else name
        for idx, item in enumerate(data):
            pd("walk_tree: idx={}, type={}, value={}".format(idx, type(item), item))
            data[idx] = walk_tree(item, f"{parent}[{idx}]")
    else:
        pd("\n=== before ===\n" + attrs(data) + "\n--------------\n")
        if (isinstance(data, ruamel.yaml.comments.TaggedScalar) and
                getattr(data, "_yaml_tag").value == "!vault"):
            data = ansible_vault_decrypt_string(data, name)
        else:
            data = ansible_vault_encrypt_string(data, name)
        pd("\n=== after ===\n" + attrs(data) + "\n--------------\n")
    pd("<<<walk_tree", -1)
    return data
#*******************************************************************************
# warn(msg)
#
# Prints `msg` to stderr, prefixed with "[WARNING]: ", unless the --quiet option
# was given.
#*******************************************************************************
def warn(msg):
    if args.quiet:
        return
    eprint(f"[WARNING]: {msg}")
#*******************************************************************************
# YAML(typ, width)
#
#   typ   = Passed to `ruamel.yaml.YAML` as its `typ` argument.
#   width = Assigned to the `width` attribute of the returned object.
#
# Returns a preconfigured `ruamel.yaml` object: no explicit document start,
# block (non-flow) style, and quotes preserved.
#*******************************************************************************
def YAML(typ=None, width=None):
    settings = {
        "explicit_start": False,
        "default_flow_style": False,
        "preserve_quotes": True,
        "width": width,
    }
    configured = ruamel.yaml.YAML(typ=typ)
    for option, value in settings.items():
        setattr(configured, option, value)
    return configured
#*******************************************************************************
# Main Line
#
# Checks the interpreter version, parses the command line into the global
# `args`, reads the YAML document from stdin, and writes it to stdout with every
# scalar value encrypted or decrypted.
#*******************************************************************************
# This file uses assignment expressions (:=, Python 3.8) and
# subprocess.run(capture_output=...) (Python 3.7), so 3.8 is the real minimum.
# Older interpreters stop with a SyntaxError while compiling this file, before
# this check can run.
if sys.version_info[:2] < (3, 8):
    raise SystemExit(
        "ERROR: {} requires Python version 3.8 or later. Current version: {}".format(
            sys.argv[0], ".".join(str(part) for part in sys.version_info[:3])
        )
    )
#*-------------------------*
#* Parse the command line. *
#*-------------------------*
# Text before the first blank line of ABOUT is the description, the rest is the
# epilog. partition() leaves the epilog empty instead of failing when ABOUT has
# no blank line.
description, _, epilog = ABOUT.partition("\n\n")
parser = argparse.ArgumentParser(
    formatter_class = argparse.RawDescriptionHelpFormatter,
    description     = description,
    epilog          = epilog
)
parser.add_argument("-w", "--width", type=int, default=80,
    help="Maximum output line width, default=%(default)i.")
parser.add_argument("-q", "--quiet", action='store_true',
    help="Do not print warning messages.")
parser.add_argument("-v", "--verbose", default=0, action="count",
    help="Print more debug messages. Adding multiple -v increases the verbosity.")
parser.add_argument("--vault-id", action='append', default=[],
    help="The vault identity to use. This argument may be specified multiple times.")
parser.add_argument("--encrypt-vault-id",
    help="The vault id used to encrypt (required if more than one vault-id is provided).")
# The value is converted to a pathlib.Path; parser.error() exits if the file
# does not exist.
parser.add_argument("--vault-password-file",
    type=lambda a: f if (f := pathlib.Path(a)).is_file() else parser.error("The file %s does not exist!" % a),
    help="Vault password input file.")
args = parser.parse_args()
pd("parameters: {}".format(args))
#*--------------------------*
#* Read in YAML from stdin. *
#*--------------------------*
lines = sys.stdin.read()
# Uncomment to use the debugger.
# https://stackoverflow.com/a/34687825/2245849
#
# sys.stdin = open("/dev/tty")
# breakpoint()
#*---------------------------------------*
#* Encrypt/decrypt YAML input to stdout. *
#*---------------------------------------*
yaml = YAML(width=args.width)
data = yaml.load(lines)
yaml.indent(mapping=2, sequence=4, offset=2)
yaml.dump(walk_tree(data), sys.stdout)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment