Move WorkflowMixin to workflows submodule

pull/12894/head
Matt Westcott 2025-02-18 16:41:34 +00:00
rodzic 6505a02d61
commit 1f1956246e
2 zmienionych plików z 196 dodań i 193 usunięć

Wyświetl plik

@ -61,7 +61,6 @@ from wagtail.coreutils import (
safe_md5,
)
from wagtail.fields import StreamField
from wagtail.locks import WorkflowLock
from wagtail.log_actions import log
from wagtail.query import PageQuerySet
from wagtail.search import index
@ -131,6 +130,7 @@ from .workflows import ( # noqa: F401
TaskStateQuerySet,
Workflow,
WorkflowContentType,
WorkflowMixin,
WorkflowState,
WorkflowStateManager,
WorkflowStateQuerySet,
@ -297,197 +297,6 @@ class PageBase(models.base.ModelBase):
PAGE_MODEL_CLASSES.append(cls)
class WorkflowMixin:
"""A mixin that allows a model to have workflows."""
@classmethod
def check(cls, **kwargs):
return [
*super().check(**kwargs),
*cls._check_draftstate_and_revision_mixins(),
]
@classmethod
def _check_draftstate_and_revision_mixins(cls):
mro = cls.mro()
error = checks.Error(
"WorkflowMixin requires DraftStateMixin and RevisionMixin "
"(in that order).",
hint=(
"Make sure your model's inheritance order is as follows: "
"WorkflowMixin, DraftStateMixin, RevisionMixin."
),
obj=cls,
id="wagtailcore.E006",
)
try:
if not (
mro.index(WorkflowMixin)
< mro.index(DraftStateMixin)
< mro.index(RevisionMixin)
):
return [error]
except ValueError:
return [error]
return []
@classmethod
def get_default_workflow(cls):
"""
Returns the active workflow assigned to the model.
For non-``Page`` models, workflows are assigned to the model's content type,
thus shared across all instances instead of being assigned to individual
instances (unless :meth:`~WorkflowMixin.get_workflow` is overridden).
This method is used to determine the workflow to use when creating new
instances of the model. On ``Page`` models, this method is unused as the
workflow can be determined from the parent page's workflow.
"""
if not getattr(settings, "WAGTAIL_WORKFLOW_ENABLED", True):
return None
content_type = ContentType.objects.get_for_model(cls, for_concrete_model=False)
workflow_content_type = (
WorkflowContentType.objects.filter(
workflow__active=True,
content_type=content_type,
)
.select_related("workflow")
.first()
)
if workflow_content_type:
return workflow_content_type.workflow
return None
@property
def has_workflow(self):
"""
Returns ```True``` if the object has an active workflow assigned, otherwise ```False```.
"""
return self.get_workflow() is not None
def get_workflow(self):
"""
Returns the active workflow assigned to the object.
"""
return self.get_default_workflow()
@property
def workflow_states(self):
"""
Returns workflow states that belong to the object.
To allow filtering ``WorkflowState`` queries by the object,
subclasses should define a
:class:`~django.contrib.contenttypes.fields.GenericRelation` to
:class:`~wagtail.models.WorkflowState` with the desired
``related_query_name``. This property can be replaced with the
``GenericRelation`` or overridden to allow custom logic, which can be
useful if the model has inheritance.
"""
return WorkflowState.objects.for_instance(self)
@property
def workflow_in_progress(self):
"""
Returns ```True``` if a workflow is in progress on the current object, otherwise ```False```.
"""
if not getattr(settings, "WAGTAIL_WORKFLOW_ENABLED", True):
return False
# `_current_workflow_states` may be populated by `prefetch_workflow_states`
# on querysets as a performance optimization
if hasattr(self, "_current_workflow_states"):
for state in self._current_workflow_states:
if state.status == WorkflowState.STATUS_IN_PROGRESS:
return True
return False
return self.workflow_states.filter(
status=WorkflowState.STATUS_IN_PROGRESS
).exists()
@property
def current_workflow_state(self):
"""
Returns the in progress or needs changes workflow state on this object, if it exists.
"""
if not getattr(settings, "WAGTAIL_WORKFLOW_ENABLED", True):
return None
# `_current_workflow_states` may be populated by `prefetch_workflow_states`
# on querysets as a performance optimization
if hasattr(self, "_current_workflow_states"):
try:
return self._current_workflow_states[0]
except IndexError:
return
return (
self.workflow_states.active()
.select_related("current_task_state__task")
.first()
)
@property
def current_workflow_task_state(self):
"""
Returns (specific class of) the current task state of the workflow on this object, if it exists.
"""
current_workflow_state = self.current_workflow_state
if (
current_workflow_state
and current_workflow_state.status == WorkflowState.STATUS_IN_PROGRESS
and current_workflow_state.current_task_state
):
return current_workflow_state.current_task_state.specific
@property
def current_workflow_task(self):
"""
Returns (specific class of) the current task in progress on this object, if it exists.
"""
current_workflow_task_state = self.current_workflow_task_state
if current_workflow_task_state:
return current_workflow_task_state.task.specific
@property
def status_string(self):
if not self.live:
if self.expired:
return _("expired")
elif self.approved_schedule:
return _("scheduled")
elif self.workflow_in_progress:
return _("in moderation")
else:
return _("draft")
else:
if self.approved_schedule:
return _("live + scheduled")
elif self.workflow_in_progress:
return _("live + in moderation")
elif self.has_unpublished_changes:
return _("live + draft")
else:
return _("live")
def get_lock(self):
# Standard locking should take precedence over workflow locking
# because it's possible for both to be used at the same time
lock = super().get_lock()
if lock:
return lock
current_workflow_task = self.current_workflow_task
if current_workflow_task:
return WorkflowLock(self, current_workflow_task)
class AbstractPage(
WorkflowMixin,
PreviewableMixin,

Wyświetl plik

@ -3,6 +3,7 @@ from django.conf import settings
from django.contrib.auth.models import Group
from django.contrib.contenttypes.fields import GenericForeignKey
from django.contrib.contenttypes.models import ContentType
from django.core import checks
from django.core.exceptions import PermissionDenied, ValidationError
from django.db import models, transaction
from django.db.models import Q
@ -19,6 +20,7 @@ from modelcluster.models import (
from wagtail.coreutils import get_content_type_label
from wagtail.forms import TaskStateCommentForm
from wagtail.locks import WorkflowLock
from wagtail.log_actions import log
from wagtail.query import SpecificQuerySetMixin
from wagtail.signals import (
@ -33,9 +35,10 @@ from wagtail.signals import (
)
from .copying import _copy, _copy_m2m_relations
from .draft_state import DraftStateMixin
from .locking import LockableMixin
from .orderable import Orderable
from .revisions import Revision
from .revisions import Revision, RevisionMixin
from .specific import SpecificMixin
@ -1150,3 +1153,194 @@ class TaskState(SpecificMixin, models.Model):
class Meta:
verbose_name = _("Task state")
verbose_name_plural = _("Task states")
class WorkflowMixin:
"""A mixin that allows a model to have workflows."""
@classmethod
def check(cls, **kwargs):
return [
*super().check(**kwargs),
*cls._check_draftstate_and_revision_mixins(),
]
@classmethod
def _check_draftstate_and_revision_mixins(cls):
mro = cls.mro()
error = checks.Error(
"WorkflowMixin requires DraftStateMixin and RevisionMixin "
"(in that order).",
hint=(
"Make sure your model's inheritance order is as follows: "
"WorkflowMixin, DraftStateMixin, RevisionMixin."
),
obj=cls,
id="wagtailcore.E006",
)
try:
if not (
mro.index(WorkflowMixin)
< mro.index(DraftStateMixin)
< mro.index(RevisionMixin)
):
return [error]
except ValueError:
return [error]
return []
@classmethod
def get_default_workflow(cls):
"""
Returns the active workflow assigned to the model.
For non-``Page`` models, workflows are assigned to the model's content type,
thus shared across all instances instead of being assigned to individual
instances (unless :meth:`~WorkflowMixin.get_workflow` is overridden).
This method is used to determine the workflow to use when creating new
instances of the model. On ``Page`` models, this method is unused as the
workflow can be determined from the parent page's workflow.
"""
if not getattr(settings, "WAGTAIL_WORKFLOW_ENABLED", True):
return None
content_type = ContentType.objects.get_for_model(cls, for_concrete_model=False)
workflow_content_type = (
WorkflowContentType.objects.filter(
workflow__active=True,
content_type=content_type,
)
.select_related("workflow")
.first()
)
if workflow_content_type:
return workflow_content_type.workflow
return None
@property
def has_workflow(self):
"""
Returns ```True``` if the object has an active workflow assigned, otherwise ```False```.
"""
return self.get_workflow() is not None
def get_workflow(self):
"""
Returns the active workflow assigned to the object.
"""
return self.get_default_workflow()
@property
def workflow_states(self):
"""
Returns workflow states that belong to the object.
To allow filtering ``WorkflowState`` queries by the object,
subclasses should define a
:class:`~django.contrib.contenttypes.fields.GenericRelation` to
:class:`~wagtail.models.WorkflowState` with the desired
``related_query_name``. This property can be replaced with the
``GenericRelation`` or overridden to allow custom logic, which can be
useful if the model has inheritance.
"""
return WorkflowState.objects.for_instance(self)
@property
def workflow_in_progress(self):
"""
Returns ```True``` if a workflow is in progress on the current object, otherwise ```False```.
"""
if not getattr(settings, "WAGTAIL_WORKFLOW_ENABLED", True):
return False
# `_current_workflow_states` may be populated by `prefetch_workflow_states`
# on querysets as a performance optimization
if hasattr(self, "_current_workflow_states"):
for state in self._current_workflow_states:
if state.status == WorkflowState.STATUS_IN_PROGRESS:
return True
return False
return self.workflow_states.filter(
status=WorkflowState.STATUS_IN_PROGRESS
).exists()
@property
def current_workflow_state(self):
"""
Returns the in progress or needs changes workflow state on this object, if it exists.
"""
if not getattr(settings, "WAGTAIL_WORKFLOW_ENABLED", True):
return None
# `_current_workflow_states` may be populated by `prefetch_workflow_states`
# on querysets as a performance optimization
if hasattr(self, "_current_workflow_states"):
try:
return self._current_workflow_states[0]
except IndexError:
return
return (
self.workflow_states.active()
.select_related("current_task_state__task")
.first()
)
@property
def current_workflow_task_state(self):
"""
Returns (specific class of) the current task state of the workflow on this object, if it exists.
"""
current_workflow_state = self.current_workflow_state
if (
current_workflow_state
and current_workflow_state.status == WorkflowState.STATUS_IN_PROGRESS
and current_workflow_state.current_task_state
):
return current_workflow_state.current_task_state.specific
@property
def current_workflow_task(self):
"""
Returns (specific class of) the current task in progress on this object, if it exists.
"""
current_workflow_task_state = self.current_workflow_task_state
if current_workflow_task_state:
return current_workflow_task_state.task.specific
@property
def status_string(self):
if not self.live:
if self.expired:
return _("expired")
elif self.approved_schedule:
return _("scheduled")
elif self.workflow_in_progress:
return _("in moderation")
else:
return _("draft")
else:
if self.approved_schedule:
return _("live + scheduled")
elif self.workflow_in_progress:
return _("live + in moderation")
elif self.has_unpublished_changes:
return _("live + draft")
else:
return _("live")
def get_lock(self):
# Standard locking should take precedence over workflow locking
# because it's possible for both to be used at the same time
lock = super().get_lock()
if lock:
return lock
current_workflow_task = self.current_workflow_task
if current_workflow_task:
return WorkflowLock(self, current_workflow_task)