Created
February 3, 2026 21:16
-
-
Save ankona/dd9a9c912526a4dcb7f1ef3d78f940a2 to your computer and use it in GitHub Desktop.
Prefect-based git repository caching for C-Star
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import shutil | |
| import typing as t | |
| from datetime import timedelta | |
| from pathlib import Path | |
| from prefect import flow, task | |
| from prefect.cache_policies import INPUTS, TASK_SOURCE, CachePolicy | |
| from cstar.base.gitutils import _clone | |
| from cstar.base.log import get_logger | |
| from cstar.base.utils import get_cache_dir, slugify | |
# Module-level logger named after this module.
log = get_logger(__name__)

# Combination of Prefect's TASK_SOURCE and INPUTS cache policies.
REPO_CACHE_POLICY: t.Final[CachePolicy] = TASK_SOURCE + INPUTS
"""Cache policy for repositories retrieved in prefect tasks.

Uses the repository URL to uniquely identify the asset and invalidates the cache
whenever the source code of the task is updated.
"""

# Five minutes, expressed as a timedelta.
REPO_CACHE_DURATION: t.Final[timedelta] = timedelta(minutes=5)
"""Cache duration (in seconds) for repositories retrieved in prefect tasks."""
def cache_location(repo_uri: str) -> Path:
    """Return the cache path assigned to a source code repository.

    The cache root directory is created if it does not exist yet. The
    repository directory itself is not created by this function.

    Parameters
    ----------
    repo_uri : str
        The URI of the remote source code repository.

    Returns
    -------
    Path
        The cache root joined with the slugified repository URI.
    """
    root = get_cache_dir()
    # Make sure the cache root is present before handing out paths inside it.
    root.mkdir(parents=True, exist_ok=True)

    repo_dirname = slugify(repo_uri)
    return root / repo_dirname
@task(
    cache_policy=REPO_CACHE_POLICY,
    cache_expiration=REPO_CACHE_DURATION,
    cache_result_in_memory=True,
)
def materialized_clone(repo_uri: str) -> Path:
    """Ensure a clone of a remote repository exists in the local asset cache.

    If the cache path for the repository already exists it is returned as-is;
    otherwise the repository is cloned into that path first.

    Parameters
    ----------
    repo_uri : str
        The URI of the remote source code repository.

    Returns
    -------
    Path
        The path to the repository in the cache.
    """
    destination = cache_location(repo_uri)

    # Only clone when nothing is present at the cache path yet.
    if not destination.exists():
        _clone(repo_uri, destination)

    return destination
@flow
def get_repo(repo_uri: str, target: Path) -> Path:
    """Copy a cached clone of a remote repository into a local directory.

    The repository is obtained through ``materialized_clone`` (which may serve
    its result from the Prefect cache) and then copied to ``target``.

    Parameters
    ----------
    repo_uri : str
        The URI of the remote source code repository. It is case-folded and
        stripped of surrounding whitespace before use.
    target : Path
        The path where the repository contents should be copied. Missing
        parent directories are created.

    Returns
    -------
    Path
        The ``target`` path that the repository was copied to.

    Raises
    ------
    ValueError
        If the repository uri is empty.
    FileNotFoundError
        If the copy fails because a source path is missing; the failure is
        logged and re-raised.
    """
    # A single casefold is sufficient; strip() cannot reintroduce case.
    asset_key = repo_uri.casefold().strip()
    if not asset_key:
        raise ValueError("An invalid repository URI was provided")

    cached_clone_path = materialized_clone(asset_key)

    if not cached_clone_path.exists():
        # A cached task result can outlive the directory it points to (e.g. the
        # cache directory was cleaned up). Bypass the cached result so the
        # task runs again and re-creates the clone.
        log.warning(
            f"Cached clone `{cached_clone_path}` is missing; refreshing the cache."
        )
        refreshed_clone = materialized_clone.with_options(refresh_cache=True)
        cached_clone_path = refreshed_clone(asset_key)

    # exist_ok=True already covers the "parent exists" case.
    target.parent.mkdir(parents=True, exist_ok=True)

    try:
        # symlinks=True copies symbolic links as links instead of following them.
        shutil.copytree(cached_clone_path, target, symlinks=True)
    except FileNotFoundError:
        log.exception(f"Copy from `{cached_clone_path}` to `{target}` failed.")
        raise

    return target
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment