Last active
February 9, 2026 23:38
-
-
Save keerah/cbcda91f531a847ea2880ae056374972 to your computer and use it in GitHub Desktop.
ActivityWatch python script to query work apps usage/time track
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # Script that computes how many hours was spent in a regex-specified "work" category for each day in a given time span. | |
| # By Keerah.com, based on ActivityWatch example | |
| # Output: | |
| # Daily table with time and apps list sorted by the app usage | |
| # App table by usage for entire period | |
| # Verification of matches to make sure you measured nothing except the real work | |
| # If you are getting unrelated matches you need to tighten your regex criteria | |
| # | |
| # To use it in Obsidian: | |
| # Get Excecute Code extension for Obsidian | |
| # Pip install aw_client (use Py <= 3.12) | |
| # And use in a note like this: | |
| # ```batch | |
| # py @vault_path/Work/awquery.py "photoshop|teddy.*bears|texture\d*jpg|char.*blend" "25-03-11" "25-03-29" flood | |
| # ``` | |
| # @vault_path is the Execute Code's magic variable | |
| # | |
| # Tested on Windows, may require edits to work on Mac | |
| # If yo do not output to Obsidian, you can remove encode().decode chain in line 225 to get a normal UTF-8 output | |
| import logging, re, socket, sys | |
| from datetime import datetime as dt, timedelta as td, timezone | |
| import aw_client | |
| from aw_client import queries | |
| from tabulate import tabulate | |
| # new day starts at this shift instead of midnight | |
| day_offset = td(hours=4) | |
| # table formatting, change to "simple" for a plain table | |
| table_format = "pipe" | |
| # limit the number of matches output for verification | |
| match_limit = 100 | |
| def _pretty_td(timedelta: td | list[td]) -> str: | |
| if type(timedelta) == list: | |
| atd = sum([td(seconds=ev['duration']) for ev in timedelta], td()) | |
| else: | |
| atd = timedelta | |
| s = str(atd) | |
| s = re.sub(r"^(0+[:]?)+", "", s) | |
| s = s.rjust(len(str(atd)), " ") | |
| s = re.sub(r"[.]\d+", "", s) | |
| return s | |
| assert _pretty_td(td(seconds=120)) == " 2:00" | |
| assert _pretty_td(td(hours=9, minutes=5)) == "9:05:00" | |
| def query(regex: str, timeperiods: list[tuple[dt, dt]], hostname: str, flooding: bool): | |
| categories: list[tuple[list[str], dict]] = [ | |
| ( | |
| ["Work"], | |
| { | |
| "type": "regex", | |
| "regex": regex, | |
| "ignore_case": True, | |
| }, | |
| ) | |
| ] | |
| aw = aw_client.ActivityWatchClient(client_name="my_working_hours") | |
| canonicalQuery = queries.canonicalEvents( | |
| queries.DesktopQueryParams( | |
| bid_window=f"aw-watcher-window_{hostname}", | |
| bid_afk=f"aw-watcher-afk_{hostname}", | |
| classes=categories, | |
| filter_classes=[["Work"]], | |
| filter_afk=True, | |
| include_audible=False, | |
| ) | |
| ) | |
| query = f""" | |
| {canonicalQuery} | |
| {'events = flood(events);' if flooding else ''} | |
| events2 = chunk_events_by_key(events, "title"); | |
| events2 = merge_events_by_keys(events2, ["title"]); | |
| events = merge_events_by_keys(events, ["app"]); | |
| duration = sum_durations(events); | |
| RETURN = {{"events": events, "duration": duration, "events2": events2}}; | |
| """ | |
| res = aw.query(query, timeperiods) | |
| return res | |
| def main(): | |
| # parsing arguments | |
| # usage example: | |
| # py awquery.py "photoshop|teddy.*bears|texture\d*jpg|char.*blend" "25-03-11" "25-03-29" flood | |
| if len(sys.argv) < 4: | |
| print("Usage: py awquery.py <regex: \"str\"> <start date: \"YY-MM-DD\"> <end date: \"YY-MM-DD>\" <flood: on if present>") | |
| exit(1) | |
| try: | |
| regex = sys.argv[1] | |
| except Exception as e: | |
| print("First argument should be a regexp string") | |
| exit(1) | |
| try: | |
| sdate = dt.strptime(sys.argv[2], '%y-%m-%d').astimezone() | |
| except Exception as e: | |
| print("The start date format must be YY-MM-DD") | |
| exit(1) | |
| try: | |
| edate = dt.strptime(sys.argv[3], '%y-%m-%d').astimezone() | |
| except Exception as e: | |
| print("The end date format must be YY-MM-DD") | |
| exit(1) | |
| if sdate > edate: | |
| print("Start date can't be later than the end date!") | |
| exit(1) | |
| if len(sys.argv) == 5: | |
| flooding = True if sys.argv[4].lower() == 'flood' else False | |
| else: | |
| flooding = False | |
| hostname = socket.gethostname() | |
| sdate = (dt.combine(sdate.date(), sdate.time()) + day_offset).astimezone() | |
| days = int((edate - sdate).days) + 2 | |
| # daily periods to query | |
| oneday = td(days=1) | |
| timeperiods = [(sdate + i*oneday, sdate + (i+1)*oneday) for i in range(days)] | |
| timeperiods.reverse() | |
| res = query(regex, timeperiods, hostname, flooding) | |
| print(f"ActivityWatch data for host \"{hostname}\" using:") | |
| print(f"- regex : `{regex}`") | |
| print(f"- flooding : {'on' if flooding == True else 'off'}") | |
| print(f"- day offset : {day_offset}") | |
| print(f"- period : {sdate.strftime('%a, %#d %b %Y')} - {edate.strftime('%a, %#d %b %Y')} = {days} days") | |
| print("") | |
| tab_out: list = [] | |
| total_apps: dict = {} | |
| for i, (start, _) in enumerate(timeperiods): | |
| # list of apps sorted by the usage time | |
| apps: list[tuple[float,str]] = sorted( | |
| [ | |
| (evi.get('duration',0), | |
| evi.get('data',{}).get('app','').replace('.exe','')) | |
| for evi in res[i]["events"] | |
| ], | |
| reverse=True | |
| ) | |
| tab_out.append( | |
| [ | |
| start.date(), | |
| _pretty_td(res[i]['events']), | |
| len(res[i]["events"]), | |
| ', '.join([a for _, a in apps])] | |
| ) | |
| # collect total app usage | |
| for dur, app in apps: | |
| if total_apps.get(app,None) is None: | |
| total_apps[app] = dur | |
| else: | |
| total_apps[app] += dur | |
| print( | |
| tabulate( | |
| tab_out, | |
| headers=["Date", "Duration", "Events", "Apps"], | |
| colalign=("left", "right"), | |
| tablefmt=table_format, | |
| ) | |
| ) | |
| print("") | |
| total = 0 | |
| count = 0 | |
| for events in res: | |
| count += len(events['events']) | |
| total += sum([ev['duration'] for ev in events['events']]) | |
| total_hours, remn = divmod(total, 3600) | |
| total_hours = int(total_hours) | |
| total_mins, _ = divmod(remn, 60) | |
| total_mins = int(total_mins) | |
| print(f"Total: {_pretty_td(td(seconds=total))} = {total_hours:02}:{total_mins:02} (h:m) in {count} events") | |
| print("") | |
| total_apps = dict(sorted(total_apps.items(), key=lambda item: item[1], reverse=True)) | |
| print( | |
| tabulate( | |
| [(app, _pretty_td(td(seconds=dur))) for app, dur in total_apps.items()], | |
| headers=["App", "Duration"], | |
| colalign=("left", "right"), | |
| tablefmt=table_format, | |
| ) | |
| ) | |
| print("") | |
| print(f"Total apps used: {len(total_apps)}") | |
| # Output matches for verification | |
| # The output is from the start of the title to the end of the match if the entire output is not equal to the match | |
| # Otherwise it outputs from start of the match till the next space | |
| matchsearch = re.compile(regex, flags=re.I) | |
| spacesearch = re.compile(r'(\|)|(\-)|(\︱)') | |
| matches = set() | |
| for events in res: | |
| for ev2 in events['events2']: | |
| title: str = ev2['data']['title'] | |
| matched = matchsearch.search(title) | |
| if matched: | |
| match_output: str = title[:matched.end()] | |
| if match_output == matched.group(): | |
| next_space = spacesearch.search(title, pos=matched.end()) | |
| end = None if next_space is None else next_space.end() | |
| match_output = title[matched.start():end] | |
| if match_output == matched.group(): | |
| match_output = title[matched.start():] | |
| matches.add(match_output.strip(" -|︱").encode("unicode_escape").decode("utf_8")) # encode/decode is required only for Obsidian Execute Code | |
| if len(matches) > match_limit: | |
| ver_header = f"First {match_limit} matches out of {len(matches)} for matching verification:" | |
| else: | |
| ver_header = f"{len(matches)} matches for matching verification:" | |
| print("") | |
| print(ver_header) | |
| print("-"*len(ver_header)) | |
| print(f"{'\n'.join(list(matches)[:match_limit])}") | |
| if __name__ == "__main__": | |
| # ignore log warnings in aw_transform | |
| logging.getLogger("aw_transform").setLevel(logging.ERROR) | |
| main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment