Source code for debusine.db.models.work_requests

# Copyright 2019, 2021-2024 The Debusine Developers
# See the AUTHORS file at the top-level directory of this distribution
#
# This file is part of Debusine. It is subject to the license terms
# in the LICENSE file found in the top-level directory of this
# distribution. No part of Debusine, including this file, may be copied,
# modified, propagated, or distributed except according to the terms
# contained in the LICENSE file.

"""Data models for db work requests."""

import logging
from datetime import datetime, timedelta
from typing import Any, Optional, TYPE_CHECKING, TypeAlias

from django.core.exceptions import FieldError, ValidationError
from django.db import models
from django.db.models import (
    F,
    JSONField,
    Q,
    QuerySet,
    UniqueConstraint,
)
from django.db.models.functions import Coalesce
from django.utils import timezone

from debusine.artifacts.models import CollectionCategory
from debusine.server import notifications
from debusine.tasks import BaseTask, TaskConfigError
from debusine.tasks.models import (
    ActionTypes,
    ActionUpdateCollectionWithArtifacts,
    EventReaction,
    EventReactions,
    NotificationDataEmail,
    TaskTypes,
)

if TYPE_CHECKING:
    from django_stubs_ext.db.models import TypedModelMeta

    from debusine.db.models.auth import User
    from debusine.db.models.workers import Worker
    from debusine.server.workflows.models import WorkRequestWorkflowData
else:
    TypedModelMeta = object

logger = logging.getLogger(__name__)


class WorkflowTemplate(models.Model):
    """
    Database model for Workflow templates.

    Workflow templates contain the information needed to instantiate a
    workflow, with a Workflow orchestrator and mandatory parameters.
    """

    class Meta(TypedModelMeta):
        constraints = [
            # A template name may be reused across workspaces, but not within
            # the same workspace.
            UniqueConstraint(
                fields=["name", "workspace"],
                name="%(app_label)s_%(class)s_unique_name_workspace",
            ),
        ]

    name = models.CharField(max_length=255)
    workspace = models.ForeignKey("Workspace", on_delete=models.PROTECT)
    task_name = models.CharField(
        max_length=100, verbose_name='Name of the Workflow orchestrator class'
    )
    task_data = JSONField(default=dict, blank=True)
    priority = models.IntegerField(
        default=0,
        help_text="Base priority for work requests created from this template",
    )

    def clean(self) -> None:
        """
        Ensure that task_name and task data are valid.

        :raise ValidationError: if ``task_data`` is not a dictionary, if
          ``task_name`` does not name a known Workflow orchestrator, or if
          the orchestrator rejects ``task_data``.
        """
        # Import here to prevent circular imports
        from debusine.server.workflows import Workflow, WorkflowValidationError

        if not isinstance(self.task_data, dict):
            raise ValidationError(
                {"task_data": "task data must be a dictionary"}
            )

        # Look up the orchestrator. An unknown name is reported as a
        # validation error on task_name (as WorkRequest.clean does) instead
        # of leaking the lookup exception to the caller.
        try:
            workflow_cls = Workflow.from_name(self.task_name)
        except (KeyError, ValueError) as exc:
            raise ValidationError(
                {"task_name": f"{self.task_name}: invalid workflow name"}
            ) from exc

        # Use the orchestrator to validate task_data
        try:
            workflow_cls.validate_template_data(self.task_data)
        except (
            KeyError,
            ValueError,
            RuntimeError,
            WorkflowValidationError,
        ) as exc:
            raise ValidationError({"task_data": str(exc)}) from exc
class _WorkRequestStatuses(models.TextChoices):
    """Choices for WorkRequest.status."""

    PENDING = "pending", "Pending"
    RUNNING = "running", "Running"
    COMPLETED = "completed", "Completed"
    ABORTED = "aborted", "Aborted"
    BLOCKED = "blocked", "Blocked"


class WorkRequestManager(models.Manager["WorkRequest"]):
    """Manager for WorkRequest model."""

    def _create_internal(
        self,
        *,
        parent: "WorkRequest",
        task_name: str,
        step: str,
        display_name: str | None,
        status: _WorkRequestStatuses | None,
    ) -> "WorkRequest":
        """Create an INTERNAL work request named ``task_name`` under parent."""
        # Import here to prevent circular imports
        from debusine.server.workflows.models import WorkRequestWorkflowData

        workflow_data = WorkRequestWorkflowData(
            step=step, display_name=display_name or step
        )
        return self.create(
            workspace=parent.workspace,
            parent=parent,
            created_by=parent.created_by,
            status=status or WorkRequest.Statuses.PENDING,
            task_type=TaskTypes.INTERNAL,
            task_name=task_name,
            task_data={},
            priority_base=parent.priority_effective,
            workflow_data_json=workflow_data.dict(exclude_unset=True),
        )

    def create_workflow_callback(
        self,
        *,
        parent: "WorkRequest",
        step: str,
        display_name: str | None = None,
        status: _WorkRequestStatuses | None = None,
    ):
        """
        Create a workflow callback WorkRequest.

        A parent is always required, as a callback only makes sense as part of
        a workflow.

        :param parent: work request the callback is created under; the
          workspace, creator and base priority are taken from it
        :param step: string set by the workflow to identify this callback
        :param display_name: name to display; defaults to ``step``
        :param status: initial status; defaults to PENDING
        """
        return self._create_internal(
            parent=parent,
            task_name="workflow",
            step=step,
            display_name=display_name,
            status=status,
        )

    def create_workflow(
        self,
        *,
        template: WorkflowTemplate,
        data: dict[str, Any],
        created_by: "User",
        parent: Optional["WorkRequest"] = None,
        status: _WorkRequestStatuses | None = None,
    ) -> "WorkRequest":
        """
        Create a workflow from a template and user-provided data.

        If the orchestrator rejects its input, the error is logged and the
        new work request is marked as completed with an ERROR result; it is
        returned in either case.
        """
        # Import here to prevent circular imports
        from debusine.db.models import Collection
        from debusine.server.workflows import Workflow, WorkflowValidationError

        # Find the orchestrator, and let it combine the template data with
        # the data provided by the user
        workflow_cls = Workflow.from_name(template.task_name)
        merged_task_data = workflow_cls.build_workflow_data(
            template.task_data, data
        )

        workflow = self.create(
            workspace=template.workspace,
            parent=parent,
            created_by=created_by,
            status=status or WorkRequest.Statuses.PENDING,
            task_type=TaskTypes.WORKFLOW,
            task_name=template.task_name,
            task_data=merged_task_data,
            priority_base=template.priority,
        )

        # Root work requests need an internal collection
        # (:ref:`collection-workflow-internal`)
        if parent is None:
            workflow.internal_collection = Collection.objects.create(
                name=f"workflow-{workflow.id}",
                category=CollectionCategory.WORKFLOW_INTERNAL,
                workspace=workflow.workspace,
                retains_artifacts=Collection.RetainsArtifacts.WORKFLOW,
                workflow=workflow,
            )
            workflow.save()

        input_errors = (
            KeyError,
            ValueError,
            RuntimeError,
            WorkflowValidationError,
            TaskConfigError,
        )
        try:
            # Instantiate the orchestrator, then run its thorough input
            # validation
            orchestrator = workflow_cls(workflow)
            orchestrator.validate_input()
        except input_errors:
            # TODO: How can we store the error so that the user can see it?
            logger.exception("Cannot create a workflow")
            workflow.mark_completed(WorkRequest.Results.ERROR)

        return workflow

    def create_synchronization_point(
        self,
        *,
        parent: "WorkRequest",
        step: str,
        display_name: str | None = None,
        status: _WorkRequestStatuses | None = None,
    ):
        """
        Create a synchronization point WorkRequest.

        A parent is always required, as a synchronization point only makes
        sense as part of a workflow.

        :param parent: work request the synchronization point is created
          under; the workspace, creator and base priority are taken from it
        :param step: string stored as the ``step`` of the workflow data
        :param display_name: name to display; defaults to ``step``
        :param status: initial status; defaults to PENDING
        """
        return self._create_internal(
            parent=parent,
            task_name="synchronization_point",
            step=step,
            display_name=display_name,
            status=status,
        )

    def pending(
        self, exclude_assigned: bool = False, worker: Optional["Worker"] = None
    ) -> QuerySet["WorkRequest"]:
        """
        Return a QuerySet of tasks in WorkRequest.Statuses.PENDING status.

        QuerySet is ordered by descending effective priority, then by
        created_at.

        :param exclude_assigned: leave out work requests that already have a
          worker assigned
        :param worker: return only work requests assigned to this worker
        :raise ValueError: if both ``exclude_assigned`` and ``worker`` are
          given

        PENDING is the default status of a task on creation.
        """
        if exclude_assigned and worker is not None:
            raise ValueError("Cannot exclude_assigned and filter by worker")

        queryset = WorkRequest.objects.filter(
            status=WorkRequest.Statuses.PENDING
        )
        if exclude_assigned:
            queryset = queryset.exclude(worker__isnull=False)
        if worker is not None:
            queryset = queryset.filter(worker=worker)

        effective_priority = F("priority_base") + F("priority_adjustment")
        return queryset.order_by(effective_priority.desc(), "created_at")

    def running(
        self, worker: Optional["Worker"] = None
    ) -> QuerySet["WorkRequest"]:
        """Return a QuerySet of running tasks, optionally for one worker."""
        queryset = WorkRequest.objects.filter(
            status=WorkRequest.Statuses.RUNNING
        )
        if worker is None:
            return queryset
        return queryset.filter(worker=worker)

    def completed(self) -> QuerySet["WorkRequest"]:
        """Return a QuerySet of tasks in completed status."""
        completed_status = WorkRequest.Statuses.COMPLETED
        return WorkRequest.objects.filter(status=completed_status)

    def aborted(self) -> QuerySet["WorkRequest"]:
        """Return a QuerySet of tasks in aborted status."""
        aborted_status = WorkRequest.Statuses.ABORTED
        return WorkRequest.objects.filter(status=aborted_status)

    def expired(self, at: datetime) -> QuerySet["WorkRequest"]:
        """
        Return queryset with work requests that have expired.

        The delay used is the work request's own ``expiration_delay``, or
        the workspace's ``default_expiration_delay`` when that is NULL.
        Work requests whose effective delay is zero are never returned.

        :param at: datetime to check if the work requests are expired.
        :return: work requests which expire before the given datetime.
        """
        with_delay = self.get_queryset().annotate(
            effective_expiration_delay=Coalesce(
                "expiration_delay",
                "workspace__default_expiration_delay",
            )
        )
        expirable = with_delay.exclude(
            effective_expiration_delay=timedelta(0)
        )
        # https://github.com/typeddjango/django-stubs/issues/1548
        latest_creation = at - F(  # type: ignore[operator]
            "effective_expiration_delay"
        )
        return expirable.filter(created_at__lte=latest_creation)
[docs] class WorkRequest(models.Model): """ Database model of a request to execute a task. Time-consuming operations offloaded to Workers and using Artifacts (and associated Files) as input and output. Submission API needs to check if the request is valid using ontological rules - e.g. whether the specified distribution for a build task exists. Avoid exposing the status of tasks to the admin interface to avoid runaway changes whilst the scheduler process is running. The WorkRequest uses the non-Django tasks module to do the checks on whether a task can run on a particular worker. WorkRequest State Machine ========================= New WorkRequest database entries default to ``WorkRequest.Statuses.PENDING``. Once the WorkRequest is assigned to a worker and is running starts running the status is changed to ``WorkRequest.Statuses.RUNNING``. If the WorkRequest is aborted, the Scheduled.Task status is ``WorkRequest.Statuses.ABORTED``. If the task finish on the Worker the WorkRequest status will be ``WorkRequest.Statuses.COMPLETED`` and a WorkRequest.Result is then set, ``WorkRequest.Results.PASSED`` or ``WorkRequest.Results.FAILED``. .. graphviz:: digraph { Statuses_PENDING -> Statuses_RUNNING -> Statuses_COMPLETED; Statuses_PENDING -> Statuses_COMPLETED; Statuses_PENDING -> Statuses_ABORTED; Statuses_PENDING -> Statuses_RUNNING -> Statuses_ABORTED; } ``WorkRequest.started_at`` is set when the WorkRequest moves from ``WorkRequest.Statuses.PENDING`` to ``WorkRequest.Statuses.RUNNING``. ``WorkRequest.completed_at`` is set when the Task moves from ``WorkRequest.Statuses.RUNNING`` to ``WorkRequest.Statuses.COMPLETED``. """ objects = WorkRequestManager() Statuses: TypeAlias = _WorkRequestStatuses
[docs] class Results(models.TextChoices): NONE = "", "" SUCCESS = "success", "Success" FAILURE = "failure", "Failure" ERROR = "error", "Error"
[docs] class UnblockStrategy(models.TextChoices): DEPS = "deps", "Dependencies have completed" MANUAL = "manual", "Manually unblocked"
workspace = models.ForeignKey("Workspace", on_delete=models.PROTECT) created_at = models.DateTimeField(auto_now_add=True) started_at = models.DateTimeField(blank=True, null=True) completed_at = models.DateTimeField(blank=True, null=True) created_by = models.ForeignKey("User", on_delete=models.PROTECT) status = models.CharField( max_length=9, choices=Statuses.choices, default=Statuses.PENDING, editable=False, ) result = models.CharField( max_length=7, choices=Results.choices, default=Results.NONE, editable=False, ) # any one work request can only be on one worker # even if the worker can handle multiple work request. worker = models.ForeignKey( "Worker", null=True, blank=True, on_delete=models.CASCADE, related_name="assigned_work_requests", ) task_type = models.CharField( max_length=8, choices=TaskTypes.choices, default=TaskTypes.WORKER, editable=False, verbose_name="Type of task to execute", ) task_name = models.CharField( max_length=100, verbose_name='Name of the task to execute' ) task_data = JSONField(default=dict, blank=True) dynamic_task_data = JSONField(null=True, blank=True) priority_base = models.IntegerField( default=0, help_text="Base priority of this work request" ) priority_adjustment = models.IntegerField( default=0, help_text=( "Administrator adjustment to the priority of this work request" ), ) # Workflows unblock_strategy = models.CharField( max_length=6, choices=UnblockStrategy.choices, default=UnblockStrategy.DEPS, editable=False, ) dependencies = models.ManyToManyField( "self", symmetrical=False, related_name="reverse_dependencies" ) parent = models.ForeignKey( "self", # WorkRequests are only deleted through expiration, use CASCADE on_delete=models.CASCADE, blank=True, null=True, related_name="children", ) workflow_data_json = JSONField( default=dict, blank=True, db_column="workflow_data" ) event_reactions_json = JSONField( default=dict, blank=True, db_column="event_reactions" ) internal_collection = models.ForeignKey( "Collection", null=True, 
blank=True, on_delete=models.PROTECT ) expiration_delay = models.DurationField(blank=True, null=True) supersedes = models.OneToOneField( "WorkRequest", on_delete=models.SET_NULL, related_name="superseded", null=True, blank=True, ) class Meta(TypedModelMeta): indexes = [ # Handles the main scheduler query. models.Index( (F("priority_base") + F("priority_adjustment")).desc(), "created_at", name="%(app_label)s_%(class)s_pending_idx", condition=Q(status=_WorkRequestStatuses.PENDING), ), # Handles queries from workers. models.Index( "worker", name="%(app_label)s_%(class)s_worker_idx", condition=Q( status__in=( _WorkRequestStatuses.PENDING, _WorkRequestStatuses.RUNNING, ) ), ), ] permissions = [ ( "manage_workrequest_priorities", "Can set positive priority adjustments on work requests", ) ] ordering = ["id"] def __str__(self) -> str: """Return the id of the WorkRequest.""" return str(self.id) @property def workflow_data(self) -> Optional["WorkRequestWorkflowData"]: """Access workflow_data_json as a python structure.""" # Import here to avoid a circular dependency from debusine.server.workflows.models import WorkRequestWorkflowData if self.workflow_data_json: return WorkRequestWorkflowData(**self.workflow_data_json) else: return None @workflow_data.setter def workflow_data(self, value: Optional["WorkRequestWorkflowData"]) -> None: """Set workflow_data_json from a python structure.""" if value is None: self.workflow_data_json = {} else: self.workflow_data_json = value.dict( exclude_unset=True, ) @property def event_reactions(self) -> EventReactions: """Access event_reactions_json as a pydantic structure.""" return EventReactions(**self.event_reactions_json) @event_reactions.setter def event_reactions(self, value: EventReactions) -> None: """Set event_reactions_json from a pydantic structure.""" self.event_reactions_json = value.dict()
[docs] def create_child( self, task_name: str, status: Statuses = Statuses.BLOCKED, task_type: TaskTypes = TaskTypes.WORKER, task_data: dict[str, Any] | None = None, workflow_data: Optional["WorkRequestWorkflowData"] = None, ) -> "WorkRequest": """Create a child WorkRequest.""" if self.task_type != TaskTypes.WORKFLOW: raise ValueError("Only workflows may have child work requests.") return WorkRequest.objects.create( workspace=self.workspace, parent=self, created_by=self.created_by, status=status, task_type=task_type, task_name=task_name, task_data=task_data or {}, workflow_data_json=( workflow_data.dict(exclude_unset=True) if workflow_data else {} ), )
[docs] def reschedule(self) -> "WorkRequest": """Create a WorkRequest that supersedes this one.""" if self.task_type not in (TaskTypes.WORKER, TaskTypes.INTERNAL): raise ValueError("Only worker or internal tasks can be rescheduled") # TODO: implement rescheduling a workflow, which can involve more # complex work like rescheduling each of its work requests that aborted # or failed (see #416) if hasattr(self, "superseded"): raise ValueError("Cannot reschedule old superseded tasks") if not ( self.status == WorkRequest.Statuses.ABORTED or ( self.status == WorkRequest.Statuses.COMPLETED and self.result == WorkRequest.Results.FAILURE ) ): raise ValueError("Only aborted or failed tasks can be rescheduled") # TODO: compute dynamic data to see if all lookups can still be # satisfied: this allows us to fail if any input artifacts where # already expired, before making changes in the database. # This check can be moved to a check-only method, to be used in views # to assess whether an aborted/failed task can be restarted work_request = WorkRequest.objects.create( workspace=self.workspace, created_by=self.created_by, status=WorkRequest.Statuses.PENDING, task_type=self.task_type, task_name=self.task_name, task_data=self.task_data, priority_base=self.priority_base, priority_adjustment=self.priority_adjustment, unblock_strategy=self.unblock_strategy, parent=self.parent, workflow_data_json=self.workflow_data_json, event_reactions_json=self.event_reactions_json, expiration_delay=self.expiration_delay, supersedes=self, ) # Copy forward dependencies for dep in self.dependencies.all(): work_request.add_dependency(dep) # Update any reverse-dependencies of the old work request to point to # the new one, and move them from pending to blocked for dep in self.reverse_dependencies.all(): dep.dependencies.remove(self) dep.add_dependency(work_request) # If the task was part of an aborted workflow, set it back to running if (workflow := work_request.parent) and ( workflow.status == 
WorkRequest.Statuses.ABORTED or ( workflow.status == WorkRequest.Statuses.COMPLETED and workflow.result == WorkRequest.Results.FAILURE ) ): workflow.status = WorkRequest.Statuses.RUNNING workflow.completed_at = None workflow.save() return work_request
[docs] def clean(self): """ Ensure that task data is valid for this task name. :raise ValidationError: for invalid data. """ if not isinstance(self.task_data, dict): raise ValidationError( {"task_data": "task data must be a dictionary"} ) match self.task_type: case TaskTypes.WORKER | TaskTypes.SERVER: try: task_cls = BaseTask.class_from_name( self.task_type, self.task_name ) except (KeyError, ValueError) as e: raise ValidationError( { "task_name": f"{self.task_name}:" f" invalid {self.task_type} task name" } ) from e try: task_cls(task_data=self.task_data) except TaskConfigError as e: raise ValidationError( { "task_data": f"invalid {self.task_type}" f" task data: {e}" } ) from e case TaskTypes.WORKFLOW: # Import here to prevent circular imports from debusine.server.workflows import Workflow try: workflow_cls = Workflow.from_name(self.task_name) except (KeyError, ValueError) as e: raise ValidationError( { "task_name": f"{self.task_name}:" f" invalid workflow name" } ) from e try: workflow_cls(self) except TaskConfigError as e: raise ValidationError( {"task_data": f"invalid workflow data: {e}"} ) from e # TODO: do we want to run expensive tests here # (Workflow.validate_input), like looking up the types of # references artifacts to validate them? case TaskTypes.INTERNAL: if self.task_name not in ("synchronization_point", "workflow"): raise ValidationError( { "task_name": f"{self.task_name}:" " invalid task name for internal task" } ) # Without this pass, python coverage is currently unable to # detect that code does flow through here pass case _: raise NotImplementedError( f"task type {self.task_type} not yet supported" )
[docs] def mark_running(self) -> bool: """Worker has begun executing the task.""" if ( self.task_type in {TaskTypes.WORKER, TaskTypes.SERVER} and self.worker is None ): logger.debug( "Cannot mark WorkRequest %s as running: it does not have " "an assigned worker", self.pk, ) return False if self.status == self.Statuses.RUNNING: # It was already running - nothing to do return True if self.status != self.Statuses.PENDING: logger.debug( "Cannot mark as running - current status %s", self.status ) return False if self.worker is not None: work_requests_running_for_worker = WorkRequest.objects.running( worker=self.worker ) # There is a possible race condition here. This check (and # other checks in this class) currently help to avoid # development mistakes not database full integrity if ( work_requests_running_for_worker.count() >= self.worker.concurrency ): logger.debug( "Cannot mark WorkRequest %s as running - the assigned " "worker %s is running too many other WorkRequests: %s", self.pk, self.worker, list(work_requests_running_for_worker.order_by("id")), ) return False self.started_at = timezone.now() self.status = self.Statuses.RUNNING self.save() logger.debug("Marked WorkRequest %s as running", self.pk) return True
[docs] def mark_completed(self, result: "WorkRequest.Results") -> bool: """Worker has finished executing the task.""" if self.status not in (self.Statuses.PENDING, self.Statuses.RUNNING): logger.debug( "Cannot mark WorkRequest %s as completed: current status is %s", self.pk, self.status, ) return False self.result = result self.completed_at = timezone.now() self.status = self.Statuses.COMPLETED self.save() logger.debug("Marked WorkRequest %s as completed", self.pk) # mark dependencies ready before sending notification self.unblock_reverse_dependencies() self.process_event_reactions() if self.parent is not None: self.parent.maybe_finish_workflow() return True
[docs] def process_event_reactions(self): """Process list of actions to perform on completion.""" actions = self.get_triggered_actions() notifications.notify_work_request_completed( self, actions[ActionTypes.SEND_NOTIFICATION] ) self.process_update_collection_with_artifacts( actions[ActionTypes.UPDATE_COLLECTION_WITH_ARTIFACTS] )
[docs] def get_triggered_actions(self) -> dict[str, list[EventReaction]]: """Filter events to trigger, grouped by type.""" actions: dict[str, list[EventReaction]] = { action: [] for action in ActionTypes } if self.result in (WorkRequest.Results.SUCCESS,): for action in self.event_reactions.on_success: action_type = action.action actions[action_type].append(action) elif self.result in ( WorkRequest.Results.FAILURE, WorkRequest.Results.ERROR, ): for action in self.event_reactions.on_failure: action_type = action.action actions[action_type].append(action) else: # Results.NONE pass return actions
[docs] def process_update_collection_with_artifacts( self, actions: list[EventReaction] ) -> None: """Update collection following event_reactions.""" # local import to avoid circular dependency from debusine.db.models.collections import CollectionItem from debusine.server.collections import ( CollectionManagerInterface, ItemAdditionError, ItemRemovalError, ) from debusine.server.collections.lookup import lookup_single for update in actions: assert isinstance(update, ActionUpdateCollectionWithArtifacts) try: collection = lookup_single( update.collection, self.workspace, user=self.created_by, workflow_root=self.get_workflow_root(), expect_type=CollectionItem.Types.COLLECTION, ).collection except LookupError as e: logger.error("%s in WorkRequest %s", e, self.pk) # noqa: G200 continue manager = CollectionManagerInterface.get_manager_for(collection) try: artifacts_to_add = self.artifact_set.filter( **update.artifact_filters ) except FieldError: logger.exception( "Invalid update-collection-with-artifacts" " artifact_filters in WorkRequest %s", self.pk, ) continue for artifact in artifacts_to_add: try: expanded_variables = CollectionItem.expand_variables( update.variables or {}, artifact ) except (KeyError, ValueError): logger.exception( "Invalid update-collection-with-artifacts variables " "in WorkRequest %s", self.pk, ) continue if update.name_template is not None: item_name = CollectionItem.expand_name( update.name_template, expanded_variables ) item_variables = None else: item_name = None item_variables = expanded_variables try: manager.add_artifact( artifact, user=self.created_by, variables=item_variables, name=item_name, replace=True, ) except (ItemAdditionError, ItemRemovalError): logger.exception( "Cannot replace or add artifact %s to collection %s" " from WorkRequest %s", artifact, collection, self.pk, )
[docs] def mark_pending(self) -> bool: """Worker is ready for execution.""" if self.status not in (self.Statuses.BLOCKED): logger.debug( "Cannot mark WorkRequest %s as pending: current status is %s", self.pk, self.status, ) return False self.status = self.Statuses.PENDING self.save() logger.debug("Marked WorkRequest %s as pending", self.pk) return True
[docs] def add_dependency(self, dependency: "WorkRequest") -> None: """Make this work request depend on another one.""" if self.parent is not None: # Work requests in a workflow may only depend on other work # requests in the same workflow. my_root = self.get_workflow_root() assert my_root is not None if dependency.get_workflow_root() != my_root: raise ValueError( "Work requests in a workflow may not depend on other work " "requests outside that workflow" ) self.dependencies.add(dependency) if ( self.status == WorkRequest.Statuses.PENDING and self.unblock_strategy == WorkRequest.UnblockStrategy.DEPS and dependency.status != WorkRequest.Statuses.COMPLETED ): self.status = WorkRequest.Statuses.BLOCKED self.save()
[docs] def maybe_finish_workflow(self) -> bool: """Update workflow status if its children are no longer in progress.""" # Only relevant for running workflows where all work requests have been # either completed or aborted if ( self.task_type != TaskTypes.WORKFLOW or self.status != WorkRequest.Statuses.RUNNING or self.has_children_in_progress ): return False # If there are aborted child requests, abort the workflow if ( self.children.filter(superseded__isnull=True) .exclude(status=WorkRequest.Statuses.COMPLETED) .exists() ): self.mark_aborted() return True # If there are failed/errored child requests, fail the workflow if ( self.children.filter(superseded__isnull=True) .exclude( Q(result=WorkRequest.Results.SUCCESS) | Q(workflow_data_json__contains={"allow_failure": True}) ) .exists() ): self.mark_completed(WorkRequest.Results.FAILURE) return True # All child requests succeeded self.mark_completed(WorkRequest.Results.SUCCESS) return True
[docs] def can_be_unblocked(self) -> bool: """Return True if and only if this work request can be unblocked.""" # If this work request is in a workflow, then that workflow must # be running. if ( self.parent is not None and self.parent.status != WorkRequest.Statuses.RUNNING ): return False if self.unblock_strategy == WorkRequest.UnblockStrategy.DEPS: # This work request can be unblocked if and only if all its # dependencies have completed. return not self.dependencies.filter( ~Q(status=WorkRequest.Statuses.COMPLETED) ).exists() else: # We don't know how to unblock this work request. return False
[docs] def unblock_reverse_dependencies(self): """Unblock reverse dependencies.""" # Shortcuts to keep line length sane r = WorkRequest.Results s = WorkRequest.Statuses allow_failure = self.workflow_data and self.workflow_data.allow_failure if self.result == r.SUCCESS or allow_failure: # lookup work requests that depend on the completed work # request and unblock them if no other work request is # blocking them for rdep in self.reverse_dependencies.filter(status=s.BLOCKED): if rdep.can_be_unblocked(): rdep.mark_pending() else: # failure and !allow_failure for rdep in self.reverse_dependencies.filter(status=s.BLOCKED): rdep.mark_aborted()
[docs] def unblock_workflow_children(self) -> None: """Unblock children of a workflow that has just started running.""" for child in self.children.filter(status=WorkRequest.Statuses.BLOCKED): if child.can_be_unblocked(): child.mark_pending()
[docs] def mark_aborted(self) -> bool: """ Worker has aborted the task after request from UI. Task will typically be in CREATED or RUNNING status. """ self.completed_at = timezone.now() self.status = self.Statuses.ABORTED self.save() if self.parent is not None: self.parent.maybe_finish_workflow() logger.debug( "Marked WorkRequest %s as aborted (from status %s)", self.pk, self.status, ) return True
[docs] def assign_worker(self, worker: Optional["Worker"]) -> None: """Assign worker and save it.""" self.worker = worker self.save() notifications.notify_work_request_assigned(self)
@property def duration(self) -> float | None: """Return duration in seconds between started_at and completed_at.""" if self.started_at and self.completed_at: return (self.completed_at - self.started_at).total_seconds() else: return None @property def priority_effective(self) -> int: """The effective priority of this work request.""" return self.priority_base + self.priority_adjustment @property def is_part_of_workflow(self) -> bool: """Return whether this work request is part of a workflow.""" return self.task_type == TaskTypes.WORKFLOW or self.parent is not None
[docs] def get_workflow_root(self) -> Optional["WorkRequest"]: """Return the root of this work request's workflow, if any.""" if not self.is_part_of_workflow: return None parent = self while parent.parent is not None: parent = parent.parent return parent
@property def workflow_display_name(self) -> str: """Return this work request's name for display in a workflow.""" if self.workflow_data is not None: return self.workflow_data.display_name else: return self.task_name @property def has_children_in_progress(self) -> bool: """Return whether child work requests are still in progress.""" return self.children.exclude( status__in={ WorkRequest.Statuses.COMPLETED, WorkRequest.Statuses.ABORTED, } ).exists()
[docs] def effective_expiration_delay(self): """Return expiration_delay, inherited if None.""" expiration_delay = self.expiration_delay if self.expiration_delay is None: # inherit expiration_delay = self.workspace.default_expiration_delay return expiration_delay
class NotificationChannel(models.Model):
    """Model to store notification configuration."""

    class Methods(models.TextChoices):
        EMAIL = "email", "Email"

    # Maps each notification method to the class used to validate ``data``:
    # the class is called with ``data`` expanded as keyword arguments.
    data_validators = {Methods.EMAIL: NotificationDataEmail}

    name = models.CharField(
        max_length=20,
        unique=True,
    )
    method = models.CharField(max_length=10, choices=Methods.choices)
    data = models.JSONField(default=dict, blank=True)

    def clean(self):
        """
        Ensure that data is valid for the specific method.

        :raise ValidationError: if the method has no known validator, or if
          the data is not valid for the method.
        """
        # Look the validator up separately, so that an unknown method is
        # reported as a validation error instead of escaping as a KeyError.
        try:
            validator = self.data_validators[self.method]
        except KeyError as exc:
            raise ValidationError(
                {"method": f"{self.method}: unknown notification method"}
            ) from exc

        try:
            validator(**self.data)
        except (TypeError, ValueError) as exc:
            raise ValidationError(
                f"NotificationChannel data is not valid: {exc}"
            ) from exc

        return super().clean()

    def save(self, *args, **kwargs):
        """Run validators and save the instance."""
        self.full_clean()
        return super().save(*args, **kwargs)

    def __str__(self):
        """Return name."""
        return self.name