@lewcpe
Created December 30, 2025 12:59
FastCGI Intercepting Proxy
import socket
import struct
import select
import threading
import re
import sys
import gzip
import argparse
"""
FastCGI Intercepting Proxy
This script acts as a middleware between a Web Server (e.g., Nginx, Apache) and a
FastCGI backend (e.g., PHP-FPM). It intercepts the STDOUT stream from the backend,
performs real-time string replacement in the response body, and transparently
forwards the modified data back to the web server.
Key Features:
- FastCGI Protocol Parsing: Correctly handles FastCGI record framing.
- GZIP Support: Automatically detects, decompresses, modifies, and re-compresses
gzipped responses.
- Dynamic Header Correction: Recalculates 'Content-Length' headers after
modifications to prevent truncated responses or protocol errors.
- Threaded Architecture: Handles multiple concurrent requests using Python threads.
- Context Logging: Prints surrounding text of replacements for easy debugging.
Usage:
python3 fcgi_proxy.py -p 9001 -P 9000 -o "old-domain.com" -n "new-domain.com"
Nginx Configuration:
fastcgi_pass 127.0.0.1:9001;
"""
# Default Configuration
DEFAULT_LISTEN_HOST = '0.0.0.0'
DEFAULT_LISTEN_PORT = 9001
DEFAULT_BACKEND_HOST = '127.0.0.1'
DEFAULT_BACKEND_PORT = 9000
DEFAULT_OLD_STRING = 'internal.example.com'
DEFAULT_NEW_STRING = 'www.example.com'
# Global variables (will be updated by args)
LISTEN_HOST = DEFAULT_LISTEN_HOST
LISTEN_PORT = DEFAULT_LISTEN_PORT
BACKEND_HOST = DEFAULT_BACKEND_HOST
BACKEND_PORT = DEFAULT_BACKEND_PORT
OLD_STRING = DEFAULT_OLD_STRING.encode('utf-8')
NEW_STRING = DEFAULT_NEW_STRING.encode('utf-8')
# FastCGI Constants
FCGI_HEADER_LEN = 8
FCGI_VERSION_1 = 1
FCGI_BEGIN_REQUEST = 1
FCGI_ABORT_REQUEST = 2
FCGI_END_REQUEST = 3
FCGI_PARAMS = 4
FCGI_STDIN = 5
FCGI_STDOUT = 6
FCGI_STDERR = 7
FCGI_DATA = 8
FCGI_GET_VALUES = 9
FCGI_GET_VALUES_RESULT = 10
FCGI_UNKNOWN_TYPE = 11
def get_header(data):
    # version, type, requestId, contentLength, paddingLength, reserved
    return struct.unpack('!BBHHBB', data)
def pack_header(fcgi_type, request_id, content_length, padding_length):
    return struct.pack('!BBHHBB', FCGI_VERSION_1, fcgi_type, request_id, content_length, padding_length, 0)
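# For reference (values chosen arbitrarily for illustration):
# pack_header(FCGI_STDOUT, 1, 13, 3) yields the 8-byte header
# b'\x01\x06\x00\x01\x00\r\x03\x00': version 1, type 6 (STDOUT), request id 1,
# 13 bytes of content, 3 bytes of padding.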
def create_record(fcgi_type, request_id, content):
    """Creates one or more FCGI records from content."""
    records = []
    # Max content length per record is 65535
    offset = 0
    total = len(content)
    if total == 0:
        # Empty record
        records.append(pack_header(fcgi_type, request_id, 0, 0))
        return b''.join(records)
    while offset < total:
        chunk_size = min(total - offset, 65535)
        chunk = content[offset:offset + chunk_size]
        # Pad to an 8-byte boundary, as the spec recommends (many implementations
        # ignore padding, but it is harmless to include).
        padding_len = (8 - (chunk_size % 8)) % 8
        header = pack_header(fcgi_type, request_id, chunk_size, padding_len)
        records.append(header + chunk + (b'\x00' * padding_len))
        offset += chunk_size
    return b''.join(records)
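# Example: 70,000 bytes of STDOUT content become two records, one carrying
# 65,535 bytes of payload (plus 1 padding byte) and one carrying 4,465 bytes
# (plus 7 padding bytes).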
def process_response(content):
    """
    Replaces string in content, handling GZIP and updating Content-Length.
    Content is the raw payload of STDOUT (Headers + Body).
    """
    # 1. Separate Headers and Body
    split_idx = content.find(b'\r\n\r\n')
    is_crlf = True
    if split_idx == -1:
        split_idx = content.find(b'\n\n')
        is_crlf = False
    if split_idx == -1:
        # No header/body separator found. FCGI STDOUT is expected to start with
        # CGI-style headers, so pass the payload through untouched.
        return content
    header_end = split_idx + (4 if is_crlf else 2)
    headers_raw = content[:split_idx]
    body = content[header_end:]
    # 2. Check for Compression
    is_gzipped = False
    # Simple check; a robust HTTP parser would split and inspect individual header lines.
    if re.search(b'(?i)Content-Encoding:\\s*gzip', headers_raw):
        is_gzipped = True
        try:
            # 3. Decompress
            body = gzip.decompress(body)
            print("[Info] Decompressed GZIP body for processing.")
        except Exception as e:
            print(f"[Error] Failed to decompress body: {e}")
            # If decompression fails, we can't replace safely. Return original.
            return content
    # 4. Analyze and Replace
    # Case-insensitive search
    pattern = re.compile(re.escape(OLD_STRING), re.IGNORECASE)
    matches = list(pattern.finditer(body))
    if matches:
        print(f"[Info] Found {len(matches)} occurrences of '{OLD_STRING.decode()}' (case-insensitive).")
        for i, m in enumerate(matches):
            start = m.start()
            end = m.end()
            # Context window
            ctx_start = max(0, start - 100)
            ctx_end = min(len(body), end + 100)
            pre_context = body[ctx_start:start]
            match_str = body[start:end]
            post_context = body[end:ctx_end]
            print(f"--- Match #{i+1} ---")
            print(f"Context: {pre_context.decode('utf-8', 'replace')}[[{match_str.decode('utf-8', 'replace')}]] {post_context.decode('utf-8', 'replace')}")
            print("-------------------")
        # Perform replacement
        body = pattern.sub(NEW_STRING, body)
    # 5. Re-compress if needed
    if is_gzipped:
        body = gzip.compress(body)
        print(f"[Info] Re-compressed body. New size: {len(body)}")
    # 6. Update Content-Length header
    new_len = len(body)
    # Regex for Content-Length in headers
    len_pattern = re.compile(b'(?i)(Content-Length:\\s*)(\\d+)')
    def replace_len(match):
        print(f"[Info] Updating Content-Length to {new_len}")
        return match.group(1) + str(new_len).encode('ascii')
    new_headers = len_pattern.sub(replace_len, headers_raw)
    # 7. Reassemble
    return new_headers + (b'\r\n\r\n' if is_crlf else b'\n\n') + body
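# Worked example with the default strings: given the STDOUT payload
#   b'Content-Type: text/html\r\nContent-Length: 23\r\n\r\nhi internal.example.com'
# process_response() returns
#   b'Content-Type: text/html\r\nContent-Length: 18\r\n\r\nhi www.example.com'
# (the body shrinks from 23 to 18 bytes and Content-Length is rewritten to match).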
def handle_client(client_sock):
    server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        server_sock.connect((BACKEND_HOST, BACKEND_PORT))
    except Exception as e:
        print(f"[Error] Could not connect to backend {BACKEND_HOST}:{BACKEND_PORT} - {e}")
        client_sock.close()
        return
    # Buffers for STDOUT per RequestID
    # request_id -> bytearray
    stdout_buffers = {}
    # STDOUT is buffered until the END_REQUEST record for that request ID arrives,
    # then processed and flushed in one go.
    inputs = [client_sock, server_sock]
    try:
        while True:
            readable, _, _ = select.select(inputs, [], [])
            for sock in readable:
                if sock is client_sock:
                    # Read from Web Server (Client).
                    # Client->Server traffic is not modified, so raw bytes can be
                    # streamed straight through; record boundaries do not matter here.
                    data = sock.recv(4096)
                    if not data:
                        return  # Connection closed
                    server_sock.sendall(data)
                elif sock is server_sock:
                    # Read from Backend (Server).
                    # This direction MUST be parsed record by record to intercept
                    # STDOUT, which requires buffering: a single recv() may return
                    # a partial record or several records at once.
                    pass
            # This simple select loop cannot parse FCGI records safely (a recv(4096)
            # might contain 1.5 records), so fall through to the buffered loop below.
            break
    except Exception:
        pass
    # Robust loop: buffer raw socket data and carve out complete FCGI records.
    client_buffer = b''
    server_buffer = b''
    while True:
        rlist, _, _ = select.select([client_sock, server_sock], [], [])
        if client_sock in rlist:
            data = client_sock.recv(4096)
            if not data:
                break
            server_sock.sendall(data)
        if server_sock in rlist:
            data = server_sock.recv(8192)
            if not data:
                break
            server_buffer += data
            # Process server_buffer
            while len(server_buffer) >= FCGI_HEADER_LEN:
                header = server_buffer[:FCGI_HEADER_LEN]
                version, fcgi_type, req_id, content_len, padding_len, reserved = get_header(header)
                total_record_len = FCGI_HEADER_LEN + content_len + padding_len
                if len(server_buffer) < total_record_len:
                    # Wait for more data
                    break
                # Extract full record
                record = server_buffer[:total_record_len]
                server_buffer = server_buffer[total_record_len:]
                content = record[FCGI_HEADER_LEN : FCGI_HEADER_LEN + content_len]
                # Padding is ignored
                if fcgi_type == FCGI_STDOUT:
                    # Buffer content
                    if req_id not in stdout_buffers:
                        stdout_buffers[req_id] = bytearray()
                    stdout_buffers[req_id].extend(content)
                elif fcgi_type == FCGI_END_REQUEST:
                    # Process and flush the buffered STDOUT
                    if req_id in stdout_buffers:
                        full_content = stdout_buffers.pop(req_id)
                        modified_content = process_response(full_content)
                        # Send the modified STDOUT, re-chunked into records
                        client_sock.sendall(create_record(FCGI_STDOUT, req_id, modified_content))
                        # Send an empty STDOUT record to signal end of stream (standard FCGI)
                        client_sock.sendall(create_record(FCGI_STDOUT, req_id, b''))
                    # Forward the END_REQUEST record as is
                    client_sock.sendall(record)
                else:
                    # Forward other records (STDERR, etc.) immediately
                    client_sock.sendall(record)
    client_sock.close()
    server_sock.close()
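# Record flow for a typical request (request id 1), as handled above. The
# client-side sequence is what Nginx usually sends; exact records may vary:
#
#   nginx -> proxy -> backend : BEGIN_REQUEST, PARAMS, PARAMS(empty), STDIN(empty)  (forwarded verbatim)
#   backend -> proxy          : STDOUT(...), STDOUT(empty), END_REQUEST             (STDOUT buffered per request id)
#   proxy -> nginx            : STDOUT(modified, re-chunked), STDOUT(empty), END_REQUEST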
def main():
    global LISTEN_HOST, LISTEN_PORT, BACKEND_HOST, BACKEND_PORT, OLD_STRING, NEW_STRING
    parser = argparse.ArgumentParser(description='FastCGI Proxy with String Replacement')
    parser.add_argument('--listen-host', '-l', default=DEFAULT_LISTEN_HOST, help=f'Host to listen on (default: {DEFAULT_LISTEN_HOST})')
    parser.add_argument('--listen-port', '-p', type=int, default=DEFAULT_LISTEN_PORT, help=f'Port to listen on (default: {DEFAULT_LISTEN_PORT})')
    parser.add_argument('--backend-host', '-b', default=DEFAULT_BACKEND_HOST, help=f'Backend FastCGI Host (default: {DEFAULT_BACKEND_HOST})')
    parser.add_argument('--backend-port', '-P', type=int, default=DEFAULT_BACKEND_PORT, help=f'Backend FastCGI Port (default: {DEFAULT_BACKEND_PORT})')
    parser.add_argument('--old-string', '-o', default=DEFAULT_OLD_STRING, help=f'String to replace (default: {DEFAULT_OLD_STRING})')
    parser.add_argument('--new-string', '-n', default=DEFAULT_NEW_STRING, help=f'New string replacement (default: {DEFAULT_NEW_STRING})')
    args = parser.parse_args()
    # Update globals
    LISTEN_HOST = args.listen_host
    LISTEN_PORT = args.listen_port
    BACKEND_HOST = args.backend_host
    BACKEND_PORT = args.backend_port
    OLD_STRING = args.old_string.encode('utf-8')
    NEW_STRING = args.new_string.encode('utf-8')
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    try:
        server.bind((LISTEN_HOST, LISTEN_PORT))
    except Exception as e:
        print(f"Error binding to {LISTEN_HOST}:{LISTEN_PORT}: {e}")
        return
    server.listen(10)
    print(f"Proxy listening on {LISTEN_HOST}:{LISTEN_PORT} -> {BACKEND_HOST}:{BACKEND_PORT}")
    print(f"Replacing '{OLD_STRING.decode()}' -> '{NEW_STRING.decode()}'")
    try:
        while True:
            client_sock, addr = server.accept()
            # Handle each connection in a daemon thread
            t = threading.Thread(target=handle_client, args=(client_sock,))
            t.daemon = True
            t.start()
    except KeyboardInterrupt:
        print("Shutting down...")
    finally:
        server.close()
if __name__ == '__main__':
    main()
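# Quick smoke test (illustrative; assumes the cgi-fcgi utility from the FastCGI
# devkit and a PHP-FPM pool serving /var/www/html/index.php):
#   SCRIPT_FILENAME=/var/www/html/index.php REQUEST_METHOD=GET \
#       cgi-fcgi -bind -connect 127.0.0.1:9001
# The response headers and body are printed, with any configured replacements applied.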