@r33drichards
Last active December 29, 2025 18:06
CUA-Bench Architecture Guide with Mermaid Diagrams

CUA-Bench Architecture Guide

A comprehensive guide to understanding the CUA-Bench codebase, focusing on benchmark launching and Docker container execution.

Table of Contents

  1. Overview
  2. Project Structure
  3. High-Level Architecture
  4. CLI Command Flow
  5. Docker Container Architecture
  6. Task Definition & Execution
  7. Agent Architecture
  8. Session Management
  9. Complete Execution Flow
  10. Key Classes Reference

Overview

CUA-Bench is a toolkit for creating, managing, and evaluating computer-use (desktop automation) benchmarks for AI agents. It supports:

  • Webtop environments: HTML/CSS/JS-based desktop simulations driven through Playwright
  • VM environments: Real Windows/Linux/macOS/Android VMs
  • Agent evaluation: Test AI agents on desktop automation tasks
  • Dataset generation: Convert trajectories into training data

Project Structure

cua-bench/
├── cua_bench/                 # Main package
│   ├── cli/                   # CLI commands (run, interact, sessions, etc.)
│   ├── sessions/              # Session & provider management
│   │   └── providers/         # Docker provider implementation
│   ├── agents/                # Agent implementations (CuaAgent, GeminiAgent)
│   ├── computers/             # Desktop session providers (webtop, computer)
│   ├── batch/                 # Batch execution solver (runs inside container)
│   ├── processors/            # Dataset processing (aguvis, gui-r1)
│   ├── Environment2.py        # Core environment class
│   ├── core.py                # Task class & make() function
│   ├── decorators.py          # @tasks_config, @setup_task, etc.
│   ├── types.py               # Action types (ClickAction, TypeAction, etc.)
│   └── tracing.py             # Trajectory recording
├── tasks/                     # Example task environments
├── Dockerfile                 # Container for batch execution
└── pyproject.toml             # Dependencies

High-Level Architecture

graph TB
    subgraph "User Interface"
        CLI[cb CLI]
    end

    subgraph "Orchestration Layer"
        RUN[Run Command]
        SM[Session Manager]
        DP[Docker Provider]
    end

    subgraph "Docker Containers"
        MC[Main Container<br/>cua-bench:latest]
        CC[Child Container<br/>cua-xfce:latest]
    end

    subgraph "Inside Main Container"
        BS[Batch Solver]
        ENV[EnvironmentV2]
        AG[Agent]
        TR[Tracing]
    end

    subgraph "Session Providers"
        WT[Webtop Session<br/>Playwright]
        VM[VM Session<br/>Computer API]
    end

    CLI --> RUN
    RUN --> SM
    SM --> DP
    DP --> MC
    DP --> CC
    MC --> BS
    BS --> ENV
    ENV --> AG
    ENV --> TR
    ENV --> WT
    ENV --> VM
    CC -.-> VM

CLI Command Flow

Available Commands

| Command | Purpose |
| --- | --- |
| cb run | Execute tasks with agents or oracle solutions |
| cb interact | Interactive task execution (non-headless) |
| cb sessions | List/manage/stop running sessions |
| cb runs | Watch batch runs, aggregate statistics |
| cb process | Convert outputs to training datasets |
| cb view-trace | Open HTML trace viewer |

Run Command Sequence

sequenceDiagram
    participant U as User
    participant CLI as cb run
    participant DP as Docker Provider
    participant DC as Docker Container
    participant BS as Batch Solver
    participant ENV as Environment
    participant AG as Agent

    U->>CLI: cb run tasks/my_env --agent cua-agent
    CLI->>CLI: Load .env, validate args
    CLI->>CLI: Count task variants
    CLI->>DP: Create provider instance

    loop For each task variant
        CLI->>DP: start_session(env_path, variant_idx)
        DP->>DP: Create Docker network (if needed)
        DP->>DC: docker run cua-bench:latest
        DC->>BS: python -m cua_bench.batch.solver
        BS->>ENV: make(env_path)
        BS->>ENV: reset(task_id)

        alt Agent Mode
            BS->>AG: perform_task(description, session)
            loop Until done or max_steps
                AG->>ENV: screenshot()
                AG->>AG: LLM decision
                AG->>ENV: execute_action()
            end
        else Oracle Mode
            BS->>ENV: solve()
        end

        BS->>ENV: evaluate()
        BS->>ENV: tracing.save_to_disk()
    end

    CLI->>U: Display results in TUI

Docker Container Architecture

Container Networking

graph LR
    subgraph "Docker Network: cua-bench_default"
        MC1[Main Container 1<br/>Session: abc123]
        MC2[Main Container 2<br/>Session: def456]
        CC1[Child Container<br/>XFCE Desktop]
        CC2[Child Container<br/>XFCE Desktop]
    end

    MC1 -->|API Port 8000| CC1
    MC2 -->|API Port 8000| CC2

    HOST[Host Machine]
    HOST -->|VNC 6901| CC1
    HOST -->|VNC 6902| CC2
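
The diagram suggests that each child container gets its own host-side VNC port (6901, 6902, ...) while the API port 8000 stays inside the Docker network. A minimal sketch of such an allocator follows; `allocate_vnc_port`, the base port, and the upper limit are illustrative assumptions, and the real Docker provider may allocate ports differently.

```python
def allocate_vnc_port(in_use: set[int], base: int = 6901, limit: int = 7000) -> int:
    """Return the lowest port in [base, limit) not in `in_use`, and reserve it."""
    for port in range(base, limit):
        if port not in in_use:
            in_use.add(port)
            return port
    raise RuntimeError(f"no free VNC port in range {base}-{limit - 1}")


used: set[int] = set()
first = allocate_vnc_port(used)   # port for the first child container
second = allocate_vnc_port(used)  # port for the second child container
```

Tracking reserved ports in a set keeps concurrent sessions from colliding on the same host port.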

Container Creation Flow

flowchart TD
    A[start_session called] --> B{Task needs<br/>child container?}
    B -->|Yes: provider=computer| C[Create XFCE Container]
    B -->|No: provider=webtop| D[Skip child container]
    C --> E[Allocate VNC + API ports]
    E --> F[Start child container]
    F --> G[Build CUA_TASK_CONTAINERS env var]
    D --> H[Build docker run command]
    G --> H
    H --> I[Mount /app/env read-only]
    I --> J[Mount /tmp/td_output for traces]
    J --> K[Pass env vars:<br/>API keys, BATCH_TASK_INDEX]
    K --> L[docker run -d cua-bench:latest]
    L --> M[Save session to ~/.cua/cbsessions.json]
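
The last steps of the flowchart (mounts, environment variables, detached run) can be sketched as a function that assembles the `docker run` argument list. The helper name `build_docker_run` and the `cb-<session_id>` container name are assumptions made for illustration; only the mount points, image name, and network name come from this guide.

```python
def build_docker_run(session_id: str, env_path: str, output_dir: str,
                     env_vars: dict[str, str],
                     network: str = "cua-bench_default",
                     image: str = "cua-bench:latest") -> list[str]:
    """Assemble a `docker run` argv that mirrors the flowchart steps."""
    cmd = ["docker", "run", "-d", "--name", f"cb-{session_id}", "--network", network]
    cmd += ["-v", f"{env_path}:/app/env:ro"]        # task files, mounted read-only
    cmd += ["-v", f"{output_dir}:/tmp/td_output"]   # trace output directory
    for key, value in sorted(env_vars.items()):     # API keys, BATCH_TASK_INDEX, ...
        cmd += ["-e", f"{key}={value}"]
    cmd.append(image)
    return cmd


cmd = build_docker_run(
    "abc123", "/path/to/tasks/my_env", "/tmp/outputs/abc123",
    {"BATCH_TASK_INDEX": "0"},
)
```

Returning a list rather than a shell string avoids quoting problems when the command is passed to `subprocess.run`.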

Dockerfile Layers

graph TD
    A[python:3.12-slim] --> B[System Dependencies<br/>libnss3, libxkbcommon0, etc.]
    B --> C[Node.js + npm]
    C --> D[Claude Code CLI]
    D --> E[Python Dependencies<br/>pip install -e .]
    E --> F[Playwright + Chromium]
    F --> G[Non-root user: cuauser]
    G --> H[cua-bench:latest]

Task Definition & Execution

Decorator Pattern

Tasks are defined using decorators that mark functions for discovery:

import cua_bench as cb
from cua_bench.types import ClickAction  # action types are defined in cua_bench/types.py

@cb.tasks_config(split="train")
def load():
    """Returns list of Task objects"""
    return [
        cb.Task(
            description="Click the Submit button",
            computer={"provider": "webtop", "setup_config": {...}}
        )
    ]

@cb.setup_task(split="train")
async def start(task_cfg, session):
    """Initialize task state"""
    await session.launch_window(html="...", title="My App")

@cb.solve_task(split="train")
async def solve(task_cfg, session):
    """Oracle solution"""
    await session.execute_action(ClickAction(x=100, y=200))

@cb.evaluate_task(split="train")
async def evaluate(task_cfg, session) -> list[float]:
    """Return rewards"""
    success = False  # placeholder: replace with a real check of the task state
    return [1.0 if success else 0.0]

Environment Loading

flowchart TD
    A[make&#40;env_path&#41;] --> B[Load main.py as module]
    B --> C[Scan for _td_type attributes]
    C --> D{Function type?}
    D -->|tasks_config| E[Register tasks_config_fn]
    D -->|setup_task| F[Register setup_task_fn]
    D -->|solve_task| G[Register solve_task_fn]
    D -->|evaluate_task| H[Register evaluate_task_fn]
    E --> I[Create EnvironmentV2]
    F --> I
    G --> I
    H --> I
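
The discovery mechanism in the flowchart can be illustrated with a small stand-alone sketch: decorators tag functions with a `_td_type` attribute, and a scanner collects the tagged functions from a module. Everything here other than the `_td_type` attribute and the four decorator names is an assumption (including `_mark`, `discover`, and `_td_split`); the actual code in `decorators.py` and `core.py` may differ.

```python
import types
from typing import Callable


def _mark(td_type: str) -> Callable:
    """Build a decorator factory that tags a function with `_td_type`."""
    def factory(split: str = "train") -> Callable:
        def decorator(fn: Callable) -> Callable:
            fn._td_type = td_type
            fn._td_split = split  # hypothetical attribute, for illustration
            return fn
        return decorator
    return factory


tasks_config = _mark("tasks_config")
setup_task = _mark("setup_task")
solve_task = _mark("solve_task")
evaluate_task = _mark("evaluate_task")


def discover(module: types.ModuleType) -> dict[str, Callable]:
    """Scan a module for tagged functions, keyed by their `_td_type`."""
    found: dict[str, Callable] = {}
    for obj in vars(module).values():
        td_type = getattr(obj, "_td_type", None)
        if callable(obj) and td_type is not None:
            found[td_type] = obj
    return found


# Demo: populate a throwaway module instead of loading main.py from disk.
demo = types.ModuleType("demo_env")


@tasks_config(split="train")
def load():
    return ["task-0"]


@evaluate_task(split="train")
def evaluate(task_cfg, session):
    return [1.0]


demo.load, demo.evaluate = load, evaluate
registry = discover(demo)
```

Because the decorators return the original function unchanged, a task file stays importable and testable without the benchmark harness.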

Environment Lifecycle

stateDiagram-v2
    [*] --> Created: make()
    Created --> Ready: reset(task_id)
    Ready --> Stepping: step(action)
    Stepping --> Ready: screenshot returned
    Ready --> Solving: solve()
    Solving --> Ready: oracle complete
    Ready --> Evaluated: evaluate()
    Evaluated --> Closed: close()
    Closed --> [*]
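
The state diagram can be read as a transition table. The sketch below encodes it that way and rejects calls that are not valid in the current state; the transient Stepping and Solving states are collapsed into Ready for brevity. This is an illustration of the diagram, not code from `EnvironmentV2`, and `TRANSITIONS` and `advance` are hypothetical names.

```python
TRANSITIONS: dict[tuple[str, str], str] = {
    ("Created", "reset"): "Ready",
    ("Ready", "step"): "Ready",        # Stepping -> Ready once a screenshot returns
    ("Ready", "solve"): "Ready",       # Solving -> Ready once the oracle completes
    ("Ready", "evaluate"): "Evaluated",
    ("Evaluated", "close"): "Closed",
}


def advance(state: str, call: str) -> str:
    """Return the next lifecycle state, or raise if the call is not allowed."""
    try:
        return TRANSITIONS[(state, call)]
    except KeyError:
        raise ValueError(f"{call}() is not valid in state {state!r}") from None


state = "Created"
for call in ["reset", "step", "step", "evaluate", "close"]:
    state = advance(state, call)
```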

Agent Architecture

Agent Class Hierarchy

classDiagram
    class BaseAgent {
        <<abstract>>
        +name() str
        +perform_task(description, session, logging_dir) AgentResult
    }

    class CuaAgent {
        +name() "cua-agent"
        +perform_task() AgentResult
        -_create_custom_computer()
    }

    class GeminiAgent {
        +name() "gemini"
        +perform_task() AgentResult
    }

    class AgentResult {
        +total_input_tokens: int
        +total_output_tokens: int
        +failure_mode: FailureMode
    }

    BaseAgent <|-- CuaAgent
    BaseAgent <|-- GeminiAgent
    CuaAgent --> AgentResult
    GeminiAgent --> AgentResult

Agent Execution Loop

flowchart TD
    A[perform_task called] --> B[Create computer adapter]
    B --> C[Take initial screenshot]
    C --> D{Step < max_steps?}
    D -->|Yes| E[Send screenshot + task to LLM]
    E --> F[Parse action from response]
    F --> G{Action type?}
    G -->|click| H[Execute click]
    G -->|type| I[Execute typing]
    G -->|key| J[Execute keypress]
    G -->|done| K[Exit loop]
    H --> L[Take screenshot]
    I --> L
    J --> L
    L --> M[Track tokens]
    M --> D
    D -->|No| N[Max steps reached]
    K --> O[Return AgentResult]
    N --> O
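
A compact sketch of the loop in the flowchart, with the LLM replaced by an injected `decide` callable so it runs without any API key. `run_agent_loop` and `LoopResult` are hypothetical names; the real agents return an `AgentResult` and also track token counts, which this sketch omits.

```python
from dataclasses import dataclass
from typing import Callable


@dataclass
class LoopResult:
    steps: int       # number of actions executed
    finished: bool   # True if the policy emitted "done" before max_steps


def run_agent_loop(screenshot: Callable[[], bytes],
                   decide: Callable[[bytes], dict],
                   execute: Callable[[dict], None],
                   max_steps: int = 100) -> LoopResult:
    image = screenshot()                      # initial screenshot
    for step in range(max_steps):
        action = decide(image)                # stands in for the LLM call
        if action["type"] == "done":
            return LoopResult(steps=step, finished=True)
        execute(action)                       # click / type / key
        image = screenshot()                  # observe the result
    return LoopResult(steps=max_steps, finished=False)


# Demo with a scripted policy: two clicks, then done.
script = iter([
    {"type": "click", "x": 1, "y": 2},
    {"type": "click", "x": 3, "y": 4},
    {"type": "done"},
])
executed: list[dict] = []
result = run_agent_loop(lambda: b"png", lambda _img: next(script), executed.append)
```

Injecting the three callables keeps the loop testable and makes the `max_steps` cut-off easy to exercise.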

Session Management

Session States

stateDiagram-v2
    [*] --> running: start_session()
    running --> completed: Container exits 0
    running --> failed: Container exits non-zero
    running --> stopped: stop_session()
    completed --> deleted: cleanup
    failed --> deleted: cleanup
    stopped --> deleted: cleanup
    deleted --> [*]
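
The first three transitions in the diagram depend only on whether the container has exited, its exit code, and whether a stop was requested. A minimal sketch of that mapping, using a hypothetical `session_status` helper (the provider's real status logic may consider more signals):

```python
from typing import Optional


def session_status(exit_code: Optional[int], stop_requested: bool = False) -> str:
    """Map container state to a session status string from the diagram."""
    if stop_requested:
        return "stopped"
    if exit_code is None:          # container has not exited yet
        return "running"
    return "completed" if exit_code == 0 else "failed"
```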

Session Storage

Sessions are tracked in ~/.cua/cbsessions.json:

{
  "session_id": "abc123",
  "container_id": "sha256:...",
  "env_path": "/path/to/tasks/my_env",
  "output_dir": "/tmp/outputs/abc123",
  "status": "running",
  "provider": "docker",
  "run_id": "run_20240101_120000",
  "agent": "cua-agent",
  "model": "claude-sonnet-4-20250514",
  "child_containers": ["container_id_1"],
  "created_at": 1704110400
}
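
A sketch of reading and updating such a session file. It assumes the file holds a JSON list of records shaped like the one above, which this guide does not confirm; `load_sessions` and `upsert_session` are hypothetical helpers, and the demo writes to a temporary directory rather than `~/.cua`.

```python
import json
import tempfile
from pathlib import Path


def load_sessions(path: Path) -> list[dict]:
    """Return all stored session records, or an empty list if none exist."""
    if not path.exists():
        return []
    return json.loads(path.read_text())


def upsert_session(path: Path, record: dict) -> None:
    """Insert a record, replacing any existing one with the same session_id."""
    sessions = [s for s in load_sessions(path) if s["session_id"] != record["session_id"]]
    sessions.append(record)
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(json.dumps(sessions, indent=2))


store = Path(tempfile.mkdtemp()) / "cbsessions.json"
upsert_session(store, {"session_id": "abc123", "status": "running"})
upsert_session(store, {"session_id": "abc123", "status": "completed"})
saved = load_sessions(store)
```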

Complete Execution Flow

flowchart TB
    subgraph "1. CLI Layer"
        A[cb run tasks/my_env<br/>--agent cua-agent]
        B[Parse arguments]
        C[Load .env file]
        D[Count task variants]
    end

    subgraph "2. Session Planning"
        E[Create session tasks]
        F[Build container script]
        G[Initialize Docker provider]
    end

    subgraph "3. Docker Orchestration"
        H[Create shared network]
        I[For each session:]
        J[Start child containers<br/>if needed]
        K[Start main container]
        L[Mount volumes]
        M[Pass env vars]
    end

    subgraph "4. Container Execution"
        N[batch/solver.py]
        O[Load environment]
        P[reset&#40;task_id&#41;]
        Q{Agent or<br/>Oracle?}
        R[Agent loop]
        S[Oracle solve]
        T[evaluate&#40;&#41;]
        U[Save trace]
    end

    subgraph "5. Monitoring"
        V[Watch TUI]
        W[Poll container status]
        X[Display rewards]
        Y[Show statistics]
    end

    A --> B --> C --> D --> E --> F --> G
    G --> H --> I --> J --> K --> L --> M
    M --> N --> O --> P --> Q
    Q -->|Agent| R --> T
    Q -->|Oracle| S --> T
    T --> U
    G --> V --> W --> X --> Y

Key Classes Reference

EnvironmentV2

The core environment wrapper that delegates to session providers:

class EnvironmentV2:
    # Injected functions from decorators
    tasks_config_fn: Callable       # Load task list
    setup_task_fn: Callable         # Initialize task state
    solve_task_fn: Callable         # Oracle solution
    evaluate_task_fn: Callable      # Compute rewards

    # Session management
    session: DesktopSession         # Webtop or Computer provider
    bot: Bot                        # Helper for solution writing
    tracing: Tracing                # Trajectory recording

    # Lifecycle methods (signatures only; bodies elided)
    async def reset(self, task_id) -> Tuple[Image, Task]: ...
    async def step(self, action) -> Image: ...
    async def solve(self) -> Image: ...
    async def evaluate(self) -> List[float]: ...
    async def close(self) -> None: ...

Action Types

# Mouse actions
ClickAction(x: int, y: int)
RightClickAction(x: int, y: int)
DoubleClickAction(x: int, y: int)
DragAction(from_x, from_y, to_x, to_y, duration)
ScrollAction(direction: str, amount: int)

# Keyboard actions
TypeAction(text: str)
KeyAction(key: str)
HotkeyAction(keys: List[str])

# Control actions
DoneAction()
WaitAction(seconds: float)
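
One plausible way to model a few of these actions is as frozen dataclasses, which makes them easy to serialize into a trace. The definitions below are an illustrative sketch, not the contents of `cua_bench/types.py`, and `describe` is a hypothetical helper.

```python
from dataclasses import asdict, dataclass


@dataclass(frozen=True)
class ClickAction:
    x: int
    y: int


@dataclass(frozen=True)
class TypeAction:
    text: str


@dataclass(frozen=True)
class DoneAction:
    pass


def describe(action) -> dict:
    """Serialize an action to a plain dict, tagged with its class name."""
    return {"type": type(action).__name__, **asdict(action)}


click = describe(ClickAction(x=100, y=200))
done = describe(DoneAction())
```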

Session Providers

classDiagram
    class SessionProvider {
        <<abstract>>
        +start_session(session_id, env_path, container_script, ...)
        +get_session_status(session_id)
        +stop_session(session_id)
        +get_session_logs(session_id, tail)
    }

    class DockerProvider {
        -network_name: str
        -created_network: bool
        +start_session()
        +get_session_status()
        +stop_session()
        -_ensure_network()
        -_create_child_containers()
    }

    class DesktopSession {
        <<abstract>>
        +screenshot() Image
        +execute_action(action)
        +launch_window(html, title)
        +close()
    }

    class WebDesktopSession {
        -browser: Browser
        -page: Page
        +screenshot()
        +execute_action()
    }

    class VMDesktopSession {
        -computer: Computer
        +screenshot()
        +execute_action()
    }

    SessionProvider <|-- DockerProvider
    DesktopSession <|-- WebDesktopSession
    DesktopSession <|-- VMDesktopSession

Quick Reference

Running Benchmarks

# Run with oracle solution
cb run tasks/my_env --oracle

# Run with AI agent
cb run tasks/my_env --agent cua-agent --model claude-sonnet-4-20250514

# Run multiple variants in parallel
cb run tasks/my_env --max-parallel 8 --max-variants 10

# Interactive mode (opens browser)
cb interact tasks/my_env

Managing Sessions

# List all sessions
cb sessions list

# View logs for a session
cb sessions logs <session_id>

# Stop a running session
cb sessions stop <session_id>

# Clean up stopped sessions
cb sessions --clean

Processing Results

# View trace in browser
cb view-trace ./outputs/session_abc123

# Process for training data
cb process ./outputs --mode aguvis-stage-1

# Push to Hugging Face
cb process ./outputs --push-to-hub username/dataset

Environment Variables

| Variable | Purpose |
| --- | --- |
| ANTHROPIC_API_KEY | API key for Claude models |
| GOOGLE_API_KEY | API key for Gemini models |
| BATCH_TASK_INDEX | Current task variant index (set for the container by the provider) |
| BATCH_TASK_COUNT | Total variants (set for the container by the provider) |
| CUA_TASK_CONTAINERS | JSON list describing the child containers |

Generated for CUA-Bench repository understanding

CUA-Bench Cloud Provider Implementation Guide

A practical guide for building a cloud hosting service using Incus/KVM with Linux and Windows support.

Table of Contents

  1. Main Container Deep Dive
  2. Communication Protocol
  3. Incus/KVM Architecture
  4. VM Specifications
  5. Cloud Provider Implementation
  6. Windows computer-server
  7. Implementation Checklist

Main Container Deep Dive

What the Main Container Actually Does

The main container (cua-bench:latest) is a Python runtime environment that:

  1. Runs the solver script (python -m cua_bench.batch.solver)
  2. Loads task environments from /app/env
  3. Connects to desktop VMs via HTTP API
  4. Runs AI agents that make decisions based on screenshots
  5. Records traces to /tmp/td_output

graph TB
    subgraph "Main Container / Solver VM"
        Solver[batch/solver.py]
        Env[EnvironmentV2]
        Agent[CuaAgent / GeminiAgent]

        subgraph "Session Abstraction"
            VMSession[VMDesktopSession]
            ComputerLib[computer library]
        end

        subgraph "Outputs"
            Traces[/tmp/td_output/]
        end
    end

    subgraph "Desktop VM (Linux or Windows)"
        API[computer-server :8000]
        Desktop[Desktop Environment]
        VNC[VNC :6901]
    end

    Solver --> Env
    Env --> Agent
    Agent --> VMSession
    VMSession --> ComputerLib
    ComputerLib -->|HTTP| API
    API --> Desktop

Main Container Components

| Component | Purpose | Required on VM? |
| --- | --- | --- |
| Python 3.12 | Runtime | Yes |
| cua-bench package | Task loading, tracing | Yes |
| cua-agent package | AI agent SDK | Yes |
| cua-computer package | Desktop VM communication | Yes |
| Playwright + Chromium | Only for webtop provider | Optional |
| Claude Code CLI | Only if using Claude Code agent | Optional |

What Gets Executed

# The solver command:
python3 -m cua_bench.batch.solver /app/env \
    --eval \
    --agent "cua-agent" \
    --model "claude-sonnet-4-20250514" \
    --max-steps 100
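
When a provider needs to pass this command as a single string (for example as a `container_script`), building it from an argument list avoids quoting mistakes. A small sketch, using only the flags shown above; `solver_command` is a hypothetical helper name.

```python
import shlex


def solver_command(env_dir: str = "/app/env",
                   agent: str = "cua-agent",
                   model: str = "claude-sonnet-4-20250514",
                   max_steps: int = 100) -> str:
    """Build the solver invocation as a shell-safe string."""
    argv = [
        "python3", "-m", "cua_bench.batch.solver", env_dir,
        "--eval",
        "--agent", agent,
        "--model", model,
        "--max-steps", str(max_steps),
    ]
    return shlex.join(argv)


command = solver_command()
```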

Solver Execution Flow

sequenceDiagram
    participant Main as Solver VM
    participant Desktop as Desktop VM

    Note over Main: 1. Load environment
    Main->>Main: env = make("/app/env")
    Main->>Main: tasks = env.tasks_config_fn()

    Note over Main: 2. Read container info from env var
    Main->>Main: Parse CUA_TASK_CONTAINERS JSON

    Note over Main: 3. Reset environment
    Main->>Main: env.reset(task_id=BATCH_TASK_INDEX)

    Note over Main: 4. Create session to Desktop VM
    Main->>Desktop: Connect via HTTP (vm_ip:8000)

    Note over Main: 5. Run agent loop
    loop Until done or max_steps
        Main->>Desktop: POST /screenshot
        Desktop-->>Main: PNG bytes
        Main->>Main: Send to LLM with task
        Main->>Main: LLM returns action
        Main->>Desktop: POST /click or /type or /key
    end

    Note over Main: 6. Evaluate and save
    Main->>Main: env.evaluate()
    Main->>Main: tracing.save_to_disk("/tmp/td_output")

How VMDesktopSession Connects

# cua_bench/computers/computer.py

# When CUA_TASK_CONTAINERS is set:
task_containers = json.loads(os.environ.get("CUA_TASK_CONTAINERS"))
container_info = task_containers[task_index]

comp = Computer(
    os_type=container_info["os_type"],   # "linux" or "windows"
    use_host_computer_server=True,        # Use existing server, don't start new
    api_host=container_info["name"],      # IP address of desktop VM
    api_port=8000,                        # API port
    noVNC_port=6901                       # VNC port (for debugging)
)
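
The parsing step can be made more defensive than the excerpt above. The sketch below validates the variable and the index before building connection details; `DesktopTarget` and `parse_task_containers` are hypothetical names, the field names follow the `CUA_TASK_CONTAINERS` example in this guide, and the default values are assumptions.

```python
import json
from dataclasses import dataclass
from typing import Optional


@dataclass
class DesktopTarget:
    host: str
    api_port: int
    novnc_port: int
    os_type: str


def parse_task_containers(raw: Optional[str], index: int = 0) -> DesktopTarget:
    """Parse the CUA_TASK_CONTAINERS JSON and pick the entry for one task."""
    if not raw:
        raise RuntimeError("CUA_TASK_CONTAINERS is not set")
    entries = json.loads(raw)
    if not 0 <= index < len(entries):
        raise IndexError(f"no container entry for task index {index}")
    entry = entries[index]
    return DesktopTarget(
        host=entry["name"],
        api_port=int(entry.get("api_port", 8000)),      # assumed default
        novnc_port=int(entry.get("novnc_port", 6901)),  # assumed default
        os_type=entry.get("os_type", "linux"),          # assumed default
    )


raw = '[{"name": "10.0.0.5", "api_port": 8000, "novnc_port": 6901, "os_type": "linux"}]'
target = parse_task_containers(raw)
```

In the solver, `raw` would come from `os.environ.get("CUA_TASK_CONTAINERS")`, which returns `None` when the variable is missing.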

Communication Protocol

HTTP API Endpoints (computer-server)

The desktop VM runs computer-server which exposes these endpoints:

| Endpoint | Method | Body | Response |
| --- | --- | --- | --- |
| /screenshot | POST | - | PNG bytes |
| /click | POST | {x, y, button} | {status: "ok"} |
| /double_click | POST | {x, y} | {status: "ok"} |
| /right_click | POST | {x, y} | {status: "ok"} |
| /type | POST | {text} | {status: "ok"} |
| /key | POST | {key} | {status: "ok"} |
| /hotkey | POST | {keys: [...]} | {status: "ok"} |
| /scroll | POST | {x, y, scroll_x, scroll_y} | {status: "ok"} |
| /drag | POST | {from_x, from_y, to_x, to_y} | {status: "ok"} |
| /move_cursor | POST | {x, y} | {status: "ok"} |
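
To make the request shape concrete, here is a sketch that builds (but does not send) a POST request for one of the endpoints listed above, using only the standard library. `build_request` is a hypothetical helper, the `"left"` button value is an assumption, and the real computer-server may expect a different wire format than this guide's table describes.

```python
import json
import urllib.request
from typing import Optional


def build_request(host: str, endpoint: str, body: Optional[dict] = None,
                  port: int = 8000) -> urllib.request.Request:
    """Build a JSON POST request for a computer-server endpoint."""
    data = json.dumps(body or {}).encode("utf-8")
    return urllib.request.Request(
        url=f"http://{host}:{port}/{endpoint.lstrip('/')}",
        data=data,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


req = build_request("10.0.0.5", "/click", {"x": 100, "y": 200, "button": "left"})
# Sending would be: urllib.request.urlopen(req, timeout=10)
```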

Protocol Flow

sequenceDiagram
    participant Agent as AI Agent
    participant Session as VMDesktopSession
    participant Computer as computer library
    participant API as computer-server

    Agent->>Session: screenshot()
    Session->>Computer: interface.screenshot()
    Computer->>API: POST /screenshot
    API-->>Computer: PNG bytes
    Computer-->>Session: bytes
    Session-->>Agent: base64 encoded

    Agent->>Agent: LLM decides: click(100, 200)

    Agent->>Session: execute_action(ClickAction(100, 200))
    Session->>Computer: interface.left_click(100, 200)
    Computer->>API: POST /click {x: 100, y: 200}
    API->>API: Simulate mouse click
    API-->>Computer: {status: "ok"}

Incus/KVM Architecture

Overall Architecture

graph TB
    subgraph "Your Cloud Service"
        API[Cloud API]
        Orchestrator[VM Orchestrator]
        Storage[File Storage]
    end

    subgraph "Incus/KVM Host 1"
        SolverVM1[Solver VM<br/>Linux]
        DesktopVM1[Desktop VM<br/>Linux XFCE]
    end

    subgraph "Incus/KVM Host 2"
        SolverVM2[Solver VM<br/>Linux]
        DesktopVM2[Desktop VM<br/>Windows 11]
    end

    API --> Orchestrator
    Orchestrator -->|provision| SolverVM1
    Orchestrator -->|provision| DesktopVM1
    Orchestrator -->|provision| SolverVM2
    Orchestrator -->|provision| DesktopVM2

    Storage -->|upload env| SolverVM1
    Storage -->|upload env| SolverVM2
    SolverVM1 -->|download traces| Storage
    SolverVM2 -->|download traces| Storage

    SolverVM1 -->|HTTP :8000| DesktopVM1
    SolverVM2 -->|HTTP :8000| DesktopVM2

Session Lifecycle with Incus

sequenceDiagram
    participant CLI as cb run
    participant Cloud as Cloud Provider
    participant Incus as Incus API
    participant Solver as Solver VM
    participant Desktop as Desktop VM

    CLI->>Cloud: start_session(env_path, task_config)

    Note over Cloud: 1. Determine OS type from task
    Cloud->>Cloud: os_type = task.computer.os_type

    Note over Cloud: 2. Create Desktop VM
    Cloud->>Incus: Create VM (linux or windows image)
    Incus-->>Desktop: Boot VM
    Desktop->>Desktop: Start computer-server :8000

    Note over Cloud: 3. Wait for Desktop ready
    Cloud->>Desktop: Poll GET /health until ready

    Note over Cloud: 4. Create Solver VM
    Cloud->>Incus: Create VM with env vars
    Incus-->>Solver: Boot VM

    Note over Cloud: 5. Upload task files
    Cloud->>Solver: SCP /app/env files

    Note over Cloud: 6. Start solver
    Solver->>Solver: python -m cua_bench.batch.solver

    Note over Solver,Desktop: 7. Agent execution
    loop Agent loop
        Solver->>Desktop: HTTP API calls
    end

    Note over Cloud: 8. Download results
    Solver->>Cloud: SCP /tmp/td_output

    Note over Cloud: 9. Cleanup
    Cloud->>Incus: Destroy Solver VM
    Cloud->>Incus: Destroy Desktop VM

VM Specifications

1. Solver VM (Linux)

Replaces the main Docker container.

name: cua-solver-linux
os: Ubuntu 22.04 LTS
arch: x86_64

resources:
  cpu: 2 cores
  ram: 4 GB
  disk: 20 GB

software:
  - python3.12
  - pip packages:
    - cua-bench
    - cua-agent[gemini,qwen]
    - cua-computer
    - anthropic
    - datasets
  # Optional (only if using webtop provider):
  - chromium
  - playwright

environment_variables:
  BATCH_TASK_INDEX: "0"
  BATCH_TASK_COUNT: "1"
  CUA_TASK_CONTAINERS: '[{"name": "10.0.0.5", "api_port": 8000, "novnc_port": 6901, "os_type": "linux"}]'
  ANTHROPIC_API_KEY: "sk-..."
  OPENAI_API_KEY: "sk-..."
  GOOGLE_API_KEY: "..."

filesystem:
  /app/env/: Task environment files (uploaded)
  /tmp/td_output/: Trace outputs (downloaded)

startup_command: |
  python3 -m cua_bench.batch.solver /app/env \
    --eval \
    --agent "cua-agent" \
    --model "claude-sonnet-4-20250514"

2. Linux Desktop VM

name: cua-desktop-linux
os: Ubuntu 22.04 LTS with XFCE
arch: x86_64

resources:
  cpu: 2 cores
  ram: 4 GB
  disk: 20 GB
  display: 1024x768 (virtual framebuffer)

software:
  - xfce4 (desktop environment)
  - xvfb (virtual framebuffer)
  - x11vnc or tigervnc
  - python3
  - computer-server (from cua-computer package)
  - bench_ui (for webview windows)

services:
  - computer-server on 0.0.0.0:8000
  - VNC server on 0.0.0.0:6901

startup_script: |
  # Start virtual display
  Xvfb :99 -screen 0 1024x768x24 &
  export DISPLAY=:99

  # Start desktop
  startxfce4 &

  # Start VNC
  x11vnc -display :99 -forever -shared -rfbport 6901 &

  # Start computer-server
  python3 -m computer.server --host 0.0.0.0 --port 8000

3. Windows Desktop VM

name: cua-desktop-windows
os: Windows 10/11
arch: x86_64

resources:
  cpu: 4 cores
  ram: 8 GB
  disk: 60 GB
  display: 1920x1080

software:
  - Windows 10/11 (activated)
  - Python 3.12
  - computer-server-windows (custom)
  - TightVNC or UltraVNC server

services:
  - computer-server on 0.0.0.0:8000
  - VNC server on 0.0.0.0:6901
  # OR RDP on 0.0.0.0:3389

startup_script: |
  # Start VNC server (runs as service)
  # Start computer-server
  python computer_server_windows.py

Cloud Provider Implementation

Provider Class Structure

from cua_bench.sessions.providers.base import SessionProvider
from typing import Dict, Any, Optional
from pathlib import Path
import json
import os

class IncusCloudProvider(SessionProvider):
    """Incus/KVM-based cloud provider for CUA-Bench."""

    def __init__(self, incus_url: str = "https://localhost:8443"):
        self.client = IncusClient(incus_url)
        self.running_instances: Dict[str, Dict] = {}

    async def start_session(
        self,
        session_id: str,
        env_path: Path,
        container_script: str,
        image_uri: Optional[str] = None,
        output_dir: Optional[str] = None,
        **kwargs
    ) -> Dict[str, Any]:
        """Start a new VM-based session."""
        task_index = kwargs.get('task_index', 0)

        # 1. Load task config to determine OS type
        task_config = await self._load_task_config(env_path, task_index)
        os_type = task_config.get("os_type", "linux")

        # 2. Create Desktop VM
        desktop_image = self._get_desktop_image(os_type)
        desktop_vm = await self._create_vm(
            name=f"desktop-{session_id}",
            image=desktop_image,
            profiles=["desktop-" + os_type],
        )

        # 3. Wait for computer-server to be ready
        desktop_ip = await self._get_vm_ip(desktop_vm["name"])
        await self._wait_for_health(desktop_ip, 8000, timeout=120)

        # 4. Create Solver VM
        solver_vm = await self._create_vm(
            name=f"solver-{session_id}",
            image="cua-solver-linux",
            profiles=["solver"],
            config={
                "user.BATCH_TASK_INDEX": str(task_index),
                "user.BATCH_TASK_COUNT": "1",
                "user.CUA_TASK_CONTAINERS": json.dumps([{
                    "name": desktop_ip,
                    "api_port": 8000,
                    "novnc_port": 6901,
                    "os_type": os_type,
                }]),
                "user.ANTHROPIC_API_KEY": os.environ.get("ANTHROPIC_API_KEY", ""),
            },
        )

        solver_ip = await self._get_vm_ip(solver_vm["name"])

        # 5. Upload task environment files
        await self._upload_directory(solver_ip, env_path, "/app/env")

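        # 6. Start the solver script in the background and record its exit
        #    code, which get_session_status() reads from /tmp/solver_exit_code
        import shlex
        wrapped = f"({container_script}); echo $? > /tmp/solver_exit_code"
        await self._exec_command(
            solver_vm["name"],
            f"nohup sh -c {shlex.quote(wrapped)} > /tmp/solver.log 2>&1 &"
        )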

        # 7. Track and return session info
        session_data = {
            "session_id": session_id,
            "container_id": solver_vm["name"],
            "provider": "incus-cloud",
            "status": "running",
            "env_path": str(env_path),
            "output_dir": output_dir or f"/tmp/cb_output/{session_id}",
            "task_index": task_index,
            "child_containers": [{
                "name": desktop_ip,
                "id": desktop_vm["name"],
                "api_port": 8000,
                "novnc_port": 6901,
                "os_type": os_type,
            }],
            "solver_ip": solver_ip,
            "desktop_ip": desktop_ip,
        }

        self.running_instances[session_id] = session_data
        return session_data

    async def get_session_status(self, session_id: str) -> Dict[str, Any]:
        """Get VM status by checking if solver process is still running."""
        session = self.running_instances.get(session_id)
        if not session:
            return {"status": "not_found", "session_id": session_id}

        solver_name = session["container_id"]

        try:
            # Check VM state
            vm_state = await self.client.get_instance_state(solver_name)

            if vm_state["status"] != "Running":
                return {
                    "session_id": session_id,
                    "status": "stopped",
                    "container_id": solver_name,
                }

            # Check if solver process is still running
            result = await self._exec_command(
                solver_name,
                "pgrep -f 'cua_bench.batch.solver' || echo 'not_running'"
            )

            if "not_running" in result:
                # Check exit status
                exit_check = await self._exec_command(
                    solver_name,
                    "cat /tmp/solver_exit_code 2>/dev/null || echo '0'"
                )
                exit_code = int(exit_check.strip())

                return {
                    "session_id": session_id,
                    "status": "completed" if exit_code == 0 else "failed",
                    "exit_code": exit_code,
                    "container_id": solver_name,
                }

            return {
                "session_id": session_id,
                "status": "running",
                "container_id": solver_name,
            }

        except Exception as e:
            return {
                "session_id": session_id,
                "status": "error",
                "error": str(e),
            }

    async def stop_session(self, session_id: str) -> None:
        """Stop and destroy both VMs."""
        session = self.running_instances.get(session_id)
        if not session:
            return

        # Stop solver VM
        solver_name = session["container_id"]
        await self.client.stop_instance(solver_name)
        await self.client.delete_instance(solver_name)

        # Stop desktop VM(s)
        for child in session.get("child_containers", []):
            desktop_name = child.get("id")
            if desktop_name:
                await self.client.stop_instance(desktop_name)
                await self.client.delete_instance(desktop_name)

        del self.running_instances[session_id]

    async def get_session_logs(self, session_id: str, tail: Optional[int] = None) -> str:
        """Get solver logs from the VM."""
        session = self.running_instances.get(session_id)
        if not session:
            return f"Session {session_id} not found"

        solver_name = session["container_id"]

        cmd = "cat /tmp/solver.log"
        if tail:
            cmd = f"tail -n {tail} /tmp/solver.log"

        return await self._exec_command(solver_name, cmd)

    # --- Helper methods ---

    def _get_desktop_image(self, os_type: str) -> str:
        """Get the appropriate desktop image for the OS type."""
        images = {
            "linux": "cua-desktop-linux",
            "windows": "cua-desktop-windows",
        }
        return images.get(os_type, "cua-desktop-linux")

    async def _load_task_config(self, env_path: Path, task_index: int) -> Dict:
        """Load task configuration to determine OS type."""
        from cua_bench import make
        env = make(str(env_path))
        tasks = env.tasks_config_fn()
        task = tasks[task_index]
        await env.close()

        if task.computer:
            return task.computer.get("setup_config", {})
        return {}

    async def _create_vm(self, name: str, image: str, **kwargs) -> Dict:
        """Create a VM using Incus."""
        # Implementation depends on your Incus client
        # (for example IncusClient.create_instance followed by start_instance)
        raise NotImplementedError

    async def _get_vm_ip(self, name: str) -> str:
        """Get the IP address of a VM."""
        raise NotImplementedError

    async def _wait_for_health(self, ip: str, port: int, timeout: int = 60):
        """Wait for computer-server to be healthy."""
        import aiohttp
        import asyncio

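        loop = asyncio.get_running_loop()
        start = loop.time()
        while loop.time() - start < timeout:
            try:
                async with aiohttp.ClientSession() as session:
                    async with session.get(f"http://{ip}:{port}/health") as resp:
                        if resp.status == 200:
                            return
            except (aiohttp.ClientError, asyncio.TimeoutError, OSError):
                # Server not reachable yet; retry until the deadline.
                # A bare `except:` here would also swallow task cancellation.
                pass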
            await asyncio.sleep(2)

        raise TimeoutError(f"Desktop VM {ip}:{port} not ready after {timeout}s")

    async def _upload_directory(self, ip: str, local_path: Path, remote_path: str):
        """Upload a directory to the VM via SCP or similar."""
        raise NotImplementedError

    async def _exec_command(self, vm_name: str, command: str) -> str:
        """Execute a command on a VM."""
        raise NotImplementedError
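
One possible way to fill in the `_exec_command` placeholder is to shell out to the `incus exec` CLI from the host. The sketch below assumes the `incus` binary is installed on the orchestrating machine, that the target is a Linux VM with a working Incus agent and `sh`, and that merging stderr into stdout is acceptable; `incus_exec_argv` and `exec_in_vm` are hypothetical names. A Windows desktop VM would need a different shell.

```python
import asyncio


def incus_exec_argv(vm_name: str, command: str) -> list[str]:
    """Build the argv for running a shell command inside an Incus instance."""
    return ["incus", "exec", vm_name, "--", "sh", "-c", command]


async def exec_in_vm(vm_name: str, command: str) -> str:
    """Run a command in the VM and return its combined stdout/stderr."""
    proc = await asyncio.create_subprocess_exec(
        *incus_exec_argv(vm_name, command),
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.STDOUT,
    )
    out, _ = await proc.communicate()
    return out.decode(errors="replace")


argv = incus_exec_argv("solver-abc123", "tail -n 5 /tmp/solver.log")
```

Passing the command as a single `sh -c` argument keeps pipes and redirections (such as the `||` fallback used in `get_session_status`) working inside the VM.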

Incus Client Wrapper

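import ssl
import aiohttp
from typing import Any, Dict, Optional

class IncusClient:
    """Async client for Incus REST API."""

    def __init__(self, base_url: str, cert_path: Optional[str] = None, key_path: Optional[str] = None):
        self.base_url = base_url.rstrip("/")
        self.cert = (cert_path, key_path) if cert_path else None
        # Without a client certificate, TLS verification is disabled, which is
        # only acceptable for local experiments. With one, the certificate is
        # presented to Incus and the server is verified against the system
        # trust store (a self-signed Incus server cert must be trusted there).
        self.ssl: Any = False
        if self.cert:
            self.ssl = ssl.create_default_context()
            self.ssl.load_cert_chain(cert_path, key_path)

    async def _request(self, method: str, path: str, **kwargs) -> Dict:
        connector = aiohttp.TCPConnector(ssl=self.ssl)
        async with aiohttp.ClientSession(connector=connector) as session:
            async with session.request(
                method,
                f"{self.base_url}{path}",
                **kwargs
            ) as resp:
                return await resp.json()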

    async def create_instance(
        self,
        name: str,
        image: str,
        instance_type: str = "virtual-machine",
        config: Dict = None,
        profiles: list = None,
    ) -> Dict:
        """Create a new instance (container or VM)."""
        payload = {
            "name": name,
            "source": {
                "type": "image",
                "alias": image,
            },
            "type": instance_type,
            "config": config or {},
            "profiles": profiles or ["default"],
        }
        return await self._request("POST", "/1.0/instances", json=payload)

    async def start_instance(self, name: str) -> Dict:
        """Start an instance."""
        return await self._request(
            "PUT",
            f"/1.0/instances/{name}/state",
            json={"action": "start"}
        )

    async def stop_instance(self, name: str) -> Dict:
        """Stop an instance."""
        return await self._request(
            "PUT",
            f"/1.0/instances/{name}/state",
            json={"action": "stop"}
        )

    async def delete_instance(self, name: str) -> Dict:
        """Delete an instance."""
        return await self._request("DELETE", f"/1.0/instances/{name}")

    async def get_instance_state(self, name: str) -> Dict:
        """Get instance state."""
        result = await self._request("GET", f"/1.0/instances/{name}/state")
        return result.get("metadata", {})

    async def exec_command(self, name: str, command: list) -> Dict:
        """Execute a command in an instance."""
        payload = {
            "command": command,
            "wait-for-websocket": False,
            "interactive": False,
        }
        return await self._request(
            "POST",
            f"/1.0/instances/{name}/exec",
            json=payload
        )
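
The `_get_vm_ip` stub in the provider could be built on top of `get_instance_state`. The following is a minimal sketch, assuming the state metadata uses the Incus layout of `network -> <interface> -> addresses`, where each address carries `family`, `scope`, and `address` keys. The helper name `extract_ipv4` is hypothetical and not part of CUA-Bench.

```python
from typing import Any, Dict, Optional


def extract_ipv4(state: Dict[str, Any]) -> Optional[str]:
    """Return the first global IPv4 address in an instance-state dict, or None."""
    # "network" can be missing or null while the instance is still booting
    networks = state.get("network") or {}
    for iface, info in networks.items():
        if iface == "lo":
            continue  # skip the loopback interface
        for addr in info.get("addresses", []):
            if addr.get("family") == "inet" and addr.get("scope") == "global":
                return addr.get("address")
    return None


# Hand-written sample shaped like the assumed metadata layout
sample_state = {
    "status": "Running",
    "network": {
        "lo": {"addresses": [{"family": "inet", "scope": "local", "address": "127.0.0.1"}]},
        "eth0": {
            "addresses": [
                {"family": "inet6", "scope": "link", "address": "fe80::1"},
                {"family": "inet", "scope": "global", "address": "10.0.0.5"},
            ]
        },
    },
}
print(extract_ipv4(sample_state))
```

Because a freshly started VM may not have an address yet, a caller would typically retry this lookup in a loop with a timeout, in the same style as `_wait_for_health`.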

Windows computer-server

Full Implementation

# computer_server_windows.py
"""
Computer-server implementation for Windows.
Provides the same HTTP API as the Linux version.
"""

from fastapi import FastAPI, Response
from pydantic import BaseModel
from typing import List, Optional
import ctypes
from ctypes import wintypes
from PIL import ImageGrab
import io
import time

app = FastAPI(title="computer-server-windows")

# Windows API setup
user32 = ctypes.windll.user32
kernel32 = ctypes.windll.kernel32

# Input structures for SendInput
class MOUSEINPUT(ctypes.Structure):
    _fields_ = [
        ("dx", wintypes.LONG),
        ("dy", wintypes.LONG),
        ("mouseData", wintypes.DWORD),
        ("dwFlags", wintypes.DWORD),
        ("time", wintypes.DWORD),
        ("dwExtraInfo", ctypes.POINTER(ctypes.c_ulong)),
    ]

class KEYBDINPUT(ctypes.Structure):
    _fields_ = [
        ("wVk", wintypes.WORD),
        ("wScan", wintypes.WORD),
        ("dwFlags", wintypes.DWORD),
        ("time", wintypes.DWORD),
        ("dwExtraInfo", ctypes.POINTER(ctypes.c_ulong)),
    ]

class INPUT(ctypes.Structure):
    class _INPUT(ctypes.Union):
        _fields_ = [("mi", MOUSEINPUT), ("ki", KEYBDINPUT)]
    _anonymous_ = ("_input",)
    _fields_ = [("type", wintypes.DWORD), ("_input", _INPUT)]

# Constants
INPUT_MOUSE = 0
INPUT_KEYBOARD = 1
MOUSEEVENTF_MOVE = 0x0001
MOUSEEVENTF_LEFTDOWN = 0x0002
MOUSEEVENTF_LEFTUP = 0x0004
MOUSEEVENTF_RIGHTDOWN = 0x0008
MOUSEEVENTF_RIGHTUP = 0x0010
MOUSEEVENTF_MIDDLEDOWN = 0x0020
MOUSEEVENTF_MIDDLEUP = 0x0040
MOUSEEVENTF_WHEEL = 0x0800
MOUSEEVENTF_ABSOLUTE = 0x8000
KEYEVENTF_KEYUP = 0x0002
KEYEVENTF_UNICODE = 0x0004

# Virtual key codes
VK_MAP = {
    "enter": 0x0D, "return": 0x0D,
    "tab": 0x09,
    "escape": 0x1B, "esc": 0x1B,
    "backspace": 0x08,
    "delete": 0x2E,
    "insert": 0x2D,
    "home": 0x24,
    "end": 0x23,
    "pageup": 0x21,
    "pagedown": 0x22,
    "up": 0x26, "arrowup": 0x26,
    "down": 0x28, "arrowdown": 0x28,
    "left": 0x25, "arrowleft": 0x25,
    "right": 0x27, "arrowright": 0x27,
    "shift": 0x10,
    "ctrl": 0x11, "control": 0x11,
    "alt": 0x12,
    "win": 0x5B, "windows": 0x5B, "super": 0x5B,
    "space": 0x20,
    "f1": 0x70, "f2": 0x71, "f3": 0x72, "f4": 0x73,
    "f5": 0x74, "f6": 0x75, "f7": 0x76, "f8": 0x77,
    "f9": 0x78, "f10": 0x79, "f11": 0x7A, "f12": 0x7B,
}

# Request models
class ClickRequest(BaseModel):
    x: int
    y: int
    button: str = "left"

class TypeRequest(BaseModel):
    text: str

class KeyRequest(BaseModel):
    key: str

class HotkeyRequest(BaseModel):
    keys: List[str]

class ScrollRequest(BaseModel):
    x: int
    y: int
    scroll_x: int = 0
    scroll_y: int = 0

class DragRequest(BaseModel):
    from_x: int
    from_y: int
    to_x: int
    to_y: int
    duration: float = 0.5

class MoveRequest(BaseModel):
    x: int
    y: int

# Helper functions
def _move_cursor(x: int, y: int):
    """Move cursor to absolute position."""
    user32.SetCursorPos(x, y)

def _send_mouse_event(flags: int, dx: int = 0, dy: int = 0, data: int = 0):
    """Send a mouse event."""
    inp = INPUT(type=INPUT_MOUSE)
    inp.mi.dx = dx
    inp.mi.dy = dy
    inp.mi.mouseData = data
    inp.mi.dwFlags = flags
    inp.mi.time = 0
    inp.mi.dwExtraInfo = None
    user32.SendInput(1, ctypes.byref(inp), ctypes.sizeof(INPUT))

def _send_key(vk: int, down: bool = True):
    """Send a key event."""
    inp = INPUT(type=INPUT_KEYBOARD)
    inp.ki.wVk = vk
    inp.ki.wScan = 0
    inp.ki.dwFlags = 0 if down else KEYEVENTF_KEYUP
    inp.ki.time = 0
    inp.ki.dwExtraInfo = None
    user32.SendInput(1, ctypes.byref(inp), ctypes.sizeof(INPUT))

def _get_vk(key: str) -> int:
    """Get virtual key code for a key name."""
    key_lower = key.lower()
    if key_lower in VK_MAP:
        return VK_MAP[key_lower]
    if len(key) == 1:
        return user32.VkKeyScanW(ord(key)) & 0xFF
    raise ValueError(f"Unknown key: {key}")

# Endpoints
@app.get("/health")
async def health():
    """Health check endpoint."""
    return {"status": "ok", "platform": "windows"}

@app.post("/screenshot")
async def screenshot():
    """Capture screenshot and return as PNG."""
    img = ImageGrab.grab()
    buffer = io.BytesIO()
    img.save(buffer, format="PNG")
    return Response(content=buffer.getvalue(), media_type="image/png")

@app.post("/click")
async def click(req: ClickRequest):
    """Click at position."""
    _move_cursor(req.x, req.y)
    time.sleep(0.01)

    if req.button == "left":
        _send_mouse_event(MOUSEEVENTF_LEFTDOWN)
        _send_mouse_event(MOUSEEVENTF_LEFTUP)
    elif req.button == "right":
        _send_mouse_event(MOUSEEVENTF_RIGHTDOWN)
        _send_mouse_event(MOUSEEVENTF_RIGHTUP)
    elif req.button == "middle":
        _send_mouse_event(MOUSEEVENTF_MIDDLEDOWN)
        _send_mouse_event(MOUSEEVENTF_MIDDLEUP)

    return {"status": "ok"}

@app.post("/double_click")
async def double_click(req: ClickRequest):
    """Double click at position."""
    _move_cursor(req.x, req.y)
    time.sleep(0.01)
    _send_mouse_event(MOUSEEVENTF_LEFTDOWN)
    _send_mouse_event(MOUSEEVENTF_LEFTUP)
    time.sleep(0.05)
    _send_mouse_event(MOUSEEVENTF_LEFTDOWN)
    _send_mouse_event(MOUSEEVENTF_LEFTUP)
    return {"status": "ok"}

@app.post("/right_click")
async def right_click(req: ClickRequest):
    """Right click at position."""
    _move_cursor(req.x, req.y)
    time.sleep(0.01)
    _send_mouse_event(MOUSEEVENTF_RIGHTDOWN)
    _send_mouse_event(MOUSEEVENTF_RIGHTUP)
    return {"status": "ok"}

@app.post("/type")
async def type_text(req: TypeRequest):
    """Type text using SendInput with Unicode."""
    for char in req.text:
        # Use Unicode input for broad character support
        inp_down = INPUT(type=INPUT_KEYBOARD)
        inp_down.ki.wVk = 0
        inp_down.ki.wScan = ord(char)
        inp_down.ki.dwFlags = KEYEVENTF_UNICODE
        inp_down.ki.time = 0
        inp_down.ki.dwExtraInfo = None

        inp_up = INPUT(type=INPUT_KEYBOARD)
        inp_up.ki.wVk = 0
        inp_up.ki.wScan = ord(char)
        inp_up.ki.dwFlags = KEYEVENTF_UNICODE | KEYEVENTF_KEYUP
        inp_up.ki.time = 0
        inp_up.ki.dwExtraInfo = None

        user32.SendInput(1, ctypes.byref(inp_down), ctypes.sizeof(INPUT))
        user32.SendInput(1, ctypes.byref(inp_up), ctypes.sizeof(INPUT))
        time.sleep(0.01)

    return {"status": "ok"}

@app.post("/key")
async def key(req: KeyRequest):
    """Press a single key."""
    vk = _get_vk(req.key)
    _send_key(vk, down=True)
    _send_key(vk, down=False)
    return {"status": "ok"}

@app.post("/hotkey")
async def hotkey(req: HotkeyRequest):
    """Press a key combination (e.g., Ctrl+C)."""
    vks = [_get_vk(k) for k in req.keys]

    # Press all keys down
    for vk in vks:
        _send_key(vk, down=True)
        time.sleep(0.01)

    # Release all keys in reverse order
    for vk in reversed(vks):
        _send_key(vk, down=False)
        time.sleep(0.01)

    return {"status": "ok"}

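@app.post("/scroll")
async def scroll(req: ScrollRequest):
    """Scroll at position."""
    _move_cursor(req.x, req.y)
    time.sleep(0.01)

    if req.scroll_y != 0:
        # Positive = scroll up, negative = scroll down
        _send_mouse_event(MOUSEEVENTF_WHEEL, data=req.scroll_y * 120)

    if req.scroll_x != 0:
        # MOUSEEVENTF_HWHEEL (0x1000): positive = scroll right, negative = scroll left
        _send_mouse_event(0x1000, data=req.scroll_x * 120)

    return {"status": "ok"}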

@app.post("/drag")
async def drag(req: DragRequest):
    """Drag from one position to another."""
    _move_cursor(req.from_x, req.from_y)
    time.sleep(0.01)

    _send_mouse_event(MOUSEEVENTF_LEFTDOWN)

    # Interpolate movement
    steps = max(10, int(req.duration * 60))
    for i in range(1, steps + 1):
        t = i / steps
        x = int(req.from_x + (req.to_x - req.from_x) * t)
        y = int(req.from_y + (req.to_y - req.from_y) * t)
        _move_cursor(x, y)
        time.sleep(req.duration / steps)

    _send_mouse_event(MOUSEEVENTF_LEFTUP)
    return {"status": "ok"}

@app.post("/move_cursor")
async def move_cursor(req: MoveRequest):
    """Move cursor to position."""
    _move_cursor(req.x, req.y)
    return {"status": "ok"}

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)
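
The constants above define `MOUSEEVENTF_MOVE` and `MOUSEEVENTF_ABSOLUTE`, but the server moves the cursor with `SetCursorPos` and never uses them. If a `SendInput`-based move were preferred instead, absolute coordinates would have to be normalized to the 0..65535 range rather than passed as pixels. The sketch below shows that conversion only; `to_absolute` is a hypothetical helper, and it assumes a single primary monitor.

```python
def to_absolute(pixel: int, extent: int) -> int:
    """Map a pixel coordinate in [0, extent - 1] to the 0..65535 range."""
    if extent < 2:
        return 0
    # Clamp so out-of-range requests land on the screen edge
    pixel = max(0, min(pixel, extent - 1))
    return pixel * 65535 // (extent - 1)


# Example for a 1920x1080 display
print(to_absolute(0, 1920), to_absolute(1919, 1920), to_absolute(1079, 1080))
```

The resulting values would go into `dx` and `dy` with `MOUSEEVENTF_MOVE | MOUSEEVENTF_ABSOLUTE` as the flags.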

Running on Windows

REM Install dependencies
pip install fastapi uvicorn pillow

REM Run the server
python computer_server_windows.py

Windows Service Setup

# Create a Windows service using NSSM
nssm install ComputerServer "C:\Python312\python.exe" "C:\cua\computer_server_windows.py"
nssm set ComputerServer Start SERVICE_AUTO_START
nssm start ComputerServer

Implementation Checklist

Phase 1: Core Infrastructure

  • Incus Client

    • Instance creation (VMs)
    • Instance state management
    • Command execution
    • File transfer
  • Cloud Provider Class

    • Implement SessionProvider interface
    • VM provisioning logic
    • Environment variable injection
    • Status monitoring

Phase 2: VM Images

  • Solver VM Image (Linux)

    • Ubuntu 22.04 base
    • Python 3.12 + pip
    • cua-bench package
    • Startup script
  • Linux Desktop VM Image

    • XFCE desktop
    • Xvfb virtual display
    • VNC server
    • computer-server
  • Windows Desktop VM Image

    • Windows 10/11
    • Python + dependencies
    • computer-server-windows
    • VNC server

Phase 3: Networking

  • Internal network between VMs
  • IP assignment / DNS
  • Firewall rules (port 8000, 6901)
  • External access for debugging

Phase 4: File Transfer

  • Upload task env to solver VM
  • Download traces from solver VM
  • Efficient transfer (rsync/scp)

Phase 5: Testing

  • Linux task execution
  • Windows task execution
  • Multi-session parallel execution
  • Error handling and cleanup

Quick Reference

Environment Variables for Solver VM

| Variable | Example | Description |
| --- | --- | --- |
| BATCH_TASK_INDEX | 0 | Which task variant to run |
| BATCH_TASK_COUNT | 1 | Total variants |
| CUA_TASK_CONTAINERS | [{"name": "10.0.0.5", ...}] | Desktop VM info |
| ANTHROPIC_API_KEY | sk-... | API key for Claude |

CUA_TASK_CONTAINERS Schema

[
  {
    "name": "10.0.0.5",      // IP or hostname of desktop VM
    "api_port": 8000,         // computer-server port
    "novnc_port": 6901,       // VNC port
    "os_type": "linux"        // or "windows"
  }
]
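
The schema above can be parsed and checked before the solver connects to anything. This is a minimal sketch, assuming the field names shown in the schema; the `DesktopInfo` dataclass, the `parse_task_containers` function, and the default values are illustrative and not part of CUA-Bench. Note that the real environment variable must be plain JSON, without the `//` comments used above for explanation.

```python
import json
from dataclasses import dataclass
from typing import List


@dataclass
class DesktopInfo:
    name: str                 # IP or hostname of the desktop VM
    api_port: int = 8000      # computer-server port (assumed default)
    novnc_port: int = 6901    # VNC port (assumed default)
    os_type: str = "linux"    # "linux" or "windows"


def parse_task_containers(raw: str) -> List[DesktopInfo]:
    """Parse the CUA_TASK_CONTAINERS JSON string into typed records."""
    entries = json.loads(raw) if raw else []
    if not isinstance(entries, list):
        raise ValueError("CUA_TASK_CONTAINERS must be a JSON array")
    result = []
    for entry in entries:
        os_type = entry.get("os_type", "linux")
        if os_type not in ("linux", "windows"):
            raise ValueError(f"Unsupported os_type: {os_type!r}")
        result.append(
            DesktopInfo(
                name=entry["name"],
                api_port=int(entry.get("api_port", 8000)),
                novnc_port=int(entry.get("novnc_port", 6901)),
                os_type=os_type,
            )
        )
    return result


raw = '[{"name": "10.0.0.5", "api_port": 8000, "novnc_port": 6901, "os_type": "windows"}]'
desktops = parse_task_containers(raw)
print(desktops[0])
```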

Solver Command

python3 -m cua_bench.batch.solver /app/env \
    --eval \
    --agent "cua-agent" \
    --model "claude-sonnet-4-20250514" \
    --max-steps 100

Generated for CUA-Bench cloud provider development with Incus/KVM

CUA-Bench Docker Provider Deep Dive

A detailed technical guide for understanding the Docker provider architecture, designed to inform the development of a cloud hosting service with Windows support.

Table of Contents

  1. Provider Interface Contract
  2. Docker Provider Implementation
  3. Container Lifecycle
  4. Two-Tier Container Architecture
  5. Networking & Port Allocation
  6. Session Management
  7. Environment Variable Protocol
  8. Cloud Provider Requirements
  9. Windows Support Considerations

Provider Interface Contract

The SessionProvider abstract base class defines the interface that any provider must implement:

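# cua_bench/sessions/providers/base.py

from abc import ABC, abstractmethod
from pathlib import Path
from typing import Any, Dict, Optional

class SessionProvider(ABC):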

    @abstractmethod
    async def start_session(
        self,
        session_id: str,           # Unique identifier
        env_path: Path,            # Path to task environment directory
        container_script: str,     # Command to run inside container
        image_uri: Optional[str],  # Container image (default: cua-bench:latest)
        output_dir: Optional[str], # Where to save outputs
        **kwargs                   # Provider-specific args (task_index, etc.)
    ) -> Dict[str, Any]:
        """Start a new session. Returns session metadata dict."""

    @abstractmethod
    async def get_session_status(self, session_id: str) -> Dict[str, Any]:
        """Get session status: running, completed, failed, deleted, etc."""

    @abstractmethod
    async def stop_session(self, session_id: str) -> None:
        """Stop and cleanup a session and its child containers."""

    @abstractmethod
    async def get_session_logs(self, session_id: str, tail: Optional[int] = None) -> str:
        """Get container logs."""

Return Value Contract

start_session() must return a dict with at least:

{
    "session_id": str,        # The session ID passed in
    "container_id": str,      # Provider-specific container/instance ID
    "provider": str,          # Provider name ("docker", "cloud", etc.)
    "status": str,            # Initial status ("running")
    "env_path": str,          # Path to environment
    "output_dir": str,        # Output directory path
    "image": str,             # Image used
    "task_index": int,        # Which task variant
    "task_count": int,        # Total variants
    "child_containers": list, # Child container info (see below)
}

get_session_status() must return:

{
    "session_id": str,
    "container_id": str,
    "status": str,            # "running" | "completed" | "failed" | "deleted" | "stopped"
    "exit_code": Optional[int],
    "started_at": Optional[str],
    "finished_at": Optional[str],
}
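
A caller can rely on this contract to wait for a session without knowing which provider is behind it. The sketch below is one way to do that, under the assumption that "completed", "failed", "deleted", and "stopped" are all terminal; the function name `wait_for_session` and its `interval` and `timeout` parameters are hypothetical.

```python
import asyncio
from typing import Any, Dict

# Assumed terminal states, taken from the status values listed above
TERMINAL_STATES = {"completed", "failed", "deleted", "stopped"}


async def wait_for_session(
    provider: Any, session_id: str, interval: float = 2.0, timeout: float = 3600.0
) -> Dict[str, Any]:
    """Poll get_session_status() until the session reaches a terminal state."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    while True:
        status = await provider.get_session_status(session_id)
        if status["status"] in TERMINAL_STATES:
            return status
        if loop.time() >= deadline:
            raise TimeoutError(f"Session {session_id} still running after {timeout}s")
        await asyncio.sleep(interval)


class FakeProvider:
    """Stand-in provider that reports 'running' twice, then 'completed'."""

    def __init__(self) -> None:
        self.calls = 0

    async def get_session_status(self, session_id: str) -> Dict[str, Any]:
        self.calls += 1
        done = self.calls >= 3
        return {
            "session_id": session_id,
            "container_id": "fake",
            "status": "completed" if done else "running",
            "exit_code": 0 if done else None,
        }


fake = FakeProvider()
final = asyncio.run(wait_for_session(fake, "task-a1b2c3d4", interval=0.01, timeout=5))
print(final["status"], fake.calls)
```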

Docker Provider Implementation

Class Structure

classDiagram
    class DockerProvider {
        -running_containers: Dict~str, str~
        -session_networks: Dict~str, str~

        +start_session(session_id, env_path, script, ...) Dict
        +get_session_status(session_id) Dict
        +stop_session(session_id) void
        +get_session_logs(session_id, tail) str

        -_ensure_shared_network() str
        -_create_computer_container(config, idx, network) Dict
        -_cleanup_stale_child_containers() void
        -_stop_container(name) void
        -_find_available_port(start, max) int
        -_find_two_available_ports() tuple
    }

    class SessionProvider {
        <<abstract>>
        +start_session()*
        +get_session_status()*
        +stop_session()*
        +get_session_logs()*
    }

    SessionProvider <|-- DockerProvider

Key Implementation Details

# cua_bench/sessions/providers/docker.py

class DockerProvider(SessionProvider):
    def __init__(self):
        # Track active containers by session_id
        self.running_containers: Dict[str, str] = {}  # session_id -> container_id
        self.session_networks: Dict[str, str] = {}    # session_id -> network_name

Container Lifecycle

State Machine

stateDiagram-v2
    [*] --> Initializing: start_session()

    Initializing --> CreatingNetwork: _ensure_shared_network()
    CreatingNetwork --> CheckingTasks: Load environment

    CheckingTasks --> CreatingChildContainers: Task needs VM
    CheckingTasks --> StartingMainContainer: No VM needed

    CreatingChildContainers --> StartingMainContainer: Child ready

    StartingMainContainer --> Running: docker run -d

    Running --> Completed: Exit code 0
    Running --> Failed: Exit code != 0
    Running --> Stopped: stop_session() / SIGKILL

    Completed --> Deleted: Cleanup
    Failed --> Deleted: Cleanup
    Stopped --> Deleted: Cleanup

    Deleted --> [*]

Lifecycle Sequence

sequenceDiagram
    participant CLI as cb run
    participant DP as DockerProvider
    participant Docker as Docker Daemon
    participant MC as Main Container
    participant CC as Child Container
    participant Solver as batch/solver.py

    CLI->>DP: start_session(session_id, env_path, script)

    Note over DP: 1. Cleanup stale containers
    DP->>DP: _cleanup_stale_child_containers()

    Note over DP: 2. Ensure network exists
    DP->>Docker: docker network inspect cua-bench_default
    alt Network doesn't exist
        DP->>Docker: docker network create cua-bench_default
    end

    Note over DP: 3. Load environment to check task config
    DP->>DP: make(env_path).tasks_config_fn()

    alt Task has computer.provider == "computer"
        Note over DP: 4a. Create child VM container
        DP->>DP: _find_two_available_ports()
        DP->>Docker: docker run -d trycua/cua-xfce:latest
        Docker-->>CC: Start XFCE desktop
        DP->>DP: Store child_containers info
    end

    Note over DP: 5. Build main container command
    DP->>DP: Build docker run with volumes, env vars

    Note over DP: 6. Start main container
    DP->>Docker: docker run -d cua-bench:latest
    Docker-->>MC: Start solver container

    MC->>Solver: python -m cua_bench.batch.solver /app/env

    alt Task uses computer provider
        Solver->>CC: Connect via container_name:8000
        CC-->>Solver: API responses + screenshots
    end

    Solver->>MC: Write traces to /tmp/td_output
    MC-->>Docker: Exit

    Note over CLI: Polling loop
    loop Until terminal state
        CLI->>DP: get_session_status(session_id)
        DP->>Docker: docker inspect container_id
        Docker-->>DP: Container state
        DP-->>CLI: {status: "running"|"completed"|"failed"}
    end
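
The polling step in the sequence above turns `docker inspect` output into the status dict from the return value contract. A minimal sketch of that mapping follows, assuming the `State` object carries Docker's `Running`, `ExitCode`, `StartedAt`, and `FinishedAt` fields. The function name `status_from_inspect` is hypothetical, and telling "stopped" apart from "failed" would need extra bookkeeping in the provider that is not shown here.

```python
from typing import Any, Dict, Optional


def status_from_inspect(
    session_id: str, container_id: str, state: Optional[Dict[str, Any]]
) -> Dict[str, Any]:
    """Translate a docker-inspect State object into a session status dict."""
    result: Dict[str, Any] = {
        "session_id": session_id,
        "container_id": container_id,
        "status": "deleted",
        "exit_code": None,
        "started_at": None,
        "finished_at": None,
    }
    if state is None:
        # Inspect found nothing: the container has already been removed
        return result
    result["started_at"] = state.get("StartedAt")
    if state.get("Running"):
        result["status"] = "running"
        return result
    exit_code = state.get("ExitCode")
    result["exit_code"] = exit_code
    result["finished_at"] = state.get("FinishedAt")
    result["status"] = "completed" if exit_code == 0 else "failed"
    return result


running = status_from_inspect("s1", "abc", {"Running": True, "StartedAt": "2024-01-01T12:00:00Z"})
failed = status_from_inspect("s1", "abc", {"Running": False, "ExitCode": 1})
print(running["status"], failed["status"])
```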

Two-Tier Container Architecture

CUA-Bench uses a two-tier container architecture when tasks require VM-based desktops:

graph TB
    subgraph "Host Machine"
        CLI[cb CLI]
        DP[Docker Provider]
    end

    subgraph "Docker Network: cua-bench_default"
        subgraph "Main Container (cua-bench:latest)"
            Solver[batch/solver.py]
            Env[EnvironmentV2]
            VMSession[VMDesktopSession]
        end

        subgraph "Child Container (cua-xfce:latest)"
            XFCE[XFCE Desktop]
            VNC[VNC Server :6901]
            API[computer-server :8000]
        end
    end

    CLI --> DP
    DP -->|creates| Solver
    DP -->|creates| XFCE

    VMSession -->|HTTP API| API
    VMSession -->|screenshots, actions| API

    VNC -.->|debug access| CLI

Container Roles

| Container | Image | Purpose |
| --- | --- | --- |
| Main Container | cua-bench:latest | Runs solver, agent, environment logic |
| Child Container | trycua/cua-xfce:latest | Provides actual desktop environment |

Child Container Configuration

# Created in _create_computer_container()

docker_cmd = [
    "docker", "run", "-d",
    "--name", container_name,           # e.g., "cb-task-0-a1b2c"
    "--network", network_name,          # "cua-bench_default"
    "-p", f"{novnc_port}:6901",        # VNC access from host
    "-p", f"{api_port}:8000",          # API access from host
    "-e", "VNC_PW=password",
    "-e", "VNCOPTIONS=-disableBasicAuth",
    "-e", f"VNC_RESOLUTION={width}x{height}",
    "trycua/cua-xfce:latest"
]

Child Container Info Structure

child_container_info = {
    "name": "cb-task-0-a1b2c",   # Container name (used for DNS)
    "api_port": 8000,            # Host-mapped API port
    "novnc_port": 6901,          # Host-mapped VNC port
    "os_type": "linux",          # OS type
}

Networking & Port Allocation

Shared Network Architecture

graph LR
    subgraph "Docker Network: cua-bench_default"
        direction TB
        MC1[cb-session-abc<br/>Main Container 1]
        MC2[cb-session-def<br/>Main Container 2]
        CC1[cb-task-0-xyz<br/>Child Container 1]
        CC2[cb-task-1-uvw<br/>Child Container 2]
    end

    MC1 -->|cb-task-0-xyz:8000| CC1
    MC2 -->|cb-task-1-uvw:8000| CC2

    HOST[Host Machine]
    HOST -->|localhost:random_port| CC1
    HOST -->|localhost:random_port| CC2

Port Allocation Algorithm

def _find_available_port(self, start_port: int = 8000, max_port: int = 65535) -> int:
    """Find an available port starting from a random port in the range."""
    # Start from random port to reduce conflicts in parallel execution
    port = random.randint(start_port, min(max_port - 1000, 60000))

    while port <= max_port:
        try:
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                sock.bind(('localhost', port))
                return port
        except OSError:
            port += 1

    raise RuntimeError(f"No available ports found in range {start_port}-{max_port}")

def _find_two_available_ports(self) -> tuple[int, int]:
    """Find two available ports for API and noVNC."""
    api_port = self._find_available_port(8000, 65535)
    novnc_port = api_port + 1
    # ... continue searching for second port
    return api_port, novnc_port
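
The second-port search is elided in the excerpt above. One possible way to complete it is sketched below as standalone functions; this is not the actual `DockerProvider` code, and the names `is_port_free` and `find_two_available_ports` are illustrative. As with the original, a port that is free at check time can still be taken by another process before Docker binds it, so callers should be ready to retry.

```python
import socket
from typing import List, Tuple


def is_port_free(port: int, host: str = "localhost") -> bool:
    """Return True if the port can currently be bound on the given host."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        try:
            sock.bind((host, port))
            return True
        except OSError:
            return False


def find_two_available_ports(start: int = 8000, max_port: int = 65535) -> Tuple[int, int]:
    """Scan upward from start and return the first two distinct free ports."""
    found: List[int] = []
    port = start
    while port <= max_port and len(found) < 2:
        if is_port_free(port):
            found.append(port)
        port += 1
    if len(found) < 2:
        raise RuntimeError(f"Fewer than two free ports in range {start}-{max_port}")
    return found[0], found[1]


api_port, novnc_port = find_two_available_ports(20000)
print(api_port, novnc_port)
```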

DNS Resolution

Within the Docker network, containers communicate using container names as hostnames:

# In VMDesktopSession._init()
comp = Computer(
    os_type=container_info["os_type"],
    use_host_computer_server=True,
    api_host=container_name,    # e.g., "cb-task-0-xyz" - Docker DNS resolves this
    api_port=8000,              # Internal container port (not host-mapped)
    noVNC_port=6901
)

Session Management

Persistent Session Storage

Sessions are tracked in ~/.cua/cbsessions.json:

# cua_bench/sessions/manager.py

import time
from pathlib import Path
from typing import Any, Dict

SESSION_FILE = Path.home() / ".cua" / "cbsessions.json"

def add_session(session_data: Dict[str, Any]) -> None:
    """Add session with timestamp."""
    sessions = _load_sessions()
    if "created_at" not in session_data:
        session_data["created_at"] = time.time()
    # Key the stored record by its session ID
    session_id = session_data["session_id"]
    sessions[session_id] = session_data
    _save_sessions(sessions)
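
The `_load_sessions` and `_save_sessions` helpers are not shown in this guide. The sketch below illustrates how such helpers might work, assuming the file holds a single JSON object keyed by session ID. The function names, the explicit `path` argument, and the write-then-rename step are assumptions for illustration, not the actual `manager.py` code.

```python
import json
import os
import tempfile
from pathlib import Path
from typing import Any, Dict


def load_sessions(path: Path) -> Dict[str, Any]:
    """Read the session file, treating a missing or corrupt file as empty."""
    if not path.exists():
        return {}
    try:
        return json.loads(path.read_text())
    except json.JSONDecodeError:
        return {}


def save_sessions(path: Path, sessions: Dict[str, Any]) -> None:
    """Write sessions to a temp file, then rename it over the target."""
    path.parent.mkdir(parents=True, exist_ok=True)
    fd, tmp_name = tempfile.mkstemp(dir=path.parent, suffix=".tmp")
    with os.fdopen(fd, "w") as handle:
        json.dump(sessions, handle, indent=2)
    # os.replace swaps the file in one step, so readers never see a partial write
    os.replace(tmp_name, path)
```

Writing through a temporary file matters when several `cb run` processes update the session file at about the same time, although it does not by itself prevent lost updates between a load and a save.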

Session Data Schema

{
  "session_id": "task-a1b2c3d4",
  "container_id": "sha256:abc123...",
  "provider": "docker",
  "status": "running",
  "env_path": "/path/to/tasks/my_task",
  "output_dir": "/tmp/cb_run_xyz/task-a1b2c3d4",
  "image": "cua-bench:latest",
  "task_index": 0,
  "task_count": 1,
  "child_containers": [
    {
      "name": "cb-task-0-xyz",
      "api_port": 8234,
      "novnc_port": 8235,
      "os_type": "linux"
    }
  ],
  "run_id": "run-12345678",
  "agent": "cua-agent",
  "model": "claude-sonnet-4-20250514",
  "created_at": 1704110400.123
}

Environment Variable Protocol

Variables Passed to Main Container

# In DockerProvider.start_session()

docker_cmd = [
    "docker", "run", "-d",
    # ... other flags ...

    # Task identification
    "-e", f"BATCH_TASK_INDEX={task_index}",    # Which task variant to run
    "-e", f"BATCH_TASK_COUNT={task_count}",    # Total variants (usually 1)

    # API keys (passed through from host)
    "-e", f"ANTHROPIC_API_KEY={value}",
    "-e", f"OPENAI_API_KEY={value}",
    "-e", f"GOOGLE_API_KEY={value}",
    "-e", f"GOOGLE_CLOUD_PROJECT={value}",
    "-e", f"GOOGLE_APPLICATION_CREDENTIALS={value}",
    "-e", f"AZURE_OPENAI_API_KEY={value}",
    "-e", f"AZURE_OPENAI_ENDPOINT={value}",

    # Child container info (JSON)
    "-e", f"CUA_TASK_CONTAINERS={containers_json}",
]
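
The flag list above can be generated rather than written out by hand. The following sketch assumes that only API keys actually set on the host should be forwarded, which the excerpt does not state; `build_env_flags` and `PASSTHROUGH_KEYS` are hypothetical names.

```python
import os
from typing import List, Mapping, Optional

# Host variables forwarded into the main container, as listed above
PASSTHROUGH_KEYS = [
    "ANTHROPIC_API_KEY",
    "OPENAI_API_KEY",
    "GOOGLE_API_KEY",
    "GOOGLE_CLOUD_PROJECT",
    "GOOGLE_APPLICATION_CREDENTIALS",
    "AZURE_OPENAI_API_KEY",
    "AZURE_OPENAI_ENDPOINT",
]


def build_env_flags(
    task_index: int,
    task_count: int,
    containers_json: str = "",
    environ: Optional[Mapping[str, str]] = None,
) -> List[str]:
    """Build the '-e KEY=VALUE' arguments for docker run."""
    environ = os.environ if environ is None else environ
    flags = [
        "-e", f"BATCH_TASK_INDEX={task_index}",
        "-e", f"BATCH_TASK_COUNT={task_count}",
    ]
    for key in PASSTHROUGH_KEYS:
        value = environ.get(key)
        if value:  # assumption: skip keys that are unset or empty on the host
            flags += ["-e", f"{key}={value}"]
    if containers_json:
        flags += ["-e", f"CUA_TASK_CONTAINERS={containers_json}"]
    return flags


flags = build_env_flags(0, 1, '[{"name": "cb-task-0-xyz"}]', environ={"OPENAI_API_KEY": "sk-test"})
print(flags)
```

Passing the command as an argument list, as the provider does, avoids shell quoting problems with the JSON value.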

CUA_TASK_CONTAINERS Format

[
  {
    "name": "cb-task-0-xyz",
    "api_port": 8000,
    "novnc_port": 6901,
    "os_type": "linux"
  }
]

How Solver Reads Container Info

# In cua_bench/computers/computer.py - VMDesktopSession._init()

task_containers_json = os.environ.get("CUA_TASK_CONTAINERS")
task_index = int(os.environ.get("BATCH_TASK_INDEX", 0))

if task_containers_json:
    # Use pre-created container
    task_containers = json.loads(task_containers_json)
    container_info = task_containers[task_index]

    comp = Computer(
        os_type=container_info["os_type"],
        use_host_computer_server=True,
        api_host=container_info["name"],  # Docker DNS name
        api_port=8000,                     # Internal port
        noVNC_port=6901
    )
else:
    # Create container dynamically (interactive mode)
    comp = Computer(
        os_type=config.get("os_type", "linux"),
        provider_type="docker",
        display=f"{width}x{height}",
        image="trycua/cua-xfce:latest",
        ephemeral=True,
        name=container_name,
        api_port=api_port,
        noVNC_port=novnc_port
    )

Cloud Provider Requirements

To implement a cloud provider, you need to replicate the Docker provider's behavior with cloud VMs:

Core Requirements

flowchart TD
    subgraph "Cloud Provider Must Implement"
        A[VM Provisioning]
        B[Container/Process Execution]
        C[File Transfer]
        D[Networking]
        E[Status Monitoring]
        F[Cleanup]
    end

    A --> A1[Create VM from image]
    A --> A2[Configure resources CPU/RAM]
    A --> A3[Set display resolution]

    B --> B1[Run solver script]
    B --> B2[Pass environment variables]
    B --> B3[Capture stdout/stderr]

    C --> C1[Upload env_path to VM]
    C --> C2[Download traces from VM]

    D --> D1[Internal networking between VMs]
    D --> D2[VNC/remote access]
    D --> D3[API endpoint exposure]

    E --> E1[Poll VM status]
    E --> E2[Detect completion/failure]

    F --> F1[Terminate VMs]
    F --> F2[Clean up resources]

Cloud Provider Interface Mapping

| Docker Concept | Cloud Equivalent |
| --- | --- |
| Container | VM Instance |
| Docker Image | VM Image / Snapshot |
| Docker Network | VPC / Subnet |
| Volume Mount | File Upload/Download |
| Container Name DNS | Private IP / DNS |
| Port Mapping | Security Group / Firewall Rules |
| docker run | Create + Start VM |
| docker inspect | Get VM Status |
| docker logs | Serial Console / Cloud Logging |
| docker stop | Stop + Terminate VM |

Proposed Cloud Provider Structure

class CloudProvider(SessionProvider):
    def __init__(self, api_key: str, region: str):
        self.client = CloudClient(api_key, region)
        self.running_instances: Dict[str, str] = {}

    async def start_session(
        self,
        session_id: str,
        env_path: Path,
        container_script: str,
        **kwargs
    ) -> Dict[str, Any]:
        # 1. Create VPC/network if needed
        network_id = await self._ensure_network()

        # 2. Check if task needs a desktop VM
        child_instances = []
        if task_needs_desktop:
            child = await self._create_desktop_vm(
                os_type=task_config["os_type"],
                resolution=f"{width}x{height}",
                network_id=network_id
            )
            child_instances.append(child)

        # 3. Create main solver VM
        main_vm = await self._create_solver_vm(
            network_id=network_id,
            env_path=env_path,
            script=container_script,
            child_instances=child_instances,
            env_vars={
                "BATCH_TASK_INDEX": task_index,
                "CUA_TASK_CONTAINERS": json.dumps(child_instances),
                "ANTHROPIC_API_KEY": os.environ.get("ANTHROPIC_API_KEY"),
                # ... other env vars
            }
        )

        return {
            "session_id": session_id,
            "container_id": main_vm["id"],
            "provider": "cloud",
            "status": "running",
            "child_containers": child_instances,
            # ...
        }

File Transfer Strategy

sequenceDiagram
    participant CLI as cb run (Host)
    participant CP as Cloud Provider
    participant Storage as Cloud Storage
    participant MainVM as Main VM
    participant DesktopVM as Desktop VM

    Note over CLI,CP: 1. Upload task environment
    CLI->>Storage: Upload env_path as tarball
    Storage-->>CLI: storage_url

    Note over CP,MainVM: 2. Start VMs with download script
    CP->>MainVM: Create VM with startup script
    MainVM->>Storage: Download + extract env_path
    MainVM->>MainVM: Run solver

    Note over MainVM,Storage: 3. Upload results
    MainVM->>Storage: Upload /tmp/td_output

    Note over CLI,Storage: 4. Download results
    CLI->>Storage: Download traces
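
Steps 1 and 2 of the flow above come down to packing `env_path` into a tarball on the host and extracting it on the main VM. A minimal sketch using only the standard library follows; the upload to and download from cloud storage is provider-specific and left out. The names `pack_env` and `unpack_env` and the `env` archive prefix are assumptions.

```python
import io
import tarfile
from pathlib import Path


def pack_env(env_path: Path) -> bytes:
    """Pack a task environment directory into a gzip-compressed tarball."""
    buffer = io.BytesIO()
    with tarfile.open(fileobj=buffer, mode="w:gz") as archive:
        # Store everything under a fixed "env" prefix so the VM-side path is predictable
        archive.add(str(env_path), arcname="env")
    return buffer.getvalue()


def unpack_env(data: bytes, dest: Path) -> Path:
    """Extract a tarball produced by pack_env and return the extracted env directory."""
    dest.mkdir(parents=True, exist_ok=True)
    with tarfile.open(fileobj=io.BytesIO(data), mode="r:gz") as archive:
        # Only extract archives you created yourself; tar members can carry unsafe paths
        archive.extractall(str(dest))
    return dest / "env"
```

On the VM, the startup script would call the equivalent of `unpack_env(data, Path("/app"))` so that the solver finds the environment at `/app/env`, the path used in the solver command.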

Windows Support Considerations

Current Linux-Only Architecture

The current system uses trycua/cua-xfce:latest, which is Linux-only. Windows support requires the additions described in the following sections.

Windows Desktop VM Requirements

flowchart TD
    subgraph "Windows VM Requirements"
        A[Windows Image]
        B[Remote Access]
        C[API Server]
        D[Input Simulation]
    end

    A --> A1[Windows 10/11 base image]
    A --> A2[Pre-installed dependencies]
    A --> A3[Headless configuration]

    B --> B1[RDP Protocol]
    B --> B2[VNC Alternative]
    B --> B3[Screenshot API]

    C --> C1[computer-server equivalent]
    C --> C2[HTTP API on port 8000]
    C --> C3[Same interface as Linux]

    D --> D1[Win32 API for clicks]
    D --> D2[SendInput for keyboard]
    D --> D3[Screen capture via GDI+]

OS-Specific Container/VM Images

| OS Type | Current Image | Proposed Image |
| --- | --- | --- |
| Linux | trycua/cua-xfce:latest | Same |
| Windows | N/A | trycua/cua-windows:latest or Cloud Windows VM |

Windows-Specific Challenges

flowchart LR
    subgraph "Challenges"
        A[Licensing]
        B[Image Size]
        C[Boot Time]
        D[API Compatibility]
    end

    A --> A1[Windows requires licenses]
    A --> A2[Cloud providers handle this]

    B --> B1[Windows images are 10-20GB+]
    B --> B2[Pre-cached images needed]

    C --> C1[Windows boots slower than Linux]
    C --> C2[Warm pool of VMs recommended]

    D --> D1[computer-server needs Windows port]
    D --> D2[Same HTTP API contract]

Proposed Windows Architecture

graph TB
    subgraph "Cloud Provider with Windows Support"
        CP[Cloud Provider]

        subgraph "VM Pool"
            LVM1[Linux VM Pool]
            WVM1[Windows VM Pool]
        end

        subgraph "Images"
            LI[Linux: cua-xfce]
            WI[Windows: cua-windows]
        end
    end

    CP -->|os_type=linux| LVM1
    CP -->|os_type=windows| WVM1

    LVM1 -.-> LI
    WVM1 -.-> WI

Task Configuration for Windows

# Example task supporting Windows
cb.Task(
    description="Click Start menu and open Notepad",
    computer={
        "provider": "computer",  # or "cloud"
        "setup_config": {
            "os_type": "windows",  # NEW: Windows support
            "width": 1920,
            "height": 1080,
            "background": None,  # Windows handles wallpaper differently
        }
    }
)
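
With `os_type` present in `setup_config`, a provider can route each task to the matching desktop image. A minimal sketch of that routing, assuming the image names from the table in this guide (the Windows image is only proposed here); `DESKTOP_IMAGES` and `select_desktop_image` are hypothetical names.

```python
from typing import Any, Dict

DESKTOP_IMAGES = {
    "linux": "trycua/cua-xfce:latest",
    "windows": "trycua/cua-windows:latest",  # proposed in this guide, assumed name
}


def select_desktop_image(setup_config: Dict[str, Any]) -> str:
    """Pick the desktop image for a task, defaulting to Linux when os_type is absent."""
    os_type = str(setup_config.get("os_type", "linux")).lower()
    try:
        return DESKTOP_IMAGES[os_type]
    except KeyError:
        raise ValueError(f"Unsupported os_type: {os_type!r}") from None


print(select_desktop_image({"os_type": "windows", "width": 1920, "height": 1080}))
```

Failing fast on an unknown `os_type` keeps a misconfigured task from silently starting a Linux desktop.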

computer-server Windows Implementation

The computer-server API must be ported to Windows with the same interface:

# Same HTTP API endpoints:
# POST /click      - Simulate mouse click
# POST /type       - Simulate keyboard input
# POST /key        - Simulate key press
# POST /hotkey     - Simulate key combination
# GET  /screenshot - Capture screen as PNG

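# Windows-specific implementation (sketch):
import ctypes
import io

user32 = ctypes.windll.user32

MOUSEEVENTF_LEFTDOWN = 0x0002
MOUSEEVENTF_LEFTUP = 0x0004

def click(x: int, y: int) -> None:
    # mouse_event still works but is superseded by SendInput in the Win32 API
    user32.SetCursorPos(x, y)
    user32.mouse_event(MOUSEEVENTF_LEFTDOWN, 0, 0, 0, 0)
    user32.mouse_event(MOUSEEVENTF_LEFTUP, 0, 0, 0, 0)

def screenshot() -> bytes:
    from PIL import ImageGrab  # requires Pillow
    img = ImageGrab.grab()
    # Encode the captured image as PNG bytes
    buf = io.BytesIO()
    img.save(buf, format="PNG")
    return buf.getvalue()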

Implementation Checklist for Cloud Provider

Phase 1: Basic Cloud Provider

  • Implement SessionProvider interface
  • VM provisioning (create/start/stop/terminate)
  • File upload (env_path → VM)
  • File download (traces → local)
  • Environment variable injection
  • Status polling
  • Log retrieval

Phase 2: Networking

  • VPC/network creation
  • Internal DNS or IP assignment
  • Security group configuration
  • VNC/RDP access for debugging

Phase 3: Windows Support

  • Windows VM image creation
  • computer-server Windows port
  • OS-type detection and routing
  • Windows-specific input simulation
  • Windows screenshot capture

Phase 4: Optimization

  • VM pool / warm instances
  • Parallel VM provisioning
  • Caching of common images
  • Cost optimization (spot instances, etc.)

Quick Reference: Key Files

| File | Purpose |
|------|---------|
| cua_bench/sessions/providers/base.py | Provider interface definition |
| cua_bench/sessions/providers/docker.py | Docker provider implementation |
| cua_bench/sessions/manager.py | Session storage and lookup |
| cua_bench/cli/commands/run.py | CLI orchestration |
| cua_bench/batch/solver.py | Runs inside container |
| cua_bench/computers/computer.py | VMDesktopSession - connects to VMs |
| cua_bench/computers/webtop.py | WebDesktopSession - Playwright-based |
| cua_bench/Environment2.py | Core environment lifecycle |

Generated for CUA-Bench cloud provider development

CUA-Bench Parallelism and Resource Scaling

Understanding how benchmark runs scale and how to optimize resource allocation for cloud deployments.

Table of Contents

  1. Execution Model
  2. One Session = Two VMs
  3. Parallelism Control
  4. Running Large Benchmarks
  5. Resource Calculations
  6. VM Pooling Strategy
  7. Cloud Provider Scaling

Execution Model

Key Insight: Each task execution gets its own isolated pair of VMs.

graph TB
    subgraph "One Benchmark Run"
        CMD["cb run --dataset-path ./tasks --max-parallel 16"]
    end

    subgraph "Creates N Sessions"
        S1[Session 1]
        S2[Session 2]
        S3[Session 3]
        SN[Session N]
    end

    subgraph "Each Session = 2 VMs"
        direction LR
        SOLVER[Solver VM<br/>Runs agent/oracle]
        DESKTOP[Desktop VM<br/>Linux or Windows]
        SOLVER -->|HTTP API| DESKTOP
    end

    CMD --> S1 & S2 & S3 & SN
    S1 --> SOLVER

Why This Model?

  • Isolation: Each task run is independent, no cross-contamination
  • Reproducibility: Fresh desktop state for every execution
  • Parallelism: Can scale horizontally by running more sessions
  • Flexibility: Different tasks can use different OS types

One Session = Two VMs

Every session creates exactly two VMs:

flowchart LR
    subgraph "Session: task-abc123"
        subgraph "Solver VM"
            Python[Python 3.12]
            CUABench[cua-bench]
            Agent[AI Agent]
            Traces[/tmp/td_output/]
        end

        subgraph "Desktop VM"
            OS[Linux XFCE<br/>or Windows]
            API[computer-server :8000]
            VNC[VNC :6901]
        end

        Agent -->|screenshot, click, type| API
    end

| VM | Purpose | Lifecycle | Resources |
|----|---------|-----------|-----------|
| Solver VM | Runs Python, agent, records traces | Created → Runs → Destroyed | 2 CPU, 4GB RAM |
| Desktop VM | Provides GUI environment | Created → Runs → Destroyed | 2-4 CPU, 4-8GB RAM |

Parallelism Control

The Semaphore Model

Parallelism is controlled by --max-parallel (default: 16):

# From cua_bench/cli/commands/run.py

# 1. Collect ALL session tasks upfront
session_tasks = []
for env_path in env_paths:           # e.g., 10 tasks
    for variant_idx in variants:      # e.g., 100 variants each
        session_tasks.append({
            'env_path': env_path,
            'variant_idx': variant_idx,
            'session_id': f"task-{uuid.uuid4().hex[:8]}",
        })
# Result: 1000 session_tasks

# 2. Semaphore limits concurrent sessions
semaphore = asyncio.Semaphore(max_parallel)  # e.g., 16

async def start_with_semaphore(task_info):
    async with semaphore:  # Only max_parallel can run at once
        session_id = await start_single_session(task_info)
        await wait_for_session_completion(session_id)  # Block until done

# 3. Start all tasks (semaphore limits actual concurrency)
tasks = [start_with_semaphore(t) for t in session_tasks]
await asyncio.gather(*tasks)
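To see the semaphore behaviour in isolation, here is a small self-contained sketch with simulated sessions. It does not call any cua-bench code: `run_sessions` and `one_session` are illustrative names, and the short sleep stands in for starting a session and waiting for it to finish.

```python
import asyncio


async def run_sessions(num_sessions: int, max_parallel: int) -> int:
    """Run simulated sessions and return the peak number running at once."""
    semaphore = asyncio.Semaphore(max_parallel)
    running = 0
    peak = 0

    async def one_session(index: int) -> None:
        nonlocal running, peak
        async with semaphore:  # blocks while max_parallel sessions are active
            running += 1
            peak = max(peak, running)
            await asyncio.sleep(0.01)  # stand-in for the real session work
            running -= 1

    # All coroutines are scheduled at once; the semaphore gates actual work.
    await asyncio.gather(*(one_session(i) for i in range(num_sessions)))
    return peak


if __name__ == "__main__":
    print("peak concurrency:", asyncio.run(run_sessions(50, 16)))
```

The peak never exceeds `max_parallel`, even though all sessions are handed to `asyncio.gather` up front.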

Execution Timeline

gantt
    title 1000 Sessions with max_parallel=16
    dateFormat X
    axisFormat %s

    section Slot 1
    Session 1     :0, 300
    Session 17    :300, 600
    Session 33    :600, 900

    section Slot 2
    Session 2     :0, 280
    Session 18    :280, 560
    Session 34    :560, 840

    section Slot 16
    Session 16    :0, 320
    Session 32    :320, 640
    Session 48    :640, 960

CLI Examples

# Run 10 tasks, 1 variant each, 16 parallel (default)
cb run --dataset-path ./my_tasks

# Run with higher parallelism
cb run --dataset-path ./my_tasks --max-parallel 64

# Run specific number of variants per task
cb run --dataset-path ./my_tasks --max-variants 100

# Run single task, all variants
cb run ./tasks/click_task --max-parallel 32

Running Large Benchmarks

Scenario: 10 Tasks × 100 Runs Each

There are three ways to achieve this:

Option 1: Define Variants in Task (Recommended)

Each task's @cb.tasks_config returns 100 variants:

# tasks/my_task/main.py
import cua_bench as cb

@cb.tasks_config(split="train")
def load():
    tasks = []
    for run_id in range(100):  # 100 variants
        tasks.append(cb.Task(
            description="Complete the form and submit",
            task_id=f"run_{run_id}",
            metadata={
                "seed": run_id,           # Different seed per run
                "run_number": run_id,
            },
            computer={
                "provider": "computer",
                "setup_config": {
                    "os_type": "linux",
                    "width": 1024,
                    "height": 768,
                }
            }
        ))
    return tasks

# This creates 10 tasks × 100 variants = 1000 sessions
cb run --dataset-path ./my_10_tasks --max-parallel 64

Option 2: External Loop

Run the command multiple times:

#!/bin/bash
for run in {1..100}; do
    echo "Starting run $run of 100"
    RANDOM_SEED=$run cb run --dataset-path ./my_10_tasks --max-parallel 64
done

Option 3: Hybrid Approach

Define base tasks, multiply externally:

# tasks/my_task/main.py
import os
import cua_bench as cb

@cb.tasks_config(split="train")
def load():
    # Get run offset from environment
    run_offset = int(os.environ.get("RUN_OFFSET", 0))
    runs_per_batch = int(os.environ.get("RUNS_PER_BATCH", 10))

    tasks = []
    for i in range(runs_per_batch):
        run_id = run_offset + i
        tasks.append(cb.Task(
            description="Complete the form",
            task_id=f"run_{run_id}",
            metadata={"seed": run_id},
            computer={...}
        ))
    return tasks

# Run in batches
for offset in 0 10 20 30 40 50 60 70 80 90; do
    RUN_OFFSET=$offset RUNS_PER_BATCH=10 cb run ./tasks/my_task --max-parallel 64
done

Resource Calculations

Formula

Peak VMs = max_parallel × 2
Total VM-Hours = (num_sessions × avg_duration_hours × 2)

Reference Table

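| Tasks | Variants | Total Sessions | Max Parallel | Peak VMs | Est. Duration* |
|-------|----------|----------------|--------------|----------|----------------|
| 1 | 1 | 1 | 1 | 2 | 5 min |
| 10 | 1 | 10 | 10 | 20 | 5 min |
| 10 | 10 | 100 | 16 | 32 | 30 min |
| 10 | 100 | 1,000 | 16 | 32 | 5 hours |
| 10 | 100 | 1,000 | 64 | 128 | 1.3 hours |
| 10 | 100 | 1,000 | 256 | 512 | 20 min |
| 100 | 100 | 10,000 | 256 | 512 | 3.3 hours |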

*Assuming ~5 min average per task execution
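The rows above can be approximated with a short helper. This is a sketch under a simplifying assumption that every session takes the same time, so sessions run in whole "waves" of `max_parallel`; it therefore rounds up slightly compared with the plain formula. `estimate_run` is an illustrative name, not part of cua-bench.

```python
import math


def estimate_run(num_tasks: int, variants_per_task: int,
                 max_parallel: int, avg_minutes: float = 5.0) -> dict:
    """Estimate peak VMs, wall-clock time and VM-hours for a benchmark run."""
    total_sessions = num_tasks * variants_per_task
    # Fewer sessions than slots means the slots are never all used.
    concurrent = min(total_sessions, max_parallel)
    # Assumption: equal-length sessions, executed in full waves.
    waves = math.ceil(total_sessions / max_parallel)
    return {
        "total_sessions": total_sessions,
        "peak_vms": concurrent * 2,               # solver + desktop per session
        "wall_clock_minutes": waves * avg_minutes,
        "vm_hours": total_sessions * avg_minutes / 60 * 2,
    }


if __name__ == "__main__":
    print(estimate_run(10, 100, max_parallel=64))
```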

Cost Estimation

def estimate_cost(
    num_tasks: int,
    variants_per_task: int,
    max_parallel: int,
    avg_task_minutes: float = 5,
    solver_vm_cost_per_hour: float = 0.05,   # e.g., small Linux VM
    desktop_vm_cost_per_hour: float = 0.10,  # e.g., medium VM with GUI
):
    total_sessions = num_tasks * variants_per_task
    total_vm_hours = (total_sessions * avg_task_minutes / 60) * 2  # 2 VMs per session

    solver_hours = total_sessions * avg_task_minutes / 60
    desktop_hours = total_sessions * avg_task_minutes / 60

    cost = (solver_hours * solver_vm_cost_per_hour +
            desktop_hours * desktop_vm_cost_per_hour)

    wall_clock_hours = (total_sessions * avg_task_minutes) / (max_parallel * 60)

    return {
        "total_sessions": total_sessions,
        "total_vm_hours": total_vm_hours,
        "estimated_cost": cost,
        "wall_clock_hours": wall_clock_hours,
    }

# Example: 10 tasks × 100 variants
result = estimate_cost(10, 100, max_parallel=64)
# Values rounded:
# {
#   "total_sessions": 1000,
#   "total_vm_hours": 166.7,
#   "estimated_cost": 12.50,   (USD)
#   "wall_clock_hours": 1.3
# }

VM Pooling Strategy

The Problem with Create/Destroy

Without pooling, each session:

sequenceDiagram
    participant P as Provider
    participant D as Desktop VM

    Note over P,D: Session Start
    P->>D: Create VM (30-120s)
    P->>D: Wait for boot
    P->>D: Wait for computer-server

    Note over P,D: Task Execution (~5 min)
    P->>D: Run task...

    Note over P,D: Session End
    P->>D: Destroy VM

Overhead per session: 1-3 minutes just for VM lifecycle

Solution: VM Pooling

flowchart TB
    subgraph "VM Pool"
        L1[Linux VM 1<br/>idle]
        L2[Linux VM 2<br/>in use]
        L3[Linux VM 3<br/>idle]
        W1[Windows VM 1<br/>idle]
        W2[Windows VM 2<br/>in use]
    end

    subgraph "Sessions"
        S1[Session A] -->|acquire| L2
        S2[Session B] -->|acquire| W2
        S3[Session C] -->|waiting| L1
    end

    L2 -->|release + reset| L1
    W2 -->|release + reset| W1

Pool Implementation

import asyncio
from typing import Dict, List, Optional
from dataclasses import dataclass

@dataclass
class PooledVM:
    id: str
    ip: str
    os_type: str
    in_use: bool = False

class VMPool:
    """Pool of pre-warmed VMs for fast session startup."""

    def __init__(
        self,
        incus_client,
        os_type: str,
        image: str,
        size: int = 16,
        min_ready: int = 4,
    ):
        self.client = incus_client
        self.os_type = os_type
        self.image = image
        self.target_size = size
        self.min_ready = min_ready

        self.vms: Dict[str, PooledVM] = {}
        self.available: asyncio.Queue = asyncio.Queue()
        self.lock = asyncio.Lock()

    async def initialize(self):
        """Pre-warm the pool with VMs."""
        tasks = []
        for i in range(self.target_size):
            tasks.append(self._create_vm(f"pool-{self.os_type}-{i}"))

        vms = await asyncio.gather(*tasks)
        for vm in vms:
            self.vms[vm.id] = vm
            await self.available.put(vm.id)

        print(f"Pool initialized: {len(self.vms)} {self.os_type} VMs ready")

    async def _create_vm(self, name: str) -> PooledVM:
        """Create a single VM for the pool."""
        vm_data = await self.client.create_instance(
            name=name,
            image=self.image,
            instance_type="virtual-machine",
        )
        await self.client.start_instance(name)

        # Wait for VM to be ready
        ip = await self._wait_for_ip(name)
        await self._wait_for_health(ip, 8000)

        return PooledVM(id=name, ip=ip, os_type=self.os_type)

    async def acquire(self, timeout: float = 300) -> PooledVM:
        """Acquire a VM from the pool."""
        try:
            vm_id = await asyncio.wait_for(
                self.available.get(),
                timeout=timeout
            )
        except asyncio.TimeoutError:
            # Pool exhausted, create a new VM
            vm = await self._create_vm(f"pool-{self.os_type}-overflow-{len(self.vms)}")
            self.vms[vm.id] = vm
            vm_id = vm.id

        vm = self.vms[vm_id]
        vm.in_use = True
        return vm

    async def release(self, vm: PooledVM):
        """Release a VM back to the pool after resetting it."""
        # Reset VM to clean state
        await self._reset_vm(vm)

        vm.in_use = False
        await self.available.put(vm.id)

    async def _reset_vm(self, vm: PooledVM):
        """Reset VM to clean state for next session."""
        if self.os_type == "windows":
            # Windows: clear temp files (restarting explorer is not done here)
            await self.client.exec_command(vm.id, [
                "powershell", "-Command",
                "Remove-Item -Path $env:TEMP\\* -Recurse -Force -ErrorAction SilentlyContinue"
            ])
        else:
            # Linux: kill user processes, clear tmp
            await self.client.exec_command(vm.id, [
                "bash", "-c",
                "pkill -u cuauser; rm -rf /tmp/cua_*"
            ])

        # Wait for computer-server to be healthy again
        await self._wait_for_health(vm.ip, 8000)

    async def _wait_for_ip(self, name: str) -> str:
        """Wait for VM to get an IP address."""
        for _ in range(60):
            state = await self.client.get_instance_state(name)
            network = state.get("network", {})
            for iface, data in network.items():
                if iface == "lo":
                    continue
                for addr in data.get("addresses", []):
                    if addr.get("family") == "inet":
                        return addr.get("address")
            await asyncio.sleep(1)
        raise TimeoutError(f"VM {name} did not get IP")

    async def _wait_for_health(self, ip: str, port: int, timeout: int = 120):
        """Wait for computer-server to be healthy."""
        import aiohttp
        loop = asyncio.get_running_loop()
        deadline = loop.time() + timeout
        # Reuse one HTTP session for all polling attempts
        async with aiohttp.ClientSession() as session:
            while loop.time() < deadline:
                try:
                    async with session.get(
                        f"http://{ip}:{port}/health",
                        timeout=aiohttp.ClientTimeout(total=5)
                    ) as resp:
                        if resp.status == 200:
                            return
                except (aiohttp.ClientError, asyncio.TimeoutError):
                    # Server not reachable yet; retry until the deadline.
                    # A bare except would also swallow task cancellation.
                    pass
                await asyncio.sleep(2)
        raise TimeoutError(f"computer-server at {ip}:{port} not healthy")
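One thing callers of a pool like this need to get right is releasing the VM even when the task fails. A minimal sketch of that pattern follows, using an async context manager. `ToyPool` is an in-memory stand-in with the same `acquire`/`release` shape as `VMPool`, and `leased_vm` is a hypothetical helper; neither exists in cua-bench.

```python
import asyncio
from contextlib import asynccontextmanager


class ToyPool:
    """In-memory stand-in exposing acquire()/release() like VMPool."""

    def __init__(self, names):
        self.available = asyncio.Queue()
        for name in names:
            self.available.put_nowait(name)

    async def acquire(self):
        return await self.available.get()

    async def release(self, vm):
        await self.available.put(vm)


@asynccontextmanager
async def leased_vm(pool):
    """Acquire a VM and guarantee it is released, even on error."""
    vm = await pool.acquire()
    try:
        yield vm
    finally:
        await pool.release(vm)


async def demo() -> int:
    pool = ToyPool(["vm-0", "vm-1"])
    try:
        async with leased_vm(pool) as vm:
            raise RuntimeError(f"task failed on {vm}")
    except RuntimeError:
        pass
    # Both VMs are available again despite the failure.
    return pool.available.qsize()


if __name__ == "__main__":
    print("VMs available after failure:", asyncio.run(demo()))
```

Without the `finally`, every failed task would leak one VM from the pool until it eventually ran dry.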

Using Pools in Cloud Provider

class IncusCloudProvider(SessionProvider):
    def __init__(self, incus_url: str):
        self.client = IncusClient(incus_url)

        # Track which VMs belong to each session so stop_session can find them
        self.sessions = {}

        # Create pools for each OS type
        self.pools = {
            "linux": VMPool(
                self.client,
                os_type="linux",
                image="cua-desktop-linux",
                size=32,  # Pre-warm 32 Linux VMs
            ),
            "windows": VMPool(
                self.client,
                os_type="windows",
                image="cua-desktop-windows",
                size=8,   # Pre-warm 8 Windows VMs (slower to boot)
            ),
        }

    async def initialize(self):
        """Initialize pools before accepting sessions."""
        await asyncio.gather(
            self.pools["linux"].initialize(),
            self.pools["windows"].initialize(),
        )

    async def start_session(self, session_id, env_path, ...):
        # Determine OS type (loading of task_config is elided here)
        os_type = task_config.get("os_type", "linux")

        # Acquire from pool (fast!) instead of creating
        desktop_vm = await self.pools[os_type].acquire()

        # Create solver VM (these are quick, no need to pool)
        solver_vm = await self._create_solver_vm(...)

        # Remember the VMs for cleanup
        self.sessions[session_id] = (os_type, desktop_vm, solver_vm)

        # ... rest of session setup ...

    async def stop_session(self, session_id):
        os_type, desktop_vm, solver_vm = self.sessions.pop(session_id)

        # Return desktop VM to pool (don't destroy!)
        await self.pools[os_type].release(desktop_vm)

        # Destroy solver VM (ephemeral)
        await self.client.delete_instance(solver_vm.id)

Pooling Benefits

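| Metric | Without Pooling | With Pooling |
|--------|-----------------|--------------|
| Session startup | 60-180s | 2-5s |
| Desktop VM creates (1000 sessions) | 1,000 | ~50 (pool size plus overflow) |
| Windows boot overhead | 2-3 min each | Once per pool VM |
| Resource efficiency | Low | High |

Solver VMs are still created once per session in both cases, since they are not pooled.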
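To put the startup figures in perspective, the wall-clock time spent on session startup alone can be estimated as follows. This is a rough sketch that assumes startup cost is spread evenly across `max_parallel` slots; `startup_overhead_hours` is an illustrative name.

```python
def startup_overhead_hours(num_sessions: int, max_parallel: int,
                           startup_seconds: float) -> float:
    """Wall-clock hours spent purely on session startup."""
    total_seconds = num_sessions * startup_seconds  # summed over all sessions
    return total_seconds / max_parallel / 3600      # spread across the slots


if __name__ == "__main__":
    # 1000 sessions at max_parallel=64, using the startup ranges from the table.
    for label, low, high in [("without pooling", 60, 180), ("with pooling", 2, 5)]:
        lo_h = startup_overhead_hours(1000, 64, low)
        hi_h = startup_overhead_hours(1000, 64, high)
        print(f"{label}: {lo_h:.3f} to {hi_h:.3f} hours")
```

For a 1000-session run at 64 parallel, that is roughly 0.26 to 0.78 hours of pure startup without pooling, against about a minute with it.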

Cloud Provider Scaling

Architecture for High Parallelism

graph TB
    subgraph "Control Plane"
        API[Cloud API]
        ORCH[Orchestrator]
        DB[(Session DB)]
    end

    subgraph "VM Pools"
        LP[Linux Pool<br/>64 VMs]
        WP[Windows Pool<br/>16 VMs]
    end

    subgraph "Incus Cluster"
        H1[KVM Host 1]
        H2[KVM Host 2]
        H3[KVM Host 3]
        H4[KVM Host 4]
    end

    API --> ORCH
    ORCH --> DB
    ORCH --> LP & WP
    LP --> H1 & H2
    WP --> H3 & H4

Scaling Recommendations

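| Max Parallel | Linux Pool | Windows Pool | KVM Hosts | Total RAM |
|--------------|------------|--------------|-----------|-----------|
| 16 | 16 | 4 | 2 | 128 GB |
| 64 | 64 | 16 | 4 | 512 GB |
| 256 | 256 | 64 | 16 | 2 TB |
| 1024 | 1024 | 256 | 64 | 8 TB |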

Auto-Scaling Strategy

class AutoScalingPool(VMPool):
    """VM Pool with auto-scaling based on demand."""

    def __init__(self, ..., min_size: int = 4, max_size: int = 64):
        super().__init__(...)
        self.min_size = min_size
        self.max_size = max_size
        self.scale_up_threshold = 0.8   # 80% utilization
        self.scale_down_threshold = 0.3  # 30% utilization

    async def monitor_and_scale(self):
        """Background task to adjust pool size."""
        while True:
            utilization = self._get_utilization()
            current_size = len(self.vms)

            if utilization > self.scale_up_threshold:
                # Scale up
                new_size = min(current_size * 2, self.max_size)
                await self._scale_to(new_size)

            elif utilization < self.scale_down_threshold:
                # Scale down
                new_size = max(current_size // 2, self.min_size)
                await self._scale_to(new_size)

            await asyncio.sleep(60)  # Check every minute

    def _get_utilization(self) -> float:
        in_use = sum(1 for vm in self.vms.values() if vm.in_use)
        return in_use / len(self.vms) if self.vms else 0
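The scaling decision itself can be pulled out into a pure function, which makes the thresholds easy to test without any VMs. This is a sketch using the same thresholds and doubling/halving rule as the class above. `next_pool_size` is a hypothetical helper, and the floor at `in_use` when shrinking is an added assumption so the pool never drops VMs that are currently leased.

```python
def next_pool_size(current_size: int, in_use: int,
                   min_size: int = 4, max_size: int = 64,
                   scale_up_threshold: float = 0.8,
                   scale_down_threshold: float = 0.3) -> int:
    """Return the target pool size for the next scaling step."""
    if current_size <= 0:
        return min_size  # empty pool: start from the minimum
    utilization = in_use / current_size
    if utilization > scale_up_threshold:
        return min(current_size * 2, max_size)
    if utilization < scale_down_threshold:
        # Assumption: never shrink below the VMs currently in use.
        return max(current_size // 2, min_size, in_use)
    return current_size  # within the band: leave the pool alone


if __name__ == "__main__":
    print(next_pool_size(16, 14))  # high utilization, grows
    print(next_pool_size(16, 2))   # low utilization, shrinks
    print(next_pool_size(16, 8))   # in between, unchanged
```

The gap between the two thresholds acts as a dead band, so a pool hovering around 50% utilization is not resized every minute.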

Quick Reference

CLI Parallelism Flags

cb run <task> [options]

Options:
  --max-parallel N    Max concurrent sessions (default: 16)
  --max-variants N    Max variants per task (default: all)

Resource Formula

Peak VMs           = max_parallel × 2
Total Sessions     = num_tasks × variants_per_task
Wall Clock Time    = (total_sessions × avg_minutes) / max_parallel
Total VM-Hours     = total_sessions × avg_minutes / 60 × 2

Pool Sizing Guidelines

  • Linux Desktop VMs: Pool size = max_parallel (fast reset)
  • Windows Desktop VMs: Pool size = max_parallel × 1.5 (slower reset)
  • Solver VMs: No pooling needed (ephemeral, quick to create)
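These guidelines translate into a small helper. It is a sketch that assumes each pool is sized as if every parallel slot might need that OS, and that fractional sizes are rounded up; `pool_sizes` is an illustrative name.

```python
import math


def pool_sizes(max_parallel: int, windows_factor: float = 1.5) -> dict:
    """Suggested pool sizes per the guidelines above."""
    return {
        "linux": max_parallel,                                  # fast reset
        "windows": math.ceil(max_parallel * windows_factor),    # slower reset
        "solver": 0,                                            # not pooled
    }


if __name__ == "__main__":
    print(pool_sizes(16))
```

If only a fraction of tasks target Windows, the Windows pool can be scaled down by that fraction, which is closer to what the Scaling Recommendations table shows.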

Generated for CUA-Bench cloud provider scaling
