@mherkazandjian
Created December 12, 2025 10:30
(ram-)disk benchmark using dd
"""
<keywords>
test, python, fuse, ramdisk, benchmark, io, disk, performance
</keywords>
<description>
Run a benchmark of IO of the ramdisk using multiple pinned threads
</description>
"""
import os
import sys
import time
import multiprocessing
import threading
import subprocess
import shlex
import argparse
import numpy as np
from argparse import RawTextHelpFormatter
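
# External (non-Python) requirements: the script shells out to `dd` (GNU
# coreutils) and `taskset` (util-linux), so it is Linux-specific and both tools
# must be on PATH.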


def parse_args():
    parser = argparse.ArgumentParser(
        description=(
            "Run a benchmark of IO of the ramdisk\n"
            "\n"
            "usage example:\n"
            "\n"
            "  $ python ramdisk_benchmark.py --bs=25600 --count=10000 --pack --pack-cores 8 --runs=10\n"
            "  $ python ramdisk_benchmark.py --bs=1024 --count=100000 --pack --pack-cores 16 --runs=10\n"
        ),
        formatter_class=RawTextHelpFormatter
    )
    parser.add_argument(
        "-d",
        "--dirpath",
        type=str,
        default='/dev/shm',
        dest="dirpath",
        help='the directory that will be used for the benchmark files'
    )
    parser.add_argument(
        "-b",
        "--bs",
        type=int,
        default=256000,
        dest="bs",
        help='the block size of each IO operation in bytes'
    )
    parser.add_argument(
        "-c",
        "--count",
        type=int,
        default=2000,
        dest="count",
        help='the number of IO block operations for each thread'
    )
    parser.add_argument(
        "--scatter",
        action="store_true",
        default=False,
        dest="scatter",
        help='scatter across cores using strides'
    )
    parser.add_argument(
        "--scatter-stride",
        type=int,
        default=1,
        dest="scatter_stride",
        help='scatter stride across cores'
    )
    parser.add_argument(
        "--pack",
        action="store_true",
        default=False,
        dest="pack",
        help='pack threads across cores'
    )
    parser.add_argument(
        "--pack-cores",
        type=int,
        default=1,
        dest="pack_cores",
        help='pack threads onto the first --pack-cores cores (default=1)'
    )
    parser.add_argument(
        "--runs",
        type=int,
        default=1,
        dest="runs",
        help='the number of times to run the benchmark'
    )
    parser.add_argument(
        "--write-benchmark",
        action="store_true",
        default=False,
        dest="write_benchmark",
        help='run only the write benchmark'
    )
    parser.add_argument(
        "--read-benchmark",
        action="store_true",
        default=False,
        dest="read_benchmark",
        help='run only the read benchmark'
    )
    parser.add_argument(
        "--no-cleanup",
        action="store_true",
        default=False,
        dest="no_cleanup",
        help='do not delete files after benchmarking'
    )
    return parser.parse_args()
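
# Rough sizing guide (matches the totals computed in run_benchmark below):
#   total bytes per phase = bs * count * n_threads * n_runs
# e.g. the first usage example above (--bs=25600 --count=10000 --pack-cores 8
# --runs=10) moves 25600 * 10000 * 8 * 10 bytes ~= 20.5 GB (~19.1 GiB) per
# phase, i.e. about 2 GB per run, so --dirpath (default /dev/shm) must be able
# to hold one run's worth of benchmark files at a time.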


def write_file(core_id, bs, count, fpath, results_dict, lock, no_cleanup=False):
    # Cleanup if file exists
    if os.path.exists(fpath):
        os.remove(fpath)

    # Start timer for this thread
    t0 = time.time()

    # Write file
    cmd = f"taskset --cpu-list {core_id} dd if=/dev/zero of={fpath} bs={bs} count={count} oflag=nocache conv=fsync"
    process = subprocess.Popen(
        shlex.split(cmd),
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE
    )
    process.wait()

    # Stop timer for this thread
    t1 = time.time()
    dt = t1 - t0

    # Delete file unless no_cleanup is set
    if not no_cleanup and os.path.exists(fpath):
        os.remove(fpath)

    # Store result with thread identifier
    with lock:
        results_dict[core_id] = dt
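
# Notes on the write command above (GNU dd / util-linux taskset semantics;
# treat the flag details as approximate):
#   - `taskset --cpu-list <core>` pins the dd process to the given CPU.
#   - `conv=fsync` makes dd fsync the output file before exiting, so the flush
#     is included in the measured time; on tmpfs such as /dev/shm this is
#     largely a no-op since the data already lives in RAM.
#   - `oflag=nocache` asks the kernel to drop/avoid page-cache data for the
#     output file (a hint, not a guarantee).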


def read_file(core_id, bs, count, fpath, results_dict, lock, no_cleanup=False):
    # File should exist from write phase - check
    if not os.path.exists(fpath):
        print(f"Warning: File {fpath} does not exist for reading")
        return

    # Start timer for this thread
    t0 = time.time()

    # Read file
    cmd = f"taskset --cpu-list {core_id} dd if={fpath} of=/dev/null bs={bs} count={count} iflag=nocache,sync conv=nocreat"
    process = subprocess.Popen(
        shlex.split(cmd),
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE
    )
    process.wait()

    # Stop timer for this thread
    t1 = time.time()
    dt = t1 - t0

    # Delete file unless no_cleanup is set
    if not no_cleanup and os.path.exists(fpath):
        os.remove(fpath)

    # Store result with thread identifier
    with lock:
        results_dict[core_id] = dt
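
# Threading note: the GIL is not a bottleneck here because each Python thread
# just blocks in process.wait() while its pinned dd subprocess does the IO; the
# shared lock is only taken briefly to record the per-thread timing.  During
# the read benchmark read_file() is called with no_cleanup=True so the same
# files can be re-read across runs (see run_benchmark below).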


def run_benchmark(args):
    # bs = 256000       # block size in bytes, e.g. bs=256000 count=2000 pack_up_to=8 => total file size = 4GB
    # count = 20000     # number of blocks
    # scatter = False
    # scatter_every = 2
    # pack = True       # pin processes in packed mode
    # pack_up_to = 4    # spawn this number of processes
    bs = args.bs
    count = args.count
    scatter = args.scatter
    scatter_every = args.scatter_stride
    pack = args.pack
    pack_up_to = args.pack_cores
    n_runs = args.runs
    dirpath = args.dirpath
    no_cleanup = args.no_cleanup

    # Determine which benchmarks to run
    run_write = args.write_benchmark
    run_read = args.read_benchmark

    # If neither is specified, run both (default behavior)
    if not run_write and not run_read:
        run_write = True
        run_read = True

    # Print benchmark parameters summary
    print("=" * 60)
    print("benchmark parameters summary")
    print("=" * 60)
    print(f"directory path: {dirpath}")
    print(f"block size: {bs} bytes ({bs / 2**10:.2f} KiB)")
    print(f"count per thread: {count}")
    print(f"number of runs: {n_runs}")
    print(f"scatter mode: {scatter}")
    if scatter:
        print(f"scatter stride: {scatter_every}")
    print(f"pack mode: {pack}")
    if pack:
        print(f"pack cores: {pack_up_to}")
    print(f"run write benchmark: {run_write}")
    print(f"run read benchmark: {run_read}")
    print(f"no cleanup: {no_cleanup}")
    print("=" * 60)
    print()

    n_cores = multiprocessing.cpu_count()
    print(f'number of cores available = {n_cores}')

    # Remove leftover benchmark files (a shell is needed to expand the glob)
    subprocess.Popen(
        f'rm -fvr {dirpath}/benchmark_*.dat', shell=True,
        stdout=subprocess.PIPE, stderr=subprocess.PIPE).wait()

    # write_file(0, 1, 100, '/dev/shm/foo.dat')
    cores_list = range(n_cores)
    if scatter:
        cores_list = cores_list[::scatter_every]
    elif pack:
        cores_list = cores_list[0:pack_up_to]
    else:
        pass
    n_cores = len(cores_list)
    print(f'number of cores used in the benchmark = {n_cores}')
    print('core ids used:')
    print('\t' + ' '.join(map(str, cores_list)))

    # Write benchmark
    if run_write:
        # Cleanup before starting
        subprocess.Popen(
            f'rm -fvr {dirpath}/benchmark_*.dat', shell=True,
            stdout=subprocess.PIPE, stderr=subprocess.PIPE).wait()

        all_write_times = {}
        lock = threading.Lock()

        # Start wall clock timer
        wall_t0 = time.time()
        for run_no in range(n_runs):
            print(f'run write benchmark {run_no}')
            write_times = {}

            # Measure time for all threads in this run
            run_t0 = time.time()
            threads = []
            for cpu_id in cores_list:
                thread = threading.Thread(
                    name=f'write_file_{cpu_id}',
                    target=write_file,
                    args=(
                        cpu_id,
                        bs,
                        count,
                        os.path.join(dirpath, f'benchmark_{cpu_id}.dat'),
                        write_times,
                        lock,
                        no_cleanup
                    )
                )
                threads.append(thread)
            for thread in threads:
                thread.start()
            for thread in threads:
                thread.join()
            run_t1 = time.time()
            run_dt = run_t1 - run_t0

            # Store times for this run
            for core_id, thread_time in write_times.items():
                if core_id not in all_write_times:
                    all_write_times[core_id] = []
                all_write_times[core_id].append(thread_time)

            print(f'  run {run_no} completed in {run_dt:.4f} s')

        # Stop wall clock timer
        wall_t1 = time.time()
        wall_dt = wall_t1 - wall_t0

        # Calculate write statistics
        total_count = count * n_cores * n_runs
        total_size = bs * total_count

        # Calculate per-thread statistics
        all_thread_times = []
        for core_id in cores_list:
            if core_id in all_write_times:
                all_thread_times.extend(all_write_times[core_id])
        avg_thread_time = np.mean(all_thread_times) if all_thread_times else 0
        std_thread_time = np.std(all_thread_times) if all_thread_times else 0

        print("\n=== write results ===")
        print(f'total operations = {len(all_thread_times)}')
        print(f'wall clock time = {wall_dt:.4f} s')
        print(f'average thread time = {avg_thread_time:.4f} s (std = {std_thread_time:.4f} s)')
        print(f'total data written per run = {total_size / n_runs / 2**30:.2f} GiB')
        print(f'total data written = {total_size / 2**30:.2f} GiB')
        print(f'total bandwidth = {total_size / 2**30 / wall_dt:.2f} GiB/s')
        print(f'total chunks / sec write = {total_count / wall_dt / 1e6:.2f} M chunks/s')

    # Read benchmark - the files need to be written again first
    if run_read:
        print("\npreparing files for read benchmark...")

        # Prepare files once using multi-threaded writes (before timing starts)
        def prepare_file(cmd):
            process = subprocess.Popen(
                shlex.split(cmd),
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE
            )
            process.wait()

        prep_threads = []
        for cpu_id in cores_list:
            fpath = os.path.join(dirpath, f'benchmark_{cpu_id}.dat')
            cmd = f"taskset --cpu-list {cpu_id} dd if=/dev/zero of={fpath} bs={bs} count={count} oflag=nocache conv=fsync"
            thread = threading.Thread(
                target=prepare_file,
                args=(cmd,)
            )
            prep_threads.append(thread)
        for thread in prep_threads:
            thread.start()
        for thread in prep_threads:
            thread.join()
        print('files prepared for read benchmark')

        all_read_times = {}
        lock = threading.Lock()

        # Start wall clock timer (after file preparation)
        wall_t0 = time.time()
        for run_no in range(n_runs):
            print(f'run read benchmark {run_no}')
            read_times = {}

            # Measure time for all threads in this run
            run_t0 = time.time()
            threads = []
            for cpu_id in cores_list:
                thread = threading.Thread(
                    name=f'read_file_{cpu_id}',
                    target=read_file,
                    args=(
                        cpu_id,
                        bs,
                        count,
                        os.path.join(dirpath, f'benchmark_{cpu_id}.dat'),
                        read_times,
                        lock,
                        True  # always preserve the files during benchmark runs
                    )
                )
                threads.append(thread)
            for thread in threads:
                thread.start()
            for thread in threads:
                thread.join()
            run_t1 = time.time()
            run_dt = run_t1 - run_t0

            # Store times for this run
            for core_id, thread_time in read_times.items():
                if core_id not in all_read_times:
                    all_read_times[core_id] = []
                all_read_times[core_id].append(thread_time)

            print(f'  run {run_no} completed in {run_dt:.4f} s')

        # Stop wall clock timer
        wall_t1 = time.time()
        wall_dt = wall_t1 - wall_t0

        # Calculate read statistics
        total_count = count * n_cores * n_runs
        total_size = bs * total_count

        # Calculate per-thread statistics
        all_thread_times = []
        for core_id in cores_list:
            if core_id in all_read_times:
                all_thread_times.extend(all_read_times[core_id])
        avg_thread_time = np.mean(all_thread_times) if all_thread_times else 0
        std_thread_time = np.std(all_thread_times) if all_thread_times else 0

        print("\n=== read results ===")
        print(f'total operations = {len(all_thread_times)}')
        print(f'wall clock time = {wall_dt:.4f} s')
        print(f'average thread time = {avg_thread_time:.4f} s (std = {std_thread_time:.4f} s)')
        print(f'total data read per run = {total_size / n_runs / 2**30:.2f} GiB')
        print(f'total data read = {total_size / 2**30:.2f} GiB')
        print(f'total bandwidth = {total_size / 2**30 / wall_dt:.2f} GiB/s')
        print(f'total chunks / sec read = {total_count / wall_dt / 1e6:.2f} M chunks/s')

    # Final cleanup (only if not no_cleanup)
    if not no_cleanup:
        subprocess.Popen(
            f'rm -fvr {dirpath}/benchmark_*.dat', shell=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE
        ).wait()
    else:
        print(f"\nfiles preserved in {dirpath}/benchmark_*.dat")


def main():
    args = parse_args()
    run_benchmark(args)


if __name__ == '__main__':
    main()