Skip to content

Instantly share code, notes, and snippets.

@warhammerkid
Created December 27, 2025 15:49
Show Gist options
  • Select an option

  • Save warhammerkid/9618ae50d550a2e9457227a90d75c083 to your computer and use it in GitHub Desktop.

Select an option

Save warhammerkid/9618ae50d550a2e9457227a90d75c083 to your computer and use it in GitHub Desktop.
from dataclasses import dataclass
from enum import IntEnum, unique
import hashlib
import os
import struct
import sys
from typing import Optional
if sys.version_info < (3, 11):
from typing_extensions import Self
else:
from typing import Self
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives.asymmetric.utils import decode_dss_signature, encode_dss_signature
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives.padding import PKCS7
def aes_decrypt(data: bytes, aes_key: bytes, iv: Optional[bytes] = None) -> bytes:
"""Decrypt data using non-standard AES-CBC.
Format: [length:2][iv_seed:4?][encrypted_data:N]
Args:
data: Encrypted message
aes_key: 16-byte AES key
iv: Optional 16-byte IV. If None, IV seed will be extracted from data
Returns:
Decrypted plaintext (without padding)
"""
view = memoryview(data)
data_len = struct.unpack('!H', view[:2])[0]
# If no IV is given, derive it from the 4 bytes after the length header
if iv is None:
iv_seed = view[2:6]
iv = hashlib.md5(iv_seed).digest()
encrypted = view[6:]
else:
encrypted = view[2:]
# Decrypt the data
cipher = Cipher(algorithms.AES(aes_key), modes.CBC(iv))
decryptor = cipher.decryptor()
decrypted = decryptor.update(encrypted) + decryptor.finalize()
# Use the length prefix to strip off the correct amount of padding
return decrypted[:data_len]
def aes_encrypt(data: bytes, aes_key: bytes, iv: Optional[bytes] = None) -> bytes:
"""Encrypt data using non-standard AES-CBC.
Format: [length:2][iv_seed:4?][encrypted_data:N]
Args:
data: Plaintext data to encrypt
aes_key: 16-byte AES key
iv: Optional 16-byte IV. If None, random IV seed will be generated
Returns:
Encrypted message
"""
message_header = struct.pack('!H', len(data))
# If no IV is given, generate a random seed, hash that for the iv, and add
# the seed to the header
if iv is None:
iv_seed = os.urandom(4)
iv = hashlib.md5(iv_seed).digest()
message_header += iv_seed
# Encrypt the data
aes = algorithms.AES(aes_key)
padder = PKCS7(aes.block_size).padder()
padded = padder.update(data) + padder.finalize()
cipher = Cipher(aes, modes.CBC(iv))
encryptor = cipher.encryptor()
encrypted = encryptor.update(padded) + encryptor.finalize()
return message_header + encrypted
@dataclass
class KeyBundle:
"""Bundle of cryptographic keys needed for encrypted communication.
Attributes:
signing_key: EC private key for signing our own public key during handshake
verify_key: EC public key for verifying peer's signature
shared_secret: 16-byte static shared secret (same for client and server)
"""
signing_key: ec.EllipticCurvePrivateKey
verify_key: ec.EllipticCurvePublicKey
shared_secret: bytes
def __post_init__(self):
if len(self.shared_secret) != 16:
raise ValueError('shared_secret must be exactly 16 bytes')
@unique
class HandshakeState(IntEnum):
CHALLENGE = 1 # server message
CHALLENGE_RESPONSE = 2 # client message
CHALLENGE_ACCEPTED = 3 # server message
SERVER_PUBLIC_KEY = 4 # server message
CLIENT_PUBLIC_KEY = 5 # client message
ECDH_ACCEPTED = 6 # server message
class HandshakeMessage:
PREFIX = b'**'
def __init__(self, state: HandshakeState, body: bytes) -> None:
self.state = state
self.body = body
def bytearray(self) -> bytearray:
msg = bytearray(len(self.body) + 6)
msg[0:2] = self.PREFIX
struct.pack_into('BB', msg, 2, self.state, len(self.body))
msg[4:-2] = self.body
struct.pack_into('!H', msg, -2, sum(msg[2:]))
return msg
@classmethod
def parse(cls, data: bytes) -> Self:
view = memoryview(data)
if view[0:2] != cls.PREFIX:
raise ValueError(f'Prefix is not correct: {view[0:2]}')
checksum = struct.pack('!H', sum(view[2:-2]))
if view[-2:] != checksum:
raise ValueError(f'Checksum is not correct: {checksum!r} vs {view[-2:]}')
state, length = struct.unpack('BB', view[2:4])
body_length = len(view) - 6
if body_length != length:
raise ValueError(f'Body length should be {length} but is {body_length}')
return cls(HandshakeState(state), view[4:-2].tobytes())
class HandshakeProtocol:
def __init__(self, key_bundle: KeyBundle) -> None:
self._key_bundle = key_bundle
self._aes_key: Optional[bytes] = None
self._aes_iv: Optional[bytes] = None
self._ephemeral_private_key: Optional[ec.EllipticCurvePrivateKey] = None
self._ephemeral_public_key: Optional[ec.EllipticCurvePublicKey] = None
self._peer_public_key: Optional[ec.EllipticCurvePublicKey] = None
self._session_aes_key: Optional[bytes] = None
@property
def session_aes_key(self):
"""The AES key to use after the handshake has been completed"""
return self._session_aes_key
def handle(self, data: Optional[bytes]) -> Optional[bytearray]:
"""Decodes client or server messages and generates the correct response.
Attributes:
data: The preceeding message or None for the initial server message
for the current state
"""
if data is None:
# Handle server-initiated rounds - challenge then key exchange
if self._aes_key is None:
response = self._generate_challenge()
else:
response = self._generate_server_public_key()
else:
# Parse message
if self._ephemeral_public_key and self._aes_key and self._aes_iv:
decrypted = aes_decrypt(data, self._aes_key, self._aes_iv)
message = HandshakeMessage.parse(decrypted)
else:
message = HandshakeMessage.parse(data)
# Handle message responses
if message.state == HandshakeState.CHALLENGE:
response = self._handle_challenge(message)
elif message.state == HandshakeState.CHALLENGE_RESPONSE:
response = self._handle_challenge_response(message)
elif message.state == HandshakeState.CHALLENGE_ACCEPTED:
self._handle_challenge_accepted(message)
return None
elif message.state == HandshakeState.SERVER_PUBLIC_KEY:
response = self._handle_server_public_key(message)
elif message.state == HandshakeState.CLIENT_PUBLIC_KEY:
response = self._handle_client_public_key(message)
elif message.state == HandshakeState.ECDH_ACCEPTED:
self._handle_ecdh_accepted(message)
return None
# Convert response to a bytearray
if self._ephemeral_public_key and self._aes_key and self._aes_iv:
response_bytes = bytes(response.bytearray())
encrypted_bytes = aes_encrypt(response_bytes, self._aes_key, self._aes_iv)
return bytearray(encrypted_bytes)
elif response:
return response.bytearray()
else:
raise Exception('This should not be possible')
def _generate_challenge(self) -> HandshakeMessage:
challenge = os.urandom(4)
self._set_aes_key(challenge)
return HandshakeMessage(HandshakeState.CHALLENGE, challenge)
def _handle_challenge(self, message: HandshakeMessage) -> HandshakeMessage:
self._set_aes_key(message.body)
assert self._aes_iv is not None
return HandshakeMessage(HandshakeState.CHALLENGE_RESPONSE, self._aes_iv[8:12])
def _handle_challenge_response(self, message: HandshakeMessage) -> HandshakeMessage:
assert self._aes_iv is not None
if message.body == self._aes_iv[8:12]:
return HandshakeMessage(HandshakeState.CHALLENGE_ACCEPTED, b'\x00')
else:
raise Exception('TODO: figure out what this should do')
def _handle_challenge_accepted(self, message: HandshakeMessage) -> None:
if message.body != b'\x00':
raise ValueError(f'Challenge response was not 0: {message.body!r}')
# Generate ephemeral keys so that future messages are assumed to be
# encrypted
self._set_ephemeral_keys()
def _generate_server_public_key(self) -> HandshakeMessage:
self._set_ephemeral_keys()
key_and_signature = self._sign_public_key()
return HandshakeMessage(HandshakeState.SERVER_PUBLIC_KEY, key_and_signature)
def _handle_server_public_key(self, message: HandshakeMessage) -> HandshakeMessage:
# Verify and load the server public key
self._peer_public_key = self._verify_public_key(message.body)
# Build response message
key_and_signature = self._sign_public_key()
return HandshakeMessage(HandshakeState.CLIENT_PUBLIC_KEY, key_and_signature)
def _handle_client_public_key(self, message: HandshakeMessage) -> HandshakeMessage:
# Verify and load the client public key
self._peer_public_key = self._verify_public_key(message.body)
# Set the session aes key
assert self._ephemeral_private_key is not None
self._session_aes_key = self._ephemeral_private_key.exchange(ec.ECDH(), self._peer_public_key)
# Build response message
return HandshakeMessage(HandshakeState.ECDH_ACCEPTED, b'\x00')
def _handle_ecdh_accepted(self, message: HandshakeMessage) -> None:
if message.body != b'\x00':
raise ValueError(f'ECDH accepted response was not 0: {message.body!r}')
# Set the session aes key
assert self._ephemeral_private_key is not None
assert self._peer_public_key is not None
self._session_aes_key = self._ephemeral_private_key.exchange(ec.ECDH(), self._peer_public_key)
def _sign_public_key(self) -> bytes:
assert self._aes_iv is not None
assert self._ephemeral_public_key is not None
# Convert the ephemeral public key to a 64 byte string, removing the
# first byte that says that it's an uncompressed point
public_key_bytes = self._ephemeral_public_key.public_bytes(
serialization.Encoding.X962, serialization.PublicFormat.UncompressedPoint
)[1:]
# Sign the public key with our signing key and convert the DER-encoded
# signature to a raw 64 byte string
signing_data = public_key_bytes + self._aes_iv
der_signature = self._key_bundle.signing_key.sign(signing_data, ec.ECDSA(hashes.SHA256()))
r, s = decode_dss_signature(der_signature)
signature = r.to_bytes(32, 'big') + s.to_bytes(32, 'big')
return public_key_bytes + signature
def _verify_public_key(self, key_and_signature: bytes) -> ec.EllipticCurvePublicKey:
assert self._aes_iv is not None
# Split the message into the server public key and the signature
if len(key_and_signature) != 128:
raise ValueError(f'Signed key length should be 128 but is {len(key_and_signature)}')
data, signature = key_and_signature[:64], key_and_signature[64:]
# Check the signature - it needs to be DER-encoded first
signing_data = data + self._aes_iv
r, s = int.from_bytes(signature[:32], 'big'), int.from_bytes(signature[32:], 'big')
der_signature = encode_dss_signature(r, s)
self._key_bundle.verify_key.verify(der_signature, signing_data, ec.ECDSA(hashes.SHA256()))
# Return public key
return ec.EllipticCurvePublicKey.from_encoded_point(ec.SECP256R1(), b'\x04' + data)
def _set_aes_key(self, challenge: bytes) -> None:
if len(challenge) != 4:
raise ValueError(f'Expected challenge to be 4 bytes but was {len(challenge)}')
# Calculate MD5 of reverse order challenge for IV
self._aes_iv = hashlib.md5(challenge[::-1]).digest()
# XOR the IV with the shared secret to derive the key
self._aes_key = bytes([x ^ y for x, y in zip(self._aes_iv, self._key_bundle.shared_secret)])
def _set_ephemeral_keys(self) -> None:
self._ephemeral_private_key = ec.generate_private_key(ec.SECP256R1())
self._ephemeral_public_key = self._ephemeral_private_key.public_key()
import unittest
from unittest.mock import patch
from cryptography.hazmat.primitives.asymmetric import ec
from bluetti_mqtt.bluetooth.encryption import HandshakeProtocol, KeyBundle
class TestBluetoothEncryption(unittest.TestCase):
CLIENT_KEY = '21435a09bdece87b866b90795363a7fe6d7bc4dca2f527456db4df41a6b7bb0e'
SERVER_KEY = 'e6b006a7910953a1069f6438d327b48b0c01a50b22c95c6a7c6f13582576a932'
SHARED_SECRET = b')yb\xf3\xa6\x10N\x00,\x1f"*(\xd0\xa6\x01'
def setUp(self):
client_key = ec.derive_private_key(int(self.CLIENT_KEY, 16), curve=ec.SECP256R1())
server_key = ec.derive_private_key(int(self.SERVER_KEY, 16), curve=ec.SECP256R1())
self.server = HandshakeProtocol(KeyBundle(server_key, client_key.public_key(), self.SHARED_SECRET))
self.client = HandshakeProtocol(KeyBundle(client_key, server_key.public_key(), self.SHARED_SECRET))
def test_handshake(self):
# Step 1 - Generate challenge
with patch('os.urandom', return_value=b'1234'):
step_1 = self.server.handle(None)
self.assertEqual(step_1, b'**\x01\x041234\x00\xcf')
# Step 2 - Generate challenge response
step_2 = self.client.handle(step_1)
self.assertEqual(step_2, b'**\x02\x04N\xe2\xfc\xa7\x02\xd9')
# Step 3 - Validate challenge response
step_3 = self.server.handle(step_2)
self.assertEqual(step_3, b'**\x03\x01\x00\x00\x04')
# Step 4 - Tell client their challenge response is accepted
self.client.handle(step_3)
# Step 5 - Generate message with signed server public key
step_5 = self.server.handle(None)
# Step 6 - Send the client the signed server public key
step_6 = self.client.handle(step_5)
# Step 7 - Send the server the signed client public key
step_7 = self.server.handle(step_6)
# Step 8 - Send the client the handshake accepted message
self.client.handle(step_7)
# Verify they are using the same AES key
self.assertIsNotNone(self.server.session_aes_key)
self.assertEqual(self.server.session_aes_key, self.client.session_aes_key)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment