Created
February 7, 2026 17:32
-
-
Save nickva/c018c6f982af3174c9b24f871a40c87e to your computer and use it in GitHub Desktop.
Create and query CouchDB views
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python | |
| import os | |
| import sys | |
| import time | |
| import uuid | |
| import random | |
| import requests | |
| import argparse | |
| import configparser | |
# Legacy timeout constant (seconds); appears unused here — the Server class
# uses REQUEST_TIMEOUT_DEFAULT below.  TODO(review): confirm still needed.
TIMEOUT=120
# Default CouchDB server URL (overridden by -u/--url).
URL='http://localhost:5984'
# Default database name (overridden by -d/--dbname).
DBNAME = 'db'
# Used both as the shard count (?q=) at db creation and as the view query limit.
Q = '16'
# Per-request timeout applied by Server unless a call passes its own
# (presumably seconds, as requests interprets it — confirm).
REQUEST_TIMEOUT_DEFAULT = 1000
# Number of documents sent per _bulk_docs batch when filling the database.
BATCH = 1500
# JavaScript map-function template for the design document.  %s is filled in
# with a random tag by add_view(), presumably so each ddoc body is unique and
# the view index is rebuilt from scratch — confirm against add_view's caller.
# Emits 10 rows per document (keys doc._id + '_' + i, values i*100).
MAP_FUN = '''
function(doc){
for(var i=0;i<10;i++) {
tag = %s;
emit(doc._id+'_'+i, i*100);
}
}
'''
def log(*args):
    """Write args to stderr as one space-separated, flushed line.

    Arguments that cannot be stringified are shown as '?'.
    """
    sargs = []
    for a in args:
        try:
            sargs.append(str(a))
        except Exception:
            # Fix: was a bare `except:`, which also swallowed
            # KeyboardInterrupt/SystemExit.
            sargs.append('?')
    msg = " ".join(sargs)
    sys.stderr.write(msg + '\n')
    sys.stderr.flush()
class Server:
    """Thin client for a CouchDB server.

    Wraps a requests.Session configured with auth and extra headers taken
    from the parsed CLI args, and applies a default timeout to every request.
    """

    def __init__(self, args, timeout=REQUEST_TIMEOUT_DEFAULT):
        self.sess = requests.Session()
        self.sess.auth = args.auth
        self.sess.headers.update(parse_headers(args.headers))
        self.url = args.url.rstrip('/')
        self.timeout = timeout

    def _apply_timeout(self, kw):
        # Inject the default timeout unless the caller supplied one.
        if self.timeout is not None and 'timeout' not in kw:
            kw['timeout'] = self.timeout
        return kw

    def get(self, path='', **kw):
        """GET `path` and return the decoded JSON body; raises on HTTP errors."""
        kw = self._apply_timeout(kw)
        r = self.sess.get(self.url + '/' + path, **kw)
        r.raise_for_status()
        return r.json()

    def post(self, path, **kw):
        """POST to `path` and return the decoded JSON body; raises on HTTP errors."""
        kw = self._apply_timeout(kw)
        r = self.sess.post(self.url + '/' + path, **kw)
        if r.status_code > 202:
            # Log the details before raise_for_status() throws below, so
            # failed requests are debuggable.
            log(" post > 202 code", r.status_code, " request:", path)
            log(" args", kw)
            log(" response", r.json())
        r.raise_for_status()
        return r.json()

    def put(self, path, **kw):
        """PUT to `path` and return the decoded JSON body; raises on HTTP errors."""
        kw = self._apply_timeout(kw)
        r = self.sess.put(self.url + '/' + path, **kw)
        r.raise_for_status()
        return r.json()

    def delete(self, path, **kw):
        """DELETE `path` and return the decoded JSON body; raises on HTTP errors."""
        kw = self._apply_timeout(kw)
        r = self.sess.delete(self.url + '/' + path, **kw)
        r.raise_for_status()
        return r.json()

    def head(self, path, **kw):
        """HEAD `path`; returns the raw status code and never raises on HTTP errors."""
        kw = self._apply_timeout(kw)
        r = self.sess.head(self.url + '/' + path, **kw)
        return r.status_code

    # Fix: __iter__ and __str__ were each defined twice; the later copies
    # silently shadowed the earlier ones.  Single definitions kept here.
    def __iter__(self):
        dbs = self.get('_all_dbs')
        return iter(dbs)

    def __str__(self):
        return "<Server:%s>" % self.url

    def __contains__(self, dbname):
        res = self.sess.head(self.url + '/' + dbname)
        code = res.status_code
        if code == 200:
            return True
        if code == 404:
            return False
        # Fix: report the numeric status code, not the response object's repr.
        raise Exception(f"Unexpected head status code {code}")

    # CouchDB specific methods

    def version(self):
        """Return the server version string from GET /."""
        vjson = self.get()
        return vjson['version']

    def bulk_docs(self, dbname, docs, new_edits=True, w=None):
        """POST `docs` to /{dbname}/_bulk_docs.

        `w` adds a write-quorum query parameter; new_edits=False replicates
        docs with their existing revisions.
        """
        # Fix: removed the redundant double str(w) conversion and the
        # duplicated post() branches.
        w_str = '' if w is None else '?w=' + str(w)
        body = {'docs': docs}
        if not new_edits:
            body['new_edits'] = False
        return self.post(dbname + '/_bulk_docs' + w_str, json=body)

    def changes(self, dbname, style='main_only', since=0, limit=None):
        """Fetch the _changes feed, optionally limited to `limit` rows."""
        if limit is None:
            return self.get(f'{dbname}/_changes?style={style}&since={since}')
        return self.get(f'{dbname}/_changes?style={style}&limit={limit}&since={since}')

    def purge(self, dbname, revs):
        return self.post(dbname + '/_purge', json=revs)

    def set_deleted_document_ttl(self, dbname, val):
        body = {'deleted_document_ttl': val}
        return self.put(dbname + '/_auto_purge', json=body)

    def membership(self):
        return self.get('_membership')

    def config_set(self, section, key, val, node='_local'):
        # x-couch-persist:false keeps the config change in memory only.
        url = f'_node/{node}/_config/{section}/{key}'
        headers = {'x-couch-persist': 'false'}
        return self.put(url, data='"' + str(val) + '"', headers=headers)

    def config_get(self, section, key, node='_local'):
        url = f'_node/{node}/_config/{section}/{key}'
        return self.get(url)

    def config_delete(self, section, key, node='_local'):
        url = f'_node/{node}/_config/{section}/{key}'
        headers = {'x-couch-persist': 'false'}
        return self.delete(url, headers=headers)

    def system_get(self, node='_local'):
        url = f'_node/{node}/_system'
        return self.get(url)

    def purged_infos_limit_get(self, dbname):
        url = f'{dbname}/_purged_infos_limit'
        return int(self.get(url))

    def purged_infos_limit_set(self, dbname, val):
        val = int(val)
        url = f'{dbname}/_purged_infos_limit'
        return self.put(url, data=str(val))
def parse_headers(headers):
    """Parse a list of 'Name: value' strings into a header dict.

    Whitespace around names and values is stripped.  Raises Exception when
    `headers` is not a list or an entry has no ':' separator.
    """
    if not isinstance(headers, list):
        raise Exception("Headers %s is not a list" % headers)
    res = {}
    for header in headers:
        # Fix: split on the FIRST ':' only, so values containing colons
        # (tokens, URLs) are accepted instead of being rejected.
        key, sep, val = header.partition(':')
        if not sep:
            raise Exception("Header %s doesn't look like header:value" % header)
        res[key.strip()] = val.strip()
    return res
def add_view(srv, db, tag):
    """Create _design/ddoc on `db` with a single map view.

    `tag` is substituted into MAP_FUN, giving each build a distinct
    function body.
    """
    ddoc = {
        "views": {
            "view": {
                "map": MAP_FUN % tag,
                # "reduce": "_stats"
            }
        },
        # "autoupdate": False
    }
    srv.put(db + '/_design/ddoc?w=3', json=ddoc)
def delete_view(srv, db):
    """Remove _design/ddoc from `db` if present; a 404 is a silent no-op."""
    ddoc_path = db + '/_design/ddoc'
    if srv.head(ddoc_path) == 404:
        return
    rev = srv.get(ddoc_path)['_rev']
    srv.delete(db + f'/_design/ddoc?rev={rev}&w=3')
| def _rand_id(): | |
| return uuid.uuid4().hex | |
def doc(_id):
    """Build a minimal test document whose data field mirrors its id."""
    return dict(_id=_id, data=_id)
def fill(args, dbname, srv):
    """Populate `dbname` with args.num_docs random documents.

    Documents are posted in _bulk_docs batches of BATCH, with one final
    partial batch for the remainder.
    """
    batches = args.num_docs // BATCH
    log(f" * Filling {dbname} {args.num_docs}")
    for _ in range(batches):
        srv.bulk_docs(dbname, [doc(_rand_id()) for _ in range(BATCH)])
    left = args.num_docs - batches * BATCH
    # Fix: skip the trailing POST entirely when num_docs is an exact
    # multiple of BATCH, instead of sending an empty docs list.
    if left:
        srv.bulk_docs(dbname, [doc(_rand_id()) for _ in range(left)])
def create_view(s, dbname):
    """Drop any existing ddoc, add a fresh one, and block until the view builds."""
    delete_view(s, dbname)
    time.sleep(1)
    add_view(s, dbname, random.randrange(100000))
    print(f"Building view")
    # Poll the index status until no updates remain pending.
    while True:
        info = s.get(f'{dbname}/_design/ddoc/_info')
        if info['view_index']['updates_pending']['total'] == 0:
            return
        time.sleep(1)
def query_view(s, dbname, cycles):
    """Query the view `cycles` times with random start keys and print timings.

    Prints count, average, max, and median latency; no-op when cycles == 0.
    """
    if cycles == 0:
        return
    dts = []
    for _ in range(cycles):
        key = _rand_id()[:8]
        t0 = time.time()
        s.get(f'{dbname}/_design/ddoc/_view/view?start_key="{key}"&limit={Q}')
        dts.append(time.time() - t0)
    dts.sort()
    # Fix: the running total was assigned to `s`, shadowing the server
    # argument; use a distinct name.
    total = sum(dts)
    a = total / cycles
    max_dt = dts[-1]  # list is sorted, so the last element is the max
    med = dts[len(dts) // 2]
    print(f"Query stats n: {cycles} avg:{a:.6f} max:{max_dt:.6f}, median:{med:.6f} ")
def main(args):
    """Entry point: create/fill the db, build the view, and/or query it.

    -q queries only; -b rebuilds the view then queries; otherwise the db is
    recreated and filled before building and querying.
    """
    args = _get_auth(args)
    srv = Server(args)
    if args.query_only:
        query_view(srv, args.dbname, args.cycles)
        return
    if not args.build_view_and_query:
        # Full run: drop and recreate the database, then fill it.
        if args.dbname in srv:
            srv.delete(args.dbname)
            time.sleep(1)
        srv.put(args.dbname, params={'q': Q})
        fill(args, args.dbname, srv)
    create_view(srv, args.dbname)
    query_view(srv, args.dbname, args.cycles)
def _args():
    """Build the CLI parser and return the parsed arguments."""
    parser = argparse.ArgumentParser(
        description="Do a few crud operations as a stampede")
    parser.add_argument('-u', '--url', default=URL, help="Server URL")
    parser.add_argument('-d', '--dbname', default="db", help="DB name")
    parser.add_argument('-c', '--cycles', type=int, default=1)
    parser.add_argument('-n', '--num-docs', type=int, default=0)
    parser.add_argument('-b', '--build-view-and-query', action="store_true")
    parser.add_argument('-q', '--query-only', action="store_true")
    parser.add_argument('-a', '--auth', default=None,
                        help="user:pass or can be specified in AUTH env var")
    parser.add_argument('-e', '--headers', action="append", default=[],
                        help="Extra headers header=foo")
    return parser.parse_args()
| def _get_auth(args): | |
| """ | |
| Load and cache adm auth stuff from ~/.clou most people already have it | |
| set up so it makes it easier. | |
| """ | |
| if args.auth: | |
| args.auth = tuple(args.auth.split(':')) | |
| elif 'AUTH' in os.environ: | |
| authstr = os.environ['AUTH'] | |
| args.auth = tuple(authstr.split(':')) | |
| log(" *** Using auth", args.auth[0]," from AUTH env var") | |
| else: | |
| args.auth = get_adm_creds('~/.clou') | |
| return args | |
def get_adm_creds(clou):
    """Read (adm_user, adm_password) from the [cloudant] section of `clou`.

    Raises Exception when either value is missing or empty.
    """
    log(f" * Getting admin creds from {clou}")
    cfg = configparser.ConfigParser()
    cfg.read(os.path.expanduser(clou))
    user = cfg.get('cloudant', 'adm_user')
    password = cfg.get('cloudant', 'adm_password')
    if not (user and password):
        raise Exception("Could not get admin creds")
    return (user, password)
if __name__ == '__main__':
    main(_args())
Author
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Create a 1M-doc database and a 10M-row view without querying it (cycles = 0):
Query only the view, 10k times (without re-creating the database or the view):