Skip to content

Instantly share code, notes, and snippets.

@leewardbound
Created December 17, 2025 20:46
Show Gist options
  • Select an option

  • Save leewardbound/c861fdfa53c119d7e1aa2d2f181435a1 to your computer and use it in GitHub Desktop.

Select an option

Save leewardbound/c861fdfa53c119d7e1aa2d2f181435a1 to your computer and use it in GitHub Desktop.
Django Async Task Model Mixin Helper
from datetime import datetime, timedelta, timezone
from django.db import models
class AsyncTaskStatus(models.TextChoices):
PENDING = "pending"
IN_PROGRESS = "in_progress"
COMPLETED = "completed"
FAILED = "failed"
def make_async_task_mixin(task_name: str):
"""
Dynamically create an abstract mixin that adds async-task tracking fields
and lifecycle hooks for the given task name.
Usage:
- Add the mixin to a Django model: `class Foo(make_async_task_mixin("bar"), models.Model): ...`
- Implement the instance runner: `def async_bar_run_async_impl(self, *args, **kwargs): ...`
- (Optional) react to progress or completion:
- `def async_bar_on_progress(self, progress_pct: int): ...` (default stores pct)
- `def async_bar_on_complete(self): ...`
- Queue a job from an instance: `instance.async_bar_start()`
which enqueues Django-Q task pointing to `Model.async_bar_run`.
- The classmethod runner loads the instance by id, marks in-progress,
executes the instance impl, updates success/failure, and triggers hooks.
- Fields added (per task): status, started_at, completed_at, task_id,
error, traceback, progress_pct (default 0..100).
"""
prefix = f"async_{task_name}"
def get_task_path(self) -> str:
"""
Resolve the dotted callable path for the classmethod runner so the mixin
works across apps. Defaults to the model's module and classmethod name.
"""
return f"{self.__module__}.{self.__class__.__name__}.{prefix}_run"
def start_async(self) -> str:
from django_q.tasks import async_task
task_path = self.__getattribute__(f"{prefix}_get_task_path")()
now = datetime.now(tz=timezone.utc)
setattr(self, f"{prefix}_started_at", now)
setattr(self, f"{prefix}_status", AsyncTaskStatus.IN_PROGRESS)
self.save(update_fields=[f"{prefix}_started_at", f"{prefix}_status"])
task_id = async_task(task_path, self.id)
self.__getattribute__(f"_{prefix}_persist_task_id")(task_id)
self.__getattribute__(f"{prefix}_on_progress")(0)
return task_id
def cancel_async(self, reason: str = "Cancelled") -> str:
now = datetime.now(tz=timezone.utc)
setattr(self, f"{prefix}_status", AsyncTaskStatus.FAILED)
setattr(self, f"{prefix}_error", reason)
setattr(self, f"{prefix}_completed_at", now)
self.save(
update_fields=[
f"{prefix}_status",
f"{prefix}_error",
f"{prefix}_completed_at",
]
)
from django_q.tasks import Task
Task.objects.filter(id=self.__getattribute__(f"{prefix}_task_id")).delete()
return reason
def reset_async(self) -> None:
fields = {
f"{prefix}_status": AsyncTaskStatus.PENDING,
f"{prefix}_started_at": None,
f"{prefix}_completed_at": None,
f"{prefix}_task_id": None,
f"{prefix}_error": None,
f"{prefix}_traceback": None,
f"{prefix}_progress_pct": 0,
}
for field_name, value in fields.items():
setattr(self, field_name, value)
self.save(update_fields=list(fields.keys()))
def run_async_impl(self, *args, **kwargs) -> None:
"""
Override in subclasses to execute the actual task logic on the instance.
"""
raise NotImplementedError(f"Implement {prefix}_run_impl logic")
def _mark_failed(self, exc: Exception) -> None:
import traceback as _traceback
now = datetime.now(tz=timezone.utc)
setattr(self, f"{prefix}_status", AsyncTaskStatus.FAILED)
setattr(self, f"{prefix}_error", str(exc))
setattr(self, f"{prefix}_traceback", "".join(_traceback.format_exception(exc)))
setattr(self, f"{prefix}_completed_at", now)
self.save(
update_fields=[
f"{prefix}_status",
f"{prefix}_error",
f"{prefix}_traceback",
f"{prefix}_completed_at",
]
)
def _mark_success(self) -> None:
now = datetime.now(tz=timezone.utc)
setattr(self, f"{prefix}_status", AsyncTaskStatus.COMPLETED)
setattr(self, f"{prefix}_completed_at", now)
setattr(self, f"{prefix}_error", None)
setattr(self, f"{prefix}_traceback", None)
setattr(self, f"{prefix}_progress_pct", 100)
self.save(
update_fields=[
f"{prefix}_status",
f"{prefix}_completed_at",
f"{prefix}_error",
f"{prefix}_traceback",
f"{prefix}_progress_pct",
]
)
# Optional user hook
getattr(self, f"{prefix}_on_complete")()
def _mark_in_progress(self) -> None:
now = datetime.now(tz=timezone.utc)
setattr(self, f"{prefix}_started_at", now)
setattr(self, f"{prefix}_status", AsyncTaskStatus.IN_PROGRESS)
self.save(update_fields=[f"{prefix}_started_at", f"{prefix}_status"])
# Optional user hook
getattr(self, f"{prefix}_on_progress")(0)
def _persist_task_id(self, task_id: str) -> None:
setattr(self, f"{prefix}_task_id", task_id)
self.save(update_fields=[f"{prefix}_task_id"])
def get_eta_remaining(self) -> int:
progress_pct = getattr(self, f"{prefix}_progress_pct")
if progress_pct == 0:
return 0
started_at = getattr(self, f"{prefix}_started_at")
return (100 - progress_pct) * (datetime.now(tz=timezone.utc) - started_at).total_seconds() / progress_pct
def get_eta_datetime(self) -> datetime:
progress_pct = getattr(self, f"{prefix}_progress_pct")
if progress_pct == 0:
return None
eta_remaining = getattr(self, f"{prefix}_get_eta_remaining")()
return datetime.now(tz=timezone.utc) + timedelta(seconds=eta_remaining)
def on_progress(self, progress_pct: int) -> None:
"""
Hook for incremental progress updates (0-100). Default stores the pct.
Override to emit events/metrics but call super() if you still want storage.
"""
setattr(self, f"{prefix}_progress_pct", progress_pct)
self.save(update_fields=[f"{prefix}_progress_pct"])
return progress_pct
def on_complete(self) -> None:
"""Override to react when task completes successfully."""
return None
@classmethod
def run_async(cls, obj_id, *args, **kwargs):
"""
Entry point invoked by the async task worker. Fetches the instance,
marks progress, executes the instance impl, and records success/failure.
"""
obj = cls.objects.get(pk=obj_id)
obj.__getattribute__(f"_{prefix}_mark_in_progress")()
try:
result = obj.__getattribute__(f"{prefix}_run_async_impl")(*args, **kwargs)
obj.__getattribute__(f"_{prefix}_mark_success")()
return result
except Exception as exc:
obj.__getattribute__(f"_{prefix}_mark_failed")(exc)
raise
# Give each hook a unique, descriptive name tied to the task so Django-Q
# can import the classmethod runner and the instance can expose helpers like
# async_<task>_start/async_<task>_cancel/etc.
get_task_path.__name__ = f"{prefix}_get_task_path"
start_async.__name__ = f"{prefix}_start"
cancel_async.__name__ = f"{prefix}_cancel"
reset_async.__name__ = f"{prefix}_reset"
run_async.__name__ = f"{prefix}_run"
run_async_impl.__name__ = f"{prefix}_run_async_impl"
on_progress.__name__ = f"{prefix}_on_progress"
on_complete.__name__ = f"{prefix}_on_complete"
get_eta_remaining.__name__ = f"{prefix}_get_eta_remaining"
get_eta_datetime.__name__ = f"{prefix}_get_eta_datetime"
_mark_failed.__name__ = f"_{prefix}_mark_failed"
_mark_success.__name__ = f"_{prefix}_mark_success"
_mark_in_progress.__name__ = f"_{prefix}_mark_in_progress"
_persist_task_id.__name__ = f"_{prefix}_persist_task_id"
return type(
f"{task_name}AsyncTaskMixin",
(models.Model,),
{
"__module__": __name__,
# Keep abstract to avoid migrations; intended to be mixed into concrete models.
"Meta": type("Meta", (), {"abstract": True}),
f"{prefix}_status": models.CharField(
max_length=255,
null=True,
blank=True,
choices=AsyncTaskStatus.choices,
default=AsyncTaskStatus.PENDING,
),
f"{prefix}_started_at": models.DateTimeField(null=True, blank=True),
f"{prefix}_completed_at": models.DateTimeField(null=True, blank=True),
f"{prefix}_task_id": models.CharField(max_length=255, null=True, blank=True),
f"{prefix}_error": models.TextField(null=True, blank=True),
f"{prefix}_traceback": models.TextField(null=True, blank=True),
f"{prefix}_progress_pct": models.IntegerField(default=0),
get_task_path.__name__: get_task_path,
start_async.__name__: start_async,
cancel_async.__name__: cancel_async,
reset_async.__name__: reset_async,
run_async.__name__: classmethod(run_async),
run_async_impl.__name__: run_async_impl,
_mark_failed.__name__: _mark_failed,
_mark_success.__name__: _mark_success,
_mark_in_progress.__name__: _mark_in_progress,
_persist_task_id.__name__: _persist_task_id,
get_eta_remaining.__name__: get_eta_remaining,
get_eta_datetime.__name__: get_eta_datetime,
on_progress.__name__: on_progress,
on_complete.__name__: on_complete,
},
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment