# Copyright 2019, 2021-2024 The Debusine Developers
# See the AUTHORS file at the top-level directory of this distribution
#
# This file is part of Debusine. It is subject to the license terms
# in the LICENSE file found in the top-level directory of this
# distribution. No part of Debusine, including this file, may be copied,
# modified, propagated, or distributed except according to the terms
# contained in the LICENSE file.
"""Data models for db artifacts."""
from datetime import datetime, timedelta
from functools import partial
from pathlib import Path
from typing import Any, Optional, TYPE_CHECKING
from django.conf import settings
from django.core.exceptions import ValidationError
from django.db import models, transaction
from django.db.models import (
F,
Q,
QuerySet,
UniqueConstraint,
)
from django.db.models.functions import Coalesce
from debusine.artifacts import LocalArtifact
from debusine.db.models.files import File
from debusine.db.models.workspaces import Workspace
if TYPE_CHECKING:
from django_stubs_ext.db.models import TypedModelMeta
from debusine.db.models.work_requests import WorkRequest
else:
TypedModelMeta = object
class ArtifactManager(models.Manager["Artifact"]):
    """Manager for the Artifact model."""

    @classmethod
    def create_from_local_artifact(
        cls,
        local_artifact: LocalArtifact[Any],
        workspace: Workspace,
        *,
        created_by_work_request: Optional["WorkRequest"] = None,
    ) -> "Artifact":
        """
        Return a new Artifact based on a :class:`LocalArtifact`.

        Each file of the local artifact is added to the workspace's
        default file store and linked to the new artifact through
        :class:`FileInArtifact`.

        :param local_artifact: source of category, data and files.
        :param workspace: workspace that the new artifact belongs to.
        :param created_by_work_request: optional work request that
          created this artifact.
        """
        file_store = workspace.default_file_store.get_backend_object()

        artifact = Artifact.objects.create(
            category=local_artifact.category,
            workspace=workspace,
            data=local_artifact.data.dict(),
            created_by_work_request=created_by_work_request,
        )

        for artifact_path, local_path in local_artifact.files.items():
            file = File.from_local_path(local_path)
            file_store.add_file(local_path, fileobj=file)
            FileInArtifact.objects.create(
                artifact=artifact, path=artifact_path, file=file
            )

        return artifact

    def _annotate_effective_expiration_delay(self) -> QuerySet["Artifact"]:
        """
        Return queryset annotated with ``effective_expiration_delay``.

        The effective delay is the artifact's own ``expiration_delay``,
        falling back to the workspace's ``default_expiration_delay``
        when unset.  Shared by :meth:`not_expired` and :meth:`expired`.
        """
        return self.get_queryset().annotate(
            effective_expiration_delay=Coalesce(
                "expiration_delay",
                "workspace__default_expiration_delay",
            )
        )

    def not_expired(self, at: datetime) -> QuerySet["Artifact"]:
        """
        Return queryset with artifacts that have not expired.

        :param at: datetime to check if the artifacts are not expired.
        :return: artifacts whose effective expiration delay is zero
          (they never expire) or whose expiry time is after the given
          datetime.
        """
        return self._annotate_effective_expiration_delay().filter(
            Q(effective_expiration_delay=timedelta(0))
            | Q(
                # https://github.com/typeddjango/django-stubs/issues/1548
                created_at__gt=(
                    at - F("effective_expiration_delay")  # type: ignore[operator] # noqa: E501
                )
            )
        )

    def expired(self, at: datetime) -> QuerySet["Artifact"]:
        """
        Return queryset with artifacts that have expired.

        :param at: datetime to check if the artifacts are expired.
        :return: artifacts whose expiry time is on or before the given
          datetime; artifacts with a zero effective delay never expire.
        """
        return (
            self._annotate_effective_expiration_delay()
            .exclude(effective_expiration_delay=timedelta(0))
            .filter(
                # https://github.com/typeddjango/django-stubs/issues/1548
                created_at__lte=(
                    at - F("effective_expiration_delay")  # type: ignore[operator] # noqa: E501
                )
            )
        )

    def part_of_collection_with_retains_artifacts(
        self,
    ) -> QuerySet["Artifact"]:
        """
        Return Artifacts where the parent_collections has retains_artifacts set.

        :return: Artifacts that are part of a collection that retains
          artifacts, either always or while its workflow is still
          pending, running or blocked.
        """
        # Import here to prevent circular imports
        from debusine.db.models.collections import Collection
        from debusine.db.models.work_requests import WorkRequest

        RetainsArtifacts = Collection.RetainsArtifacts
        return self.get_queryset().filter(
            Q(parent_collections__retains_artifacts=RetainsArtifacts.ALWAYS)
            | Q(
                parent_collections__retains_artifacts=(
                    RetainsArtifacts.WORKFLOW
                ),
                parent_collections__workflow__status__in={
                    WorkRequest.Statuses.PENDING,
                    WorkRequest.Statuses.RUNNING,
                    WorkRequest.Statuses.BLOCKED,
                },
            )
        )
class Artifact(models.Model):
    """Artifact model."""

    category = models.CharField(max_length=255)
    workspace = models.ForeignKey(Workspace, on_delete=models.PROTECT)
    files = models.ManyToManyField(File, through="db.FileInArtifact")
    data = models.JSONField(default=dict, blank=True)
    created_at = models.DateTimeField(auto_now_add=True)
    expiration_delay = models.DurationField(blank=True, null=True)
    created_by = models.ForeignKey(
        "User", blank=True, null=True, on_delete=models.PROTECT
    )
    created_by_work_request = models.ForeignKey(
        "WorkRequest", blank=True, null=True, on_delete=models.SET_NULL
    )

    objects = ArtifactManager()

    def clean(self):
        """
        Ensure that data is valid for this artifact category.

        :raise ValidationError: for invalid data.
        """
        if not isinstance(self.data, dict):
            raise ValidationError({"data": "data must be a dictionary"})

        try:
            artifact_cls = LocalArtifact.class_from_category(self.category)
        except ValueError as exc:
            raise ValidationError(
                {"category": f"{self.category}: invalid artifact category"}
            ) from exc

        try:
            artifact_cls.create_data(self.data)
        except ValueError as exc:
            raise ValidationError(
                {"data": f"invalid artifact data: {exc}"}
            ) from exc

    def effective_expiration_delay(self):
        """Return expiration_delay, falling back to the workspace default."""
        if self.expiration_delay is not None:
            return self.expiration_delay
        # None means "inherit" from the workspace.
        return self.workspace.default_expiration_delay

    @property
    def expire_at(self) -> datetime | None:
        """Return computed expiration date, or None if it never expires."""
        delay = self.effective_expiration_delay()
        # A zero delay is the "keep forever" marker.
        return None if delay == timedelta(0) else self.created_at + delay

    def expired(self, at: datetime) -> bool:
        """
        Return True if this artifact has expired at a given datetime.

        :param at: datetime to check if the artifact is expired.
        :return bool: True if the artifact's expire_at is on or earlier than
          the parameter at.
        """
        expiry = self.expire_at
        return expiry is not None and expiry <= at

    def __str__(self) -> str:
        """Return basic information of Artifact."""
        return " ".join(
            [
                f"Id: {self.id}",
                f"Category: {self.category}",
                f"Workspace: {self.workspace.id}",
            ]
        )
class FileInArtifact(models.Model):
    """File in artifact."""

    # PROTECT: an Artifact or File cannot be deleted while referenced here.
    artifact = models.ForeignKey(Artifact, on_delete=models.PROTECT)
    path = models.CharField(max_length=500)
    file = models.ForeignKey(File, on_delete=models.PROTECT)

    class Meta(TypedModelMeta):
        # A given path may appear at most once within one artifact.
        constraints = [
            UniqueConstraint(
                fields=["artifact", "path"],
                name="%(app_label)s_%(class)s_unique_artifact_path",
            ),
        ]

    def __str__(self) -> str:
        """Return basic information of FileInArtifact."""
        return (
            f"Id: {self.id} Artifact: {self.artifact.id} "
            f"Path: {self.path} File: {self.file.id}"
        )
class FileUpload(models.Model):
    """File that is being/has been uploaded."""

    file_in_artifact = models.OneToOneField(
        FileInArtifact, on_delete=models.PROTECT
    )
    path = models.CharField(
        max_length=500,
        help_text="Path in the uploads directory",
        unique=True,
    )
    last_activity_at = models.DateTimeField(auto_now_add=True)

    @classmethod
    def current_size(cls, artifact: Artifact, path_in_artifact: str) -> int:
        """
        Return current file size.

        The current file size might be smaller than the expected size of the
        file if the file has not finished being uploaded.

        :raise ValueError: if path_in_artifact does not exist in Artifact or
          if there's no FileUpload object for the specific File.
        """
        try:
            file_in_artifact = FileInArtifact.objects.get(
                artifact=artifact, path=path_in_artifact
            )
        except FileInArtifact.DoesNotExist as exc:
            # Chain the original exception (PEP 3134) so the DoesNotExist
            # context is not lost when debugging.
            raise ValueError(
                f'No FileInArtifact for Artifact {artifact.id} '
                f'and path "{path_in_artifact}"'
            ) from exc

        try:
            file_upload = FileUpload.objects.get(
                file_in_artifact=file_in_artifact
            )
        except FileUpload.DoesNotExist as exc:
            raise ValueError(
                f"No FileUpload for FileInArtifact {file_in_artifact.id}"
            ) from exc

        try:
            size = file_upload.absolute_file_path().stat().st_size
        except FileNotFoundError:
            # Nothing written to disk yet: the upload has not started.
            size = 0

        return size

    def delete(self, *args, **kwargs):
        """Schedule deletion of the file in the store."""
        file_path = self.absolute_file_path()

        result = super().delete(*args, **kwargs)

        # If this method is called from a transaction: transaction.on_commit()
        # will call file_path.unlink when the outermost transaction is
        # committed.
        #
        # In the case that the code is running without a transaction:
        # the file_path.unlink will happen now.
        #
        # It's important that file_path.unlink is called only if the
        # DB is updated with the deletion. Otherwise, the file could be
        # deleted from the store but still referenced from the DB.
        transaction.on_commit(partial(file_path.unlink, missing_ok=True))

        return result

    def absolute_file_path(self) -> Path:
        """
        Return the absolute file path of the file.

        The files are stored in settings.DEBUSINE_UPLOAD_DIRECTORY.
        """
        return Path(settings.DEBUSINE_UPLOAD_DIRECTORY) / self.path

    def __str__(self) -> str:
        """Return basic information."""
        return f"{self.id}"
class ArtifactRelation(models.Model):
    """Model relations between artifacts."""

    class Relations(models.TextChoices):
        # DB value, human-readable label.
        EXTENDS = "extends", "Extends"
        RELATES_TO = "relates-to", "Relates to"
        BUILT_USING = "built-using", "Built using"

    class Meta(TypedModelMeta):
        # Only one relation of each type between a given pair of artifacts.
        constraints = [
            UniqueConstraint(
                fields=["artifact", "target", "type"],
                name="%(app_label)s_%(class)s_unique_artifact_target_type",
            )
        ]

    artifact = models.ForeignKey(
        Artifact, on_delete=models.PROTECT, related_name="relations"
    )
    target = models.ForeignKey(
        Artifact, on_delete=models.PROTECT, related_name="targeted_by"
    )
    # max_length=11 fits the longest choice value, "built-using".
    type = models.CharField(max_length=11, choices=Relations.choices)

    def __str__(self) -> str:
        """Return str for the object."""
        return f"{self.artifact.id} {self.type} {self.target.id}"