Skip to content

Instantly share code, notes, and snippets.

@nickva
Created February 7, 2026 17:32
Show Gist options
  • Select an option

  • Save nickva/c018c6f982af3174c9b24f871a40c87e to your computer and use it in GitHub Desktop.

Select an option

Save nickva/c018c6f982af3174c9b24f871a40c87e to your computer and use it in GitHub Desktop.
Create and query couchdb views
#!/usr/bin/env python
import os
import sys
import time
import uuid
import random
import requests
import argparse
import configparser
TIMEOUT=120
URL='http://localhost:5984'
DBNAME = 'db'
Q = '16'
REQUEST_TIMEOUT_DEFAULT = 1000
BATCH = 1500
MAP_FUN = '''
function(doc){
for(var i=0;i<10;i++) {
tag = %s;
emit(doc._id+'_'+i, i*100);
}
}
'''
def log(*args):
sargs = []
for a in args:
try:
sargs.append(str(a))
except:
sargs.append('?')
msg = " ".join(sargs)
sys.stderr.write(msg + '\n')
sys.stderr.flush()
class Server:
def __init__(self, args, timeout=REQUEST_TIMEOUT_DEFAULT):
self.sess = requests.Session()
self.sess.auth = args.auth
self.sess.headers.update(parse_headers(args.headers))
self.url = args.url.rstrip('/')
self.timeout = timeout
def _apply_timeout(self, kw):
if self.timeout is not None and 'timeout' not in kw:
kw['timeout'] = self.timeout
return kw
def get(self, path = '', **kw):
kw = self._apply_timeout(kw)
r = self.sess.get(self.url + '/' + path, **kw)
r.raise_for_status()
return r.json()
def post(self, path, **kw):
kw = self._apply_timeout(kw)
r = self.sess.post(self.url + '/' + path, **kw)
if r.status_code > 202:
log(" post > 202 code", r.status_code, " request:", path)
log(" args", kw)
log(" response", r.json())
r.raise_for_status()
return r.json()
def put(self, path, **kw):
kw = self._apply_timeout(kw)
r = self.sess.put(self.url + '/' + path, **kw)
r.raise_for_status()
return r.json()
def delete(self, path, **kw):
kw = self._apply_timeout(kw)
r = self.sess.delete(self.url + '/' + path, **kw)
r.raise_for_status()
return r.json()
def head(self, path, **kw):
kw = self._apply_timeout(kw)
r = self.sess.head(self.url + '/' + path, **kw)
return r.status_code
def __iter__(self):
dbs = self.get('_all_dbs')
return iter(dbs)
def __str__(self):
return "<Server:%s>" % self.url
def __iter__(self):
dbs = self.get('_all_dbs')
return iter(dbs)
def __str__(self):
return "<Server:%s>" % self.url
def __contains__(self, dbname):
res = self.sess.head(self.url + '/' + dbname)
code = res.status_code
if code == 200:
return True
if code == 404:
return False
raise Exception(f"Unexpected head status code {res}")
# CouchDB specific methods
def version(self):
vjson = self.get()
return vjson['version']
def bulk_docs(self, dbname, docs, new_edits=True, w=None):
if w is not None:
w = str(w)
w_str = '?w=' + str(w)
else:
w_str = ''
if new_edits:
json = {'docs': docs}
return self.post(dbname + '/_bulk_docs' + w_str, json = json)
else:
json = {'docs': docs, 'new_edits': False}
return self.post(dbname + '/_bulk_docs' + w_str, json = json)
def changes(self, dbname, style='main_only', since=0, limit=None):
if limit is None:
return self.get(f'{dbname}/_changes?style={style}&since={since}')
else:
return self.get(f'{dbname}/_changes?style={style}&limit={limit}&since={since}')
def purge(self, dbname, revs):
return self.post(dbname + '/_purge', json = revs)
def set_deleted_document_ttl(self, dbname, val):
body = {'deleted_document_ttl':val}
return self.put(dbname + '/_auto_purge', json = body)
def membership(self):
return self.get('_membership')
def config_set(self, section, key, val, node='_local'):
url = f'_node/{node}/_config/{section}/{key}'
val = str(val)
headers = {'x-couch-persist': 'false'}
return self.put(url, data='"'+val+'"', headers = headers)
def config_get(self, section, key, node='_local'):
url = f'_node/{node}/_config/{section}/{key}'
return self.get(url)
def config_delete(self, section, key, node='_local'):
url = f'_node/{node}/_config/{section}/{key}'
headers = {'x-couch-persist': 'false'}
return self.delete(url, headers = headers)
def system_get(self, node='_local'):
url = f'_node/{node}/_system'
return self.get(url)
def purged_infos_limit_get(self, dbname):
url = f'{dbname}/_purged_infos_limit'
return int(self.get(url))
def purged_infos_limit_set(self, dbname, val):
val = int(val)
url = f'{dbname}/_purged_infos_limit'
return self.put(url, data=str(val))
def parse_headers(headers):
if not isinstance(headers, list):
raise Exception("Headers %s is not a list" % headers)
res = {}
for header in headers:
split_eq = header.split(':')
if len(split_eq) != 2:
raise Exception("Header %s doesn't look like header:value" % header)
[key, val] = split_eq
res[key.strip()] = val.strip()
return res
def add_view(srv, db, tag):
srv.put(db + '/_design/ddoc?w=3', json = {
"views": {
"view": {
"map": MAP_FUN % tag,
#"reduce": "_stats"
}
},
#"autoupdate": False
})
def delete_view(srv, db):
path = db + '/_design/ddoc'
if srv.head(path) == 404:
return
json = srv.get(path)
rev = json['_rev']
srv.delete(db + f'/_design/ddoc?rev={rev}&w=3')
def _rand_id():
return uuid.uuid4().hex
def doc(_id):
return {'_id':_id, 'data':_id}
def fill(args, dbname, srv):
batches = args.num_docs // BATCH
log(f" * Filling {dbname} {args.num_docs}")
for b in range(1, batches + 1):
srv.bulk_docs(dbname, [doc(_rand_id()) for docnum in range(BATCH)])
left = args.num_docs - batches * BATCH
srv.bulk_docs(dbname, [doc(_rand_id()) for docnum in range(left)])
def create_view(s, dbname):
delete_view(s, dbname)
time.sleep(1)
add_view(s, dbname, random.randrange(100000))
print(f"Building view")
while True:
state = s.get(f'{dbname}/_design/ddoc/_info')
updates = state['view_index']['updates_pending']['total']
if updates == 0:
break
time.sleep(1)
def query_view(s, dbname, cycles):
if cycles == 0:
return
dts = []
for i in range(cycles):
key = _rand_id()[:8]
t0 = time.time()
res = s.get(f'{dbname}/_design/ddoc/_view/view?start_key="{key}"&limit={Q}')
#print(key, res)
t1 = time.time()
dt = t1 - t0
dts.append(dt)
dts.sort()
s = sum(dts)
a = s / cycles
max_dt = max(dts)
med = dts[len(dts) // 2]
print(f"Query stats n: {cycles} avg:{a:.6f} max:{max_dt:.6f}, median:{med:.6f} ")
def main(args):
args = _get_auth(args)
s = Server(args)
if args.query_only:
query_view(s, args.dbname, args.cycles)
elif args.build_view_and_query:
create_view(s, args.dbname)
query_view(s, args.dbname, args.cycles)
else:
if args.dbname in s:
s.delete(args.dbname)
time.sleep(1)
s.put(args.dbname, params = {'q':Q})
fill(args, args.dbname, s)
create_view(s, args.dbname)
query_view(s, args.dbname, args.cycles)
def _args():
description = "Do a few crud operations as a stampede"
p = argparse.ArgumentParser(description = description)
p.add_argument('-u', '--url', default=URL, help="Server URL")
p.add_argument('-d', '--dbname', help = "DB name", default="db")
p.add_argument('-c', '--cycles', type=int, default=1)
p.add_argument('-n', '--num-docs', type=int, default=0)
p.add_argument('-b', '--build-view-and-query', action="store_true")
p.add_argument('-q', '--query-only', action="store_true")
p.add_argument('-a', '--auth', default=None, help = "user:pass or can be specified in AUTH env var")
p.add_argument('-e', '--headers', action="append", default=[], help = "Extra headers header=foo")
return p.parse_args()
def _get_auth(args):
"""
Load and cache adm auth stuff from ~/.clou most people already have it
set up so it makes it easier.
"""
if args.auth:
args.auth = tuple(args.auth.split(':'))
elif 'AUTH' in os.environ:
authstr = os.environ['AUTH']
args.auth = tuple(authstr.split(':'))
log(" *** Using auth", args.auth[0]," from AUTH env var")
else:
args.auth = get_adm_creds('~/.clou')
return args
def get_adm_creds(clou):
log(f" * Getting admin creds from {clou}")
cfg = configparser.ConfigParser()
cfg.read(os.path.expanduser(clou))
creds = (
cfg.get('cloudant', 'adm_user'),
cfg.get('cloudant', 'adm_password'),
)
if not creds[0] or not creds[1]:
raise Exception("Could not get admin creds")
return creds
if __name__=='__main__':
main(_args())
@nickva
Copy link
Author

nickva commented Feb 7, 2026

Create a 1M doc db and 10M rows view. Don't query it (cycles = 0)

./view_cycle.py -u http://localhost:15984 -a adm:pass -n 1000000 -c0

Query a view only 10k times (don't re-create db or view):

./view_cycle.py -u http://localhost:15984 -a adm:pass -n 1000000 -q -c10000

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment