Created
December 17, 2025 20:46
-
-
Save leewardbound/c861fdfa53c119d7e1aa2d2f181435a1 to your computer and use it in GitHub Desktop.
Django Async Task Model Mixin Helper
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| from datetime import datetime, timedelta, timezone | |
| from django.db import models | |
class AsyncTaskStatus(models.TextChoices):
    """Lifecycle states an async task can be in.

    Each member is spelled as an explicit ``(value, label)`` pair; the labels
    match the ones ``TextChoices`` would derive from the member names.
    """

    PENDING = "pending", "Pending"
    IN_PROGRESS = "in_progress", "In Progress"
    COMPLETED = "completed", "Completed"
    FAILED = "failed", "Failed"
def make_async_task_mixin(task_name: str):
    """
    Dynamically create an abstract mixin that adds async-task tracking fields
    and lifecycle hooks for the given task name.

    Usage:
    - Add the mixin to a Django model:
      `class Foo(make_async_task_mixin("bar"), models.Model): ...`
    - Implement the instance runner:
      `def async_bar_run_async_impl(self, *args, **kwargs): ...`
    - (Optional) react to progress or completion:
        - `def async_bar_on_progress(self, progress_pct: int): ...`
          (default stores pct)
        - `def async_bar_on_complete(self): ...`
    - Queue a job from an instance: `instance.async_bar_start()`,
      which enqueues a Django-Q task pointing to `Model.async_bar_run`.
    - The classmethod runner loads the instance by primary key, marks it
      in-progress, executes the instance impl, records success/failure, and
      triggers the hooks.
    - Fields added (per task): status, started_at, completed_at, task_id,
      error, traceback, progress_pct (0..100, default 0).

    Args:
        task_name: Short identifier for the task; every generated field and
            method is named ``async_<task_name>_...``.

    Returns:
        An abstract ``models.Model`` subclass named ``<task_name>AsyncTaskMixin``.
    """
    prefix = f"async_{task_name}"

    def get_task_path(self) -> str:
        """
        Resolve the dotted callable path for the classmethod runner so the mixin
        works across apps. Defaults to the model's module and classmethod name.
        """
        return f"{self.__module__}.{self.__class__.__name__}.{prefix}_run"

    def start_async(self) -> str:
        """Mark the task as started, enqueue it with Django-Q, return the task id."""
        # Imported lazily so this module can be imported without django_q loaded.
        from django_q.tasks import async_task

        task_path = getattr(self, f"{prefix}_get_task_path")()
        setattr(self, f"{prefix}_started_at", datetime.now(tz=timezone.utc))
        setattr(self, f"{prefix}_status", AsyncTaskStatus.IN_PROGRESS)
        self.save(update_fields=[f"{prefix}_started_at", f"{prefix}_status"])
        # Reset progress BEFORE enqueueing: if the task runs inline (Django-Q
        # sync mode) or a worker picks it up immediately, a reset issued after
        # enqueueing would overwrite the real progress with 0.
        getattr(self, f"{prefix}_on_progress")(0)
        # Pass the primary key (not ``id``) to match the ``pk=`` lookup in the runner.
        task_id = async_task(task_path, self.pk)
        getattr(self, f"_{prefix}_persist_task_id")(task_id)
        return task_id

    def cancel_async(self, reason: str = "Cancelled") -> str:
        """Record the task as failed with ``reason`` and delete its Django-Q Task row."""
        setattr(self, f"{prefix}_status", AsyncTaskStatus.FAILED)
        setattr(self, f"{prefix}_error", reason)
        setattr(self, f"{prefix}_completed_at", datetime.now(tz=timezone.utc))
        self.save(
            update_fields=[
                f"{prefix}_status",
                f"{prefix}_error",
                f"{prefix}_completed_at",
            ]
        )
        from django_q.tasks import Task

        # NOTE(review): this only removes the matching Task record; whether that
        # stops an already-queued or running job depends on Django-Q — confirm.
        Task.objects.filter(id=getattr(self, f"{prefix}_task_id")).delete()
        return reason

    def reset_async(self) -> None:
        """Restore every tracking field to its initial (pending) value and save."""
        fields = {
            f"{prefix}_status": AsyncTaskStatus.PENDING,
            f"{prefix}_started_at": None,
            f"{prefix}_completed_at": None,
            f"{prefix}_task_id": None,
            f"{prefix}_error": None,
            f"{prefix}_traceback": None,
            f"{prefix}_progress_pct": 0,
        }
        for field_name, value in fields.items():
            setattr(self, field_name, value)
        self.save(update_fields=list(fields))

    def run_async_impl(self, *args, **kwargs) -> None:
        """
        Override in subclasses to execute the actual task logic on the instance.
        """
        # The message names the method that must actually be overridden.
        raise NotImplementedError(f"Implement {prefix}_run_async_impl logic")

    def _mark_failed(self, exc: Exception) -> None:
        """Store the failure status, error message, traceback and completion time."""
        import traceback as _traceback

        setattr(self, f"{prefix}_status", AsyncTaskStatus.FAILED)
        setattr(self, f"{prefix}_error", str(exc))
        # Three-argument form: the single-argument call needs Python 3.10+.
        setattr(
            self,
            f"{prefix}_traceback",
            "".join(_traceback.format_exception(type(exc), exc, exc.__traceback__)),
        )
        setattr(self, f"{prefix}_completed_at", datetime.now(tz=timezone.utc))
        self.save(
            update_fields=[
                f"{prefix}_status",
                f"{prefix}_error",
                f"{prefix}_traceback",
                f"{prefix}_completed_at",
            ]
        )

    def _mark_success(self) -> None:
        """Store the completed status, clear error info, set progress to 100, run the hook."""
        setattr(self, f"{prefix}_status", AsyncTaskStatus.COMPLETED)
        setattr(self, f"{prefix}_completed_at", datetime.now(tz=timezone.utc))
        setattr(self, f"{prefix}_error", None)
        setattr(self, f"{prefix}_traceback", None)
        setattr(self, f"{prefix}_progress_pct", 100)
        self.save(
            update_fields=[
                f"{prefix}_status",
                f"{prefix}_completed_at",
                f"{prefix}_error",
                f"{prefix}_traceback",
                f"{prefix}_progress_pct",
            ]
        )
        # Optional user hook
        getattr(self, f"{prefix}_on_complete")()

    def _mark_in_progress(self) -> None:
        """Stamp the start time, set the in-progress status, and reset progress to 0."""
        setattr(self, f"{prefix}_started_at", datetime.now(tz=timezone.utc))
        setattr(self, f"{prefix}_status", AsyncTaskStatus.IN_PROGRESS)
        self.save(update_fields=[f"{prefix}_started_at", f"{prefix}_status"])
        # Optional user hook
        getattr(self, f"{prefix}_on_progress")(0)

    def _persist_task_id(self, task_id: str) -> None:
        """Save the queue's task id on the instance."""
        setattr(self, f"{prefix}_task_id", task_id)
        self.save(update_fields=[f"{prefix}_task_id"])

    def get_eta_remaining(self) -> float:
        """
        Estimate the seconds left by linear extrapolation of elapsed time over
        the stored progress. Returns 0 when no progress or no start time is
        recorded, because no estimate is possible then.
        """
        progress_pct = getattr(self, f"{prefix}_progress_pct")
        started_at = getattr(self, f"{prefix}_started_at")
        if not progress_pct or started_at is None:
            return 0
        elapsed = (datetime.now(tz=timezone.utc) - started_at).total_seconds()
        return (100 - progress_pct) * elapsed / progress_pct

    def get_eta_datetime(self) -> "datetime | None":
        """
        Return the estimated completion time (UTC), or None when no progress
        or no start time is recorded.
        """
        progress_pct = getattr(self, f"{prefix}_progress_pct")
        started_at = getattr(self, f"{prefix}_started_at")
        if not progress_pct or started_at is None:
            return None
        eta_remaining = getattr(self, f"{prefix}_get_eta_remaining")()
        return datetime.now(tz=timezone.utc) + timedelta(seconds=eta_remaining)

    def on_progress(self, progress_pct: int) -> int:
        """
        Hook for incremental progress updates (0-100). Default stores the pct
        and returns it. Override to emit events/metrics but call super() if
        you still want storage.
        """
        setattr(self, f"{prefix}_progress_pct", progress_pct)
        self.save(update_fields=[f"{prefix}_progress_pct"])
        return progress_pct

    def on_complete(self) -> None:
        """Override to react when task completes successfully."""
        return None

    # Deliberately a plain function here: it is wrapped in ``classmethod``
    # exactly once when the mixin namespace is assembled below. Wrapping an
    # already-decorated classmethod a second time relies on classmethod
    # chaining, which Python 3.13 removed.
    def run_async(cls, obj_id, *args, **kwargs):
        """
        Entry point invoked by the async task worker. Fetches the instance,
        marks progress, executes the instance impl, and records success/failure.
        Any exception is recorded on the instance and then re-raised.
        """
        obj = cls.objects.get(pk=obj_id)
        getattr(obj, f"_{prefix}_mark_in_progress")()
        try:
            result = getattr(obj, f"{prefix}_run_async_impl")(*args, **kwargs)
            # The success hook runs inside the try, so an exception raised by
            # it is also recorded as a failure.
            getattr(obj, f"_{prefix}_mark_success")()
            return result
        except Exception as exc:
            getattr(obj, f"_{prefix}_mark_failed")(exc)
            raise

    # Give each hook a unique, descriptive name tied to the task so Django-Q
    # can import the classmethod runner and the instance can expose helpers like
    # async_<task>_start/async_<task>_cancel/etc.
    hooks = {
        f"{prefix}_get_task_path": get_task_path,
        f"{prefix}_start": start_async,
        f"{prefix}_cancel": cancel_async,
        f"{prefix}_reset": reset_async,
        f"{prefix}_run": run_async,
        f"{prefix}_run_async_impl": run_async_impl,
        f"_{prefix}_mark_failed": _mark_failed,
        f"_{prefix}_mark_success": _mark_success,
        f"_{prefix}_mark_in_progress": _mark_in_progress,
        f"_{prefix}_persist_task_id": _persist_task_id,
        f"{prefix}_get_eta_remaining": get_eta_remaining,
        f"{prefix}_get_eta_datetime": get_eta_datetime,
        f"{prefix}_on_progress": on_progress,
        f"{prefix}_on_complete": on_complete,
    }
    for hook_name, hook in hooks.items():
        hook.__name__ = hook_name
    # Single classmethod wrap, applied after the plain function was renamed.
    hooks[f"{prefix}_run"] = classmethod(run_async)

    # Model fields, declared in the same order as before.
    tracking_fields = {
        f"{prefix}_status": models.CharField(
            max_length=255,
            null=True,
            blank=True,
            choices=AsyncTaskStatus.choices,
            default=AsyncTaskStatus.PENDING,
        ),
        f"{prefix}_started_at": models.DateTimeField(null=True, blank=True),
        f"{prefix}_completed_at": models.DateTimeField(null=True, blank=True),
        f"{prefix}_task_id": models.CharField(max_length=255, null=True, blank=True),
        f"{prefix}_error": models.TextField(null=True, blank=True),
        f"{prefix}_traceback": models.TextField(null=True, blank=True),
        f"{prefix}_progress_pct": models.IntegerField(default=0),
    }

    return type(
        f"{task_name}AsyncTaskMixin",
        (models.Model,),
        {
            "__module__": __name__,
            # Keep abstract to avoid migrations; intended to be mixed into concrete models.
            "Meta": type("Meta", (), {"abstract": True}),
            **tracking_fields,
            **hooks,
        },
    )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment