# Copyright 2019, 2021-2024 The Debusine Developers
# See the AUTHORS file at the top-level directory of this distribution
#
# This file is part of Debusine. It is subject to the license terms
# in the LICENSE file found in the top-level directory of this
# distribution. No part of Debusine, including this file, may be copied,
# modified, propagated, or distributed except according to the terms
# contained in the LICENSE file.
"""Data models for db work requests."""
import logging
from datetime import datetime, timedelta
from typing import Any, Optional, TYPE_CHECKING, TypeAlias
from django.core.exceptions import FieldError, ValidationError
from django.db import models
from django.db.models import (
F,
JSONField,
Q,
QuerySet,
UniqueConstraint,
)
from django.db.models.functions import Coalesce
from django.utils import timezone
from debusine.artifacts.models import CollectionCategory
from debusine.server import notifications
from debusine.tasks import BaseTask, TaskConfigError
from debusine.tasks.models import (
ActionTypes,
ActionUpdateCollectionWithArtifacts,
EventReaction,
EventReactions,
NotificationDataEmail,
TaskTypes,
)
if TYPE_CHECKING:
from django_stubs_ext.db.models import TypedModelMeta
from debusine.db.models.auth import User
from debusine.db.models.workers import Worker
from debusine.server.workflows.models import WorkRequestWorkflowData
else:
TypedModelMeta = object
logger = logging.getLogger(__name__)
class WorkflowTemplate(models.Model):
"""
    Database model for Workflow templates.

Workflow templates contain the information needed to instantiate a
workflow, with a Workflow orchestrator and mandatory parameters.
"""
class Meta(TypedModelMeta):
constraints = [
UniqueConstraint(
fields=["name", "workspace"],
name="%(app_label)s_%(class)s_unique_name_workspace",
),
]
name = models.CharField(max_length=255)
workspace = models.ForeignKey("Workspace", on_delete=models.PROTECT)
task_name = models.CharField(
max_length=100, verbose_name='Name of the Workflow orchestrator class'
)
task_data = JSONField(default=dict, blank=True)
priority = models.IntegerField(
default=0,
help_text="Base priority for work requests created from this template",
)
def clean(self):
"""
Ensure that task_name and task data are valid.
:raise ValidationError: for invalid data.
"""
# Import here to prevent circular imports
from debusine.server.workflows import Workflow, WorkflowValidationError
if not isinstance(self.task_data, dict):
raise ValidationError(
{"task_data": "task data must be a dictionary"}
)
# Instantiate the orchestrator and use it to validate task_data
workflow_cls = Workflow.from_name(self.task_name)
try:
workflow_cls.validate_template_data(self.task_data)
except (
KeyError,
ValueError,
RuntimeError,
WorkflowValidationError,
) as exc:
            raise ValidationError({"task_data": str(exc)}) from exc
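
    # Illustrative usage sketch (not part of the model): clean() runs as
    # part of Django's standard model validation, so callers can validate a
    # template before saving it. Names like `my_workspace` and the task
    # values are assumptions.
    #
    #     template = WorkflowTemplate(
    #         name="build",
    #         workspace=my_workspace,
    #         task_name="sbuild",
    #         task_data={},
    #     )
    #     template.full_clean()  # raises ValidationError on invalid data
    #     template.save()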
class _WorkRequestStatuses(models.TextChoices):
"""Choices for WorkRequest.status."""
PENDING = "pending", "Pending"
RUNNING = "running", "Running"
COMPLETED = "completed", "Completed"
ABORTED = "aborted", "Aborted"
BLOCKED = "blocked", "Blocked"
class WorkRequestManager(models.Manager["WorkRequest"]):
"""Manager for WorkRequest model."""
def create_workflow_callback(
self,
*,
parent: "WorkRequest",
step: str,
display_name: str | None = None,
status: _WorkRequestStatuses | None = None,
    ) -> "WorkRequest":
"""
Create a workflow callback WorkRequest.
A parent is always required, as a callback only makes sense as part of
a workflow.
:param step: string set by the workflow to identify this callback
"""
# Import here to prevent circular imports
from debusine.server.workflows.models import WorkRequestWorkflowData
return self.create(
workspace=parent.workspace,
parent=parent,
created_by=parent.created_by,
status=status or WorkRequest.Statuses.PENDING,
task_type=TaskTypes.INTERNAL,
task_name="workflow",
task_data={},
priority_base=parent.priority_effective,
workflow_data_json=WorkRequestWorkflowData(
step=step, display_name=display_name or step
).dict(exclude_unset=True),
)
def create_workflow(
self,
*,
template: WorkflowTemplate,
data: dict[str, Any],
created_by: "User",
parent: Optional["WorkRequest"] = None,
status: _WorkRequestStatuses | None = None,
) -> "WorkRequest":
"""Create a workflow from a template and user-provided data."""
# Import here to prevent circular imports
from debusine.db.models import Collection
from debusine.server.workflows import Workflow, WorkflowValidationError
# Lookup the orchestrator
workflow_cls = Workflow.from_name(template.task_name)
# Merge user provided data into template data
task_data = workflow_cls.build_workflow_data(template.task_data, data)
# Build the WorkRequest
work_request = self.create(
workspace=template.workspace,
parent=parent,
created_by=created_by,
status=status or WorkRequest.Statuses.PENDING,
task_type=TaskTypes.WORKFLOW,
task_name=template.task_name,
task_data=task_data,
priority_base=template.priority,
)
# Root work requests need an internal collection
# (:ref:`collection-workflow-internal`)
if parent is None:
work_request.internal_collection = Collection.objects.create(
name=f"workflow-{work_request.id}",
category=CollectionCategory.WORKFLOW_INTERNAL,
workspace=work_request.workspace,
retains_artifacts=Collection.RetainsArtifacts.WORKFLOW,
workflow=work_request,
)
work_request.save()
try:
# Instantiate the orchestrator
orchestrator = workflow_cls(work_request)
# Thorough input validation
orchestrator.validate_input()
except (
KeyError,
ValueError,
RuntimeError,
WorkflowValidationError,
TaskConfigError,
):
# TODO: How can we store the error so that the user can see it?
logger.exception("Cannot create a workflow")
work_request.mark_completed(WorkRequest.Results.ERROR)
return work_request
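
    # Minimal usage sketch for create_workflow (illustrative only;
    # `template`, `user` and the task data keys are assumptions):
    #
    #     wr = WorkRequest.objects.create_workflow(
    #         template=template,
    #         data={"architecture": "amd64"},
    #         created_by=user,
    #     )
    #     assert wr.task_type == TaskTypes.WORKFLOW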
def create_synchronization_point(
self,
*,
parent: "WorkRequest",
step: str,
display_name: str | None = None,
status: _WorkRequestStatuses | None = None,
    ) -> "WorkRequest":
"""
Create a synchronization point WorkRequest.
A parent is always required, as a synchronization point only makes
sense as part of a workflow.
"""
# Import here to prevent circular imports
from debusine.server.workflows.models import WorkRequestWorkflowData
return self.create(
workspace=parent.workspace,
parent=parent,
created_by=parent.created_by,
status=status or WorkRequest.Statuses.PENDING,
task_type=TaskTypes.INTERNAL,
task_name="synchronization_point",
task_data={},
priority_base=parent.priority_effective,
workflow_data_json=WorkRequestWorkflowData(
step=step, display_name=display_name or step
).dict(exclude_unset=True),
)
def pending(
self, exclude_assigned: bool = False, worker: Optional["Worker"] = None
) -> QuerySet["WorkRequest"]:
"""
Return a QuerySet of tasks in WorkRequest.Statuses.PENDING status.
QuerySet is ordered by descending effective priority, then by
created_at.
        If exclude_assigned=True, exclude work requests that are already
        assigned to a worker; if worker is given, return only the work
        requests assigned to that worker.
PENDING is the default status of a task on creation.
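
        Example (illustrative sketch, not a doctest; ``my_worker`` is an
        assumed, previously saved Worker)::

            next_wr = WorkRequest.objects.pending(worker=my_worker).first()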
"""
if exclude_assigned and worker is not None:
raise ValueError("Cannot exclude_assigned and filter by worker")
qs = WorkRequest.objects.filter(status=WorkRequest.Statuses.PENDING)
if exclude_assigned:
qs = qs.exclude(worker__isnull=False)
if worker is not None:
qs = qs.filter(worker=worker)
qs = qs.order_by(
(F("priority_base") + F("priority_adjustment")).desc(), "created_at"
)
return qs
def running(
self, worker: Optional["Worker"] = None
) -> QuerySet["WorkRequest"]:
"""Return a QuerySet of tasks in running status."""
qs = WorkRequest.objects.filter(status=WorkRequest.Statuses.RUNNING)
if worker is not None:
qs = qs.filter(worker=worker)
return qs
def completed(self) -> QuerySet["WorkRequest"]:
"""Return a QuerySet of tasks in completed status."""
return WorkRequest.objects.filter(status=WorkRequest.Statuses.COMPLETED)
def aborted(self) -> QuerySet["WorkRequest"]:
"""Return a QuerySet of tasks in aborted status."""
return WorkRequest.objects.filter(status=WorkRequest.Statuses.ABORTED)
def expired(self, at: datetime) -> QuerySet["WorkRequest"]:
"""
Return queryset with work requests that have expired.
:param at: datetime to check if the work requests are expired.
:return: work requests which expire before the given datetime.
"""
return (
self.get_queryset()
.annotate(
effective_expiration_delay=Coalesce(
"expiration_delay",
"workspace__default_expiration_delay",
)
)
.exclude(effective_expiration_delay=timedelta(0))
.filter(
# https://github.com/typeddjango/django-stubs/issues/1548
created_at__lte=(
at - F("effective_expiration_delay") # type: ignore[operator] # noqa: E501
)
)
)
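
    # Illustrative sketch: a periodic cleanup job could use expired() to
    # find and delete old work requests (the calling context is an
    # assumption):
    #
    #     from django.utils import timezone
    #
    #     WorkRequest.objects.expired(at=timezone.now()).delete()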
class WorkRequest(models.Model):
"""
Database model of a request to execute a task.

    Time-consuming operations are offloaded to Workers, using
    Artifacts (and associated Files) as input and output.

    The submission API needs to check that the request is valid using
    ontological rules - e.g. whether the specified distribution for
    a build task exists.

    Avoid exposing the status of tasks to the admin interface, to prevent
    runaway changes whilst the scheduler process is running.
The WorkRequest uses the non-Django tasks module to do the checks on
whether a task can run on a particular worker.
WorkRequest State Machine
=========================
New WorkRequest database entries default to
``WorkRequest.Statuses.PENDING``.
    Once the WorkRequest is assigned to a worker and starts running,
    the status is changed to ``WorkRequest.Statuses.RUNNING``.

    If the WorkRequest is aborted, its status is set to
    ``WorkRequest.Statuses.ABORTED``.

    If the task finishes on the Worker, the WorkRequest status will be
    ``WorkRequest.Statuses.COMPLETED`` and a WorkRequest.Result is then set:
    ``WorkRequest.Results.SUCCESS``, ``WorkRequest.Results.FAILURE`` or
    ``WorkRequest.Results.ERROR``.
.. graphviz::
digraph {
Statuses_PENDING -> Statuses_RUNNING -> Statuses_COMPLETED;
Statuses_PENDING -> Statuses_COMPLETED;
Statuses_PENDING -> Statuses_ABORTED;
Statuses_PENDING -> Statuses_RUNNING -> Statuses_ABORTED;
}
``WorkRequest.started_at`` is set when the WorkRequest moves from
``WorkRequest.Statuses.PENDING`` to ``WorkRequest.Statuses.RUNNING``.
``WorkRequest.completed_at`` is set when the Task moves from
``WorkRequest.Statuses.RUNNING`` to ``WorkRequest.Statuses.COMPLETED``.
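
    Typical lifecycle, as a minimal illustrative sketch (assumes a saved
    WorkRequest ``wr`` already assigned to a worker)::

        wr.mark_running()                               # -> RUNNING
        wr.mark_completed(WorkRequest.Results.SUCCESS)  # -> COMPLETED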
"""
objects = WorkRequestManager()
Statuses: TypeAlias = _WorkRequestStatuses
class Results(models.TextChoices):
NONE = "", ""
SUCCESS = "success", "Success"
FAILURE = "failure", "Failure"
ERROR = "error", "Error"
class UnblockStrategy(models.TextChoices):
DEPS = "deps", "Dependencies have completed"
MANUAL = "manual", "Manually unblocked"
workspace = models.ForeignKey("Workspace", on_delete=models.PROTECT)
created_at = models.DateTimeField(auto_now_add=True)
started_at = models.DateTimeField(blank=True, null=True)
completed_at = models.DateTimeField(blank=True, null=True)
created_by = models.ForeignKey("User", on_delete=models.PROTECT)
status = models.CharField(
max_length=9,
choices=Statuses.choices,
default=Statuses.PENDING,
editable=False,
)
result = models.CharField(
max_length=7,
choices=Results.choices,
default=Results.NONE,
editable=False,
)
    # Any one work request can only be on one worker,
    # even if the worker can handle multiple work requests.
worker = models.ForeignKey(
"Worker",
null=True,
blank=True,
on_delete=models.CASCADE,
related_name="assigned_work_requests",
)
task_type = models.CharField(
max_length=8,
choices=TaskTypes.choices,
default=TaskTypes.WORKER,
editable=False,
verbose_name="Type of task to execute",
)
task_name = models.CharField(
max_length=100, verbose_name='Name of the task to execute'
)
task_data = JSONField(default=dict, blank=True)
dynamic_task_data = JSONField(null=True, blank=True)
priority_base = models.IntegerField(
default=0, help_text="Base priority of this work request"
)
priority_adjustment = models.IntegerField(
default=0,
help_text=(
"Administrator adjustment to the priority of this work request"
),
)
# Workflows
unblock_strategy = models.CharField(
max_length=6,
choices=UnblockStrategy.choices,
default=UnblockStrategy.DEPS,
editable=False,
)
dependencies = models.ManyToManyField(
"self", symmetrical=False, related_name="reverse_dependencies"
)
parent = models.ForeignKey(
"self",
# WorkRequests are only deleted through expiration, use CASCADE
on_delete=models.CASCADE,
blank=True,
null=True,
related_name="children",
)
workflow_data_json = JSONField(
default=dict, blank=True, db_column="workflow_data"
)
event_reactions_json = JSONField(
default=dict, blank=True, db_column="event_reactions"
)
internal_collection = models.ForeignKey(
"Collection", null=True, blank=True, on_delete=models.PROTECT
)
expiration_delay = models.DurationField(blank=True, null=True)
supersedes = models.OneToOneField(
"WorkRequest",
on_delete=models.SET_NULL,
related_name="superseded",
null=True,
blank=True,
)
class Meta(TypedModelMeta):
indexes = [
# Handles the main scheduler query.
models.Index(
(F("priority_base") + F("priority_adjustment")).desc(),
"created_at",
name="%(app_label)s_%(class)s_pending_idx",
condition=Q(status=_WorkRequestStatuses.PENDING),
),
# Handles queries from workers.
models.Index(
"worker",
name="%(app_label)s_%(class)s_worker_idx",
condition=Q(
status__in=(
_WorkRequestStatuses.PENDING,
_WorkRequestStatuses.RUNNING,
)
),
),
]
permissions = [
(
"manage_workrequest_priorities",
"Can set positive priority adjustments on work requests",
)
]
ordering = ["id"]
def __str__(self) -> str:
"""Return the id of the WorkRequest."""
return str(self.id)
@property
def workflow_data(self) -> Optional["WorkRequestWorkflowData"]:
"""Access workflow_data_json as a python structure."""
# Import here to avoid a circular dependency
from debusine.server.workflows.models import WorkRequestWorkflowData
if self.workflow_data_json:
return WorkRequestWorkflowData(**self.workflow_data_json)
else:
return None
@workflow_data.setter
def workflow_data(self, value: Optional["WorkRequestWorkflowData"]) -> None:
"""Set workflow_data_json from a python structure."""
if value is None:
self.workflow_data_json = {}
else:
self.workflow_data_json = value.dict(
exclude_unset=True,
)
@property
def event_reactions(self) -> EventReactions:
"""Access event_reactions_json as a pydantic structure."""
return EventReactions(**self.event_reactions_json)
@event_reactions.setter
def event_reactions(self, value: EventReactions) -> None:
"""Set event_reactions_json from a pydantic structure."""
self.event_reactions_json = value.dict()
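
    # Illustrative sketch: event_reactions round-trips through the pydantic
    # model, so callers can assign a typed structure instead of raw JSON
    # (the collection lookup and filter values are assumptions):
    #
    #     wr.event_reactions = EventReactions(
    #         on_success=[
    #             ActionUpdateCollectionWithArtifacts(
    #                 collection="internal@collections",
    #                 artifact_filters={"category": "debian:binary-packages"},
    #             )
    #         ]
    #     )
    #     wr.save()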
def create_child(
self,
task_name: str,
status: Statuses = Statuses.BLOCKED,
task_type: TaskTypes = TaskTypes.WORKER,
task_data: dict[str, Any] | None = None,
workflow_data: Optional["WorkRequestWorkflowData"] = None,
) -> "WorkRequest":
"""Create a child WorkRequest."""
if self.task_type != TaskTypes.WORKFLOW:
raise ValueError("Only workflows may have child work requests.")
return WorkRequest.objects.create(
workspace=self.workspace,
parent=self,
created_by=self.created_by,
status=status,
task_type=task_type,
task_name=task_name,
task_data=task_data or {},
workflow_data_json=(
workflow_data.dict(exclude_unset=True) if workflow_data else {}
),
)
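
    # Illustrative sketch, e.g. from a workflow orchestrator populating its
    # workflow (the task name and data are assumptions):
    #
    #     child = workflow_wr.create_child(
    #         task_name="noop",
    #         task_data={"result": True},
    #     )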
def reschedule(self) -> "WorkRequest":
"""Create a WorkRequest that supersedes this one."""
if self.task_type not in (TaskTypes.WORKER, TaskTypes.INTERNAL):
raise ValueError("Only worker or internal tasks can be rescheduled")
# TODO: implement rescheduling a workflow, which can involve more
# complex work like rescheduling each of its work requests that aborted
# or failed (see #416)
if hasattr(self, "superseded"):
raise ValueError("Cannot reschedule old superseded tasks")
if not (
self.status == WorkRequest.Statuses.ABORTED
or (
self.status == WorkRequest.Statuses.COMPLETED
and self.result == WorkRequest.Results.FAILURE
)
):
raise ValueError("Only aborted or failed tasks can be rescheduled")
# TODO: compute dynamic data to see if all lookups can still be
        # satisfied: this allows us to fail if any input artifacts were
# already expired, before making changes in the database.
# This check can be moved to a check-only method, to be used in views
# to assess whether an aborted/failed task can be restarted
work_request = WorkRequest.objects.create(
workspace=self.workspace,
created_by=self.created_by,
status=WorkRequest.Statuses.PENDING,
task_type=self.task_type,
task_name=self.task_name,
task_data=self.task_data,
priority_base=self.priority_base,
priority_adjustment=self.priority_adjustment,
unblock_strategy=self.unblock_strategy,
parent=self.parent,
workflow_data_json=self.workflow_data_json,
event_reactions_json=self.event_reactions_json,
expiration_delay=self.expiration_delay,
supersedes=self,
)
# Copy forward dependencies
for dep in self.dependencies.all():
work_request.add_dependency(dep)
# Update any reverse-dependencies of the old work request to point to
# the new one, and move them from pending to blocked
for dep in self.reverse_dependencies.all():
dep.dependencies.remove(self)
dep.add_dependency(work_request)
# If the task was part of an aborted workflow, set it back to running
if (workflow := work_request.parent) and (
workflow.status == WorkRequest.Statuses.ABORTED
or (
workflow.status == WorkRequest.Statuses.COMPLETED
and workflow.result == WorkRequest.Results.FAILURE
)
):
workflow.status = WorkRequest.Statuses.RUNNING
workflow.completed_at = None
workflow.save()
return work_request
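
    # Illustrative sketch (assumes `wr` is an aborted or failed work
    # request that has not already been superseded):
    #
    #     new_wr = wr.reschedule()
    #     assert new_wr.supersedes == wr
    #     assert new_wr.status == WorkRequest.Statuses.PENDING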
def clean(self):
"""
Ensure that task data is valid for this task name.
:raise ValidationError: for invalid data.
"""
if not isinstance(self.task_data, dict):
raise ValidationError(
{"task_data": "task data must be a dictionary"}
)
match self.task_type:
case TaskTypes.WORKER | TaskTypes.SERVER:
try:
task_cls = BaseTask.class_from_name(
self.task_type, self.task_name
)
except (KeyError, ValueError) as e:
raise ValidationError(
{
"task_name": f"{self.task_name}:"
f" invalid {self.task_type} task name"
}
) from e
try:
task_cls(task_data=self.task_data)
except TaskConfigError as e:
raise ValidationError(
{
"task_data": f"invalid {self.task_type}"
f" task data: {e}"
}
) from e
case TaskTypes.WORKFLOW:
# Import here to prevent circular imports
from debusine.server.workflows import Workflow
try:
workflow_cls = Workflow.from_name(self.task_name)
except (KeyError, ValueError) as e:
raise ValidationError(
{
"task_name": f"{self.task_name}:"
f" invalid workflow name"
}
) from e
try:
workflow_cls(self)
except TaskConfigError as e:
raise ValidationError(
{"task_data": f"invalid workflow data: {e}"}
) from e
# TODO: do we want to run expensive tests here
# (Workflow.validate_input), like looking up the types of
# references artifacts to validate them?
case TaskTypes.INTERNAL:
if self.task_name not in ("synchronization_point", "workflow"):
raise ValidationError(
{
"task_name": f"{self.task_name}:"
" invalid task name for internal task"
}
)
# Without this pass, python coverage is currently unable to
# detect that code does flow through here
pass
case _:
raise NotImplementedError(
f"task type {self.task_type} not yet supported"
)
def mark_running(self) -> bool:
"""Worker has begun executing the task."""
if (
self.task_type in {TaskTypes.WORKER, TaskTypes.SERVER}
and self.worker is None
):
logger.debug(
"Cannot mark WorkRequest %s as running: it does not have "
"an assigned worker",
self.pk,
)
return False
if self.status == self.Statuses.RUNNING:
# It was already running - nothing to do
return True
if self.status != self.Statuses.PENDING:
logger.debug(
"Cannot mark as running - current status %s", self.status
)
return False
if self.worker is not None:
work_requests_running_for_worker = WorkRequest.objects.running(
worker=self.worker
)
            # There is a possible race condition here. This check (and
            # other checks in this class) currently helps to avoid
            # development mistakes, not to enforce full database integrity.
if (
work_requests_running_for_worker.count()
>= self.worker.concurrency
):
logger.debug(
"Cannot mark WorkRequest %s as running - the assigned "
"worker %s is running too many other WorkRequests: %s",
self.pk,
self.worker,
list(work_requests_running_for_worker.order_by("id")),
)
return False
self.started_at = timezone.now()
self.status = self.Statuses.RUNNING
self.save()
logger.debug("Marked WorkRequest %s as running", self.pk)
return True
def mark_completed(self, result: "WorkRequest.Results") -> bool:
"""Worker has finished executing the task."""
if self.status not in (self.Statuses.PENDING, self.Statuses.RUNNING):
logger.debug(
"Cannot mark WorkRequest %s as completed: current status is %s",
self.pk,
self.status,
)
return False
self.result = result
self.completed_at = timezone.now()
self.status = self.Statuses.COMPLETED
self.save()
logger.debug("Marked WorkRequest %s as completed", self.pk)
# mark dependencies ready before sending notification
self.unblock_reverse_dependencies()
self.process_event_reactions()
if self.parent is not None:
self.parent.maybe_finish_workflow()
return True
def process_event_reactions(self):
"""Process list of actions to perform on completion."""
actions = self.get_triggered_actions()
notifications.notify_work_request_completed(
self, actions[ActionTypes.SEND_NOTIFICATION]
)
self.process_update_collection_with_artifacts(
actions[ActionTypes.UPDATE_COLLECTION_WITH_ARTIFACTS]
)
def get_triggered_actions(self) -> dict[str, list[EventReaction]]:
"""Filter events to trigger, grouped by type."""
actions: dict[str, list[EventReaction]] = {
action: [] for action in ActionTypes
}
if self.result in (WorkRequest.Results.SUCCESS,):
for action in self.event_reactions.on_success:
action_type = action.action
actions[action_type].append(action)
elif self.result in (
WorkRequest.Results.FAILURE,
WorkRequest.Results.ERROR,
):
for action in self.event_reactions.on_failure:
action_type = action.action
actions[action_type].append(action)
else: # Results.NONE
pass
return actions
def process_update_collection_with_artifacts(
self, actions: list[EventReaction]
) -> None:
"""Update collection following event_reactions."""
# local import to avoid circular dependency
from debusine.db.models.collections import CollectionItem
from debusine.server.collections import (
CollectionManagerInterface,
ItemAdditionError,
ItemRemovalError,
)
from debusine.server.collections.lookup import lookup_single
for update in actions:
assert isinstance(update, ActionUpdateCollectionWithArtifacts)
try:
collection = lookup_single(
update.collection,
self.workspace,
user=self.created_by,
workflow_root=self.get_workflow_root(),
expect_type=CollectionItem.Types.COLLECTION,
).collection
except LookupError as e:
logger.error("%s in WorkRequest %s", e, self.pk) # noqa: G200
continue
manager = CollectionManagerInterface.get_manager_for(collection)
try:
artifacts_to_add = self.artifact_set.filter(
**update.artifact_filters
)
except FieldError:
logger.exception(
"Invalid update-collection-with-artifacts"
" artifact_filters in WorkRequest %s",
self.pk,
)
continue
for artifact in artifacts_to_add:
try:
expanded_variables = CollectionItem.expand_variables(
update.variables or {}, artifact
)
except (KeyError, ValueError):
logger.exception(
"Invalid update-collection-with-artifacts variables "
"in WorkRequest %s",
self.pk,
)
continue
if update.name_template is not None:
item_name = CollectionItem.expand_name(
update.name_template, expanded_variables
)
item_variables = None
else:
item_name = None
item_variables = expanded_variables
try:
manager.add_artifact(
artifact,
user=self.created_by,
variables=item_variables,
name=item_name,
replace=True,
)
except (ItemAdditionError, ItemRemovalError):
logger.exception(
"Cannot replace or add artifact %s to collection %s"
" from WorkRequest %s",
artifact,
collection,
self.pk,
)
    def mark_pending(self) -> bool:
        """Work request is ready for execution."""
        if self.status != self.Statuses.BLOCKED:
logger.debug(
"Cannot mark WorkRequest %s as pending: current status is %s",
self.pk,
self.status,
)
return False
self.status = self.Statuses.PENDING
self.save()
logger.debug("Marked WorkRequest %s as pending", self.pk)
return True
def add_dependency(self, dependency: "WorkRequest") -> None:
"""Make this work request depend on another one."""
if self.parent is not None:
# Work requests in a workflow may only depend on other work
# requests in the same workflow.
my_root = self.get_workflow_root()
assert my_root is not None
if dependency.get_workflow_root() != my_root:
raise ValueError(
"Work requests in a workflow may not depend on other work "
"requests outside that workflow"
)
self.dependencies.add(dependency)
if (
self.status == WorkRequest.Statuses.PENDING
and self.unblock_strategy == WorkRequest.UnblockStrategy.DEPS
and dependency.status != WorkRequest.Statuses.COMPLETED
):
self.status = WorkRequest.Statuses.BLOCKED
self.save()
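
    # Illustrative sketch: if this work request was PENDING with the DEPS
    # unblock strategy and `other_wr` has not completed, adding the
    # dependency moves it back to BLOCKED (both work requests are assumed
    # to be in the same workflow, or outside workflows entirely):
    #
    #     wr.add_dependency(other_wr)
    #     assert wr.status == WorkRequest.Statuses.BLOCKED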
def maybe_finish_workflow(self) -> bool:
"""Update workflow status if its children are no longer in progress."""
# Only relevant for running workflows where all work requests have been
# either completed or aborted
if (
self.task_type != TaskTypes.WORKFLOW
or self.status != WorkRequest.Statuses.RUNNING
or self.has_children_in_progress
):
return False
# If there are aborted child requests, abort the workflow
if (
self.children.filter(superseded__isnull=True)
.exclude(status=WorkRequest.Statuses.COMPLETED)
.exists()
):
self.mark_aborted()
return True
# If there are failed/errored child requests, fail the workflow
if (
self.children.filter(superseded__isnull=True)
.exclude(
Q(result=WorkRequest.Results.SUCCESS)
| Q(workflow_data_json__contains={"allow_failure": True})
)
.exists()
):
self.mark_completed(WorkRequest.Results.FAILURE)
return True
# All child requests succeeded
self.mark_completed(WorkRequest.Results.SUCCESS)
return True
def can_be_unblocked(self) -> bool:
"""Return True if and only if this work request can be unblocked."""
# If this work request is in a workflow, then that workflow must
# be running.
if (
self.parent is not None
and self.parent.status != WorkRequest.Statuses.RUNNING
):
return False
if self.unblock_strategy == WorkRequest.UnblockStrategy.DEPS:
# This work request can be unblocked if and only if all its
# dependencies have completed.
return not self.dependencies.filter(
~Q(status=WorkRequest.Statuses.COMPLETED)
).exists()
else:
# We don't know how to unblock this work request.
return False
def unblock_reverse_dependencies(self):
"""Unblock reverse dependencies."""
# Shortcuts to keep line length sane
r = WorkRequest.Results
s = WorkRequest.Statuses
allow_failure = self.workflow_data and self.workflow_data.allow_failure
if self.result == r.SUCCESS or allow_failure:
# lookup work requests that depend on the completed work
# request and unblock them if no other work request is
# blocking them
for rdep in self.reverse_dependencies.filter(status=s.BLOCKED):
if rdep.can_be_unblocked():
rdep.mark_pending()
else: # failure and !allow_failure
for rdep in self.reverse_dependencies.filter(status=s.BLOCKED):
rdep.mark_aborted()
def unblock_workflow_children(self) -> None:
"""Unblock children of a workflow that has just started running."""
for child in self.children.filter(status=WorkRequest.Statuses.BLOCKED):
if child.can_be_unblocked():
child.mark_pending()
def mark_aborted(self) -> bool:
"""
Worker has aborted the task after request from UI.
        Task will typically be in PENDING or RUNNING status.
"""
        previous_status = self.status
        self.completed_at = timezone.now()
        self.status = self.Statuses.ABORTED
        self.save()
        if self.parent is not None:
            self.parent.maybe_finish_workflow()
        logger.debug(
            "Marked WorkRequest %s as aborted (from status %s)",
            self.pk,
            previous_status,
        )
return True
def assign_worker(self, worker: Optional["Worker"]) -> None:
"""Assign worker and save it."""
self.worker = worker
self.save()
notifications.notify_work_request_assigned(self)
@property
def duration(self) -> float | None:
"""Return duration in seconds between started_at and completed_at."""
if self.started_at and self.completed_at:
return (self.completed_at - self.started_at).total_seconds()
else:
return None
@property
def priority_effective(self) -> int:
"""The effective priority of this work request."""
return self.priority_base + self.priority_adjustment
@property
def is_part_of_workflow(self) -> bool:
"""Return whether this work request is part of a workflow."""
return self.task_type == TaskTypes.WORKFLOW or self.parent is not None
def get_workflow_root(self) -> Optional["WorkRequest"]:
"""Return the root of this work request's workflow, if any."""
if not self.is_part_of_workflow:
return None
parent = self
while parent.parent is not None:
parent = parent.parent
return parent
@property
def workflow_display_name(self) -> str:
"""Return this work request's name for display in a workflow."""
if self.workflow_data is not None:
return self.workflow_data.display_name
else:
return self.task_name
@property
def has_children_in_progress(self) -> bool:
"""Return whether child work requests are still in progress."""
return self.children.exclude(
status__in={
WorkRequest.Statuses.COMPLETED,
WorkRequest.Statuses.ABORTED,
}
).exists()
    def effective_expiration_delay(self) -> timedelta:
"""Return expiration_delay, inherited if None."""
expiration_delay = self.expiration_delay
if self.expiration_delay is None: # inherit
expiration_delay = self.workspace.default_expiration_delay
return expiration_delay
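
    # Illustrative sketch of the inheritance rule (assumes a saved `wr`):
    #
    #     wr.expiration_delay = None
    #     assert (
    #         wr.effective_expiration_delay()
    #         == wr.workspace.default_expiration_delay
    #     )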
class NotificationChannel(models.Model):
"""Model to store notification configuration."""
class Methods(models.TextChoices):
EMAIL = "email", "Email"
data_validators = {Methods.EMAIL: NotificationDataEmail}
name = models.CharField(
max_length=20,
unique=True,
)
method = models.CharField(max_length=10, choices=Methods.choices)
data = models.JSONField(default=dict, blank=True)
def clean(self):
"""
Ensure that data is valid for the specific method.
:raise ValidationError: for invalid data.
"""
try:
self.data_validators[self.method](**self.data)
except (TypeError, ValueError) as exc:
raise ValidationError(
f"NotificationChannel data is not valid: {exc}"
)
return super().clean()
def save(self, *args, **kwargs):
"""Run validators and save the instance."""
self.full_clean()
return super().save(*args, **kwargs)
def __str__(self):
"""Return name."""
return self.name
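
    # Illustrative sketch (the exact fields accepted in `data` are defined
    # by NotificationDataEmail; the field name and address below are
    # assumptions):
    #
    #     channel = NotificationChannel(
    #         name="admins",
    #         method=NotificationChannel.Methods.EMAIL,
    #         data={"to": ["admin@example.org"]},
    #     )
    #     channel.save()  # runs full_clean(), which validates `data`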