@betatim
Last active December 15, 2025 16:45
Investigate ray with max_calls=1 for cuml.accel
#!/usr/bin/env python3
"""
Ray + RandomForestClassifier with max_calls=1
Demonstrates the impact of max_calls=1 on Ray task execution when using
scikit-learn's RandomForestClassifier.
"""
import time
import ray
from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier

@ray.remote(max_calls=1)
def fit_existing_random_forest(task_id, rf, X_train, y_train):
    """
    Fit a pre-created RandomForestClassifier on provided data.

    With max_calls=1, this task will be executed by a fresh worker that:
    1. Imports sklearn (if not already in worker)
    2. Deserializes rf, X_train and y_train
    3. Fits the model
    4. Returns results and terminates

    Args:
        task_id: Identifier for this task
        rf: Pre-created (unfitted) RandomForestClassifier instance
        X_train: Training features
        y_train: Training labels

    Returns:
        Dictionary with timing and accuracy information
    """
    start_time = time.time()

    # Fit the model (it was created on the driver and shipped to the worker)
    fit_start = time.time()
    rf.fit(X_train, y_train)
    fit_time = time.time() - fit_start

    # Calculate training accuracy (quick check)
    train_score = rf.score(X_train, y_train)

    total_time = time.time() - start_time
    return {
        'task_id': task_id,
        'total_time': total_time,
        'fit_time': fit_time,
        'train_score': train_score,
        'n_samples': len(X_train),
        'n_features': X_train.shape[1]
    }
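
# Aside (not in the original gist): because the driver passes the same rf,
# X_train and y_train to every call, each argument is serialized again per
# submission. A common Ray pattern, sketched here but not used by this
# script, is to store shared arguments once with ray.put() and pass the
# resulting ObjectRefs; Ray dereferences them inside each task:
#
#   rf_ref = ray.put(RandomForestClassifier(n_estimators=100))
#   X_ref, y_ref = ray.put(X), ray.put(y)
#   fit_existing_random_forest.remote(1, rf_ref, X_ref, y_ref)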

@ray.remote(max_calls=1)
def fit_random_forest(task_id, X_train, y_train, n_estimators=100):
    """
    Fit a RandomForestClassifier on provided data.

    With max_calls=1, this task will be executed by a fresh worker that:
    1. Imports sklearn (if not already in worker)
    2. Deserializes X_train and y_train
    3. Fits the model
    4. Returns results and terminates

    Args:
        task_id: Identifier for this task
        X_train: Training features
        y_train: Training labels
        n_estimators: Number of trees in the forest

    Returns:
        Dictionary with timing and accuracy information
    """
    start_time = time.time()

    # Create and fit the model
    create_start = time.time()
    rf = RandomForestClassifier(n_estimators=n_estimators, random_state=task_id)
    create_time = time.time() - create_start

    fit_start = time.time()
    rf.fit(X_train, y_train)
    fit_time = time.time() - fit_start

    # Calculate training accuracy (quick check)
    train_score = rf.score(X_train, y_train)

    total_time = time.time() - start_time
    return {
        'task_id': task_id,
        'total_time': total_time,
        'fit_time': fit_time,
        'create_time': create_time,
        'train_score': train_score,
        'n_estimators': n_estimators,
        'n_samples': len(X_train),
        'n_features': X_train.shape[1]
    }

@ray.remote(max_calls=10)
def fit_random_forest_max10(task_id, X_train, y_train, n_estimators=100):
    """
    Fit a RandomForestClassifier on provided data.

    With max_calls=10, the worker is reused for up to 10 tasks, so each
    task:
    1. Gets a warm worker (sklearn already imported after the first call)
    2. Deserializes X_train and y_train
    3. Fits the model
    4. Returns results (the worker only terminates after its 10th call)

    Args:
        task_id: Identifier for this task
        X_train: Training features
        y_train: Training labels
        n_estimators: Number of trees in the forest

    Returns:
        Dictionary with timing and accuracy information
    """
    start_time = time.time()

    # Create and fit the model
    create_start = time.time()
    rf = RandomForestClassifier(n_estimators=n_estimators, random_state=task_id)
    create_time = time.time() - create_start

    fit_start = time.time()
    rf.fit(X_train, y_train)
    fit_time = time.time() - fit_start

    # Calculate training accuracy (quick check)
    train_score = rf.score(X_train, y_train)

    total_time = time.time() - start_time
    return {
        'task_id': task_id,
        'total_time': total_time,
        'fit_time': fit_time,
        'create_time': create_time,
        'train_score': train_score,
        'n_estimators': n_estimators,
        'n_samples': len(X_train),
        'n_features': X_train.shape[1]
    }
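
# Aside (not part of the original gist): a quick way to confirm that
# max_calls=1 really recycles the worker is to compare worker PIDs.
# check_worker_pid is a hypothetical helper added for illustration; with
# max_calls=1 every call should report a different PID, whereas a plain
# @ray.remote task would tend to reuse the same few worker PIDs.
import os


@ray.remote(max_calls=1)
def check_worker_pid():
    # Runs in a freshly started worker process each time, so consecutive
    # calls should return distinct PIDs.
    return os.getpid()


# Example (after ray.init()):
#   pids = ray.get([check_worker_pid.remote() for _ in range(4)])
#   assert len(set(pids)) == 4  # four tasks, four distinct workers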

def main():
    """Main execution function."""
    print("=" * 80)
    print("RAY + RANDOMFORESTCLASSIFIER")
    print("=" * 80)

    # Configuration
    num_cpus = 4
    n_estimators = 100  # Default for RandomForestClassifier
    num_sequential_tasks = 8
    num_parallel_tasks = 8

    print("\nConfiguration:")
    print(f" - Ray CPUs: {num_cpus}")
    print(f" - RF n_estimators: {n_estimators}")
    print(" - Dataset: make_classification() with (mostly) default parameters")
    print(" - max_calls=1: Each task gets a FRESH worker")

    # Initialize Ray
    print("\nInitializing Ray...")
    ray.init(ignore_reinit_error=True, num_cpus=num_cpus)
    print("✓ Ray initialized")

    # Generate dataset (mostly defaults from make_classification)
    print("\nGenerating dataset...")
    X, y = make_classification(
        n_samples=300,    # Larger than the default (100)
        n_features=20,    # Default
        n_informative=2,  # Default
        n_redundant=2,    # Default
        n_classes=2,      # Default
        random_state=42
    )
    print(f"✓ Dataset created: {X.shape[0]} samples, {X.shape[1]} features")

    # =========================================================================
    # TEST 1: Sequential Execution
    # =========================================================================
    print("\n" + "=" * 80)
    print(f"TEST 1: SEQUENTIAL EXECUTION ({num_sequential_tasks} tasks)")
    print("=" * 80)
    print("\nEach task will:")
    print(" 1. Get a fresh worker (max_calls=1)")
    print(" 2. Import sklearn (fresh import per worker)")
    print(" 3. Deserialize data from Ray object store")
    print(" 4. Fit RandomForestClassifier")
    print(" 5. Worker terminates after returning result")
    print("\nExpected: Each task takes similar time (import + deserialization + fit)\n")

    sequential_results = []
    sequential_start = time.time()
    for i in range(num_sequential_tasks):
        task_start = time.time()
        # Submit task and wait for result
        result = ray.get(fit_random_forest.remote(
            task_id=i + 1,
            X_train=X,
            y_train=y,
            n_estimators=n_estimators
        ))
        wall_time = time.time() - task_start
        sequential_results.append({
            'result': result,
            'wall_time': wall_time
        })
        print(f"Task {result['task_id']}: "
              f"Wall={wall_time:.3f}s, "
              f"Fit={result['fit_time']:.3f}s, "
              f"Total={result['total_time']:.3f}s, "
              f"Accuracy={result['train_score']:.3f}")
    sequential_total = time.time() - sequential_start

    # Calculate statistics. Keep Test 1's average under a separate name:
    # avg_wall is overwritten by Tests 2 and 3, but the parallel
    # comparison in Test 4 needs the Test 1 baseline.
    avg_wall_test1 = avg_wall = sum(r['wall_time'] for r in sequential_results) / len(sequential_results)
    avg_fit = sum(r['result']['fit_time'] for r in sequential_results) / len(sequential_results)
    avg_overhead = avg_wall - avg_fit

    print("\n" + "-" * 80)
    print("Sequential Results:")
    print(f" Total time: {sequential_total:.3f}s")
    print(f" Average wall time per task: {avg_wall:.3f}s")
    print(f" Average fit time per task: {avg_fit:.3f}s")
    print(f" Average overhead per task: {avg_overhead:.3f}s")
    print(" (overhead = worker creation + sklearn import + deserialization)")

    # =========================================================================
    # TEST 2: Sequential Execution, pre-created RF
    # =========================================================================
    print("\n" + "=" * 80)
    print(f"TEST 2: SEQUENTIAL EXECUTION pre-created RF ({num_sequential_tasks} tasks)")
    print("=" * 80)
    print("\nEach task will:")
    print(" 1. Get a fresh worker (max_calls=1)")
    print(" 2. Import sklearn (fresh import per worker)")
    print(" 3. Deserialize data and estimator from Ray object store")
    print(" 4. Fit RandomForestClassifier")
    print(" 5. Worker terminates after returning result")
    print("\nExpected: Each task takes similar time (import + deserialization + fit)\n")

    sequential_results = []
    sequential_start = time.time()
    rf = RandomForestClassifier(n_estimators=n_estimators, random_state=42)
    for i in range(num_sequential_tasks):
        task_start = time.time()
        # Submit task and wait for result
        result = ray.get(fit_existing_random_forest.remote(
            task_id=i + 1,
            rf=rf,
            X_train=X,
            y_train=y,
        ))
        wall_time = time.time() - task_start
        sequential_results.append({
            'result': result,
            'wall_time': wall_time
        })
        print(f"Task {result['task_id']}: "
              f"Wall={wall_time:.3f}s, "
              f"Fit={result['fit_time']:.3f}s, "
              f"Total={result['total_time']:.3f}s, "
              f"Accuracy={result['train_score']:.3f}")
    sequential_total = time.time() - sequential_start

    # Calculate statistics
    avg_wall = sum(r['wall_time'] for r in sequential_results) / len(sequential_results)
    avg_fit = sum(r['result']['fit_time'] for r in sequential_results) / len(sequential_results)
    avg_overhead = avg_wall - avg_fit

    print("\n" + "-" * 80)
    print("Sequential Results:")
    print(f" Total time: {sequential_total:.3f}s")
    print(f" Average wall time per task: {avg_wall:.3f}s")
    print(f" Average fit time per task: {avg_fit:.3f}s")
    print(f" Average overhead per task: {avg_overhead:.3f}s")
    print(" (overhead = worker creation + sklearn import + deserialization)")

    # =========================================================================
    # TEST 3: Sequential Execution (max_calls=10)
    # =========================================================================
    print("\n" + "=" * 80)
    print(f"TEST 3: SEQUENTIAL EXECUTION max_calls=10 ({num_sequential_tasks} tasks)")
    print("=" * 80)
    print("\nEach task will:")
    print(" 1. Get a worker (max_calls=10, so the worker is reused across tasks)")
    print(" 2. Deserialize data from Ray object store")
    print(" 3. Fit RandomForestClassifier")
    print(" 4. Worker is only terminated after its 10th call")
    print("\nExpected: The first task pays the worker-start + import cost;")
    print("later tasks reuse the warm worker (deserialization + fit only)\n")

    sequential_results = []
    sequential_start = time.time()
    for i in range(num_sequential_tasks):
        task_start = time.time()
        # Submit task and wait for result
        result = ray.get(fit_random_forest_max10.remote(
            task_id=i + 1,
            X_train=X,
            y_train=y,
            n_estimators=n_estimators
        ))
        wall_time = time.time() - task_start
        sequential_results.append({
            'result': result,
            'wall_time': wall_time
        })
        print(f"Task {result['task_id']}: "
              f"Wall={wall_time:.3f}s, "
              f"Fit={result['fit_time']:.3f}s, "
              f"Total={result['total_time']:.3f}s, "
              f"Accuracy={result['train_score']:.3f}")
    sequential_total = time.time() - sequential_start

    # Calculate statistics
    avg_wall = sum(r['wall_time'] for r in sequential_results) / len(sequential_results)
    avg_fit = sum(r['result']['fit_time'] for r in sequential_results) / len(sequential_results)
    avg_overhead = avg_wall - avg_fit

    print("\n" + "-" * 80)
    print("Sequential Results:")
    print(f" Total time: {sequential_total:.3f}s")
    print(f" Average wall time per task: {avg_wall:.3f}s")
    print(f" Average fit time per task: {avg_fit:.3f}s")
    print(f" Average overhead per task: {avg_overhead:.3f}s")
    print(" (overhead is dominated by deserialization once the worker is warm)")

    # =========================================================================
    # TEST 4: Parallel Execution
    # =========================================================================
    print("\n" + "=" * 80)
    print(f"TEST 4: PARALLEL EXECUTION ({num_parallel_tasks} tasks)")
    print("=" * 80)
    print("\nSubmitting all tasks at once:")
    print(f" - {num_parallel_tasks} tasks submitted; at most {num_cpus} workers run concurrently")
    print(" - Each fresh worker imports sklearn independently")
    print(" - Wall time ≈ a few waves of single-task time, not the sum of all tasks")
    print()

    parallel_start = time.time()
    # Submit all tasks at once
    futures = [
        fit_random_forest.remote(
            task_id=i + 1,
            X_train=X,
            y_train=y,
            n_estimators=n_estimators
        )
        for i in range(num_parallel_tasks)
    ]
    # Wait for all to complete
    parallel_results = ray.get(futures)
    parallel_wall_time = time.time() - parallel_start

    print(f"Parallel execution completed in {parallel_wall_time:.3f}s\n")
    for result in parallel_results:
        print(f"Task {result['task_id']}: "
              f"Fit={result['fit_time']:.3f}s, "
              f"Accuracy={result['train_score']:.3f}")

    print("\n" + "-" * 80)
    print("Parallel Results:")
    print(f" Wall time: {parallel_wall_time:.3f}s")
    print(f" Expected if sequential: ~{avg_wall_test1 * num_parallel_tasks:.3f}s")
    print(f" Speedup: {(avg_wall_test1 * num_parallel_tasks) / parallel_wall_time:.2f}x")
    print(f" Efficiency: {((avg_wall_test1 * num_parallel_tasks) / parallel_wall_time) / num_parallel_tasks * 100:.1f}%")

    # Cleanup
    print("\n" + "=" * 80)
    print("Shutting down Ray...")
    ray.shutdown()
    print("✓ Complete")
    print("=" * 80)


if __name__ == '__main__':
    main()
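
# Closing aside (not in the original gist): the gist title mentions
# cuml.accel, which patches scikit-learn estimators to run on the GPU.
# Because max_calls=1 discards the worker after every task, any per-worker
# setup (the accelerator patch, GPU context creation) is repeated for each
# task, adding to the overhead measured above. A hedged sketch, assuming
# cuml.accel.install() is available in your cuML build; note that the
# patch generally must be installed before sklearn is imported in the
# worker, so in a script like this one (which imports sklearn at module
# level) the patch would have to happen even earlier, e.g. in a worker
# setup hook:
#
#   @ray.remote(max_calls=1, num_gpus=1)
#   def fit_accelerated(task_id, X_train, y_train):
#       import cuml.accel
#       cuml.accel.install()  # patch sklearn imports in this fresh worker
#       from sklearn.ensemble import RandomForestClassifier
#       rf = RandomForestClassifier(n_estimators=100, random_state=task_id)
#       rf.fit(X_train, y_train)
#       return rf.score(X_train, y_train)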