Last active
February 7, 2026 02:00
-
-
Save JJTech0130/49a9958f7ad2a535305fcbb8076fd418 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| import argparse | |
| import json | |
| import time | |
| import threading | |
| from datetime import datetime, timezone | |
| from typing import Dict, Any, Tuple, List | |
| import pika | |
| from pika.adapters.blocking_connection import BlockingChannel | |
| from pika.spec import Basic, BasicProperties | |
| # Basic overview of how minions currently work: | |
| # 1. Minion enrolls by sending a message to the worker_enroll_queue with its ID, hostname, and role. | |
| # 2. Minion starts a heartbeat thread that sends a heartbeat message every 10 seconds to the heartbeat_queue with its ID, timestamp, and resource usage. | |
| # 3. Minion connects to the task_request_queue and waits for tasks. | |
| # in KoTH mode, it subscribes to specific topics on the koth_task_request_exchange instead of the task_request_queue. | |
| # 4. When a task is received, it processes the task and sends a response to the task_response_queue with the status and any error message. | |
| # in KoTH mode, it sends the response to the koth_task_response_queue instead. | |
| # We're not really using RabbitMQ as intended, for some reason we have a separate vhost (and thus TCP connection) for each queue | |
| # Scorify RabbitMQ config | |
| RABBITMQ_HOST = "scorify.local" | |
| RABBITMQ_PORT = 5672 | |
| RABBITMQ_USER = "minion" | |
| RABBITMQ_PASS = "yABq0eqFcXkzB6wozxqBoKWZ4oZSHgkzwkAPeK7YMggKV3PTrIP0d1XSZ4N2GXrz" | |
| # Minion configuration | |
| MINION_ID = "00000000-0000-0000-0000-000000000001" | |
| HOSTNAME = "debug-minion-1" | |
| HEARTBEAT_INTERVAL = 10 | |
| def run_check(task_request: Dict[str, Any]) -> Tuple[str, str]: | |
| # Check implementation | |
| # (status, error) - status is "up", "down", or "unknown" | |
| try: | |
| source_name = task_request.get("source_name", "unknown") | |
| config = task_request.get("config", "{}") | |
| config = json.loads(config) | |
| print(f"[*] Running check: {source_name} with config: {config}") | |
| if source_name == "ping": | |
| print(f"[*] Pinging {config["target"]}...") | |
| time.sleep(0.1) | |
| return "up", "" | |
| except Exception as e: | |
| # maybe a real implementation would submit errors rather than raising | |
| raise e | |
| return "unknown", str(e) | |
| def run_koth_check(task_request: Dict[str, Any]) -> Tuple[str, str]: | |
| try: | |
| filename = task_request.get("filename", "") | |
| print(f"[*] Reading KOTH file: {filename}") | |
| # team UUID that captured | |
| return "4b8dc1d3-335b-49f0-aa79-faf37e1fb225", "" | |
| except Exception as e: | |
| return "", str(e) | |
| def create_connection(vhost: str) -> pika.BlockingConnection: | |
| credentials = pika.PlainCredentials(RABBITMQ_USER, RABBITMQ_PASS) | |
| parameters = pika.ConnectionParameters( | |
| host=RABBITMQ_HOST, | |
| port=RABBITMQ_PORT, | |
| virtual_host=vhost, | |
| credentials=credentials, | |
| heartbeat=600, | |
| blocked_connection_timeout=300, | |
| ) | |
| return pika.BlockingConnection(parameters) | |
| def send_task_response(task_response: Dict[str, Any]) -> None: | |
| connection = create_connection("task_response_vhost") | |
| channel = connection.channel() | |
| channel.queue_declare(queue="task_response_queue", durable=False) | |
| channel.basic_publish( | |
| exchange="", routing_key="task_response_queue", body=json.dumps(task_response) | |
| ) | |
| connection.close() | |
| def process_task_request( | |
| ch: BlockingChannel, | |
| method: Basic.Deliver, | |
| _properties: BasicProperties, | |
| body: bytes, | |
| ) -> None: | |
| """Process incoming service task request""" | |
| try: | |
| task_request = json.loads(body) | |
| status_id = task_request.get("status_id") | |
| print(f"[<] Received task: status_id={status_id}") | |
| status, error = run_check(task_request) | |
| task_response = { | |
| "status_id": status_id, | |
| "minion_id": MINION_ID, | |
| "status": status, | |
| "error": error, | |
| } | |
| send_task_response(task_response) | |
| print(f"[>] Sent response: status={status}, error={error}") | |
| ch.basic_ack(delivery_tag=method.delivery_tag) | |
| except Exception as e: | |
| print(f"[!] Task processing error: {e}") | |
| ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True) | |
| def send_koth_task_response(task_response: Dict[str, Any]) -> None: | |
| connection = create_connection("koth_task_response_vhost") | |
| channel = connection.channel() | |
| channel.queue_declare(queue="koth_task_response_queue", durable=False) | |
| # Server automatically adds minion_id, but we include it for completeness | |
| channel.basic_publish( | |
| exchange="", | |
| routing_key="koth_task_response_queue", | |
| body=json.dumps(task_response), | |
| ) | |
| connection.close() | |
| def process_koth_task_request( | |
| ch: BlockingChannel, | |
| method: Basic.Deliver, | |
| _properties: BasicProperties, | |
| body: bytes, | |
| ) -> None: | |
| try: | |
| task_request = json.loads(body) | |
| status_id = task_request.get("status_id") | |
| print(f"[<] Received KOTH task: status_id={status_id}") | |
| content, error = run_koth_check(task_request) | |
| task_response = { | |
| "status_id": status_id, | |
| "minion_id": MINION_ID, | |
| "content": content, | |
| "error": error, | |
| } | |
| send_koth_task_response(task_response) | |
| print(f"[>] Sent KOTH response: content={content}, error={error}") | |
| ch.basic_ack(delivery_tag=method.delivery_tag) | |
| except Exception as e: | |
| print(f"[!] KOTH task processing error: {e}") | |
| ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True) | |
| def enroll_worker(role: str) -> None: | |
| try: | |
| connection = create_connection("worker_enroll_vhost") | |
| channel = connection.channel() | |
| channel.queue_declare(queue="worker_enroll_queue", durable=False) | |
| enrollment_msg = { | |
| "minion_id": MINION_ID, | |
| "hostname": HOSTNAME, | |
| "role": role, | |
| } | |
| channel.basic_publish( | |
| exchange="", | |
| routing_key="worker_enroll_queue", | |
| body=json.dumps(enrollment_msg), | |
| ) | |
| print(f"[+] Enrolled worker: {enrollment_msg}") | |
| connection.close() | |
| except Exception as e: | |
| print(f"[!] Enrollment error: {e}") | |
| raise e | |
| def heartbeat_loop() -> None: | |
| connection = create_connection("heartbeat_vhost") | |
| channel = connection.channel() | |
| channel.queue_declare(queue="heartbeat_queue", durable=False) | |
| while True: | |
| try: | |
| heartbeat_msg = { | |
| "minion_id": MINION_ID, | |
| "timestamp": datetime.now(timezone.utc) | |
| .isoformat() | |
| .replace("+00:00", "Z"), | |
| "memory_usage": 0, | |
| "memory_total": 0, | |
| "cpu_usage": 0, | |
| "goroutines": 0, | |
| } | |
| channel.basic_publish( | |
| exchange="", | |
| routing_key="heartbeat_queue", | |
| body=json.dumps(heartbeat_msg), | |
| ) | |
| print("[♥] Sent heartbeat") | |
| except Exception as e: | |
| print(f"[!] Heartbeat error: {e}") | |
| # Try to reconnect on error | |
| try: | |
| connection = create_connection("heartbeat_vhost") | |
| channel = connection.channel() | |
| channel.queue_declare(queue="heartbeat_queue", durable=False) | |
| except Exception as reconnect_err: | |
| print(f"[!] Heartbeat reconnect failed: {reconnect_err}") | |
| time.sleep(HEARTBEAT_INTERVAL) | |
| def run_service_mode() -> None: | |
| print("[*] Running in SERVICE mode") | |
| enroll_worker("service") | |
| # Start heartbeat daemon thread | |
| threading.Thread(target=heartbeat_loop, daemon=True).start() | |
| print(f"[*] Heartbeat started (interval: {HEARTBEAT_INTERVAL}s)") | |
| # Connect and consume tasks | |
| connection = create_connection("task_request_vhost") | |
| channel = connection.channel() | |
| channel.queue_declare(queue="task_request_queue", durable=False) | |
| channel.basic_qos(prefetch_count=1) | |
| print("[*] Waiting for tasks...") | |
| channel.basic_consume( | |
| queue="task_request_queue", on_message_callback=process_task_request | |
| ) | |
| try: | |
| channel.start_consuming() | |
| except KeyboardInterrupt: | |
| print("\n[*] Shutting down...") | |
| channel.stop_consuming() | |
| connection.close() | |
| print("[*] Shutdown complete") | |
| def run_koth_mode(topics: List[str]) -> None: | |
| print(f"[*] Running in KOTH mode with topics: {topics}") | |
| enroll_worker("koth") | |
| # Start heartbeat daemon thread | |
| threading.Thread(target=heartbeat_loop, daemon=True).start() | |
| print(f"[*] Heartbeat started (interval: {HEARTBEAT_INTERVAL}s)") | |
| # Connect to KOTH task request exchange | |
| connection = create_connection("koth_task_request_vhost") | |
| channel = connection.channel() | |
| # Declare the topic exchange | |
| channel.exchange_declare( | |
| exchange="koth_task_request_exchange", exchange_type="topic", durable=False | |
| ) | |
| # Create exclusive, auto-delete queue | |
| result = channel.queue_declare(queue="", exclusive=True, auto_delete=True) | |
| queue_name = result.method.queue | |
| # Bind to each topic | |
| for topic in topics: | |
| channel.queue_bind( | |
| exchange="koth_task_request_exchange", queue=queue_name, routing_key=topic | |
| ) | |
| print(f"[+] Subscribed to topic: {topic}") | |
| print(f"[*] Waiting for KOTH tasks on {len(topics)} topic(s)...") | |
| channel.basic_consume( | |
| queue=queue_name, on_message_callback=process_koth_task_request, auto_ack=False | |
| ) | |
| try: | |
| channel.start_consuming() | |
| except KeyboardInterrupt: | |
| print("\n[*] Shutting down...") | |
| channel.stop_consuming() | |
| connection.close() | |
| print("[*] Shutdown complete") | |
| def main() -> None: | |
| # Parse command-line arguments | |
| parser = argparse.ArgumentParser( | |
| description="Scorify Minion - Service or KOTH mode" | |
| ) | |
| parser.add_argument( | |
| "--topic", | |
| action="append", | |
| dest="topics", | |
| help="KOTH topic to subscribe to (can be specified multiple times). If provided, runs in KOTH mode.", | |
| ) | |
| args = parser.parse_args() | |
| print(f"[*] Minion ID: {MINION_ID}") | |
| print(f"[*] Hostname: {HOSTNAME}") | |
| # Determine mode based on --topic flag | |
| if args.topics: | |
| run_koth_mode(args.topics) | |
| else: | |
| run_service_mode() | |
| if __name__ == "__main__": | |
| main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment