Skip to content

Instantly share code, notes, and snippets.

@RJ-Infinity
Created September 20, 2024 17:38
Show Gist options
  • Select an option

  • Save RJ-Infinity/2ce0c5e643cc059f31c0a79f04caf067 to your computer and use it in GitHub Desktop.

Select an option

Save RJ-Infinity/2ce0c5e643cc059f31c0a79f04caf067 to your computer and use it in GitHub Desktop.
this decorator creates a task which automaticly starts when called and then can be awaited later or never at all and it will still continue running
from asyncio import create_task, Future
from functools import wraps
# this is needed as the create_task only holds a weak reference to the task so it could be garbage collected
_running_tasks = {}
def _get_id():
i=0
yield i
while True:
i+=1
yield i
_ids = _get_id()
async def _referenced_task(task, task_id: int, result: Future):
try:result.set_result(await task)
except Exception as e:
result.set_exception(e)
raise
# use finnaly to make sure exceptions bubble
finally:del _running_tasks[task_id]#anything after this is not garenteed to run
def task_run(fn):
@wraps(fn)
def inner(*args, **kwargs):
task_id=next(_ids)
result = Future()
_running_tasks[task_id] = create_task(_referenced_task(fn(*args, **kwargs), task_id, result))
return result
return inner
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment