Last active
December 15, 2025 16:45
-
-
Save betatim/338c984020d4f061ade00caa81abcb03 to your computer and use it in GitHub Desktop.
Investigate ray with max_calls=1 for cuml.accel
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| Ray + RandomForestClassifier with max_calls=1 | |
| Demonstrates the impact of max_calls=1 on Ray task execution when using | |
| scikit-learn's RandomForestClassifier. | |
| """ | |
| import time | |
| import ray | |
| from sklearn.datasets import make_classification | |
| from sklearn.ensemble import RandomForestClassifier | |
@ray.remote(max_calls=1)
def fit_existing_random_forest(task_id, rf, X_train, y_train):
    """
    Fit an already-constructed RandomForestClassifier on provided data.

    With max_calls=1, this task will be executed by a fresh worker that:
    1. Imports sklearn (if not already in worker)
    2. Deserializes rf, X_train and y_train
    3. Fits the model
    4. Returns results and terminates

    Args:
        task_id: Identifier for this task
        rf: Pre-created RandomForestClassifier to fit.
            NOTE(review): Ray serializes task arguments, so the driver's
            copy of rf presumably stays unfitted — confirm if that matters.
        X_train: Training features
        y_train: Training labels

    Returns:
        Dictionary with timing and accuracy information
    """
    start_time = time.time()
    # Fit the estimator that was created by the caller and shipped in.
    # (Unlike fit_random_forest, no model construction happens here.)
    fit_start = time.time()
    rf.fit(X_train, y_train)
    fit_time = time.time() - fit_start
    # Calculate training accuracy (quick check)
    train_score = rf.score(X_train, y_train)
    total_time = time.time() - start_time
    return {
        'task_id': task_id,
        'total_time': total_time,
        'fit_time': fit_time,
        'train_score': train_score,
        'n_samples': len(X_train),
        'n_features': X_train.shape[1]
    }
@ray.remote(max_calls=1)
def fit_random_forest(task_id, X_train, y_train, n_estimators=100):
    """
    Build and fit a RandomForestClassifier on the given data.

    Because the task is declared with max_calls=1, every invocation runs
    on a brand-new worker process, which has to import sklearn and
    deserialize X_train/y_train before any fitting can happen, and which
    exits as soon as the result is returned.

    Args:
        task_id: Identifier for this task (also seeds the forest).
        X_train: Training features
        y_train: Training labels
        n_estimators: Number of trees in the forest

    Returns:
        Dictionary with timing and accuracy information
    """
    overall_start = time.time()

    # Time the estimator construction separately from the fit.
    construct_start = time.time()
    model = RandomForestClassifier(
        n_estimators=n_estimators, random_state=task_id
    )
    construct_elapsed = time.time() - construct_start

    train_start = time.time()
    model.fit(X_train, y_train)
    train_elapsed = time.time() - train_start

    # Quick sanity check: accuracy on the training set itself.
    accuracy = model.score(X_train, y_train)
    overall_elapsed = time.time() - overall_start

    return {
        'task_id': task_id,
        'total_time': overall_elapsed,
        'fit_time': train_elapsed,
        'create_time': construct_elapsed,
        'train_score': accuracy,
        'n_estimators': n_estimators,
        'n_samples': len(X_train),
        'n_features': X_train.shape[1]
    }
@ray.remote(max_calls=10)
def fit_random_forest_max10(task_id, X_train, y_train, n_estimators=100):
    """
    Fit a RandomForestClassifier on provided data.

    With max_calls=10, the executing worker is reused for up to 10
    invocations before Ray restarts it, so only the first call on a given
    worker pays the startup cost (sklearn import); subsequent calls:
    1. Deserialize X_train and y_train
    2. Fit the model
    3. Return results (the worker stays alive for the next call)

    Args:
        task_id: Identifier for this task
        X_train: Training features
        y_train: Training labels
        n_estimators: Number of trees in the forest

    Returns:
        Dictionary with timing and accuracy information
    """
    start_time = time.time()
    # Create and fit the model
    create_start = time.time()
    rf = RandomForestClassifier(n_estimators=n_estimators, random_state=task_id)
    create_time = time.time() - create_start
    fit_start = time.time()
    rf.fit(X_train, y_train)
    fit_time = time.time() - fit_start
    # Calculate training accuracy (quick check)
    train_score = rf.score(X_train, y_train)
    total_time = time.time() - start_time
    return {
        'task_id': task_id,
        'total_time': total_time,
        'fit_time': fit_time,
        'create_time': create_time,
        'train_score': train_score,
        'n_estimators': n_estimators,
        'n_samples': len(X_train),
        'n_features': X_train.shape[1]
    }
def _run_sequential_test(num_tasks, submit_task):
    """Run tasks one at a time, timing each round trip.

    Args:
        num_tasks: Number of tasks to run sequentially.
        submit_task: Callable mapping a 1-based task id to a Ray object
            ref for the submitted task.

    Returns:
        Tuple ``(results, total_time)`` where results is a list of dicts
        with keys 'result' (the task's return dict) and 'wall_time'.
    """
    results = []
    start = time.time()
    for i in range(num_tasks):
        task_start = time.time()
        # Submit the task and block until its result is available.
        result = ray.get(submit_task(i + 1))
        wall_time = time.time() - task_start
        results.append({
            'result': result,
            'wall_time': wall_time
        })
        print(f"Task {result['task_id']}: "
              f"Wall={wall_time:.3f}s, "
              f"Fit={result['fit_time']:.3f}s, "
              f"Total={result['total_time']:.3f}s, "
              f"Accuracy={result['train_score']:.3f}")
    return results, time.time() - start


def _print_sequential_summary(results, total_time):
    """Print aggregate stats for one sequential test; return avg wall time."""
    avg_wall = sum(r['wall_time'] for r in results) / len(results)
    avg_fit = sum(r['result']['fit_time'] for r in results) / len(results)
    avg_overhead = avg_wall - avg_fit
    print("\n" + "-" * 80)
    print("Sequential Results:")
    print(f" Total time: {total_time:.3f}s")
    print(f" Average wall time per task: {avg_wall:.3f}s")
    print(f" Average fit time per task: {avg_fit:.3f}s")
    print(f" Average overhead per task: {avg_overhead:.3f}s")
    print(" (overhead = worker creation + sklearn import + deserialization)")
    return avg_wall


def main():
    """Run four benchmarks comparing Ray worker-reuse strategies."""
    print("=" * 80)
    print("RAY + RANDOMFORESTCLASSIFIER")
    print("=" * 80)
    # Configuration
    num_cpus = 4
    n_estimators = 100  # Default for RandomForestClassifier
    num_sequential_tasks = 8
    num_parallel_tasks = 8
    print("\nConfiguration:")
    print(f" - Ray CPUs: {num_cpus}")
    print(f" - RF n_estimators: {n_estimators}")
    print(" - Dataset: make_classification() with defaults")
    print(" - max_calls=1: Each task gets a FRESH worker")
    # Initialize Ray
    print("\nInitializing Ray...")
    ray.init(ignore_reinit_error=True, num_cpus=num_cpus)
    print("✓ Ray initialized")
    # Generate a small synthetic classification dataset.
    # (n_samples=300 is explicit; the remaining values match sklearn's
    # defaults for make_classification.)
    print("\nGenerating dataset...")
    X, y = make_classification(
        n_samples=300,
        n_features=20,
        n_informative=2,
        n_redundant=2,
        n_classes=2,
        random_state=42
    )
    print(f"✓ Dataset created: {X.shape[0]} samples, {X.shape[1]} features")

    # =========================================================================
    # TEST 1: Sequential Execution (max_calls=1, model built in the worker)
    # =========================================================================
    print("\n" + "=" * 80)
    print(f"TEST 1: SEQUENTIAL EXECUTION ({num_sequential_tasks} tasks)")
    print("=" * 80)
    print("\nEach task will:")
    print(" 1. Get a fresh worker (max_calls=1)")
    print(" 2. Import sklearn (fresh import per worker)")
    print(" 3. Deserialize data from Ray object store")
    print(" 4. Fit RandomForestClassifier")
    print(" 5. Worker terminates after returning result")
    print("\nExpected: Each task takes similar time (import + deserialization + fit)\n")
    results, total = _run_sequential_test(
        num_sequential_tasks,
        lambda task_id: fit_random_forest.remote(
            task_id=task_id,
            X_train=X,
            y_train=y,
            n_estimators=n_estimators
        )
    )
    # Keep test 1's average around: test 4 uses it for the speedup estimate.
    test1_avg_wall = _print_sequential_summary(results, total)

    # =========================================================================
    # TEST 2: Sequential Execution, estimator pre-created on the driver
    # =========================================================================
    print("\n" + "=" * 80)
    print(f"TEST 2: SEQUENTIAL EXECUTION pre-created RF ({num_sequential_tasks} tasks)")
    print("=" * 80)
    print("\nEach task will:")
    print(" 1. Get a fresh worker (max_calls=1)")
    print(" 2. Import sklearn (fresh import per worker)")
    print(" 3. Deserialize data and estimator from Ray object store")
    print(" 4. Fit RandomForestClassifier")
    print(" 5. Worker terminates after returning result")
    print("\nExpected: Each task takes similar time (import + deserialization + fit)\n")
    rf = RandomForestClassifier(n_estimators=n_estimators, random_state=42)
    results, total = _run_sequential_test(
        num_sequential_tasks,
        lambda task_id: fit_existing_random_forest.remote(
            task_id=task_id,
            rf=rf,
            X_train=X,
            y_train=y,
        )
    )
    _print_sequential_summary(results, total)

    # =========================================================================
    # TEST 3: Sequential Execution (max_calls=10, workers reused)
    # =========================================================================
    print("\n" + "=" * 80)
    print(f"TEST 3: SEQUENTIAL EXECUTION max_calls=10 ({num_sequential_tasks} tasks)")
    print("=" * 80)
    print("\nEach task will:")
    print(" 1. Get a worker (max_calls=10)")
    print(" 2. Deserialize data from Ray object store")
    print(" 3. Fit RandomForestClassifier")
    print(" 4. Worker terminates after returning result")
    print("\nExpected: Each task takes similar time (deserialization + fit)\n")
    results, total = _run_sequential_test(
        num_sequential_tasks,
        lambda task_id: fit_random_forest_max10.remote(
            task_id=task_id,
            X_train=X,
            y_train=y,
            n_estimators=n_estimators
        )
    )
    _print_sequential_summary(results, total)

    # =========================================================================
    # TEST 4: Parallel Execution
    # =========================================================================
    print("\n" + "=" * 80)
    print(f"TEST 4: PARALLEL EXECUTION ({num_parallel_tasks} tasks)")
    print("=" * 80)
    print("\nSubmitting all tasks at once:")
    print(f" - {num_parallel_tasks} workers created in parallel")
    print(" - Each worker imports sklearn simultaneously")
    print(" - Wall time ≈ time for single task (not sum of all tasks)")
    print()
    parallel_start = time.time()
    # Submit every task before waiting on any of them.
    futures = [
        fit_random_forest.remote(
            task_id=i + 1,
            X_train=X,
            y_train=y,
            n_estimators=n_estimators
        )
        for i in range(num_parallel_tasks)
    ]
    # Wait for all to complete
    parallel_results = ray.get(futures)
    parallel_wall_time = time.time() - parallel_start
    print(f"Parallel execution completed in {parallel_wall_time:.3f}s\n")
    for result in parallel_results:
        print(f"Task {result['task_id']}: "
              f"Fit={result['fit_time']:.3f}s, "
              f"Accuracy={result['train_score']:.3f}")
    # Compare against the per-task average measured in test 1.
    sequential_estimate = test1_avg_wall * num_parallel_tasks
    print("\n" + "-" * 80)
    print("Parallel Results:")
    print(f" Wall time: {parallel_wall_time:.3f}s")
    print(f" Expected if sequential: ~{sequential_estimate:.3f}s")
    print(f" Speedup: {sequential_estimate / parallel_wall_time:.2f}x")
    print(f" Efficiency: {sequential_estimate / parallel_wall_time / num_parallel_tasks * 100:.1f}%")
    # Cleanup
    print("\n" + "=" * 80)
    print("Shutting down Ray...")
    ray.shutdown()
    print("✓ Complete")
    print("=" * 80)


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment