(ram-)disk benchmark using dd
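Each benchmark thread shells out to a dd process pinned to a single core with taskset. With the default settings, the write phase of the thread pinned to core 0 runs roughly the following command (paths and figures taken from the script's defaults):

    taskset --cpu-list 0 dd if=/dev/zero of=/dev/shm/benchmark_0.dat bs=256000 count=2000 oflag=nocache conv=fsync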
| """ | |
| <keywords> | |
| test, python, fuse, ramdisk, benchmark, io, disk, performance | |
| </keywords> | |
| <description> | |
| Run a benchmark of IO of the ramdisk using multiple pinned threads | |
| </description> | |
| """ | |
import os
import glob
import time
import multiprocessing
import threading
import subprocess
import shlex
import argparse
from argparse import RawTextHelpFormatter

import numpy as np


def parse_args():
    parser = argparse.ArgumentParser(
        description=(
            "Run a benchmark of IO of the ramdisk\n"
            "\n"
            "usage example:\n"
            "\n"
            "  $ python ramdisk_benchmark.py --bs=25600 --count=10000 --pack --pack-cores 8 --runs=10\n"
            "  $ python ramdisk_benchmark.py --bs=1024 --count=100000 --pack --pack-cores 16 --runs=10\n"
        ),
        formatter_class=RawTextHelpFormatter
    )
    parser.add_argument(
        "-d",
        "--dirpath",
        type=str,
        default='/dev/shm',
        dest="dirpath",
        help='the directory where the benchmark files are written'
    )
    parser.add_argument(
        "-b",
        "--bs",
        type=int,
        default=256000,
        dest="bs",
        help='the block size in bytes of each IO operation'
    )
    parser.add_argument(
        "-c",
        "--count",
        type=int,
        default=2000,
        dest="count",
        help='the number of IO block operations per thread'
    )
    parser.add_argument(
        "--scatter",
        action="store_true",
        default=False,
        dest="scatter",
        help='scatter threads across cores using strides'
    )
    parser.add_argument(
        "--scatter-stride",
        type=int,
        default=1,
        dest="scatter_stride",
        help='the stride between cores used when scattering'
    )
    parser.add_argument(
        "--pack",
        action="store_true",
        default=False,
        dest="pack",
        help='pack threads onto consecutive cores starting at core 0'
    )
    parser.add_argument(
        "--pack-cores",
        type=int,
        default=1,
        dest="pack_cores",
        help='pack threads onto the first --pack-cores cores (default=1)'
    )
    parser.add_argument(
        "--runs",
        type=int,
        default=1,
        dest="runs",
        help='the number of times to run the benchmark'
    )
    parser.add_argument(
        "--write-benchmark",
        action="store_true",
        default=False,
        dest="write_benchmark",
        help='run only the write benchmark'
    )
    parser.add_argument(
        "--read-benchmark",
        action="store_true",
        default=False,
        dest="read_benchmark",
        help='run only the read benchmark'
    )
    parser.add_argument(
        "--no-cleanup",
        action="store_true",
        default=False,
        dest="no_cleanup",
        help='do not delete the benchmark files after benchmarking'
    )
    return parser.parse_args()


def write_file(core_id, bs, count, fpath, results_dict, lock, no_cleanup=False):
    # remove any stale file from a previous run
    if os.path.exists(fpath):
        os.remove(fpath)

    # start the timer for this thread
    t0 = time.time()

    # write the file with dd pinned to the given core; oflag=nocache asks the
    # kernel to discard cached data and conv=fsync flushes the file before dd
    # exits, so the flush cost is included in the measured time
    cmd = (
        f"taskset --cpu-list {core_id} dd if=/dev/zero of={fpath} "
        f"bs={bs} count={count} oflag=nocache conv=fsync"
    )
    process = subprocess.Popen(
        shlex.split(cmd),
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE
    )
    process.wait()

    # stop the timer for this thread
    t1 = time.time()
    dt = t1 - t0

    # delete the file unless no_cleanup is set
    if not no_cleanup and os.path.exists(fpath):
        os.remove(fpath)

    # store the elapsed time keyed by core id
    with lock:
        results_dict[core_id] = dt


def read_file(core_id, bs, count, fpath, results_dict, lock, no_cleanup=False):
    # the file should exist from the write / preparation phase
    if not os.path.exists(fpath):
        print(f"Warning: file {fpath} does not exist for reading")
        return

    # start the timer for this thread
    t0 = time.time()

    # read the file back with dd pinned to the given core; iflag=nocache asks
    # the kernel to discard cached data, iflag=sync opens the input with
    # O_SYNC, and conv=nocreat avoids creating the output (here /dev/null)
    cmd = (
        f"taskset --cpu-list {core_id} dd if={fpath} of=/dev/null "
        f"bs={bs} count={count} iflag=nocache,sync conv=nocreat"
    )
    process = subprocess.Popen(
        shlex.split(cmd),
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE
    )
    process.wait()

    # stop the timer for this thread
    t1 = time.time()
    dt = t1 - t0

    # delete the file unless no_cleanup is set
    if not no_cleanup and os.path.exists(fpath):
        os.remove(fpath)

    # store the elapsed time keyed by core id
    with lock:
        results_dict[core_id] = dt
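
# Note (assumption, not verified here): on a tmpfs mount such as /dev/shm the
# nocache hint likely has little effect, since the page cache itself is the
# backing store; the read benchmark then mostly measures memory bandwidth
# rather than device IO.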


def run_benchmark(args):
    # sizing example: bs=256000, count=2000, pack_cores=8 => ~4 GB per run
    bs = args.bs
    count = args.count
    scatter = args.scatter
    scatter_every = args.scatter_stride
    pack = args.pack
    pack_up_to = args.pack_cores
    n_runs = args.runs
    dirpath = args.dirpath
    no_cleanup = args.no_cleanup

    # determine which benchmarks to run
    run_write = args.write_benchmark
    run_read = args.read_benchmark

    # if neither is specified, run both (default behavior)
    if not run_write and not run_read:
        run_write = True
        run_read = True

    # print a summary of the benchmark parameters
    print("=" * 60)
    print("benchmark parameters summary")
    print("=" * 60)
    print(f"directory path: {dirpath}")
    print(f"block size: {bs} bytes ({bs / 2**10:.2f} KiB)")
    print(f"count per thread: {count}")
    print(f"number of runs: {n_runs}")
    print(f"scatter mode: {scatter}")
    if scatter:
        print(f"scatter stride: {scatter_every}")
    print(f"pack mode: {pack}")
    if pack:
        print(f"pack cores: {pack_up_to}")
    print(f"run write benchmark: {run_write}")
    print(f"run read benchmark: {run_read}")
    print(f"no cleanup: {no_cleanup}")
    print("=" * 60)
    print()

    n_cores = multiprocessing.cpu_count()
    print(f'number of cores available = {n_cores}')

    # remove leftover benchmark files; the glob is expanded in Python since
    # subprocess.Popen without a shell would not expand it
    for fpath in glob.glob(os.path.join(dirpath, 'benchmark_*.dat')):
        os.remove(fpath)

    cores_list = range(n_cores)
    if scatter:
        cores_list = cores_list[::scatter_every]
    elif pack:
        cores_list = cores_list[0:pack_up_to]

    n_cores = len(cores_list)
    print(f'number of cores used in the benchmark = {n_cores}')
    print('core ids used:')
    print('\t' + ' '.join(map(str, cores_list)))
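
    # Illustrative core selections on a hypothetical 16-core machine:
    #   --scatter --scatter-stride 2 -> cores 0, 2, 4, ..., 14
    #   --pack --pack-cores 4        -> cores 0, 1, 2, 3
    #   (neither flag)               -> all cores 0..15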

    # write benchmark
    if run_write:
        # cleanup before starting
        for fpath in glob.glob(os.path.join(dirpath, 'benchmark_*.dat')):
            os.remove(fpath)

        all_write_times = {}
        lock = threading.Lock()

        # start the wall clock timer
        wall_t0 = time.time()
        for run_no in range(n_runs):
            print(f'run write benchmark {run_no}')
            write_times = {}

            # measure the time taken by all threads in this run
            run_t0 = time.time()
            threads = []
            for cpu_id in cores_list:
                thread = threading.Thread(
                    name=f'write_file_{cpu_id}',
                    target=write_file,
                    args=(
                        cpu_id,
                        bs,
                        count,
                        os.path.join(dirpath, f'benchmark_{cpu_id}.dat'),
                        write_times,
                        lock,
                        no_cleanup
                    )
                )
                threads.append(thread)
            for thread in threads:
                thread.start()
            for thread in threads:
                thread.join()
            run_t1 = time.time()
            run_dt = run_t1 - run_t0

            # store the per-thread times for this run
            for core_id, thread_time in write_times.items():
                all_write_times.setdefault(core_id, []).append(thread_time)
            print(f'  run {run_no} completed in {run_dt:.4f} s')

        # stop the wall clock timer
        wall_t1 = time.time()
        wall_dt = wall_t1 - wall_t0

        # aggregate write statistics
        total_count = count * n_cores * n_runs
        total_size = bs * total_count

        # per-thread statistics
        all_thread_times = []
        for core_id in cores_list:
            if core_id in all_write_times:
                all_thread_times.extend(all_write_times[core_id])
        avg_thread_time = np.mean(all_thread_times) if all_thread_times else 0
        std_thread_time = np.std(all_thread_times) if all_thread_times else 0

        print("\n=== write results ===")
        print(f'total operations = {len(all_thread_times)}')
        print(f'wall clock time = {wall_dt:.4f} s')
        print(f'average thread time = {avg_thread_time:.4f} s (std = {std_thread_time:.4f} s)')
        print(f'total data written per run = {total_size / n_runs / 2**30:.2f} GiB')
        print(f'total data written = {total_size / 2**30:.2f} GiB')
        print(f'total bandwidth = {total_size / 2**30 / wall_dt:.2f} GiB/s')
        print(f'chunk rate (write) = {total_count / wall_dt / 1e6:.2f} M chunks/s')

    # read benchmark: the files must be written first
    if run_read:
        print("\npreparing files for read benchmark...")

        # prepare the files once with multi-threaded writes (before timing starts)
        def prepare_file(cmd):
            process = subprocess.Popen(
                shlex.split(cmd),
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE
            )
            process.wait()

        prep_threads = []
        for cpu_id in cores_list:
            fpath = os.path.join(dirpath, f'benchmark_{cpu_id}.dat')
            cmd = (
                f"taskset --cpu-list {cpu_id} dd if=/dev/zero of={fpath} "
                f"bs={bs} count={count} oflag=nocache conv=fsync"
            )
            thread = threading.Thread(
                target=prepare_file,
                args=(cmd,)
            )
            prep_threads.append(thread)
        for thread in prep_threads:
            thread.start()
        for thread in prep_threads:
            thread.join()
        print('files prepared for read benchmark')

        all_read_times = {}
        lock = threading.Lock()

        # start the wall clock timer (after file preparation)
        wall_t0 = time.time()
        for run_no in range(n_runs):
            print(f'run read benchmark {run_no}')
            read_times = {}

            # measure the time taken by all threads in this run
            run_t0 = time.time()
            threads = []
            for cpu_id in cores_list:
                thread = threading.Thread(
                    name=f'read_file_{cpu_id}',
                    target=read_file,
                    args=(
                        cpu_id,
                        bs,
                        count,
                        os.path.join(dirpath, f'benchmark_{cpu_id}.dat'),
                        read_times,
                        lock,
                        True  # always preserve the files between benchmark runs
                    )
                )
                threads.append(thread)
            for thread in threads:
                thread.start()
            for thread in threads:
                thread.join()
            run_t1 = time.time()
            run_dt = run_t1 - run_t0

            # store the per-thread times for this run
            for core_id, thread_time in read_times.items():
                all_read_times.setdefault(core_id, []).append(thread_time)
            print(f'  run {run_no} completed in {run_dt:.4f} s')

        # stop the wall clock timer
        wall_t1 = time.time()
        wall_dt = wall_t1 - wall_t0

        # aggregate read statistics
        total_count = count * n_cores * n_runs
        total_size = bs * total_count

        # per-thread statistics
        all_thread_times = []
        for core_id in cores_list:
            if core_id in all_read_times:
                all_thread_times.extend(all_read_times[core_id])
        avg_thread_time = np.mean(all_thread_times) if all_thread_times else 0
        std_thread_time = np.std(all_thread_times) if all_thread_times else 0

        print("\n=== read results ===")
        print(f'total operations = {len(all_thread_times)}')
        print(f'wall clock time = {wall_dt:.4f} s')
        print(f'average thread time = {avg_thread_time:.4f} s (std = {std_thread_time:.4f} s)')
        print(f'total data read per run = {total_size / n_runs / 2**30:.2f} GiB')
        print(f'total data read = {total_size / 2**30:.2f} GiB')
        print(f'total bandwidth = {total_size / 2**30 / wall_dt:.2f} GiB/s')
        print(f'chunk rate (read) = {total_count / wall_dt / 1e6:.2f} M chunks/s')

    # final cleanup (skipped when --no-cleanup is set)
    if not no_cleanup:
        for fpath in glob.glob(os.path.join(dirpath, 'benchmark_*.dat')):
            os.remove(fpath)
    else:
        print(f"\nfiles preserved in {dirpath}/benchmark_*.dat")


def main():
    args = parse_args()
    run_benchmark(args)


if __name__ == '__main__':
    main()
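
A minimal sketch of the sizing arithmetic behind the defaults; bs and count are the script's default values and the 8-core figure matches the inline comment in run_benchmark (illustrative snippet, not part of the script):

    bs = 256000                     # bytes per block (default --bs)
    count = 2000                    # blocks per thread (default --count)
    n_cores = 8                     # e.g. --pack --pack-cores 8
    per_thread = bs * count         # 512,000,000 bytes per file
    per_run = per_thread * n_cores  # 4,096,000,000 bytes, i.e. ~4 GB
    print(f"{per_run / 2**30:.2f} GiB per run")  # -> 3.81 GiB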