Skip to content

Instantly share code, notes, and snippets.

@JJTech0130
Last active February 7, 2026 02:00
Show Gist options
  • Select an option

  • Save JJTech0130/49a9958f7ad2a535305fcbb8076fd418 to your computer and use it in GitHub Desktop.

Select an option

Save JJTech0130/49a9958f7ad2a535305fcbb8076fd418 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
import argparse
import json
import os
import threading
import time
from datetime import datetime, timezone
from typing import Dict, Any, Tuple, List

import pika
from pika.adapters.blocking_connection import BlockingChannel
from pika.spec import Basic, BasicProperties
# Basic overview of how minions currently work:
# 1. The minion enrolls by sending a message to the worker_enroll_queue with its ID, hostname, and role.
# 2. The minion starts a heartbeat thread that sends a message to the heartbeat_queue every 10 seconds with its ID, a timestamp, and resource usage.
# 3. The minion connects to the task_request_queue and waits for tasks.
#    In KoTH mode, it instead subscribes to specific topics on the koth_task_request_exchange.
# 4. When a task is received, the minion processes it and sends a response to the task_response_queue with the status and any error message.
#    In KoTH mode, it sends the response to the koth_task_response_queue instead.
# NOTE: We're not really using RabbitMQ as intended: for some reason there is a separate vhost (and thus a separate TCP connection) for each queue.
# Scorify RabbitMQ config
# Every value can be overridden through the environment variable named in the
# lookup; the literals are only the debug defaults, so real credentials do not
# have to be written into this file.
RABBITMQ_HOST = os.environ.get("RABBITMQ_HOST", "scorify.local")
RABBITMQ_PORT = int(os.environ.get("RABBITMQ_PORT", "5672"))
RABBITMQ_USER = os.environ.get("RABBITMQ_USER", "minion")
RABBITMQ_PASS = os.environ.get(
    "RABBITMQ_PASS",
    "yABq0eqFcXkzB6wozxqBoKWZ4oZSHgkzwkAPeK7YMggKV3PTrIP0d1XSZ4N2GXrz",
)
# Minion configuration
# MINION_ID / MINION_HOSTNAME let several debug minions run side by side.
# The hostname is deliberately NOT read from $HOSTNAME: shells and container
# runtimes commonly set that variable, which would silently change the default.
MINION_ID = os.environ.get("MINION_ID", "00000000-0000-0000-0000-000000000001")
HOSTNAME = os.environ.get("MINION_HOSTNAME", "debug-minion-1")
# Seconds between two heartbeat messages.
HEARTBEAT_INTERVAL = 10
def run_check(task_request: Dict[str, Any]) -> Tuple[str, str]:
    """Run the (stubbed) service check described by ``task_request``.

    The request's ``source_name`` (default ``"unknown"``) selects the check and
    its ``config`` (a JSON-encoded string, default ``"{}"``) is decoded and
    logged. No real check is performed: after a short simulated delay the
    stub reports success.

    Args:
        task_request: Decoded task request message.

    Returns:
        ``(status, error)`` where status is "up", "down", or "unknown".
        This stub always returns ``("up", "")``.

    Raises:
        Exception: Any failure (for example an invalid JSON ``config`` or a
            ping config without ``"target"``) propagates to the caller.
            Maybe a real implementation would submit errors as
            ``("unknown", str(e))`` rather than raising.
    """
    source_name = task_request.get("source_name", "unknown")
    config = json.loads(task_request.get("config", "{}"))
    print(f"[*] Running check: {source_name} with config: {config}")
    if source_name == "ping":
        # Single quotes inside the replacement field: reusing the outer quote
        # character is a SyntaxError on interpreters older than Python 3.12.
        print(f"[*] Pinging {config['target']}...")
    # Simulated check latency.
    time.sleep(0.1)
    return "up", ""
def run_koth_check(task_request: Dict[str, Any]) -> Tuple[str, str]:
    """Run the (stubbed) KOTH check described by ``task_request``.

    The ``filename`` field (default ``""``) is only logged; no file is read.

    Returns:
        ``(content, error)``. On success ``content`` is a hard-coded team UUID
        and ``error`` is empty; if handling the request raises, ``content`` is
        empty and ``error`` is the exception text.
    """
    # team UUID that captured
    captured_by = "4b8dc1d3-335b-49f0-aa79-faf37e1fb225"
    try:
        koth_file = task_request.get("filename", "")
        print(f"[*] Reading KOTH file: {koth_file}")
    except Exception as exc:
        return "", str(exc)
    else:
        return captured_by, ""
def create_connection(vhost: str) -> pika.BlockingConnection:
    """Open a blocking RabbitMQ connection to the virtual host ``vhost``.

    Host, port and credentials come from the module-level ``RABBITMQ_*``
    settings; ``heartbeat=600`` and ``blocked_connection_timeout=300`` are
    passed through to ``pika.ConnectionParameters``.
    """
    login = pika.PlainCredentials(RABBITMQ_USER, RABBITMQ_PASS)
    settings = {
        "host": RABBITMQ_HOST,
        "port": RABBITMQ_PORT,
        "virtual_host": vhost,
        "credentials": login,
        "heartbeat": 600,
        "blocked_connection_timeout": 300,
    }
    return pika.BlockingConnection(pika.ConnectionParameters(**settings))
def send_task_response(task_response: Dict[str, Any]) -> None:
    """Publish ``task_response`` as JSON to ``task_response_queue``.

    A fresh connection to ``task_response_vhost`` is opened for every call and
    closed again before returning.

    Raises:
        Exception: Whatever connecting, declaring the queue, or publishing
            raises; the connection is still closed in that case.
    """
    connection = create_connection("task_response_vhost")
    try:
        channel = connection.channel()
        channel.queue_declare(queue="task_response_queue", durable=False)
        channel.basic_publish(
            exchange="",
            routing_key="task_response_queue",
            body=json.dumps(task_response),
        )
    finally:
        # Release the socket even when declare/publish fails. Skip close() on
        # an already-dead connection so the original error is not masked.
        if connection.is_open:
            connection.close()
def process_task_request(
    ch: BlockingChannel,
    method: Basic.Deliver,
    _properties: BasicProperties,
    body: bytes,
) -> None:
    """Process incoming service task request.

    Decodes the JSON ``body``, runs the check, publishes the result and acks
    the delivery.

    Args:
        ch: Channel the message was delivered on; used to ack/nack.
        method: Delivery metadata; supplies the ``delivery_tag``.
        _properties: Message properties (unused).
        body: Raw message payload, expected to be a JSON object.

    Error handling:
        * A payload that is not a JSON object is nacked WITHOUT requeue: it
          can never succeed, so requeueing would redeliver it forever.
        * Any other failure (check raised, response could not be sent) is
          nacked with requeue so the task can be retried.
    """
    try:
        task_request = json.loads(body)
        if not isinstance(task_request, dict):
            raise ValueError("task payload is not a JSON object")
    except ValueError as e:
        # JSONDecodeError and UnicodeDecodeError are both ValueError subclasses.
        print(f"[!] Discarding malformed task: {e}")
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
        return

    try:
        status_id = task_request.get("status_id")
        print(f"[<] Received task: status_id={status_id}")
        status, error = run_check(task_request)
        task_response = {
            "status_id": status_id,
            "minion_id": MINION_ID,
            "status": status,
            "error": error,
        }
        send_task_response(task_response)
        print(f"[>] Sent response: status={status}, error={error}")
        ch.basic_ack(delivery_tag=method.delivery_tag)
    except Exception as e:
        print(f"[!] Task processing error: {e}")
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
def send_koth_task_response(task_response: Dict[str, Any]) -> None:
    """Publish ``task_response`` as JSON to ``koth_task_response_queue``.

    A fresh connection to ``koth_task_response_vhost`` is opened for every
    call and closed again before returning.

    Raises:
        Exception: Whatever connecting, declaring the queue, or publishing
            raises; the connection is still closed in that case.
    """
    connection = create_connection("koth_task_response_vhost")
    try:
        channel = connection.channel()
        channel.queue_declare(queue="koth_task_response_queue", durable=False)
        # Server automatically adds minion_id, but we include it for completeness
        channel.basic_publish(
            exchange="",
            routing_key="koth_task_response_queue",
            body=json.dumps(task_response),
        )
    finally:
        # Release the socket even when declare/publish fails. Skip close() on
        # an already-dead connection so the original error is not masked.
        if connection.is_open:
            connection.close()
def process_koth_task_request(
    ch: BlockingChannel,
    method: Basic.Deliver,
    _properties: BasicProperties,
    body: bytes,
) -> None:
    """Process incoming KOTH task request.

    Decodes the JSON ``body``, runs the KOTH check, publishes the result and
    acks the delivery.

    Args:
        ch: Channel the message was delivered on; used to ack/nack.
        method: Delivery metadata; supplies the ``delivery_tag``.
        _properties: Message properties (unused).
        body: Raw message payload, expected to be a JSON object.

    Error handling:
        * A payload that is not a JSON object is nacked WITHOUT requeue: it
          can never succeed, so requeueing would redeliver it forever.
        * Any other failure is nacked with requeue so the task can be retried.
    """
    try:
        task_request = json.loads(body)
        if not isinstance(task_request, dict):
            raise ValueError("KOTH task payload is not a JSON object")
    except ValueError as e:
        # JSONDecodeError and UnicodeDecodeError are both ValueError subclasses.
        print(f"[!] Discarding malformed KOTH task: {e}")
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
        return

    try:
        status_id = task_request.get("status_id")
        print(f"[<] Received KOTH task: status_id={status_id}")
        content, error = run_koth_check(task_request)
        task_response = {
            "status_id": status_id,
            "minion_id": MINION_ID,
            "content": content,
            "error": error,
        }
        send_koth_task_response(task_response)
        print(f"[>] Sent KOTH response: content={content}, error={error}")
        ch.basic_ack(delivery_tag=method.delivery_tag)
    except Exception as e:
        print(f"[!] KOTH task processing error: {e}")
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
def enroll_worker(role: str) -> None:
    """Announce this minion on ``worker_enroll_queue``.

    Publishes a JSON message carrying the minion ID, hostname and ``role``
    over a short-lived connection to ``worker_enroll_vhost``.

    Args:
        role: Role string placed in the enrollment message (the callers in
            this file pass ``"service"`` or ``"koth"``).

    Raises:
        Exception: Any connection or publish failure is logged and re-raised.
    """
    enrollment_msg = {
        "minion_id": MINION_ID,
        "hostname": HOSTNAME,
        "role": role,
    }
    connection = None
    try:
        connection = create_connection("worker_enroll_vhost")
        channel = connection.channel()
        channel.queue_declare(queue="worker_enroll_queue", durable=False)
        channel.basic_publish(
            exchange="",
            routing_key="worker_enroll_queue",
            body=json.dumps(enrollment_msg),
        )
        print(f"[+] Enrolled worker: {enrollment_msg}")
    except Exception as e:
        print(f"[!] Enrollment error: {e}")
        # Bare raise keeps the original traceback intact.
        raise
    finally:
        # Close on success and on failure; connection is None when the
        # connect itself failed.
        if connection is not None and connection.is_open:
            connection.close()
def _open_heartbeat_channel() -> Tuple[pika.BlockingConnection, BlockingChannel]:
    """Connect to ``heartbeat_vhost`` and declare ``heartbeat_queue``."""
    connection = create_connection("heartbeat_vhost")
    try:
        channel = connection.channel()
        channel.queue_declare(queue="heartbeat_queue", durable=False)
    except Exception:
        # Do not leak the half-initialised connection.
        _close_quietly(connection)
        raise
    return connection, channel


def _close_quietly(connection) -> None:
    """Best-effort close of a (possibly broken or missing) connection."""
    if connection is None:
        return
    try:
        if connection.is_open:
            connection.close()
    except Exception:
        # The connection is being discarded because it already failed; an
        # error while closing it carries no additional information.
        pass


def heartbeat_loop() -> None:
    """Publish a heartbeat message every ``HEARTBEAT_INTERVAL`` seconds, forever.

    Intended to run in a daemon thread. Each message carries the minion ID, a
    UTC ISO-8601 timestamp with a ``Z`` suffix, and placeholder zeros for the
    resource-usage fields.

    The loop never exits because of an error: a failed initial connect or a
    failed publish is logged, the broken connection is closed, and a reconnect
    is attempted; if that also fails, it is retried on the next interval.
    """
    connection = None
    channel = None
    while True:
        try:
            if channel is None:
                # First iteration, or the previous reconnect attempt failed.
                connection, channel = _open_heartbeat_channel()
            heartbeat_msg = {
                "minion_id": MINION_ID,
                "timestamp": datetime.now(timezone.utc)
                .isoformat()
                .replace("+00:00", "Z"),
                "memory_usage": 0,
                "memory_total": 0,
                "cpu_usage": 0,
                "goroutines": 0,
            }
            channel.basic_publish(
                exchange="",
                routing_key="heartbeat_queue",
                body=json.dumps(heartbeat_msg),
            )
            print("[♥] Sent heartbeat")
        except Exception as e:
            print(f"[!] Heartbeat error: {e}")
            # Drop the broken connection before replacing it so its socket
            # is not leaked.
            _close_quietly(connection)
            connection = None
            channel = None
            # Try to reconnect on error
            try:
                connection, channel = _open_heartbeat_channel()
            except Exception as reconnect_err:
                print(f"[!] Heartbeat reconnect failed: {reconnect_err}")
        time.sleep(HEARTBEAT_INTERVAL)
def run_service_mode() -> None:
    """Run the minion in SERVICE mode until interrupted.

    Enrolls with role ``"service"``, starts the heartbeat daemon thread, then
    consumes ``task_request_queue`` (one unacknowledged message at a time)
    and hands every delivery to ``process_task_request``. Ctrl-C stops
    consuming; the connection is closed on every exit path.
    """
    print("[*] Running in SERVICE mode")
    enroll_worker("service")
    # Start heartbeat daemon thread
    threading.Thread(target=heartbeat_loop, daemon=True).start()
    print(f"[*] Heartbeat started (interval: {HEARTBEAT_INTERVAL}s)")
    # Connect and consume tasks
    connection = create_connection("task_request_vhost")
    try:
        channel = connection.channel()
        channel.queue_declare(queue="task_request_queue", durable=False)
        channel.basic_qos(prefetch_count=1)
        print("[*] Waiting for tasks...")
        channel.basic_consume(
            queue="task_request_queue", on_message_callback=process_task_request
        )
        try:
            channel.start_consuming()
        except KeyboardInterrupt:
            print("\n[*] Shutting down...")
            channel.stop_consuming()
    finally:
        # Also reached when setup or consuming fails with something other
        # than KeyboardInterrupt; skip close() if the connection already died.
        if connection.is_open:
            connection.close()
    print("[*] Shutdown complete")
def run_koth_mode(topics: List[str]) -> None:
    """Run the minion in KOTH mode until interrupted.

    Enrolls with role ``"koth"``, starts the heartbeat daemon thread, declares
    the ``koth_task_request_exchange`` topic exchange, binds a server-named
    exclusive auto-delete queue to every entry of ``topics`` and hands each
    delivery to ``process_koth_task_request``. Ctrl-C stops consuming; the
    connection is closed on every exit path.

    Args:
        topics: Routing keys to bind on the KOTH task request exchange.
    """
    print(f"[*] Running in KOTH mode with topics: {topics}")
    enroll_worker("koth")
    # Start heartbeat daemon thread
    threading.Thread(target=heartbeat_loop, daemon=True).start()
    print(f"[*] Heartbeat started (interval: {HEARTBEAT_INTERVAL}s)")
    # Connect to KOTH task request exchange
    connection = create_connection("koth_task_request_vhost")
    try:
        channel = connection.channel()
        # Declare the topic exchange
        channel.exchange_declare(
            exchange="koth_task_request_exchange",
            exchange_type="topic",
            durable=False,
        )
        # Create exclusive, auto-delete queue
        result = channel.queue_declare(queue="", exclusive=True, auto_delete=True)
        queue_name = result.method.queue
        # Bind to each topic
        for topic in topics:
            channel.queue_bind(
                exchange="koth_task_request_exchange",
                queue=queue_name,
                routing_key=topic,
            )
            print(f"[+] Subscribed to topic: {topic}")
        print(f"[*] Waiting for KOTH tasks on {len(topics)} topic(s)...")
        channel.basic_consume(
            queue=queue_name,
            on_message_callback=process_koth_task_request,
            auto_ack=False,
        )
        try:
            channel.start_consuming()
        except KeyboardInterrupt:
            print("\n[*] Shutting down...")
            channel.stop_consuming()
    finally:
        # Also reached when setup or consuming fails with something other
        # than KeyboardInterrupt; skip close() if the connection already died.
        if connection.is_open:
            connection.close()
    print("[*] Shutdown complete")
def main() -> None:
    """Command-line entry point.

    Every ``--topic`` flag adds one KOTH topic. With at least one topic the
    minion runs in KOTH mode; without any it runs in service mode.
    """
    cli = argparse.ArgumentParser(description="Scorify Minion - Service or KOTH mode")
    cli.add_argument(
        "--topic",
        dest="topics",
        action="append",
        help="KOTH topic to subscribe to (can be specified multiple times). If provided, runs in KOTH mode.",
    )
    options = cli.parse_args()

    print(f"[*] Minion ID: {MINION_ID}")
    print(f"[*] Hostname: {HOSTNAME}")

    # No --topic given (None or empty list) means plain service mode.
    selected_topics = options.topics
    if not selected_topics:
        run_service_mode()
        return
    run_koth_mode(selected_topics)


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment