Created
December 23, 2025 09:20
-
-
Save benoit74/906b8c06163c29016fd28c808359c89c to your computer and use it in GitHub Desktop.
Import new identities with password to Ory
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| Script to import users into Ory from a CSV file with password hash transformation. | |
| """ | |
| import csv | |
| import sys | |
| import requests | |
| from typing import Tuple | |
| from pathlib import Path | |
| def transform_pbkdf2_hash(hash_str: str) -> str: | |
| """ | |
| Transform pbkdf2 hash from CSV format to Ory format. | |
| CSV: pbkdf2:<algorithm>:<iteration>$<salt>$<hash> | |
| Ory: $pbkdf2-<algorithm>$i=<iteration>,l=<length>$<salt>$<hash> | |
| """ | |
| # pbkdf2:sha256:310000$abcd1234$hashhash | |
| parts = hash_str.split('$') | |
| if len(parts) != 3: | |
| raise ValueError(f"Invalid pbkdf2 format: {hash_str}") | |
| header = parts[0] # pbkdf2:sha256:310000 | |
| salt = parts[1] | |
| hash_value = parts[2] | |
| # Parse header | |
| header_parts = header.split(':') | |
| if len(header_parts) != 3 or header_parts[0] != 'pbkdf2': | |
| raise ValueError(f"Invalid pbkdf2 header: {header}") | |
| algorithm = header_parts[1] | |
| iteration = header_parts[2] | |
| length = 32 # Fixed length | |
| return f"$pbkdf2-{algorithm}$i={iteration},l={length}${salt}${hash_value}" | |
| def transform_scrypt_hash(hash_str: str) -> str: | |
| """ | |
| Transform scrypt hash from CSV format to Ory format. | |
| CSV: scrypt:<cost>:<block>:<parallelization>$<salt>$<hash> | |
| Ory: $scrypt$ln=<cost>,r=<block>,p=<parallelization>$<salt>$<hash> | |
| """ | |
| # scrypt:32768:8:1$salt$hash | |
| parts = hash_str.split('$') | |
| if len(parts) != 3: | |
| raise ValueError(f"Invalid scrypt format: {hash_str}") | |
| header = parts[0] # scrypt:32768:8:1 | |
| salt = parts[1] | |
| hash_value = parts[2] | |
| # Parse header | |
| header_parts = header.split(':') | |
| if len(header_parts) != 4 or header_parts[0] != 'scrypt': | |
| raise ValueError(f"Invalid scrypt header: {header}") | |
| cost = header_parts[1] | |
| block = header_parts[2] | |
| parallelization = header_parts[3] | |
| return f"$scrypt$ln={cost},r={block},p={parallelization}${salt}${hash_value}" | |
| def transform_password_hash(password_hash: str) -> str: | |
| """ | |
| Transform password hash based on its type (pbkdf2 or scrypt). | |
| """ | |
| if password_hash.startswith("pbkdf2"): | |
| return transform_pbkdf2_hash(password_hash) | |
| elif password_hash.startswith("scrypt"): | |
| return transform_scrypt_hash(password_hash) | |
| else: | |
| raise ValueError(f"Unsupported password hash type: {password_hash}") | |
| def identity_exists( | |
| email: str, | |
| project_slug: str, | |
| api_token: str | |
| ) -> Tuple[bool, bool]: | |
| """ | |
| Check if an identity with the given email already exists. | |
| Returns (exists, rate_limited) tuple. | |
| """ | |
| url = f"https://{project_slug}.projects.oryapis.com/admin/identities" | |
| headers = { | |
| "Authorization": f"Bearer {api_token}", | |
| "Content-Type": "application/json" | |
| } | |
| params = { | |
| "credentials_identifier": email | |
| } | |
| try: | |
| response = requests.get(url, headers=headers, params=params, timeout=10) | |
| if response.status_code == 429: | |
| return False, True # Rate limited | |
| if response.status_code == 200: | |
| # Check if any identity was returned | |
| data = response.json() | |
| # API returns a list directly | |
| return len(data) > 0, False | |
| return False, False # Identity doesn't exist | |
| except requests.exceptions.RequestException: | |
| # On network errors, assume identity doesn't exist | |
| return False, False | |
| def create_identity( | |
| email: str, | |
| name: str, | |
| hashed_password: str, | |
| project_slug: str, | |
| api_token: str, | |
| schema_id: str | |
| ) -> Tuple[bool, str, bool]: | |
| """ | |
| Create an identity in Ory with the given email, name and hashed password. | |
| Returns (success, message, rate_limited) tuple. | |
| """ | |
| url = f"https://{project_slug}.projects.oryapis.com/admin/identities" | |
| headers = { | |
| "Authorization": f"Bearer {api_token}", | |
| "Content-Type": "application/json" | |
| } | |
| payload = { | |
| "schema_id": schema_id, | |
| "traits": { | |
| "email": email, | |
| "name": name | |
| }, | |
| "credentials": { | |
| "password": { | |
| "config": { | |
| "hashed_password": hashed_password | |
| } | |
| } | |
| } | |
| } | |
| try: | |
| response = requests.post(url, json=payload, headers=headers, timeout=10) | |
| if response.status_code == 429: | |
| return False, f"Rate limited (429) while creating identity for {email}", True | |
| response.raise_for_status() | |
| return True, f"Successfully created identity for {email}", False | |
| except requests.exceptions.RequestException as e: | |
| return False, f"Failed to create identity for {email}: {str(e)}", False | |
| def main(): | |
| if len(sys.argv) < 5: | |
| print("Usage: python ory_import.py <csv_file> <project_slug> <api_token> <schema_id>") | |
| print() | |
| print("CSV format (3 columns, no header):") | |
| print(" name,email,password_hash") | |
| print() | |
| print("Example:") | |
| print(" python ory_import.py users.csv my-project ory_pat_xRKLsFEOUFQFVBjd6o3FQDifaLYhabGd preset://email") | |
| sys.exit(1) | |
| csv_file = sys.argv[1] | |
| project_slug = sys.argv[2] | |
| api_token = sys.argv[3] | |
| schema_id = sys.argv[4] | |
| if not Path(csv_file).exists(): | |
| print(f"Error: CSV file not found: {csv_file}") | |
| sys.exit(1) | |
| total = 0 | |
| success = 0 | |
| failed = 0 | |
| try: | |
| with open(csv_file, 'r') as f: | |
| reader = csv.reader(f) | |
| for row in reader: | |
| if len(row) != 3: | |
| print(f"Skipping invalid row (expected 3 columns): {row}") | |
| continue | |
| name, email, password_hash = row | |
| name = name.strip() | |
| email = email.strip() | |
| password_hash = password_hash.strip() | |
| if not name or not email or not password_hash: | |
| print(f"Skipping row with empty name, email or password_hash") | |
| continue | |
| total += 1 | |
| try: | |
| # Check if identity already exists | |
| exists, rate_limited = identity_exists(email, project_slug, api_token) | |
| if rate_limited: | |
| print(f"✗ Rate limited (429) while checking if identity exists for {email}") | |
| print(f"Aborting due to rate limiting. Processed {success} identities successfully.") | |
| sys.exit(1) | |
| if exists: | |
| print(f"⊘ Identity already exists for {email}") | |
| continue | |
| # Transform password hash | |
| transformed_hash = transform_password_hash(password_hash) | |
| # Create identity via API | |
| is_success, message, rate_limited = create_identity( | |
| email, | |
| name, | |
| transformed_hash, | |
| project_slug, | |
| api_token, | |
| schema_id | |
| ) | |
| if rate_limited: | |
| print(f"✗ {message}") | |
| print(f"Aborting due to rate limiting. Processed {success} identities successfully.") | |
| sys.exit(1) | |
| if is_success: | |
| success += 1 | |
| print(f"✓ {message}") | |
| else: | |
| failed += 1 | |
| print(f"✗ {message}") | |
| except ValueError as e: | |
| failed += 1 | |
| print(f"✗ Error processing {email}: {str(e)}") | |
| print() | |
| print(f"Summary: {total} total, {success} successful, {failed} failed") | |
| if failed > 0: | |
| sys.exit(1) | |
| except Exception as e: | |
| print(f"Fatal error: {str(e)}") | |
| sys.exit(1) | |
| if __name__ == "__main__": | |
| main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment