Skip to content

Instantly share code, notes, and snippets.

@keerah
Last active February 9, 2026 23:38
Show Gist options
  • Select an option

  • Save keerah/cbcda91f531a847ea2880ae056374972 to your computer and use it in GitHub Desktop.

Select an option

Save keerah/cbcda91f531a847ea2880ae056374972 to your computer and use it in GitHub Desktop.
ActivityWatch python script to query work apps usage/time track
# Script that computes how many hours were spent in a regex-specified "work" category for each day in a given time span.
# By Keerah.com, based on ActivityWatch example
# Output:
# Daily table with time and apps list sorted by the app usage
# App table by usage for entire period
# Verification of matches to make sure you measured nothing except the real work
# If you are getting unrelated matches you need to tighten your regex criteria
#
# To use it in Obsidian:
# Get the Execute Code extension for Obsidian
# pip install aw-client tabulate (use Python <= 3.12)
# And use in a note like this:
# ```batch
# py @vault_path/Work/awquery.py "photoshop|teddy.*bears|texture\d*jpg|char.*blend" "25-03-11" "25-03-29" flood
# ```
# @vault_path is the Execute Code's magic variable
#
# Tested on Windows, may require edits to work on Mac
# If you do not output to Obsidian, you can remove the encode().decode() chain where the matches are collected in main() to get normal UTF-8 output
import logging, re, socket, sys
from datetime import datetime as dt, timedelta as td, timezone
import aw_client
from aw_client import queries
from tabulate import tabulate
# A "day" starts at midnight plus this shift: main() adds it to the start date,
# so with 4 hours each daily period runs from 04:00 to 04:00 the next day.
day_offset = td(hours=4)
# Passed to tabulate() as `tablefmt` for both printed tables;
# change to "simple" for a plain table.
table_format = "pipe"
# Maximum number of matched titles printed in the verification list.
match_limit = 100
def _pretty_td(timedelta: td | list[td]) -> str:
if type(timedelta) == list:
atd = sum([td(seconds=ev['duration']) for ev in timedelta], td())
else:
atd = timedelta
s = str(atd)
s = re.sub(r"^(0+[:]?)+", "", s)
s = s.rjust(len(str(atd)), " ")
s = re.sub(r"[.]\d+", "", s)
return s
# Import-time sanity checks for _pretty_td. The result is right-justified to
# the width of the unstripped string ("0:02:00" is 7 characters), so two
# minutes is rendered with three leading spaces.
assert _pretty_td(td(seconds=120)) == "   2:00"
assert _pretty_td(td(hours=9, minutes=5)) == "9:05:00"
def query(regex: str, timeperiods: list[tuple[dt, dt]], hostname: str, flooding: bool):
    """Ask the ActivityWatch server for "Work" events in each time period.

    Window events from ``aw-watcher-window_<hostname>`` are classified with a
    single case-insensitive regex rule named "Work"; only events in that class
    are kept and AFK time is filtered out. The query returns, per period, the
    events merged by app (``events``), their summed duration (``duration``)
    and the events merged by window title (``events2``).

    Args:
        regex: Pattern that defines the "Work" category.
        timeperiods: ``(start, end)`` pairs, one query result per pair.
        hostname: Host whose window/AFK watcher buckets are queried.
        flooding: When true, ``flood(events)`` is applied before merging.

    Returns:
        Whatever ``ActivityWatchClient.query`` returns for the periods.
    """
    work_rule = {
        "type": "regex",
        "regex": regex,
        "ignore_case": True,
    }
    work_classes: list[tuple[list[str], dict]] = [(["Work"], work_rule)]

    client = aw_client.ActivityWatchClient(client_name="my_working_hours")

    params = queries.DesktopQueryParams(
        bid_window=f"aw-watcher-window_{hostname}",
        bid_afk=f"aw-watcher-afk_{hostname}",
        classes=work_classes,
        filter_classes=[["Work"]],
        filter_afk=True,
        include_audible=False,
    )
    canonical = queries.canonicalEvents(params)
    flood_stmt = 'events = flood(events);' if flooding else ''

    # The query text is kept flush-left so the string sent to the server is
    # exactly the same as before.
    script = f"""
{canonical}
{flood_stmt}
events2 = chunk_events_by_key(events, "title");
events2 = merge_events_by_keys(events2, ["title"]);
events = merge_events_by_keys(events, ["app"]);
duration = sum_durations(events);
RETURN = {{"events": events, "duration": duration, "events2": events2}};
"""
    return client.query(script, timeperiods)
def _day_label(moment: dt) -> str:
    """Format a date like ``Tue, 11 Mar 2025`` without platform-specific codes."""
    # "%#d" (Windows) / "%-d" (POSIX) are not portable; use the day attribute.
    return f"{moment:%a}, {moment.day} {moment:%b %Y}"


def _match_excerpt(title: str, matched: re.Match, separators: re.Pattern) -> str:
    """Pick the slice of *title* that is listed for one regex hit.

    When the hit starts after the beginning of the title, the excerpt runs
    from the start of the title to the end of the hit. When the hit is at the
    very start of the title, the excerpt is extended up to and including the
    next separator character, or to the end of the title if there is none.
    """
    if matched.start() > 0:
        return title[:matched.end()]
    separator = separators.search(title, pos=matched.end())
    return title if separator is None else title[:separator.end()]


def main():
    """Parse the command line, query ActivityWatch and print the reports.

    Expected arguments: ``<regex> <start YY-MM-DD> <end YY-MM-DD> [flood]``,
    for example::

        py awquery.py "photoshop|teddy.*bears|texture\\d*jpg|char.*blend" "25-03-11" "25-03-29" flood

    Prints, in order: the query parameters, a per-day table (duration, event
    count, apps sorted by usage), the grand total, a per-app table for the
    whole period, and a list of matched window titles for verification.

    Exits with status 1 when an argument is missing or invalid.
    """
    # --- parse arguments ---------------------------------------------------
    if len(sys.argv) < 4:
        print('Usage: py awquery.py <regex: "str"> <start date: "YY-MM-DD"> <end date: "YY-MM-DD"> <flood: on if present>')
        sys.exit(1)

    regex = sys.argv[1]
    try:
        # Compiled up front so a bad pattern is reported before any query is
        # sent; the compiled pattern is reused for the verification list.
        matchsearch = re.compile(regex, flags=re.I)
    except re.error:
        print("First argument should be a regexp string")
        sys.exit(1)

    try:
        sdate = dt.strptime(sys.argv[2], '%y-%m-%d').astimezone()
    except (ValueError, OSError, OverflowError):
        print("The start date format must be YY-MM-DD")
        sys.exit(1)
    try:
        edate = dt.strptime(sys.argv[3], '%y-%m-%d').astimezone()
    except (ValueError, OSError, OverflowError):
        print("The end date format must be YY-MM-DD")
        sys.exit(1)
    if sdate > edate:
        print("Start date can't be later than the end date!")
        sys.exit(1)

    # The flag is honoured even if extra arguments follow it.
    flooding = len(sys.argv) >= 5 and sys.argv[4].lower() == 'flood'

    hostname = socket.gethostname()

    # --- build daily periods ----------------------------------------------
    # Count calendar days (both ends inclusive) before applying the offset,
    # so the result does not depend on the value of day_offset.
    days = (edate.date() - sdate.date()).days + 1
    # Shift the start of every "day" from local midnight by day_offset.
    sdate = (sdate.replace(tzinfo=None) + day_offset).astimezone()
    oneday = td(days=1)
    timeperiods = [(sdate + i * oneday, sdate + (i + 1) * oneday) for i in range(days)]
    timeperiods.reverse()  # newest day first

    res = query(regex, timeperiods, hostname, flooding)

    # --- header ------------------------------------------------------------
    print(f'ActivityWatch data for host "{hostname}" using:')
    print(f"- regex : `{regex}`")
    print(f"- flooding : {'on' if flooding else 'off'}")
    print(f"- day offset : {day_offset}")
    print(f"- period : {_day_label(sdate)} - {_day_label(edate)} = {days} days")
    print("")

    # --- per-day table -----------------------------------------------------
    tab_out: list = []
    total_apps: dict[str, float] = {}
    for (start, _), day in zip(timeperiods, res):
        day_events = day["events"]
        # (duration, app) pairs, longest usage first
        apps: list[tuple[float, str]] = sorted(
            (
                (
                    evi.get('duration', 0),
                    evi.get('data', {}).get('app', '').replace('.exe', ''),
                )
                for evi in day_events
            ),
            reverse=True,
        )
        tab_out.append(
            [
                start.date(),
                _pretty_td(day_events),
                len(day_events),
                ', '.join(app for _, app in apps),
            ]
        )
        # accumulate per-app usage over the whole period
        for dur, app in apps:
            total_apps[app] = total_apps.get(app, 0) + dur
    print(
        tabulate(
            tab_out,
            headers=["Date", "Duration", "Events", "Apps"],
            colalign=("left", "right"),
            tablefmt=table_format,
        )
    )
    print("")

    # --- grand total -------------------------------------------------------
    count = sum(len(day['events']) for day in res)
    total = sum(sum(ev['duration'] for ev in day['events']) for day in res)
    total_hours, remn = divmod(total, 3600)
    total_hours = int(total_hours)
    total_mins = int(remn // 60)
    print(f"Total: {_pretty_td(td(seconds=total))} = {total_hours:02}:{total_mins:02} (h:m) in {count} events")
    print("")

    # --- per-app table -----------------------------------------------------
    total_apps = dict(sorted(total_apps.items(), key=lambda item: item[1], reverse=True))
    print(
        tabulate(
            [(app, _pretty_td(td(seconds=dur))) for app, dur in total_apps.items()],
            headers=["App", "Duration"],
            colalign=("left", "right"),
            tablefmt=table_format,
        )
    )
    print("")
    print(f"Total apps used: {len(total_apps)}")

    # --- matches for verification -----------------------------------------
    # Lists what the regex actually hit in the window titles so unrelated
    # matches can be spotted and the regex tightened.
    spacesearch = re.compile(r'[|\-︱]')
    matches = set()
    for day in res:
        for ev2 in day['events2']:
            title: str = ev2['data']['title']
            matched = matchsearch.search(title)
            if matched is None:
                continue
            excerpt = _match_excerpt(title, matched, spacesearch)
            # encode/decode is required only for Obsidian Execute Code
            matches.add(excerpt.strip(" -|︱").encode("unicode_escape").decode("utf_8"))

    if len(matches) > match_limit:
        ver_header = f"First {match_limit} matches out of {len(matches)} for matching verification:"
    else:
        ver_header = f"{len(matches)} matches for matching verification:"
    print("")
    print(ver_header)
    print("-" * len(ver_header))
    # Sorted so that "first N" is deterministic; joined outside an f-string
    # because a backslash inside an f-string expression needs Python 3.12+.
    print("\n".join(sorted(matches)[:match_limit]))
if __name__ == "__main__":
    # Only errors from aw_transform are shown; its warnings are suppressed.
    aw_transform_logger = logging.getLogger("aw_transform")
    aw_transform_logger.setLevel(logging.ERROR)
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment