Skip to content

Instantly share code, notes, and snippets.

@benoit74
Created December 23, 2025 09:20
Show Gist options
  • Select an option

  • Save benoit74/906b8c06163c29016fd28c808359c89c to your computer and use it in GitHub Desktop.

Select an option

Save benoit74/906b8c06163c29016fd28c808359c89c to your computer and use it in GitHub Desktop.
Import new identities with password to Ory
#!/usr/bin/env python3
"""
Script to import users into Ory from a CSV file with password hash transformation.
"""
import csv
import hashlib
import sys
from pathlib import Path
from typing import Tuple

import requests
def transform_pbkdf2_hash(hash_str: str) -> str:
    """
    Transform a pbkdf2 hash from the CSV format to the Ory format.

    CSV: pbkdf2:<algorithm>:<iteration>$<salt>$<hash>
    Ory: $pbkdf2-<algorithm>$i=<iteration>,l=<length>$<salt>$<hash>

    <length> is the digest size in bytes of <algorithm> as reported by
    hashlib (32 for sha256, 64 for sha512, ...). When hashlib does not know
    the algorithm, the previous fixed value of 32 is used.

    The salt and hash are copied through unchanged.
    NOTE(review): confirm that the CSV values already use the encoding Ory
    expects for these two fields.

    Raises:
        ValueError: if the string does not have the expected shape, the
            algorithm is empty, the iteration count is not a positive
            integer, or the salt or hash is empty.
    """
    # Example input: pbkdf2:sha256:310000$abcd1234$hashhash
    parts = hash_str.split('$')
    if len(parts) != 3:
        # Do not echo hash_str here: it carries the salt and the hash, and
        # the caller prints this message to the console.
        raise ValueError(
            "Invalid pbkdf2 format: expected "
            "pbkdf2:<algorithm>:<iteration>$<salt>$<hash>"
        )
    header, salt, hash_value = parts  # header looks like pbkdf2:sha256:310000

    # Parse header
    header_parts = header.split(':')
    if len(header_parts) != 3 or header_parts[0] != 'pbkdf2':
        raise ValueError(f"Invalid pbkdf2 header: {header}")
    _, algorithm, iteration = header_parts

    if not algorithm:
        raise ValueError(f"Invalid pbkdf2 header: {header}")
    # isascii() keeps out non-ASCII digits that str.isdigit() would accept.
    if not (iteration.isascii() and iteration.isdigit() and int(iteration) > 0):
        raise ValueError(f"Invalid pbkdf2 iteration count: {iteration!r}")
    if not salt or not hash_value:
        raise ValueError("Invalid pbkdf2 format: salt and hash must not be empty")

    # The key length must match the algorithm rather than always being 32.
    try:
        length = hashlib.new(algorithm).digest_size
    except ValueError:
        length = 0
    if length <= 0:
        # Unknown or variable-length digest: keep the historical default.
        length = 32

    return f"$pbkdf2-{algorithm}$i={iteration},l={length}${salt}${hash_value}"
def transform_scrypt_hash(hash_str: str) -> str:
    """
    Transform a scrypt hash from the CSV format to the Ory format.

    CSV: scrypt:<cost>:<block>:<parallelization>$<salt>$<hash>
    Ory: $scrypt$ln=<cost>,r=<block>,p=<parallelization>$<salt>$<hash>

    The three parameters, the salt and the hash are copied through unchanged.
    NOTE(review): confirm that the CSV values already use the encoding Ory
    expects for the salt and hash fields.

    Raises:
        ValueError: if the string does not have the expected shape, any of
            the three parameters is not a positive integer, or the salt or
            hash is empty.
    """
    # Example input: scrypt:32768:8:1$salt$hash
    parts = hash_str.split('$')
    if len(parts) != 3:
        # Do not echo hash_str here: it carries the salt and the hash, and
        # the caller prints this message to the console.
        raise ValueError(
            "Invalid scrypt format: expected "
            "scrypt:<cost>:<block>:<parallelization>$<salt>$<hash>"
        )
    header, salt, hash_value = parts  # header looks like scrypt:32768:8:1

    # Parse header
    header_parts = header.split(':')
    if len(header_parts) != 4 or header_parts[0] != 'scrypt':
        raise ValueError(f"Invalid scrypt header: {header}")
    _, cost, block, parallelization = header_parts

    named_params = (
        ("cost", cost),
        ("block", block),
        ("parallelization", parallelization),
    )
    for label, value in named_params:
        # isascii() keeps out non-ASCII digits that str.isdigit() would accept.
        if not (value.isascii() and value.isdigit() and int(value) > 0):
            raise ValueError(f"Invalid scrypt {label}: {value!r}")
    if not salt or not hash_value:
        raise ValueError("Invalid scrypt format: salt and hash must not be empty")

    return f"$scrypt$ln={cost},r={block},p={parallelization}${salt}${hash_value}"
def transform_password_hash(password_hash: str) -> str:
    """
    Transform a password hash based on its type (pbkdf2 or scrypt).

    Dispatches on the prefix of password_hash to transform_pbkdf2_hash or
    transform_scrypt_hash and returns their result.

    Raises:
        ValueError: if the hash starts with neither "pbkdf2" nor "scrypt"
            (or if the selected transformer rejects it). The message names
            only the scheme prefix, never the hash itself, because the
            caller prints it to the console.
    """
    if password_hash.startswith("pbkdf2"):
        return transform_pbkdf2_hash(password_hash)
    if password_hash.startswith("scrypt"):
        return transform_scrypt_hash(password_hash)

    # Only report a short alphanumeric prefix that is followed by a ':' or
    # '$' delimiter; anything else could be raw hash material.
    prefix = password_hash.split("$", 1)[0].split(":", 1)[0]
    looks_like_scheme = (
        prefix != password_hash and prefix.isalnum() and len(prefix) <= 16
    )
    scheme = prefix if looks_like_scheme else "<unknown>"
    raise ValueError(f"Unsupported password hash type: {scheme}")
def identity_exists(
    email: str,
    project_slug: str,
    api_token: str
) -> Tuple[bool, bool]:
    """
    Ask the Ory admin API whether an identity is registered for this email.

    Sends a GET to the project's /admin/identities endpoint, filtering on
    credentials_identifier, and returns an (exists, rate_limited) tuple:

    * HTTP 429 -> (False, True)
    * HTTP 200 -> (True, False) when the JSON body is non-empty,
      otherwise (False, False)
    * any other status, or a requests exception -> (False, False),
      i.e. the identity is assumed not to exist
    """
    endpoint = f"https://{project_slug}.projects.oryapis.com/admin/identities"
    request_headers = {
        "Authorization": f"Bearer {api_token}",
        "Content-Type": "application/json"
    }
    query = {"credentials_identifier": email}

    try:
        reply = requests.get(
            endpoint,
            headers=request_headers,
            params=query,
            timeout=10,
        )
        status = reply.status_code
        if status == 429:
            # Rate limited: the caller decides whether to abort.
            return False, True
        if status != 200:
            # Anything else is treated as "identity doesn't exist".
            return False, False
        # The API returns the matching identities as a list directly.
        identities = reply.json()
        return len(identities) > 0, False
    except requests.exceptions.RequestException:
        # Best effort: on network errors, assume the identity doesn't exist.
        return False, False
def create_identity(
    email: str,
    name: str,
    hashed_password: str,
    project_slug: str,
    api_token: str,
    schema_id: str
) -> Tuple[bool, str, bool]:
    """
    Create one Ory identity carrying an already-hashed password.

    POSTs to the project's /admin/identities endpoint with the email and
    name as traits and hashed_password as the password credential config.

    Returns a (success, message, rate_limited) tuple:

    * HTTP 429 -> (False, <rate-limit message>, True)
    * any other error status or requests exception ->
      (False, <failure message including the exception text>, False)
    * otherwise -> (True, <success message>, False)
    """
    endpoint = f"https://{project_slug}.projects.oryapis.com/admin/identities"
    traits = {"email": email, "name": name}
    password_credential = {"config": {"hashed_password": hashed_password}}
    body = {
        "schema_id": schema_id,
        "traits": traits,
        "credentials": {"password": password_credential},
    }
    request_headers = {
        "Authorization": f"Bearer {api_token}",
        "Content-Type": "application/json"
    }

    try:
        reply = requests.post(endpoint, json=body, headers=request_headers, timeout=10)
        if reply.status_code == 429:
            return False, f"Rate limited (429) while creating identity for {email}", True
        # Turn every remaining 4xx/5xx status into an exception handled below.
        reply.raise_for_status()
    except requests.exceptions.RequestException as e:
        return False, f"Failed to create identity for {email}: {str(e)}", False

    return True, f"Successfully created identity for {email}", False
def main():
    """
    Command-line entry point: import the users of a CSV file into Ory.

    Expects four arguments: <csv_file> <project_slug> <api_token> <schema_id>.
    The CSV has three columns and no header: name,email,password_hash.
    It is read as UTF-8 (a leading byte-order mark is tolerated).

    For each valid row the function checks whether the identity already
    exists, transforms the password hash and creates the identity. Rows
    with a wrong column count or an empty field are skipped and not counted.

    Exits with status 1 when arguments are missing, the file is not found,
    a request is rate limited (429), at least one row failed, or an
    unexpected exception occurs. Otherwise returns normally after printing
    a summary.
    """
    if len(sys.argv) < 5:
        print("Usage: python ory_import.py <csv_file> <project_slug> <api_token> <schema_id>")
        print()
        print("CSV format (3 columns, no header):")
        print(" name,email,password_hash")
        print()
        print("Example:")
        print(" python ory_import.py users.csv my-project ory_pat_xRKLsFEOUFQFVBjd6o3FQDifaLYhabGd preset://email")
        sys.exit(1)

    csv_file = sys.argv[1]
    project_slug = sys.argv[2]
    api_token = sys.argv[3]
    schema_id = sys.argv[4]

    if not Path(csv_file).exists():
        print(f"Error: CSV file not found: {csv_file}")
        sys.exit(1)

    total = 0
    success = 0
    failed = 0

    try:
        # newline='' lets the csv module handle line endings itself (needed
        # for quoted fields containing newlines). utf-8-sig makes decoding
        # independent of the locale and drops a BOM that would otherwise
        # end up in the first name.
        with open(csv_file, 'r', newline='', encoding='utf-8-sig') as f:
            reader = csv.reader(f)
            for row in reader:
                if len(row) != 3:
                    print(f"Skipping invalid row (expected 3 columns): {row}")
                    continue

                name, email, password_hash = row
                name = name.strip()
                email = email.strip()
                password_hash = password_hash.strip()

                if not name or not email or not password_hash:
                    print("Skipping row with empty name, email or password_hash")
                    continue

                total += 1
                try:
                    # Check if identity already exists
                    exists, rate_limited = identity_exists(email, project_slug, api_token)
                    if rate_limited:
                        print(f"✗ Rate limited (429) while checking if identity exists for {email}")
                        print(f"Aborting due to rate limiting. Processed {success} identities successfully.")
                        sys.exit(1)
                    if exists:
                        # Counted in total, but neither a success nor a failure.
                        print(f"⊘ Identity already exists for {email}")
                        continue

                    # Transform password hash
                    transformed_hash = transform_password_hash(password_hash)

                    # Create identity via API
                    is_success, message, rate_limited = create_identity(
                        email,
                        name,
                        transformed_hash,
                        project_slug,
                        api_token,
                        schema_id
                    )
                    if rate_limited:
                        print(f"✗ {message}")
                        print(f"Aborting due to rate limiting. Processed {success} identities successfully.")
                        sys.exit(1)

                    if is_success:
                        success += 1
                        print(f"✓ {message}")
                    else:
                        failed += 1
                        print(f"✗ {message}")
                except ValueError as e:
                    # Raised by the hash transformers for malformed hashes.
                    failed += 1
                    print(f"✗ Error processing {email}: {str(e)}")

        print()
        print(f"Summary: {total} total, {success} successful, {failed} failed")
        if failed > 0:
            sys.exit(1)
    except Exception as e:
        # sys.exit() raises SystemExit, which is not an Exception subclass,
        # so the exits above pass through this handler untouched.
        print(f"Fatal error: {str(e)}")
        sys.exit(1)
# Run the import only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment