FastCGI Intercepting Proxy
import socket
import struct
import select
import threading
import re
import sys
import gzip
import argparse
| """ | |
| FastCGI Intercepting Proxy | |
| This script acts as a middleware between a Web Server (e.g., Nginx, Apache) and a | |
| FastCGI backend (e.g., PHP-FPM). It intercepts the STDOUT stream from the backend, | |
| performs real-time string replacement in the response body, and transparently | |
| forwards the modified data back to the web server. | |
| Key Features: | |
| - FastCGI Protocol Parsing: Correctly handles FastCGI record framing. | |
| - GZIP Support: Automatically detects, decompresses, modifies, and re-compresses | |
| gzipped responses. | |
| - Dynamic Header Correction: Recalculates 'Content-Length' headers after | |
| modifications to prevent truncated responses or protocol errors. | |
| - Threaded Architecture: Handles multiple concurrent requests using Python threads. | |
| - Context Logging: Prints surrounding text of replacements for easy debugging. | |
| Usage: | |
| python3 fcgi_proxy.py -p 9001 -P 9000 -o "old-domain.com" -n "new-domain.com" | |
| Nginx Configuration: | |
| fastcgi_pass 127.0.0.1:9001; | |
| """ | |
# Default Configuration
DEFAULT_LISTEN_HOST = '0.0.0.0'
DEFAULT_LISTEN_PORT = 9001
DEFAULT_BACKEND_HOST = '127.0.0.1'
DEFAULT_BACKEND_PORT = 9000
DEFAULT_OLD_STRING = 'internal.example.com'
DEFAULT_NEW_STRING = 'www.example.com'

# Global variables (will be updated by args)
LISTEN_HOST = DEFAULT_LISTEN_HOST
LISTEN_PORT = DEFAULT_LISTEN_PORT
BACKEND_HOST = DEFAULT_BACKEND_HOST
BACKEND_PORT = DEFAULT_BACKEND_PORT
OLD_STRING = DEFAULT_OLD_STRING.encode('utf-8')
NEW_STRING = DEFAULT_NEW_STRING.encode('utf-8')

# FastCGI Constants
FCGI_HEADER_LEN = 8
FCGI_VERSION_1 = 1
FCGI_BEGIN_REQUEST = 1
FCGI_ABORT_REQUEST = 2
FCGI_END_REQUEST = 3
FCGI_PARAMS = 4
FCGI_STDIN = 5
FCGI_STDOUT = 6
FCGI_STDERR = 7
FCGI_DATA = 8
FCGI_GET_VALUES = 9
FCGI_GET_VALUES_RESULT = 10
FCGI_UNKNOWN_TYPE = 11
def get_header(data):
    # version, type, requestId, contentLength, paddingLength, reserved
    return struct.unpack('!BBHHBB', data)


def pack_header(fcgi_type, request_id, content_length, padding_length):
    return struct.pack('!BBHHBB', FCGI_VERSION_1, fcgi_type, request_id, content_length, padding_length, 0)


def create_record(fcgi_type, request_id, content):
    """Creates one or more FCGI records from content."""
    records = []
    offset = 0
    total = len(content)
    if total == 0:
        # Empty record (used to terminate a stream)
        records.append(pack_header(fcgi_type, request_id, 0, 0))
        return b''.join(records)
    while offset < total:
        # Max content length per record is 65535
        chunk_size = min(total - offset, 65535)
        chunk = content[offset:offset + chunk_size]
        # Pad each record to an 8-byte boundary, as the spec recommends.
        padding_len = (8 - (chunk_size % 8)) % 8
        header = pack_header(fcgi_type, request_id, chunk_size, padding_len)
        records.append(header + chunk + (b'\x00' * padding_len))
        offset += chunk_size
    return b''.join(records)
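
# Illustrative example (not executed): a 70,000-byte STDOUT payload is split by
# create_record() into two records, the first carrying 65535 content bytes plus
# 1 padding byte and the second carrying the remaining 4465 content bytes plus
# 7 padding bytes, so each record body ends on an 8-byte boundary.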
def process_response(content):
    """
    Replaces OLD_STRING in content, handling GZIP and updating Content-Length.
    Content is the raw payload of the STDOUT stream (headers + body).
    """
    # 1. Separate headers and body
    split_idx = content.find(b'\r\n\r\n')
    is_crlf = True
    if split_idx == -1:
        split_idx = content.find(b'\n\n')
        is_crlf = False
    if split_idx == -1:
        # No header/body separator found. FCGI STDOUT is expected to carry
        # CGI-style headers, so pass the payload through unmodified.
        return content
    header_end = split_idx + (4 if is_crlf else 2)
    headers_raw = content[:split_idx]
    body = content[header_end:]

    # 2. Check for compression (a simple header check; robust HTTP handling
    # would parse the header lines individually)
    is_gzipped = False
    if re.search(b'(?i)Content-Encoding:\\s*gzip', headers_raw):
        is_gzipped = True
        try:
            # 3. Decompress
            body = gzip.decompress(body)
            print("[Info] Decompressed GZIP body for processing.")
        except Exception as e:
            print(f"[Error] Failed to decompress body: {e}")
            # If decompression fails, the body cannot be modified safely.
            return content

    # 4. Analyze and replace (case-insensitive)
    pattern = re.compile(re.escape(OLD_STRING), re.IGNORECASE)
    matches = list(pattern.finditer(body))
    if matches:
        print(f"[Info] Found {len(matches)} occurrences of '{OLD_STRING.decode()}' (case-insensitive).")
        for i, m in enumerate(matches):
            start = m.start()
            end = m.end()
            # Context window around the match
            ctx_start = max(0, start - 100)
            ctx_end = min(len(body), end + 100)
            pre_context = body[ctx_start:start]
            match_str = body[start:end]
            post_context = body[end:ctx_end]
            print(f"--- Match #{i+1} ---")
            print(f"Context: {pre_context.decode('utf-8', 'replace')}[[{match_str.decode('utf-8', 'replace')}]]{post_context.decode('utf-8', 'replace')}")
            print("-------------------")
        # Perform the replacement
        body = pattern.sub(NEW_STRING, body)

    # 5. Re-compress if needed
    if is_gzipped:
        body = gzip.compress(body)
        print(f"[Info] Re-compressed body. New size: {len(body)}")

    # 6. Update the Content-Length header
    new_len = len(body)
    len_pattern = re.compile(b'(?i)(Content-Length:\\s*)(\\d+)')

    def replace_len(match):
        print(f"[Info] Updating Content-Length to {new_len}")
        return match.group(1) + str(new_len).encode('ascii')

    new_headers = len_pattern.sub(replace_len, headers_raw)

    # 7. Reassemble
    return new_headers + (b'\r\n\r\n' if is_crlf else b'\n\n') + body
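
# Illustrative example (not executed): given a buffered STDOUT payload of
#   Content-Type: text/html\r\nContent-Length: 47\r\n\r\n<a href="http://internal.example.com/">home</a>
# process_response() rewrites the host to www.example.com, leaving a 42-byte
# body, and patches the header to 'Content-Length: 42' before the payload is
# re-framed into FastCGI records.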
def handle_client(client_sock):
    server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        server_sock.connect((BACKEND_HOST, BACKEND_PORT))
    except Exception as e:
        print(f"[Error] Could not connect to backend {BACKEND_HOST}:{BACKEND_PORT} - {e}")
        client_sock.close()
        return

    # STDOUT buffers per request ID (request_id -> bytearray). STDOUT content is
    # accumulated until the matching END_REQUEST record arrives, then processed
    # and flushed in one pass.
    stdout_buffers = {}

    # Raw bytes read from the backend. A single recv() may contain a partial
    # record or several records, so complete records are reassembled from this
    # buffer before being parsed.
    server_buffer = b''

    try:
        while True:
            rlist, _, _ = select.select([client_sock, server_sock], [], [])

            if client_sock in rlist:
                # Web server -> backend traffic is not modified, so it can be
                # streamed through unchanged.
                data = client_sock.recv(4096)
                if not data:
                    break  # Connection closed
                server_sock.sendall(data)

            if server_sock in rlist:
                # Backend -> web server traffic must be parsed record by record
                # so that STDOUT can be intercepted.
                data = server_sock.recv(8192)
                if not data:
                    break
                server_buffer += data

                while len(server_buffer) >= FCGI_HEADER_LEN:
                    header = server_buffer[:FCGI_HEADER_LEN]
                    version, fcgi_type, req_id, content_len, padding_len, reserved = get_header(header)
                    total_record_len = FCGI_HEADER_LEN + content_len + padding_len
                    if len(server_buffer) < total_record_len:
                        # Wait for more data
                        break

                    # Extract the full record
                    record = server_buffer[:total_record_len]
                    server_buffer = server_buffer[total_record_len:]
                    content = record[FCGI_HEADER_LEN:FCGI_HEADER_LEN + content_len]
                    # Padding is ignored

                    if fcgi_type == FCGI_STDOUT:
                        # Buffer STDOUT content until END_REQUEST
                        if req_id not in stdout_buffers:
                            stdout_buffers[req_id] = bytearray()
                        stdout_buffers[req_id].extend(content)
                    elif fcgi_type == FCGI_END_REQUEST:
                        # Process and flush the buffered STDOUT
                        if req_id in stdout_buffers:
                            full_content = bytes(stdout_buffers.pop(req_id))
                            modified_content = process_response(full_content)
                            # Send the modified STDOUT, re-chunked into records
                            client_sock.sendall(create_record(FCGI_STDOUT, req_id, modified_content))
                            # Send an empty STDOUT record to signal end of stream (standard FCGI)
                            client_sock.sendall(create_record(FCGI_STDOUT, req_id, b''))
                        # Forward the END_REQUEST record as-is
                        client_sock.sendall(record)
                    else:
                        # Forward other records (STDERR, etc.) unchanged
                        client_sock.sendall(record)
    finally:
        client_sock.close()
        server_sock.close()
def main():
    global LISTEN_HOST, LISTEN_PORT, BACKEND_HOST, BACKEND_PORT, OLD_STRING, NEW_STRING

    parser = argparse.ArgumentParser(description='FastCGI Proxy with String Replacement')
    parser.add_argument('--listen-host', '-l', default=DEFAULT_LISTEN_HOST, help=f'Host to listen on (default: {DEFAULT_LISTEN_HOST})')
    parser.add_argument('--listen-port', '-p', type=int, default=DEFAULT_LISTEN_PORT, help=f'Port to listen on (default: {DEFAULT_LISTEN_PORT})')
    parser.add_argument('--backend-host', '-b', default=DEFAULT_BACKEND_HOST, help=f'Backend FastCGI host (default: {DEFAULT_BACKEND_HOST})')
    parser.add_argument('--backend-port', '-P', type=int, default=DEFAULT_BACKEND_PORT, help=f'Backend FastCGI port (default: {DEFAULT_BACKEND_PORT})')
    parser.add_argument('--old-string', '-o', default=DEFAULT_OLD_STRING, help=f'String to replace (default: {DEFAULT_OLD_STRING})')
    parser.add_argument('--new-string', '-n', default=DEFAULT_NEW_STRING, help=f'New string replacement (default: {DEFAULT_NEW_STRING})')
    args = parser.parse_args()

    # Update globals
    LISTEN_HOST = args.listen_host
    LISTEN_PORT = args.listen_port
    BACKEND_HOST = args.backend_host
    BACKEND_PORT = args.backend_port
    OLD_STRING = args.old_string.encode('utf-8')
    NEW_STRING = args.new_string.encode('utf-8')

    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    try:
        server.bind((LISTEN_HOST, LISTEN_PORT))
    except Exception as e:
        print(f"Error binding to {LISTEN_PORT}: {e}")
        return
    server.listen(10)

    print(f"Proxy listening on {LISTEN_HOST}:{LISTEN_PORT} -> {BACKEND_HOST}:{BACKEND_PORT}")
    print(f"Replacing '{OLD_STRING.decode()}' -> '{NEW_STRING.decode()}'")

    try:
        while True:
            client_sock, addr = server.accept()
            # Handle each connection in a thread
            t = threading.Thread(target=handle_client, args=(client_sock,))
            t.daemon = True
            t.start()
    except KeyboardInterrupt:
        print("Shutting down...")
    finally:
        server.close()


if __name__ == '__main__':
    main()
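
A quick way to sanity-check the rewrite logic without putting the proxy in front of a web server is to import the script and call process_response() directly. The sketch below is illustrative only: the module name fcgi_proxy assumes the gist was saved as fcgi_proxy.py in the same directory, and the sample headers and body are invented for the demonstration.

import gzip
import fcgi_proxy  # assumes the gist was saved as fcgi_proxy.py

# Configure the replacement pair on the module globals.
fcgi_proxy.OLD_STRING = b'internal.example.com'
fcgi_proxy.NEW_STRING = b'www.example.com'

# Build a gzipped CGI-style response (headers, blank line, body).
body = gzip.compress(b'<a href="http://internal.example.com/">home</a>')
payload = (b'Content-Type: text/html\r\n'
           b'Content-Encoding: gzip\r\n'
           b'Content-Length: ' + str(len(body)).encode('ascii') + b'\r\n'
           b'\r\n' + body)

rewritten = fcgi_proxy.process_response(payload)
headers, _, new_body = rewritten.partition(b'\r\n\r\n')
print(headers.decode())                    # Content-Length now matches the re-compressed body
print(gzip.decompress(new_body).decode())  # host rewritten to www.example.com

# Re-frame the modified payload into FastCGI STDOUT records, as the proxy does.
records = fcgi_proxy.create_record(fcgi_proxy.FCGI_STDOUT, 1, rewritten)
version, rtype, req_id, clen, plen, _ = fcgi_proxy.get_header(records[:fcgi_proxy.FCGI_HEADER_LEN])
print(version, rtype, req_id, clen, plen)  # 1 6 1 <content length> <padding length>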